Skip to content

Commit a09ed90

Browse files
feat[substrait]: translate scan statistics via RelCommon hints
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent abf8f61 commit a09ed90

5 files changed

Lines changed: 767 additions & 15 deletions

File tree

datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use crate::logical_plan::consumer::SubstraitConsumer;
19+
use crate::logical_plan::consumer::SubstraitHints;
1920
use crate::logical_plan::consumer::from_substrait_literal;
2021
use crate::logical_plan::consumer::from_substrait_named_struct;
2122
use crate::logical_plan::consumer::utils::ensure_schema_compatibility;
@@ -40,12 +41,51 @@ pub async fn from_read_rel(
4041
consumer: &impl SubstraitConsumer,
4142
read: &ReadRel,
4243
) -> datafusion::common::Result<LogicalPlan> {
44+
// proto3 defaults all numeric fields to 0, so treat 0.0 (or negative) as
45+
// "not provided" to avoid injecting bogus stats when a producer sets
46+
// hint.stats for other fields but leaves a field unset.
47+
// Note: zero-row / zero-size values will also be silently dropped — this
48+
// is an accepted limitation of the proto3 default.
49+
// We also reject non-finite values (Inf, NaN) and values at or above
50+
// usize::MAX, since the consumer will later cast these to usize. Rust's
51+
// float-to-integer cast saturates rather than panicking (since 1.45), but
52+
// silently capping to usize::MAX would produce nonsensical statistics.
53+
let hints = {
54+
let stats = read
55+
.common
56+
.as_ref()
57+
.and_then(|c| c.hint.as_ref())
58+
.and_then(|h| h.stats.as_ref());
59+
// usize::MAX as f64 may round up slightly above the true maximum, so
60+
// using it as a strict upper bound excludes values that would overflow.
61+
let max_hint: f64 = usize::MAX as f64;
62+
let row_count = stats.and_then(|s| {
63+
if s.row_count > 0.0 && s.row_count.is_finite() && s.row_count < max_hint {
64+
Some(s.row_count)
65+
} else {
66+
None
67+
}
68+
});
69+
let record_size = stats.and_then(|s| {
70+
if s.record_size > 0.0 && s.record_size.is_finite() && s.record_size < max_hint {
71+
Some(s.record_size)
72+
} else {
73+
None
74+
}
75+
});
76+
SubstraitHints {
77+
row_count,
78+
record_size,
79+
}
80+
};
81+
4382
async fn read_with_schema(
4483
consumer: &impl SubstraitConsumer,
4584
table_ref: TableReference,
4685
schema: DFSchema,
4786
projection: &Option<MaskExpression>,
4887
filter: &Option<Box<Expression>>,
88+
hints: SubstraitHints,
4989
) -> datafusion::common::Result<LogicalPlan> {
5090
let schema = schema.replace_qualifier(table_ref.clone());
5191

@@ -57,14 +97,17 @@ pub async fn from_read_rel(
5797
};
5898

5999
let plan = {
60-
let provider = match consumer.resolve_table_ref(&table_ref).await? {
61-
Some(ref provider) => Arc::clone(provider),
100+
let provider = match consumer
101+
.resolve_table_ref(&table_ref, hints)
102+
.await?
103+
{
104+
Some(provider) => provider,
62105
_ => return plan_err!("No table named '{table_ref}'"),
63106
};
64107

65108
LogicalPlanBuilder::scan_with_filters(
66109
table_ref,
67-
provider_as_source(Arc::clone(&provider)),
110+
provider_as_source(provider),
68111
None,
69112
filters,
70113
)?
@@ -110,6 +153,7 @@ pub async fn from_read_rel(
110153
substrait_schema,
111154
&read.projection,
112155
&read.filter,
156+
hints,
113157
)
114158
.await
115159
}
@@ -213,6 +257,7 @@ pub async fn from_read_rel(
213257
substrait_schema,
214258
&read.projection,
215259
&read.filter,
260+
hints,
216261
)
217262
.await
218263
}

datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs

Lines changed: 150 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@ use super::{
2424
};
2525
use crate::extensions::Extensions;
2626
use async_trait::async_trait;
27-
use datafusion::arrow::datatypes::DataType;
28-
use datafusion::catalog::TableProvider;
27+
use datafusion::arrow::datatypes::{DataType, SchemaRef};
28+
use datafusion::catalog::{Session, TableProvider};
29+
use datafusion::common::stats::Precision;
2930
use datafusion::common::{
30-
DFSchema, ScalarValue, TableReference, not_impl_err, substrait_err,
31+
DFSchema, ScalarValue, Statistics, TableReference, not_impl_err, substrait_err,
3132
};
3233
use datafusion::execution::{FunctionRegistry, SessionState};
34+
use datafusion::logical_expr::TableType;
3335
use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
36+
use datafusion::physical_plan::ExecutionPlan;
3437
use std::sync::{Arc, RwLock};
3538
use substrait::proto;
3639
use substrait::proto::expression as substrait_expression;
@@ -44,6 +47,26 @@ use substrait::proto::{
4447
FilterRel, JoinRel, ProjectRel, ReadRel, Rel, SetRel, SortRel, r#type,
4548
};
4649

50+
/// Advisory hints extracted from a Substrait `RelCommon.hint.stats` message,
51+
/// passed to [`SubstraitConsumer::resolve_table_ref`] so that implementors can
52+
/// incorporate them into the returned [`TableProvider`].
53+
///
54+
/// The struct is `#[non_exhaustive]` so that new fields can be added in future
55+
/// versions without breaking existing implementations.
/// As a consequence, code outside this crate cannot build it with a struct
/// literal; start from `SubstraitHints::default()` (both hints `None`) and
/// assign the public fields instead.
56+
#[non_exhaustive]
57+
#[derive(Debug, Clone, Default)]
58+
pub struct SubstraitHints {
59+
/// Estimated number of rows, from `hint.stats.row_count`.
60+
///
61+
/// `None` means the hint was absent or could not be reliably interpreted.
62+
/// The `ReadRel` consumer only keeps values that are strictly positive,
/// finite and below `usize::MAX as f64`; a proto3 default of `0.0` is
/// therefore indistinguishable from "not provided" and is dropped.
63+
pub row_count: Option<f64>,
64+
/// Estimated average byte size per record, from `hint.stats.record_size`.
65+
///
66+
/// `None` means the hint was absent or failed the same checks as
/// `row_count` (strictly positive, finite, below `usize::MAX as f64`).
67+
pub record_size: Option<f64>,
68+
}
69+
4770
#[async_trait]
4871
/// This trait is used to consume Substrait plans, converting them into DataFusion Logical Plans.
4972
/// It can be implemented by users to allow for custom handling of relations, expressions, etc.
@@ -67,7 +90,7 @@ use substrait::proto::{
6790
/// # use datafusion::logical_expr::expr::ScalarFunction;
6891
/// # use datafusion_substrait::extensions::Extensions;
6992
/// # use datafusion_substrait::logical_plan::consumer::{
70-
/// # from_project_rel, from_substrait_rel, from_substrait_rex, SubstraitConsumer
93+
/// # from_project_rel, from_substrait_rel, from_substrait_rex, SubstraitConsumer, SubstraitHints
7194
/// # };
7295
///
7396
/// struct CustomSubstraitConsumer {
@@ -80,6 +103,7 @@ use substrait::proto::{
80103
/// async fn resolve_table_ref(
81104
/// &self,
82105
/// table_ref: &TableReference,
106+
/// _hints: SubstraitHints,
83107
/// ) -> Result<Option<Arc<dyn TableProvider>>> {
84108
/// let table = table_ref.table().to_string();
85109
/// let schema = self.state.schema_for_ref(table_ref.clone())?;
@@ -162,6 +186,7 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
162186
async fn resolve_table_ref(
163187
&self,
164188
table_ref: &TableReference,
189+
hints: SubstraitHints,
165190
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>>;
166191

167192
// TODO: Remove these two methods
@@ -471,6 +496,57 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
471496
}
472497
}
473498

499+
/// Wraps an inner [`TableProvider`] and overrides its `statistics()` return value.
500+
///
501+
/// Used by [`DefaultSubstraitConsumer`] to inject a row-count hint carried in a
502+
/// Substrait `RelCommon.hint.stats` when the resolved provider has no statistics.
503+
#[derive(Debug)]
504+
struct StatisticsOverrideTableProvider {
505+
inner: Arc<dyn TableProvider>,
506+
statistics: Statistics,
507+
}
508+
509+
#[async_trait]
510+
impl TableProvider for StatisticsOverrideTableProvider {
511+
fn as_any(&self) -> &dyn std::any::Any {
512+
self
513+
}
514+
515+
fn schema(&self) -> SchemaRef {
516+
self.inner.schema()
517+
}
518+
519+
fn constraints(&self) -> Option<&datafusion::common::Constraints> {
520+
self.inner.constraints()
521+
}
522+
523+
fn table_type(&self) -> TableType {
524+
self.inner.table_type()
525+
}
526+
527+
fn supports_filters_pushdown(
528+
&self,
529+
filters: &[&Expr],
530+
) -> datafusion::common::Result<Vec<datafusion::logical_expr::TableProviderFilterPushDown>>
531+
{
532+
self.inner.supports_filters_pushdown(filters)
533+
}
534+
535+
fn statistics(&self) -> Option<Statistics> {
536+
Some(self.statistics.clone())
537+
}
538+
539+
async fn scan(
540+
&self,
541+
state: &dyn Session,
542+
projection: Option<&Vec<usize>>,
543+
filters: &[Expr],
544+
limit: Option<usize>,
545+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
546+
self.inner.scan(state, projection, filters, limit).await
547+
}
548+
}
549+
474550
/// Default SubstraitConsumer for converting standard Substrait without user-defined extensions.
475551
///
476552
/// Used as the consumer in [crate::logical_plan::consumer::from_substrait_plan]
@@ -495,11 +571,79 @@ impl SubstraitConsumer for DefaultSubstraitConsumer<'_> {
495571
async fn resolve_table_ref(
496572
&self,
497573
table_ref: &TableReference,
574+
hints: SubstraitHints,
498575
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
499576
let table = table_ref.table().to_string();
500577
let schema = self.state.schema_for_ref(table_ref.clone())?;
501-
let table_provider = schema.table(&table).await?;
502-
Ok(table_provider)
578+
let provider = schema.table(&table).await?;
579+
// If the Substrait plan provides hints and the provider is missing the
580+
// corresponding statistics fields, wrap it to expose those hints to
581+
// DataFusion (e.g. for downstream optimizer rules).
582+
// We check each field individually so that a provider returning
583+
// Some(Statistics { num_rows: Absent, ... }) also gets hints injected,
584+
// and any already-known values from the provider are preserved.
585+
let has_hints = hints.row_count.is_some() || hints.record_size.is_some();
586+
let provider = match provider {
587+
Some(provider) if has_hints => {
588+
let existing = provider.statistics();
589+
let row_count_absent = existing
590+
.as_ref()
591+
.map_or(true, |s| matches!(s.num_rows, Precision::Absent));
592+
let byte_size_absent = existing
593+
.as_ref()
594+
.map_or(true, |s| matches!(s.total_byte_size, Precision::Absent));
595+
let inject_row_count = hints.row_count.is_some() && row_count_absent;
596+
// total_byte_size = row_count * record_size, so both hints must be
597+
// present to reconstruct it. A record_size-only hint (row_count absent)
598+
// is therefore silently ignored here, since we cannot compute a
599+
// meaningful total_byte_size from record_size alone.
600+
let inject_byte_size = hints.row_count.is_some()
601+
&& hints.record_size.is_some()
602+
&& byte_size_absent;
603+
if inject_row_count || inject_byte_size {
604+
let num_rows = if inject_row_count {
605+
Precision::Inexact(hints.row_count.unwrap().round() as usize)
606+
} else {
607+
existing
608+
.as_ref()
609+
.map_or(Precision::Absent, |s| s.num_rows.clone())
610+
};
611+
let total_byte_size = if inject_byte_size {
612+
// Use the effective row count that was resolved above:
613+
// the provider's own value when present (keeping
614+
// num_rows and total_byte_size internally consistent),
615+
// or the hint's row_count when the provider had none.
616+
let effective_rows = match &num_rows {
617+
Precision::Exact(n) | Precision::Inexact(n) => *n as f64,
618+
Precision::Absent => hints.row_count.unwrap(),
619+
};
620+
Precision::Inexact(
621+
(effective_rows * hints.record_size.unwrap()).round() as usize,
622+
)
623+
} else {
624+
existing
625+
.as_ref()
626+
.map_or(Precision::Absent, |s| s.total_byte_size.clone())
627+
};
628+
let column_statistics = existing
629+
.map(|s| s.column_statistics)
630+
.unwrap_or_else(|| Statistics::unknown_column(&provider.schema()));
631+
let statistics = Statistics {
632+
num_rows,
633+
total_byte_size,
634+
column_statistics,
635+
};
636+
Some(Arc::new(StatisticsOverrideTableProvider {
637+
inner: provider,
638+
statistics,
639+
}) as Arc<dyn TableProvider>)
640+
} else {
641+
Some(provider)
642+
}
643+
}
644+
provider => provider,
645+
};
646+
Ok(provider)
503647
}
504648

505649
fn get_extensions(&self) -> &Extensions {

datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
use crate::logical_plan::producer::{
1919
SubstraitProducer, to_substrait_literal, to_substrait_named_struct,
2020
};
21+
use datafusion::common::stats::Precision;
2122
use datafusion::common::{DFSchema, ToDFSchema, substrait_datafusion_err};
23+
use datafusion::datasource::source_as_provider;
2224
use datafusion::logical_expr::utils::conjunction;
2325
use datafusion::logical_expr::{EmptyRelation, Expr, TableScan, Values};
2426
use datafusion::scalar::ScalarValue;
@@ -29,7 +31,7 @@ use substrait::proto::expression::mask_expression::{StructItem, StructSelect};
2931
use substrait::proto::expression::nested::Struct as NestedStruct;
3032
use substrait::proto::read_rel::{NamedTable, ReadType, VirtualTable};
3133
use substrait::proto::rel::RelType;
32-
use substrait::proto::{ReadRel, Rel};
34+
use substrait::proto::{ReadRel, Rel, RelCommon, rel_common};
3335

3436
/// Converts rows of literal expressions into Substrait literal structs.
3537
///
@@ -131,9 +133,53 @@ pub fn from_table_scan(
131133
Some(Box::new(filter_expr))
132134
};
133135

136+
// Both Exact and Inexact statistics are serialised as f64 hints.
137+
// On the consumer side, hints are always read back as Precision::Inexact,
138+
// so Exact values become Inexact after a round-trip — this is an accepted
139+
// semantic limitation since RelCommon.hint.stats is advisory.
140+
// Zero values write 0.0, which the consumer discards due to the proto3
141+
// default-zero ambiguity; that loss is also accepted.
142+
//
143+
// source_as_provider only succeeds when the scan's source is a
144+
// DefaultTableSource wrapping a TableProvider. For any other TableSource
145+
// the error is dropped by .ok() and no statistics hint is emitted.
146+
let common = source_as_provider(&scan.source)
147+
.ok()
148+
.and_then(|provider| provider.statistics())
149+
.and_then(|stats| {
150+
let row_count = match stats.num_rows {
151+
Precision::Exact(n) | Precision::Inexact(n) => Some(n as f64),
152+
Precision::Absent => None,
153+
};
154+
// record_size = total_byte_size / num_rows (both must be present)
155+
let record_size = match (stats.total_byte_size, row_count) {
156+
(Precision::Exact(b) | Precision::Inexact(b), Some(rows))
157+
if rows > 0.0 =>
158+
{
159+
Some(b as f64 / rows)
160+
}
161+
_ => None,
162+
};
163+
if row_count.is_none() && record_size.is_none() {
164+
return None;
165+
}
166+
Some(RelCommon {
167+
emit_kind: None,
168+
hint: Some(rel_common::Hint {
169+
stats: Some(rel_common::hint::Stats {
170+
row_count: row_count.unwrap_or(0.0),
171+
record_size: record_size.unwrap_or(0.0),
172+
..Default::default()
173+
}),
174+
..Default::default()
175+
}),
176+
advanced_extension: None,
177+
})
178+
});
179+
134180
Ok(Box::new(Rel {
135181
rel_type: Some(RelType::Read(Box::new(ReadRel {
136-
common: None,
182+
common,
137183
base_schema: Some(base_schema),
138184
filter: filter_option,
139185
best_effort_filter: None,

0 commit comments

Comments
 (0)