Skip to content

Commit 592b879

Browse files
committed
refine code
1 parent 6d9430d commit 592b879

File tree

2 files changed

+8
-9
lines changed

2 files changed

+8
-9
lines changed

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3406,8 +3406,7 @@ fn preserve_ordering_for_streaming_sorted_aggregate() -> Result<()> {
34063406

34073407
let test_config = TestConfig::default().with_query_execution_partitions(2);
34083408

3409-
let plan_distrib =
3410-
test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT);
3409+
let plan_distrib = test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT);
34113410
assert_plan!(plan_distrib, @r"
34123411
AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted
34133412
RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC
@@ -3441,8 +3440,7 @@ fn preserve_ordering_for_streaming_partially_sorted_aggregate() -> Result<()> {
34413440

34423441
let test_config = TestConfig::default().with_query_execution_partitions(2);
34433442

3444-
let plan_distrib =
3445-
test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT);
3443+
let plan_distrib = test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT);
34463444
assert_plan!(plan_distrib, @r"
34473445
AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[COUNT(c)], ordering_mode=PartiallySorted([0])
34483446
RepartitionExec: partitioning=Hash([a@0, b@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1377,6 +1377,12 @@ pub fn ensure_distribution(
13771377
}
13781378
};
13791379

1380+
let streaming_benefit = if child.data {
1381+
preserving_order_enables_streaming(&plan, &child.plan)?
1382+
} else {
1383+
false
1384+
};
1385+
13801386
// There is an ordering requirement of the operator:
13811387
if let Some(required_input_ordering) = required_input_ordering {
13821388
// Either:
@@ -1388,11 +1394,6 @@ pub fn ensure_distribution(
13881394
.equivalence_properties()
13891395
.ordering_satisfy_requirement(sort_req.clone())?;
13901396

1391-
let streaming_benefit = if child.data {
1392-
preserving_order_enables_streaming(&plan, &child.plan)?
1393-
} else {
1394-
false
1395-
};
13961397
if (!ordering_satisfied || !order_preserving_variants_desirable)
13971398
&& !streaming_benefit
13981399
&& child.data

0 commit comments

Comments
 (0)