Skip to content

Commit f665d1e

Browse files
feat[substrait]: translate scan statistics via RelCommon hints
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> fix[substrait]: forward as_any/write methods in StatisticsOverrideTableProvider, clarify docs Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent abf8f61 commit f665d1e

5 files changed

Lines changed: 798 additions & 15 deletions

File tree

datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use crate::logical_plan::consumer::SubstraitConsumer;
19+
use crate::logical_plan::consumer::SubstraitHints;
1920
use crate::logical_plan::consumer::from_substrait_literal;
2021
use crate::logical_plan::consumer::from_substrait_named_struct;
2122
use crate::logical_plan::consumer::utils::ensure_schema_compatibility;
@@ -40,12 +41,54 @@ pub async fn from_read_rel(
4041
consumer: &impl SubstraitConsumer,
4142
read: &ReadRel,
4243
) -> datafusion::common::Result<LogicalPlan> {
44+
// proto3 defaults all numeric fields to 0, so treat 0.0 (or negative) as
45+
// "not provided" to avoid injecting bogus stats when a producer sets
46+
// hint.stats for other fields but leaves a field unset.
47+
// Note: zero-row / zero-size values will also be silently dropped — this
48+
// is an accepted limitation of the proto3 default.
49+
// We also reject non-finite values (Inf, NaN) and values large enough to
50+
// overflow a usize cast. Rust's float-to-integer cast saturates rather
51+
// than panicking (since 1.45), but silently capping to usize::MAX would
52+
// produce nonsensical statistics. The threshold used is `usize::MAX as
53+
// f64`, which on 64-bit targets rounds up to 2^64 — one ULP above the
54+
// largest exactly-representable value below usize::MAX — so the strict
55+
// `< max_hint` check correctly excludes all values that would overflow.
56+
let hints = {
57+
let stats = read
58+
.common
59+
.as_ref()
60+
.and_then(|c| c.hint.as_ref())
61+
.and_then(|h| h.stats.as_ref());
62+
// usize::MAX as f64 may round up slightly above the true maximum, so
63+
// using it as a strict upper bound excludes values that would overflow.
64+
let max_hint: f64 = usize::MAX as f64;
65+
let row_count = stats.and_then(|s| {
66+
if s.row_count > 0.0 && s.row_count.is_finite() && s.row_count < max_hint {
67+
Some(s.row_count)
68+
} else {
69+
None
70+
}
71+
});
72+
let record_size = stats.and_then(|s| {
73+
if s.record_size > 0.0 && s.record_size.is_finite() && s.record_size < max_hint {
74+
Some(s.record_size)
75+
} else {
76+
None
77+
}
78+
});
79+
SubstraitHints {
80+
row_count,
81+
record_size,
82+
}
83+
};
84+
4385
async fn read_with_schema(
4486
consumer: &impl SubstraitConsumer,
4587
table_ref: TableReference,
4688
schema: DFSchema,
4789
projection: &Option<MaskExpression>,
4890
filter: &Option<Box<Expression>>,
91+
hints: SubstraitHints,
4992
) -> datafusion::common::Result<LogicalPlan> {
5093
let schema = schema.replace_qualifier(table_ref.clone());
5194

@@ -57,14 +100,17 @@ pub async fn from_read_rel(
57100
};
58101

59102
let plan = {
60-
let provider = match consumer.resolve_table_ref(&table_ref).await? {
61-
Some(ref provider) => Arc::clone(provider),
103+
let provider = match consumer
104+
.resolve_table_ref(&table_ref, hints)
105+
.await?
106+
{
107+
Some(provider) => provider,
62108
_ => return plan_err!("No table named '{table_ref}'"),
63109
};
64110

65111
LogicalPlanBuilder::scan_with_filters(
66112
table_ref,
67-
provider_as_source(Arc::clone(&provider)),
113+
provider_as_source(provider),
68114
None,
69115
filters,
70116
)?
@@ -110,6 +156,7 @@ pub async fn from_read_rel(
110156
substrait_schema,
111157
&read.projection,
112158
&read.filter,
159+
hints,
113160
)
114161
.await
115162
}
@@ -213,6 +260,7 @@ pub async fn from_read_rel(
213260
substrait_schema,
214261
&read.projection,
215262
&read.filter,
263+
hints,
216264
)
217265
.await
218266
}

datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs

Lines changed: 176 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@ use super::{
2424
};
2525
use crate::extensions::Extensions;
2626
use async_trait::async_trait;
27-
use datafusion::arrow::datatypes::DataType;
28-
use datafusion::catalog::TableProvider;
27+
use datafusion::arrow::datatypes::{DataType, SchemaRef};
28+
use datafusion::catalog::{Session, TableProvider};
29+
use datafusion::common::stats::Precision;
2930
use datafusion::common::{
30-
DFSchema, ScalarValue, TableReference, not_impl_err, substrait_err,
31+
DFSchema, ScalarValue, Statistics, TableReference, not_impl_err, substrait_err,
3132
};
3233
use datafusion::execution::{FunctionRegistry, SessionState};
34+
use datafusion::logical_expr::TableType;
3335
use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
36+
use datafusion::physical_plan::ExecutionPlan;
3437
use std::sync::{Arc, RwLock};
3538
use substrait::proto;
3639
use substrait::proto::expression as substrait_expression;
@@ -44,6 +47,26 @@ use substrait::proto::{
4447
FilterRel, JoinRel, ProjectRel, ReadRel, Rel, SetRel, SortRel, r#type,
4548
};
4649

50+
/// Advisory hints extracted from a Substrait `RelCommon.hint.stats` message,
51+
/// passed to [`SubstraitConsumer::resolve_table_ref`] so that implementors can
52+
/// incorporate them into the returned [`TableProvider`].
53+
///
54+
/// The struct is `#[non_exhaustive]` so that new fields can be added in future
55+
/// versions without breaking existing implementations.
56+
#[non_exhaustive]
57+
#[derive(Debug, Clone, Default)]
58+
pub struct SubstraitHints {
59+
/// Estimated number of rows, from `hint.stats.row_count`.
60+
///
61+
/// `None` means the hint was absent or could not be reliably interpreted
62+
/// (proto3 default-zero, negative, non-finite, or too large to fit in a `usize`).
63+
pub row_count: Option<f64>,
64+
/// Estimated average byte size per record, from `hint.stats.record_size`.
65+
///
66+
/// `None` under the same conditions as `row_count`: absent, non-positive, non-finite, or too large to fit in a `usize`.
67+
pub record_size: Option<f64>,
68+
}
69+
4770
#[async_trait]
4871
/// This trait is used to consume Substrait plans, converting them into DataFusion Logical Plans.
4972
/// It can be implemented by users to allow for custom handling of relations, expressions, etc.
@@ -67,7 +90,7 @@ use substrait::proto::{
6790
/// # use datafusion::logical_expr::expr::ScalarFunction;
6891
/// # use datafusion_substrait::extensions::Extensions;
6992
/// # use datafusion_substrait::logical_plan::consumer::{
70-
/// # from_project_rel, from_substrait_rel, from_substrait_rex, SubstraitConsumer
93+
/// # from_project_rel, from_substrait_rel, from_substrait_rex, SubstraitConsumer, SubstraitHints
7194
/// # };
7295
///
7396
/// struct CustomSubstraitConsumer {
@@ -80,6 +103,7 @@ use substrait::proto::{
80103
/// async fn resolve_table_ref(
81104
/// &self,
82105
/// table_ref: &TableReference,
106+
/// _hints: SubstraitHints,
83107
/// ) -> Result<Option<Arc<dyn TableProvider>>> {
84108
/// let table = table_ref.table().to_string();
85109
/// let schema = self.state.schema_for_ref(table_ref.clone())?;
@@ -162,6 +186,7 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
162186
async fn resolve_table_ref(
163187
&self,
164188
table_ref: &TableReference,
189+
hints: SubstraitHints,
165190
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>>;
166191

167192
// TODO: Remove these two methods
@@ -471,6 +496,83 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
471496
}
472497
}
473498

499+
/// Wraps an inner [`TableProvider`] and overrides its `statistics()` return value.
500+
///
501+
/// Used by [`DefaultSubstraitConsumer`] to inject a row-count hint carried in a
502+
/// Substrait `RelCommon.hint.stats` when the resolved provider has no statistics.
503+
#[derive(Debug)]
504+
struct StatisticsOverrideTableProvider {
505+
inner: Arc<dyn TableProvider>,
506+
statistics: Statistics,
507+
}
508+
509+
#[async_trait]
510+
impl TableProvider for StatisticsOverrideTableProvider {
511+
fn as_any(&self) -> &dyn std::any::Any {
512+
self.inner.as_any()
513+
}
514+
515+
fn schema(&self) -> SchemaRef {
516+
self.inner.schema()
517+
}
518+
519+
fn constraints(&self) -> Option<&datafusion::common::Constraints> {
520+
self.inner.constraints()
521+
}
522+
523+
fn table_type(&self) -> TableType {
524+
self.inner.table_type()
525+
}
526+
527+
fn supports_filters_pushdown(
528+
&self,
529+
filters: &[&Expr],
530+
) -> datafusion::common::Result<Vec<datafusion::logical_expr::TableProviderFilterPushDown>>
531+
{
532+
self.inner.supports_filters_pushdown(filters)
533+
}
534+
535+
fn statistics(&self) -> Option<Statistics> {
536+
Some(self.statistics.clone())
537+
}
538+
539+
async fn scan(
540+
&self,
541+
state: &dyn Session,
542+
projection: Option<&Vec<usize>>,
543+
filters: &[Expr],
544+
limit: Option<usize>,
545+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
546+
self.inner.scan(state, projection, filters, limit).await
547+
}
548+
549+
async fn insert_into(
550+
&self,
551+
state: &dyn Session,
552+
input: Arc<dyn ExecutionPlan>,
553+
insert_op: datafusion::logical_expr::dml::InsertOp,
554+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
555+
self.inner.insert_into(state, input, insert_op).await
556+
}
557+
558+
async fn delete_from(
559+
&self,
560+
state: &dyn Session,
561+
filters: Vec<Expr>,
562+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
563+
self.inner.delete_from(state, filters).await
564+
}
565+
566+
async fn update(
567+
&self,
568+
state: &dyn Session,
569+
assignments: Vec<(String, Expr)>,
570+
filters: Vec<Expr>,
571+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
572+
self.inner.update(state, assignments, filters).await
573+
}
574+
}
575+
474576
/// Default SubstraitConsumer for converting standard Substrait without user-defined extensions.
475577
///
476578
/// Used as the consumer in [crate::logical_plan::consumer::from_substrait_plan]
@@ -495,11 +597,79 @@ impl SubstraitConsumer for DefaultSubstraitConsumer<'_> {
495597
async fn resolve_table_ref(
496598
&self,
497599
table_ref: &TableReference,
600+
hints: SubstraitHints,
498601
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
499602
let table = table_ref.table().to_string();
500603
let schema = self.state.schema_for_ref(table_ref.clone())?;
501-
let table_provider = schema.table(&table).await?;
502-
Ok(table_provider)
604+
let provider = schema.table(&table).await?;
605+
// If the Substrait plan provides hints and the provider is missing the
606+
// corresponding statistics fields, wrap it to expose those hints to
607+
// DataFusion (e.g. for downstream optimizer rules).
608+
// We check each field individually so that a provider returning
609+
// Some(Statistics { num_rows: Absent, ... }) also gets hints injected,
610+
// and any already-known values from the provider are preserved.
611+
let has_hints = hints.row_count.is_some() || hints.record_size.is_some();
612+
let provider = match provider {
613+
Some(provider) if has_hints => {
614+
let existing = provider.statistics();
615+
let row_count_absent = existing
616+
.as_ref()
617+
.map_or(true, |s| matches!(s.num_rows, Precision::Absent));
618+
let byte_size_absent = existing
619+
.as_ref()
620+
.map_or(true, |s| matches!(s.total_byte_size, Precision::Absent));
621+
let inject_row_count = hints.row_count.is_some() && row_count_absent;
622+
// total_byte_size = row_count * record_size, so both hints must be
623+
// present to reconstruct it. A record_size-only hint (row_count absent)
624+
// is therefore silently ignored here, since we cannot compute a
625+
// meaningful total_byte_size from record_size alone.
626+
let inject_byte_size = hints.row_count.is_some()
627+
&& hints.record_size.is_some()
628+
&& byte_size_absent;
629+
if inject_row_count || inject_byte_size {
630+
let num_rows = if inject_row_count {
631+
Precision::Inexact(hints.row_count.unwrap().round() as usize)
632+
} else {
633+
existing
634+
.as_ref()
635+
.map_or(Precision::Absent, |s| s.num_rows.clone())
636+
};
637+
let total_byte_size = if inject_byte_size {
638+
// Use the effective row count that was resolved above:
639+
// the provider's own value when present (keeping
640+
// num_rows and total_byte_size internally consistent),
641+
// or the hint's row_count when the provider had none.
642+
let effective_rows = match &num_rows {
643+
Precision::Exact(n) | Precision::Inexact(n) => *n as f64,
644+
Precision::Absent => hints.row_count.unwrap(),
645+
};
646+
Precision::Inexact(
647+
(effective_rows * hints.record_size.unwrap()).round() as usize,
648+
)
649+
} else {
650+
existing
651+
.as_ref()
652+
.map_or(Precision::Absent, |s| s.total_byte_size.clone())
653+
};
654+
let column_statistics = existing
655+
.map(|s| s.column_statistics)
656+
.unwrap_or_else(|| Statistics::unknown_column(&provider.schema()));
657+
let statistics = Statistics {
658+
num_rows,
659+
total_byte_size,
660+
column_statistics,
661+
};
662+
Some(Arc::new(StatisticsOverrideTableProvider {
663+
inner: provider,
664+
statistics,
665+
}) as Arc<dyn TableProvider>)
666+
} else {
667+
Some(provider)
668+
}
669+
}
670+
provider => provider,
671+
};
672+
Ok(provider)
503673
}
504674

505675
fn get_extensions(&self) -> &Extensions {

0 commit comments

Comments
 (0)