From f5c936189f855282f01782e51afe8d601dd527e0 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Thu, 26 Mar 2026 16:28:46 +0000 Subject: [PATCH 1/2] feat: make dispatch plan dynamically sized Replace the fixed-size DynamicDispatchPlan struct with a variable-length packed byte buffer, removing the MAX_STAGES and MAX_SCALAR_OPS limits. Signed-off-by: Alexander Droste --- vortex-cuda/benches/dynamic_dispatch_cuda.rs | 7 +- vortex-cuda/build.rs | 3 - vortex-cuda/kernels/src/dynamic_dispatch.cu | 52 ++-- vortex-cuda/kernels/src/dynamic_dispatch.h | 98 +++--- vortex-cuda/src/dynamic_dispatch/mod.rs | 292 +++++++++++++----- .../src/dynamic_dispatch/plan_builder.rs | 48 ++- vortex-cuda/src/hybrid_dispatch/mod.rs | 4 +- 7 files changed, 316 insertions(+), 188 deletions(-) diff --git a/vortex-cuda/benches/dynamic_dispatch_cuda.rs b/vortex-cuda/benches/dynamic_dispatch_cuda.rs index bf6fada88ab..eb5c27c6a07 100644 --- a/vortex-cuda/benches/dynamic_dispatch_cuda.rs +++ b/vortex-cuda/benches/dynamic_dispatch_cuda.rs @@ -59,7 +59,7 @@ fn run_timed( cuda_ctx: &mut CudaExecutionCtx, array_len: usize, output_buf: &CudaDeviceBuffer, - device_plan: &Arc>, + device_plan: &Arc>, shared_mem_bytes: u32, ) -> VortexResult { let cuda_function = cuda_ctx.load_function("dynamic_dispatch", &[PType::U32])?; @@ -115,8 +115,7 @@ struct BenchRunner { _plan: CudaDispatchPlan, smem_bytes: u32, len: usize, - // Keep alive - device_plan: Arc>, + device_plan: Arc>, output_buf: CudaDeviceBuffer, _plan_buffers: Vec, } @@ -134,7 +133,7 @@ impl BenchRunner { let device_plan = Arc::new( cuda_ctx .stream() - .clone_htod(std::slice::from_ref(&dispatch_plan)) + .clone_htod(dispatch_plan.as_bytes()) .expect("htod plan"), ); diff --git a/vortex-cuda/build.rs b/vortex-cuda/build.rs index 88c1af961ed..fd035398b14 100644 --- a/vortex-cuda/build.rs +++ b/vortex-cuda/build.rs @@ -183,9 +183,6 @@ fn nvcc_compile_ptx( } /// Generate bindings for the dynamic dispatch shared header. 
-/// -/// `DynamicDispatchPlan` and related types are shared between CUDA kernels -/// and Rust host code. fn generate_dynamic_dispatch_bindings(kernels_src: &Path, out_dir: &Path) { let header = kernels_src.join("dynamic_dispatch.h"); println!("cargo:rerun-if-changed={}", header.display()); diff --git a/vortex-cuda/kernels/src/dynamic_dispatch.cu b/vortex-cuda/kernels/src/dynamic_dispatch.cu index b2a3b5aed44..5ba884d3de5 100644 --- a/vortex-cuda/kernels/src/dynamic_dispatch.cu +++ b/vortex-cuda/kernels/src/dynamic_dispatch.cu @@ -262,7 +262,7 @@ __device__ void apply_scalar_ops(const T *__restrict smem_input, /// final results to `write_dest` at `write_offset`. Input stages write /// back to smem; the output stage writes to global memory. template -__device__ void execute_stage(const struct Stage &stage, +__device__ void execute_stage(const Stage &stage, T *__restrict smem_base, uint64_t chunk_start, uint32_t chunk_len, @@ -293,7 +293,7 @@ __device__ void execute_stage(const struct Stage &stage, /// Each tile decodes exactly one FL block == SMEM_TILE_SIZE elements into /// shared memory. In case BITUNPACK is sliced, we need to account for the /// sub-byte element offset. -__device__ inline uint32_t output_tile_len(const struct Stage &stage, uint32_t block_len, uint32_t tile_off) { +__device__ inline uint32_t output_tile_len(const Stage &stage, uint32_t block_len, uint32_t tile_off) { const uint32_t element_offset = (tile_off == 0 && stage.source.op_code == SourceOp::BITUNPACK) ? stage.source.params.bitunpack.element_offset : 0; @@ -302,42 +302,33 @@ __device__ inline uint32_t output_tile_len(const struct Stage &stage, uint32_t b /// Entry point of the dynamic dispatch kernel. /// -/// Executes the plan's stages in order: -/// 1. Input stages populate shared memory with intermediate data -/// for the output stage to reference. -/// 2. The output stage decodes the root array and writes directly to -/// global memory. +/// 1. 
Input stages populate shared memory (e.g. dict values, run-end +/// endpoints) for the output stage to reference. +/// 2. The output stage decodes the root encoding and writes to global +/// memory. /// -/// @param output Global memory output buffer -/// @param array_len Total number of elements to produce -/// @param plan Device pointer to the dispatch plan +/// @param output Output buffer +/// @param array_len Total number of elements to produce +/// @param packed_plan Pointer to the packed plan byte buffer template -__device__ void dynamic_dispatch(T *__restrict output, - uint64_t array_len, - const struct DynamicDispatchPlan *__restrict plan) { +__device__ void +dynamic_dispatch(T *__restrict output, uint64_t array_len, const uint8_t *__restrict packed_plan) { - // Dynamically-sized shared memory: The host computes the exact byte count - // needed to hold all stage outputs that must coexist simultaneously, and - // passes the count at kernel launch (see DynamicDispatchPlan::shared_mem_bytes). extern __shared__ char smem_bytes[]; T *smem_base = reinterpret_cast(smem_bytes); - __shared__ struct DynamicDispatchPlan smem_plan; - if (threadIdx.x == 0) { - smem_plan = *plan; - } - __syncthreads(); - - const uint8_t last = smem_plan.num_stages - 1; + const auto *header = reinterpret_cast(packed_plan); + const uint8_t *stage_cursor = packed_plan + sizeof(struct PlanHeader); + const uint8_t last = header->num_stages - 1; // Input stages: Decode inputs into smem regions. 
- for (uint8_t i = 0; i < last; ++i) { - const struct Stage &stage = smem_plan.stages[i]; + for (uint8_t idx = 0; idx < last; ++idx) { + Stage stage = parse_stage(stage_cursor); T *smem_output = &smem_base[stage.smem_offset]; execute_stage(stage, smem_base, 0, stage.len, smem_output, 0); } - const struct Stage &output_stage = smem_plan.stages[last]; + Stage output_stage = parse_stage(stage_cursor); const uint64_t block_start = static_cast(blockIdx.x) * ELEMENTS_PER_BLOCK; const uint64_t block_end = min(block_start + ELEMENTS_PER_BLOCK, array_len); const uint32_t block_len = static_cast(block_end - block_start); @@ -356,11 +347,10 @@ __device__ void dynamic_dispatch(T *__restrict output, /// Generates a dynamic dispatch kernel entry point for each unsigned integer type. #define GENERATE_DYNAMIC_DISPATCH_KERNEL(suffix, Type) \ - extern "C" __global__ void dynamic_dispatch_##suffix( \ - Type *__restrict output, \ - uint64_t array_len, \ - const struct DynamicDispatchPlan *__restrict plan) { \ - dynamic_dispatch(output, array_len, plan); \ + extern "C" __global__ void dynamic_dispatch_##suffix(Type *__restrict output, \ + uint64_t array_len, \ + const uint8_t *__restrict packed_plan) { \ + dynamic_dispatch(output, array_len, packed_plan); \ } FOR_EACH_UNSIGNED_INT(GENERATE_DYNAMIC_DISPATCH_KERNEL) diff --git a/vortex-cuda/kernels/src/dynamic_dispatch.h b/vortex-cuda/kernels/src/dynamic_dispatch.h index bbbbe853f49..03aee530b56 100644 --- a/vortex-cuda/kernels/src/dynamic_dispatch.h +++ b/vortex-cuda/kernels/src/dynamic_dispatch.h @@ -6,19 +6,15 @@ /// The plan builder walks an encoding tree and emits a linear sequence of /// stages. The kernel executes stages in order within a single launch. /// -/// Shared memory: The plan builder bump-allocates shared memory regions for -/// each input stage's output. The output stage (last) is placed after all -/// input stages. 
Since all regions must coexist for the output stage to -/// reference, the total shared memory is the end of whichever region extends -/// furthest, in elements, times `sizeof(T)`. +/// ## Stage plan /// -/// Example: RunEnd(ends=FoR(BitPacked), values=FoR(BitPacked)) with 100 runs +/// The plan is packed as a variable-length byte buffer. /// -/// Stage 0 (input): BITUNPACK(7) → FoR(0) → smem[0..100) // run ends -/// Stage 1 (input): BITUNPACK(10) → FoR(50) → smem[100..200) // run values -/// Stage 2 (output): RUNEND(ends=0, values=100) → smem[200..1224) // resolved -/// -/// shared_mem_bytes = (200 + 1024) * sizeof(T) +/// Layout (contiguous bytes): +/// [PlanHeader] +/// [PackedStage 0][ScalarOp × N0] +/// [PackedStage 1][ScalarOp × N1] +/// ... #pragma once @@ -27,12 +23,7 @@ /// Elements processed per CUDA block. #define ELEMENTS_PER_BLOCK 2048 -/// Shared memory tile size for the output stage. Each block decompresses -/// ELEMENTS_PER_BLOCK elements but only holds SMEM_TILE_SIZE in smem at a -/// time — each tile is written to global memory before the next is decoded -/// into the same region. Input stages cannot tile because their outputs must -/// remain accessible for random access (e.g., dictionary lookup, run-end -/// binary search). Smaller tiles reduce smem per block, improving occupancy. +/// Each tile is flushed to global before the next is decoded. #define SMEM_TILE_SIZE 1024 #ifdef __cplusplus @@ -41,14 +32,13 @@ extern "C" { /// Parameters for source ops, which decode data into a stage's shared memory region. union SourceParams { - /// Unpack bit-packed data using FastLanes layout. + /// Unpack FastLanes bit-packed data. struct BitunpackParams { uint8_t bit_width; uint32_t element_offset; // Sub-byte offset } bitunpack; - /// Copy elements verbatim from global memory to shared memory. - /// The input pointer is pre-adjusted on the host to account for slicing. + /// Copy from global to shared memory. 
struct LoadParams { uint8_t _placeholder; } load; @@ -58,7 +48,7 @@ union SourceParams { uint32_t ends_smem_offset; // element offset to decoded ends in smem uint32_t values_smem_offset; // element offset to decoded values in smem uint64_t num_runs; - uint64_t offset; + uint64_t offset; // slice offset into the run-end encoded array } runend; /// Generate a linear sequence: `value[i] = base + i * multiplier`. @@ -96,38 +86,62 @@ struct ScalarOp { union ScalarParams params; }; -#define MAX_SCALAR_OPS 4 - -/// A single stage in the dispatch plan. -/// -/// Each stage is a pipeline (source + scalar ops) that writes decoded data -/// into a shared memory region at `smem_offset`. Input stage outputs persist -/// in smem so the output stage can reference them (via DICT or RUNEND offsets). -struct Stage { +/// Packed stage header, followed by `num_scalar_ops` inline ScalarOps. +struct PackedStage { uint64_t input_ptr; // global memory pointer to this stage's encoded input uint32_t smem_offset; // element offset within dynamic shared memory for output uint32_t len; // number of elements this stage produces struct SourceOp source; uint8_t num_scalar_ops; - struct ScalarOp scalar_ops[MAX_SCALAR_OPS]; }; -#define MAX_STAGES 4 - -/// Dispatch plan: a sequence of stages. -/// -/// The plan builder walks the encoding tree recursively, emitting an input -/// stage each time it encounters a child array that needs to live in shared -/// memory (e.g., dictionary values, run-end endpoints). Shared memory -/// offsets are assigned with a simple bump allocator. -/// -/// The last stage is the output pipeline which directly writes to global memory. -struct DynamicDispatchPlan { +/// Header for the packed plan byte buffer. 
+struct __attribute__((aligned(8))) PlanHeader { uint8_t num_stages; - struct Stage stages[MAX_STAGES]; + uint16_t plan_size_bytes; // total size of the packed plan including this header }; #ifdef __cplusplus } #endif + +#ifdef __cplusplus + +/// Stage parsed from the packed plan byte buffer. +/// +/// Input stages decode data (e.g. dict values, run-end endpoints) into a +/// shared memory region for the output stage to reference. The output stage +/// decodes the root encoding and writes to global memory. +struct Stage { + uint64_t input_ptr; // encoded input in global memory + uint32_t smem_offset; // output offset in shared memory (elements) + uint32_t len; // elements produced + struct SourceOp source; // source decode op + uint8_t num_scalar_ops; // number of scalar ops + const struct ScalarOp *scalar_ops; // scalar decode ops +}; + +/// Parse a single stage from the packed plan byte buffer and advance the cursor. +/// +/// @param cursor Pointer into the packed plan buffer, pointing at a PackedStage. +/// On return, advanced past this stage's ScalarOps. +/// @return A Stage referencing data within the packed plan buffer. 
+__device__ inline Stage parse_stage(const uint8_t *&cursor) { + const auto *packed_stage = reinterpret_cast(cursor); + cursor += sizeof(struct PackedStage); + + const auto *ops = reinterpret_cast(cursor); + cursor += packed_stage->num_scalar_ops * sizeof(struct ScalarOp); + + return Stage { + .input_ptr = packed_stage->input_ptr, + .smem_offset = packed_stage->smem_offset, + .len = packed_stage->len, + .source = packed_stage->source, + .num_scalar_ops = packed_stage->num_scalar_ops, + .scalar_ops = ops, + }; +} + +#endif diff --git a/vortex-cuda/src/dynamic_dispatch/mod.rs b/vortex-cuda/src/dynamic_dispatch/mod.rs index 3bdd3f25315..fdeff7ab199 100644 --- a/vortex-cuda/src/dynamic_dispatch/mod.rs +++ b/vortex-cuda/src/dynamic_dispatch/mod.rs @@ -20,6 +20,8 @@ #![allow(non_snake_case)] #![allow(clippy::cast_possible_truncation)] +use std::ptr::read_unaligned; +use std::slice::from_raw_parts; use std::sync::Arc; use cudarc::driver::DevicePtr; @@ -47,12 +49,191 @@ pub use plan_builder::find_unfusable_nodes; include!(concat!(env!("OUT_DIR"), "/dynamic_dispatch.rs")); -// SAFETY: C ABI structs with contiguous memory. -unsafe impl cudarc::driver::DeviceRepr for DynamicDispatchPlan {} -unsafe impl cudarc::driver::DeviceRepr for Stage {} +/// Reinterpret a `&T` as a byte slice for serialization into the packed plan. +/// +/// # Safety +/// +/// The caller must ensure `T` is a `#[repr(C)]` type with no padding that +/// contains uninitialised bytes. All the types we serialise (`PlanHeader`, +/// `PackedStage`, `ScalarOp`) satisfy this because they are bindgen-generated +/// `#[repr(C)]` structs whose padding bytes are always written before +/// serialisation. +fn as_bytes(val: &T) -> &[u8] { + unsafe { from_raw_parts(val as *const T as *const u8, size_of::()) } +} + +/// A stage used to build a [`CudaDispatchPlan`] on the host side. 
+/// +/// This is NOT a C ABI struct — it exists purely on the Rust side and is +/// serialized into the packed plan byte buffer by [`CudaDispatchPlan::new`]. +#[derive(Clone)] +pub struct Stage { + pub input_ptr: u64, + pub smem_offset: u32, + pub len: u32, + pub source: SourceOp, + pub scalar_ops: Vec, +} + +impl Stage { + pub fn new( + input_ptr: u64, + smem_offset: u32, + len: u32, + source: SourceOp, + scalar_ops: &[ScalarOp], + ) -> Self { + Self { + input_ptr, + smem_offset, + len, + source, + scalar_ops: scalar_ops.to_vec(), + } + } +} + +/// Read-only view of a parsed stage from a [`CudaDispatchPlan`]. +/// +/// Returned by [`CudaDispatchPlan::stage`] for test inspection. +#[derive(Clone)] +pub struct ParsedStage { + pub input_ptr: u64, + pub smem_offset: u32, + pub len: u32, + pub source: SourceOp, + pub num_scalar_ops: u8, + pub scalar_ops: Vec, +} + +/// A dispatch plan serialized as a packed byte buffer. +/// +/// Layout (matching the C-side `PlanHeader` + `PackedStage` format in +/// `dynamic_dispatch.h`): +/// +/// ```text +/// [PlanHeader] — sizeof(PlanHeader) bytes +/// [PackedStage 0][ScalarOp × N0] — variable +/// [PackedStage 1][ScalarOp × N1] — variable +/// ... +/// ``` +/// +/// This is uploaded to the device and parsed stage by stage from the uploaded buffer +/// by the kernel. +#[derive(Clone)] +pub struct CudaDispatchPlan { + bytes: Vec, +} -/// Type alias for the bindgen-generated C ABI struct. -pub type CudaDispatchPlan = DynamicDispatchPlan; +impl CudaDispatchPlan { + /// Build a packed plan from a sequence of stages. + /// + /// The last stage is the output pipeline; earlier stages are input stages. + /// + /// # Panics + /// + /// Panics if `stages` is empty or the serialized plan exceeds 65535 bytes. 
+ pub fn new(stages: &[Stage]) -> Self { + assert!(!stages.is_empty()); + + let header_size = size_of::(); + let stage_header_size = size_of::(); + let scalar_op_size = size_of::(); + + let mut total_size = header_size; + for stage in stages { + total_size += stage_header_size; + total_size += stage.scalar_ops.len() * scalar_op_size; + } + assert!( + total_size <= u16::MAX as usize, + "packed plan size {total_size} exceeds u16::MAX" + ); + + let mut bytes = Vec::with_capacity(total_size); + + // Write header. + let header = PlanHeader { + num_stages: stages.len() as u8, + plan_size_bytes: total_size as u16, + }; + bytes.extend_from_slice(as_bytes(&header)); + + // Write each stage header followed by its scalar ops. + for stage in stages { + let packed_stage = PackedStage { + input_ptr: stage.input_ptr, + smem_offset: stage.smem_offset, + len: stage.len, + source: stage.source, + num_scalar_ops: stage.scalar_ops.len() as u8, + }; + bytes.extend_from_slice(as_bytes(&packed_stage)); + for op in &stage.scalar_ops { + bytes.extend_from_slice(as_bytes(op)); + } + } + + assert_eq!(bytes.len(), total_size); + Self { bytes } + } + + /// The raw packed plan bytes, ready for upload to the device. + pub fn as_bytes(&self) -> &[u8] { + &self.bytes + } + + /// Number of stages in the plan. + pub fn num_stages(&self) -> u8 { + let header: PlanHeader = + unsafe { read_unaligned(self.bytes.as_ptr() as *const PlanHeader) }; + header.num_stages + } + + /// Parse and return a read-only view of the stage at `index`. + /// + /// # Panics + /// + /// Panics if `index >= num_stages()`. + pub fn stage(&self, index: usize) -> ParsedStage { + let header_size = size_of::(); + let stage_header_size = size_of::(); + let scalar_op_size = size_of::(); + + let mut offset = header_size; + + // Skip past stages before `index`. 
+ for _ in 0..index { + assert!(offset + stage_header_size <= self.bytes.len()); + let ps: PackedStage = + unsafe { read_unaligned(self.bytes[offset..].as_ptr() as *const PackedStage) }; + offset += stage_header_size + ps.num_scalar_ops as usize * scalar_op_size; + } + + assert!(offset + stage_header_size <= self.bytes.len()); + let ps: PackedStage = + unsafe { read_unaligned(self.bytes[offset..].as_ptr() as *const PackedStage) }; + offset += stage_header_size; + + let mut scalar_ops = Vec::with_capacity(ps.num_scalar_ops as usize); + for _ in 0..ps.num_scalar_ops { + assert!(offset + scalar_op_size <= self.bytes.len()); + let op: ScalarOp = + unsafe { read_unaligned(self.bytes[offset..].as_ptr() as *const ScalarOp) }; + scalar_ops.push(op); + offset += scalar_op_size; + } + + ParsedStage { + input_ptr: ps.input_ptr, + smem_offset: ps.smem_offset, + len: ps.len, + source: ps.source, + num_scalar_ops: ps.num_scalar_ops, + scalar_ops, + } + } +} impl SourceOp { /// Unpack bit-packed data using FastLanes layout. @@ -81,13 +262,12 @@ impl SourceOp { } } - /// Decode run-end encoding. Offsets reference shared memory regions - /// populated by earlier input stages. + /// Decode run-end encoding. 
pub fn runend( - ends_smem_offset: u32, - values_smem_offset: u32, - num_runs: u64, - offset: u64, + ends_smem_offset: u32, // smem region holding run-end endpoints + values_smem_offset: u32, // smem region holding per-run values + num_runs: u64, // number of runs (length of ends/values) + offset: u64, // logical offset for sliced arrays ) -> Self { Self { op_code: SourceOp_SourceOpCode_RUNEND, @@ -156,57 +336,7 @@ impl ScalarOp { } } -impl Stage { - pub fn new( - input_ptr: u64, - smem_offset: u32, - len: u32, - source: SourceOp, - scalar_ops: &[ScalarOp], - ) -> Self { - assert!(scalar_ops.len() <= MAX_SCALAR_OPS as usize); - let mut ops: [ScalarOp; MAX_SCALAR_OPS as usize] = unsafe { std::mem::zeroed() }; - ops[..scalar_ops.len()].copy_from_slice(scalar_ops); - Self { - input_ptr, - smem_offset, - len, - source, - num_scalar_ops: scalar_ops.len() as u8, - scalar_ops: ops, - } - } -} - -impl CudaDispatchPlan { - /// Create a dispatch plan from a sequence of stages. - /// The last stage is the output pipeline; earlier stages are input stages. - /// - /// # Panics - /// - /// Panics if `stages` is empty or exceeds `MAX_STAGES`. - pub fn new(stages: impl AsRef<[Stage]>) -> Self { - let stages_slice = stages.as_ref(); - assert!(!stages_slice.is_empty()); - assert!(stages_slice.len() <= MAX_STAGES as usize); - let mut buf: [Stage; MAX_STAGES as usize] = unsafe { std::mem::zeroed() }; - buf[..stages_slice.len()].copy_from_slice(stages_slice); - Self { - num_stages: stages_slice.len() as u8, - stages: buf, - } - } -} - impl MaterializedPlan { - /// Allocate output, upload the plan to the device, and launch the - /// `dynamic_dispatch` kernel. - /// - /// The CUDA kernels are instantiated for unsigned types only. - /// Encoding transforms (FoR, ZigZag, ALP) are bit-identical - /// regardless of signedness. - /// - /// `CudaSlice::drop` enqueues `free` on the stream after kernel execution. 
pub fn execute( self, output_ptype: PType, @@ -241,9 +371,11 @@ impl MaterializedPlan { } let output_buf = CudaDeviceBuffer::new(ctx.device_alloc::(len.next_multiple_of(1024))?); + + // Upload the packed plan bytes to the device. let device_plan = Arc::new( ctx.stream() - .clone_htod(std::slice::from_ref(&self.dispatch_plan)) + .clone_htod(self.dispatch_plan.as_bytes()) .map_err(|e| vortex_err!("copy plan to device: {e}"))?, ); @@ -345,17 +477,16 @@ mod tests { .map(|&r| ScalarOp::frame_of_ref(r as u64)) .collect(); - let plan = CudaDispatchPlan::new([Stage::new( + let plan = CudaDispatchPlan::new(&[Stage::new( input_ptr, 0, len as u32, SourceOp::bitunpack(bit_width, 0), &scalar_ops, )]); - assert_eq!(plan.stages[0].num_scalar_ops, 4); + assert_eq!(plan.stage(0).num_scalar_ops, 4); - let smem_bytes = (SMEM_TILE_SIZE) * size_of::() as u32; - let actual = run_dynamic_dispatch_plan(&cuda_ctx, len, &plan, smem_bytes)?; + let actual = run_dynamic_dispatch_plan(&cuda_ctx, len, &plan, SMEM_TILE_SIZE * 4)?; assert_eq!(actual, expected); Ok(()) @@ -365,7 +496,7 @@ mod tests { fn test_plan_structure() { // Stage 0: input dict values (BP→FoR) into smem[0..256) // Stage 1: output codes (BP→FoR→DICT) into smem[256..2304), gather from smem[0] - let plan = CudaDispatchPlan::new([ + let plan = CudaDispatchPlan::new(&[ Stage::new( 0xAAAA, 0, @@ -382,20 +513,22 @@ mod tests { ), ]); - assert_eq!(plan.num_stages, 2); + assert_eq!(plan.num_stages(), 2); // Input stage - assert_eq!(plan.stages[0].smem_offset, 0); - assert_eq!(plan.stages[0].len, 256); - assert_eq!(plan.stages[0].input_ptr, 0xAAAA); + let s0 = plan.stage(0); + assert_eq!(s0.smem_offset, 0); + assert_eq!(s0.len, 256); + assert_eq!(s0.input_ptr, 0xAAAA); // Output stage - assert_eq!(plan.stages[1].smem_offset, 256); - assert_eq!(plan.stages[1].len, SMEM_TILE_SIZE); - assert_eq!(plan.stages[1].input_ptr, 0xBBBB); - assert_eq!(plan.stages[1].num_scalar_ops, 2); + let s1 = plan.stage(1); + assert_eq!(s1.smem_offset, 256); + 
assert_eq!(s1.len, SMEM_TILE_SIZE); + assert_eq!(s1.input_ptr, 0xBBBB); + assert_eq!(s1.num_scalar_ops, 2); assert_eq!( - unsafe { plan.stages[1].scalar_ops[1].params.dict.values_smem_offset }, + unsafe { s1.scalar_ops[1].params.dict.values_smem_offset }, 0 ); } @@ -433,7 +566,7 @@ mod tests { let cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?; let (input_ptr, _di) = copy_raw_to_device(&cuda_ctx, &data)?; - let plan = CudaDispatchPlan::new([Stage::new( + let plan = CudaDispatchPlan::new(&[Stage::new( input_ptr, 0, len as u32, @@ -445,8 +578,7 @@ mod tests { ], )]); - let smem_bytes = (100 + SMEM_TILE_SIZE) * size_of::() as u32; - let actual = run_dynamic_dispatch_plan(&cuda_ctx, len, &plan, smem_bytes)?; + let actual = run_dynamic_dispatch_plan(&cuda_ctx, len, &plan, SMEM_TILE_SIZE * 4)?; assert_eq!(actual, expected); Ok(()) @@ -469,7 +601,7 @@ mod tests { let device_plan = Arc::new( cuda_ctx .stream() - .clone_htod(std::slice::from_ref(plan)) + .clone_htod(plan.as_bytes()) .expect("copy plan to device"), ); let (plan_ptr, record_plan) = device_plan.device_ptr(cuda_ctx.stream()); diff --git a/vortex-cuda/src/dynamic_dispatch/plan_builder.rs b/vortex-cuda/src/dynamic_dispatch/plan_builder.rs index c31befb902b..d9fc37c4c2c 100644 --- a/vortex-cuda/src/dynamic_dispatch/plan_builder.rs +++ b/vortex-cuda/src/dynamic_dispatch/plan_builder.rs @@ -54,8 +54,6 @@ use vortex::error::vortex_err; use vortex::session::VortexSession; use super::CudaDispatchPlan; -use super::MAX_SCALAR_OPS; -use super::MAX_STAGES; use super::SMEM_TILE_SIZE; use super::ScalarOp; use super::SourceOp; @@ -65,7 +63,7 @@ use crate::CudaExecutionCtx; /// A plan whose source buffers have been copied to the device, ready for kernel launch. pub struct MaterializedPlan { - /// The C ABI plan struct, ready to upload to the device. + /// Packed plan byte buffer, to upload to the device. 
pub dispatch_plan: CudaDispatchPlan, /// Device buffer handles that must be kept alive while the plan is in use. pub device_buffers: Vec, @@ -257,22 +255,18 @@ impl UnmaterializedPlan { let output = plan.walk(array.clone())?; plan.stages.push((output, plan.smem_cursor, len)); - assert!(plan.stages.len() <= MAX_STAGES as usize); - assert!( - plan.stages - .iter() - .all(|(s, ..)| (s.scalar_ops.len() as u32) <= MAX_SCALAR_OPS) - ); - Ok(plan) } /// Shared memory bytes needed to launch this plan. /// - /// Shared memory holds the *output* of each stage so later stages can - /// reference it (e.g., dictionary values, run-end endpoints). The total - /// is the sum of all input stage lengths plus the output tile size, - /// multiplied by the element byte width. + /// Stages that require shared memory advance the cursor during plan + /// construction; `SMEM_TILE_SIZE` is added for the final output stage. + /// + /// Currently, dynamic dispatch only operates on a uniform element type, + /// so `element count * self.elem_bytes` is sufficient. In the future, more + /// fine-grained shared memory tracking will be needed once mixed + /// types are supported. 
pub fn shared_mem_bytes(&self) -> u32 { (self.smem_cursor + SMEM_TILE_SIZE) * self.elem_bytes } @@ -302,20 +296,22 @@ impl UnmaterializedPlan { } }; - let mut stages = Vec::with_capacity(self.stages.len()); - - for (stage, smem_offset, len) in &self.stages { - stages.push(Stage::new( - resolve_ptr(stage), - *smem_offset, - *len, - stage.source, - &stage.scalar_ops, - )); - } + let stages: Vec = self + .stages + .iter() + .map(|(stage, smem_offset, len)| { + Stage::new( + resolve_ptr(stage), + *smem_offset, + *len, + stage.source, + &stage.scalar_ops, + ) + }) + .collect(); Ok(MaterializedPlan { - dispatch_plan: CudaDispatchPlan::new(stages), + dispatch_plan: CudaDispatchPlan::new(&stages), device_buffers, shared_mem_bytes, }) diff --git a/vortex-cuda/src/hybrid_dispatch/mod.rs b/vortex-cuda/src/hybrid_dispatch/mod.rs index 9f841649469..7e12ac46478 100644 --- a/vortex-cuda/src/hybrid_dispatch/mod.rs +++ b/vortex-cuda/src/hybrid_dispatch/mod.rs @@ -84,7 +84,7 @@ pub async fn try_gpu_dispatch( if subtrees.is_empty() { // Whole tree is dyn-dispatch-compatible. 
if let Ok(plan) = UnmaterializedPlan::new(array).and_then(|p| p.materialize(ctx)) { - debug!(encoding = %array.encoding_id(), num_stages = plan.dispatch_plan.num_stages, "fully-fused dyn dispatch"); + debug!(encoding = %array.encoding_id(), num_stages = plan.dispatch_plan.num_stages(), "fully-fused dyn dispatch"); return plan.execute(output_ptype, array.len(), ctx); } } else if let Some(result) = @@ -138,7 +138,7 @@ async fn try_partial_fuse( let n = subtree_inputs.len(); plan.device_buffers .extend(subtree_inputs.into_iter().map(|(_, h)| h)); - debug!(encoding = %array.encoding_id(), num_stages = plan.dispatch_plan.num_stages, num_subtrees = n, "partially-fused dyn dispatch"); + debug!(encoding = %array.encoding_id(), num_stages = plan.dispatch_plan.num_stages(), num_subtrees = n, "partially-fused dyn dispatch"); plan.execute(output_ptype, array.len(), ctx).map(Some) } From b340968728613f7de54be78c7444273f1ed38cdb Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Fri, 27 Mar 2026 14:13:18 +0000 Subject: [PATCH 2/2] address comments Signed-off-by: Alexander Droste --- vortex-cuda/benches/dynamic_dispatch_cuda.rs | 5 +- vortex-cuda/src/dynamic_dispatch/mod.rs | 77 +++++++++++-------- .../src/dynamic_dispatch/plan_builder.rs | 2 +- 3 files changed, 50 insertions(+), 34 deletions(-) diff --git a/vortex-cuda/benches/dynamic_dispatch_cuda.rs b/vortex-cuda/benches/dynamic_dispatch_cuda.rs index eb5c27c6a07..648f20e089a 100644 --- a/vortex-cuda/benches/dynamic_dispatch_cuda.rs +++ b/vortex-cuda/benches/dynamic_dispatch_cuda.rs @@ -12,6 +12,7 @@ use std::time::Duration; use criterion::BenchmarkId; use criterion::Criterion; use criterion::Throughput; +use cudarc::driver::CudaSlice; use cudarc::driver::DevicePtr; use cudarc::driver::LaunchConfig; use cudarc::driver::PushKernelArg; @@ -59,7 +60,7 @@ fn run_timed( cuda_ctx: &mut CudaExecutionCtx, array_len: usize, output_buf: &CudaDeviceBuffer, - device_plan: &Arc>, + device_plan: &Arc>, shared_mem_bytes: u32, ) -> 
VortexResult { let cuda_function = cuda_ctx.load_function("dynamic_dispatch", &[PType::U32])?; @@ -115,7 +116,7 @@ struct BenchRunner { _plan: CudaDispatchPlan, smem_bytes: u32, len: usize, - device_plan: Arc>, + device_plan: Arc>, output_buf: CudaDeviceBuffer, _plan_buffers: Vec, } diff --git a/vortex-cuda/src/dynamic_dispatch/mod.rs b/vortex-cuda/src/dynamic_dispatch/mod.rs index fdeff7ab199..b03335c47a5 100644 --- a/vortex-cuda/src/dynamic_dispatch/mod.rs +++ b/vortex-cuda/src/dynamic_dispatch/mod.rs @@ -20,7 +20,8 @@ #![allow(non_snake_case)] #![allow(clippy::cast_possible_truncation)] -use std::ptr::read_unaligned; +use std::borrow::Borrow; +use std::mem::size_of; use std::slice::from_raw_parts; use std::sync::Arc; @@ -33,6 +34,9 @@ use vortex::array::buffer::BufferHandle; use vortex::array::buffer::DeviceBufferExt; use vortex::array::match_each_unsigned_integer_ptype; use vortex::array::validity::Validity; +use vortex::buffer::Alignment; +use vortex::buffer::ByteBuffer; +use vortex::buffer::ByteBufferMut; use vortex::dtype::Nullability; use vortex::dtype::PType; use vortex::error::VortexResult; @@ -59,7 +63,7 @@ include!(concat!(env!("OUT_DIR"), "/dynamic_dispatch.rs")); /// `#[repr(C)]` structs whose padding bytes are always written before /// serialisation. fn as_bytes(val: &T) -> &[u8] { - unsafe { from_raw_parts(val as *const T as *const u8, size_of::()) } + unsafe { from_raw_parts(std::ptr::addr_of!(*val).cast(), size_of::()) } } /// A stage used to build a [`CudaDispatchPlan`] on the host side. @@ -122,7 +126,7 @@ pub struct ParsedStage { /// by the kernel. #[derive(Clone)] pub struct CudaDispatchPlan { - bytes: Vec, + buffer: ByteBuffer, } impl CudaDispatchPlan { @@ -133,15 +137,21 @@ impl CudaDispatchPlan { /// # Panics /// /// Panics if `stages` is empty or the serialized plan exceeds 65535 bytes. 
- pub fn new(stages: &[Stage]) -> Self { + pub fn new(stages: I) -> Self + where + I: IntoIterator, + I::Item: Borrow, + { + let stages: Vec = stages.into_iter().map(|s| s.borrow().clone()).collect(); assert!(!stages.is_empty()); let header_size = size_of::(); let stage_header_size = size_of::(); let scalar_op_size = size_of::(); + // Calculate total size and validate. let mut total_size = header_size; - for stage in stages { + for stage in &stages { total_size += stage_header_size; total_size += stage.scalar_ops.len() * scalar_op_size; } @@ -150,17 +160,17 @@ impl CudaDispatchPlan { "packed plan size {total_size} exceeds u16::MAX" ); - let mut bytes = Vec::with_capacity(total_size); + let mut buffer = ByteBufferMut::with_capacity_aligned(total_size, Alignment::of::()); // Write header. let header = PlanHeader { num_stages: stages.len() as u8, plan_size_bytes: total_size as u16, }; - bytes.extend_from_slice(as_bytes(&header)); + buffer.extend_from_slice(as_bytes(&header)); // Write each stage header followed by its scalar ops. - for stage in stages { + for stage in &stages { let packed_stage = PackedStage { input_ptr: stage.input_ptr, smem_offset: stage.smem_offset, @@ -168,25 +178,26 @@ impl CudaDispatchPlan { source: stage.source, num_scalar_ops: stage.scalar_ops.len() as u8, }; - bytes.extend_from_slice(as_bytes(&packed_stage)); + buffer.extend_from_slice(as_bytes(&packed_stage)); for op in &stage.scalar_ops { - bytes.extend_from_slice(as_bytes(op)); + buffer.extend_from_slice(as_bytes(op)); } } - assert_eq!(bytes.len(), total_size); - Self { bytes } + assert_eq!(buffer.len(), total_size); + Self { + buffer: buffer.freeze(), + } } /// The raw packed plan bytes, ready for upload to the device. pub fn as_bytes(&self) -> &[u8] { - &self.bytes + self.buffer.as_ref() } /// Number of stages in the plan. 
pub fn num_stages(&self) -> u8 { - let header: PlanHeader = - unsafe { read_unaligned(self.bytes.as_ptr() as *const PlanHeader) }; + let header: PlanHeader = unsafe { *self.buffer.as_ptr().cast() }; header.num_stages } @@ -204,22 +215,19 @@ impl CudaDispatchPlan { // Skip past stages before `index`. for _ in 0..index { - assert!(offset + stage_header_size <= self.bytes.len()); - let ps: PackedStage = - unsafe { read_unaligned(self.bytes[offset..].as_ptr() as *const PackedStage) }; + assert!(offset + stage_header_size <= self.buffer.len()); + let ps: PackedStage = unsafe { *self.buffer.as_ptr().add(offset).cast() }; offset += stage_header_size + ps.num_scalar_ops as usize * scalar_op_size; } - assert!(offset + stage_header_size <= self.bytes.len()); - let ps: PackedStage = - unsafe { read_unaligned(self.bytes[offset..].as_ptr() as *const PackedStage) }; + assert!(offset + stage_header_size <= self.buffer.len()); + let ps: PackedStage = unsafe { *self.buffer.as_ptr().add(offset).cast() }; offset += stage_header_size; let mut scalar_ops = Vec::with_capacity(ps.num_scalar_ops as usize); for _ in 0..ps.num_scalar_ops { - assert!(offset + scalar_op_size <= self.bytes.len()); - let op: ScalarOp = - unsafe { read_unaligned(self.bytes[offset..].as_ptr() as *const ScalarOp) }; + assert!(offset + scalar_op_size <= self.buffer.len()); + let op: ScalarOp = unsafe { *self.buffer.as_ptr().add(offset).cast() }; scalar_ops.push(op); offset += scalar_op_size; } @@ -263,11 +271,18 @@ impl SourceOp { } /// Decode run-end encoding. 
+ /// + /// # Arguments + /// + /// * `ends_smem_offset` - smem region holding run-end endpoints + /// * `values_smem_offset` - smem region holding per-run values + /// * `num_runs` - number of runs (length of ends/values) + /// * `offset` - logical offset for sliced arrays pub fn runend( - ends_smem_offset: u32, // smem region holding run-end endpoints - values_smem_offset: u32, // smem region holding per-run values - num_runs: u64, // number of runs (length of ends/values) - offset: u64, // logical offset for sliced arrays + ends_smem_offset: u32, + values_smem_offset: u32, + num_runs: u64, + offset: u64, ) -> Self { Self { op_code: SourceOp_SourceOpCode_RUNEND, @@ -477,7 +492,7 @@ mod tests { .map(|&r| ScalarOp::frame_of_ref(r as u64)) .collect(); - let plan = CudaDispatchPlan::new(&[Stage::new( + let plan = CudaDispatchPlan::new([Stage::new( input_ptr, 0, len as u32, @@ -496,7 +511,7 @@ mod tests { fn test_plan_structure() { // Stage 0: input dict values (BP→FoR) into smem[0..256) // Stage 1: output codes (BP→FoR→DICT) into smem[256..2304), gather from smem[0] - let plan = CudaDispatchPlan::new(&[ + let plan = CudaDispatchPlan::new([ Stage::new( 0xAAAA, 0, @@ -566,7 +581,7 @@ mod tests { let cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?; let (input_ptr, _di) = copy_raw_to_device(&cuda_ctx, &data)?; - let plan = CudaDispatchPlan::new(&[Stage::new( + let plan = CudaDispatchPlan::new([Stage::new( input_ptr, 0, len as u32, diff --git a/vortex-cuda/src/dynamic_dispatch/plan_builder.rs b/vortex-cuda/src/dynamic_dispatch/plan_builder.rs index d9fc37c4c2c..daf80ddb595 100644 --- a/vortex-cuda/src/dynamic_dispatch/plan_builder.rs +++ b/vortex-cuda/src/dynamic_dispatch/plan_builder.rs @@ -311,7 +311,7 @@ impl UnmaterializedPlan { .collect(); Ok(MaterializedPlan { - dispatch_plan: CudaDispatchPlan::new(&stages), + dispatch_plan: CudaDispatchPlan::new(stages), device_buffers, shared_mem_bytes, })