Skip to content

KAFKA-20158: Fix header-based key deserialization in MeteredSessionStoreWithHeaders iterator methods#21734

Open
bbejeck wants to merge 1 commit intoapache:trunkfrom
bbejeck:KAFKA-20158_fix_header_based_key_deserialization_in_metered_session_store
Open

KAFKA-20158: Fix header-based key deserialization in MeteredSessionStoreWithHeaders iterator methods#21734
bbejeck wants to merge 1 commit intoapache:trunkfrom
bbejeck:KAFKA-20158_fix_header_based_key_deserialization_in_metered_session_store

Conversation

@bbejeck
Copy link
Member

@bbejeck bbejeck commented Mar 12, 2026

Fixes a bug where MeteredSessionStoreWithHeaders iterator methods fail
when key deserializers require headers.

The class inherits iterator-returning methods from
MeteredSessionStore: - fetch(K) / backwardFetch(K) - fetch(K, K) /
backwardFetch(K, K) - findSessions(K, long, long) /
backwardFindSessions(K, long, long) - findSessions(K, K, long, long)
/ backwardFindSessions(K, K, long, long) - findSessions(long, long)

These methods use serdes.keyFrom(bytes, new RecordHeaders()) which
provides empty headers, causing deserialization failures when headers
are required.

The fix overrides each method in MeteredSessionStoreWithHeaders with a
custom iterator that deserializes the value first to extract headers
from AggregationWithHeaders, then uses those headers to deserialize the
key.

This is the session store equivalent of #21705 which fixed the same
issue for MeteredTimestampedWindowStoreWithHeaders.

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Some minor suggestions. Feel free to merge after addressed.

}

final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
if (next == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should remove this check. Cf my comments on #21736

if (cachedNext == null) {
cachedNext = next();
}
return cachedNext == null ? null : cachedNext.key;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar -- I would remove the null check (cf #21736)

public void shouldUseHeadersFromValueToDeserializeKeyInFetch() {
setUp();
final Serde<String> keySerde = mock(Serde.class);
final Serde<AggregationWithHeaders<String>> valueSerde = mock(Serde.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seem we don't use valueSerde below, but only pass it into createStoreWithMockSerdes(...) -- can't we move it inside createStoreWithMockSerdes(...) directly, and removing the parameter?

final KeyValueIterator<Windowed<String>, AggregationWithHeaders<String>> iterator = store.fetch(KEY);

assertTrue(iterator.hasNext());
final KeyValue<Windowed<String>, AggregationWithHeaders<String>> result = iterator.next();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we insert a assertEquals(KEY, iterator.peekNextKey().key); before this call? (Same for other tests below)

cf #21736

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants