Skip to content

feat: support GroupsAccumulator for first_value and last_value with string/binary types#21090

Open
UBarney wants to merge 1 commit intoapache:mainfrom
UBarney:first_val_group_acc_string
Open

feat: support GroupsAccumulator for first_value and last_value with string/binary types#21090
UBarney wants to merge 1 commit intoapache:mainfrom
UBarney:first_val_group_acc_string

Conversation

@UBarney
Copy link
Copy Markdown
Contributor

@UBarney UBarney commented Mar 21, 2026

Which issue does this PR close?

Rationale for this change

ID SQL main(s) thisPR Time(s) Performance Change
1 select t.id1, first_value(t.id3 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id1, t.v1; 1.162 0.716 +1.62x faster 🚀
2 select l_shipmode, first_value(l_partkey order by l_orderkey, l_linenumber, l_comment, l_suppkey, l_tax) from 'benchmarks/data/tpch_sf10/lineitem' group by l_shipmode; 0.967 0.823 +1.17x faster 🚀
3 select t.id2, t.id4, first_value(t.v1 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id2, t.id4; 7.567 6.998 +1.08x faster 🚀
4 select t.id1, last_value(t.id3 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id1, t.v1; 1.068 0.721 +1.48x faster 🚀
5 select l_shipmode, last_value(l_partkey order by l_orderkey, l_linenumber, l_comment, l_suppkey, l_tax) from 'benchmarks/data/tpch_sf10/lineitem' group by l_shipmode; 0.728 0.714 +1.02x faster 🚀
6 select t.id2, t.id4, last_value(t.v1 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id2, t.id4; 6.937 7.040 1.01x slower 🐌

Note: SQL queries for Q2, Q3, Q5, and Q6 are sourced from this PR.

Previously, the first_value and last_value aggregate functions only supported GroupsAccumulator for primitive types. For string or binary types (Utf8, LargeUtf8, Binary, etc.), they fell back to the slower row-based Accumulator path.

This change implements a specialized state management for byte-based types, enabling high-performance grouped aggregation for strings and binary data, especially when used with ORDER BY.

What changes are included in this PR?

  • New ValueState Trait: Abstracted the state management for first_value and last_value to support different storage backends.
  • PrimitiveValueState : Re-implemented the existing primitive handling using the new trait.
  • BytesValueState: Added a new state implementation for Utf8, LargeUtf8, Utf8View, Binary, LargeBinary, and BinaryView. It
    optimizes memory by reusing Vec<u8> buffers for group updates.
  • Refactored FirstLastGroupsAccumulator: Migrated the accumulator to use the generic ValueState trait, allowing it to handle both primitive and byte types uniformly.

Are these changes tested?

YES

Are there any user-facing changes?

@github-actions github-actions bot added sqllogictest SQL Logic Tests (.slt) functions Changes to functions implementation labels Mar 21, 2026
/// to correctly implement `RESPECT NULLS` behavior.
///
pub(crate) struct BytesValueState {
vals: Vec<Option<Vec<u8>>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be much more efficiently stored as values Vec<u8> and offsets Vec<OffsetType>

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to implement it using the following approach and then run some benchmarks:

Data Structures

  1. vals: Vec<u8>: A single, contiguous flat buffer for all raw bytes.
  2. offsets: Vec<usize>: The starting position of each group's data in the buffer.
  3. lengths: Vec<usize>: The logical length of the current value for each group.
  4. capacities: Vec<usize>: The physical space allocated for each group (enables in-place overwrites if new_len <= capacity).
  5. active_bytes: usize: A running counter of the sum of all current lengths (used to track fragmentation and trigger GC).

Update Logic

  • In-place Overwrite: If new_len <= capacity, we overwrite the existing slot at the current offset. We update the logical length, while capacity and offset remain unchanged.
  • Append: If new_len > capacity, we append the value to the end of vals and update the offset, length, and capacity to point to the new location.

GC (Compaction) Logic

  • Trigger: When the buffer grows too large (e.g., vals.len() > active_bytes * 2).
  • Action: Re-allocate a new buffer and copy only the latest valid data for each group to clear "dead" bytes left behind by the append path.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only needed for last_value, no?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or wait nvm I see the queries having explicit order.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After implementing the flattened approach with Vec<u8>, the performance actually regressed in several scenarios. Below are the benchmark results:

ID SQL first_val_str Time(s) first_last_flatten_acc Time(s) Performance Change Note
1 select t.id1, first_value(t.id3 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id1, t.v1; 0.690 0.666 +1.04x faster 🚀 Length of t.id3 is constant (12) per group
2 select l_shipmode, first_value(l_partkey order by l_orderkey, l_linenumber, l_comment, l_suppkey, l_tax) from 'benchmarks/data/tpch_sf10/lineitem' group by l_shipmode; 0.724 0.776 1.07x slower 🐌
3 select t.id2, t.id4, first_value(t.v1 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id2, t.id4; 7.136 7.226 1.01x slower 🐌
4 SELECT l_suppkey, FIRST_VALUE(l_comment ORDER BY l_orderkey DESC) as fv FROM 'benchmarks/data/tpch_sf10/lineitem' GROUP BY l_suppkey; 2.914 3.206 1.10x slower 🐌 l_comment length varies (10-43) per group
5 select t.id1, last_value(t.id3 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id1, t.v1; 0.745 0.747 1.00x slower 🐌
6 select l_shipmode, last_value(l_partkey order by l_orderkey, l_linenumber, l_comment, l_suppkey, l_tax) from 'benchmarks/data/tpch_sf10/lineitem' group by l_shipmode; 0.802 0.780 +1.03x faster 🚀
7 select t.id2, t.id4, last_value(t.v1 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id2, t.id4; 7.256 7.277 1.00x slower 🐌

Flame graphs indicate that the overhead from resize operations is the primary bottleneck.

image

For first_value / last_value queries without an ORDER BY clause, each group's value is only set once since there is no explicit ordering requirement. In this scenario, it might be more efficient to implement a dedicated ValueState using Vec<Vec<u8>> to store strings/bytes, where each inner Vec<u8> has a fixed maximum size. We can pre-allocate this capacity during creation to avoid frequent resizing. Note that for these unordered queries, last_value can be implemented with the same logic as first_value.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm yes, this is similar to the block-based aggregation storage @Rachelint was working on (resizes are heavy, especially for write-only data).

@UBarney UBarney force-pushed the first_val_group_acc_string branch from 330a6e1 to 779706c Compare March 26, 2026 10:29
@UBarney UBarney marked this pull request as ready for review March 26, 2026 10:44

fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
use DataType::*;
!args.order_bys.is_empty()
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to consider adding a new GroupsAccumulator to handle cases without ORDER BY, or perhaps implement a fast path in FirstLastGroupsAccumulator for when no ordering is required

Copy link
Copy Markdown
Contributor

@neilconway neilconway left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be useful to add a self-contained benchmark to measure the performance change here?


fn update(&mut self, group_idx: usize, array: &ArrayRef, idx: usize) -> Result<()> {
if array.is_null(idx) {
self.vals[group_idx] = None;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decrement total_capacity here? A unit test for the update-with-null case might be useful as well.

/// Note: While this is not a batch interface, it is not a performance bottleneck.
/// In heavy aggregation benchmarks, the overhead of this method is typically less than 1%.
///
/// Benchmarked queries with < 1% `update` overhead:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like more detail than necessary?



#################
# first_value on strings/binary with groups and ordering
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test last_value as well?

@@ -342,8 +422,7 @@ where
// buffer for `get_filtered_min_of_each_group`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These comments should be updated.

.unwrap()
}

fn create_groups_primitive_accumulator<T: ArrowPrimitiveType + Send>(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create_groups_primitive_accumulator and create_groups_bytes_accumulator are identical except for the ValueState; we could use a single function and pass in the value state as an argument?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

functions Changes to functions implementation sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement GroupsAccumulator for first_value aggregate (speed up first_value and DISTINCT ON queries)

3 participants