feat(parquet): row-group and row-range sampling on ParquetSource #22024

adriangb wants to merge 1 commit into
Conversation
Adds two opt-in sampling primitives to parquet scans, both built on the existing `ParquetAccessPlan` infrastructure:

* `ParquetSource::with_row_group_sampling(fraction)` — keep `fraction` of row groups in each scanned file. Selection is deferred until the opener has loaded the parquet footer (so we sample by real row-group index, not a guess) and is deterministic per `(file_name, row_group_count, fraction)` via a seeded `SmallRng`.
* `ParquetSource::with_row_fraction(fraction)` — within each kept row group, keep `fraction` of rows by translating to a `RowSelection` of K small contiguous windows (size controlled by `with_row_cluster_size`, default 32 768 rows). The parquet reader uses the page index to read only the data pages covering the selected rows, so this gives "page-level" IO savings without requiring per-column page alignment. Falls back gracefully (no IO win, still correct) when the page index is missing.

The two layers compose: scanning with both `row_group_fraction=0.1` and `row_fraction=0.1` reads ~1% of the rows in ~10% of the row groups, with windows spread out so the sample isn't clustered at one end of each row group. Selection within a row group is deterministic-but-random per `(file_name, row_group_index, fraction, cluster_size)` — same inputs yield the same windows, so re-runs are repeatable.

## Why this lives on `ParquetSource`

The natural entry point for "I want a sample" is at config time, before any metadata IO. The actual *which* row groups / *which* rows selection still has to be deferred to the opener (after the footer is parsed) — that's why `ParquetSampling` carries fractions plus a cluster size, and the opener pulls them through to its lazy decision points.

This is intentionally orthogonal to file-level sampling: `ParquetSource` doesn't own the file list (`FileScanConfig.file_groups` does), so a file-fraction setter here would have been a confusing no-op. Callers that want to drop files should rebuild the `FileScanConfig` directly.
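The row-group layer described above can be sketched with a std-only stand-in. The PR seeds a `SmallRng` from `(file_name, row_group_count, fraction)`; here a hashed xorshift plays that role so the example has no dependencies — the function name and RNG choice are illustrative, not the PR's code:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch of deterministic row-group sampling: seed an RNG from a stable
/// key, then keep `max(1, ceil(n * fraction))` of `n` row groups via a
/// partial Fisher-Yates shuffle. Deterministic per key, so re-runs match.
fn sample_row_groups(file_index: usize, n: usize, fraction: f64) -> Vec<usize> {
    if n == 0 || fraction >= 1.0 {
        return (0..n).collect(); // no-op at fraction >= 1.0
    }
    // Keep at least one row group: max(1, ceil(n * fraction)).
    let target = ((n as f64 * fraction).ceil() as usize).clamp(1, n);

    // Deterministic seed from the stable key (DefaultHasher::new() uses
    // fixed keys, so the hash is stable across runs).
    let mut h = DefaultHasher::new();
    (file_index, n, fraction.to_bits()).hash(&mut h);
    let mut state = h.finish() | 1; // xorshift64 state must be non-zero
    let mut next = || {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;
        state
    };

    // Partial Fisher-Yates: the first `target` slots become the sample.
    let mut idx: Vec<usize> = (0..n).collect();
    for i in 0..target {
        let j = i + (next() as usize) % (n - i);
        idx.swap(i, j);
    }
    let mut kept = idx[..target].to_vec();
    kept.sort_unstable(); // access plans are in row-group order
    kept
}
```

Same inputs yield the same selection, so re-runs of a sampled scan are repeatable, which is the property the PR relies on.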
## Use cases

* `TABLESAMPLE` SQL syntax (any future implementation can lower to these primitives).
* Ad-hoc data exploration / `EXPLAIN ANALYZE` against a sample.
* Mini-query-style stats sampling (a layered helper can call these to bound the cost of computing approximate min/max/NDV/histograms for the optimizer — out of scope here, see the linked POC in the PR description).
* `EXPLAIN ANALYZE`-driven debug runs against a representative slice.

## Tests

5 unit tests on `apply_row_group_sampling` (target count, determinism, file-name dependence, no-op at fraction=1.0, target floor of 1) plus 2 end-to-end tests that build a real parquet file in an `InMemory` object store and confirm the row counts emitted are what the sampling implies.

`cargo build --workspace`, `cargo fmt --all`, and `cargo clippy -p datafusion-datasource-parquet --all-targets -- -D warnings` are clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
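The row-fraction layer's window layout can be sketched similarly. This is a simplified illustration, not the PR's `build_row_window_selectors`: it spreads the target rows across `ceil(target / cluster_size)` evenly strided windows and caps each window at the stride so windows can never overlap (the real function also guarantees full coverage, which this sketch does not attempt):

```rust
/// Sketch: lay out `target_rows` of `total_rows` as K contiguous windows
/// of at most `cluster_size` rows each, returned as (start_row, length).
fn row_windows(total_rows: usize, target_rows: usize, cluster_size: usize) -> Vec<(usize, usize)> {
    if total_rows == 0 || target_rows == 0 || cluster_size == 0 {
        return Vec::new(); // degenerate inputs select nothing
    }
    let target = target_rows.min(total_rows);
    let n_windows = target.div_ceil(cluster_size).max(1);
    let stride = total_rows / n_windows; // gap between window starts
    // Cap the window length at the stride: without the cap,
    // ceil(target / n_windows) can exceed the stride when
    // target ≈ total_rows, making adjacent windows overlap.
    let window = target.div_ceil(n_windows).min(stride).max(1);
    (0..n_windows).map(|i| (i * stride, window)).collect()
}
```

A `RowSelection` built from these windows lets the reader skip every data page that no window touches, which is where the "page-level" IO savings come from.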
@geoffreyclaude @alamb is this something you could take a look at as the smaller version of #22000?
```rust
// it makes sampling reproducible across environments without
// depending on object-store paths, and decorrelates files
// assigned to different partitions.
prepared.sampling.apply_row_group_sampling(
```
The current plumbing passes `prepared.partition_index`, so multiple files in the same partition collide.
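A std-only sketch of the collision this comment describes: keying the seed on `partition_index` alone gives every file in a partition the same seed, while also mixing in a per-file ordinal (the `file_ordinal` parameter here is hypothetical, not part of the PR) decorrelates them:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Seed keyed on partition_index only: every file in the partition
// gets the same seed, so they all sample the same row-group indices.
fn seed_partition_only(partition_index: usize) -> u64 {
    let mut h = DefaultHasher::new();
    partition_index.hash(&mut h);
    h.finish()
}

// Hypothetical fix: also mix in the file's ordinal within the partition,
// so two files in one partition draw different samples.
fn seed_with_file_ordinal(partition_index: usize, file_ordinal: usize) -> u64 {
    let mut h = DefaultHasher::new();
    (partition_index, file_ordinal).hash(&mut h);
    h.finish()
}
```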
```rust
/// sampling reproducible across environments without the keying
/// depending on object-store paths.
#[derive(Debug, Clone)]
pub struct ParquetSampling {
```
```rust
    "row_fraction=0.1 should drop the vast majority of rows; got {num_rows}"
);
assert!(
    num_rows <= 16,
```
alamb left a comment
Thank you for this PR @adriangb
On the use case
> DataFusion has the machinery for fine-grained parquet sampling (`ParquetAccessPlan` with `Skip` / `Scan` / `Selection(RowSelection)`) but no public way to ask for a sample without constructing the access plan by hand and stuffing it into `PartitionedFile.extensions`. That works for one-off code but is awkward for ad-hoc data exploration, layered helpers that want to compute approximate stats over a bounded slice, and `EXPLAIN ANALYZE`-driven debug runs against a representative slice.
I still feel that we should be pursuing this as an extension mechanism rather than pushing the sampling logic into the opener itself, as the sampling strategy is fairly opinionated. Sampling makes sense to me, but I am very unsure about building it directly into the parquet opener — not that I don't see its utility, but because DataFusion is already getting really complex and I am trying to find areas to limit its complexity and focus on the core value props of performance and extensibility.
It seems to me that as the opener state machine gets more and more complicated, it is fast becoming a mini planner in its own right — it has a plan (`ParquetAccessPlan`) and several optimizers that optimize that plan (e.g. row group pruning, page pruning, bloom filter pruning, sort order reversal, etc.). We seem to be on the path to adding even more planning updates as we proceed with the dynamic filtering you are considering in #22144 and @xudong963 is doing in #21637.
I feel like we don't yet have the right abstractions to add / test these optimizer passes. It might be time to step back on this PR and refactor the opener so that we can more easily test / plan what is going on, as well as allow extensions.
One thought I had (perhaps due to my tendency to treat everything as a query plan) would be
- Introduce a trait like `ParquetAccessPlanOptimizer` for `ParquetAccessPlan`s, similar to `PhysicalOptimizer` for `ExecutionPlan`.
- Make `PagePruningAccessPlanFilter` and similar optimizations implement the trait, and have a list of `ParquetAccessPlanOptimizer`s in the opener.
With such an API you could model sampling by registering a new `ParquetAccessPlan` optimizer pass, which I think would be easier than constructing the entire access plan externally.
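A minimal sketch of what that trait could look like, with stub types standing in for DataFusion's real `ParquetAccessPlan` — all names and signatures here are hypothetical, extrapolated from the comment above:

```rust
/// Stub stand-in for DataFusion's per-row-group access decision.
#[derive(Debug, Clone, PartialEq)]
enum RowGroupAccess {
    Scan,
    Skip,
}

/// Stub stand-in for `ParquetAccessPlan`: one decision per row group.
type AccessPlan = Vec<RowGroupAccess>;

/// Analogous to `PhysicalOptimizer` for `ExecutionPlan`: each pass rewrites
/// the access plan, and the opener would run a list of these in order.
trait ParquetAccessPlanOptimizer {
    fn optimize(&self, plan: AccessPlan) -> AccessPlan;
}

/// Sampling modeled as just another pass: skip row groups outside the
/// sample (a toy keep-every-Nth rule stands in for seeded selection).
struct RowGroupSamplingPass {
    keep_every: usize,
}

impl ParquetAccessPlanOptimizer for RowGroupSamplingPass {
    fn optimize(&self, plan: AccessPlan) -> AccessPlan {
        plan.into_iter()
            .enumerate()
            .map(|(i, access)| {
                if i % self.keep_every == 0 {
                    access // keep whatever pruning already decided
                } else {
                    RowGroupAccess::Skip
                }
            })
            .collect()
    }
}
```

Because each pass receives the plan produced by earlier passes, sampling composes with row-group pruning, page pruning, and bloom filters the same way physical optimizer rules compose over an `ExecutionPlan`.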
cc @xudong963 and @zhuqi-lucas who have also been working in this area recently
```rust
pub reverse_row_groups: bool,
/// Sampling config carried from `ParquetSource`. Applied lazily
/// inside the opener once the parquet metadata is available.
pub sampling: crate::sampling::ParquetSampling,
```
Can we use the normal `ParquetSampling` syntax here rather than the fully qualified `crate::sampling::ParquetSampling`?
Thanks for the review Andrew. I think the biggest thing to point out is that it is not possible to implement this sort of sampling externally by passing in a `ParquetAccessPlan`: the selection has to wait until the opener has loaded the parquet footer, so the caller cannot construct the plan ahead of time.

I'm not sure what other sampling strategies might look like. To me it only really makes sense to sample at the row group / page level. Do you have thoughts on what other sampling strategies for Parquet would look like? I linked to multiple systems which sample at the "block" level; for parquet that is row groups / pages. The pages (row fraction) part is perhaps a bit more questionable — I'm happy to remove that and add it as a follow-up if you'd like. I'm open to prototyping some sort of `ParquetAccessPlanOptimizer` API.
## Which issue does this PR close?
- `datafusion.execution.collect_statistics` on wide tables #21624.

This PR extracts the first layer of #22000 — the opt-in `ParquetSource` sampling primitives — as a self-contained change. The `TABLESAMPLE` SQL surface, `Sample` logical/physical nodes, and `SamplePushdown` rule are deliberately not included; they will land in follow-ups.

## Rationale for this change
DataFusion has the machinery for fine-grained parquet sampling (`ParquetAccessPlan` with `Skip` / `Scan` / `Selection(RowSelection)`) but no public way to ask for a sample without constructing the access plan by hand and stuffing it into `PartitionedFile.extensions`. That works for one-off code but is awkward for ad-hoc data exploration, layered helpers that want to compute approximate stats over a bounded slice, and `EXPLAIN ANALYZE`-driven debug runs against a representative slice.

This PR adds the lowest layer: opt-in builders on `ParquetSource` that translate fractions into a `ParquetAccessPlan` lazily inside the opener (after the footer is loaded, so we sample by real row-group index). It is additive and has no behavior change for existing scans. The SQL surface in #22000 is built on top of these primitives.

## What changes are included in this PR?
`with_row_group_sampling(fraction)`:

* Deterministic per `(file_index, row_group_count, fraction)` — re-runs match. The opener passes the execution `partition_index` as the stable `file_index`, so sampling is reproducible across environments without depending on object-store paths.
* Always keeps at least one row group (`max(1, ceil(N * fraction))`).
* No-op when `fraction >= 1.0`.

`with_row_fraction(fraction)`:

* Splits each row group's row target across `ceil(target / cluster_size)` contiguous windows.
* Translates to a `RowSelection` per kept row group; the parquet reader uses the page index to read only the data pages covering the selected rows. This gives "page-level" IO savings without requiring per-column page alignment (which doesn't exist in parquet).
* Deterministic per `(file_index, row_group_index, fraction, cluster_size)`.
* The window-layout math is isolated in a pure `build_row_window_selectors` function and fuzz-tested across thousands of configurations to guarantee no overlap, in-bounds positions, and full coverage.

The two layers compose: `row_group_fraction = 0.1` × `row_fraction = 0.1` reads ~1% of the rows from ~10% of the row groups, with windows spread out so the sample isn't clustered at one end of each row group.

## Internals
* `ParquetSampling` struct, re-exported at the crate root.
* Sampling config threaded from `ParquetMorselizer` → `PreparedParquetOpen`.
* Row-group sampling applied alongside `prune_row_groups`, right after `create_initial_plan`.
* New dependency: `rand` with the `small_rng` feature (already in the workspace `Cargo.toml`).

## Differences vs. the original commit in #22000
Two pieces of review feedback on the parent PR are folded in here:
* `apply_*_sampling` keys on a stable `file_index: usize` rather than `file_name: &str`, addressing #22000 (comment). The opener passes the execution `partition_index`. This removes the on-disk-path dependency from the seed inputs while still decorrelating files in different partitions.
* The window-layout math was extracted into a pure `build_row_window_selectors` and fuzz-tested (#22000 (comment)). Fuzzing surfaced an overlap bug at `target_rows ≈ total_rows`, where `window_size = ceil(target / n_windows)` could exceed `stride = total_rows / n_windows`; the fix caps `window_size` at `stride`.

## Are these changes tested?
12 tests in `datafusion-datasource-parquet`.

Row-group sampling (`sampling::tests`):

* `row_group_sampling_keeps_target_count` — `ceil(N * fraction)` math.
* `row_group_sampling_is_deterministic` — same inputs → same selection.
* `row_group_sampling_differs_per_file_index` — different `file_index` → different sample.
* `row_group_sampling_no_op_when_fraction_is_one` — fraction ≥ 1.0 keeps everything.
* `row_group_sampling_target_at_least_one` — `fraction = 0.001` over 100 row groups still keeps 1.
* `row_group_sampling_no_op_when_unset` — `None` is a no-op.

Row-window selection (`sampling::tests`):

* `row_window_selection_basic_layout` — hand-checked anchor case.
* `row_window_selection_returns_none_on_invalid_input` — degenerate inputs (zero row group, zero target, zero cluster) return `None`.
* `row_window_selection_full_target_no_overlap` — the previously-buggy `target_rows == total_rows` case.
* `row_window_selection_fuzz_invariants` — 5 000 randomized `(total_rows, target_rows, cluster_size, seed)` configurations, asserting full coverage, in-bounds positions, and no overlap.
* `row_window_selection_fuzz_determinism` — 1 000 iterations verifying identical seeds produce identical layouts.

End-to-end (`opener::test`):

* `row_group_sampling_end_to_end` — writes a 4-row-group parquet to `InMemory`, scans with `fraction = 0.5`, asserts exactly 6 rows out (2 row groups × 3 rows).
* `row_fraction_end_to_end` — writes a 100-row single-row-group parquet, scans with `row_fraction = 0.1` and `cluster_size = 4`, asserts the result is in the expected range.

`cargo build`, `cargo fmt --all`, and `cargo clippy -p datafusion-datasource-parquet --all-targets --all-features -- -D warnings` are clean.

## Are there any user-facing changes?
New opt-in APIs on `ParquetSource`: `with_row_group_sampling`, `with_row_fraction`, `with_row_cluster_size`, `sampling()`, plus the `ParquetSampling` struct.

🤖 Generated with Claude Code