Is your feature request related to a problem or challenge?
This revisits the per-partition sink API originally proposed in #6339. That issue proposed `write_stream(partition: usize, ...)`, but the simpler `write_all(single_stream)` was implemented instead.
Today, `DataSinkExec` requires single-partition input: `required_input_distribution()` returns `Distribution::SinglePartition`, which causes the physical optimizer to insert a merge/coalesce in front of the sink. This serializes writes even when the upstream plan produces multiple partitions and the sink could handle them independently.
Describe the solution you'd like
Add an optional partition-aware sink interface. Sinks that implement it receive the input partitions directly; sinks that don't keep working unchanged.
Something like this:
```rust
#[async_trait]
pub trait PartitionedDataSink: DataSink {
    /// Write a single input partition stream. Called once per input partition.
    async fn write_partition(
        &self,
        partition: usize,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;

    /// Called once after all partitions succeed. Commit here.
    async fn finish(
        &self,
        partition_results: Vec<u64>,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;

    /// Called on error/cancel after writers stop. Rollback / cleanup here.
    async fn abort(
        &self,
        error: DataFusionError,
        context: &Arc<TaskContext>,
    ) -> Result<()>;
}
```
`write_partition` calls may run concurrently on the same sink instance; implementations must be thread-safe (or internally shard state per partition).
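For illustration, a minimal sketch of what such an implementation might look like, assuming the trait above. `MyParallelSink` and everything in the method bodies are hypothetical, the required `DataSink` impl is omitted, and the exact import paths may differ between DataFusion versions:

```rust
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use datafusion::common::{DataFusionError, Result};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use futures::StreamExt;

/// Hypothetical sink that writes each input partition to its own file/object.
#[derive(Debug, Default)]
struct MyParallelSink {
    /// Per-partition row counts, pushed once per completed partition. The lock
    /// is held only briefly, so concurrent `write_partition` calls do not
    /// serialize the actual write work.
    completed: Mutex<Vec<(usize, u64)>>,
}

#[async_trait]
impl PartitionedDataSink for MyParallelSink {
    async fn write_partition(
        &self,
        partition: usize,
        mut data: SendableRecordBatchStream,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        let mut rows = 0u64;
        while let Some(batch) = data.next().await {
            let batch = batch?;
            // Write `batch` to a partition-specific destination here.
            rows += batch.num_rows() as u64;
        }
        self.completed.lock().unwrap().push((partition, rows));
        Ok(rows)
    }

    async fn finish(
        &self,
        partition_results: Vec<u64>,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        // Commit point: e.g. publish a manifest listing all partition outputs.
        Ok(partition_results.iter().sum())
    }

    async fn abort(
        &self,
        _error: DataFusionError,
        _context: &Arc<TaskContext>,
    ) -> Result<()> {
        // Rollback: remove any partially written partition outputs.
        Ok(())
    }
}
```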
Capability hook (a provided method on `DataSink`, so this is non-breaking):
```rust
pub trait DataSink: DisplayAs + Debug + Send + Sync {
    // ... existing methods ...

    /// Returns self as a partition-aware sink, if supported.
    fn as_partitioned(&self) -> Option<&dyn PartitionedDataSink> {
        None
    }
}
```
Partition-aware sinks override `as_partitioned` to return `Some(self)`.
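Continuing the hypothetical `MyParallelSink` above, the override is a one-liner inside its otherwise unchanged `DataSink` impl:

```rust
impl DataSink for MyParallelSink {
    // ... existing DataSink methods (as_any, write_all, ...) ...

    fn as_partitioned(&self) -> Option<&dyn PartitionedDataSink> {
        Some(self)
    }
}
```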
Execution changes:
- If `sink.as_partitioned().is_some()`: return `UnspecifiedDistribution` (no forced merge) and drive `write_partition` for each input partition in parallel (see the sketch after this list)
- If all partitions succeed: call `finish()` once and return the usual single-row count result
- On error/cancel: stop the other writers promptly, call `abort()`, and return a coherent error
- If `sink.as_partitioned().is_none()`: keep the current behavior (`SinglePartition` requirement, automatic merge)
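A rough sketch of the driver for the partition-aware path, assuming the trait above. `drive_partitioned_write` is a hypothetical helper (not an existing DataFusion API), and real cancellation/error propagation would need more care:

```rust
use std::sync::Arc;

use datafusion::common::{DataFusionError, Result};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use futures::future::try_join_all;

/// Hypothetical helper that DataSinkExec could call when the sink reports
/// itself as partition-aware: one write future per input partition, polled
/// concurrently.
async fn drive_partitioned_write(
    sink: &dyn PartitionedDataSink,
    input: &Arc<dyn ExecutionPlan>,
    context: &Arc<TaskContext>,
) -> Result<u64> {
    let partitions = input.properties().output_partitioning().partition_count();

    // try_join_all resolves with the first error and drops (cancels) the
    // remaining write futures, which stops the other writers promptly.
    let writes = (0..partitions).map(|p| async move {
        let stream = input.execute(p, Arc::clone(context))?;
        sink.write_partition(p, stream, context).await
    });

    match try_join_all(writes).await {
        // All partitions succeeded: commit exactly once.
        Ok(partition_results) => sink.finish(partition_results, context).await,
        // Some partition failed: give the sink a chance to roll back, then
        // surface the original error.
        Err(e) => {
            let msg = e.to_string();
            sink.abort(e, context).await?;
            Err(DataFusionError::Execution(msg))
        }
    }
}
```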
Ordering semantics:
- Sinks requiring a global output ordering should continue to require single-partition input
- Per-partition ordering is compatible with partition-parallel execution
- Add regression coverage for ordering bugs (see #16784: CopyTo plan loses ordering requirement during physical plan optimization)
Describe alternatives you've considered
- Do nothing - downstream projects manage parallelism outside DataFusion
- Require all sinks to handle multiple partitions - too disruptive
- Separate capability model disconnected from optimizer - prefer mapping to existing distribution/ordering enforcement
Additional context
Related issues:
- Simplified TableProvider::Insert API #6339
- [EPIC] Streaming partitioned writes #6569
- [DISCUSS] Single Source ExecutionPlan Across All TableProviders #13838
- CopyTo plan loses ordering requirement during physical plan optimization #16784
Opening as a proposal to gather feedback. Happy to start smaller (e.g., just the trait + capability hook) if preferred.