Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions vortex-cuda/benches/dynamic_dispatch_cuda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::time::Duration;
use criterion::BenchmarkId;
use criterion::Criterion;
use criterion::Throughput;
use cudarc::driver::CudaSlice;
use cudarc::driver::DevicePtr;
use cudarc::driver::LaunchConfig;
use cudarc::driver::PushKernelArg;
Expand Down Expand Up @@ -59,7 +60,7 @@ fn run_timed(
cuda_ctx: &mut CudaExecutionCtx,
array_len: usize,
output_buf: &CudaDeviceBuffer,
device_plan: &Arc<cudarc::driver::CudaSlice<CudaDispatchPlan>>,
device_plan: &Arc<CudaSlice<u8>>,
shared_mem_bytes: u32,
) -> VortexResult<Duration> {
let cuda_function = cuda_ctx.load_function("dynamic_dispatch", &[PType::U32])?;
Expand Down Expand Up @@ -115,8 +116,7 @@ struct BenchRunner {
_plan: CudaDispatchPlan,
smem_bytes: u32,
len: usize,
// Keep alive
device_plan: Arc<cudarc::driver::CudaSlice<CudaDispatchPlan>>,
device_plan: Arc<CudaSlice<u8>>,
output_buf: CudaDeviceBuffer,
_plan_buffers: Vec<vortex::array::buffer::BufferHandle>,
}
Expand All @@ -134,7 +134,7 @@ impl BenchRunner {
let device_plan = Arc::new(
cuda_ctx
.stream()
.clone_htod(std::slice::from_ref(&dispatch_plan))
.clone_htod(dispatch_plan.as_bytes())
.expect("htod plan"),
);

Expand Down
3 changes: 0 additions & 3 deletions vortex-cuda/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,6 @@ fn nvcc_compile_ptx(
}

/// Generate bindings for the dynamic dispatch shared header.
///
/// `DynamicDispatchPlan` and related types are shared between CUDA kernels
/// and Rust host code.
fn generate_dynamic_dispatch_bindings(kernels_src: &Path, out_dir: &Path) {
let header = kernels_src.join("dynamic_dispatch.h");
println!("cargo:rerun-if-changed={}", header.display());
Expand Down
52 changes: 21 additions & 31 deletions vortex-cuda/kernels/src/dynamic_dispatch.cu
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ __device__ void apply_scalar_ops(const T *__restrict smem_input,
/// final results to `write_dest` at `write_offset`. Input stages write
/// back to smem; the output stage writes to global memory.
template <typename T, StorePolicy S>
__device__ void execute_stage(const struct Stage &stage,
__device__ void execute_stage(const Stage &stage,
T *__restrict smem_base,
uint64_t chunk_start,
uint32_t chunk_len,
Expand Down Expand Up @@ -293,7 +293,7 @@ __device__ void execute_stage(const struct Stage &stage,
/// Each tile decodes exactly one FL block == SMEM_TILE_SIZE elements into
/// shared memory. In case BITUNPACK is sliced, we need to account for the
/// sub-byte element offset.
__device__ inline uint32_t output_tile_len(const struct Stage &stage, uint32_t block_len, uint32_t tile_off) {
__device__ inline uint32_t output_tile_len(const Stage &stage, uint32_t block_len, uint32_t tile_off) {
const uint32_t element_offset = (tile_off == 0 && stage.source.op_code == SourceOp::BITUNPACK)
? stage.source.params.bitunpack.element_offset
: 0;
Expand All @@ -302,42 +302,33 @@ __device__ inline uint32_t output_tile_len(const struct Stage &stage, uint32_t b

/// Entry point of the dynamic dispatch kernel.
///
/// Executes the plan's stages in order:
/// 1. Input stages populate shared memory with intermediate data
/// for the output stage to reference.
/// 2. The output stage decodes the root array and writes directly to
/// global memory.
/// 1. Input stages populate shared memory (e.g. dict values, run-end
/// endpoints) for the output stage to reference.
/// 2. The output stage decodes the root encoding and writes to global
/// memory.
///
/// @param output Global memory output buffer
/// @param array_len Total number of elements to produce
/// @param plan Device pointer to the dispatch plan
/// @param output Output buffer
/// @param array_len Total number of elements to produce
/// @param packed_plan Pointer to the packed plan byte buffer
template <typename T>
__device__ void dynamic_dispatch(T *__restrict output,
uint64_t array_len,
const struct DynamicDispatchPlan *__restrict plan) {
__device__ void
dynamic_dispatch(T *__restrict output, uint64_t array_len, const uint8_t *__restrict packed_plan) {

// Dynamically-sized shared memory: The host computes the exact byte count
// needed to hold all stage outputs that must coexist simultaneously, and
// passes the count at kernel launch (see DynamicDispatchPlan::shared_mem_bytes).
extern __shared__ char smem_bytes[];
T *smem_base = reinterpret_cast<T *>(smem_bytes);

__shared__ struct DynamicDispatchPlan smem_plan;
if (threadIdx.x == 0) {
smem_plan = *plan;
}
__syncthreads();

const uint8_t last = smem_plan.num_stages - 1;
const auto *header = reinterpret_cast<const struct PlanHeader *>(packed_plan);
const uint8_t *stage_cursor = packed_plan + sizeof(struct PlanHeader);
const uint8_t last = header->num_stages - 1;

// Input stages: Decode inputs into smem regions.
for (uint8_t i = 0; i < last; ++i) {
const struct Stage &stage = smem_plan.stages[i];
for (uint8_t idx = 0; idx < last; ++idx) {
Stage stage = parse_stage(stage_cursor);
T *smem_output = &smem_base[stage.smem_offset];
execute_stage<T, StorePolicy::WRITEBACK>(stage, smem_base, 0, stage.len, smem_output, 0);
}

const struct Stage &output_stage = smem_plan.stages[last];
Stage output_stage = parse_stage(stage_cursor);
const uint64_t block_start = static_cast<uint64_t>(blockIdx.x) * ELEMENTS_PER_BLOCK;
const uint64_t block_end = min(block_start + ELEMENTS_PER_BLOCK, array_len);
const uint32_t block_len = static_cast<uint32_t>(block_end - block_start);
Expand All @@ -356,11 +347,10 @@ __device__ void dynamic_dispatch(T *__restrict output,

/// Generates a dynamic dispatch kernel entry point for each unsigned integer type.
#define GENERATE_DYNAMIC_DISPATCH_KERNEL(suffix, Type) \
extern "C" __global__ void dynamic_dispatch_##suffix( \
Type *__restrict output, \
uint64_t array_len, \
const struct DynamicDispatchPlan *__restrict plan) { \
dynamic_dispatch<Type>(output, array_len, plan); \
extern "C" __global__ void dynamic_dispatch_##suffix(Type *__restrict output, \
uint64_t array_len, \
const uint8_t *__restrict packed_plan) { \
dynamic_dispatch<Type>(output, array_len, packed_plan); \
}

FOR_EACH_UNSIGNED_INT(GENERATE_DYNAMIC_DISPATCH_KERNEL)
98 changes: 56 additions & 42 deletions vortex-cuda/kernels/src/dynamic_dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,15 @@
/// The plan builder walks an encoding tree and emits a linear sequence of
/// stages. The kernel executes stages in order within a single launch.
///
/// Shared memory: The plan builder bump-allocates shared memory regions for
/// each input stage's output. The output stage (last) is placed after all
/// input stages. Since all regions must coexist for the output stage to
/// reference, the total shared memory is the end of whichever region extends
/// furthest, in elements, times `sizeof(T)`.
/// ## Stage plan
///
/// Example: RunEnd(ends=FoR(BitPacked), values=FoR(BitPacked)) with 100 runs
/// The plan is packed as a variable-length byte buffer.
///
/// Stage 0 (input): BITUNPACK(7) → FoR(0) → smem[0..100) // run ends
/// Stage 1 (input): BITUNPACK(10) → FoR(50) → smem[100..200) // run values
/// Stage 2 (output): RUNEND(ends=0, values=100) → smem[200..1224) // resolved
///
/// shared_mem_bytes = (200 + 1024) * sizeof(T)
/// Layout (contiguous bytes):
/// [PlanHeader]
/// [PackedStage 0][ScalarOp × N0]
/// [PackedStage 1][ScalarOp × N1]
/// ...

#pragma once

Expand All @@ -27,12 +23,7 @@
/// Elements processed per CUDA block.
#define ELEMENTS_PER_BLOCK 2048

/// Shared memory tile size for the output stage. Each block decompresses
/// ELEMENTS_PER_BLOCK elements but only holds SMEM_TILE_SIZE in smem at a
/// time — each tile is written to global memory before the next is decoded
/// into the same region. Input stages cannot tile because their outputs must
/// remain accessible for random access (e.g., dictionary lookup, run-end
/// binary search). Smaller tiles reduce smem per block, improving occupancy.
/// Each tile is flushed to global before the next is decoded.
#define SMEM_TILE_SIZE 1024

#ifdef __cplusplus
Expand All @@ -41,14 +32,13 @@ extern "C" {

/// Parameters for source ops, which decode data into a stage's shared memory region.
union SourceParams {
/// Unpack bit-packed data using FastLanes layout.
/// Unpack FastLanes bit-packed data.
struct BitunpackParams {
uint8_t bit_width;
uint32_t element_offset; // Sub-byte offset
} bitunpack;

/// Copy elements verbatim from global memory to shared memory.
/// The input pointer is pre-adjusted on the host to account for slicing.
/// Copy from global to shared memory.
struct LoadParams {
uint8_t _placeholder;
} load;
Expand All @@ -58,7 +48,7 @@ union SourceParams {
uint32_t ends_smem_offset; // element offset to decoded ends in smem
uint32_t values_smem_offset; // element offset to decoded values in smem
uint64_t num_runs;
uint64_t offset;
uint64_t offset; // slice offset into the run-end encoded array
} runend;

/// Generate a linear sequence: `value[i] = base + i * multiplier`.
Expand Down Expand Up @@ -96,38 +86,62 @@ struct ScalarOp {
union ScalarParams params;
};

#define MAX_SCALAR_OPS 4

/// A single stage in the dispatch plan.
///
/// Each stage is a pipeline (source + scalar ops) that writes decoded data
/// into a shared memory region at `smem_offset`. Input stage outputs persist
/// in smem so the output stage can reference them (via DICT or RUNEND offsets).
struct Stage {
/// Packed stage header, followed by `num_scalar_ops` inline ScalarOps.
struct PackedStage {
uint64_t input_ptr; // global memory pointer to this stage's encoded input
uint32_t smem_offset; // element offset within dynamic shared memory for output
uint32_t len; // number of elements this stage produces

struct SourceOp source;
uint8_t num_scalar_ops;
struct ScalarOp scalar_ops[MAX_SCALAR_OPS];
};

#define MAX_STAGES 4

/// Dispatch plan: a sequence of stages.
///
/// The plan builder walks the encoding tree recursively, emitting an input
/// stage each time it encounters a child array that needs to live in shared
/// memory (e.g., dictionary values, run-end endpoints). Shared memory
/// offsets are assigned with a simple bump allocator.
///
/// The last stage is the output pipeline which directly writes to global memory.
struct DynamicDispatchPlan {
/// Header for the packed plan byte buffer.
struct __attribute__((aligned(8))) PlanHeader {
uint8_t num_stages;
struct Stage stages[MAX_STAGES];
uint16_t plan_size_bytes; // total size of the packed plan including this header
};

#ifdef __cplusplus
}
#endif

#ifdef __cplusplus

/// Stage parsed from the packed plan byte buffer (host-independent view).
///
/// Input stages decode data (e.g. dict values, run-end endpoints) into a
/// shared memory region for the output stage to reference. The output stage
/// decodes the root encoding and writes to global memory.
///
/// NOTE(review): `scalar_ops` points INTO the packed plan buffer (see
/// parse_stage), so a Stage must not outlive the buffer it was parsed from.
struct Stage {
uint64_t input_ptr; // encoded input in global memory
uint32_t smem_offset; // output offset in shared memory (elements)
uint32_t len; // elements produced
struct SourceOp source; // source decode op
uint8_t num_scalar_ops; // number of scalar ops
const struct ScalarOp *scalar_ops; // scalar decode ops (borrowed from the packed plan buffer)
};

/// Parse one stage out of the packed plan byte buffer, advancing the cursor.
///
/// @param cursor Pointer into the packed plan buffer, positioned at a
///               PackedStage. On return it points just past this stage's
///               trailing ScalarOps, i.e. at the next PackedStage (if any).
/// @return A Stage whose `scalar_ops` pointer references data inside the
///         packed plan buffer; the buffer must outlive the returned Stage.
__device__ inline Stage parse_stage(const uint8_t *&cursor) {
    const auto *packed = reinterpret_cast<const struct PackedStage *>(cursor);
    // The variable-length ScalarOp list sits immediately after the fixed header.
    const auto *op_table =
        reinterpret_cast<const struct ScalarOp *>(cursor + sizeof(struct PackedStage));

    // Advance past the fixed-size header plus this stage's op list in one step.
    cursor += sizeof(struct PackedStage) + packed->num_scalar_ops * sizeof(struct ScalarOp);

    Stage stage;
    stage.input_ptr = packed->input_ptr;
    stage.smem_offset = packed->smem_offset;
    stage.len = packed->len;
    stage.source = packed->source;
    stage.num_scalar_ops = packed->num_scalar_ops;
    stage.scalar_ops = op_table;
    return stage;
}

#endif
Loading
Loading