diff --git a/vortex-array/src/arrays/constant/vtable/canonical.rs b/vortex-array/src/arrays/constant/vtable/canonical.rs index e38b2863e4b..dd891778949 100644 --- a/vortex-array/src/arrays/constant/vtable/canonical.rs +++ b/vortex-array/src/arrays/constant/vtable/canonical.rs @@ -787,7 +787,6 @@ mod tests { let element_validity = elements .validity() .vortex_expect("constant canonical element validity should be derivable"); - assert!(element_validity.execute_is_valid(0, &mut ctx).unwrap()); assert!(!element_validity.execute_is_valid(1, &mut ctx).unwrap()); assert!(element_validity.execute_is_valid(2, &mut ctx).unwrap()); diff --git a/vortex-array/src/arrays/listview/rebuild.rs b/vortex-array/src/arrays/listview/rebuild.rs index 1201b76d466..63ed33c6498 100644 --- a/vortex-array/src/arrays/listview/rebuild.rs +++ b/vortex-array/src/arrays/listview/rebuild.rs @@ -9,7 +9,6 @@ use vortex_error::VortexResult; use crate::Canonical; use crate::ExecutionCtx; use crate::IntoArray; -use crate::LEGACY_SESSION; use crate::arrays::ConstantArray; use crate::arrays::ListViewArray; use crate::arrays::PrimitiveArray; @@ -20,7 +19,6 @@ use crate::builtins::ArrayBuiltins; use crate::dtype::IntegerPType; use crate::dtype::Nullability; use crate::dtype::PType; -use crate::executor::VortexSessionExecute; use crate::match_each_integer_ptype; use crate::match_each_unsigned_integer_ptype; use crate::scalar::Scalar; @@ -212,11 +210,10 @@ impl ListViewArray { let mut new_sizes = BufferMut::::with_capacity(len); let mut take_indices = BufferMut::::with_capacity(self.elements().len()); - let mut ctx = LEGACY_SESSION.create_execution_ctx(); let validity = self.validity()?; let mut n_elements = NewOffset::zero(); for index in 0..len { - if !validity.execute_is_valid(index, &mut ctx)? { + if !validity.execute_is_valid(index, ctx)? { new_offsets.push(n_elements); new_sizes.push(S::zero()); continue; @@ -295,11 +292,10 @@ impl ListViewArray { let mut new_elements_builder = builder_with_capacity(element_dtype.as_ref(), self.elements().len()); - let mut ctx = LEGACY_SESSION.create_execution_ctx(); let validity = self.validity()?; let mut n_elements = NewOffset::zero(); for index in 0..len { - if !validity.execute_is_valid(index, &mut ctx)? { + if !validity.execute_is_valid(index, ctx)? { // For NULL lists, place them after the previous item's data to maintain the // no-overlap invariant for zero-copy to `ListArray` arrays. new_offsets.push(n_elements); @@ -486,7 +482,6 @@ mod tests { // Verify nullability is preserved assert_eq!(flattened.dtype().nullability(), Nullability::Nullable); - let mut ctx = SESSION.create_execution_ctx(); assert!(flattened.validity()?.execute_is_valid(0, &mut ctx)?); assert!(!flattened.validity()?.execute_is_valid(1, &mut ctx)?); assert!(flattened.validity()?.execute_is_valid(2, &mut ctx)?); diff --git a/vortex-cuda/kernels/src/dynamic_dispatch.cu b/vortex-cuda/kernels/src/dynamic_dispatch.cu index 2f9fbfcada0..fa19b47fe1a 100644 --- a/vortex-cuda/kernels/src/dynamic_dispatch.cu +++ b/vortex-cuda/kernels/src/dynamic_dispatch.cu @@ -199,6 +199,9 @@ scalar_op(T *values, const struct ScalarOp &op, char *__restrict smem, uint64_t } break; } + case ScalarOp::CAST: + // Values are casted as part of LOAD as defined by the dispatch plan. + break; default: __builtin_unreachable(); } diff --git a/vortex-cuda/kernels/src/dynamic_dispatch.h b/vortex-cuda/kernels/src/dynamic_dispatch.h index c4dbe55c9f3..e3b1d0f8df3 100644 --- a/vortex-cuda/kernels/src/dynamic_dispatch.h +++ b/vortex-cuda/kernels/src/dynamic_dispatch.h @@ -173,6 +173,7 @@ struct SourceOp { /// change it: /// - ALP: encoded int → float (e.g. i32 → f32, i64 → f64) /// - DICT: codes type → values type (e.g. u8 → u32) +/// - CAST: current type → requested output type /// /// The plan builder uses `output_ptype` to determine the element width /// for shared memory allocation and to propagate type information @@ -200,10 +201,10 @@ union ScalarParams { }; struct ScalarOp { - enum ScalarOpCode { FOR, ZIGZAG, ALP, DICT } op_code; - /// The PType this op produces. For type-preserving ops (FOR, ZIGZAG) - /// this equals the input PType. For type-changing ops (ALP, DICT) this - /// is the new output PType. + enum ScalarOpCode { FOR, ZIGZAG, ALP, DICT, CAST } op_code; + /// The PType this op produces. For type-preserving ops (FOR, ZIGZAG) this + /// equals the input PType. For type-changing ops (ALP, DICT, CAST) this is + /// the new output PType. enum PTypeTag output_ptype; union ScalarParams params; }; diff --git a/vortex-cuda/src/arrow/canonical.rs b/vortex-cuda/src/arrow/canonical.rs index de20df48ecc..558fdd9e96e 100644 --- a/vortex-cuda/src/arrow/canonical.rs +++ b/vortex-cuda/src/arrow/canonical.rs @@ -12,12 +12,20 @@ use futures::future::BoxFuture; use vortex::array::ArrayRef; use vortex::array::Canonical; use vortex::array::arrays::DecimalArray; +use vortex::array::arrays::Dict; +use vortex::array::arrays::DictArray; +use vortex::array::arrays::FixedSizeList; use vortex::array::arrays::FixedSizeListArray; +use vortex::array::arrays::List; use vortex::array::arrays::ListArray; +use vortex::array::arrays::ListView; +use vortex::array::arrays::ListViewArray; use vortex::array::arrays::PrimitiveArray; +use vortex::array::arrays::Struct; use vortex::array::arrays::StructArray; use vortex::array::arrays::bool::BoolDataParts; use vortex::array::arrays::decimal::DecimalDataParts; +use vortex::array::arrays::dict::DictOwnedExt; use vortex::array::arrays::extension::ExtensionArrayExt; use vortex::array::arrays::fixed_size_list::FixedSizeListArrayExt; use vortex::array::arrays::fixed_size_list::FixedSizeListDataParts; @@ -55,6 +63,7 @@ use crate::arrow::ArrowDeviceArray; use crate::arrow::ExportDeviceArray; use crate::arrow::PrivateData; use crate::arrow::SyncEvent; +use crate::arrow::arrow_device_export_dictionary_codes_dtype; use crate::arrow::cuda_decimal_value_type; use crate::executor::CudaArrayExt; @@ -71,9 +80,7 @@ impl ExportDeviceArray for CanonicalDeviceArrayExport { array: ArrayRef, ctx: &mut CudaExecutionCtx, ) -> VortexResult { - let cuda_array = array.execute_cuda(ctx).await?; - - let (arrow_array, sync_event) = export_canonical(cuda_array, ctx).await?; + let (arrow_array, sync_event) = export_array(array, ctx).await?; Ok(ArrowDeviceArray { array: arrow_array, @@ -85,6 +92,43 @@ impl ExportDeviceArray for CanonicalDeviceArrayExport { } } +/// Export arrays whose Arrow layout depends on their concrete children before CUDA +/// canonicalization can erase that structure (for example nested dictionaries). +/// All other arrays are executed on CUDA, then exported directly from the canonical result. +fn export_array( + array: ArrayRef, + ctx: &mut CudaExecutionCtx, +) -> BoxFuture<'_, VortexResult<(ArrowArray, SyncEvent)>> { + Box::pin(async { + let array = match array.try_downcast::() { + Ok(dict) => return export_dict(dict, ctx).await, + Err(array) => array, + }; + let array = match array.try_downcast::() { + Ok(struct_array) => return export_struct(struct_array, ctx).await, + Err(array) => array, + }; + let array = match array.try_downcast::() { + Ok(list) => { + return export_list(list, ListChildExport::PreserveConcreteLayout, ctx).await; + } + Err(array) => array, + }; + let array = match array.try_downcast::() { + Ok(fixed_size_list) => return export_fixed_size_list(fixed_size_list, ctx).await, + Err(array) => array, + }; + let array = match array.try_downcast::() { + Ok(list_view) => return export_list_view(list_view, ctx).await, + Err(array) => array, + }; + + let cuda_array = array.execute_cuda(ctx).await?; + export_canonical(cuda_array, ctx).await + }) +} + +/// Export a canonical CUDA array using the Arrow C Device layout for its logical type. fn export_canonical( cuda_array: Canonical, ctx: &mut CudaExecutionCtx, @@ -156,23 +200,7 @@ fn export_canonical( ctx, ) } - Canonical::List(listview) => { - // cuDF imports standard Arrow `List`, while Vortex canonical lists are list-views. - // Try the GPU path first; host list-views can fall back to a CPU rebuild. - let is_host = listview.as_ref().is_host(); - let gpu_err = match export_device_list_view(listview.clone(), ctx).await { - Ok(exported) => return Ok(exported), - Err(err) => err, - }; - - // CPU rebuild requires host-resident buffers; device-resident arrays keep the GPU error. - if !is_host { - return Err(gpu_err); - } - - let list = list_from_list_view(listview, ctx.execution_ctx())?; - export_list(list, ctx).await - } + Canonical::List(listview) => export_list_view(listview, ctx).await, Canonical::FixedSizeList(fixed_size_list) => { export_fixed_size_list(fixed_size_list, ctx).await } @@ -233,6 +261,73 @@ fn export_canonical( }) } +/// Export a Vortex dictionary array as an Arrow dictionary array. +/// +/// Owns the codes buffers and recursively exported dictionary values. +async fn export_dict( + array: DictArray, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + let len = array.len(); + let parts = array.into_parts(); + let PrimitiveDataParts { + buffer, validity, .. + } = export_dictionary_codes(parts.codes, ctx).await?; + let (validity_buffer, null_count) = export_arrow_validity_buffer(validity, len, 0, ctx).await?; + let codes_buffer = ctx.ensure_on_device(buffer).await?; + let (dictionary, _) = export_array(parts.values, ctx).await?; + + let mut private_data = PrivateData::new_with_dictionary( + vec![validity_buffer, Some(codes_buffer)], + vec![], + Some(dictionary), + ctx, + )?; + let sync_event = private_data.sync_event(); + let dictionary = private_data.dictionary; + + let arrow_array = ArrowArray { + length: len as i64, + null_count, + offset: 0, + n_buffers: 2, + buffers: private_data.buffer_ptrs.as_mut_ptr(), + n_children: 0, + children: ptr::null_mut(), + release: Some(release_array), + dictionary, + private_data: Box::into_raw(private_data).cast(), + }; + + Ok((arrow_array, sync_event)) +} + +async fn export_dictionary_codes( + codes: ArrayRef, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let target_dtype = arrow_device_export_dictionary_codes_dtype(codes.dtype())?; + let target_ptype = target_dtype.as_ptype(); + let codes = if codes.dtype() == &target_dtype { + codes + } else { + codes.cast(target_dtype)? + } + .execute_cuda(ctx) + .await?; + let Canonical::Primitive(codes) = codes else { + vortex_bail!("dictionary codes must be primitive, got {}", codes.dtype()); + }; + + let parts = codes.into_data_parts(); + vortex_ensure!( + parts.ptype == target_ptype, + "dictionary codes export produced {}", + parts.ptype + ); + Ok(parts) +} + /// Exports decimals with value buffers cast to Arrow's Decimal32/64/128/256 layout. /// /// Decimal values are already decoded; this only adapts the physical buffer width. Storage-to-Arrow @@ -369,10 +464,55 @@ pub(super) async fn export_arrow_validity_buffer( } /// Export a standard Vortex list as Arrow `List`: validity, offsets, and one child array. +async fn export_list_view( + listview: ListViewArray, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + // cuDF imports standard Arrow `List`, while Vortex canonical lists are list-views. + // Try the GPU path first; host list-views can fall back to a CPU rebuild. + let is_host = listview.as_ref().is_host(); + let gpu_err = match export_device_list_view(listview.clone(), ctx).await { + Ok(exported) => return Ok(exported), + Err(err) => err, + }; + + // CPU rebuild requires host-resident buffers; device-resident arrays keep the GPU error. + if !is_host { + return Err(gpu_err); + } + + // The CPU fallback packs ListView ranges into a contiguous List. + export_list( + list_from_list_view(listview, ctx.execution_ctx())?, + ListChildExport::RebuiltListViewChild, + ctx, + ) + .await +} + async fn export_list( array: ListArray, + child_export: ListChildExport, ctx: &mut CudaExecutionCtx, ) -> VortexResult<(ArrowArray, SyncEvent)> { + let (elements, len, validity_buffer, null_count, offsets_buffer) = + list_layout_parts(array, ctx).await?; + export_list_layout( + elements, + len, + validity_buffer, + null_count, + offsets_buffer, + child_export, + ctx, + ) + .await +} + +async fn list_layout_parts( + array: ListArray, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrayRef, usize, Option, i64, BufferHandle)> { let len = array.len(); let ListDataParts { elements, @@ -383,30 +523,67 @@ async fn export_list( let (validity_buffer, null_count) = export_arrow_validity_buffer(validity, len, 0, ctx).await?; let offsets_buffer = export_arrow_list_offsets(offsets, ctx).await?; + Ok((elements, len, validity_buffer, null_count, offsets_buffer)) +} - export_list_layout( - elements, +#[derive(Clone, Copy)] +pub(super) enum ListChildExport { + /// Preserve concrete child layouts, such as dictionaries, + /// so exported data matches the schema. + PreserveConcreteLayout, + /// Canonicalize temporary encodings introduced by the host ListView + /// rebuild, while still preserving rebuilt dictionary children. + RebuiltListViewChild, +} + +impl ListChildExport { + async fn export( + self, + elements: ArrayRef, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + let (elements_child, _) = match self { + ListChildExport::PreserveConcreteLayout => export_array(elements, ctx).await?, + ListChildExport::RebuiltListViewChild if elements.as_opt::().is_some() => { + export_array(elements, ctx).await? + } + ListChildExport::RebuiltListViewChild => { + export_canonical(elements.execute_cuda(ctx).await?, ctx).await? + } + }; + Ok(elements_child) + } +} + +/// Build the shared Arrow `List` parent once offsets and validity are ready on device. +pub(super) async fn export_list_layout( + elements: ArrayRef, + len: usize, + validity_buffer: Option, + null_count: i64, + offsets_buffer: BufferHandle, + child_export: ListChildExport, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + let elements_child = child_export.export(elements, ctx).await?; + export_list_layout_with_child( + elements_child, len, validity_buffer, null_count, offsets_buffer, ctx, ) - .await } -/// Build the shared Arrow `List` parent once offsets and validity are ready on device. -pub(super) async fn export_list_layout( - elements: ArrayRef, +fn export_list_layout_with_child( + elements_child: ArrowArray, len: usize, validity_buffer: Option, null_count: i64, offsets_buffer: BufferHandle, ctx: &mut CudaExecutionCtx, ) -> VortexResult<(ArrowArray, SyncEvent)> { - let cuda_elements = elements.execute_cuda(ctx).await?; - let (elements_child, _) = export_canonical(cuda_elements, ctx).await?; - let mut private_data = PrivateData::new( vec![validity_buffer, Some(offsets_buffer)], vec![elements_child], @@ -450,6 +627,7 @@ async fn export_fixed_size_list( validity_buffer, null_count, offsets_buffer, + ListChildExport::PreserveConcreteLayout, ctx, ) .await @@ -517,8 +695,7 @@ async fn export_struct( let mut children = Vec::with_capacity(fields.len()); for field in fields.iter() { - let cuda_field = field.clone().execute_cuda(ctx).await?; - let (arrow_field, _) = export_canonical(cuda_field, ctx).await?; + let (arrow_field, _) = export_array(field.clone(), ctx).await?; children.push(arrow_field); } @@ -590,6 +767,7 @@ unsafe extern "C" fn release_array(array: *mut ArrowArray) { if !private_data_ptr.is_null() { let mut private_data = Box::from_raw(private_data_ptr.cast::()); release_children(&mut private_data); + release_dictionary(&mut private_data); } // update the release function to NULL to avoid any possibility of double-frees. @@ -597,18 +775,34 @@ unsafe extern "C" fn release_array(array: *mut ArrowArray) { } } +/// Release all owned child arrays stored in private data. unsafe fn release_children(private_data: &mut PrivateData) { unsafe { let children = mem::take(&mut private_data.children); for child in children { - if !child.is_null() { - if let Some(release) = (*child).release { - release(child); - } - // Children are allocated with Box::into_raw in PrivateData::new, so the - // release callback must also reclaim the ArrowArray allocation itself. - drop(Box::from_raw(child)); + release_array_ptr(child); + } + } +} + +/// Release the owned dictionary array, if this Arrow array has one. +unsafe fn release_dictionary(private_data: &mut PrivateData) { + unsafe { + let dictionary = ptr::replace(&raw mut private_data.dictionary, ptr::null_mut()); + release_array_ptr(dictionary); + } +} + +/// Release an ArrowArray pointer allocated for private data. +unsafe fn release_array_ptr(array: *mut ArrowArray) { + unsafe { + if !array.is_null() { + if let Some(release) = (*array).release { + release(array); } + // Owned arrays are allocated with Box::into_raw in PrivateData, so the release callback + // must also reclaim the ArrowArray allocation itself. + drop(Box::from_raw(array)); } } } @@ -627,6 +821,7 @@ mod tests { use vortex::array::IntoArray; use vortex::array::arrays::BoolArray; use vortex::array::arrays::DecimalArray; + use vortex::array::arrays::DictArray; use vortex::array::arrays::FixedSizeListArray; use vortex::array::arrays::ListArray; use vortex::array::arrays::ListViewArray; @@ -864,6 +1059,20 @@ mod tests { .collect()) } + fn private_data_buffer_i16_values( + array: &ArrowArray, + buffer_idx: usize, + ) -> VortexResult> { + let private_data = unsafe { &*array.private_data.cast::() }; + let buffer = private_data.buffers[buffer_idx] + .as_ref() + .vortex_expect("buffer should be present"); + Ok(Buffer::::from_byte_buffer(buffer.to_host_sync()) + .iter() + .copied() + .collect()) + } + fn assert_exported_decimal_values( value_buffer: &BufferHandle, expected: &[T], @@ -966,6 +1175,127 @@ mod tests { Ok(()) } + #[crate::test] + async fn test_export_dictionary() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let out_of_line = "a dictionary value stored out-of-line"; + let array = DictArray::try_new( + PrimitiveArray::from_option_iter([Some(0u8), None, Some(1), Some(0)]).into_array(), + VarBinViewArray::from_iter_str(["alpha", out_of_line]).into_array(), + )? + .into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + let field = Field::try_from(&exported.schema)?; + assert_eq!( + field, + Field::new( + "", + DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8View)), + true, + ) + ); + assert_eq!(exported.array.array.length, 4); + assert_eq!(exported.array.array.null_count, 1); + assert_eq!(exported.array.array.n_buffers, 2); + assert_eq!(exported.array.array.n_children, 0); + assert!(!exported.array.array.dictionary.is_null()); + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + + let private_data = unsafe { &*exported.array.array.private_data.cast::() }; + assert_eq!( + private_data.buffers[1] + .as_ref() + .vortex_expect("codes buffer should be present") + .len(), + 4 * size_of::() + ); + + let dictionary = unsafe { &*exported.array.array.dictionary }; + assert_varbinview_layout(dictionary, 2, 0, &[out_of_line.len()])?; + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + + #[crate::test] + async fn test_export_struct_preserves_dictionary_child() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let dictionary = DictArray::try_new( + PrimitiveArray::from_option_iter([Some(0u8), None, Some(1)]).into_array(), + VarBinViewArray::from_iter_str(["alpha", "beta"]).into_array(), + )? + .into_array(); + let array = StructArray::new( + FieldNames::from_iter(["dict"]), + vec![dictionary], + 3, + Validity::NonNullable, + ) + .into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + let field = Field::try_from(&exported.schema)?; + assert_eq!( + field, + Field::new( + "", + DataType::Struct(Fields::from(vec![Field::new( + "dict", + DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8View)), + true, + )])), + false, + ) + ); + assert_eq!(exported.array.array.n_children, 1); + let children = unsafe { std::slice::from_raw_parts(exported.array.array.children, 1) }; + let dict_child = unsafe { &*children[0] }; + assert!(!dict_child.dictionary.is_null()); + assert_eq!(dict_child.null_count, 1); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + + #[crate::test] + async fn test_export_dictionary_with_nullable_values() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let array = DictArray::try_new( + PrimitiveArray::from_iter([0u8, 1, 0]).into_array(), + PrimitiveArray::from_option_iter([Some(10i32), None]).into_array(), + )? + .into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + let field = Field::try_from(&exported.schema)?; + assert_eq!( + field, + Field::new( + "", + DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Int32)), + true, + ) + ); + assert_eq!(exported.array.array.null_count, 0); + assert_eq!( + private_data_buffer_i16_values(&exported.array.array, 1)?, + [0, 1, 0] + ); + let dictionary = unsafe { &*exported.array.array.dictionary }; + assert_eq!(dictionary.null_count, 1); + assert_eq!(private_data_buffer_i32_values(dictionary, 1)?.len(), 2); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + async fn assert_exported_decimal( array: ArrayRef, expected_data_type: DataType, @@ -1347,6 +1677,53 @@ mod tests { Ok(()) } + #[crate::test] + async fn test_export_host_non_contiguous_dictionary_list_view_preserves_dictionary_child() + -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let elements = DictArray::try_new( + PrimitiveArray::from_option_iter([Some(0u8), None, Some(1), Some(2)]).into_array(), + PrimitiveArray::from_iter([10i32, 20, 30]).into_array(), + )? + .into_array(); + let array = ListViewArray::new( + elements, + PrimitiveArray::from_iter([0i32, 1]).into_array(), + PrimitiveArray::from_iter([3i32, 2]).into_array(), + Validity::NonNullable, + ) + .into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + let field = Field::try_from(&exported.schema)?; + assert_eq!( + field, + Field::new_list( + "", + Field::new( + Field::LIST_FIELD_DEFAULT_NAME, + DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Int32)), + true, + ), + false, + ) + ); + assert_eq!( + private_data_buffer_i32_values(&exported.array.array, 1)?, + [0, 3, 5] + ); + let list_children = unsafe { std::slice::from_raw_parts(exported.array.array.children, 1) }; + let dict_child = unsafe { &*list_children[0] }; + assert!(!dict_child.dictionary.is_null()); + assert_eq!(dict_child.length, 5); + assert_eq!(dict_child.n_buffers, 2); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + #[rstest] #[case::i32_i32(PType::I32, PType::I32)] #[case::u32_u16(PType::U32, PType::U16)] @@ -1481,6 +1858,114 @@ mod tests { Ok(()) } + #[crate::test] + async fn test_export_device_non_contiguous_dictionary_list_view() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let codes = primitive_on_device([0u8, 1, 2, 0, 1], &mut ctx).await?; + let values = PrimitiveArray::from_iter([10i32, 20, 30]).into_array(); + let elements = DictArray::try_new(codes, values)?.into_array(); + let offsets = primitive_i32_on_device([3, 0, 2], &mut ctx).await?; + let sizes = primitive_i32_on_device([2, 2, 1], &mut ctx).await?; + let array = + ListViewArray::new(elements, offsets, sizes, Validity::NonNullable).into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + let field = Field::try_from(&exported.schema)?; + assert_eq!( + field, + Field::new_list( + "", + Field::new( + Field::LIST_FIELD_DEFAULT_NAME, + DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Int32)), + false, + ), + false, + ) + ); + assert_eq!(exported.array.array.length, 3); + assert_eq!( + private_data_buffer_i32_values(&exported.array.array, 1)?, + [0, 2, 4, 5] + ); + let children = unsafe { std::slice::from_raw_parts(exported.array.array.children, 1) }; + let elements = unsafe { &*children[0] }; + assert!(!elements.dictionary.is_null()); + assert_eq!( + private_data_buffer_i16_values(elements, 1)?, + [0, 1, 0, 1, 2] + ); + let dictionary = unsafe { &*elements.dictionary }; + assert_eq!(private_data_buffer_i32_values(dictionary, 1)?, [10, 20, 30]); + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + + #[crate::test] + async fn test_export_device_non_contiguous_dictionary_list_view_nullable_values() + -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let codes = primitive_on_device([0u8, 1, 2, 0, 1], &mut ctx).await?; + let values = PrimitiveArray::from_option_iter([Some(10i32), None, Some(30)]).into_array(); + let elements = DictArray::try_new(codes, values)?.into_array(); + let offsets = primitive_i32_on_device([3, 0, 2], &mut ctx).await?; + let sizes = primitive_i32_on_device([2, 2, 1], &mut ctx).await?; + let array = + ListViewArray::new(elements, offsets, sizes, Validity::NonNullable).into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + let children = unsafe { std::slice::from_raw_parts(exported.array.array.children, 1) }; + let elements = unsafe { &*children[0] }; + assert_eq!(elements.null_count, 0); + assert_eq!( + private_data_buffer_i16_values(elements, 1)?, + [0, 1, 0, 1, 2] + ); + let dictionary = unsafe { &*elements.dictionary }; + assert_eq!(dictionary.null_count, 1); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + + #[crate::test] + async fn test_export_device_non_contiguous_dictionary_list_view_nullable_codes_errors() + -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let codes = PrimitiveArray::from_option_iter([Some(0u8), None, Some(2), Some(0), Some(1)]); + let codes_handle = ctx.ensure_on_device(codes.buffer_handle().clone()).await?; + let codes = PrimitiveArray::from_buffer_handle(codes_handle, PType::U8, codes.validity()?) + .into_array(); + let values = + PrimitiveArray::from_option_iter([Some(10i32), Some(20), Some(30)]).into_array(); + let elements = DictArray::try_new(codes, values)?.into_array(); + let offsets = primitive_i32_on_device([3, 0, 2], &mut ctx).await?; + let sizes = primitive_i32_on_device([2, 2, 1], &mut ctx).await?; + let array = + ListViewArray::new(elements, offsets, sizes, Validity::NonNullable).into_array(); + let err = match array.export_device_array(&mut ctx).await { + Ok(mut exported) => { + unsafe { release_exported_array(&raw mut exported.array) }; + vortex_bail!("nullable dictionary codes should be unsupported") + } + Err(err) => err, + }; + + assert!( + err.to_string().contains("nullable dictionary codes"), + "unexpected error: {err}" + ); + Ok(()) + } + #[rstest] #[case::out_of_bounds(&[3], &[2], "offsets/sizes are invalid")] #[case::negative_offset(&[-1], &[1], "offsets exceed i32 range")] diff --git a/vortex-cuda/src/arrow/list_view.rs b/vortex-cuda/src/arrow/list_view.rs index f8a0be2f047..b14859dfbd5 100644 --- a/vortex-cuda/src/arrow/list_view.rs +++ b/vortex-cuda/src/arrow/list_view.rs @@ -11,8 +11,11 @@ use cudarc::driver::PushKernelArg; use vortex::array::ArrayRef; use vortex::array::Canonical; use vortex::array::IntoArray; +use vortex::array::arrays::Dict; +use vortex::array::arrays::DictArray; use vortex::array::arrays::ListViewArray; use vortex::array::arrays::PrimitiveArray; +use vortex::array::arrays::dict::DictOwnedExt; use vortex::array::arrays::listview::ListViewDataParts; use vortex::array::arrays::primitive::PrimitiveDataParts; use vortex::array::buffer::BufferHandle; @@ -27,6 +30,7 @@ use vortex::error::vortex_err; use super::ArrowArray; use super::SyncEvent; +use super::canonical::ListChildExport; use super::canonical::export_arrow_validity_buffer; use super::canonical::export_list_layout; use crate::CudaBufferExt; @@ -37,8 +41,7 @@ use crate::executor::CudaArrayExt; /// Export a Vortex list-view as Arrow `List` using device kernels. /// -/// Contiguous list-views reuse their child elements. Non-contiguous list-views are rebuilt on GPU -/// only when the child is primitive and non-nullable/non-null; other child shapes are rejected. +/// Reuses contiguous children; rebuilds non-contiguous primitive or dictionary-code children. pub(super) async fn export_device_list_view( array: ListViewArray, ctx: &mut CudaExecutionCtx, @@ -75,24 +78,41 @@ pub(super) async fn export_device_list_view( validity_buffer, null_count, offsets_buffer, + ListChildExport::PreserveConcreteLayout, ctx, ) .await } - DeviceListViewOffsets::RequiresRebuild => { - export_rebuilt_primitive_list_view( - elements, - offsets_ptype, - offsets_buffer, - sizes_ptype, - sizes_buffer, - len, - validity_buffer, - null_count, - ctx, - ) - .await - } + DeviceListViewOffsets::RequiresRebuild => match elements.try_downcast::() { + Ok(dict) => { + export_rebuilt_dict_list_view( + dict, + offsets_ptype, + offsets_buffer, + sizes_ptype, + sizes_buffer, + len, + validity_buffer, + null_count, + ctx, + ) + .await + } + Err(elements) => { + export_rebuilt_primitive_list_view( + elements, + offsets_ptype, + offsets_buffer, + sizes_ptype, + sizes_buffer, + len, + validity_buffer, + null_count, + ctx, + ) + .await + } + }, } } @@ -128,6 +148,7 @@ async fn export_device_list_view_offsets( }) } +/// Rebuild primitive list-view offsets and values for concrete offset and size types. async fn rebuild_primitive_list_view_typed( offsets: BufferHandle, sizes: BufferHandle, @@ -177,6 +198,7 @@ where Ok((output_offsets, values)) } +/// Allocate the device status word used by list-view rebuild kernels. async fn new_list_view_status(ctx: &mut CudaExecutionCtx) -> VortexResult { ctx.ensure_on_device(BufferHandle::new_host( Buffer::from(vec![0u32]).into_byte_buffer(), @@ -184,6 +206,7 @@ async fn new_list_view_status(ctx: &mut CudaExecutionCtx) -> VortexResult VortexResult<()> { match Buffer::::from_byte_buffer(status.try_to_host()?.await?)[0] { 0 => Ok(()), @@ -197,6 +220,7 @@ async fn check_list_view_rebuild_status(status: &BufferHandle) -> VortexResult<( } } +/// Initialize the exclusive-scan input for rebuilt Arrow List offsets. fn init_list_view_rebuild_scan( sizes: &BufferHandle, status: &BufferHandle, @@ -226,6 +250,7 @@ where Ok(scan_input) } +/// Validate rebuilt Arrow List offsets and flag invalid views on device. fn validate_list_view_rebuild_offsets( sizes: &BufferHandle, output_offsets: &BufferHandle, @@ -253,6 +278,7 @@ where }) } +/// Read the final rebuilt offset to determine the output child length. async fn total_values_from_offsets( output_offsets: &BufferHandle, list_len: usize, @@ -267,6 +293,7 @@ async fn total_values_from_offsets( usize::try_from(total_values).map_err(Into::into) } +/// Gather primitive child bytes from each list-view range into contiguous list order. #[expect(clippy::too_many_arguments)] fn gather_rebuilt_primitive_values( offsets: &BufferHandle, @@ -317,7 +344,55 @@ where Ok(output_values) } -#[expect(clippy::cognitive_complexity, clippy::too_many_arguments)] +/// Rebuild non-contiguous dictionary list-view codes while reusing the dictionary values. +#[expect(clippy::too_many_arguments)] +async fn export_rebuilt_dict_list_view( + dict: DictArray, + offsets_ptype: PType, + offsets_buffer: BufferHandle, + sizes_ptype: PType, + sizes_buffer: BufferHandle, + len: usize, + validity_buffer: Option, + null_count: i64, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + let parts = dict.into_parts(); + let canonical_codes = parts.codes.execute_cuda(ctx).await?; + let Canonical::Primitive(codes) = canonical_codes else { + vortex_bail!( + "cannot export non-contiguous device-resident ListViewArray with dictionary codes of {}: GPU child rebuild only supports primitive dictionary codes", + canonical_codes.dtype() + ); + }; + + let (offsets_buffer, rebuilt_codes) = rebuild_primitive_list_view_child( + codes, + "dictionary codes", + offsets_ptype, + offsets_buffer, + sizes_ptype, + sizes_buffer, + len, + ctx, + ) + .await?; + let rebuilt_dict = DictArray::try_new(rebuilt_codes.into_array(), parts.values)?.into_array(); + + export_list_layout( + rebuilt_dict, + len, + validity_buffer, + null_count, + offsets_buffer, + ListChildExport::PreserveConcreteLayout, + ctx, + ) + .await +} + +/// Rebuild a non-contiguous primitive list-view child and export it as an Arrow List. +#[expect(clippy::too_many_arguments)] async fn export_rebuilt_primitive_list_view( elements: ArrayRef, offsets_ptype: PType, @@ -336,6 +411,43 @@ async fn export_rebuilt_primitive_list_view( canonical_elements.dtype() ); }; + + let (offsets_buffer, rebuilt_elements) = rebuild_primitive_list_view_child( + elements, + "primitive child", + offsets_ptype, + offsets_buffer, + sizes_ptype, + sizes_buffer, + len, + ctx, + ) + .await?; + + export_list_layout( + rebuilt_elements.into_array(), + len, + validity_buffer, + null_count, + offsets_buffer, + ListChildExport::PreserveConcreteLayout, + ctx, + ) + .await +} + +/// Gather a non-contiguous primitive child into list order and return new offsets and values. +#[expect(clippy::cognitive_complexity, clippy::too_many_arguments)] +async fn rebuild_primitive_list_view_child( + elements: PrimitiveArray, + child_name: &str, + offsets_ptype: PType, + offsets_buffer: BufferHandle, + sizes_ptype: PType, + sizes_buffer: BufferHandle, + len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(BufferHandle, PrimitiveArray)> { let elements_len = elements.len(); let PrimitiveDataParts { ptype, @@ -346,7 +458,7 @@ async fn export_rebuilt_primitive_list_view( vortex_ensure!( validity.no_nulls(), - "cannot export non-contiguous device-resident ListViewArray with nullable primitive child: GPU child validity rebuild is not implemented" + "cannot export non-contiguous device-resident ListViewArray with nullable {child_name}: GPU child validity rebuild is not implemented" ); let values_buffer = ctx.ensure_on_device(buffer).await?; @@ -365,20 +477,13 @@ async fn export_rebuilt_primitive_list_view( }) })?; - let rebuilt_elements = - PrimitiveArray::from_buffer_handle(values_buffer, ptype, validity).into_array(); - - export_list_layout( - rebuilt_elements, - len, - validity_buffer, - null_count, + Ok(( offsets_buffer, - ctx, - ) - .await + PrimitiveArray::from_buffer_handle(values_buffer, ptype, validity), + )) } +/// Execute an integer array on CUDA and return its primitive type and device buffer. async fn primitive_device_buffer( array: ArrayRef, name: &str, @@ -395,6 +500,7 @@ async fn primitive_device_buffer( Ok((ptype, ctx.ensure_on_device(buffer).await?)) } +/// Compute Arrow List offsets and report whether child values must be rebuilt. async fn export_device_list_view_offsets_typed( offsets: BufferHandle, sizes: BufferHandle, diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 99fa14add3d..956b6a6d2ed 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -26,11 +26,22 @@ use cudarc::driver::CudaEvent; use cudarc::driver::CudaStream; use cudarc::runtime::sys::cudaEvent_t; use vortex::array::ArrayRef; +use vortex::array::arrays::Dict; +use vortex::array::arrays::FixedSizeList; +use vortex::array::arrays::List; +use vortex::array::arrays::ListView; +use vortex::array::arrays::Struct; +use vortex::array::arrays::dict::DictArraySlotsExt; +use vortex::array::arrays::fixed_size_list::FixedSizeListArrayExt; +use vortex::array::arrays::list::ListArrayExt; +use vortex::array::arrays::listview::ListViewArrayExt; +use vortex::array::arrays::struct_::StructArrayExt; use vortex::array::arrow::ArrowSessionExt; use vortex::array::buffer::BufferHandle; use vortex::dtype::DType; use vortex::dtype::DecimalDType; use vortex::dtype::DecimalType; +use vortex::dtype::PType; use vortex::dtype::StructFields; use vortex::error::VortexResult; use vortex::error::vortex_err; @@ -94,13 +105,25 @@ pub(crate) struct PrivateData { pub(crate) cuda_event: CudaEvent, pub(crate) cuda_event_ptr: cudaEvent_t, pub(crate) children: Box<[*mut ArrowArray]>, + pub(crate) dictionary: *mut ArrowArray, } impl PrivateData { + /// Create private data for arrays that own buffers and child arrays but no dictionary. pub(crate) fn new( buffers: Vec>, children: Vec, ctx: &mut CudaExecutionCtx, + ) -> VortexResult> { + Self::new_with_dictionary(buffers, children, None, ctx) + } + + /// Create private data and optionally own an Arrow dictionary child. + pub(crate) fn new_with_dictionary( + buffers: Vec>, + children: Vec, + dictionary: Option, + ctx: &mut CudaExecutionCtx, ) -> VortexResult> { let buffers = buffers.into_boxed_slice(); let buffer_ptrs: Box<[*const c_void]> = buffers @@ -130,16 +153,22 @@ impl PrivateData { .record_event(None) .map_err(|_| vortex_err!("failed to create cudaEvent_t"))?; + let dictionary = dictionary + .map(|array| Box::into_raw(Box::new(array))) + .unwrap_or(ptr::null_mut()); + Ok(Box::new(Self { buffers, buffer_ptrs, cuda_stream: Arc::clone(ctx.stream()), children, + dictionary, cuda_event_ptr: cuda_event.cu_event().cast(), cuda_event, })) } + /// Return a stable pointer to the recorded CUDA event handle. pub(crate) fn sync_event(&mut self) -> SyncEvent { (&raw mut self.cuda_event_ptr).cast() } @@ -216,29 +245,134 @@ fn arrow_schema_for_array( array: &ArrayRef, ctx: &mut CudaExecutionCtx, ) -> VortexResult { - let dtype = arrow_device_export_dtype(array.dtype()); - match &dtype { - DType::Struct(struct_dtype, _) => Ok(FFI_ArrowSchema::try_from(Schema::new( - cuda_arrow_struct_fields(struct_dtype, ctx)?, - ))?), - _ => Ok(FFI_ArrowSchema::try_from(cuda_arrow_field( - "", &dtype, ctx, - )?)?), + if let Some(struct_array) = array.as_opt::() { + return Ok(FFI_ArrowSchema::try_from(Schema::new( + arrow_device_export_struct_fields_for_array( + struct_array.names().iter(), + struct_array.iter_unmasked_fields(), + ctx, + )?, + ))?); } + + Ok(FFI_ArrowSchema::try_from( + arrow_device_export_field_for_array("", array, ctx)?, + )?) } -fn cuda_arrow_struct_fields( +/// Build struct fields from a logical dtype when no concrete child arrays are available. +fn arrow_device_export_struct_fields( struct_dtype: &StructFields, ctx: &mut CudaExecutionCtx, ) -> VortexResult> { let mut fields = Vec::with_capacity(struct_dtype.nfields()); for (field_name, field_dtype) in struct_dtype.names().iter().zip(struct_dtype.fields()) { - fields.push(cuda_arrow_field(field_name.as_ref(), &field_dtype, ctx)?); + fields.push(arrow_device_export_field( + field_name.as_ref(), + &field_dtype, + ctx, + )?); } Ok(fields) } -fn cuda_arrow_field( +/// Build struct fields from concrete arrays so nested encodings like dictionaries are preserved. +fn arrow_device_export_struct_fields_for_array<'a>( + names: impl IntoIterator, + fields: impl IntoIterator, + ctx: &mut CudaExecutionCtx, +) -> VortexResult> { + names + .into_iter() + .zip(fields) + .map(|(name, array)| arrow_device_export_field_for_array(name.as_ref(), array, ctx)) + .collect() +} + +/// Build the Arrow field matching how this concrete array will be exported. +fn arrow_device_export_field_for_array( + name: impl AsRef, + array: &ArrayRef, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let name = name.as_ref(); + + if let Some(dict) = array.as_opt::() { + let codes_dtype = arrow_device_export_dictionary_codes_dtype(dict.codes().dtype())?; + let codes_type = arrow_device_export_field("", &codes_dtype, ctx)? + .data_type() + .clone(); + let values_type = arrow_device_export_field_for_array("", dict.values(), ctx)? + .data_type() + .clone(); + return Ok(Field::new( + name, + DataType::Dictionary(Box::new(codes_type), Box::new(values_type)), + array.dtype().is_nullable(), + )); + } + + if let Some(struct_array) = array.as_opt::() { + return Ok(Field::new( + name, + DataType::Struct( + arrow_device_export_struct_fields_for_array( + struct_array.names().iter(), + struct_array.iter_unmasked_fields(), + ctx, + )? + .into(), + ), + array.dtype().is_nullable(), + )); + } + + if let Some(list) = array.as_opt::() { + let element = arrow_device_export_field_for_array( + Field::LIST_FIELD_DEFAULT_NAME, + list.elements(), + ctx, + )?; + return Ok(Field::new_list(name, element, array.dtype().is_nullable())); + } + + if let Some(list) = array.as_opt::() { + let element = arrow_device_export_field_for_array( + Field::LIST_FIELD_DEFAULT_NAME, + list.elements(), + ctx, + )?; + return Ok(Field::new_list(name, element, array.dtype().is_nullable())); + } + + if let Some(list) = array.as_opt::() { + let element = arrow_device_export_field_for_array( + Field::LIST_FIELD_DEFAULT_NAME, + list.elements(), + ctx, + )?; + return Ok(Field::new_list(name, element, array.dtype().is_nullable())); + } + + arrow_device_export_field(name, &arrow_device_export_dtype(array.dtype()), ctx) +} + +/// Return the signed dictionary code dtype used by Arrow Device export. +fn arrow_device_export_dictionary_codes_dtype(codes_dtype: &DType) -> VortexResult { + // cuDF's Arrow Device importer only accepts signed dictionary indices. + let ptype = match codes_dtype.as_ptype() { + PType::U8 => PType::I16, + PType::U16 => PType::I32, + PType::U32 | PType::U64 => PType::I64, + ptype @ (PType::I8 | PType::I16 | PType::I32 | PType::I64) => ptype, + ptype => return Err(vortex_err!("dictionary codes must be integer, got {ptype}")), + }; + + Ok(DType::Primitive(ptype, codes_dtype.nullability())) +} + +/// Build the Arrow field for a dtype-only export schema fallback. +fn arrow_device_export_field( name: impl AsRef, dtype: &DType, ctx: &mut CudaExecutionCtx, @@ -250,9 +384,9 @@ fn cuda_arrow_field( .to_arrow_field(name.as_ref(), dtype)?; let data_type = match dtype { - DType::Decimal(decimal_dtype, _) => cuda_arrow_decimal_data_type(*decimal_dtype), + DType::Decimal(decimal_dtype, _) => arrow_device_export_decimal_data_type(*decimal_dtype), DType::Struct(struct_dtype, _) => { - DataType::Struct(cuda_arrow_struct_fields(struct_dtype, ctx)?.into()) + DataType::Struct(arrow_device_export_struct_fields(struct_dtype, ctx)?.into()) } _ => return Ok(field), }; @@ -263,7 +397,8 @@ fn cuda_arrow_field( ) } -fn cuda_arrow_decimal_data_type(decimal_dtype: DecimalDType) -> DataType { +/// Return the Arrow decimal type with the device-export physical width. +fn arrow_device_export_decimal_data_type(decimal_dtype: DecimalDType) -> DataType { match cuda_decimal_value_type(decimal_dtype) { DecimalType::I32 => DataType::Decimal32(decimal_dtype.precision(), decimal_dtype.scale()), DecimalType::I64 => DataType::Decimal64(decimal_dtype.precision(), decimal_dtype.scale()), @@ -273,6 +408,7 @@ fn cuda_arrow_decimal_data_type(decimal_dtype: DecimalDType) -> DataType { } } +/// Return the decimal storage width Arrow expects for a precision. pub(crate) fn cuda_decimal_value_type(decimal_dtype: DecimalDType) -> DecimalType { match decimal_dtype.precision() { 1..=9 => DecimalType::I32, @@ -283,6 +419,7 @@ pub(crate) fn cuda_decimal_value_type(decimal_dtype: DecimalDType) -> DecimalTyp } } +/// Adapt Vortex logical dtypes to the Arrow Device layout this exporter emits. fn arrow_device_export_dtype(dtype: &DType) -> DType { match dtype { DType::List(element, nullability) => { diff --git a/vortex-cuda/src/dynamic_dispatch/mod.rs b/vortex-cuda/src/dynamic_dispatch/mod.rs index 32e84795732..186f2297db1 100644 --- a/vortex-cuda/src/dynamic_dispatch/mod.rs +++ b/vortex-cuda/src/dynamic_dispatch/mod.rs @@ -405,6 +405,15 @@ impl ScalarOp { } } + /// Cast to the requested output type. + pub fn cast(output_ptype: PTypeTag) -> Self { + Self { + op_code: ScalarOp_ScalarOpCode_CAST, + output_ptype, + params: unsafe { std::mem::zeroed() }, + } + } + /// Dictionary gather: use current value as index into decoded values /// in shared memory (populated by an earlier input stage). pub fn dict(values_smem_byte_offset: u32, output_ptype: PTypeTag) -> Self { @@ -532,11 +541,14 @@ mod tests { use vortex::array::IntoArray; use vortex::array::arrays::DictArray; use vortex::array::arrays::PrimitiveArray; + use vortex::array::builtins::ArrayBuiltins; use vortex::array::scalar::Scalar; use vortex::array::validity::Validity; use vortex::array::validity::Validity::NonNullable; use vortex::buffer::Buffer; + use vortex::dtype::DType; use vortex::dtype::NativePType; + use vortex::dtype::PType; use vortex::encodings::alp::ALP; use vortex::encodings::alp::ALPArrayExt; use vortex::encodings::alp::ALPArraySlotsExt; @@ -591,6 +603,27 @@ mod tests { } } + #[crate::test] + fn test_cast_u64_to_i64_is_not_fused() -> VortexResult<()> { + let supported = PrimitiveArray::from_iter([0u32, 1]) + .into_array() + .cast(DType::Primitive(PType::I64, Nullability::NonNullable))?; + assert!(matches!( + DispatchPlan::new(&supported, CudaDispatchMode::DynDispatchOnly)?, + DispatchPlan::Fused(_) + )); + + let u64_to_i64 = PrimitiveArray::from_iter([0u64, i64::MAX as u64 + 1]) + .into_array() + .cast(DType::Primitive(PType::I64, Nullability::NonNullable))?; + assert!(matches!( + DispatchPlan::new(&u64_to_i64, CudaDispatchMode::DynDispatchOnly)?, + DispatchPlan::Unfused + )); + + Ok(()) + } + #[crate::test] fn test_max_scalar_ops() -> VortexResult<()> { let bit_width: u8 = 6; diff --git a/vortex-cuda/src/dynamic_dispatch/plan_builder.rs b/vortex-cuda/src/dynamic_dispatch/plan_builder.rs index d20af9f7d21..f98dbb74a28 100644 --- a/vortex-cuda/src/dynamic_dispatch/plan_builder.rs +++ b/vortex-cuda/src/dynamic_dispatch/plan_builder.rs @@ -14,8 +14,10 @@ use vortex::array::ArrayRef; use vortex::array::ArrayVTable; use vortex::array::arrays::Dict; use vortex::array::arrays::Primitive; +use vortex::array::arrays::ScalarFn; use vortex::array::arrays::Slice; use vortex::array::arrays::dict::DictArraySlotsExt; +use vortex::array::arrays::scalar_fn::ScalarFnArrayExt; use vortex::array::arrays::slice::SliceArrayExt; use vortex::array::buffer::BufferHandle; use vortex::array::patches::Patches; @@ -38,6 +40,8 @@ use vortex::encodings::zigzag::ZigZagArrayExt; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_err; +use vortex::scalar_fn::ScalarFnVTable; +use vortex::scalar_fn::fns::cast::Cast; use super::CudaDispatchPlan; use super::MaterializedStage; @@ -75,6 +79,9 @@ fn is_dyn_dispatch_compatible(array: &ArrayRef) -> bool { } let id = array.encoding_id(); + if id == Cast.id() { + return is_dyn_dispatch_cast_compatible(array); + } if id == ALP.id() { let arr = array.as_::(); return matches!(arr.dtype().as_ptype(), PType::F32 | PType::F64); @@ -126,6 +133,22 @@ fn is_dyn_dispatch_compatible(array: &ArrayRef) -> bool { || id == Sequence.id() } +fn is_dyn_dispatch_cast_compatible(array: &ArrayRef) -> bool { + let cast = array.as_::(); + + let Ok(source_ptype) = PType::try_from(cast.child_at(0).dtype()) else { + return false; + }; + let target_ptype = cast.scalar_fn().as_::().as_ptype(); + + // Implemented as unsigned dictionary-code casts to cuDF's signed index types. + // LOAD/BITUNPACK materialize directly into the target-width output type. + matches!( + (source_ptype, target_ptype), + (PType::U8, PType::I16) | (PType::U16, PType::I32) | (PType::U32, PType::I64) + ) +} + /// Returns `true` if a registered standalone kernel can decode the entire /// `array` tree in a single launch without recursing into `execute_cuda` /// for child encodings. @@ -500,6 +523,8 @@ impl FusedPlan { self.walk_slice(array, pending_subtrees) } else if id == Sequence.id() { self.walk_sequence(array) + } else if id == Cast.id() { + self.walk_cast(array, pending_subtrees) } else { vortex_bail!( "Encoding {:?} not supported by dynamic dispatch plan builder", @@ -793,6 +818,21 @@ impl FusedPlan { )) } + fn walk_cast( + &mut self, + array: ArrayRef, + pending_subtrees: &mut Vec, + ) -> VortexResult { + let cast = array.as_::(); + let target_ptype = ptype_to_tag(cast.scalar_fn().as_::().as_ptype()); + let mut pipeline = self.walk(cast.child_at(0).clone(), pending_subtrees)?; + // LOAD/BITUNPACK directly widen into the output type without an additional cast op. + pipeline + .scalar_ops + .push((ScalarOp::cast(target_ptype), None)); + Ok(pipeline) + } + fn walk_runend( &mut self, array: ArrayRef, @@ -836,16 +876,19 @@ impl FusedPlan { // into smem (reinterpret_cast), so we must allocate at least // output_elem_bytes per element — even if the stage's final ptype // is narrower. Otherwise the writes overflow into the next region. + let stage_bytes = Self::smem_stage_bytes(&spec, len, self.output_elem_bytes); + self.stages.push((spec, smem_byte_offset, len)); + self.smem_byte_cursor += stage_bytes; + smem_byte_offset + } + + fn smem_stage_bytes(spec: &Stage, len: u32, output_elem_bytes: u32) -> u32 { let final_ptype = spec .scalar_ops .last() .map(|(op, _)| op.output_ptype) .unwrap_or(spec.source_ptype); let final_elem_bytes = tag_to_ptype(final_ptype).byte_width() as u32; - let elem_bytes = final_elem_bytes.max(self.output_elem_bytes); - let stage_bytes = len * elem_bytes; - self.stages.push((spec, smem_byte_offset, len)); - self.smem_byte_cursor += stage_bytes; - smem_byte_offset + len * final_elem_bytes.max(output_elem_bytes) } } diff --git a/vortex-test/e2e-cuda/src/lib.rs b/vortex-test/e2e-cuda/src/lib.rs index 04b84efb03d..afd4d8e5e22 100644 --- a/vortex-test/e2e-cuda/src/lib.rs +++ b/vortex-test/e2e-cuda/src/lib.rs @@ -17,11 +17,13 @@ use arrow_array::Date32Array; use arrow_array::Decimal32Array; use arrow_array::Decimal64Array; use arrow_array::Decimal128Array; +use arrow_array::DictionaryArray; use arrow_array::StringArray; use arrow_array::cast::AsArray; use arrow_array::ffi::FFI_ArrowArray; use arrow_array::ffi::from_ffi; use arrow_array::make_array; +use arrow_array::types::Int16Type; use arrow_schema::DataType; use arrow_schema::Field; use arrow_schema::Fields; @@ -31,6 +33,7 @@ use vortex::array::ArrayRef as VortexArrayRef; use vortex::array::IntoArray; use vortex::array::VortexSessionExecute; use vortex::array::arrays::DecimalArray; +use vortex::array::arrays::DictArray as VortexDictArray; use vortex::array::arrays::FixedSizeListArray; use vortex::array::arrays::ListArray; use vortex::array::arrays::PrimitiveArray; @@ -122,6 +125,16 @@ fn fixed_size_list_as_list_array() -> VortexArrayRef { .into_array() } +/// Build a small dictionary column for cuDF Arrow Device import validation. +fn dictionary_array() -> VortexArrayRef { + VortexDictArray::try_new( + PrimitiveArray::from_option_iter([Some(0u8), Some(1), None, Some(2), Some(1)]).into_array(), + VarBinViewArray::from_iter_str(["apple", "banana", "cherry"]).into_array(), + ) + .expect("dictionary array") + .into_array() +} + /// # Safety /// `schema_ptr` and `array_ptr` must be valid writable pointers. #[unsafe(no_mangle)] @@ -183,6 +196,7 @@ fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDev "decimal128", "strings", "dates", + "dictionary", "lists", "fixed_lists", ]), @@ -193,6 +207,7 @@ fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDev decimal128.into_array(), strings.into_array(), dates.into_array(), + dictionary_array(), list_array(), fixed_size_list_array(), ], @@ -278,6 +293,17 @@ fn validate_array_inner(ffi_schema: &FFI_ArrowSchema, ffi_array: &mut FFI_ArrowA None, ]); let date = Date32Array::from(vec![Some(100i32), None, Some(300), Some(400), None]); + let dictionary = Arc::new( + vec![ + Some("apple"), + Some("banana"), + None, + Some("cherry"), + Some("banana"), + ] + .into_iter() + .collect::>(), + ); let list = SESSION .arrow() .execute_arrow(list_array(), None, &mut SESSION.create_execution_ctx()) @@ -298,6 +324,7 @@ fn validate_array_inner(ffi_schema: &FFI_ArrowSchema, ffi_array: &mut FFI_ArrowA Field::new("decimal128", decimal128.data_type().clone(), true), Field::new("strings", string.data_type().clone(), true), Field::new("dates", date.data_type().clone(), true), + Field::new("dictionary", dictionary.data_type().clone(), true), cudf_list_field("lists"), cudf_list_field("fixed_lists"), ]); @@ -308,13 +335,14 @@ fn validate_array_inner(ffi_schema: &FFI_ArrowSchema, ffi_array: &mut FFI_ArrowA return 1; } - let expected_arrays: [ArrowArrayRef; 6] = [ + let expected_arrays: [ArrowArrayRef; 7] = [ primitive, Arc::new(decimal32), Arc::new(decimal64), Arc::new(decimal128), Arc::new(string), Arc::new(date), + dictionary, ]; for (idx, (expected, actual)) in expected_arrays @@ -328,11 +356,11 @@ fn validate_array_inner(ffi_schema: &FFI_ArrowSchema, ffi_array: &mut FFI_ArrowA } } - if !list_values_eq(list.as_ref(), struct_array.column(6).as_ref()) { + if !list_values_eq(list.as_ref(), struct_array.column(7).as_ref()) { eprintln!("wrong values for lists column"); return 1; } - if !list_values_eq(fixed_size_list.as_ref(), struct_array.column(7).as_ref()) { + if !list_values_eq(fixed_size_list.as_ref(), struct_array.column(8).as_ref()) { eprintln!("wrong values for fixed_lists column"); return 1; }