We have a few instances of "given a task index and/or the total number of tasks, determine which partitions to execute". This logic is copy-pasted in a few places.
Also, implementors of `TaskEstimator` or even `ExecutionPlan` are expected to use `task_idx` and `num_tasks`. The risk is that an implementor may execute the wrong partition if they get the math wrong. Even within the code base, this math is copy-pasted several times. The math also differs depending on the type of network boundary: determining which partitions to run for a Coalesce is different than for a Shuffle.
```rust
// datafusion-distributed/src/execution_plans/network_shuffle.rs (lines 232 and 238 at 030ee2c)
let off = self.properties.partitioning.partition_count() * task_context.task_index;
// ...
off..(off + self.properties.partitioning.partition_count()),
```
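To illustrate why this is error-prone, here is a minimal sketch (the helper names and the coalesce splitting strategy are assumptions, not the actual code) of how the partition math differs between the two boundary types:

```rust
use std::ops::Range;

/// Hypothetical helper: under a shuffle-style boundary, each task owns a
/// contiguous block of `partitions_per_task` partitions, mirroring the
/// snippet above (`off = partition_count * task_index`).
fn shuffle_partitions(partitions_per_task: usize, task_index: usize) -> Range<usize> {
    let off = partitions_per_task * task_index;
    off..(off + partitions_per_task)
}

/// Hypothetical helper: under a coalesce-style boundary, a fixed total
/// partition count must be split across `num_tasks`, so the formula (and
/// its edge cases, e.g. a short final range) is different.
fn coalesce_partitions(total_partitions: usize, num_tasks: usize, task_index: usize) -> Range<usize> {
    let per_task = total_partitions.div_ceil(num_tasks);
    let start = (per_task * task_index).min(total_partitions);
    let end = (start + per_task).min(total_partitions);
    start..end
}

fn main() {
    // Shuffle: task 2 with 4 partitions per task reads partitions 8..12.
    assert_eq!(shuffle_partitions(4, 2), 8..12);
    // Coalesce: 10 partitions over 3 tasks; the last task gets a short range.
    assert_eq!(coalesce_partitions(10, 3, 2), 8..10);
}
```

Every implementor that re-derives one of these formulas is a chance for the two to drift apart.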
Is there something better we can do? E.g., pass an explicit range around instead of `num_tasks` and `task_idx`.
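One possible shape for that, sketched below with made-up names (`TaskPartitions` is not an existing type): the framework computes the boundary-specific range once and implementors only consume it, so the math lives in a single place.

```rust
use std::ops::Range;

/// Hypothetical context handed to implementors: instead of `task_idx` and
/// `num_tasks`, it carries the exact partitions this task must execute.
struct TaskPartitions {
    /// Computed once by the framework, per network-boundary type.
    partition_range: Range<usize>,
}

/// An implementor no longer multiplies indices; it only iterates the range.
fn execute_task(ctx: &TaskPartitions) -> Vec<usize> {
    ctx.partition_range.clone().collect()
}

fn main() {
    let ctx = TaskPartitions { partition_range: 8..12 };
    assert_eq!(execute_task(&ctx), vec![8, 9, 10, 11]);
}
```

A `Range<usize>` also makes the "wrong partition" failure mode harder to hit: an implementor can at worst skip partitions it was given, not compute an offset into someone else's.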