Skip to content

Commit 07736eb

Browse files
feat[substrait]: translate scan statistics via RelCommon hints
Serialize `num_rows` and `total_byte_size` from a `TableProvider` into `RelCommon.hint.stats` on the producer side, and inject them back into the resolved provider on the consumer side for whichever of those fields the provider does not already report.

- Producer: writes `row_count` and `record_size` (= bytes / rows) into `RelCommon.hint.stats` for every `TableScan` whose provider reports a row count.
- Consumer: exposes a new `hints: SubstraitHints` parameter on `SubstraitConsumer::resolve_table_ref`; `DefaultSubstraitConsumer` wraps the provider in a `StatisticsOverrideTableProvider` when hints are present and the provider lacks the corresponding fields.
- `SubstraitHints` is `#[non_exhaustive]` so future fields can be added without further breaking changes.
- Hints are advisory: `Exact` statistics become `Inexact` after a round-trip. Tables with exactly 0 rows lose their stats (proto3 default-zero ambiguity — accepted limitation).

Closes #21112

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent abf8f61 commit 07736eb

File tree

5 files changed

+605
-15
lines changed

5 files changed

+605
-15
lines changed

datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use crate::logical_plan::consumer::SubstraitConsumer;
19+
use crate::logical_plan::consumer::SubstraitHints;
1920
use crate::logical_plan::consumer::from_substrait_literal;
2021
use crate::logical_plan::consumer::from_substrait_named_struct;
2122
use crate::logical_plan::consumer::utils::ensure_schema_compatibility;
@@ -40,12 +41,46 @@ pub async fn from_read_rel(
4041
consumer: &impl SubstraitConsumer,
4142
read: &ReadRel,
4243
) -> datafusion::common::Result<LogicalPlan> {
44+
// proto3 defaults numeric fields to 0, so treat 0.0/negative/non-finite
45+
// and values that would overflow usize as "not provided".
46+
// Zero-row tables will lose their stats — accepted proto3 limitation.
47+
let hints = {
48+
let stats = read
49+
.common
50+
.as_ref()
51+
.and_then(|c| c.hint.as_ref())
52+
.and_then(|h| h.stats.as_ref());
53+
let max_hint: f64 = usize::MAX as f64;
54+
let row_count = stats.and_then(|s| {
55+
if s.row_count > 0.0 && s.row_count.is_finite() && s.row_count < max_hint {
56+
Some(s.row_count)
57+
} else {
58+
None
59+
}
60+
});
61+
let record_size = stats.and_then(|s| {
62+
if s.record_size > 0.0
63+
&& s.record_size.is_finite()
64+
&& s.record_size < max_hint
65+
{
66+
Some(s.record_size)
67+
} else {
68+
None
69+
}
70+
});
71+
SubstraitHints {
72+
row_count,
73+
record_size,
74+
}
75+
};
76+
4377
async fn read_with_schema(
4478
consumer: &impl SubstraitConsumer,
4579
table_ref: TableReference,
4680
schema: DFSchema,
4781
projection: &Option<MaskExpression>,
4882
filter: &Option<Box<Expression>>,
83+
hints: SubstraitHints,
4984
) -> datafusion::common::Result<LogicalPlan> {
5085
let schema = schema.replace_qualifier(table_ref.clone());
5186

@@ -57,14 +92,14 @@ pub async fn from_read_rel(
5792
};
5893

5994
let plan = {
60-
let provider = match consumer.resolve_table_ref(&table_ref).await? {
61-
Some(ref provider) => Arc::clone(provider),
95+
let provider = match consumer.resolve_table_ref(&table_ref, hints).await? {
96+
Some(provider) => provider,
6297
_ => return plan_err!("No table named '{table_ref}'"),
6398
};
6499

65100
LogicalPlanBuilder::scan_with_filters(
66101
table_ref,
67-
provider_as_source(Arc::clone(&provider)),
102+
provider_as_source(provider),
68103
None,
69104
filters,
70105
)?
@@ -110,6 +145,7 @@ pub async fn from_read_rel(
110145
substrait_schema,
111146
&read.projection,
112147
&read.filter,
148+
hints,
113149
)
114150
.await
115151
}
@@ -213,6 +249,7 @@ pub async fn from_read_rel(
213249
substrait_schema,
214250
&read.projection,
215251
&read.filter,
252+
hints,
216253
)
217254
.await
218255
}

datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs

Lines changed: 180 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@ use super::{
2424
};
2525
use crate::extensions::Extensions;
2626
use async_trait::async_trait;
27-
use datafusion::arrow::datatypes::DataType;
28-
use datafusion::catalog::TableProvider;
27+
use datafusion::arrow::datatypes::{DataType, SchemaRef};
28+
use datafusion::catalog::{Session, TableProvider};
29+
use datafusion::common::stats::Precision;
2930
use datafusion::common::{
30-
DFSchema, ScalarValue, TableReference, not_impl_err, substrait_err,
31+
DFSchema, ScalarValue, Statistics, TableReference, not_impl_err, substrait_err,
3132
};
3233
use datafusion::execution::{FunctionRegistry, SessionState};
34+
use datafusion::logical_expr::TableType;
3335
use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
36+
use datafusion::physical_plan::ExecutionPlan;
3437
use std::sync::{Arc, RwLock};
3538
use substrait::proto;
3639
use substrait::proto::expression as substrait_expression;
@@ -44,6 +47,26 @@ use substrait::proto::{
4447
FilterRel, JoinRel, ProjectRel, ReadRel, Rel, SetRel, SortRel, r#type,
4548
};
4649

50+
/// Advisory hints extracted from a Substrait `RelCommon.hint.stats` message,
51+
/// passed to [`SubstraitConsumer::resolve_table_ref`] so that implementors can
52+
/// incorporate them into the returned [`TableProvider`].
53+
///
54+
/// The struct is `#[non_exhaustive]` so that new fields can be added in future
55+
/// versions without breaking existing implementations.
56+
#[non_exhaustive]
57+
#[derive(Debug, Clone, Default)]
58+
pub struct SubstraitHints {
59+
/// Estimated number of rows, from `hint.stats.row_count`.
60+
///
61+
/// `None` means the hint was absent or could not be reliably interpreted
62+
/// (e.g. proto3 default-zero, a negative or non-finite value, or one too large for `usize`).
63+
pub row_count: Option<f64>,
64+
/// Estimated average byte size per record, from `hint.stats.record_size`.
65+
///
66+
/// `None` means the hint was absent, non-positive, non-finite, or too large for `usize`.
67+
pub record_size: Option<f64>,
68+
}
69+
4770
#[async_trait]
4871
/// This trait is used to consume Substrait plans, converting them into DataFusion Logical Plans.
4972
/// It can be implemented by users to allow for custom handling of relations, expressions, etc.
@@ -67,7 +90,7 @@ use substrait::proto::{
6790
/// # use datafusion::logical_expr::expr::ScalarFunction;
6891
/// # use datafusion_substrait::extensions::Extensions;
6992
/// # use datafusion_substrait::logical_plan::consumer::{
70-
/// # from_project_rel, from_substrait_rel, from_substrait_rex, SubstraitConsumer
93+
/// # from_project_rel, from_substrait_rel, from_substrait_rex, SubstraitConsumer, SubstraitHints
7194
/// # };
7295
///
7396
/// struct CustomSubstraitConsumer {
@@ -80,6 +103,7 @@ use substrait::proto::{
80103
/// async fn resolve_table_ref(
81104
/// &self,
82105
/// table_ref: &TableReference,
106+
/// _hints: SubstraitHints,
83107
/// ) -> Result<Option<Arc<dyn TableProvider>>> {
84108
/// let table = table_ref.table().to_string();
85109
/// let schema = self.state.schema_for_ref(table_ref.clone())?;
@@ -162,6 +186,7 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
162186
async fn resolve_table_ref(
163187
&self,
164188
table_ref: &TableReference,
189+
hints: SubstraitHints,
165190
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>>;
166191

167192
// TODO: Remove these two methods
@@ -471,6 +496,94 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
471496
}
472497
}
473498

499+
/// Wraps an inner [`TableProvider`] and overrides its `statistics()` return value.
500+
///
501+
/// Used by [`DefaultSubstraitConsumer`] to inject row-count / byte-size hints from a
502+
/// Substrait `RelCommon.hint.stats` for the fields the resolved provider leaves absent.
503+
///
504+
/// # Note on `as_any()` behaviour
505+
///
506+
/// `as_any()` intentionally delegates to the inner provider so that callers can
507+
/// still downcast to the concrete inner type (e.g. `MemTable`) through this
508+
/// wrapper. As a consequence, downcasting to `StatisticsOverrideTableProvider`
509+
/// itself via `as_any()` will not work — but since this struct is private,
510+
/// external code should never need to do that.
511+
#[derive(Debug)]
512+
struct StatisticsOverrideTableProvider {
513+
inner: Arc<dyn TableProvider>,
514+
statistics: Statistics,
515+
}
516+
517+
#[async_trait]
518+
impl TableProvider for StatisticsOverrideTableProvider {
519+
fn as_any(&self) -> &dyn std::any::Any {
520+
// Delegate to the inner provider so that downcasting to the concrete
521+
// inner type works transparently through this wrapper.
522+
self.inner.as_any()
523+
}
524+
525+
fn schema(&self) -> SchemaRef {
526+
self.inner.schema()
527+
}
528+
529+
fn constraints(&self) -> Option<&datafusion::common::Constraints> {
530+
self.inner.constraints()
531+
}
532+
533+
fn table_type(&self) -> TableType {
534+
self.inner.table_type()
535+
}
536+
537+
fn supports_filters_pushdown(
538+
&self,
539+
filters: &[&Expr],
540+
) -> datafusion::common::Result<
541+
Vec<datafusion::logical_expr::TableProviderFilterPushDown>,
542+
> {
543+
self.inner.supports_filters_pushdown(filters)
544+
}
545+
546+
fn statistics(&self) -> Option<Statistics> {
547+
Some(self.statistics.clone())
548+
}
549+
550+
async fn scan(
551+
&self,
552+
state: &dyn Session,
553+
projection: Option<&Vec<usize>>,
554+
filters: &[Expr],
555+
limit: Option<usize>,
556+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
557+
self.inner.scan(state, projection, filters, limit).await
558+
}
559+
560+
async fn insert_into(
561+
&self,
562+
state: &dyn Session,
563+
input: Arc<dyn ExecutionPlan>,
564+
insert_op: datafusion::logical_expr::dml::InsertOp,
565+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
566+
self.inner.insert_into(state, input, insert_op).await
567+
}
568+
569+
async fn delete_from(
570+
&self,
571+
state: &dyn Session,
572+
filters: Vec<Expr>,
573+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
574+
self.inner.delete_from(state, filters).await
575+
}
576+
577+
async fn update(
578+
&self,
579+
state: &dyn Session,
580+
assignments: Vec<(String, Expr)>,
581+
filters: Vec<Expr>,
582+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
583+
self.inner.update(state, assignments, filters).await
584+
}
585+
}
586+
474587
/// Default SubstraitConsumer for converting standard Substrait without user-defined extensions.
475588
///
476589
/// Used as the consumer in [crate::logical_plan::consumer::from_substrait_plan]
@@ -495,11 +608,72 @@ impl SubstraitConsumer for DefaultSubstraitConsumer<'_> {
495608
async fn resolve_table_ref(
496609
&self,
497610
table_ref: &TableReference,
611+
hints: SubstraitHints,
498612
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
499613
let table = table_ref.table().to_string();
500614
let schema = self.state.schema_for_ref(table_ref.clone())?;
501-
let table_provider = schema.table(&table).await?;
502-
Ok(table_provider)
615+
let provider = schema.table(&table).await?;
616+
// Wrap the provider to inject hint statistics only for fields the
617+
// provider doesn't already have (checked individually, not as a whole).
618+
let has_hints = hints.row_count.is_some() || hints.record_size.is_some();
619+
let provider = match provider {
620+
Some(provider) if has_hints => {
621+
let existing = provider.statistics();
622+
let row_count_absent = existing
623+
.as_ref()
624+
.is_none_or(|s| matches!(s.num_rows, Precision::Absent));
625+
let byte_size_absent = existing
626+
.as_ref()
627+
.is_none_or(|s| matches!(s.total_byte_size, Precision::Absent));
628+
let inject_row_count = hints.row_count.is_some() && row_count_absent;
629+
// Both hints required: total_byte_size = row_count * record_size.
630+
let inject_byte_size = hints.row_count.is_some()
631+
&& hints.record_size.is_some()
632+
&& byte_size_absent;
633+
if inject_row_count || inject_byte_size {
634+
let num_rows = if inject_row_count {
635+
Precision::Inexact(hints.row_count.unwrap().round() as usize)
636+
} else {
637+
existing.as_ref().map_or(Precision::Absent, |s| s.num_rows)
638+
};
639+
let total_byte_size = if inject_byte_size {
640+
// Prefer the provider's own row count for consistency.
641+
let effective_rows = match &num_rows {
642+
Precision::Exact(n) | Precision::Inexact(n) => *n as f64,
643+
Precision::Absent => hints.row_count.unwrap(),
644+
};
645+
let byte_size = effective_rows * hints.record_size.unwrap();
646+
// The product of two sub-usize::MAX values can still overflow.
647+
if byte_size.is_finite() && byte_size < usize::MAX as f64 {
648+
Precision::Inexact(byte_size.round() as usize)
649+
} else {
650+
Precision::Absent
651+
}
652+
} else {
653+
existing
654+
.as_ref()
655+
.map_or(Precision::Absent, |s| s.total_byte_size)
656+
};
657+
let column_statistics =
658+
existing.map(|s| s.column_statistics).unwrap_or_else(|| {
659+
Statistics::unknown_column(&provider.schema())
660+
});
661+
let statistics = Statistics {
662+
num_rows,
663+
total_byte_size,
664+
column_statistics,
665+
};
666+
Some(Arc::new(StatisticsOverrideTableProvider {
667+
inner: provider,
668+
statistics,
669+
}) as Arc<dyn TableProvider>)
670+
} else {
671+
Some(provider)
672+
}
673+
}
674+
provider => provider,
675+
};
676+
Ok(provider)
503677
}
504678

505679
fn get_extensions(&self) -> &Extensions {

datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
use crate::logical_plan::producer::{
1919
SubstraitProducer, to_substrait_literal, to_substrait_named_struct,
2020
};
21+
use datafusion::common::stats::Precision;
2122
use datafusion::common::{DFSchema, ToDFSchema, substrait_datafusion_err};
23+
use datafusion::datasource::source_as_provider;
2224
use datafusion::logical_expr::utils::conjunction;
2325
use datafusion::logical_expr::{EmptyRelation, Expr, TableScan, Values};
2426
use datafusion::scalar::ScalarValue;
@@ -29,7 +31,7 @@ use substrait::proto::expression::mask_expression::{StructItem, StructSelect};
2931
use substrait::proto::expression::nested::Struct as NestedStruct;
3032
use substrait::proto::read_rel::{NamedTable, ReadType, VirtualTable};
3133
use substrait::proto::rel::RelType;
32-
use substrait::proto::{ReadRel, Rel};
34+
use substrait::proto::{ReadRel, Rel, RelCommon, rel_common};
3335

3436
/// Converts rows of literal expressions into Substrait literal structs.
3537
///
@@ -131,9 +133,46 @@ pub fn from_table_scan(
131133
Some(Box::new(filter_expr))
132134
};
133135

136+
// Statistics are serialised as f64 hints. Exact becomes Inexact after a
137+
// round-trip since RelCommon.hint.stats is advisory. Zero-row tables lose
138+
// their stats due to proto3 default-zero — both limitations are accepted.
139+
let common = source_as_provider(&scan.source)
140+
.ok()
141+
.and_then(|provider| provider.statistics())
142+
.and_then(|stats| {
143+
let row_count = match stats.num_rows {
144+
Precision::Exact(n) | Precision::Inexact(n) => Some(n as f64),
145+
Precision::Absent => None,
146+
};
147+
// record_size = total_byte_size / num_rows (both must be present, and num_rows > 0)
148+
let record_size = match (stats.total_byte_size, row_count) {
149+
(Precision::Exact(b) | Precision::Inexact(b), Some(rows))
150+
if rows > 0.0 =>
151+
{
152+
Some(b as f64 / rows)
153+
}
154+
_ => None,
155+
};
156+
if row_count.is_none() && record_size.is_none() {
157+
return None;
158+
}
159+
Some(RelCommon {
160+
emit_kind: None,
161+
hint: Some(rel_common::Hint {
162+
stats: Some(rel_common::hint::Stats {
163+
row_count: row_count.unwrap_or(0.0),
164+
record_size: record_size.unwrap_or(0.0),
165+
..Default::default()
166+
}),
167+
..Default::default()
168+
}),
169+
advanced_extension: None,
170+
})
171+
});
172+
134173
Ok(Box::new(Rel {
135174
rel_type: Some(RelType::Read(Box::new(ReadRel {
136-
common: None,
175+
common,
137176
base_schema: Some(base_schema),
138177
filter: filter_option,
139178
best_effort_filter: None,

0 commit comments

Comments
 (0)