Skip to content

Commit b78496f

Browse files
committed
Avoid allocating column names
1 parent 532b74e commit b78496f

File tree

3 files changed

+71
-50
lines changed

3 files changed

+71
-50
lines changed

datafusion/optimizer/src/extract_leaf_expressions.rs

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use datafusion_expr::{Expr, ExpressionPlacement, Projection};
3232

3333
use crate::optimizer::ApplyOrder;
3434
use crate::push_down_filter::replace_cols_by_name;
35-
use crate::utils::has_all_column_refs;
35+
use crate::utils::{ColumnReference, has_all_column_refs, schema_columns};
3636
use crate::{OptimizerConfig, OptimizerRule};
3737

3838
/// Prefix for aliases generated by the extraction optimizer passes.
@@ -213,10 +213,11 @@ fn extract_from_plan(
213213
.collect();
214214

215215
// Build per-input column sets for routing expressions to the correct input
216-
let input_column_sets: Vec<std::collections::HashSet<Column>> = input_schemas
217-
.iter()
218-
.map(|schema| schema_columns(schema.as_ref()))
219-
.collect();
216+
let input_column_sets: Vec<std::collections::HashSet<ColumnReference>> =
217+
input_schemas
218+
.iter()
219+
.map(|schema| schema_columns(schema.as_ref()))
220+
.collect();
220221

221222
// Transform expressions via map_expressions with routing
222223
let transformed = plan.map_expressions(|expr| {
@@ -272,7 +273,7 @@ fn extract_from_plan(
272273
/// in both sides of a join).
273274
fn find_owning_input(
274275
expr: &Expr,
275-
input_column_sets: &[std::collections::HashSet<Column>],
276+
input_column_sets: &[std::collections::HashSet<ColumnReference>],
276277
) -> Option<usize> {
277278
let mut found = None;
278279
for (idx, cols) in input_column_sets.iter().enumerate() {
@@ -292,7 +293,7 @@ fn find_owning_input(
292293
fn routing_extract(
293294
expr: Expr,
294295
extractors: &mut [LeafExpressionExtractor],
295-
input_column_sets: &[std::collections::HashSet<Column>],
296+
input_column_sets: &[std::collections::HashSet<ColumnReference>],
296297
) -> Result<Transformed<Expr>> {
297298
expr.transform_down(|e| {
298299
// Skip expressions already aliased with extracted expression pattern
@@ -340,19 +341,6 @@ fn routing_extract(
340341
})
341342
}
342343

343-
/// Returns all columns in the schema (both qualified and unqualified forms)
344-
fn schema_columns(schema: &DFSchema) -> std::collections::HashSet<Column> {
345-
schema
346-
.iter()
347-
.flat_map(|(qualifier, field)| {
348-
[
349-
Column::new(qualifier.cloned(), field.name()),
350-
Column::new_unqualified(field.name()),
351-
]
352-
})
353-
.collect()
354-
}
355-
356344
/// Rewrites extraction pairs and column references from one qualifier
357345
/// space to another.
358346
///
@@ -1072,7 +1060,7 @@ fn route_to_inputs(
10721060
pairs: &[(Expr, String)],
10731061
columns: &IndexSet<Column>,
10741062
node: &LogicalPlan,
1075-
input_column_sets: &[std::collections::HashSet<Column>],
1063+
input_column_sets: &[std::collections::HashSet<ColumnReference>],
10761064
input_schemas: &[Arc<DFSchema>],
10771065
) -> Result<Option<Vec<ExtractionTarget>>> {
10781066
let num_inputs = input_schemas.len();
@@ -1173,7 +1161,7 @@ fn try_push_into_inputs(
11731161
// Build per-input schemas and column sets for routing
11741162
let input_schemas: Vec<Arc<DFSchema>> =
11751163
inputs.iter().map(|i| Arc::clone(i.schema())).collect();
1176-
let input_column_sets: Vec<std::collections::HashSet<Column>> =
1164+
let input_column_sets: Vec<std::collections::HashSet<ColumnReference>> =
11771165
input_schemas.iter().map(|s| schema_columns(s)).collect();
11781166

11791167
// Route pairs and columns to the appropriate inputs
@@ -2436,16 +2424,18 @@ mod tests {
24362424
// Simulate schema_columns output for two sides of a join where both
24372425
// have a "user" column — each set contains the qualified and
24382426
// unqualified form.
2439-
let left_cols: HashSet<Column> = [
2440-
Column::new(Some("test"), "user"),
2441-
Column::new_unqualified("user"),
2427+
let relation = "test".into();
2428+
let left_cols: HashSet<ColumnReference> = [
2429+
ColumnReference::new(Some(&relation), "user"),
2430+
ColumnReference::new_unqualified("user"),
24422431
]
24432432
.into_iter()
24442433
.collect();
24452434

2446-
let right_cols: HashSet<Column> = [
2447-
Column::new(Some("right"), "user"),
2448-
Column::new_unqualified("user"),
2435+
let relation = "right".into();
2436+
let right_cols: HashSet<ColumnReference> = [
2437+
ColumnReference::new(Some(&relation), "user"),
2438+
ColumnReference::new_unqualified("user"),
24492439
]
24502440
.into_iter()
24512441
.collect();

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ use datafusion_expr::{
4545

4646
use crate::optimizer::ApplyOrder;
4747
use crate::simplify_expressions::simplify_predicates;
48-
use crate::utils::{has_all_column_refs, is_restrict_null_predicate};
48+
use crate::utils::{
49+
ColumnReference, has_all_column_refs, is_restrict_null_predicate, schema_columns,
50+
};
4951
use crate::{OptimizerConfig, OptimizerRule};
5052
use datafusion_expr::ExpressionPlacement;
5153

@@ -190,11 +192,11 @@ struct ColumnChecker<'a> {
190192
/// schema of left join input
191193
left_schema: &'a DFSchema,
192194
/// columns in left_schema, computed on demand
193-
left_columns: Option<HashSet<Column>>,
195+
left_columns: Option<HashSet<ColumnReference<'a>>>,
194196
/// schema of right join input
195197
right_schema: &'a DFSchema,
196198
/// columns in right_schema, computed on demand
197-
right_columns: Option<HashSet<Column>>,
199+
right_columns: Option<HashSet<ColumnReference<'a>>>,
198200
}
199201

200202
impl<'a> ColumnChecker<'a> {
@@ -224,20 +226,6 @@ impl<'a> ColumnChecker<'a> {
224226
}
225227
}
226228

227-
/// Returns all columns in the schema
228-
fn schema_columns(schema: &DFSchema) -> HashSet<Column> {
229-
schema
230-
.iter()
231-
.flat_map(|(qualifier, field)| {
232-
[
233-
Column::new(qualifier.cloned(), field.name()),
234-
// we need to push down filter using unqualified column as well
235-
Column::new_unqualified(field.name()),
236-
]
237-
})
238-
.collect::<HashSet<_>>()
239-
}
240-
241229
/// Determine whether the predicate can evaluate as the join conditions
242230
fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
243231
let mut is_evaluate = true;
@@ -320,7 +308,7 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
320308
/// * do nothing.
321309
fn extract_or_clauses_for_join<'a>(
322310
filters: &'a [Expr],
323-
schema_cols: &'a HashSet<Column>,
311+
schema_cols: &'a HashSet<ColumnReference>,
324312
) -> impl Iterator<Item = Expr> + 'a {
325313
// new formed OR clauses and their column references
326314
filters.iter().filter_map(move |expr| {
@@ -353,7 +341,10 @@ fn extract_or_clauses_for_join<'a>(
353341
/// Otherwise, return None.
354342
///
355343
/// For other clause, apply the rule above to extract clause.
356-
fn extract_or_clause(expr: &Expr, schema_columns: &HashSet<Column>) -> Option<Expr> {
344+
fn extract_or_clause(
345+
expr: &Expr,
346+
schema_columns: &HashSet<ColumnReference>,
347+
) -> Option<Expr> {
357348
let mut predicate = None;
358349

359350
match expr {

datafusion/optimizer/src/utils.rs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};
2222
use crate::analyzer::type_coercion::TypeCoercionRewriter;
2323
use arrow::array::{Array, RecordBatch, new_null_array};
2424
use arrow::datatypes::{DataType, Field, Schema};
25+
use datafusion_common::TableReference;
2526
use datafusion_common::cast::as_boolean_array;
2627
use datafusion_common::tree_node::{TransformedResult, TreeNode};
2728
use datafusion_common::{Column, DFSchema, Result, ScalarValue};
@@ -37,12 +38,17 @@ use std::sync::Arc;
3738
pub use datafusion_expr::expr_rewriter::NamePreserver;
3839

3940
/// Returns true if `expr` contains all columns in `schema_cols`
40-
pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: &HashSet<Column>) -> bool {
41+
pub(crate) fn has_all_column_refs(
42+
expr: &Expr,
43+
schema_cols: &HashSet<ColumnReference<'_>>,
44+
) -> bool {
4145
let column_refs = expr.column_refs();
4246
// note: can't use HashSet::intersection because of differing element types (owned vs. references)
43-
schema_cols
47+
column_refs
4448
.iter()
45-
.filter(|c| column_refs.contains(c))
49+
.filter(|c| {
50+
schema_cols.contains(&ColumnReference::new(c.relation.as_ref(), c.name()))
51+
})
4652
.count()
4753
== column_refs.len()
4854
}
@@ -62,6 +68,40 @@ pub(crate) fn replace_qualified_name(
6268
replace_col(expr, &replace_map)
6369
}
6470

71+
/// Column reference used to avoid copying strings around
72+
#[derive(PartialEq, Eq, Hash, Debug)]
73+
pub(crate) struct ColumnReference<'a> {
74+
pub relation: Option<&'a TableReference>,
75+
pub name: &'a str,
76+
}
77+
78+
impl<'a> ColumnReference<'a> {
79+
pub fn new(relation: Option<&'a TableReference>, name: &'a str) -> Self {
80+
Self { relation, name }
81+
}
82+
83+
pub fn new_unqualified(name: &'a str) -> Self {
84+
Self {
85+
relation: None,
86+
name,
87+
}
88+
}
89+
}
90+
91+
/// Returns references to all columns in the schema
92+
pub(crate) fn schema_columns(schema: &DFSchema) -> HashSet<ColumnReference<'_>> {
93+
schema
94+
.iter()
95+
.flat_map(|(qualifier, field)| {
96+
[
97+
ColumnReference::new(qualifier, field.name()),
98+
// we need to push down filter using unqualified column as well
99+
ColumnReference::new_unqualified(field.name()),
100+
]
101+
})
102+
.collect::<HashSet<_>>()
103+
}
104+
65105
/// Log the plan in debug/tracing mode after some part of the optimizer runs
66106
pub fn log_plan(description: &str, plan: &LogicalPlan) {
67107
debug!("{description}:\n{}\n", plan.display_indent());

0 commit comments

Comments
 (0)