Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 75 additions & 1 deletion vortex-datafusion/src/persistent/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_plan::DisplayFormatType;
use datafusion_physical_plan::PhysicalExpr;
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
use datafusion_physical_plan::filter_pushdown::PushedDown;
use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
Expand Down Expand Up @@ -189,6 +192,7 @@ pub struct VortexSource {
natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
expression_convertor: Arc<dyn ExpressionConvertor>,
pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
pub(crate) ordered: bool,
vx_metrics_registry: Arc<dyn MetricsRegistry>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
/// Whether to enable expression pushdown into the underlying Vortex scan.
Expand Down Expand Up @@ -223,6 +227,7 @@ impl VortexSource {
vortex_reader_factory: None,
vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
file_metadata_cache: None,
ordered: false,
options: VortexTableOptions::default(),
}
}
Expand Down Expand Up @@ -335,7 +340,7 @@ impl VortexSource {
metrics_registry: Arc::clone(&self.vx_metrics_registry),
layout_readers: Arc::clone(&self.layout_readers),
natural_split_ranges: Arc::clone(&self.natural_split_ranges),
has_output_ordering: !base_config.output_ordering.is_empty(),
has_output_ordering: !base_config.output_ordering.is_empty() || self.ordered,
expression_convertor: Arc::clone(&self.expression_convertor),
file_metadata_cache: self.file_metadata_cache.clone(),
projection_pushdown: self.options.projection_pushdown,
Expand Down Expand Up @@ -378,6 +383,27 @@ impl FileSource for VortexSource {
VORTEX_FILE_EXTENSION
}

/// Reports whether this source can satisfy the requested sort `order`.
///
/// Returns [`SortOrderPushdownResult::Unsupported`] when `order` is empty or
/// when `eq_properties` does not satisfy it. Otherwise returns
/// [`SortOrderPushdownResult::Exact`] wrapping a clone of this source whose
/// `ordered` flag is set; `self` itself is left untouched.
///
/// # Errors
///
/// Propagates any error returned by `eq_properties.ordering_satisfy`.
fn try_pushdown_sort(
    &self,
    order: &[PhysicalSortExpr],
    eq_properties: &EquivalenceProperties,
) -> DFResult<SortOrderPushdownResult<Arc<dyn FileSource>>> {
    // `||` short-circuits, so the satisfaction check is never evaluated for
    // an empty ordering.
    if order.is_empty() || !eq_properties.ordering_satisfy(order.iter().cloned())? {
        return Ok(SortOrderPushdownResult::Unsupported);
    }

    // Hand back a flagged copy rather than mutating the shared source.
    let mut ordered_source = self.clone();
    ordered_source.ordered = true;

    let inner: Arc<dyn FileSource> = Arc::new(ordered_source);
    Ok(SortOrderPushdownResult::Exact { inner })
}

fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
Expand Down Expand Up @@ -493,6 +519,7 @@ mod tests {
use arrow_schema::Schema;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_expr::expressions::Column;
use object_store::memory::InMemory;
use vortex::VortexSessionDefault;

Expand Down Expand Up @@ -532,6 +559,53 @@ mod tests {
}
}

/// Builds a default-options sort expression over the column `name` at `index`.
fn sort_column(name: &str, index: usize) -> PhysicalSortExpr {
    let column = Column::new(name, index);
    PhysicalSortExpr::new_default(Arc::new(column) as PhysicalExprRef)
}

/// Two non-nullable `Int32` columns, `a` and `b`, used by the sort pushdown tests.
fn sort_test_schema() -> Arc<Schema> {
    let int_field = |name: &str| Field::new(name, DataType::Int32, false);
    let fields = vec![int_field("a"), int_field("b")];
    Arc::new(Schema::new(fields))
}

/// Creates a `VortexSource` over `schema` (used as the file schema) with a
/// default session.
fn sort_test_source(schema: Arc<Schema>) -> VortexSource {
    let table_schema = TableSchema::from_file_schema(schema);
    let session = VortexSession::default();
    VortexSource::new(table_schema, session)
}

/// Asserts that `inner` is a `VortexSource` whose `ordered` flag is set.
///
/// Returns an error (rather than panicking) when `inner` is some other
/// `FileSource` implementation.
fn assert_ordered_source(inner: Arc<dyn FileSource>) -> anyhow::Result<()> {
    let Some(vortex_source) = inner.downcast_ref::<VortexSource>() else {
        anyhow::bail!("expected VortexSource");
    };

    assert!(vortex_source.ordered);
    Ok(())
}

/// When the equivalence properties already carry the requested ordering, the
/// pushdown must come back as `Exact` with a flagged copy of the source, and
/// the original source must keep `ordered == false`.
#[test]
fn try_pushdown_sort_returns_exact_when_ordering_is_satisfied() -> anyhow::Result<()> {
    let schema = sort_test_schema();
    let source = sort_test_source(Arc::clone(&schema));
    let requested = vec![sort_column("a", 0), sort_column("b", 1)];
    let eq_properties = EquivalenceProperties::new_with_orderings(schema, [requested.clone()]);

    // Keep the match exhaustive so a new result variant forces a revisit.
    let inner = match source.try_pushdown_sort(&requested, &eq_properties)? {
        SortOrderPushdownResult::Exact { inner } => inner,
        SortOrderPushdownResult::Inexact { .. } | SortOrderPushdownResult::Unsupported => {
            anyhow::bail!("expected exact sort pushdown")
        }
    };
    assert_ordered_source(inner)?;

    // The pushdown works on a clone; the receiver is not modified.
    assert!(!source.ordered);
    Ok(())
}

#[test]
fn create_vortex_opener_preserves_expression_convertor() -> anyhow::Result<()> {
let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
Expand Down
44 changes: 44 additions & 0 deletions vortex-sqllogictest/slt/datafusion/order_pushdown.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright the Vortex contributors

# Sort-order pushdown test for Vortex-backed external tables: a query ordered
# by the table's declared ordering should be planned without a SortExec.

include ../setup.slt.no

# External Vortex table that declares its data as ordered by c1 ascending.
statement ok
CREATE EXTERNAL TABLE ordered_pushdown (
c1 VARCHAR NOT NULL,
c2 INT NOT NULL
)
STORED AS vortex
WITH ORDER (c1 ASC)
LOCATION '$__TEST_DIR__/ordered_pushdown/';

# Data arrives through three separate INSERT statements
# (presumably one file per INSERT; confirm against the writer).
statement ok
INSERT INTO ordered_pushdown VALUES
('air', 5),
('balloon', 42);

statement ok
INSERT INTO ordered_pushdown VALUES
('zebra', 5);

# Rows here are supplied out of c1 order ('texas' before 'alabama').
statement ok
INSERT INTO ordered_pushdown VALUES
('texas', 2000),
('alabama', 2000);

# The first three rows by c1 interleave the first and third INSERTs, so the
# result is only correct if rows from different inserts are merged in order.
query TI
SELECT c1, c2 FROM ordered_pushdown ORDER BY c1 ASC LIMIT 3;
----
air 5
alabama 2000
balloon 42

# Expected physical plan: no SortExec. The scan reports output_ordering on c1
# and carries limit=3, with only a SortPreservingMergeExec (fetch=3) above it.
query TT
EXPLAIN SELECT c1, c2 FROM ordered_pushdown ORDER BY c1 ASC LIMIT 3;
----
logical_plan
01)Sort: ordered_pushdown.c1 ASC NULLS LAST, fetch=3
02)--TableScan: ordered_pushdown projection=[c1, c2]
physical_plan
01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST], fetch=3
02)--DataSourceExec: file_groups={<slt:ignore>}, projection=[c1, c2], limit=3, output_ordering=[c1@0 ASC NULLS LAST], file_type=vortex
Loading