[Experiment] Adaptive filter pushdown#22144
Draft
adriangb wants to merge 58 commits into
Replaces PR #9's morsel-per-row-group split with an in-decoder strategy swap: one `ParquetPushDecoder` per file, one `BoxStream` per file, with filter placement re-evaluated at every row-group boundary using the shared `SelectivityTracker`.

# Removed
- The chunk loop (`ParquetAccessPlan::split_into_chunks`, `Vec<BoxStream>` returns from `build_stream`).
- Per-chunk `AsyncFileReader::create_reader` minting and the per-chunk `RowFilter` rebuild.
- The `EarlyStoppingStream`-on-chunk-0-only special case for the non-`Clone` `FilePruner`.
- `LazyMorselShared` per-morsel Arc churn — the source of the ~10% aggregate ClickBench regression you flagged in the PR #9 review.

# How it works
`AdaptiveParquetStream` (new in `opener.rs`) drives one row group at a time via `try_next_reader`:
1. Pull a `ParquetRecordBatchReader` for the next row group.
2. Iterate it synchronously; each batch goes through any post-scan filters (which feed per-filter stats into the tracker) and then through the projector.
3. When the reader is exhausted, ask the tracker to re-partition filters based on accumulated stats. If the row-filter set changed, build a new `RowFilter` and call the new arrow-rs `ParquetPushDecoder::swap_strategy` before requesting the next reader. Post-scan filters update in lockstep.

`PushBuffers` carries through the swap, so already-fetched bytes are preserved, and the optional-filter mid-stream skip mechanism (the existing `OptionalFilterPhysicalExpr` + `tracker.is_filter_skipped`) keeps working unchanged inside `apply_post_scan_filters_with_stats`. A minimal sketch of the loop follows this description.

# Carried over from PR #9
- `selectivity.rs` — `SelectivityTracker`, `PartitionedFilters`, `FilterId`, Welford CI bounds. Verbatim.
- `row_filter.rs` — new `build_row_filter` signature returning `(Option<RowFilter>, UnbuildableFilters)` plus `total_compressed_bytes`, plus `DatafusionArrowPredicate` stat hooks.
- `physical_expr.rs` — `OptionalFilterPhysicalExpr`, `snapshot_generation` helpers. `Display` is **pass-through** here (PR #9 used `Optional(...)`), keeping every existing sqllogictest expected output intact.
- `config.rs` — adds `filter_pushdown_min_bytes_per_sec` / `filter_collecting_byte_ratio_threshold` / `filter_confidence_z`. **`reorder_filters` is preserved as a deprecated no-op** (per request) — the adaptive tracker subsumes it.
- `selectivity_tracker.rs` bench — verbatim.
- Per-file plumbing in `source.rs`: `predicate_conjuncts: Vec<(FilterId, Arc<PhysicalExpr>)>` instead of a single AND-ed predicate, so per-conjunct stats accumulate across files.

# Dependencies
Depends on `pydantic/arrow-rs:adaptive-strategy-swap`, which adds `ParquetPushDecoder::can_swap_strategy()` / `swap_strategy(StrategySwap)` and the `StrategySwap` builder. The `Cargo.toml` `[patch.crates-io]` block points at it.

# Deferred / known gaps
- Sub-row-group adaptation (would need a `ParquetRecordBatchReader::pause` primitive in arrow-rs to yield a residual `RowSelection`); useful for TPC-DS-style single-huge-row-group files. Defer.
- The three new config knobs aren't in the proto schema yet; `from_proto` fills them with config defaults so a roundtrip preserves behavior.

# Testing
- `cargo test -p datafusion-datasource-parquet --lib` — 143 passed
- `cargo test -p datafusion --lib` — 410 passed
- `cargo test -p datafusion --test core_integration` — 935 passed
- `cargo test -p datafusion-sqllogictest --test sqllogictests` — all pass except `encrypted_parquet.slt` (pre-existing on upstream/main, not related to this change)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
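A minimal sketch of the per-row-group loop, using std-only stand-ins. `Tracker`, `drive_row_groups`, and the closures are illustrative, not the PR's real types; only the control flow (drain a reader, re-partition at the row-group boundary, swap strategies when the row-level set changes) mirrors the description above.

```rust
// Illustrative stand-ins: the real code uses ParquetRecordBatchReader,
// SelectivityTracker, and ParquetPushDecoder::swap_strategy (per the text).
struct Tracker;

impl Tracker {
    // Re-partition filters from accumulated stats; return true when the
    // row-level filter set changed and the decode strategy must be swapped.
    fn repartition(&mut self) -> bool {
        false
    }
}

fn drive_row_groups<R, B>(
    mut next_reader: impl FnMut() -> Option<R>, // stand-in for try_next_reader
    tracker: &mut Tracker,
    mut swap_strategy: impl FnMut(), // stand-in for swap_strategy
) where
    R: Iterator<Item = B>,
{
    while let Some(reader) = next_reader() {
        for batch in reader {
            // real code: apply post-scan filters (feeding per-filter stats
            // into the tracker), then project away filter-only columns
            let _ = batch;
        }
        // row-group boundary: re-evaluate filter placement
        if tracker.repartition() {
            // real code: rebuild the RowFilter; PushBuffers carries through
            // the swap so already-fetched bytes are preserved
            swap_strategy();
        }
    }
}

fn main() {
    let mut groups = vec![vec![1_u32, 2], vec![3]].into_iter();
    drive_row_groups(|| groups.next().map(|g| g.into_iter()), &mut Tracker, || {});
}
```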
- Fix 6 broken intra-doc links in `opener.rs`: `RowFilter`, `PushBuffers`, `AsyncFileReader::create_reader`, and `SelectivityTracker` weren't visible from the doc-comment scope. Reword to plain backticks for the names that don't have a stable in-scope path; route `SelectivityTracker` through `crate::selectivity::SelectivityTracker`.
- Regenerate `docs/source/user-guide/configs.md` via `dev/update_config_docs.sh` to surface the three new `filter_pushdown_min_bytes_per_sec` / `filter_collecting_byte_ratio_threshold` / `filter_confidence_z` rows the CI doc check expects.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…33dd62 Picks up the rustdoc fix from the arrow-rs companion branch so the DataFusion CI doc job resolves cleanly too. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The example asserts `pushdown_rows_pruned=1` to demonstrate that the row-filter path actually evicts rows. Under the adaptive scheduler's default `filter_pushdown_min_bytes_per_sec = 100 MB/s`, a small example file's filter starts on the post-scan path (where `pushdown_rows_pruned` stays 0) and the assertion fires. Set `filter_pushdown_min_bytes_per_sec = 0` to disable the throughput check and force every filter to row-level — the same lever the `physical_plan/parquet.rs` test harness uses (a sketch follows).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
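A hedged sketch of that configuration. `pushdown_filters` is the existing DataFusion option; the exact path of `filter_pushdown_min_bytes_per_sec` is an assumption about where the knob this PR adds in `config.rs` lands under `execution.parquet`.

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    let mut config = SessionConfig::new();
    let parquet = &mut config.options_mut().execution.parquet;
    parquet.pushdown_filters = true;
    // 0.0 disables the throughput gate, so every filter starts (and stays)
    // on the row-level path and pushdown_rows_pruned can advance.
    // NOTE: this field is added by this PR; the path is an assumption.
    parquet.filter_pushdown_min_bytes_per_sec = 0.0;
    let _ctx = SessionContext::new_with_config(config);
}
```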
Two fixes for benchmark regressions and crashes on hits_partitioned ClickBench queries.

# Hard failures (Q36, Q38, Q41, Q42)
`build_stream` was building the wide ProjectionMask from `user projection ∪ post_scan_conjuncts` only, but a row-level conjunct can get demoted to post-scan mid-stream by `maybe_swap_strategy`. When that happened, the demoted filter's column wasn't in the `stream_schema`, and the post-scan rebase via `reassign_expr_columns` fired a `Schema error: Unable to get field named "..."` against the narrow batch.

Fix: include **every** predicate conjunct's columns in the wide projection regardless of current placement (see the sketch after this message). Filter-only columns are still stripped after post-scan filtering by the projector, so the user-visible schema is unchanged.

# Initial-placement regressions (Q10, Q11, Q13, Q14, Q26)
Queries shaped like `SELECT col, ... FROM t WHERE col <> '' GROUP BY col` have the filter column already in the user projection. The byte-ratio heuristic was counting filter bytes against projection bytes naively, so `MobilePhoneModel_bytes / (MobilePhoneModel_bytes + UserID_bytes) ≈ 0.5` exceeded the 0.20 threshold and pushed the filter to post-scan — even though row-level was strictly better (zero extra I/O, and late materialization saves the UserID decode for pruned rows).

Fix: change the heuristic numerator from `filter_bytes` to **extra** bytes — bytes for filter columns *not* already in the user projection. A filter that only references projection columns now gets `byte_ratio = 0` and starts at row-level.

Threading required: add `projection_columns: &HashSet<usize>` to `SelectivityTracker::partition_filters` (and the inner impl); the opener's `AdaptiveParquetStream` carries it for mid-stream re-evals.

# Test plan
- All 4 hard-failure queries (Q36/Q38/Q41/Q42) now run to completion locally on hits_partitioned.
- 143 datasource-parquet unit tests pass (38 partition_filters call-sites in the test module updated to the new signature).
- Benchmark expectations: the Q23/Q22/Q6 wins should hold; the Q10/Q11/Q13/Q14 regressions should resolve via the better initial placement.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
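A hedged sketch of the wide-projection fix. The name `predicate_conjuncts` follows the PR description; the column-set representation and the helper itself are illustrative assumptions.

```rust
use std::collections::{BTreeSet, HashSet};

// Stand-in for the PR's FilterId; each conjunct carries the column indices
// it references, regardless of current row-level/post-scan placement.
type FilterId = usize;

fn wide_projection(
    user_projection: &[usize],
    predicate_conjuncts: &[(FilterId, HashSet<usize>)],
) -> Vec<usize> {
    let mut cols: BTreeSet<usize> = user_projection.iter().copied().collect();
    for (_, conjunct_cols) in predicate_conjuncts {
        // include every conjunct's columns so a mid-stream demotion to
        // post-scan always finds its column in stream_schema
        cols.extend(conjunct_cols.iter().copied());
    }
    cols.into_iter().collect()
}

fn main() {
    let conjuncts = vec![(0, HashSet::from([2_usize])), (1, HashSet::from([0_usize]))];
    assert_eq!(wide_projection(&[0, 1], &conjuncts), vec![0, 1, 2]);
}
```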
…ement

Bench showed Q10/Q11/Q13/Q14/Q26 still regressing 1.20-1.47x even after the overlap-aware heuristic. These queries are shaped like `SELECT col, ... FROM t WHERE col <> '' GROUP BY col` — the filter column is entirely in the projection, so `extra_bytes = 0` and `byte_ratio = 0`. The previous heuristic placed them at row-level since `0 <= threshold`, but row-level *isn't* free even at zero extra I/O: predicate-cache eviction on heavy string columns means the filter column gets decoded twice (once for the predicate eval, once for the projection), and the late-materialization payoff depends on a selectivity we don't know yet.

Local timings on hits_partitioned (release mode):

| Query | main + no-pushdown (baseline) | branch (old heuristic) | branch (new heuristic) |
|-------|------------------------------:|-----------------------:|-----------------------:|
| Q23 | 3708 ms | 219 ms* | 219 ms |
| Q22 | 1344 ms | 902 ms* | 902 ms |
| Q26 | 41 ms | 60 ms | 48 ms |
| Q10 | 82 ms | 109 ms | 88 ms |

Q23/Q22 wins are preserved (Q23 ~17x faster vs baseline, Q22 ~1.5x). The Q10/Q26 regressions go from 1.32-1.45x to 1.07-1.17x — the residual is the general cost of pushdown_filters=true vs false, not our adaptive layer.

Why Q23 isn't hurt: its huge speedup comes from row-group statistics pruning via the TopK dynamic filter on EventTime, not from row-level filter evaluation. Pruning is independent of row-level vs post-scan placement; the dynamic filter still reaches the source and the PruningPredicate still applies. (A local repro confirms — Q23 actually gets slightly faster on the new heuristic because we skip the double-decode of the heavy URL string column.)

Implementation: change the new-filter row-level condition from `byte_ratio <= threshold` to `extra_bytes > 0 && byte_ratio <= threshold` (sketched below). Pure-overlap filters (`extra_bytes == 0`) start at post-scan; the tracker promotes them later if measured bytes-saved-per-sec justifies it. Filters with a non-zero extra cost that fits within `byte_ratio_threshold` (a small int predicate against a heavy string projection) still start at row-level — that's the case where the heuristic is genuinely useful.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
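A minimal sketch of that condition; the enum and function names are illustrative, not the PR's API.

```rust
#[derive(Debug, PartialEq)]
enum Placement {
    RowLevel,
    PostScan,
}

// extra_bytes: compressed bytes for filter columns NOT in the user projection;
// byte_ratio: extra_bytes / (extra_bytes + projection_bytes).
fn initial_placement(extra_bytes: u64, byte_ratio: f64, threshold: f64) -> Placement {
    if extra_bytes > 0 && byte_ratio <= threshold {
        // genuinely-extra but cheap filter column: row-level pays for itself
        Placement::RowLevel
    } else {
        // pure-overlap (extra_bytes == 0) or expensive: start post-scan and
        // let the tracker promote on measured evidence
        Placement::PostScan
    }
}

fn main() {
    // pure-overlap filter (ClickBench Q10 shape) starts post-scan
    assert_eq!(initial_placement(0, 0.0, 0.20), Placement::PostScan);
    // small extra int column against a heavy projection starts row-level
    assert_eq!(initial_placement(4_096, 0.05, 0.20), Placement::RowLevel);
}
```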
Two changes that work together to make Q10/Q11/Q13/Q14/Q26 stop regressing without giving up the Q23/Q22 wins.

# 1. Prune-rate gate on PostScan → RowFilter promotion
Adds a second gate on top of the existing `filter_pushdown_min_bytes_per_sec` CI bound: a filter only gets promoted from post-scan to row-level if it actually prunes >= 99% of the rows it sees.

Why: the bytes-saved-per-sec metric is "potential savings if at row-level" (rows_pruned × non-filter-projection-bytes-per-row ÷ eval_time). For ClickBench Q10 (`MobilePhoneModel <> ''`) the selectivity is ~94% and the projection is heavy, so bytes-saved-per-sec clears the 100 MB/s threshold easily. But row-level *actually loses* to post-scan there because survivors are uniformly scattered: at 8K rows per page, p^N for p=0.94 is ~10^-220 — effectively zero pages can be skipped, so RowSelection-driven decode is just as expensive as a contiguous post-scan read, but with extra predicate-cache eviction on the heavy string column. (The arithmetic is spelled out after this message.)

The 0.99 gate captures the scatter problem structurally:
- Clustered survivors (TopK dynamic filter, hash-join build): prune_rate trivially ≥ 0.99 once K shrinks. Page-skip works. Promote.
- Uniform survivors at moderate selectivity (Q10/Q11/Q13/Q14/Q26): prune_rate stays at 0.5–0.95. Page-skip can't work no matter how big bytes-saved-per-sec is. Stay at post-scan.

Q22's `Title LIKE '%Google%'` (prune_rate ~1.0) and Q23's `URL LIKE '%google%'` (similar) trivially clear the gate, so their big wins are preserved.

# 2. Drop STATS_SAMPLE_INTERVAL (1/32 → every batch)
I added the 1/32 sampling earlier when the per-batch `Instant + tracker.update` was clearly hot — but at the time the heuristic was over-promoting these queries to row-level, making the per-batch path matter much more. Now that the prune-rate gate keeps them at post-scan, sampling actively *hurts*: with 1/32 the Welford accumulator converges 32× slower, so the tracker takes longer to realize "this filter is bad at row-level" and the in-flight filter flips state more often. Updating every batch is faster on every query I measured (Q23, Q22, Q26, Q10).

`SKIP_FLAG_CHECK_INTERVAL = 4` stays — it gates the OptionalFilter skip-flag check, not the Welford update, and removing *it* added ~200ms to Q22 (the post-update lock-juggle isn't free).

# Local timings (warm, hits_partitioned, 12 partitions)

| Query | main+nopush | branch | Δ |
|-------|------------:|-------:|---|
| Q23 | 3271ms | 168ms | **+19.5x** |
| Q22 | 1069ms | 901ms | +1.19x |
| Q26 | 39ms | 41ms | matches (+2ms) |
| Q10 | 68ms | 59ms | **+1.15x** |

All four ≥ baseline. Q26 is essentially break-even; the residual 2ms is below run-to-run noise.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
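The page-skip arithmetic from the scatter argument above, as a tiny check. This is plain math on the numbers quoted in the text, not project code.

```rust
fn main() {
    // If each row is pruned independently with probability p, a whole page
    // of n rows can be skipped only when ALL its rows are pruned: p^n.
    let p: f64 = 0.94; // ClickBench Q10-style prune probability
    let n: f64 = 8192.0; // rows per page
    let log10_skip = n * p.log10();
    println!("log10 P(page fully pruned) ≈ {log10_skip:.0}"); // ≈ -220
    // A clustered filter (prune_rate ≥ 0.99 with page-aligned runs) skips
    // pages structurally, which is what the 0.99 gate detects.
}
```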
Earlier I had two sampling/gate constants protecting the hot per-batch
update path:
- `STATS_SAMPLE_INTERVAL = 32` in opener.rs: skip the
  `Instant::now` + `tracker.update` work on 31 of every 32 batches.
- `SKIP_FLAG_CHECK_INTERVAL = 4` in selectivity.rs: inside
  tracker.update, skip the post-stats CI-bound + lock-juggle path on
  3 of every 4 calls.
Both were "right" given the prior over-promotion problem (filters
landing at row-level when they shouldn't, making the per-batch path
hot and the CI calc wasted). With the new `prune_rate >= 0.99` gate
those filters stay at post-scan and the measurements no longer
support sampling:
- Removing `STATS_SAMPLE_INTERVAL` (every batch updates) is
  *faster* than 1/32 across Q23/Q22/Q26/Q10. Slower convergence on
  1/32 made the tracker take longer to settle, so the in-flight
  filter chain flipped state more often.
- `SKIP_FLAG_CHECK_INTERVAL = 4` was protecting *non-optional*
  filters from a wasted-work path (post-stats CI calc + lock release
  + is_optional HashMap read + lock reacquire) that they didn't need
  at all. The right fix is to early-return for non-optional filters
  *before* that path, not to amortize it across 4 calls.
This refactor:
1. Caches `is_optional: bool` inline on `SelectivityStats`.
   Non-optional filters early-return after the Welford update with
   a single field load on the already-held stats lock — no extra
   HashMap, no `RwLock::read()`, no `drop` + reacquire (sketched
   after this message).
2. For optional filters (hash-join build / TopK dynamic), the
   skip-flag CI check now runs every batch. That's what we want:
   when a filter's selectivity collapses, the skip flag should fire
   ASAP. Q26's TopK dynamic filter benefits visibly from this.
3. Drops the now-redundant `SelectivityTracker::is_optional`
   HashMap and `PartitionResult::new_filter_ids` (which duplicated
   `new_optional_flags`). The is_optional bit moves to where it's
   read.
4. Drops the sampling in `apply_post_scan_filters_with_stats`.
   `tracker.update` is now cheap enough on the fast path that
   sampling actively hurts (slower convergence > saved work).
Local timings (warm, hits_partitioned, 12 partitions):
| Query | main+nopush | branch | Δ |
|-------|------------:|-------:|---|
| Q23 | 3271ms | 139ms | **+23.5x** |
| Q22 | 1069ms | 898ms | +1.19x |
| Q26 | 39ms | 39ms | matches |
| Q10 | 68ms | 59ms | **+1.15x** |
143 lib tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
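A hedged sketch of item 1 above. The struct and field names are illustrative; only `is_optional`, the Welford update, and the early return follow the commit text.

```rust
use std::sync::Mutex;

struct SelectivityStats {
    is_optional: bool, // cached inline: no separate HashMap or RwLock
    count: u64,
    mean: f64,
    m2: f64, // Welford running sum of squared deviations
}

fn update(stats: &Mutex<SelectivityStats>, sample: f64) {
    let mut s = stats.lock().unwrap();
    // Welford update on the already-held per-filter lock
    s.count += 1;
    let delta = sample - s.mean;
    s.mean += delta / s.count as f64;
    s.m2 += delta * (sample - s.mean);
    if !s.is_optional {
        // non-optional filters: a single field load and we're done —
        // no CI calc, no lock drop + reacquire
        return;
    }
    // optional filters (hash-join build / TopK dynamic): run the
    // skip-flag CI check every batch so a selectivity collapse fires ASAP
}

fn main() {
    let stats = Mutex::new(SelectivityStats {
        is_optional: false,
        count: 0,
        mean: 0.0,
        m2: 0.0,
    });
    update(&stats, 0.5);
    assert_eq!(stats.lock().unwrap().count, 1);
}
```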
Replaces the all-or-nothing batch-level "if matched == 0, all skippable;
otherwise 0" computation with a sub-batch windowed analysis fed by a
new `count_skippable_bytes` helper. The metric is now:

    for each batch:
      skippable_bytes_for_batch = total_other_projection_bytes_for_batch
        × (windows-with-zero-survivors / total-windows)

with W = 8192 rows (short-circuited so total_windows = 1 ⇒ binary
"is the whole batch all-pruned" — equivalent to the old behavior on
typical 8K batch sizes, but with the structure in place for finer W
on larger pages or different writers).
Why: `filter_pushdown_min_bytes_per_sec` is the right *unit*, but the
metric feeding it overestimated savings whenever the filter pruned
rows that the row-level decoder couldn't actually drop a page on. A
50% filter on uniform data still costs full I/O at row-level (every
page has survivors); a 50% filter on contiguous data lets the
decoder skip half the pages. The windowed analysis discriminates
between these — same formula at post-scan (predicting what row-level
would save) and at row-level (measuring what the decoder did skip,
modulo within-window RowSelection narrowing, which is an uncounted
bonus).
Same metric on both sides means `min_bytes_per_sec` is the only knob;
no separate prune-rate gate. The 0.99 gate is now redundant — if the
prune rate is high enough that page-skip works, the metric already
clears the threshold; if the prune rate is high but the scatter is
uniform (case C, ClickBench Q10/Q11/Q13/Q14/Q26), the metric stays
low and the filter stays at post-scan.
The helper short-circuits when:
- the batch is fully pruned (`true_count == 0`) → all skippable,
- the batch has no zeros (`true_count == n`) → 0 skippable,
- there's only one window (`n ≤ W`) and the answer is determined.
This avoids the ~2× per-batch `true_count` work that was visible as a
regression when I first wired the helper through. (A sketch of the
helper follows this message.)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
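A hedged, std-only sketch of the windowed estimate. The real `count_skippable_bytes` operates on arrow data and returns bytes; this illustrative version returns the zero-survivor window fraction, with the same short-circuits the commit lists.

```rust
/// Fraction of W-row windows with zero survivors; multiply by the batch's
/// other-projection bytes to get skippable_bytes_for_batch.
fn skippable_fraction(keep: &[bool], w: usize) -> f64 {
    let survivors = keep.iter().filter(|&&k| k).count();
    if survivors == 0 {
        return 1.0; // fully pruned batch: everything skippable
    }
    if survivors == keep.len() || keep.len() <= w {
        return 0.0; // no zeros, or a single window that has survivors
    }
    let zero_windows = keep.chunks(w).filter(|win| !win.iter().any(|&k| k)).count();
    zero_windows as f64 / keep.len().div_ceil(w) as f64
}

fn main() {
    // 50% filter on uniform data: every window has survivors → nothing skippable
    let uniform: Vec<bool> = (0..32_768).map(|i| i % 2 == 0).collect();
    assert_eq!(skippable_fraction(&uniform, 8192), 0.0);
    // 50% filter on contiguous data: half the windows are empty → half skippable
    let contiguous: Vec<bool> = (0..32_768).map(|i| i < 16_384).collect();
    assert_eq!(skippable_fraction(&contiguous, 8192), 0.5);
}
```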
The last bench (97c62a6) added 5 new individual regressions vs the prior best-bench commit (05590a2): Q4 +39ms, Q31 +64ms, Q35 +48ms, Q40 1.47x, plus several smaller ones. Total query time ticked back up. Two changes between those commits did the damage:

1. Removing `STATS_SAMPLE_INTERVAL=32`. Locally the un-sampled version was faster, but on the 12-vCPU GKE bench every partition contends on the same per-filter `Mutex<SelectivityStats>` and the lock contention dominates. Restoring the 1-in-32 sampling cuts hot-path lock pressure to ~3% of what it was while still giving the Welford accumulator hundreds of samples per query.
2. Removing the `prune_rate >= 0.99` gate. The scatter-aware metric alone is too lenient on ClickBench data: columns like `MobilePhoneModel` and `SearchPhrase` have natural runs of empty values that occasionally cluster into batch-level "all pruned" events even when the filter's overall selectivity isn't high enough for row-level to actually win once arrow-rs's predicate-cache double-decode of heavy string columns kicks in. The prune_rate floor is a belt-and-braces guard; it's compatible with the scatter metric (both must pass) and prunes the cases where the metric over-promotes.

Keeping the scatter helper structure in place — the `count_skippable_bytes` framework stays, so that when arrow-rs exposes pages-skipped-via-RowSelection (option 1 from the earlier plan), the row-level path can swap from the windowed estimate to the true measurement with no formula change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reverts the `1-in-32` sampling gate. The earlier rationale was lock contention on `Mutex<SelectivityStats>`, but the empirical effect on ClickBench is that promotion happens 32× later, which dominates the contention savings for short-running selective queries (Q22/Q23/Q24). Q23 went from 169ms (every-batch) to 443ms (1/32 sampled) on the 12-vCPU bench while regressing many small queries. Sample every batch so the Welford accumulator hits the CI threshold inside the first row group. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The gate was added on the theory that arrow-rs's row-level path double-decoded heavy string columns when filter and projection overlapped, costing more than the ~60-95% selectivity could recover. EXPLAIN ANALYZE on ClickBench refutes that theory: Q23 (URL LIKE '%google%') shows predicate_cache_inner_records=8.76M and predicate_cache_records=83.67K — the cache works correctly; heavy strings are decoded once and reused for both the predicate and the projection.

The residual ClickBench regressions we attributed to "double-decode" (Q26 / Q31) trace to a different cause: post-scan filtering inside the opener shifts batch-arrival order at the downstream TopK, which changes the convergence point of TopK's dynamic filter and slightly weakens file-stats pruning. Forced row-level promotion of Q26 makes it slower (59ms) than post-scan (41ms), confirming the gate isn't preventing a real regression.

Single promotion gate now: CI lower bound on scatter-aware bytes-saved-per-sec ≥ filter_pushdown_min_bytes_per_sec. This lets strongly-selective contiguous filters (90% prune rate, page-aligned runs) get promoted, which the 0.99 cutoff was incorrectly blocking.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a `LimitedBatchCoalescer` to `AdaptiveParquetStream`'s post-scan filter path, mirroring `FilterExec`'s behavior. Without this, inline post-scan filtering yields tiny batches (1-100 rows each on selective predicates) directly to TopK, which delays the dynamic filter from tightening: TopK only progressively improves its threshold one small batch at a time, whereas `FilterExec`'s coalescer ensures the first batch to TopK already contains thousands of survivors, letting TopK pick a near-optimal top-K threshold in one shot.

Symptom this fixes: on Q26 (`SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10`) at 12 partitions, the branch matches 33-34 file ranges vs main+pushdown=false's 28. With the coalescer, the branch matches 30-32 — closing roughly a third of the gap. The remaining ~2-file-range pruning difference is unexplained but small.

Coalescer params match `FilterExec`: target_batch_size from the session, biggest_coalesce_batch_size = target/2 (set inside `LimitedBatchCoalescer::new`).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This reverts commit d146ebe.
When a filter is first observed, consult the per-conjunct row-group statistics pruning rate as a selectivity prior. If the pruning rate >= prior_promote_threshold (default 0.5), place at row-level; if <= prior_demote_threshold (default 0.05) and stats are present, place post-scan; else fall back to the existing byte-ratio heuristic. Skips the prior entirely when no row-group statistics are available for the filter's columns, since "no stats" would otherwise look identical to "genuinely non-selective". A sketch follows. Refs report.md §7.2.b
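A hedged sketch of the prior; the names are illustrative. `pruning_rate` is `Some` only when row-group statistics exist for the filter's columns, and the 0.726 test value echoes the TPC-DS Q26 rate quoted later in this thread.

```rust
#[derive(Debug, PartialEq)]
enum Placement {
    RowLevel,
    PostScan,
}

fn place_with_prior(
    pruning_rate: Option<f64>,
    promote: f64, // prior_promote_threshold, default 0.5
    demote: f64,  // prior_demote_threshold, default 0.05
    byte_ratio_fallback: Placement,
) -> Placement {
    match pruning_rate {
        Some(r) if r >= promote => Placement::RowLevel,
        Some(r) if r <= demote => Placement::PostScan,
        // inconclusive rate, or no stats at all ("no stats" must not look
        // like "genuinely non-selective"): use the byte-ratio heuristic
        _ => byte_ratio_fallback,
    }
}

fn main() {
    assert_eq!(
        place_with_prior(Some(0.726), 0.5, 0.05, Placement::PostScan),
        Placement::RowLevel
    );
    assert_eq!(
        place_with_prior(None, 0.5, 0.05, Placement::PostScan),
        Placement::PostScan
    );
}
```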
Merge of exp/page-pruning-prior and exp/latency-aware-z. The two levers should compose well: the prior settles initial placement from row-group statistics on the first row group, and the latency-aware z then drives evidence-based moves only when the runtime measurements disagree with the prior. Goal: keep exp1's wins on regression queries (TPC-DS Q26) while avoiding exp2's borderline-flip outliers (ClickBench Q37 under latency).
Documented four hypotheses tried to fix Q64 inside the morselization+adaptive merge: placeholder→postscan, all-postscan via min_bytes=INF, a wider stream schema with all demoted filters applied stream-side, and a sanity check confirming the regression is structural to apache#21766+pushdown=true regardless of adaptive logic. None of the four moved Q64 below ~22 s. The fix has to come from within apache#21766 (or sub-RG adaptation in arrow-rs), not the adaptive scheduler.
Implemented the per-conjunct page-pruning prior on exp/page-pruning-prior-v2 (page-first, with the row-group fallback gated on "page index NOT loaded"). Smoke-benched at +18% vs exp3. Two failure modes: page-pruning eval is itself an "extra pruning run" in the cost sense (it walks the page index per-conjunct per-file), and removing the row-group fallback lost exp3's Q26-style demote. A proper architecture (extract per-conjunct rates from the existing opener prunings as a side effect) needs ~200-300 LOC of PruningPredicate API additions; documented as a follow-up. exp/pp-plus-laz remains the recommended landing target.
…r pruning
Refactors the prior so it consumes per-FilterId page-pruning rates
extracted from the page-index pruning the opener already runs, with
NO extra pruning passes:

PagePruningAccessPlanFilter:
- new optional `tags: Option<Vec<usize>>` field
- `new_tagged()` constructor that accepts pre-split conjuncts, each
  tagged with a caller id (typically FilterId)
- `prune_plan_with_per_conjunct_stats()` variant that runs the same
  pruning iteration as `prune_plan_with_page_index` but also
  surfaces a `Vec<PerConjunctPageStats>` with rows-seen / rows-skipped
  per conjunct (see the sketch after this message).

Opener (build_stream):
- When `predicate_conjuncts` is set, build the page filter via
  `new_tagged` so per-conjunct stats can survive the split.
- Reorder: `prune_by_limit` + page-index pruning now run BEFORE the
  initial `partition_filters` call, so per-FilterId rates are
  available as the prior on the very first placement decision.
- Capture per-conjunct rates into `HashMap<FilterId, f64>`, thread
  them into `AdaptiveParquetStream` as `page_pruning_rates`, and pass
  them on every `partition_filters` call (initial + mid-stream swap).

SelectivityTracker::partition_filters:
- New `page_pruning_rates` parameter.
- The initial-placement prior now reads from this map; it falls back
  to byte-ratio when no rate is available (page index disabled,
  multi-column predicate, schema mismatch).
- The old per-conjunct re-evaluation `pruning_rate_for_filter` is
  no longer called on the production path.

The per-conjunct page-pruning rates path is now the production path.
The old per-conjunct row-group re-evaluation helpers (`pruning_rate_for_filter`,
`build_per_conjunct_pruning_predicate`) were never reached after the
`partition_filters` refactor — removed.

Also drops the temporary `debug!` traces from page_filter.rs and opener.rs
that confirmed the architecture works (TPC-DS Q26 fires the page prior with
pruned_rate=0.726 ≥ 0.5 → row-level; cd_gender/marital_status fire with
pruned_rate=0.000 ≤ 0.05 → post-scan). ClickBench's hits_partitioned
files lack page indexes, so the prior never fires there — it falls back
to byte-ratio per the design ("if the user disables page pruning we don't
get this data → only seed based on the bytes heuristic").
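A hedged sketch of the rate capture. `PerConjunctPageStats` and its fields are stand-ins shaped after the description above; the real type lives in the PR's page_filter.rs.

```rust
use std::collections::HashMap;

type FilterId = usize;

// Stand-in for the PR's PerConjunctPageStats: rows seen/skipped by one
// tagged conjunct during the page-index pruning pass the opener already runs.
struct PerConjunctPageStats {
    tag: FilterId,
    rows_seen: u64,
    rows_skipped: u64,
}

fn page_pruning_rates(stats: &[PerConjunctPageStats]) -> HashMap<FilterId, f64> {
    stats
        .iter()
        // no data for a conjunct ⇒ no prior (placement falls back to byte-ratio)
        .filter(|s| s.rows_seen > 0)
        .map(|s| (s.tag, s.rows_skipped as f64 / s.rows_seen as f64))
        .collect()
}

fn main() {
    let stats = [
        PerConjunctPageStats { tag: 0, rows_seen: 1000, rows_skipped: 726 },
        PerConjunctPageStats { tag: 1, rows_seen: 0, rows_skipped: 0 },
    ];
    let rates = page_pruning_rates(&stats);
    assert_eq!(rates.get(&0), Some(&0.726)); // prior fires: row-level
    assert_eq!(rates.get(&1), None); // no stats: byte-ratio fallback
}
```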
After the r5+r6 consolidation, the top-level `use` is unused; the helper imports it locally to keep the module surface clean.
Three findings from this round:
1. r11 attempted to use per-conjunct page-pruning rates as a tertiary key in the post-scan/row-filter ordering closure, after Welford effectiveness and before filter_scan_size. The change compiled cleanly but produced no measurable change in the TPC-DS-lat smoke (76985 vs r10 ctrl 76785 — within noise). Stash dropped.
2. The headline finding: re-running r10 and exp3 back-to-back in the same machine state showed they are at parity on the TPC-DS-lat smoke (r10 76785, exp3 77237 — 0.6% apart). The previously recorded "7% gap to exp3" was cross-session machine-state variance, not a real perf delta.
3. Therefore round 6's architecturally-correct stack (r6+r7+r8, plus the round-9 partial-AND attempt kept neutral) is the take-it-now branch. No further pure-prior iterations needed.
Pre-existing lints from the round-6 work that were committed without clippy passing. None of these are behavioural; they're presentation fixes:
- opener.rs: replace the banned std::time::Instant with the WASM-safe datafusion_common::instant::Instant
- page_filter.rs: PagePruningAccessPlanFilter::new_tagged takes the schema by &SchemaRef instead of by value (it never consumed it)
- pruning_predicate.rs: inline the format args in a debug! call
- row_group_filter.rs: fold prune_by_bloom_filters into its single remaining caller (a test helper) so the unused legacy variant goes away; drop the redundant .into_iter() on a zip target
- selectivity.rs: #[expect(too_many_arguments)] on partition_filters (8 args) and the inner mutating partition_filters (11 args) — the shape is intentional. Remove the cfg(test) gate on partition_filters_for_test so the bench target can call it
- benches/selectivity_tracker.rs: switch the bench to the partition_filters_for_test helper, which forwards to the public signature using metadata-derived schemas

`cargo clippy -p datafusion-datasource-parquet -p datafusion-pruning --all-targets --all-features -- -D warnings` now passes.
ClickBench-lat smoke (3 iters, same-state side-by-side):
- r10: 89434 ms
- exp3: 88654 ms
- diff: +780 ms (0.9%, within noise)

Combined with the earlier TPC-DS-lat finding (within 0.6%), the round-6 architecturally-correct stack is at parity with exp3 on both major latency-pushdown smokes. Round 6 done.
Commit 97c62a6 ("feat(parquet): scatter-aware bytes-saved metric") reformulated SelectivityStats::update so that callers pre-compute the skippable_bytes argument (= rows_pruned × bytes_per_row in the simple case) instead of having update derive it internally from matched/total/total_bytes. The tests in this file weren't updated at the time and have been failing since. Apply the new caller-side semantics:
- "all rows matched, no pruning" calls now pass 0 for skippable_bytes (no late-mat payoff for batches the filter doesn't shrink)
- "high effectiveness" calls scale the supplied bytes by the rows-pruned ratio
- test_effectiveness_zero_bytes_seen now asserts Some(0.0), since a zero-payoff batch is a legitimate Welford sample (whose comment in update() explicitly justifies recording it)

`cargo test -p datafusion-datasource-parquet --lib`: 143 passed, 0 failed (up from 133 passed, 10 failed).
Adds the third workload to the same-state comparison table: TPC-H-lat smoke: r10 23373 ms, exp3 23721 ms (-348 ms / 1.5%). Across TPC-DS-lat, ClickBench-lat, and TPC-H-lat smokes the r10 architecturally-correct stack is at parity with exp3 (within 0.6-1.5%, with directional wins on two of three workloads). The round-6 architecture work is done.
Confirms the memory's success criterion on both pushdown-relevant no-latency workloads, same machine state, solo sequential runs: TPC-DS no-lat: r10 16683 vs main 16971 (1.7% faster) ClickBench no-lat: r10 18304 vs main 22811 (19.7% faster)
- Add design.md as an upstream-ready proposal-style spec for the six-commit pr/round6-stack (problem, goals/non-goals, mechanism, alternatives, validation, migration, open questions).
- slides/datafusion-meetup-05-2026/make_plots.py now reads the R6-STACK-pushdown[-lat] result dirs (the clean-stack branch's bench output) and labels the bars "main / main + pushdown / change" for clarity.
- Regenerate the four chart PNGs with the new framing and numbers. The TPC-H SSD chart in particular flips visually: the change column now sits below "main" instead of above "main + pushdown".
- Rewrite the four content slides to match: ClickBench / TPC-DS / TPC-H SSD all show the change beating both "main" and "main + pushdown"; TPC-H S3 now reads "parity with main, 0.46× of main + pushdown"; the closing slide replaces the deferred "latency-aware z" bullet (which is now in the stack) with "pushdown=on by default" as the next milestone.
- Regenerate presentation.html via marp-cli.
- Extend report.md §10 with the clean-stack listing, the new three-column "main / main + pushdown / change" bench tables, and a §10.3 explaining the literal_columns() bugfix the workspace test suite uncovered.
…ommunity Strip PR #11 / round-6 / stacked-branch chrome from design.md and report.md; frame everything as a proposed change on top of apache/datafusion@main. Refresh report.md per-query tables with current data from MAIN-{no,}pushdown vs R6-STACK-pushdown so the drill-downs match the headline numbers (TPC-H flips from regression to win). Tighten the slide speaker notes accordingly. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…-balance

The previous framing claimed the TPC-H win came from demoting filters to post-scan to let a `FilterExec`-above-`RepartitionExec` shuffle re-balance partition skew. That's wrong on two counts:
1. The post-scan filter is applied inside the parquet opener (`apply_post_scan_filters_with_stats`), not as a separate `FilterExec` operator. The two paths are per-partition and equivalent in cost.
2. There is no shuffle between the filter and the scan; cross-partition skew on single-row-group files is a different problem, addressed by apache#21766.

Reframe TPC-H's 0.89× as coming entirely from correct row-level placement on filters that benefit (the Q18 dynamic filter is ~46 of the 89 ms total; Q1/Q3/Q19 contribute smaller page-skipping wins).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Slide 6's chart was pointed at Q9 (the old worst-loss query) but the takeaway leads with Q18 as the headline dynamic-filter win. Swap the TPC-H no-lat chart to Q18 (111 → 65 ms = 0.59× of main) and trim the takeaway prose so it fits within the slide bounds. TPC-H S3 chart still uses Q9 — there it tells the 2.58× pushdown-on- main regression story that the change neutralises. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When a PruningPredicate is built via try_new_tagged_conjuncts the
wrapper's predicate_expr is a literal-true placeholder, so the
wrapper's literal_guarantees is empty. literal_columns() was reading
literal_guarantees only — it returned an empty vec for tagged
predicates, and downstream consumers (notably
ParquetOpener::open which uses literal_columns to decide which
bloom filters to fetch) saw 'no columns of interest' and skipped
the bloom-filter pruning altogether.
Fix is to union each leaf sub-predicate's literal_columns into the
wrapper's result, deduplicating, then merge with whatever the
wrapper itself reports. Plain non-tagged predicates are unchanged.
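A hedged, self-contained sketch of that union. The `TaggedPredicate` shape is illustrative; the real code works on PruningPredicate and its per-conjunct sub-predicates.

```rust
use std::collections::BTreeSet;

struct TaggedPredicate {
    // columns the wrapper itself reports (empty for the literal-true
    // placeholder built by try_new_tagged_conjuncts)
    own_literal_columns: Vec<String>,
    sub_predicates: Vec<TaggedPredicate>, // per-conjunct leaf predicates
}

fn literal_columns(p: &TaggedPredicate) -> Vec<String> {
    let mut cols: BTreeSet<String> = p.own_literal_columns.iter().cloned().collect();
    for sub in &p.sub_predicates {
        cols.extend(literal_columns(sub)); // union leaf columns, deduplicated
    }
    cols.into_iter().collect()
}

fn main() {
    let tagged = TaggedPredicate {
        own_literal_columns: vec![], // placeholder wrapper reports nothing
        sub_predicates: vec![
            TaggedPredicate { own_literal_columns: vec!["a".into()], sub_predicates: vec![] },
            TaggedPredicate { own_literal_columns: vec!["a".into(), "b".into()], sub_predicates: vec![] },
        ],
    };
    // pre-fix this returned [], so bloom-filter fetching was skipped entirely
    assert_eq!(literal_columns(&tagged), vec!["a".to_string(), "b".to_string()]);
}
```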
This makes parquet bloom-filter pruning fire correctly for
adaptive-scheduler files (per-conjunct rates require tagged
predicates, so before this fix every adaptive scan effectively had
bloom filters disabled). Visible in:
cargo test -p datafusion --test parquet_integration:
pre-fix: 186 passed, 14 failed
post-fix: 200 passed, 0 failed
cargo test --workspace --no-fail-fast:
pre-fix: 9236 passed, 19 failed
post-fix: 9240 passed, 0 failed
The remaining 5 of the 19 failures (predicate_cache_* and
single_file*) are addressed in the next commit.
…pushdown=on' contract
Several legacy tests assumed 'pushdown_filters=true' meant 'every
filter runs at row-level'. Under the adaptive selectivity tracker
that's only true once enough bytes-saved-per-sec evidence
accumulates; the tracker's default (min_bytes_per_sec = INFINITY)
keeps every filter at post-scan in short, deterministic test
queries so the row-level path / predicate cache / row-group
pruning these tests assert on never runs.
Set filter_pushdown_min_bytes_per_sec = 0.0 alongside
pushdown_filters = true in the affected setup paths so the tracker
promotes every filter immediately and the legacy contract holds:
- core/src/test_util/parquet.rs::ParquetScanOptions::config —
central test helper used by single_file* and friends.
- core/tests/parquet/mod.rs ContextWithParquet::new RowGroup branch
— covers all row_group_pruning::prune_* and
test_bloom_filter_* tests.
- core/tests/parquet/filter_pushdown.rs predicate_cache_* tests
that build SessionConfig directly.
The predicate_cache_pushdown_disable test is left alone — it
asserts 0 records *because* the cache is disabled, not because of
the placement default.
cargo test --workspace --no-fail-fast: 9240 passed, 0 failed.
…eholder)

The slide 5 Q64 takeaway and several design/report passages claimed that `main + pushdown` regresses because it "evaluates dynamic filters at row-level before the build side finishes." This is wrong: `HashJoinExec` blocks the probe-side stream on `collect_build_side` (stream.rs:505) until the build is fully done, and the dynamic filter is updated in shared_bounds.rs:581/687 *before* the finalizer notifies waiters. By the time any probe-side `ParquetScan` opens a file, the dynamic filter is always populated — there is no placeholder-eval window.

What's actually happening on Q64: the populated dynamic filters (`key BETWEEN min AND max` from build-side bounds) just aren't selective enough on this data to recoup row-level cost (per-batch ArrowPredicate, extra I/O for filter cols not in the projection, repeated re-evaluation across the many self-joins of `store_sales`). `main + pushdown` runs them row-level regardless; the change calls `fresh_rate_for_dynamic_conjunct` to re-evaluate each populated dynamic filter's pruning rate against the file's row-group stats and keeps the unselective ones post-scan.

Also reframe §3.3/§4.4 in design.md and §3.3 in report.md: the reason `fresh_rate_for_dynamic_conjunct` exists is that the static `try_new_tagged_conjuncts` path can't introspect *through* the `DynamicFilterPhysicalExpr` wrapper without an explicit snapshot — not that the side-effect rates are "stale because they were taken when the filter was a placeholder".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…the hash_lookup case
Prior framing said the static `try_new_tagged_conjuncts` path can't
introspect through `DynamicFilterPhysicalExpr` / `OptionalFilterPhysicalExpr`
without an explicit snapshot. Looking at the code, this is wrong:
- `PruningPredicate::try_new` calls `snapshot_physical_expr_opt`
unconditionally (pruning_predicate.rs:512).
- `snapshot_physical_expr_opt` is a `transform_up` that calls
`.snapshot()` on every node — no-op for static expressions, and
both wrappers implement `snapshot()` to unwrap themselves.
So for the common bounds shape `col >= lo AND col <= hi`, the static
side-effect path captures a useful per-conjunct rate without any
help. The real reason `fresh_rate_for_dynamic_conjunct` exists is
narrower: it handles the `col >= lo AND col <= hi AND hash_lookup(...)`
shape that some hash-join dynamic filters publish — `hash_lookup` is
unhandled by `build_predicate_expression`, which flattens the whole
AND toward always-true, so the static path skips the conjunct and the
per-conjunct rate map has no entry for it. The refresh's partial-AND
fallback snapshots the inner expression, splits it, and evaluates each
prunable sub-part separately, returning the max as a promote-only
signal (see the sketch after this message).
The whole-conjunct first try in `fresh_rate_for_dynamic_conjunct` is
essentially redundant with what the static path already does; kept as
a defensive cheap check.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
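A hedged sketch of the promote-only combination step in that partial-AND fallback. The splitting and per-conjunct pruning evaluation are elided; the function name and `Option<f64>` shape are illustrative.

```rust
/// Combine per-sub-conjunct pruning rates from a split AND: unprunable parts
/// (e.g. a hash_lookup the static path flattens toward always-true) contribute
/// None and are skipped; the max surviving rate is a promote-only signal.
fn partial_and_rate(sub_rates: &[Option<f64>]) -> Option<f64> {
    sub_rates
        .iter()
        .flatten()
        .copied()
        .fold(None, |acc: Option<f64>, r| Some(acc.map_or(r, |a| a.max(r))))
}

fn main() {
    // col >= lo AND col <= hi AND hash_lookup(...): the range parts produce
    // rates, the hash_lookup part doesn't
    assert_eq!(partial_and_rate(&[Some(0.4), Some(0.9), None]), Some(0.9));
    assert_eq!(partial_and_rate(&[None]), None); // nothing prunable: no signal
}
```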
…proposal

Concrete examples make the lottery framing land harder than the mode-comparison table alone:
- ClickBench Q23 (URL LIKE '%google%'): main 3612 ms → main+pd 121 ms — a 30× speedup
- ClickBench Q11 (filter col overlaps projection): essentially a tie
- ClickBench Q21 (mandatory unselective filter): main 859 ms → main+pd 1669 ms — a 1.9× slowdown
- TPC-DS Q64 (chained hash-join dynamic filters): main 471 ms → main+pd 20010 ms — a 42× slowdown

Same workload (ClickBench has both Q23 and Q21), opposite outcomes — the point being that the user can't reason about the filter / projection / plan-shape interaction per query.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…+ loss)
Replace the 'lottery in numbers' table with two slides, each showing
the actual SQL of one ClickBench query:
- Slide 3 'When pushdown wins big': Q23 (SELECT * ... WHERE URL LIKE
  '%google%' ... LIMIT 10), main 3612 ms → +pd 121 ms, 30× faster.
- Slide 4 'When pushdown loses big': Q30 (... WHERE SearchPhrase <> ''
  GROUP BY ...), main 276 ms → +pd 547 ms, 1.98× slower.
Both queries hit the same ClickBench dataset — same flag, opposite
outcomes. Speaker notes give the mechanism: row-level eval + sparse
RowSelection → page-skipping (win) vs mandatory unselective filter
with no row-skipping payoff and an extra column read (loss).
Renumber slide-comment headers to keep them sequential (1..10).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Perhaps if we had an API such as described here, this would be easier to implement.
The decoder's projection mask is now built from (user projection ∪ initial post-scan filter columns) and rebuilt at any row-group boundary where the optimal mask columns change — e.g. a filter promoting out of post-scan, or a dynamic placeholder waking up and being placed post-scan. arrow-rs's `StrategySwap::with_projection` installs the new mask before the next row group is read; we rebuild `stream_schema`, `projector`, and the post-scan filter rebase to match. The file-open and mid-stream code paths now share a single `build_decoder_projection_state` helper so the (read_plan, stream_schema, projector, rebased post-scan) chain stays in sync.

Smoke bench (TPC-DS/TPC-H/ClickBench, `--simulate-latency`): sum-of-medians 1.6% faster vs the r6 baseline. Notable per-query wins on filter-only-heavy workloads under latency: ClickBench Q42 -27.5%, Q23 -10.4%; TPC-H Q8/Q9 -8%. Two queries (TPC-DS Q26, TPC-H Q18) looked like regressions in a 3-round bench but cleared as noise on a 5-round rerun.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).