perf: default multi COUNT(DISTINCT) logical optimizer rewrite#21088
perf: default multi COUNT(DISTINCT) logical optimizer rewrite#21088ydgandhi wants to merge 9 commits intoapache:mainfrom
Conversation
|
Hi @Dandandan - thanks for this work; the cross-join split for multiple distinct aggregates with no I’ve been working on a related but different pattern: Here is a concrete example from our internal benchmark suite on an SELECT
seller_name,
COUNT(*) as total_orders,
COUNT(DISTINCT delivery_city) as cities_served,
COUNT(DISTINCT state) as states_served,
SUM(CASE WHEN order_status = 'Completed' THEN 1 ELSE 0 END) as completed_orders,
SUM(CASE WHEN order_status = 'Cancelled' THEN 1 ELSE 0 END) as cancelled_orders,
ROUND(100.0 * SUM(CASE WHEN order_status = 'Completed' THEN 1 ELSE 0 END) / COUNT(*), 2) as success_rate
FROM ecommerce_orders
GROUP BY seller_name
HAVING COUNT(*) > 100
ORDER BY total_orders DESC
LIMIT 100;This is not “global” multi-distinct: it’s per In my opinion, they’re complementary: different predicates, different plans, and they can coexist in the optimizer pipeline (we’d want to sanity-check rule order so we don’t double-rewrite the same node). Happy to align naming, tests, and placement with you and the maintainers. |
|
I personally think this PR is a more generalized rewrite approach. #20940 might cover a limited range of scenarios, but it is more rigorous. Could the current PR add some test cases? |
|
Thanks for the review. I asked cursor to add a few tests Tests for
|
| Test | What it asserts |
|---|---|
rewrites_two_count_distinct |
GROUP BY a + COUNT(DISTINCT b), COUNT(DISTINCT c) → inner joins, per-branch null filters on b/c, mdc_base + two mdc_d aliases. |
rewrites_global_three_count_distinct |
No GROUP BY, three COUNT(DISTINCT …) → cross/inner join rewrite; no mdc_base (global-only path). |
rewrites_two_count_distinct_with_non_distinct_count |
Grouped BI-style: two distincts + COUNT(a) → join rewrite with mdc_base holding the non-distinct agg. |
does_not_rewrite_two_count_distinct_same_column |
Two COUNT(DISTINCT b) with different aliases → no rewrite (duplicate distinct key). |
does_not_rewrite_single_count_distinct |
Only one COUNT(DISTINCT …) → no rewrite (rule needs ≥2 distincts). |
rewrites_three_count_distinct_grouped |
Three grouped COUNT(DISTINCT …) on b, c, a → two inner joins + mdc_base. |
rewrites_interleaved_non_distinct_between_distincts |
Order COUNT(DISTINCT b), COUNT(a), COUNT(DISTINCT c) → rewrite + mdc_base for the middle non-distinct agg (projection order / interleaving). |
rewrites_count_distinct_on_cast_exprs |
COUNT(DISTINCT CAST(b AS Int64)), same for c → rewrite + null filters on the cast expressions. |
does_not_rewrite_grouping_sets_multi_distinct |
GROUPING SETS aggregate with two COUNT(DISTINCT …) → no rewrite (rule bails on grouping sets). |
does_not_rewrite_mixed_agg |
COUNT(DISTINCT b) + COUNT(c) → no rewrite (only one COUNT(DISTINCT …); rule requires at least two). |
SQL integration — datafusion/core/tests/sql/aggregates/multi_distinct_count_rewrite.rs
| Test | What it asserts |
|---|---|
multi_count_distinct_matches_expected_with_nulls |
End-to-end grouped two COUNT(DISTINCT …) with NULLs in distinct columns; exact sorted batch string vs expected counts. |
multi_count_distinct_with_count_star_matches_expected |
COUNT(*) plus two COUNT(DISTINCT …) per group (BI-style); exact result table. |
multi_count_distinct_two_group_keys_matches_expected |
GROUP BY g1, g2 + two distincts; verifies joins line up on all group keys and numerics match. |
a7348f6 to
4321c50
Compare
|
I think a rewrite like this might be useful, but I think it can also hurt performance because of the join on grouping keys. I am also wondering if mostly for memory usage a |
@Dandandan Thank you for the explanation. It’s true that this would add a hash join, but if aggregation can be performed in parallel, there might be advantages in scenarios with two or more COUNT(DISTINCT) operations. I agree to run performance tests across multiple scenarios to evaluate the actual results. |
34467ff to
274eeb6
Compare
|
Thanks @Dandandan @xiedeyantu for the discussion. Join cost / default behavior: Agreed that joining on grouping keys isn’t free. We’ve added Benchmarks: We’re aligned that we should measure latency and memory vs baseline across scenarios (e.g. multiple GroupsAccumulator / execution-layer improvements: That work is complementary: better distinct accumulators help how each aggregate runs; this rule changes logical plan shape when several large distincts share one aggregate. Both can coexist on the roadmap. Tests: We added optimizer + SQL integration coverage (including cases with
|
Add MultiDistinctCountRewrite in datafusion-optimizer and register it in Optimizer::new() after SingleDistinctToGroupBy. Rewrites 2+ simple COUNT(DISTINCT) on different args into a join of two-phase aggregates; filter distinct_arg IS NOT NULL on each branch for correct NULL semantics. ✅ Unit tests in datafusion-optimizer; ✅ SQL integration test (NULLs) in core_integration.
…gate ✅ Omit base Aggregate when GROUP BY is empty and only COUNT(DISTINCT) branches exist (matches clickbench extended global queries). ✅ First distinct branch seeds the plan; subsequent branches join (empty keys → Cross Join in plan). ✅ Add rewrites_global_three_count_distinct unit test. ❌ Previous shape could error: Aggregate with no grouping and no aggregate expressions.
… tests ✅ Fix projection after join so output columns match the original aggregate list when COUNT(DISTINCT …) and non-distinct aggs are interleaved (schema-compatible with mixed BI-style queries). ✅ Add internal_err guard for inconsistent aggregate index mapping. ✅ Optimizer tests: three grouped COUNT(DISTINCT), non-distinct between distincts, CAST(distinct) args, no rewrite for GROUPING SETS. ✅ SQL integration: COUNT(*) + two COUNT(DISTINCT); two GROUP BY keys with expected results. ❌ Grouping-set / filtered-distinct cases remain explicitly out of scope for this rule (covered by unchanged-plan tests where applicable). Made-with: Cursor
… rewrite ✅ End-to-end: COUNT(DISTINCT lower(b)) with 'Abc'/'aBC' plus second distinct on c (case collapse = 1). ✅ End-to-end: COUNT(DISTINCT CAST(x AS INT)) with 1.2/1.3 vs second CAST distinct on y (int collision = 1). ✅ docs: REPLY_PR_20940_DANDANDAN.md — integration table rows + note on safe distinct args (lower/upper/CAST). ❌ No optimizer or engine behavior change; asserts semantics match non-rewritten aggregation. Made-with: Cursor
…t off) ✅ Add datafusion.optimizer.enable_multi_distinct_count_rewrite (default false). ✅ MultiDistinctCountRewrite no-ops when disabled; OptimizerContext::with_enable_multi_distinct_count_rewrite for tests. ✅ SQL integration tests enable the flag via session helper; unit test skips_rewrite_when_config_disabled. ✅ Document option in user-guide configs.md. ❌ Does not change rewrite semantics when enabled. Made-with: Cursor
a25e831 to
c64db4b
Compare
|
Yes, I can see it would improve a lot of queries
Sounds good - agreed. |
|
Following up with additional real-data benchmark results for Dataset generation / setupI used local TPCH SF1 parquet generated with: cargo install tpchgen-cli
tpchgen-cli --scale-factor 1 --format parquet --parquet-compression 'ZSTD(1)' --parts 1 --output-dir benchmarks/data/tpch_sf1Data path used in all queries: For timing/memory, each run used: /usr/bin/time -l datafusion-cli -q --format csv -c "SET datafusion.optimizer.enable_multi_distinct_count_rewrite = <true|false>;" -c "<QUERY>"(Reporting wall time + max RSS from Queries testedScenario 1: 3 distincts (grouped)SELECT SUM(d1) AS s1, SUM(d2) AS s2, SUM(d3) AS s3
FROM (
SELECT l_suppkey,
COUNT(DISTINCT l_orderkey) AS d1,
COUNT(DISTINCT l_partkey) AS d2,
COUNT(DISTINCT l_linenumber) AS d3
FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
GROUP BY l_suppkey
);Scenario 2: CAST-based distinctsSELECT SUM(cx) AS sx, SUM(cy) AS sy
FROM (
SELECT l_suppkey,
COUNT(DISTINCT CAST(l_extendedprice AS INT)) AS cx,
COUNT(DISTINCT CAST(l_discount * 100 AS INT)) AS cy
FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
GROUP BY l_suppkey
);Scenario 3: lower() + string distinctSELECT SUM(c1) AS s1, SUM(c2) AS s2
FROM (
SELECT l_returnflag,
COUNT(DISTINCT lower(l_shipmode)) AS c1,
COUNT(DISTINCT l_shipinstruct) AS c2
FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
GROUP BY l_returnflag
);Results (latency + max RSS)
All paired runs returned identical query results. TakeawayThe rewrite can improve latency in some multi-distinct grouped cases, but can also increase peak memory and/or be neutral/slightly slower depending on cardinality and expression shape. Keeping this rule opt-in (default off) remains appropriate while we expand benchmark coverage.
|
|
Thank you for the detailed test! But I have a few questions.
|
|
Hi! @xiedeyantu, great feedback. Let me explain my thought process for some of the design choices regarding the tests. See inline
The outer
Agreed for a clean test: both distincts should use
The We'll follow up with updated query text and numbers when we rerun. |
|
Hi @xiedeyantu update on the benchmarking. The new benchmarks use the grouped aggregate only (no outer Queries tested - original runsScenario 1: three distincts (grouped)SELECT l_suppkey,
COUNT(DISTINCT l_orderkey) AS d1,
COUNT(DISTINCT l_partkey) AS d2,
COUNT(DISTINCT l_linenumber) AS d3
FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
GROUP BY l_suppkey;Rerun: reviewer-aligned changesCAST (Scenario 2): both distincts use SELECT l_suppkey,
COUNT(DISTINCT CAST(l_extendedprice AS INT)) AS cx,
COUNT(DISTINCT CAST(l_discount AS INT)) AS cy
FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
GROUP BY l_suppkey;High cardinality (Scenario 3): same aggregates as before, but SELECT l_suppkey,
COUNT(DISTINCT lower(l_shipmode)) AS c1,
COUNT(DISTINCT l_shipinstruct) AS c2
FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
GROUP BY l_suppkey;Results (mean ± sample stddev, n=10 per cell)Measured with wall (s) mean ± stddev
max RSS (MB) mean ± stddev
Row outputs matched OFF vs ON for every scenario (same grouped aggregate result). Wall time from |
Add MultiDistinctCountRewrite in datafusion-optimizer and register it in Optimizer::new() after SingleDistinctToGroupBy. Rewrites 2+ simple COUNT(DISTINCT) on different args into a join of two-phase aggregates; filter distinct_arg IS NOT NULL on each branch for correct NULL semantics.
✅ Unit tests in datafusion-optimizer; ✅ SQL integration test (NULLs) in core_integration.
Which issue does this PR close?
Rationale for this change
Queries with multiple COUNT(DISTINCT col_i) in the same GROUP BY can force independent distinct state per aggregate (e.g. separate hash sets), which scales poorly in memory when several high-cardinality distinct columns appear together.
DataFusion already optimizes the single shared distinct field case via SingleDistinctToGroupBy. This PR adds a conservative logical rewrite for multiple distinct COUNT(DISTINCT …) arguments by splitting work into per-distinct branches joined on the group keys, which reduces peak memory for eligible plans.
COUNT(DISTINCT x) must ignore NULL x; the rewrite applies x IS NOT NULL on each distinct branch before inner grouping so semantics stay aligned with count_distinct behavior.
What changes are included in this PR?
New module:
Tests:
Are these changes tested?
Yes.
Are there any user-facing changes?
Behavior / plans: For queries that match the rule, logical plans (and thus EXPLAIN output) can differ: eligible multi–COUNT(DISTINCT) aggregates may appear as joins of sub-aggregates instead of a single Aggregate with multiple distinct counts.
Results: Intended to be semantics-preserving for supported patterns (including NULL handling via filters).
Public API: No intentional breaking changes to public Rust APIs; this is an internal optimizer rule enabled by default.
Docs: No user guide update required unless maintainers want an “optimizer behavior” note; can add if requested.