feat: support GroupsAccumulator for first_value and last_value with string/binary types #21090

UBarney wants to merge 1 commit into apache:main
Conversation
/// to correctly implement `RESPECT NULLS` behavior.
///
pub(crate) struct BytesValueState {
    vals: Vec<Option<Vec<u8>>>,
I think this can be stored much more efficiently as a flat `values: Vec<u8>` plus `offsets: Vec<OffsetType>`.
I plan to implement it using the following approach and then run some benchmarks:
Data Structures

- `vals: Vec<u8>`: A single, contiguous flat buffer for all raw bytes.
- `offsets: Vec<usize>`: The starting position of each group's data in the buffer.
- `lengths: Vec<usize>`: The logical length of the current value for each group.
- `capacities: Vec<usize>`: The physical space allocated for each group (enables in-place overwrites if `new_len <= capacity`).
- `active_bytes: usize`: A running counter of the sum of all current `lengths` (used to track fragmentation and trigger GC).
Update Logic

- In-place Overwrite: If `new_len <= capacity`, we overwrite the existing slot at the current `offset`. We update the logical `length`, while `capacity` and `offset` remain unchanged.
- Append: If `new_len > capacity`, we append the value to the end of `vals` and update the `offset`, `length`, and `capacity` to point to the new location.
GC (Compaction) Logic

- Trigger: When the buffer grows too large (e.g., `vals.len() > active_bytes * 2`).
- Action: Re-allocate a new buffer and copy only the latest valid data for each group to clear "dead" bytes left behind by the append path.
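The update and GC rules above can be sketched roughly as follows. This is an illustrative sketch, not code from the PR; the names `FlatBytesState`, `set_value`, and `maybe_gc` are hypothetical:

```rust
/// Hypothetical flattened storage: one contiguous byte buffer shared by all
/// groups, with per-group offset/length/capacity bookkeeping.
struct FlatBytesState {
    vals: Vec<u8>,          // single flat buffer for all raw bytes
    offsets: Vec<usize>,    // start of each group's slot in `vals`
    lengths: Vec<usize>,    // logical length of each group's current value
    capacities: Vec<usize>, // physical slot size (enables in-place overwrite)
    active_bytes: usize,    // sum of `lengths`, tracks fragmentation
}

impl FlatBytesState {
    fn new(num_groups: usize) -> Self {
        Self {
            vals: Vec::new(),
            offsets: vec![0; num_groups],
            lengths: vec![0; num_groups],
            capacities: vec![0; num_groups],
            active_bytes: 0,
        }
    }

    fn set_value(&mut self, group: usize, value: &[u8]) {
        self.active_bytes = self.active_bytes - self.lengths[group] + value.len();
        if value.len() <= self.capacities[group] {
            // In-place overwrite: reuse the existing slot; offset and
            // capacity stay the same, only the logical length changes.
            let start = self.offsets[group];
            self.vals[start..start + value.len()].copy_from_slice(value);
            self.lengths[group] = value.len();
        } else {
            // Append path: the old slot becomes "dead" bytes.
            self.offsets[group] = self.vals.len();
            self.lengths[group] = value.len();
            self.capacities[group] = value.len();
            self.vals.extend_from_slice(value);
        }
        self.maybe_gc();
    }

    fn maybe_gc(&mut self) {
        // Compact when more than half of the buffer is dead bytes.
        if self.vals.len() <= self.active_bytes * 2 {
            return;
        }
        let mut new_vals = Vec::with_capacity(self.active_bytes);
        for g in 0..self.offsets.len() {
            let (start, len) = (self.offsets[g], self.lengths[g]);
            self.offsets[g] = new_vals.len();
            self.capacities[g] = len;
            new_vals.extend_from_slice(&self.vals[start..start + len]);
        }
        self.vals = new_vals;
    }

    fn get(&self, group: usize) -> &[u8] {
        &self.vals[self.offsets[group]..self.offsets[group] + self.lengths[group]]
    }
}
```

Note the trade-off this sketch makes explicit: overwrites that shrink or fit in place are cheap, while growing values fragment the buffer until compaction copies every live value once.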
This is only needed for last_value, no?
Or wait nvm I see the queries having explicit order.
After implementing the flattened approach with `Vec<u8>`, performance actually regressed in several scenarios. Below are the benchmark results:
| ID | SQL | first_val_str Time (s) | first_last_flatten_acc Time (s) | Performance Change | Note |
|---|---|---|---|---|---|
| 1 | `select t.id1, first_value(t.id3 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id1, t.v1;` | 0.690 | 0.666 | +1.04x faster 🚀 | Length of `t.id3` is constant (12) per group |
| 2 | `select l_shipmode, first_value(l_partkey order by l_orderkey, l_linenumber, l_comment, l_suppkey, l_tax) from 'benchmarks/data/tpch_sf10/lineitem' group by l_shipmode;` | 0.724 | 0.776 | 1.07x slower 🐌 | |
| 3 | `select t.id2, t.id4, first_value(t.v1 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id2, t.id4;` | 7.136 | 7.226 | 1.01x slower 🐌 | |
| 4 | `SELECT l_suppkey, FIRST_VALUE(l_comment ORDER BY l_orderkey DESC) as fv FROM 'benchmarks/data/tpch_sf10/lineitem' GROUP BY l_suppkey;` | 2.914 | 3.206 | 1.10x slower 🐌 | `l_comment` length varies (10-43) per group |
| 5 | `select t.id1, last_value(t.id3 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id1, t.v1;` | 0.745 | 0.747 | 1.00x slower 🐌 | |
| 6 | `select l_shipmode, last_value(l_partkey order by l_orderkey, l_linenumber, l_comment, l_suppkey, l_tax) from 'benchmarks/data/tpch_sf10/lineitem' group by l_shipmode;` | 0.802 | 0.780 | +1.03x faster 🚀 | |
| 7 | `select t.id2, t.id4, last_value(t.v1 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id2, t.id4;` | 7.256 | 7.277 | 1.00x slower 🐌 | |
Flame graphs indicate that the overhead from resize operations is the primary bottleneck.
For `first_value` / `last_value` queries without an `ORDER BY` clause, each group's value is only set once, since there is no explicit ordering requirement. In this scenario, it might be more efficient to implement a dedicated `ValueState` using `Vec<Vec<u8>>` to store strings/bytes, where each inner `Vec<u8>` has a fixed maximum size. We can pre-allocate this capacity during creation to avoid frequent resizing. Note that for these unordered queries, `last_value` can be implemented with the same logic as `first_value`.
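As a rough illustration of the write-once idea (not the PR's code; `UnorderedBytesState` and `set_if_unset` are hypothetical names), each group keeps at most one value and later writes are ignored, which is valid for unordered `first_value` and, because the result is unspecified without an ordering, for unordered `last_value` as well:

```rust
/// Hypothetical write-once state for first_value/last_value without ORDER BY.
struct UnorderedBytesState {
    vals: Vec<Option<Vec<u8>>>,
}

impl UnorderedBytesState {
    fn new(num_groups: usize) -> Self {
        Self {
            vals: vec![None; num_groups],
        }
    }

    /// With no ORDER BY, the first value seen per group wins, so each slot
    /// is written at most once and never resized afterwards.
    fn set_if_unset(&mut self, group: usize, value: &[u8]) {
        if self.vals[group].is_none() {
            self.vals[group] = Some(value.to_vec());
        }
    }
}
```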
Hmm yes, this is similar to the block-based aggregation storage @Rachelint was working on (resizes are heavy, especially for write-only data).
Force-pushed from 330a6e1 to 779706c
fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
    use DataType::*;
    !args.order_bys.is_empty()
We might want to consider adding a new `GroupsAccumulator` to handle cases without `ORDER BY`, or perhaps implement a fast path in `FirstLastGroupsAccumulator` for when no ordering is required.
neilconway left a comment
Would it be useful to add a self-contained benchmark to measure the performance change here?
fn update(&mut self, group_idx: usize, array: &ArrayRef, idx: usize) -> Result<()> {
    if array.is_null(idx) {
        self.vals[group_idx] = None;
Decrement `total_capacity` here? A unit test for the update-with-null case might be useful as well.
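A minimal, self-contained model of that suggestion (the `BytesState` struct here is a hypothetical stand-in for the PR's state type, not its actual code) shows the capacity decrement on the null path and how it can be asserted in a test:

```rust
/// Hypothetical simplified state: per-group optional byte values plus a
/// running total of allocated capacity for memory accounting.
struct BytesState {
    vals: Vec<Option<Vec<u8>>>,
    total_capacity: usize,
}

impl BytesState {
    fn update(&mut self, group_idx: usize, value: Option<&[u8]>) {
        // Release the old value's capacity before replacing it, so a null
        // update also decrements `total_capacity` rather than leaking it.
        if let Some(old) = self.vals[group_idx].take() {
            self.total_capacity -= old.capacity();
        }
        if let Some(v) = value {
            let owned = v.to_vec();
            self.total_capacity += owned.capacity();
            self.vals[group_idx] = Some(owned);
        }
    }
}
```

Because both the increment and decrement read `capacity()` from the same allocation, the accounting nets out to zero after a value is cleared, regardless of how the allocator rounds capacities.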
/// Note: While this is not a batch interface, it is not a performance bottleneck.
/// In heavy aggregation benchmarks, the overhead of this method is typically less than 1%.
///
/// Benchmarked queries with < 1% `update` overhead:
This seems like more detail than necessary?
#################
# first_value on strings/binary with groups and ordering
Test last_value as well?
@@ -342,8 +422,7 @@ where
// buffer for `get_filtered_min_of_each_group`
These comments should be updated.
        .unwrap()
}

fn create_groups_primitive_accumulator<T: ArrowPrimitiveType + Send>(
`create_groups_primitive_accumulator` and `create_groups_bytes_accumulator` are identical except for the `ValueState`; we could use a single function and pass in the value state as an argument?
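A sketch of that refactor, using simplified stand-ins for the PR's `ValueState` trait and `FirstLastGroupsAccumulator` (the signatures below are illustrative, not the actual DataFusion API): a single generic constructor is parameterized by the state type instead of duplicating the function per state.

```rust
/// Simplified stand-in for the PR's ValueState trait.
trait ValueState {
    fn with_capacity(num_groups: usize) -> Self;
}

/// Simplified stand-in for FirstLastGroupsAccumulator, generic over the
/// value state so primitive and byte storage share one accumulator.
struct FirstLastGroupsAccumulator<S: ValueState> {
    state: S,
    is_first: bool, // first_value vs last_value
}

/// One constructor replaces the twin create_groups_primitive_accumulator /
/// create_groups_bytes_accumulator functions: the state type is the only
/// thing that varies, so it becomes a type parameter.
fn create_groups_accumulator<S: ValueState>(
    num_groups: usize,
    is_first: bool,
) -> FirstLastGroupsAccumulator<S> {
    FirstLastGroupsAccumulator {
        state: S::with_capacity(num_groups),
        is_first,
    }
}

/// Example state implementation standing in for PrimitiveValueState.
struct PrimitiveValueState {
    vals: Vec<Option<i64>>,
}

impl ValueState for PrimitiveValueState {
    fn with_capacity(num_groups: usize) -> Self {
        Self {
            vals: vec![None; num_groups],
        }
    }
}
```

A `BytesValueState` implementing the same trait would then reuse the constructor unchanged, e.g. `create_groups_accumulator::<BytesValueState>(n, true)`.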
Which issue does this PR close?

- Closes "`GroupsAccumulator` for `first_value` aggregate (speed up `first_value` and `DISTINCT ON` queries)" #17899.

Rationale for this change
Note: SQL queries for Q2, Q3, Q5, and Q6 are sourced from this PR.
Previously, the `first_value` and `last_value` aggregate functions only supported `GroupsAccumulator` for primitive types. For string or binary types (Utf8, LargeUtf8, Binary, etc.), they fell back to the slower row-based `Accumulator` path.

This change implements specialized state management for byte-based types, enabling high-performance grouped aggregation for strings and binary data, especially when used with `ORDER BY`.
ORDER BY.What changes are included in this PR?
ValueStateTrait: Abstracted the state management forfirst_valueandlast_valueto support different storage backends.PrimitiveValueState: Re-implemented the existing primitive handling using the new trait.BytesValueState: Added a new state implementation for Utf8, LargeUtf8, Utf8View, Binary, LargeBinary, and BinaryView. Itoptimizes memory by reusing
Vec<u8>buffers for group updates.FirstLastGroupsAccumulator: Migrated the accumulator to use the generic ValueState trait, allowing it to handle both primitive and byte types uniformly.Are these changes tested?
Yes.
Are there any user-facing changes?