Skip to content

feat(parquet): row-group and row-range sampling on ParquetSource#22024

Open
adriangb wants to merge 1 commit into
apache:mainfrom
pydantic:worktree-parquet-sampling-base
Open

feat(parquet): row-group and row-range sampling on ParquetSource#22024
adriangb wants to merge 1 commit into
apache:mainfrom
pydantic:worktree-parquet-sampling-base

Conversation

@adriangb
Copy link
Copy Markdown
Contributor

@adriangb adriangb commented May 5, 2026

Which issue does this PR close?

This PR extracts the first layer of #22000 — the opt-in ParquetSource sampling primitives — as a self-contained change. The TABLESAMPLE SQL surface, Sample logical/physical nodes, and SamplePushdown rule are deliberately not included; they will land in follow-ups.

Rationale for this change

DataFusion has the machinery for fine-grained parquet sampling (ParquetAccessPlan with Skip / Scan / Selection(RowSelection)) but no public way to ask for a sample without constructing the access plan by hand and stuffing it into PartitionedFile.extensions. That works for one-off code but is awkward for ad-hoc data exploration, layered helpers that want to compute approximate stats over a bounded slice, and EXPLAIN ANALYZE-driven debug runs against a representative slice.

This PR adds the lowest layer: opt-in builders on ParquetSource that translate fractions into a ParquetAccessPlan lazily inside the opener (after the footer is loaded, so we sample by real row-group index). It is additive and has no behavior change for existing scans. The SQL surface in #22000 is built on top of these primitives.

What changes are included in this PR?

ParquetSource::new(schema)
    .with_row_group_sampling(0.1)   // keep ~10% of row groups per file
    .with_row_fraction(0.05)        // within each kept row group, keep ~5% of rows
    .with_row_cluster_size(8192);   // controls window granularity (default 32 768)

with_row_group_sampling(fraction):

  • Selection is deferred until the opener has loaded the parquet footer, so we sample by real row-group index.
  • Deterministic per (file_index, row_group_count, fraction) — re-runs match. The opener passes the execution partition_index as the stable file_index, so sampling is reproducible across environments without depending on object-store paths.
  • Always keeps at least one row group (target = max(1, ceil(N * fraction))).
  • No-op when fraction >= 1.0.

with_row_fraction(fraction):

  • Translates the per-row-group target into K contiguous windows spread evenly across the row group, each placed at a random offset within its stride. Window count = ceil(target / cluster_size).
  • Materializes a RowSelection per kept row group; the parquet reader uses the page index to read only the data pages covering the selected rows. This gives "page-level" IO savings without requiring per-column page alignment (which doesn't exist in parquet).
  • Falls back gracefully when the page index is missing — the reader still returns the right rows, the IO win just disappears.
  • Deterministic per (file_index, row_group_index, fraction, cluster_size).
  • Window-layout math is extracted into a dedicated build_row_window_selectors function and fuzz-tested across thousands of configurations to guarantee no overlap, in-bounds positions, and full coverage.

The two layers compose: row_group_fraction = 0.1 × row_fraction = 0.1 reads ~1% of the rows from ~10% of the row groups, with windows spread out so the sample isn't clustered at one end of each row group.

Internals

  • New ParquetSampling struct re-exported at the crate root.
  • Plumbed through ParquetMorselizerPreparedParquetOpen.
  • Two free functions invoked from prune_row_groups right after create_initial_plan.
  • New dep: rand with the small_rng feature (already in workspace Cargo.toml).

Differences vs. the original commit in #22000

Two pieces of review feedback on the parent PR are folded in here:

Are these changes tested?

12 tests in datafusion-datasource-parquet:

Row-group sampling (sampling::tests):

  • row_group_sampling_keeps_target_countceil(N * fraction) math.
  • row_group_sampling_is_deterministic — same inputs → same selection.
  • row_group_sampling_differs_per_file_index — different file_index → different sample.
  • row_group_sampling_no_op_when_fraction_is_one — fraction ≥ 1.0 keeps everything.
  • row_group_sampling_target_at_least_onefraction = 0.001 over 100 row groups still keeps 1.
  • row_group_sampling_no_op_when_unsetNone is a no-op.

Row-window selection (sampling::tests):

  • row_window_selection_basic_layout — hand-checked anchor case.
  • row_window_selection_returns_none_on_invalid_input — degenerate inputs (zero row group, zero target, zero cluster) return None.
  • row_window_selection_full_target_no_overlap — the previously-buggy target_rows == total_rows case.
  • row_window_selection_fuzz_invariants — 5 000 randomized (total_rows, target_rows, cluster_size, seed) configurations, asserting full coverage, in-bounds positions, and no overlap.
  • row_window_selection_fuzz_determinism — 1 000 iterations verifying identical seeds produce identical layouts.

End-to-end (opener::test):

  • row_group_sampling_end_to_end — writes a 4-row-group parquet to InMemory, scans with fraction = 0.5, asserts exactly 6 rows out (2 row groups × 3 rows).
  • row_fraction_end_to_end — writes a 100-row single-row-group parquet, scans with row_fraction = 0.1 and cluster_size = 4, asserts the result is in the expected range.

cargo build, cargo fmt --all, and cargo clippy -p datafusion-datasource-parquet --all-targets --all-features -- -D warnings are clean.

Are there any user-facing changes?

🤖 Generated with Claude Code

Adds two opt-in sampling primitives to parquet scans, both built on
the existing `ParquetAccessPlan` infrastructure:

* `ParquetSource::with_row_group_sampling(fraction)` — keep `fraction`
  of row groups in each scanned file. Selection is deferred until the
  opener has loaded the parquet footer (so we sample by real row-group
  index, not guess) and is deterministic per `(file_name,
  row_group_count, fraction)` via a seeded `SmallRng`.

* `ParquetSource::with_row_fraction(fraction)` — within each kept row
  group, keep `fraction` of rows by translating to a `RowSelection` of
  K small contiguous windows (size controlled by
  `with_row_cluster_size`, default 32 768 rows). The parquet reader
  uses the page index to read only the data pages covering the
  selected rows, so this gives "page-level" IO savings without
  requiring per-column page alignment. Falls back gracefully (no
  IO win, still correct) when the page index is missing.

The two layers compose: scanning with both `row_group_fraction=0.1`
and `row_fraction=0.1` reads ~1% of the rows in ~10% of the row
groups, with windows spread out so the sample isn't clustered at one
end of each row group.

Selection within a row group is deterministic-but-random per
`(file_name, row_group_index, fraction, cluster_size)` — same inputs
yield the same windows, so re-runs are repeatable.

## Why this lives on `ParquetSource`

The natural entry-point for "I want a sample" is at config time,
before any metadata IO. The actual *which* row groups / *which* rows
selection still has to be deferred to the opener (after the footer is
parsed) — that's why `ParquetSampling` carries fractions plus a cluster
size, and the opener pulls them through to its lazy decision points.

This is intentionally orthogonal to file-level sampling: `ParquetSource`
doesn't own the file list (`FileScanConfig.file_groups` does), so a
file-fraction setter here would have been a confusing no-op. Callers
that want to drop files should rebuild the `FileScanConfig` directly.

## Use cases

* `TABLESAMPLE` SQL syntax (any future implementation can lower to
  these primitives).
* Ad-hoc data exploration / `EXPLAIN ANALYZE` against a sample.
* Mini-query-style stats sampling (a layered helper can call these
  to bound the cost of computing approximate min/max/NDV/histograms
  for the optimizer — out of scope here, see the linked POC in the
  PR description).
* `EXPLAIN ANALYZE`-driven debug runs against a representative slice.

## Tests

5 unit tests on `apply_row_group_sampling` (target count, determinism,
file-name dependence, no-op at fraction=1.0, target floor of 1) plus
2 end-to-end tests that build a real parquet file in `InMemory` object
store and confirm the row counts emitted are what the sampling implies.

`cargo build --workspace`, `cargo fmt --all`, and
`cargo clippy -p datafusion-datasource-parquet --all-targets -- -D warnings`
are clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@adriangb adriangb force-pushed the worktree-parquet-sampling-base branch from edf02c2 to a79813f Compare May 6, 2026 17:38
@adriangb
Copy link
Copy Markdown
Contributor Author

@geoffreyclaude @alamb is this something you could take a look at as the smaller version of #22000

// it makes sampling reproducible across environments without
// depending on object-store paths, and decorrelates files
// assigned to different partitions.
prepared.sampling.apply_row_group_sampling(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current plumbing passes prepared.partition_index, so multiple files in the same partition collide.

/// sampling reproducible across environments without the keying
/// depending on object-store paths.
#[derive(Debug, Clone)]
pub struct ParquetSampling {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to make it pub?

"row_fraction=0.1 should drop the vast majority of rows; got {num_rows}"
);
assert!(
num_rows <= 16,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it be flaky?

Copy link
Copy Markdown
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this PR @adriangb

On Usecase

DataFusion has the machinery for fine-grained parquet sampling (ParquetAccessPlan with Skip / Scan / Selection(RowSelection)) but no public way to ask for a sample without constructing the access plan by hand and stuffing it into PartitionedFile.extensions. That works for one-off code but is awkward for ad-hoc data exploration, layered helpers that want to compute approximate stats over a bounded slice, and EXPLAIN ANALYZE-driven debug runs against a representative slice.

I still feel that we should be pursuing this as a extension mechanism rather than pushing the sampling logic into the opener itself as the sampling strategy is fairly opinionated. In sampling makes sense to me, but I am very unsure about building it directly into the parquet opener -- not that I don't see its utility but because DataFusion is already getting really complex and I am trying to find areas to limit its complexity and focus the core vlue props of performance and extensibilty.

It seems to me that as opener state machine is getting more and more complicated, it is fast becoming a mini planner in its own right -- it has a plan ParquetAccessPlan and has several optimizers that optimize that plan (e.g. row group pruning , page pruning, bloom filter pruning, sort order reversal, etc. We seem to be on the path to add even more planning updates as we proceed with dynamic filtering you are considering #22144 and @xudong963 is doing in #21637.

I feel like we don't yet have the right abstractions to add / test these optimizer passes It might be time to step back pR and refactor the opener so that we can more easily test / plan what is going on as well as allow extensions.

One thought I had (perhaps due to my tendency to treat everything as a query plan) would be

  1. Introduce a trait likeParquetAccessPlanOptimizer for ParquetAccessPlans , similar to PhysicalOptimizer for ExecutionPlan.
  2. Make PagePruningAccessPlanFilter and similar optimizations implement the trait and have a list of ParquetAccessPlanOptimizer in the opener.

With such an API you could model sampling by registering a new ParquetAccessPlan optimizer pass, which I think would be easier than constructing the entire access plan externally

cc @xudong963 and @zhuqi-lucas who have also been working in this area recently

pub reverse_row_groups: bool,
/// Sampling config carried from `ParquetSource`. Applied lazily
/// inside the opener once the parquet metadata is available.
pub sampling: crate::sampling::ParquetSampling,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use the normal ParquetSampling syntax here rather than the fully qualified crate::sampling::ParquetSampling

@adriangb
Copy link
Copy Markdown
Contributor Author

adriangb commented May 13, 2026

Thanks for the review Andrew.

I think the biggest thing to point out is that it is not possible to implement this sort of sampling externally by passing in a ParquetAccessPlan: you don't know what the row groups and pages look like until you open the file. So it has to be done inside of the opener, either directly or via some extension.

I'm not sure what other sampling strategies might look like. To me it only really makes sense to sample at the row group / page level. Do you have thoughts on what other sampling strategies for Parquet would look like? I linked to multiple systems which sample at the "block" level. For parquet that is row groups / pages. The pages (row fraction) part is perhaps a bit more questionable, I'm happy to remove that and add that as a followup if you'd like.

I'm open to prototyping on some sort of ParquetAccessPlanOptimizer but I'm not sure it will end up being a simple abstraction, I suspect it will be quite leaky. That is: every time you want to add a new optimizer you have to change the API to add more inputs / more context or more outputs / things it can change. The adaptive dynamic filter work for example has to touch a lot more than just the ParquetAccessPlan. I'd guess we'd end up with a very leaky abstraction. IMO doing this as structured in this PR and factoring out as much code into it's own modules and such probably gets us 90% of the wins without forcing us into APIs we then have to constantly churn.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

datasource Changes to the datasource crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants