From bcb93271e2214080ca8660b902dae35134687259 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 3 Jun 2026 15:08:04 +0100 Subject: [PATCH 1/5] Try pushdown sort for DF Signed-off-by: Adam Gutglick --- vortex-datafusion/src/persistent/source.rs | 66 +++++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index f8ad847d5e7..28bddb5951f 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -13,13 +13,17 @@ use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; use datafusion_execution::cache::cache_manager::FileMetadataCache; +use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr::conjunction; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_plan::DisplayFormatType; use datafusion_physical_plan::PhysicalExpr; +use datafusion_physical_plan::SortOrderPushdownResult; use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::filter_pushdown::PushedDown; use datafusion_physical_plan::filter_pushdown::PushedDownPredicate; @@ -189,6 +193,7 @@ pub struct VortexSource { natural_split_ranges: Arc]>>>, expression_convertor: Arc, pub(crate) vortex_reader_factory: Option>, + pub(crate) ordered: bool, vx_metrics_registry: Arc, file_metadata_cache: Option>, /// Whether to enable expression pushdown into the underlying Vortex scan. @@ -223,6 +228,7 @@ impl VortexSource { vortex_reader_factory: None, vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()), file_metadata_cache: None, + ordered: false, options: VortexTableOptions::default(), } } @@ -335,7 +341,7 @@ impl VortexSource { metrics_registry: Arc::clone(&self.vx_metrics_registry), layout_readers: Arc::clone(&self.layout_readers), natural_split_ranges: Arc::clone(&self.natural_split_ranges), - has_output_ordering: !base_config.output_ordering.is_empty(), + has_output_ordering: !base_config.output_ordering.is_empty() || self.ordered, expression_convertor: Arc::clone(&self.expression_convertor), file_metadata_cache: self.file_metadata_cache.clone(), projection_pushdown: self.options.projection_pushdown, @@ -378,6 +384,64 @@ impl FileSource for VortexSource { VORTEX_FILE_EXTENSION } + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + eq_properties: &EquivalenceProperties, + ) -> DFResult>> { + if order.is_empty() { + return Ok(SortOrderPushdownResult::Unsupported); + } + + if eq_properties.ordering_satisfy(order.iter().cloned())? { + let mut this = self.clone(); + this.ordered = true; + + return Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(this) as Arc, + }); + } + + for prefix_len in 1..order.len() { + let prefix = order[..prefix_len].to_vec(); + if eq_properties.ordering_satisfy(prefix.iter().cloned())? { + return Ok(SortOrderPushdownResult::Unsupported); + } + } + + let sort_order = LexOrdering::new(order.iter().cloned()); + let column_in_file_schema = sort_order.as_ref().is_some_and(|s| { + s.first() + .expr + .as_any() + .downcast_ref::() + .is_some_and(|col| { + self.table_schema + .file_schema() + .field_with_name(col.name()) + .is_ok() + }) + }); + + if !column_in_file_schema { + return Ok(SortOrderPushdownResult::Unsupported); + } + + let is_descending = sort_order + .as_ref() + .is_some_and(|s| s.first().options.descending); + + if !is_descending { + let mut this = self.clone(); + this.ordered = true; + return Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(this) as Arc, + }); + } + + Ok(SortOrderPushdownResult::Unsupported) + } + fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { From 8de5fe3eae72de33ad73a5b1458649a30ec66bf6 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 8 Jun 2026 13:38:55 +0100 Subject: [PATCH 2/5] Add tests Signed-off-by: Adam Gutglick --- vortex-datafusion/src/persistent/source.rs | 68 +++++++++++++++++++ .../slt/datafusion/order_pushdown.slt | 44 ++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 vortex-sqllogictest/slt/datafusion/order_pushdown.slt diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 28bddb5951f..0c5e3657ee8 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -557,6 +557,7 @@ mod tests { use arrow_schema::Schema; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_execution::object_store::ObjectStoreUrl; + use datafusion_physical_expr::expressions::Column; use object_store::memory::InMemory; use vortex::VortexSessionDefault; @@ -596,6 +597,73 @@ mod tests { } } + fn sort_column(name: &str, index: usize) -> PhysicalSortExpr { + let expr: PhysicalExprRef = Arc::new(Column::new(name, index)); + PhysicalSortExpr::new_default(expr) + } + + fn sort_test_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])) + } + + fn sort_test_source(schema: Arc) -> VortexSource { + VortexSource::new( + TableSchema::from_file_schema(schema), + VortexSession::default(), + ) + } + + fn assert_ordered_source(inner: Arc) -> anyhow::Result<()> { + let source = inner + .as_any() + .downcast_ref::() + .ok_or_else(|| anyhow::anyhow!("expected VortexSource"))?; + + assert!(source.ordered); + Ok(()) + } + + #[test] + fn try_pushdown_sort_returns_exact_when_ordering_is_satisfied() -> anyhow::Result<()> { + let schema = sort_test_schema(); + let source = sort_test_source(Arc::clone(&schema)); + let order = vec![sort_column("a", 0), sort_column("b", 1)]; + let eq_properties = EquivalenceProperties::new_with_orderings(schema, [order.clone()]); + + let result = source.try_pushdown_sort(&order, &eq_properties)?; + + match result { + SortOrderPushdownResult::Exact { inner } => assert_ordered_source(inner)?, + SortOrderPushdownResult::Inexact { .. } | SortOrderPushdownResult::Unsupported => { + anyhow::bail!("expected exact sort pushdown") + } + } + assert!(!source.ordered); + Ok(()) + } + + #[test] + fn try_pushdown_sort_returns_inexact_for_ascending_file_column() -> anyhow::Result<()> { + let schema = sort_test_schema(); + let source = sort_test_source(Arc::clone(&schema)); + let order = vec![sort_column("a", 0)]; + let eq_properties = EquivalenceProperties::new(schema); + + let result = source.try_pushdown_sort(&order, &eq_properties)?; + + match result { + SortOrderPushdownResult::Inexact { inner } => assert_ordered_source(inner)?, + SortOrderPushdownResult::Exact { .. } | SortOrderPushdownResult::Unsupported => { + anyhow::bail!("expected inexact sort pushdown") + } + } + assert!(!source.ordered); + Ok(()) + } + #[test] fn create_vortex_opener_preserves_expression_convertor() -> anyhow::Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); diff --git a/vortex-sqllogictest/slt/datafusion/order_pushdown.slt b/vortex-sqllogictest/slt/datafusion/order_pushdown.slt new file mode 100644 index 00000000000..49ecba8acb5 --- /dev/null +++ b/vortex-sqllogictest/slt/datafusion/order_pushdown.slt @@ -0,0 +1,44 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright the Vortex contributors + +include ../setup.slt.no + +statement ok +CREATE EXTERNAL TABLE ordered_pushdown ( + c1 VARCHAR NOT NULL, + c2 INT NOT NULL +) +STORED AS vortex +WITH ORDER (c1 ASC) +LOCATION '$__TEST_DIR__/ordered_pushdown/'; + +statement ok +INSERT INTO ordered_pushdown VALUES + ('air', 5), + ('balloon', 42); + +statement ok +INSERT INTO ordered_pushdown VALUES + ('zebra', 5); + +statement ok +INSERT INTO ordered_pushdown VALUES + ('texas', 2000), + ('alabama', 2000); + +query TI +SELECT c1, c2 FROM ordered_pushdown ORDER BY c1 ASC LIMIT 3; +---- +air 5 +alabama 2000 +balloon 42 + +query TT +EXPLAIN SELECT c1, c2 FROM ordered_pushdown ORDER BY c1 ASC LIMIT 3; +---- +logical_plan +01)Sort: ordered_pushdown.c1 ASC NULLS LAST, fetch=3 +02)--TableScan: ordered_pushdown projection=[c1, c2] +physical_plan +01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST], fetch=3 +02)--DataSourceExec: file_groups={}, projection=[c1, c2], limit=3, output_ordering=[c1@0 ASC NULLS LAST], file_type=vortex From 9617893d04a435d755fb06a4bdf7df6e30d90a1e Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 8 Jun 2026 17:05:44 +0100 Subject: [PATCH 3/5] minimize Signed-off-by: Adam Gutglick --- vortex-datafusion/src/persistent/source.rs | 37 ---------------------- 1 file changed, 37 deletions(-) diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 0c5e3657ee8..20b2daa687a 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -402,43 +402,6 @@ impl FileSource for VortexSource { }); } - for prefix_len in 1..order.len() { - let prefix = order[..prefix_len].to_vec(); - if eq_properties.ordering_satisfy(prefix.iter().cloned())? { - return Ok(SortOrderPushdownResult::Unsupported); - } - } - - let sort_order = LexOrdering::new(order.iter().cloned()); - let column_in_file_schema = sort_order.as_ref().is_some_and(|s| { - s.first() - .expr - .as_any() - .downcast_ref::() - .is_some_and(|col| { - self.table_schema - .file_schema() - .field_with_name(col.name()) - .is_ok() - }) - }); - - if !column_in_file_schema { - return Ok(SortOrderPushdownResult::Unsupported); - } - - let is_descending = sort_order - .as_ref() - .is_some_and(|s| s.first().options.descending); - - if !is_descending { - let mut this = self.clone(); - this.ordered = true; - return Ok(SortOrderPushdownResult::Inexact { - inner: Arc::new(this) as Arc, - }); - } - Ok(SortOrderPushdownResult::Unsupported) } From 75b3785ccc0f4a0bb7b61bf4eaecaf1e9a72f8e5 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 8 Jun 2026 17:11:40 +0100 Subject: [PATCH 4/5] simplify Signed-off-by: Adam Gutglick --- vortex-datafusion/src/persistent/source.rs | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 20b2daa687a..917425dac47 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -14,7 +14,6 @@ use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; use datafusion_execution::cache::cache_manager::FileMetadataCache; use datafusion_physical_expr::EquivalenceProperties; -use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr::conjunction; @@ -608,25 +607,6 @@ mod tests { Ok(()) } - #[test] - fn try_pushdown_sort_returns_inexact_for_ascending_file_column() -> anyhow::Result<()> { - let schema = sort_test_schema(); - let source = sort_test_source(Arc::clone(&schema)); - let order = vec![sort_column("a", 0)]; - let eq_properties = EquivalenceProperties::new(schema); - - let result = source.try_pushdown_sort(&order, &eq_properties)?; - - match result { - SortOrderPushdownResult::Inexact { inner } => assert_ordered_source(inner)?, - SortOrderPushdownResult::Exact { .. } | SortOrderPushdownResult::Unsupported => { - anyhow::bail!("expected inexact sort pushdown") - } - } - assert!(!source.ordered); - Ok(()) - } - #[test] fn create_vortex_opener_preserves_expression_convertor() -> anyhow::Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); From 5b3f04a21dde66732d24ef66fa43febc5943a427 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 8 Jun 2026 18:03:56 +0100 Subject: [PATCH 5/5] fix Signed-off-by: Adam Gutglick --- vortex-datafusion/src/persistent/source.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 917425dac47..086c75adda7 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -580,7 +580,6 @@ mod tests { fn assert_ordered_source(inner: Arc) -> anyhow::Result<()> { let source = inner - .as_any() .downcast_ref::() .ok_or_else(|| anyhow::anyhow!("expected VortexSource"))?;