Fix(optimizer): Make EnsureCooperative optimizer idempotent under multiple runs
#19757
cc @milenkovicm
xudong963
left a comment
Also, cc @pepijnve
// 1. Node is a leaf or exchange point
// 2. Node is not already cooperative
// 3. Not under any CooperativeExec (depth == 0)
if (is_leaf || is_exchange) && !is_cooperative && coop_depth.get() == 0 {
There aren't any implementations in the library that you could use to test this, but I'm not sure this is 100% correct if someone ever implements a non-cooperative exchange operator (i.e. one that doesn't use a Tokio mpsc::channel). I'll see if I can come up with a test case for this.
Here's some very contrived test code (in details section below) that illustrates this. The code will output
aggr Lazy NonCooperative
  filter Lazy NonCooperative
    exch Eager Cooperative
      filter Lazy NonCooperative
        CooperativeExec
          exch Eager NonCooperative
            filter Lazy NonCooperative
              scan Lazy NonCooperative
Notice that there's a coop missing around the final scan.
Before this change, the code produced the following, including the incorrect double coop. The double coop is not intentional, but the two layers of coop are.
aggr Lazy NonCooperative
  filter Lazy NonCooperative
    exch Eager Cooperative
      filter Lazy NonCooperative
        CooperativeExec
          CooperativeExec
            exch Eager NonCooperative
              filter Lazy NonCooperative
                CooperativeExec
                  scan Lazy NonCooperative
Details
#[tokio::test]
async fn test_exchange() {
    let scan = Arc::new(DummyExec::new("scan".to_string(), None, SchedulingType::NonCooperative, EvaluationType::Lazy));
    let filter = Arc::new(DummyExec::new("filter".to_string(), Some(scan), SchedulingType::NonCooperative, EvaluationType::Lazy));
    let exchange = Arc::new(DummyExec::new("exch".to_string(), Some(filter), SchedulingType::NonCooperative, EvaluationType::Eager));
    let coop = Arc::new(CooperativeExec::new(exchange));
    let filter = Arc::new(DummyExec::new("filter".to_string(), Some(coop), SchedulingType::NonCooperative, EvaluationType::Lazy));
    let exchange = Arc::new(DummyExec::new("exch".to_string(), Some(filter), SchedulingType::Cooperative, EvaluationType::Eager));
    let filter = Arc::new(DummyExec::new("filter".to_string(), Some(exchange), SchedulingType::NonCooperative, EvaluationType::Lazy));
    let aggregate = Arc::new(DummyExec::new("aggr".to_string(), Some(filter), SchedulingType::NonCooperative, EvaluationType::Lazy));

    let config = ConfigOptions::new();
    let optimized = EnsureCooperative::new()
        .optimize(aggregate as Arc<dyn ExecutionPlan>, &config)
        .unwrap();

    let display = displayable(optimized.as_ref()).indent(true).to_string();
    println!("{}", display);
}

#[derive(Debug)]
struct DummyExec {
    name: String,
    input: Option<Arc<dyn ExecutionPlan>>,
    scheduling_type: SchedulingType,
    evaluation_type: EvaluationType,
    properties: PlanProperties,
}

impl DummyExec {
    fn new(
        name: String,
        input: Option<Arc<dyn ExecutionPlan>>,
        scheduling_type: SchedulingType,
        evaluation_type: EvaluationType,
    ) -> Self {
        DummyExec {
            name,
            input,
            scheduling_type,
            evaluation_type,
            properties: PlanProperties::new(
                EquivalenceProperties::new(Arc::new(Schema::empty())),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded
            ).with_scheduling_type(scheduling_type).with_evaluation_type(evaluation_type),
        }
    }
}

impl DisplayAs for DummyExec {
    fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "{} {:?} {:?}", self.name, self.evaluation_type, self.scheduling_type)
    }
}

impl ExecutionPlan for DummyExec {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        match &self.input {
            None => vec![],
            Some(i) => vec![i],
        }
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(DummyExec::new(
            self.name.clone(),
            match children.len() {
                0 => None,
                _ => Some(children[0].clone()),
            },
            self.scheduling_type,
            self.evaluation_type,
        )))
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        todo!()
    }
}
Great catch! Totally missed the case where an Eager node breaks the cooperative chain.
My plan is to maintain an ancestry stack that tracks both SchedulingType and EvaluationType. The new logic checks the stack bottom-up: a node is only considered 'protected' (and thus skips wrapping) if it encounters a Cooperative ancestor before any Eager pipeline breaker.
I have also added a test case to cover this scenario. Thanks for the insight!
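For readers following along, here is a minimal, self-contained sketch of the ancestry-stack check described above. It is not the code in this PR: the `Scheduling` and `Evaluation` enums and the `is_protected` function are hypothetical stand-ins for DataFusion's `SchedulingType` and `EvaluationType`. The thread does not say how an ancestor that is both Cooperative and Eager should be treated, so checking Cooperative first is an assumption, as is treating `CooperativeExec` as Lazy.

```rust
// Toy stand-ins for DataFusion's SchedulingType / EvaluationType (hypothetical names).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Scheduling {
    Cooperative,
    NonCooperative,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum Evaluation {
    Lazy,
    Eager,
}

/// Walks the ancestry stack from the nearest ancestor outward.
/// `ancestors` is ordered root first, so the nearest ancestor is the last element.
/// A node counts as protected only if a Cooperative ancestor is found
/// before any non-cooperative Eager one.
fn is_protected(ancestors: &[(Scheduling, Evaluation)]) -> bool {
    for &(scheduling, evaluation) in ancestors.iter().rev() {
        if scheduling == Scheduling::Cooperative {
            return true; // assumption: Cooperative is checked before Eager
        }
        if evaluation == Evaluation::Eager {
            return false; // an Eager pipeline breaker resets the cooperative context
        }
    }
    false
}

fn main() {
    use Evaluation::{Eager, Lazy};
    use Scheduling::{Cooperative, NonCooperative};

    // Ancestors of `scan` in the contrived plan from the review, root first.
    let above_scan = [
        (NonCooperative, Lazy),  // aggr
        (NonCooperative, Lazy),  // filter
        (Cooperative, Eager),    // exch
        (NonCooperative, Lazy),  // filter
        (Cooperative, Lazy),     // CooperativeExec (Lazy is an assumption)
        (NonCooperative, Eager), // exch
        (NonCooperative, Lazy),  // filter
    ];

    // The non-cooperative Eager exchange sits between `scan` and the coop,
    // so `scan` is not protected and should be wrapped.
    assert!(!is_protected(&above_scan));

    // The lower `exch` has the CooperativeExec as its nearest ancestor,
    // so it is protected and should not be wrapped a second time.
    assert!(is_protected(&above_scan[..5]));
}
```

Under these assumptions, the check would wrap the final scan while leaving the existing coop around the lower exchange untouched, which matches the two layers of coop asked for in the review.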
@danielhumanmod thanks for fixing this. I had completely forgotten about the need for idempotence when I wrote this.
Just for illustration, this is what I was getting if the physical optimizer runs multiple times (I would bet it ran 3 times in this example 😀). My naive (wrong) thinking was that a change to transform down would fix it. Anyway, thanks @danielhumanmod
Which issue does this PR close?
EnsureCooperative is not idempotent #19756.
Rationale for this change
The previous logic of the EnsureCooperative optimizer lacked context awareness regarding ancestor nodes, making it not idempotent across multiple runs.
Specifically, we need to ensure that:
- … CooperativeExec wrappers.
- … CooperativeExec, we should skip and not double-wrap its children.
What changes are included in this PR?
To solve this, we cannot rely solely on transform_up (which lacks parent context) or transform_down (which makes safe mutation difficult). This PR adopts transform_down_up with a depth counter to strictly enforce that nodes are only wrapped when they are not currently under a CooperativeExec scope.
Are these changes tested?
More unit test coverage.
Are there any user-facing changes?
No
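As an illustration of the depth-counter idea described under "What changes are included in this PR?", the following toy sketch models the down/up traversal with plain recursion. The `Plan` enum and the `node` and `ensure_coop` functions are hypothetical and do not use DataFusion's `ExecutionPlan` or `transform_down_up`. Note that it models only the depth counter, so it shares the limitation raised in review for a non-cooperative exchange sitting below a CooperativeExec.

```rust
// Toy plan tree; not DataFusion's ExecutionPlan (all names here are hypothetical).
#[derive(Clone, Debug, PartialEq)]
enum Plan {
    /// Stand-in for a CooperativeExec wrapping a single child.
    Coop(Box<Plan>),
    /// Any other operator with at most one child.
    Node {
        name: &'static str,
        exchange: bool,
        child: Option<Box<Plan>>,
    },
}

/// Convenience constructor for a non-coop node.
fn node(name: &'static str, exchange: bool, child: Option<Plan>) -> Plan {
    Plan::Node { name, exchange, child: child.map(Box::new) }
}

/// Wraps leaves and exchanges in `Coop`, but only when `coop_depth` is 0,
/// i.e. when no `Coop` ancestor has been seen on the way down.
fn ensure_coop(plan: Plan, coop_depth: usize) -> Plan {
    match plan {
        // "Down" phase: entering a Coop scope increments the depth for the subtree.
        Plan::Coop(child) => Plan::Coop(Box::new(ensure_coop(*child, coop_depth + 1))),
        Plan::Node { name, exchange, child } => {
            let is_leaf = child.is_none();
            let child = child.map(|c| Box::new(ensure_coop(*c, coop_depth)));
            let rebuilt = Plan::Node { name, exchange, child };
            // "Up" phase: decide about wrapping after the children are processed.
            if (is_leaf || exchange) && coop_depth == 0 {
                Plan::Coop(Box::new(rebuilt))
            } else {
                rebuilt
            }
        }
    }
}

fn main() {
    let plan = node("filter", false, Some(node("scan", false, None)));
    let once = ensure_coop(plan, 0);
    let twice = ensure_coop(once.clone(), 0);
    // Running the rule a second time leaves the plan unchanged.
    assert_eq!(once, twice);
    println!("{once:?}");
}
```

In this sketch the second run sees the scan under a `Coop` scope (depth 1) and skips it, which is the idempotence property the PR is after.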