KAFKA-20132: Fix header-based key deserialization in MeteredTimestampedKeyValueStoreWithHeaders iterator methods#21736
KAFKA-20132: Fix header-based key deserialization in MeteredTimestampedKeyValueStoreWithHeaders iterator methods#21736frankvicky wants to merge 2 commits intoapache:trunkfrom
Conversation
…edKeyValueStoreWithHeaders iterator methods
mjsax
left a comment
There was a problem hiding this comment.
Overall LGTM. Few smaller things. Feel free to merge after addressed.
|
|
||
| final KeyValue<Bytes, byte[]> keyValue = iter.next(); | ||
|
|
||
| if (keyValue == null) { |
There was a problem hiding this comment.
We don't have this guard elsewhere -- why are we adding it? I believe the contract of the Iterator interface is, that if hasNext() returns false, it's invalid to call next() and thus it's ok (and even "correct") to crash instead of returning null ?
I just went back to MeteredTimestampedWindowStoreWithHeaders and it seems we also added this check there -- I think we should also remove it there (we can just do it on this PR)
| if (cachedNext == null) { | ||
| cachedNext = next(); | ||
| } | ||
| return cachedNext == null ? null : cachedNext.key; |
There was a problem hiding this comment.
Do we need cachedNext == null check? I think, we know that it's never null here, and if it would be null, it implies hasNext() did return false, and it's a user error to call peekNextKey and we should rather crash to surface the bug in the user code?
We should also update MeteredTimestampedWindowStoreWithHeaders accordingly
|
|
||
| final KeyValue<Bytes, byte[]> keyValue = iter.next(); | ||
|
|
||
| if (keyValue == null) { |
| if (cachedNext == null) { | ||
| cachedNext = next(); | ||
| } | ||
| return cachedNext == null ? null : cachedNext.key; |
| final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.range("a", "z"); | ||
|
|
||
| assertTrue(iterator.hasNext()); | ||
| final KeyValue<String, ValueTimestampHeaders<String>> result = iterator.next(); |
There was a problem hiding this comment.
Should we insert a assertEquals(KEY, iterator.peekNextKey()); before this call?
If yes, should add this to all test in the PR, and also add to corresponding exiting tests for window-header store case?
| public void shouldUseHeadersFromValueToDeserializeKeyInRange() { | ||
| setUp(); | ||
|
|
||
| final Serde<String> keySerde = mock(Serde.class); |
There was a problem hiding this comment.
I just see on #21734 from Bill, that he extracted a helper createStoreWithMockSerdes -- seems we could do the same on this test? Maybe also go back to the window-header store test and do the same?
Fixes bugs where
MeteredTimestampedKeyValueStoreWithHeadersiteratormethods fail when key deserializers require headers.
The class inherits iterator-returning methods from
MeteredKeyValueStore: -range(K, K)/reverseRange(K, K)-all()/reverseAll()-prefixScan(P, PS)These methods use
deserializeKey(rawKey)which callsserdes.keyFrom(rawKey, internalContext.headers()), using the currentprocessing context's headers instead of each record's own headers.
Additionally, the
peekNextKey()implementation in iterator classesdeserializes keys with empty headers.