
Commit 7708bc3

Shiv Bhatia authored
[branch-52] Fix push_down_filter for children with non-empty fetch fields (#21057) (#21141)
## Which issue does this PR close?

- Related to #21063
- Related to #21078

## Rationale for this change

Currently, if we see a filter with a limit underneath, we don't push the filter past the limit. However, sort nodes and table scan nodes can carry fetch fields that do essentially the same thing, and nothing stops filters from being pushed past them. This is a correctness bug that can lead to undefined behaviour. I added checks for exactly this condition so we don't push the filter down.

I think the prior expectation was that there would always be a limit node above any of these nodes, but this is also not true. In `push_down_limit.rs`, there's code that performs this optimisation when a limit has a sort under it:

```rust
LogicalPlan::Sort(mut sort) => {
    let new_fetch = {
        let sort_fetch = skip + fetch;
        Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
    };
    if new_fetch == sort.fetch {
        if skip > 0 {
            original_limit(skip, fetch, LogicalPlan::Sort(sort))
        } else {
            Ok(Transformed::yes(LogicalPlan::Sort(sort)))
        }
    } else {
        sort.fetch = new_fetch;
        limit.input = Arc::new(LogicalPlan::Sort(sort));
        Ok(Transformed::yes(LogicalPlan::Limit(limit)))
    }
}
```

The first time this runs, it sets the internal fetch of the sort to `new_fetch`; on the second optimisation pass it hits the branch that removes the limit node altogether, leaving the sort node exposed to parent filters, which can now be pushed down into it.

There is also a related fix in `gather_filters_for_pushdown` in `SortExec`, which does the same thing for physical plan nodes: if an execution plan has a non-empty fetch, it should not allow any parent filters to be pushed down.

## What changes are included in this PR?

Added checks in the optimisation rule to avoid pushing filters past children with built-in limits.

## Are these changes tested?

Yes:

- Unit tests in `push_down_filter.rs`
- Fixed an existing test in `window.slt`
- Unit tests for the physical plan change in `sort.rs`
- New slt test in `push_down_filter_sort_fetch.slt` for this exact behaviour

## Are there any user-facing changes?

No

Co-authored-by: Shiv Bhatia <shivbhatia10@gmail.com>
Co-authored-by: Shiv Bhatia <sbhatia@palantir.com>
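To make the correctness issue concrete, here is a minimal sketch in plain Rust (toy functions, not DataFusion APIs) contrasting the two evaluation orders. The data mirrors the new slt test: five values, a fetch of 3, then a `value > 200` filter.

```rust
/// Semantics of the original plan: sort, apply the fetch (limit), then filter.
fn fetch_then_filter(mut values: Vec<i32>, fetch: usize, min: i32) -> Vec<i32> {
    values.sort();
    values.into_iter().take(fetch).filter(|&v| v > min).collect()
}

/// Semantics after an (incorrect) pushdown: filter first, then apply the fetch.
fn filter_then_fetch(mut values: Vec<i32>, fetch: usize, min: i32) -> Vec<i32> {
    values.sort();
    values.into_iter().filter(|&v| v > min).take(fetch).collect()
}

fn main() {
    let values = vec![100, 200, 300, 400, 500];
    // ORDER BY value LIMIT 3 keeps [100, 200, 300]; only 300 passes the filter.
    assert_eq!(fetch_then_filter(values.clone(), 3, 200), vec![300]);
    // Pushing the filter below the fetch would wrongly return three rows.
    assert_eq!(filter_then_fetch(values, 3, 200), vec![300, 400, 500]);
    println!("pushing a filter past a fetch changes the result set");
}
```

The two orders only agree when every row in the fetched prefix happens to pass the filter, which is why the rule must conservatively keep the filter above any child with a non-empty fetch or skip.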
1 parent e5bad58 commit 7708bc3

File tree

6 files changed: +283 −19 lines changed

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 76 additions & 0 deletions
```diff
@@ -1388,6 +1388,82 @@ impl LogicalPlan {
         }
     }
 
+    /// Returns the skip (offset) of this plan node, if it has one.
+    ///
+    /// Only [`LogicalPlan::Limit`] carries a skip value; all other variants
+    /// return `Ok(None)`. Returns `Ok(None)` for a zero skip.
+    pub fn skip(&self) -> Result<Option<usize>> {
+        match self {
+            LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
+                SkipType::Literal(0) => Ok(None),
+                SkipType::Literal(n) => Ok(Some(n)),
+                SkipType::UnsupportedExpr => Ok(None),
+            },
+            LogicalPlan::Sort(_) => Ok(None),
+            LogicalPlan::TableScan(_) => Ok(None),
+            LogicalPlan::Projection(_) => Ok(None),
+            LogicalPlan::Filter(_) => Ok(None),
+            LogicalPlan::Window(_) => Ok(None),
+            LogicalPlan::Aggregate(_) => Ok(None),
+            LogicalPlan::Join(_) => Ok(None),
+            LogicalPlan::Repartition(_) => Ok(None),
+            LogicalPlan::Union(_) => Ok(None),
+            LogicalPlan::EmptyRelation(_) => Ok(None),
+            LogicalPlan::Subquery(_) => Ok(None),
+            LogicalPlan::SubqueryAlias(_) => Ok(None),
+            LogicalPlan::Statement(_) => Ok(None),
+            LogicalPlan::Values(_) => Ok(None),
+            LogicalPlan::Explain(_) => Ok(None),
+            LogicalPlan::Analyze(_) => Ok(None),
+            LogicalPlan::Extension(_) => Ok(None),
+            LogicalPlan::Distinct(_) => Ok(None),
+            LogicalPlan::Dml(_) => Ok(None),
+            LogicalPlan::Ddl(_) => Ok(None),
+            LogicalPlan::Copy(_) => Ok(None),
+            LogicalPlan::DescribeTable(_) => Ok(None),
+            LogicalPlan::Unnest(_) => Ok(None),
+            LogicalPlan::RecursiveQuery(_) => Ok(None),
+        }
+    }
+
+    /// Returns the fetch (limit) of this plan node, if it has one.
+    ///
+    /// [`LogicalPlan::Sort`], [`LogicalPlan::TableScan`], and
+    /// [`LogicalPlan::Limit`] may carry a fetch value; all other variants
+    /// return `Ok(None)`.
+    pub fn fetch(&self) -> Result<Option<usize>> {
+        match self {
+            LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch),
+            LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch),
+            LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
+                FetchType::Literal(s) => Ok(s),
+                FetchType::UnsupportedExpr => Ok(None),
+            },
+            LogicalPlan::Projection(_) => Ok(None),
+            LogicalPlan::Filter(_) => Ok(None),
+            LogicalPlan::Window(_) => Ok(None),
+            LogicalPlan::Aggregate(_) => Ok(None),
+            LogicalPlan::Join(_) => Ok(None),
+            LogicalPlan::Repartition(_) => Ok(None),
+            LogicalPlan::Union(_) => Ok(None),
+            LogicalPlan::EmptyRelation(_) => Ok(None),
+            LogicalPlan::Subquery(_) => Ok(None),
+            LogicalPlan::SubqueryAlias(_) => Ok(None),
+            LogicalPlan::Statement(_) => Ok(None),
+            LogicalPlan::Values(_) => Ok(None),
+            LogicalPlan::Explain(_) => Ok(None),
+            LogicalPlan::Analyze(_) => Ok(None),
+            LogicalPlan::Extension(_) => Ok(None),
+            LogicalPlan::Distinct(_) => Ok(None),
+            LogicalPlan::Dml(_) => Ok(None),
+            LogicalPlan::Ddl(_) => Ok(None),
+            LogicalPlan::Copy(_) => Ok(None),
+            LogicalPlan::DescribeTable(_) => Ok(None),
+            LogicalPlan::Unnest(_) => Ok(None),
+            LogicalPlan::RecursiveQuery(_) => Ok(None),
+        }
+    }
+
     /// If this node's expressions contains any references to an outer subquery
     pub fn contains_outer_reference(&self) -> bool {
         let mut contains = false;
```
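As a sketch of the contract these two accessors establish, here is a toy model in plain Rust (a hypothetical `Plan` enum, not DataFusion's `LogicalPlan`): only a limit carries a skip, a zero skip reads as `None`, and only sort, table scan, and limit may carry a fetch.

```rust
// Toy model of the skip/fetch contract; all names here are illustrative.
enum Plan {
    Limit { skip: usize, fetch: Option<usize> },
    Sort { fetch: Option<usize> },
    TableScan { fetch: Option<usize> },
    Filter,
}

impl Plan {
    /// Only Limit carries a skip; a zero skip is reported as None,
    /// matching the real method's `SkipType::Literal(0)` arm.
    fn skip(&self) -> Option<usize> {
        match self {
            Plan::Limit { skip: 0, .. } => None,
            Plan::Limit { skip, .. } => Some(*skip),
            _ => None,
        }
    }

    /// Sort, TableScan, and Limit may carry a fetch; everything else: None.
    fn fetch(&self) -> Option<usize> {
        match self {
            Plan::Limit { fetch, .. }
            | Plan::Sort { fetch }
            | Plan::TableScan { fetch } => *fetch,
            _ => None,
        }
    }
}

fn main() {
    assert_eq!(Plan::Limit { skip: 0, fetch: Some(5) }.skip(), None);
    assert_eq!(Plan::Limit { skip: 2, fetch: None }.skip(), Some(2));
    assert_eq!(Plan::Sort { fetch: Some(3) }.fetch(), Some(3));
    assert_eq!(Plan::Filter.fetch(), None);
}
```

The real implementation spells out every variant instead of using a wildcard so that adding a new `LogicalPlan` variant forces a conscious decision at compile time.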

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 66 additions & 0 deletions
```diff
@@ -791,6 +791,13 @@ impl OptimizerRule for PushDownFilter {
             filter.predicate = new_predicate;
         }
 
+        // If the child has a fetch (limit) or skip (offset), pushing a filter
+        // below it would change semantics: the limit/offset should apply before
+        // the filter, not after.
+        if filter.input.fetch()?.is_some() || filter.input.skip()?.is_some() {
+            return Ok(Transformed::no(LogicalPlan::Filter(filter)));
+        }
+
         match Arc::unwrap_or_clone(filter.input) {
             LogicalPlan::Filter(child_filter) => {
                 let parents_predicates = split_conjunction_owned(filter.predicate);
@@ -4222,4 +4229,63 @@ mod tests {
         "
         )
     }
+
+    #[test]
+    fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
+        let scan = test_table_scan()?;
+        let scan_with_fetch = match scan {
+            LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan {
+                fetch: Some(10),
+                ..scan
+            }),
+            _ => unreachable!(),
+        };
+        let plan = LogicalPlanBuilder::from(scan_with_fetch)
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter must NOT be pushed into the table scan when it has a fetch (limit)
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Filter: test.a > Int64(10)
+          TableScan: test, fetch=10
+        "
+        )
+    }
+
+    #[test]
+    fn filter_push_down_through_sort_without_fetch() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort(vec![col("a").sort(true, true)])?
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter should be pushed below the sort
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Sort: test.a ASC NULLS FIRST
+          TableScan: test, full_filters=[test.a > Int64(10)]
+        "
+        )
+    }
+
+    #[test]
+    fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter must NOT be pushed below the sort when it has a fetch (limit),
+        // because the limit should apply before the filter.
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Filter: test.a > Int64(10)
+          Sort: test.a ASC NULLS FIRST, fetch=5
+            TableScan: test
+        "
+        )
+    }
 }
```

datafusion/physical-plan/src/filter_pushdown.rs

Lines changed: 13 additions & 9 deletions
```diff
@@ -40,7 +40,6 @@ use std::sync::Arc;
 use datafusion_common::Result;
 use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use itertools::Itertools;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum FilterPushdownPhase {
@@ -359,6 +358,17 @@ impl ChildFilterDescription {
         })
     }
 
+    /// Mark all parent filters as unsupported for this child.
+    pub fn all_unsupported(parent_filters: &[Arc<dyn PhysicalExpr>]) -> Self {
+        Self {
+            parent_filters: parent_filters
+                .iter()
+                .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
+                .collect(),
+            self_filters: vec![],
+        }
+    }
+
     /// Add a self filter (from the current node) to be pushed down to this child.
     pub fn with_self_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
         self.self_filters.push(filter);
@@ -434,15 +444,9 @@ impl FilterDescription {
         children: &[&Arc<dyn crate::ExecutionPlan>],
     ) -> Self {
         let mut desc = Self::new();
-        let child_filters = parent_filters
-            .iter()
-            .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
-            .collect_vec();
         for _ in 0..children.len() {
-            desc = desc.with_child(ChildFilterDescription {
-                parent_filters: child_filters.clone(),
-                self_filters: vec![],
-            });
+            desc =
+                desc.with_child(ChildFilterDescription::all_unsupported(parent_filters));
         }
         desc
     }
```

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 81 additions & 3 deletions
```diff
@@ -1404,12 +1404,23 @@ impl ExecutionPlan for SortExec {
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         config: &datafusion_common::config::ConfigOptions,
     ) -> Result<FilterDescription> {
-        if !matches!(phase, FilterPushdownPhase::Post) {
+        if phase != FilterPushdownPhase::Post {
+            if self.fetch.is_some() {
+                return Ok(FilterDescription::all_unsupported(
+                    &parent_filters,
+                    &self.children(),
+                ));
+            }
             return FilterDescription::from_children(parent_filters, &self.children());
         }
 
-        let mut child =
-            ChildFilterDescription::from_child(&parent_filters, self.input())?;
+        // In Post phase: block parent filters when fetch is set,
+        // but still push the TopK dynamic filter (self-filter).
+        let mut child = if self.fetch.is_some() {
+            ChildFilterDescription::all_unsupported(&parent_filters)
+        } else {
+            ChildFilterDescription::from_child(&parent_filters, self.input())?
+        };
 
         if let Some(filter) = &self.filter
             && config.optimizer.enable_topk_dynamic_filter_pushdown
@@ -1430,8 +1441,10 @@ mod tests {
     use super::*;
     use crate::coalesce_partitions::CoalescePartitionsExec;
     use crate::collect;
+    use crate::empty::EmptyExec;
     use crate::execution_plan::Boundedness;
     use crate::expressions::col;
+    use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
     use crate::test;
     use crate::test::TestMemoryExec;
     use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
@@ -1441,6 +1454,7 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::*;
     use datafusion_common::cast::as_primitive_array;
+    use datafusion_common::config::ConfigOptions;
     use datafusion_common::test_util::batches_to_string;
     use datafusion_common::{DataFusionError, Result, ScalarValue};
     use datafusion_execution::RecordBatchStream;
@@ -2705,4 +2719,68 @@ mod tests {
 
         Ok(())
     }
+
+    fn make_sort_exec_with_fetch(fetch: Option<usize>) -> SortExec {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let input = Arc::new(EmptyExec::new(schema));
+        SortExec::new(
+            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
+            input,
+        )
+        .with_fetch(fetch)
+    }
+
+    #[test]
+    fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
+        let sort = make_sort_exec_with_fetch(Some(10));
+        let desc = sort.gather_filters_for_pushdown(
+            FilterPushdownPhase::Pre,
+            vec![Arc::new(Column::new("a", 0))],
+            &ConfigOptions::new(),
+        )?;
+        // Sort with fetch (TopK) must not allow filters to be pushed below it.
+        assert!(matches!(
+            desc.parent_filters()[0][0].discriminant,
+            PushedDown::No
+        ));
+        Ok(())
+    }
+
+    #[test]
+    fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> {
+        let sort = make_sort_exec_with_fetch(None);
+        let desc = sort.gather_filters_for_pushdown(
+            FilterPushdownPhase::Pre,
+            vec![Arc::new(Column::new("a", 0))],
+            &ConfigOptions::new(),
+        )?;
+        // Plain sort (no fetch) is filter-commutative.
+        assert!(matches!(
+            desc.parent_filters()[0][0].discriminant,
+            PushedDown::Yes
+        ));
+        Ok(())
+    }
+
+    #[test]
+    fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> {
+        let sort = make_sort_exec_with_fetch(Some(10));
+        assert!(sort.filter.is_some(), "TopK filter should be created");
+
+        let mut config = ConfigOptions::new();
+        config.optimizer.enable_topk_dynamic_filter_pushdown = true;
+        let desc = sort.gather_filters_for_pushdown(
+            FilterPushdownPhase::Post,
+            vec![Arc::new(Column::new("a", 0))],
+            &config,
+        )?;
+        // Parent filters are still blocked in the Post phase.
+        assert!(matches!(
+            desc.parent_filters()[0][0].discriminant,
+            PushedDown::No
+        ));
+        // But the TopK self-filter should be pushed down.
+        assert_eq!(desc.self_filters()[0].len(), 1);
+        Ok(())
+    }
 }
```
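The three unit tests above pin down a small decision table. The sketch below restates it in plain Rust (hypothetical names, not the `SortExec` API), with one simplification: in the Post phase without a fetch, whether a parent filter passes is really delegated to the child, which this model treats as "may pass".

```rust
#[derive(Clone, Copy, PartialEq)]
enum Phase {
    Pre,
    Post,
}

/// Simplified policy for a sort node: returns
/// (parent_filters_may_pass, topk_self_filter_pushed).
/// A fetch (TopK) always blocks parent filters; the TopK dynamic
/// self-filter is only pushed in the Post phase when enabled.
fn sort_pushdown_policy(phase: Phase, has_fetch: bool, topk_enabled: bool) -> (bool, bool) {
    let parents_may_pass = !has_fetch;
    let self_filter_pushed = phase == Phase::Post && has_fetch && topk_enabled;
    (parents_may_pass, self_filter_pushed)
}

fn main() {
    // Mirrors the three new unit tests:
    assert_eq!(sort_pushdown_policy(Phase::Pre, true, false), (false, false));
    assert_eq!(sort_pushdown_policy(Phase::Pre, false, false), (true, false));
    assert_eq!(sort_pushdown_policy(Phase::Post, true, true), (false, true));
}
```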

datafusion/sqllogictest/test_files/limit.slt

Lines changed: 39 additions & 0 deletions
```diff
@@ -869,6 +869,45 @@ limit 1000;
 statement ok
 DROP TABLE test_limit_with_partitions;
 
+# Tests for filter pushdown behavior with Sort + LIMIT (fetch).
+
+statement ok
+CREATE TABLE t(id INT, value INT) AS VALUES
+(1, 100),
+(2, 200),
+(3, 300),
+(4, 400),
+(5, 500);
+
+# Take the 3 smallest values (100, 200, 300), then filter value > 200.
+query II
+SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
+----
+3 300
+
+# Take the 3 largest values (500, 400, 300), then filter value < 400.
+query II
+SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400;
+----
+3 300
+
+# The filter stays above the sort+fetch in the plan.
+query TT
+EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
+----
+logical_plan
+01)SubqueryAlias: sub
+02)--Filter: t.value > Int32(200)
+03)----Sort: t.value ASC NULLS LAST, fetch=3
+04)------TableScan: t projection=[id, value]
+physical_plan
+01)FilterExec: value@1 > 200
+02)--SortExec: TopK(fetch=3), expr=[value@1 ASC NULLS LAST], preserve_partitioning=[false]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+DROP TABLE t;
+
 # Tear down src_table table:
 statement ok
 DROP TABLE src_table;
```

datafusion/sqllogictest/test_files/window.slt

Lines changed: 8 additions & 7 deletions
```diff
@@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
 ----
 logical_plan
 01)Sort: rn1 ASC NULLS LAST
-02)--Sort: rn1 ASC NULLS LAST, fetch=5
-03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
-04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
+02)--Filter: rn1 < UInt64(50)
+03)----Sort: rn1 ASC NULLS LAST, fetch=5
+04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
 05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
 06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
 physical_plan
-01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
-02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
-03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
-04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
+01)FilterExec: rn1@5 < 50
+02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
+03)----GlobalLimitExec: skip=0, fetch=5
+04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
 
 # Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
 # global order. The existing sort is for the second-term lexicographical ordering requirement, which is being
```
