[Proposal] Partition parallel DataSink execution #19774

@ethan-tyler


Is your feature request related to a problem or challenge?

This revisits the per-partition sink API originally proposed in #6339. That issue proposed write_stream(partition: usize, ...), but the simpler write_all(single_stream) was implemented instead.

Today, DataSinkExec requires single-partition input: required_input_distribution() returns Distribution::SinglePartition, which causes the physical optimizer to insert a merge/coalesce before the sink. This serializes writes even when the upstream plan produces multiple partitions and the sink could handle them independently.
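
A minimal illustration of the current constraint (paraphrased; see DataSinkExec for the actual code):

```rust
// Paraphrased from today's DataSinkExec: the sink declares that its single
// child must deliver exactly one partition, so the optimizer inserts a
// merge/coalesce above any multi-partition input.
fn required_input_distribution(&self) -> Vec<Distribution> {
    vec![Distribution::SinglePartition]
}
```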

Describe the solution you'd like

Add an optional partition-aware sink interface. Sinks that implement it receive each input partition directly; sinks that don't keep working unchanged.

Something like this:

```rust
#[async_trait]
pub trait PartitionedDataSink: DataSink {
    /// Write a single input partition stream. Called once per input partition.
    async fn write_partition(
        &self,
        partition: usize,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;

    /// Called once after all partitions succeed. Commit here.
    async fn finish(
        &self,
        partition_results: Vec<u64>,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;

    /// Called on error/cancel after writers stop. Rollback / cleanup here.
    async fn abort(
        &self,
        error: DataFusionError,
        context: &Arc<TaskContext>,
    ) -> Result<()>;
}
```

write_partition calls may run concurrently on the same sink instance; implementations must be thread-safe (or shard state internally per partition).
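
A minimal sketch of that per-partition sharding, assuming the proposed trait above. StagedFileSink and its staging-file scheme are hypothetical, and the required DataSink impl is omitted:

```rust
use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::common::{DataFusionError, Result};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

/// Hypothetical sink: each partition writes to its own staging file, so
/// concurrent write_partition calls share no mutable state at all.
struct StagedFileSink {
    staging_dir: PathBuf,
}

#[async_trait]
impl PartitionedDataSink for StagedFileSink {
    async fn write_partition(
        &self,
        partition: usize,
        mut data: SendableRecordBatchStream,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        // A per-partition path means no locking is needed across writers.
        let _path = self.staging_dir.join(format!("part-{partition}.staging"));
        let mut rows = 0u64;
        while let Some(batch) = data.next().await {
            let batch = batch?;
            // ... append `batch` to the staging file here ...
            rows += batch.num_rows() as u64;
        }
        Ok(rows)
    }

    async fn finish(
        &self,
        partition_results: Vec<u64>,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        // Promote all staging files to their final names, then report totals.
        Ok(partition_results.iter().sum())
    }

    async fn abort(
        &self,
        _error: DataFusionError,
        _context: &Arc<TaskContext>,
    ) -> Result<()> {
        // Best-effort removal of any staging files left behind.
        Ok(())
    }
}
```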

Capability hook (provided method on DataSink, non-breaking):

```rust
pub trait DataSink: DisplayAs + Debug + Send + Sync {
    // ... existing methods ...

    /// Returns self as a partition aware sink, if supported.
    fn as_partitioned(&self) -> Option<&dyn PartitionedDataSink> { None }
}
```

Partition-aware sinks override this to return Some(self).
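
For the hypothetical StagedFileSink above, the override is a one-liner:

```rust
impl DataSink for StagedFileSink {
    // ... write_all and the other existing DataSink methods ...

    fn as_partitioned(&self) -> Option<&dyn PartitionedDataSink> {
        Some(self)
    }
}
```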

Execution changes:

  • If sink.as_partitioned().is_some(): return UnspecifiedDistribution (no forced merge) and drive write_partition for each input partition in parallel (driver sketched after this list)
  • If all partitions succeed: call finish() once and return the usual single-row count result
  • On error/cancel: stop the other writers promptly, call abort(), and surface a coherent error
  • If sink.as_partitioned().is_none(): keep the current behavior (return SinglePartition, auto-merge)
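
A rough sketch of that driver logic, assuming the proposed trait. write_partitioned is an illustrative free function, not a proposed API, and a real implementation would likely spawn each write onto the runtime for cross-core parallelism:

```rust
use std::sync::Arc;

use datafusion::common::{DataFusionError, Result};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream;

// Illustrative driver: one stream per input partition, all writes polled
// concurrently on this task.
async fn write_partitioned(
    sink: &dyn PartitionedDataSink,
    streams: Vec<SendableRecordBatchStream>,
    context: Arc<TaskContext>,
) -> Result<u64> {
    let writes = streams
        .into_iter()
        .enumerate()
        .map(|(partition, stream)| sink.write_partition(partition, stream, &context));

    // try_join_all resolves once every write succeeds, or returns the first
    // error and drops the remaining futures, cancelling in-flight writers.
    match futures::future::try_join_all(writes).await {
        Ok(counts) => sink.finish(counts, &context).await,
        Err(e) => {
            let msg = e.to_string();
            sink.abort(e, &context).await?;
            Err(DataFusionError::Execution(format!(
                "partitioned write aborted: {msg}"
            )))
        }
    }
}
```

Note that try_join_all gives the "stop other writers promptly" behavior for free: the first error drops the remaining write futures, cancelling them at their next await point.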

Ordering semantics:

Within a partition, write_partition observes batches in the input stream's order, exactly as write_all does today. Across partitions there is no ordering guarantee, since writes run concurrently; sinks that need a total order should simply not implement PartitionedDataSink and keep the single-partition path.

Describe alternatives you've considered

  1. Do nothing - leave downstream projects to manage parallelism outside DataFusion
  2. Require all sinks to handle multiple partitions - too disruptive for existing implementations
  3. A separate capability model disconnected from the optimizer - mapping onto the existing distribution/ordering enforcement is preferable

Additional context

Related issues:

  • #6339 - per-partition sink API (write_stream) originally proposed there

Opening as a proposal to gather feedback. Happy to start smaller (e.g., just the trait + capability hook) if preferred.
