perf: default multi COUNT(DISTINCT) logical optimizer rewrite #21088

Open
ydgandhi wants to merge 9 commits into apache:main from ydgandhi:feat/multi-distinct-count-rewrite

@ydgandhi

Add MultiDistinctCountRewrite in datafusion-optimizer and register it in Optimizer::new() after SingleDistinctToGroupBy. Rewrites 2+ simple COUNT(DISTINCT) on different args into a join of two-phase aggregates; filter distinct_arg IS NOT NULL on each branch for correct NULL semantics.

✅ Unit tests in datafusion-optimizer; ✅ SQL integration test (NULLs) in core_integration.

Which issue does this PR close?

Rationale for this change

Queries with multiple COUNT(DISTINCT col_i) in the same GROUP BY can force independent distinct state per aggregate (e.g. separate hash sets), which scales poorly in memory when several high-cardinality distinct columns appear together.

DataFusion already optimizes the single shared distinct field case via SingleDistinctToGroupBy. This PR adds a conservative logical rewrite for multiple distinct COUNT(DISTINCT …) arguments by splitting work into per-distinct branches joined on the group keys, which reduces peak memory for eligible plans.

COUNT(DISTINCT x) must ignore NULL x; the rewrite applies x IS NOT NULL on each distinct branch before inner grouping so semantics stay aligned with count_distinct behavior.
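The branch-and-join shape described above can be sketched in plain SQL. The snippet below is only an illustration under stated assumptions: it runs on SQLite through Python's standard library rather than on DataFusion, the table `t(g, b, c)` and its rows are hypothetical, and the sample data keeps at least one non-NULL value per group in each distinct column so the inner join does not drop a group. How the rule itself treats a group whose distinct column is entirely NULL is not covered here.

```python
import sqlite3

# Hypothetical table t(g, b, c); not taken from the PR's test data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (g TEXT, b INTEGER, c INTEGER)")
conn.executemany(
    "INSERT INTO t VALUES (?, ?, ?)",
    [
        ("x", 1, 10),
        ("x", 1, None),
        ("x", 2, 10),
        ("x", None, 20),
        ("y", 5, None),
        ("y", None, 30),
        ("y", 5, 30),
    ],
)

# Original shape: one aggregate carrying two COUNT(DISTINCT ...) expressions.
direct_sql = """
SELECT g, COUNT(DISTINCT b) AS db, COUNT(DISTINCT c) AS dc
FROM t GROUP BY g ORDER BY g
"""

# Rewritten shape: one branch per distinct column. Each branch filters NULLs,
# deduplicates (g, column) with an inner GROUP BY, counts the surviving rows
# per g, and the branches are then joined on the group key.
rewritten_sql = """
SELECT lb.g, lb.db, rc.dc
FROM (
    SELECT g, COUNT(*) AS db
    FROM (SELECT g, b FROM t WHERE b IS NOT NULL GROUP BY g, b) AS d
    GROUP BY g
) AS lb
JOIN (
    SELECT g, COUNT(*) AS dc
    FROM (SELECT g, c FROM t WHERE c IS NOT NULL GROUP BY g, c) AS d
    GROUP BY g
) AS rc ON lb.g = rc.g
ORDER BY lb.g
"""

direct = conn.execute(direct_sql).fetchall()
rewritten = conn.execute(rewritten_sql).fetchall()
print(direct)
print(rewritten)
```

Without the `IS NOT NULL` filters, each branch would count a NULL as one more distinct value per group, which is the mismatch the filter is meant to prevent.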

What changes are included in this PR?

New module:

  • datafusion/optimizer/src/multi_distinct_count_rewrite.rs — MultiDistinctCountRewrite (OptimizerRule, bottom-up).
  • Register rule in datafusion/optimizer/src/optimizer.rs immediately after SingleDistinctToGroupBy.
  • Export module from datafusion/optimizer/src/lib.rs.

Tests:

  • datafusion-optimizer: rewrite vs no-op cases (single distinct, mixed aggs, two distincts).
  • datafusion core_integration: datafusion/core/tests/sql/aggregates/multi_distinct_count_rewrite.rs — SQL over MemTable with NULLs.

Are these changes tested?

Yes.

  • cargo test -p datafusion-optimizer (includes new unit tests).
  • cargo test -p datafusion --test core_integration multi_count_distinct (SQL + NULLs).

Are there any user-facing changes?

  • Behavior / plans: For queries that match the rule, logical plans (and thus EXPLAIN output) can differ: eligible multi–COUNT(DISTINCT) aggregates may appear as joins of sub-aggregates instead of a single Aggregate with multiple distinct counts.

  • Results: Intended to be semantics-preserving for supported patterns (including NULL handling via filters).

  • Public API: No intentional breaking changes to public Rust APIs; this is an internal optimizer rule enabled by default.

  • Docs: No user guide update required unless maintainers want an “optimizer behavior” note; can add if requested.

@github-actions bot added the optimizer and core labels Mar 21, 2026
@Dandandan
Contributor

Hi @ydgandhi !

This does seem the same to me as #20940 ?

Can you maybe help review that one?

@ydgandhi
Author

ydgandhi commented Mar 21, 2026

Hi @Dandandan - thanks for this work; the cross-join split for multiple distinct aggregates with no GROUP BY is a strong fit for workloads like ClickBench extended.

I’ve been working on a related but different pattern: GROUP BY + several COUNT(DISTINCT …) in the same aggregate (typical BI). In that situation, your rule is not quite applicable, because MultiDistinctToCrossJoin needs an empty GROUP BY and all aggregates to be distinct on different columns.

Here is a concrete example from our internal benchmark suite on an ecommerce_orders table:

SELECT
    seller_name,
    COUNT(*) as total_orders,
    COUNT(DISTINCT delivery_city) as cities_served,
    COUNT(DISTINCT state) as states_served,
    SUM(CASE WHEN order_status = 'Completed' THEN 1 ELSE 0 END) as completed_orders,
    SUM(CASE WHEN order_status = 'Cancelled' THEN 1 ELSE 0 END) as cancelled_orders,
    ROUND(100.0 * SUM(CASE WHEN order_status = 'Completed' THEN 1 ELSE 0 END) / COUNT(*), 2) as success_rate
FROM ecommerce_orders
GROUP BY seller_name
HAVING COUNT(*) > 100
ORDER BY total_orders DESC
LIMIT 100;

This is not “global” multi-distinct: it’s per seller_name, with multiple COUNT(DISTINCT …) plus other aggregates. That’s the class my optimizer rule (MultiDistinctCountRewrite) targets: it rewrites the COUNT(DISTINCT …) pieces into joinable sub-aggregates aligned on the same GROUP BY keys, with correct NULL handling where needed. On this table (20M rows, M4 machine), the query takes 0.42s with this rule versus ~17s with the implementation in #20940.

In my opinion, they’re complementary: different predicates, different plans, and they can coexist in the optimizer pipeline (we’d want to sanity-check rule order so we don’t double-rewrite the same node).

Happy to align naming, tests, and placement with you and the maintainers.
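To make the grouped, mixed-aggregate case concrete, here is a rough sketch of the plan shape expressed as SQL: a base aggregate holding the non-distinct expressions, joined to one sub-aggregate per COUNT(DISTINCT …). Everything specific is an assumption for illustration: it runs on SQLite via Python, the `orders` table and rows are hypothetical stand-ins for ecommerce_orders, the alias `mdc_base` is borrowed from the test descriptions in this thread, and `d_city` / `d_state` are made-up branch names. Each seller has at least one non-NULL city and state, so the inner joins keep every group.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (seller TEXT, city TEXT, state TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?)",
    [
        ("acme", "Pune", "MH", "Completed"),
        ("acme", "Mumbai", "MH", "Cancelled"),
        ("acme", "Pune", "MH", "Completed"),
        ("acme", None, "KA", "Completed"),
        ("bolt", "Delhi", "DL", "Completed"),
        ("bolt", "Delhi", None, "Cancelled"),
    ],
)

# Single aggregate mixing COUNT(*), two COUNT(DISTINCT ...), and a SUM.
direct_sql = """
SELECT seller,
       COUNT(*) AS total_orders,
       COUNT(DISTINCT city) AS cities_served,
       COUNT(DISTINCT state) AS states_served,
       SUM(CASE WHEN status = 'Completed' THEN 1 ELSE 0 END) AS completed_orders
FROM orders GROUP BY seller ORDER BY seller
"""

# Base aggregate keeps the non-distinct expressions; each distinct gets its
# own NULL-filtered, deduplicated branch. The outer projection restores the
# original column order, including the interleaved non-distinct columns.
rewritten_sql = """
SELECT mdc_base.seller, mdc_base.total_orders, d_city.cities_served,
       d_state.states_served, mdc_base.completed_orders
FROM (
    SELECT seller, COUNT(*) AS total_orders,
           SUM(CASE WHEN status = 'Completed' THEN 1 ELSE 0 END) AS completed_orders
    FROM orders GROUP BY seller
) AS mdc_base
JOIN (
    SELECT seller, COUNT(*) AS cities_served
    FROM (SELECT seller, city FROM orders
          WHERE city IS NOT NULL GROUP BY seller, city) AS d
    GROUP BY seller
) AS d_city ON mdc_base.seller = d_city.seller
JOIN (
    SELECT seller, COUNT(*) AS states_served
    FROM (SELECT seller, state FROM orders
          WHERE state IS NOT NULL GROUP BY seller, state) AS d
    GROUP BY seller
) AS d_state ON mdc_base.seller = d_state.seller
ORDER BY mdc_base.seller
"""

direct = conn.execute(direct_sql).fetchall()
rewritten = conn.execute(rewritten_sql).fetchall()
print(direct)
print(rewritten)
```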

@xiedeyantu
Member

I personally think this PR is a more generalized rewrite approach. #20940 might cover a limited range of scenarios, but it is more rigorous. Could the current PR add some test cases?

@ydgandhi
Author

Thanks for the review. I asked Cursor to add a few tests:


Tests for MultiDistinctCountRewrite (what they cover)

Optimizer unit tests — datafusion/optimizer/src/multi_distinct_count_rewrite.rs

| Test | What it asserts |
|---|---|
| `rewrites_two_count_distinct` | GROUP BY a + COUNT(DISTINCT b), COUNT(DISTINCT c) → inner joins, per-branch null filters on b/c, mdc_base + two mdc_d aliases. |
| `rewrites_global_three_count_distinct` | No GROUP BY, three COUNT(DISTINCT …) → cross/inner join rewrite; no mdc_base (global-only path). |
| `rewrites_two_count_distinct_with_non_distinct_count` | Grouped BI-style: two distincts + COUNT(a) → join rewrite with mdc_base holding the non-distinct agg. |
| `does_not_rewrite_two_count_distinct_same_column` | Two COUNT(DISTINCT b) with different aliases → no rewrite (duplicate distinct key). |
| `does_not_rewrite_single_count_distinct` | Only one COUNT(DISTINCT …) → no rewrite (rule needs ≥2 distincts). |
| `rewrites_three_count_distinct_grouped` | Three grouped COUNT(DISTINCT …) on b, c, a → two inner joins + mdc_base. |
| `rewrites_interleaved_non_distinct_between_distincts` | Order COUNT(DISTINCT b), COUNT(a), COUNT(DISTINCT c) → rewrite + mdc_base for the middle non-distinct agg (projection order / interleaving). |
| `rewrites_count_distinct_on_cast_exprs` | COUNT(DISTINCT CAST(b AS Int64)), same for c → rewrite + null filters on the cast expressions. |
| `does_not_rewrite_grouping_sets_multi_distinct` | GROUPING SETS aggregate with two COUNT(DISTINCT …) → no rewrite (rule bails on grouping sets). |
| `does_not_rewrite_mixed_agg` | COUNT(DISTINCT b) + COUNT(c) → no rewrite (only one COUNT(DISTINCT …); rule requires at least two). |

SQL integration — datafusion/core/tests/sql/aggregates/multi_distinct_count_rewrite.rs

| Test | What it asserts |
|---|---|
| `multi_count_distinct_matches_expected_with_nulls` | End-to-end grouped two COUNT(DISTINCT …) with NULLs in distinct columns; exact sorted batch string vs expected counts. |
| `multi_count_distinct_with_count_star_matches_expected` | COUNT(*) plus two COUNT(DISTINCT …) per group (BI-style); exact result table. |
| `multi_count_distinct_two_group_keys_matches_expected` | GROUP BY g1, g2 + two distincts; verifies joins line up on all group keys and numerics match. |

@github-actions bot added the documentation label Mar 21, 2026
@ydgandhi force-pushed the feat/multi-distinct-count-rewrite branch from a7348f6 to 4321c50 March 21, 2026 14:54
@github-actions bot removed the documentation label Mar 21, 2026
@Dandandan
Contributor

I think a rewrite like this might be useful, but I think it can also hurt performance because of the join on grouping keys.
So I think it needs to have a config value (off by default) or when enabled some benchmarks showing that it is better in large majority of the cases.

I am also wondering if mostly for memory usage a GroupsAccumulator for distinct count / sum might give similar/more improvements.

@xiedeyantu
Member


@Dandandan Thank you for the explanation. It’s true that this would add a hash join, but if aggregation can be performed in parallel, there might be advantages in scenarios with two or more COUNT(DISTINCT) operations. I agree to run performance tests across multiple scenarios to evaluate the actual results.

@ydgandhi force-pushed the feat/multi-distinct-count-rewrite branch from 34467ff to 274eeb6 March 23, 2026 05:26
@github-actions bot added the documentation and common labels Mar 23, 2026
@ydgandhi
Author

Thanks @Dandandan @xiedeyantu for the discussion.

Join cost / default behavior: Agreed that joining on grouping keys isn’t free. We’ve added datafusion.optimizer.enable_multi_distinct_count_rewrite with default false, so the rewrite is opt-in until we have benchmarks that justify turning it on by default. Sessions can enable it explicitly when they want to try the plan shape.

Benchmarks: We’re aligned that we should measure latency and memory vs baseline across scenarios (e.g. multiple COUNT(DISTINCT …) with GROUP BY, varying group-key cardinality vs distinct cardinality). We’ll follow up with numbers on the PR.

GroupsAccumulator / execution-layer improvements: That work is complementary: better distinct accumulators help how each aggregate runs; this rule changes logical plan shape when several large distincts share one aggregate. Both can coexist on the roadmap.

Tests: We added optimizer + SQL integration coverage (including cases with lower/CAST inside COUNT(DISTINCT …) and a test that the rule is a no-op when the config is off). Happy to iterate on naming and placement with maintainers.
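For the lower/CAST cases, the point is that distinctness is decided on the expression result, so each branch has to filter and group on the expression rather than on the raw column. The sketch below illustrates that under assumptions: it uses SQLite through Python instead of DataFusion, the table `t(g, b, x)` is hypothetical, and it combines the lower() and CAST cases in one query even though the integration tests described in this thread pair them differently. The values 'Abc'/'aBC' and 1.2/1.3 echo the ones mentioned in the commit notes.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (g TEXT, b TEXT, x REAL)")
conn.executemany(
    "INSERT INTO t VALUES (?, ?, ?)",
    [("k", "Abc", 1.2), ("k", "aBC", 1.3), ("k", None, None)],
)

# Raw columns: two distinct strings and two distinct floats.
raw = conn.execute("SELECT COUNT(DISTINCT b), COUNT(DISTINCT x) FROM t").fetchone()

# Distinct on expressions: case folding and integer truncation collapse values.
direct_sql = """
SELECT g, COUNT(DISTINCT lower(b)), COUNT(DISTINCT CAST(x AS INTEGER))
FROM t GROUP BY g
"""

# Branches filter and deduplicate on the expression, not on the base column.
rewritten_sql = """
SELECT l.g, l.n_lower, r.n_cast
FROM (
    SELECT g, COUNT(*) AS n_lower
    FROM (SELECT g, lower(b) AS k FROM t
          WHERE lower(b) IS NOT NULL GROUP BY g, lower(b)) AS d
    GROUP BY g
) AS l
JOIN (
    SELECT g, COUNT(*) AS n_cast
    FROM (SELECT g, CAST(x AS INTEGER) AS k FROM t
          WHERE CAST(x AS INTEGER) IS NOT NULL GROUP BY g, CAST(x AS INTEGER)) AS d
    GROUP BY g
) AS r ON l.g = r.g
"""

direct = conn.execute(direct_sql).fetchall()
rewritten = conn.execute(rewritten_sql).fetchall()
print(raw, direct, rewritten)
```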


yash and others added 6 commits March 23, 2026 14:51
Add MultiDistinctCountRewrite in datafusion-optimizer and register it in Optimizer::new() after SingleDistinctToGroupBy. Rewrites 2+ simple COUNT(DISTINCT) on different args into a join of two-phase aggregates; filter distinct_arg IS NOT NULL on each branch for correct NULL semantics.

✅ Unit tests in datafusion-optimizer; ✅ SQL integration test (NULLs) in core_integration.
…gate

✅ Omit base Aggregate when GROUP BY is empty and only COUNT(DISTINCT) branches exist (matches clickbench extended global queries).
✅ First distinct branch seeds the plan; subsequent branches join (empty keys → Cross Join in plan).
✅ Add rewrites_global_three_count_distinct unit test.

❌ Previous shape could error: Aggregate with no grouping and no aggregate expressions.
… tests

✅ Fix projection after join so output columns match the original aggregate list when COUNT(DISTINCT …) and non-distinct aggs are interleaved (schema-compatible with mixed BI-style queries).

✅ Add internal_err guard for inconsistent aggregate index mapping.

✅ Optimizer tests: three grouped COUNT(DISTINCT), non-distinct between distincts, CAST(distinct) args, no rewrite for GROUPING SETS.

✅ SQL integration: COUNT(*) + two COUNT(DISTINCT); two GROUP BY keys with expected results.

❌ Grouping-set / filtered-distinct cases remain explicitly out of scope for this rule (covered by unchanged-plan tests where applicable).

Made-with: Cursor
… rewrite

✅ End-to-end: COUNT(DISTINCT lower(b)) with 'Abc'/'aBC' plus second distinct on c (case collapse = 1).

✅ End-to-end: COUNT(DISTINCT CAST(x AS INT)) with 1.2/1.3 vs second CAST distinct on y (int collision = 1).

✅ docs: REPLY_PR_20940_DANDANDAN.md — integration table rows + note on safe distinct args (lower/upper/CAST).

❌ No optimizer or engine behavior change; asserts semantics match non-rewritten aggregation.

Made-with: Cursor
…t off)

✅ Add datafusion.optimizer.enable_multi_distinct_count_rewrite (default false).

✅ MultiDistinctCountRewrite no-ops when disabled; OptimizerContext::with_enable_multi_distinct_count_rewrite for tests.

✅ SQL integration tests enable the flag via session helper; unit test skips_rewrite_when_config_disabled.

✅ Document option in user-guide configs.md.

❌ Does not change rewrite semantics when enabled.

Made-with: Cursor
@ydgandhi force-pushed the feat/multi-distinct-count-rewrite branch from a25e831 to c64db4b March 23, 2026 09:21
@Dandandan
Contributor

Yes, I can see it would improve a lot of queries


Sounds good - agreed.

@ydgandhi
Author

Following up with additional real-data benchmark results for MultiDistinctCountRewrite (config-gated by datafusion.optimizer.enable_multi_distinct_count_rewrite, default false).

Dataset generation / setup

I used local TPCH SF1 parquet generated with:

cargo install tpchgen-cli
tpchgen-cli --scale-factor 1 --format parquet --parquet-compression 'ZSTD(1)' --parts 1 --output-dir benchmarks/data/tpch_sf1

Data path used in all queries:
benchmarks/data/tpch_sf1/lineitem/*.parquet

For timing/memory, each run used:

/usr/bin/time -l datafusion-cli -q --format csv -c "SET datafusion.optimizer.enable_multi_distinct_count_rewrite = <true|false>;" -c "<QUERY>"

(Reporting wall time + max RSS from time -l.)

Queries tested

Scenario 1: 3 distincts (grouped)

SELECT SUM(d1) AS s1, SUM(d2) AS s2, SUM(d3) AS s3
FROM (
  SELECT l_suppkey,
         COUNT(DISTINCT l_orderkey) AS d1,
         COUNT(DISTINCT l_partkey) AS d2,
         COUNT(DISTINCT l_linenumber) AS d3
  FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
  GROUP BY l_suppkey
);

Scenario 2: CAST-based distincts

SELECT SUM(cx) AS sx, SUM(cy) AS sy
FROM (
  SELECT l_suppkey,
         COUNT(DISTINCT CAST(l_extendedprice AS INT)) AS cx,
         COUNT(DISTINCT CAST(l_discount * 100 AS INT)) AS cy
  FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
  GROUP BY l_suppkey
);

Scenario 3: lower() + string distinct

SELECT SUM(c1) AS s1, SUM(c2) AS s2
FROM (
  SELECT l_returnflag,
         COUNT(DISTINCT lower(l_shipmode)) AS c1,
         COUNT(DISTINCT l_shipinstruct) AS c2
  FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
  GROUP BY l_returnflag
);

Results (latency + max RSS)

| Scenario | Rewrite OFF | Rewrite ON | Outcome |
|---|---|---|---|
| 1) three distincts | 0.28s, ~570 MB | 0.12s, ~730 MB | Faster, higher memory |
| 2) cast-based distincts | 0.17s, ~349 MB | 0.16s, ~350 MB | Near tie |
| 3) lower()+string distinct | 0.04s, ~99 MB | 0.06s, ~94 MB | Slightly slower, slightly lower memory |

All paired runs returned identical query results.

Takeaway

The rewrite can improve latency in some multi-distinct grouped cases, but can also increase peak memory and/or be neutral/slightly slower depending on cardinality and expression shape. Keeping this rule opt-in (default off) remains appropriate while we expand benchmark coverage.


@xiedeyantu
Member

Thank you for the detailed test! But I have a few questions.

  1. Why add an extra "SUM" outside?
  2. I think it’s better for "CAST" not to involve expression calculations.
  3. It seems the third case is a low-cardinality test — could you show a high-cardinality scenario?

@ydgandhi
Author

Hi! @xiedeyantu, great feedback. Let me explain my thought process for some of the design choices regarding the tests. See inline

> Thank you for the detailed test! But I have a few questions.
>
> 1. Why add an extra "SUM" outside?

The outer SELECT SUM(...) FROM ( ... GROUP BY ... ) is a harness choice:

  • The work under measurement is the inner grouped aggregate (multiple COUNT(DISTINCT …)).
  • The outer SUM collapses per-group results to one row, which avoids dumping thousands of rows when comparing wall time / peak RSS.
  • To benchmark only the inner shape, run the subquery alone (or use SELECT COUNT(*) FROM (inner) t if you want a scalar sink without changing the inner aggregates).

> 2. I think it’s better for "CAST" not to involve expression calculations.

Agreed for a clean test: both distincts should use CAST on a column only. Our Scenario 2 used CAST(l_extendedprice AS INT) (column-only) and CAST(l_discount * 100 AS INT) (expression inside the cast). We'll switch the second to something like CAST(l_discount AS INT) (or another column-only cast) so we're not mixing in arithmetic inside CAST, and so it lines up with the rule's conservative “safe” distinct arguments.

> 3. It seems the third case is a low-cardinality test, could you show a high-cardinality scenario?

The lower() case used GROUP BY l_returnflag, which is only a few groups. For a high-cardinality variant we'll use the same select list but a denser key, e.g. GROUP BY l_suppkey (or l_orderkey), so the join after the rewrite has many more keys — still with COUNT(DISTINCT lower(l_shipmode)) and COUNT(DISTINCT l_shipinstruct).

We'll follow up with updated query text and numbers when we rerun.

@ydgandhi
Author

ydgandhi commented Mar 24, 2026

Hi @xiedeyantu, here is an update on the benchmarking. The new benchmarks use the grouped aggregate only (no outer SUM or other wrapper), so timings reflect the multi-COUNT(DISTINCT …) work directly. Note: the figures below are warm-cache repeats on one machine (a relative A/B comparison, not a CI guarantee). Each query was run 10 times with the rewrite setting off and 10 times with it on.


Queries tested - original runs

Scenario 1: three distincts (grouped)

SELECT l_suppkey,
       COUNT(DISTINCT l_orderkey) AS d1,
       COUNT(DISTINCT l_partkey) AS d2,
       COUNT(DISTINCT l_linenumber) AS d3
FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
GROUP BY l_suppkey;

Rerun: reviewer-aligned changes

CAST (Scenario 2): both distincts use CAST on a column only (no * expression inside CAST):

SELECT l_suppkey,
       COUNT(DISTINCT CAST(l_extendedprice AS INT)) AS cx,
       COUNT(DISTINCT CAST(l_discount AS INT)) AS cy
FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
GROUP BY l_suppkey;

High cardinality (Scenario 3): same aggregates as before, but GROUP BY l_suppkey instead of l_returnflag:

SELECT l_suppkey,
       COUNT(DISTINCT lower(l_shipmode)) AS c1,
       COUNT(DISTINCT l_shipinstruct) AS c2
FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
GROUP BY l_suppkey;

Results (mean ± sample stddev, n=10 per cell)

Measured with /usr/bin/time -l and target/release/datafusion-cli from the repo root. RSS is maximum resident set size from time -l (MB here are decimal megabytes: bytes / 10⁶). Queries are the reviewer-aligned grouped aggregates above (no outer SUM).

wall (s) mean ± stddev

| Scenario | Rewrite OFF | Rewrite ON | Outcome |
|---|---|---|---|
| 1) three distincts | 0.2490 ± 0.0099 | 0.1000 ± 0.0000 | Lower mean wall time with ON |
| 2) cast-based (column-only CAST) | 0.1350 ± 0.0053 | 0.0700 ± 0.0000 | Lower mean wall time with ON |
| 3) lower()+string (GROUP BY l_suppkey) | 3.3920 ± 0.6496 | 0.0800 ± 0.0000 | Much lower mean wall time with ON |

max RSS (MB) mean ± stddev

| Scenario | Rewrite OFF | Rewrite ON | Outcome |
|---|---|---|---|
| 1) three distincts | 590.03 ± 10.12 | 722.62 ± 12.70 | Higher mean peak RSS with ON |
| 2) cast-based (column-only CAST) | 331.83 ± 4.56 | 638.49 ± 8.29 | Higher mean peak RSS with ON |
| 3) lower()+string (GROUP BY l_suppkey) | 9755.13 ± 114.94 | 277.66 ± 1.71 | Much lower mean peak RSS with ON |

Row outputs matched OFF vs ON for every scenario (same grouped aggregate result). Wall time from time is low-precision on macOS (e.g. all runs may round to the same hundredth of a second, so stddev can be 0).
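For readers who want to reproduce the summary statistics, the per-cell figures can be computed as below. This is a minimal sketch, not the script used for the numbers above: the function names are hypothetical and the sample lists are invented inputs, chosen only to show why wall times that all round to the same hundredth of a second give a sample standard deviation of zero.

```python
import statistics


def summarize(samples):
    """Return (mean, sample standard deviation) for one OFF or ON cell."""
    return statistics.mean(samples), statistics.stdev(samples)


def bytes_to_decimal_mb(num_bytes):
    """Convert a max RSS value in bytes to decimal megabytes (bytes / 10^6)."""
    return num_bytes / 1_000_000


# Invented wall times in seconds, n=10 per cell (not measurements from the PR).
wall_off = [0.24, 0.25, 0.26, 0.25, 0.24, 0.26, 0.25, 0.25, 0.24, 0.25]
wall_on = [0.10] * 10  # every run rounds to the same hundredth

mean_off, stdev_off = summarize(wall_off)
mean_on, stdev_on = summarize(wall_on)

print(f"OFF: {mean_off:.4f} ± {stdev_off:.4f}")
print(f"ON:  {mean_on:.4f} ± {stdev_on:.4f}")
print(f"RSS: {bytes_to_decimal_mb(590_030_000):.2f} MB")
```

`statistics.stdev` uses the n-1 denominator, which matches the "sample stddev" wording in the table headings.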


Labels: common, core, documentation, optimizer

Development

Successfully merging this pull request may close these issues.

Optimize multiple COUNT(DISTINCT) memory via logical plan rewrite