Skip to content

KAFKA-20132: Fix header-based key deserialization in MeteredTimestampedKeyValueStoreWithHeaders iterator methods#21736

Open
frankvicky wants to merge 2 commits intoapache:trunkfrom
frankvicky:fix-kv-get-bug
Open

KAFKA-20132: Fix header-based key deserialization in MeteredTimestampedKeyValueStoreWithHeaders iterator methods#21736
frankvicky wants to merge 2 commits intoapache:trunkfrom
frankvicky:fix-kv-get-bug

Conversation

@frankvicky
Copy link
Contributor

@frankvicky frankvicky commented Mar 12, 2026

Fixes bugs where MeteredTimestampedKeyValueStoreWithHeaders iterator
methods fail when key deserializers require headers.

The class inherits iterator-returning methods from
MeteredKeyValueStore: - range(K, K) / reverseRange(K, K) -
all() / reverseAll() - prefixScan(P, PS)

These methods use deserializeKey(rawKey) which calls
serdes.keyFrom(rawKey, internalContext.headers()), using the current
processing context's headers instead of each record's own headers.

Additionally, the peekNextKey() implementation in iterator classes
deserializes keys with empty headers.

@github-actions github-actions bot added triage PRs from the community streams labels Mar 12, 2026
@frankvicky frankvicky added the kip Requires or implements a KIP label Mar 12, 2026
@mjsax mjsax removed the triage PRs from the community label Mar 12, 2026
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Few smaller things. Feel free to merge after addressed.


final KeyValue<Bytes, byte[]> keyValue = iter.next();

if (keyValue == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have this guard elsewhere -- why are we adding it? I believe the contract of the Iterator interface is, that if hasNext() returns false, it's invalid to call next() and thus it's ok (and even "correct") to crash instead of returning null ?

I just went back to MeteredTimestampedWindowStoreWithHeaders and it seems we also added this check there -- I think we should also remove it there (we can just do it on this PR)

if (cachedNext == null) {
cachedNext = next();
}
return cachedNext == null ? null : cachedNext.key;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need cachedNext == null check? I think, we know that it's never null here, and if it would be null, it implies hasNext() did return false, and it's a user error to call peekNextKey and we should rather crash to surface the bug in the user code?

We should also update MeteredTimestampedWindowStoreWithHeaders accordingly


final KeyValue<Bytes, byte[]> keyValue = iter.next();

if (keyValue == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same

if (cachedNext == null) {
cachedNext = next();
}
return cachedNext == null ? null : cachedNext.key;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.range("a", "z");

assertTrue(iterator.hasNext());
final KeyValue<String, ValueTimestampHeaders<String>> result = iterator.next();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we insert a assertEquals(KEY, iterator.peekNextKey()); before this call?

If yes, should add this to all test in the PR, and also add to corresponding exiting tests for window-header store case?

public void shouldUseHeadersFromValueToDeserializeKeyInRange() {
setUp();

final Serde<String> keySerde = mock(Serde.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just see on #21734 from Bill, that he extracted a helper createStoreWithMockSerdes -- seems we could do the same on this test? Maybe also go back to the window-header store test and do the same?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants