diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java index 649799c7976d0..973196cfb1142 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java @@ -108,6 +108,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.query.StateQueryRequest.inStore; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -1498,14 +1499,10 @@ private void shouldHandleSessionKeyDSLQueries() { throw new AssertionError(queryResult.toString()); } assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE)); - assertThat(partitionResult.getFailureMessage(), is( - "This store" - + " (class org.apache.kafka.streams.state.internals.MeteredSessionStore)" - + " doesn't know how to execute the given query" - + " (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]})" - + " because SessionStores only support WindowRangeQuery.withKey." - + " Contact the store maintainer if you need support for a new query type." - )); + assertThat(partitionResult.getFailureMessage(), + containsString("doesn't know how to execute the given query")); + assertThat(partitionResult.getFailureMessage(), + containsString("because SessionStores only support WindowRangeQuery.withKey.")); } } } @@ -1571,14 +1568,10 @@ private void shouldHandleSessionKeyPAPIQueries() { throw new AssertionError(queryResult.toString()); } assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE)); - assertThat(partitionResult.getFailureMessage(), is( - "This store" - + " (class org.apache.kafka.streams.state.internals.MeteredSessionStore)" - + " doesn't know how to execute the given query" - + " (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]})" - + " because SessionStores only support WindowRangeQuery.withKey." - + " Contact the store maintainer if you need support for a new query type." - )); + assertThat(partitionResult.getFailureMessage(), + containsString("doesn't know how to execute the given query")); + assertThat(partitionResult.getFailureMessage(), + containsString("because SessionStores only support WindowRangeQuery.withKey.")); } } } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index aa28372613457..848a2b06d3973 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -17,6 +17,9 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -25,6 +28,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; @@ -58,6 +62,9 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlySessionStore; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.consumer.ConsoleConsumer; @@ -71,11 +78,15 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -718,8 +729,9 @@ public void shouldAggregateSlidingWindows(final TestInfo testInfo) throws Except } } - @Test - public void shouldCountSessionWindows() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldCountSessionWindows(final boolean withHeaders) throws Exception { final long sessionGap = 5 * 60 * 1000L; final List> t1Messages = Arrays.asList( new KeyValue<>("bob", "start"), @@ -797,15 +809,31 @@ public void shouldCountSessionWindows() throws Exception { final Map, KeyValue> results = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(13); - builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) + final var groupedByKey = builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) - .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(sessionGap))) - .count() - .toStream() - .process(() -> (Processor, Long, Object, Object>) record -> { - results.put(record.key(), KeyValue.pair(record.value(), record.timestamp())); - latch.countDown(); - }); + .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(sessionGap))); + + if (withHeaders) { + final SessionBytesStoreSupplier supplier = Stores.persistentSessionStoreWithHeaders( + "CountSessionWithHeadersStore", + ofMillis(sessionGap + TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS) + 1) + ); + groupedByKey + .count(Materialized.as(supplier)) + .toStream() + .process(() -> (Processor, Long, Object, Object>) record -> { + results.put(record.key(), KeyValue.pair(record.value(), record.timestamp())); + latch.countDown(); + }); + } else { + groupedByKey + .count() + .toStream() + .process(() -> (Processor, Long, Object, Object>) record -> { + results.put(record.key(), KeyValue.pair(record.value(), record.timestamp())); + latch.countDown(); + }); + } startStreams(); latch.await(30, TimeUnit.SECONDS); @@ -819,88 +847,44 @@ public void shouldCountSessionWindows() throws Exception { assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair(1L, t3))); } - @Test - public void shouldReduceSessionWindows() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldReduceSessionWindows(final boolean withHeaders) throws Exception { final long sessionGap = 1000L; // something to do with time - final List> t1Messages = Arrays.asList( - new KeyValue<>("bob", "start"), - new KeyValue<>("penny", "start"), - new KeyValue<>("jo", "pause"), - new KeyValue<>("emily", "pause") - ); + + final Properties producerConfig = TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + StringSerializer.class, + StringSerializer.class, + new Properties()); final long t1 = mockTime.milliseconds(); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( - userSessionsStream, - t1Messages, - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - StringSerializer.class, - StringSerializer.class, - new Properties()), - t1 - ); final long t2 = t1 + (sessionGap / 2); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( - userSessionsStream, - Collections.singletonList( - new KeyValue<>("emily", "resume") - ), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - StringSerializer.class, - StringSerializer.class, - new Properties()), - t2 - ); final long t3 = t1 + sessionGap + 1; - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( - userSessionsStream, - Arrays.asList( - new KeyValue<>("bob", "pause"), - new KeyValue<>("penny", "stop") - ), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - StringSerializer.class, - StringSerializer.class, - new Properties()), - t3 - ); final long t4 = t3 + (sessionGap / 2); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( - userSessionsStream, - Arrays.asList( - new KeyValue<>("bob", "resume"), // bobs session continues - new KeyValue<>("jo", "resume") // jo's starts new session - ), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - StringSerializer.class, - StringSerializer.class, - new Properties()), - t4 - ); final long t5 = t4 - 1; - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( - userSessionsStream, - Collections.singletonList( - new KeyValue<>("jo", "late") // jo has late arrival - ), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - StringSerializer.class, - StringSerializer.class, - new Properties()), - t5 - ); + + produceSessionWindowData(producerConfig, withHeaders, t1, t2, t3, t4, t5, sessionGap); final Map, KeyValue> results = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(13); final String userSessionsStore = "UserSessionsStore"; + final Materialized> materialized; + if (withHeaders) { + final SessionBytesStoreSupplier supplier = Stores.persistentSessionStoreWithHeaders( + userSessionsStore, + ofMillis(sessionGap + ofMinutes(1).toMillis() + 1) + ); + materialized = Materialized.as(supplier); + } else { + materialized = Materialized.as(userSessionsStore); + } + builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(sessionGap), ofMinutes(1))) .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore)) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(sessionGap), ofMinutes(1))) + .reduce((value1, value2) -> value1 + ":" + value2, materialized) .toStream() .process(() -> (Processor, String, Object, Object>) record -> { results.put(record.key(), KeyValue.pair(record.value(), record.timestamp())); @@ -919,9 +903,61 @@ public void shouldReduceSessionWindows() throws Exception { assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair("pause:resume", t4))); assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair("stop", t3))); - // verify can query data via IQ + verifySessionStore(userSessionsStore, t1, t3, t4); + + } + + private void produceSessionWindowData(final Properties producerConfig, + final boolean withHeaders, + final long t1, final long t2, final long t3, + final long t4, final long t5, + final long sessionGap) throws Exception { + final List> t1Messages = Arrays.asList( + new KeyValue<>("bob", "start"), + new KeyValue<>("penny", "start"), + new KeyValue<>("jo", "pause"), + new KeyValue<>("emily", "pause") + ); + + produceWithOptionalHeaders(t1Messages, producerConfig, withHeaders, "t1", t1); + produceWithOptionalHeaders( + Collections.singletonList(new KeyValue<>("emily", "resume")), + producerConfig, withHeaders, "t2", t2); + produceWithOptionalHeaders( + Arrays.asList(new KeyValue<>("bob", "pause"), new KeyValue<>("penny", "stop")), + producerConfig, withHeaders, "t3", t3); + produceWithOptionalHeaders( + Arrays.asList( + new KeyValue<>("bob", "resume"), // bobs session continues + new KeyValue<>("jo", "resume")), // jo's starts new session + producerConfig, withHeaders, "t4", t4); + produceWithOptionalHeaders( + Collections.singletonList(new KeyValue<>("jo", "late")), // jo has late arrival + producerConfig, withHeaders, "t5", t5); + } + + private void produceWithOptionalHeaders(final Collection> records, + final Properties producerConfig, + final boolean withHeaders, + final String batchId, + final long timestamp) throws Exception { + if (withHeaders) { + final Headers headers = new RecordHeaders(Arrays.asList( + new RecordHeader("batch", batchId.getBytes(StandardCharsets.UTF_8)), + new RecordHeader("source", "test".getBytes(StandardCharsets.UTF_8)) + )); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + userSessionsStream, records, producerConfig, headers, timestamp, false); + } else { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + userSessionsStream, records, producerConfig, timestamp); + } + } + + private void verifySessionStore(final String storeName, + final long t1, final long t3, final long t4) throws Exception { final ReadOnlySessionStore sessionStore = - IntegrationTestUtils.getStore(userSessionsStore, kafkaStreams, QueryableStoreTypes.sessionStore()); + IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.sessionStore()); try (final KeyValueIterator, String> bob = sessionStore.fetch("bob")) { assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start"))); @@ -930,6 +966,11 @@ public void shouldReduceSessionWindows() throws Exception { } } + private static void assertHeaderCount(final Headers headers, final int expectedCount) { + assertThat("header count", headers.toArray().length, equalTo(expectedCount)); + } + + @Test public void shouldCountUnlimitedWindows() throws Exception { final long startTime = mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS) + 1; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index f3ca9e6740af2..67730e82c89d2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; @@ -36,6 +37,7 @@ import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.AggregationWithHeaders; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -105,7 +107,7 @@ public void enableSendingOldValues() { private class KStreamSessionWindowAggregateProcessor extends ContextualProcessor, Change> { - private SessionStore store; + private SessionStore> store; private TimestampedTupleForwarder, VAgg> tupleForwarder; private Sensor droppedRecordsSensor; private Sensor emittedRecordsSensor; @@ -147,7 +149,7 @@ public void init(final ProcessorContext, Change> context) { tupleForwarder = new TimestampedTupleForwarder<>( store, context, - new SessionCacheFlushListener<>(context), + new SessionCacheFlushListenerWithHeader<>(context), sendOldValues); } } @@ -165,22 +167,22 @@ public void process(final Record record) { observedStreamTime = Math.max(observedStreamTime, timestamp); final long windowCloseTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap(); - final List, VAgg>> merged = new ArrayList<>(); + final List, AggregationWithHeaders>> merged = new ArrayList<>(); final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp); SessionWindow mergedWindow = newSessionWindow; VAgg agg = initializer.apply(); try ( - final KeyValueIterator, VAgg> iterator = store.findSessions( + final KeyValueIterator, AggregationWithHeaders> iterator = store.findSessions( record.key(), timestamp - windows.inactivityGap(), timestamp + windows.inactivityGap() ) ) { while (iterator.hasNext()) { - final KeyValue, VAgg> next = iterator.next(); + final KeyValue, AggregationWithHeaders> next = iterator.next(); merged.add(next); - agg = sessionMerger.apply(record.key(), agg, next.value); + agg = sessionMerger.apply(record.key(), agg, AggregationWithHeaders.getAggregationOrNull(next.value)); mergedWindow = mergeSessionWindow(mergedWindow, (SessionWindow) next.key.window()); } } @@ -189,16 +191,15 @@ public void process(final Record record) { logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow); } else { if (!mergedWindow.equals(newSessionWindow)) { - for (final KeyValue, VAgg> session : merged) { + for (final KeyValue, AggregationWithHeaders> session : merged) { store.remove(session.key); - - maybeForwardUpdate(session.key, session.value, null); + maybeForwardUpdate(session.key, AggregationWithHeaders.getAggregationOrNull(session.value), null); } } agg = aggregator.apply(record.key(), record.value(), agg); final Windowed sessionKey = new Windowed<>(record.key(), mergedWindow); - store.put(sessionKey, agg); + store.put(sessionKey, AggregationWithHeaders.make(agg, new RecordHeaders())); maybeForwardUpdate(sessionKey, null, agg); } @@ -281,16 +282,16 @@ private void fetchAndEmit(final Record record, // Only time ordered (indexed) session store should have implemented // this function, otherwise a not-supported exception would throw - try (final KeyValueIterator, VAgg> windowToEmit = store - .findSessions(emitRangeLowerBound, emitRangeUpperBound)) { + try (final KeyValueIterator, AggregationWithHeaders> windowToEmit = + store.findSessions(emitRangeLowerBound, emitRangeUpperBound)) { while (windowToEmit.hasNext()) { emittedCount++; - final KeyValue, VAgg> kv = windowToEmit.next(); + final KeyValue, AggregationWithHeaders> kv = windowToEmit.next(); tupleForwarder.maybeForward( record.withKey(kv.key) - .withValue(new Change<>(kv.value, null)) + .withValue(new Change<>(AggregationWithHeaders.getAggregationOrNull(kv.value), null)) // set the timestamp as the window end timestamp .withTimestamp(kv.key.window().end()) .withHeaders(record.headers())); @@ -381,8 +382,8 @@ public String[] storeNames() { private class KTableSessionWindowValueGetter implements KTableValueGetter, VAgg> { - private SessionStore store; - + private SessionStore> store; + @Override public void init(final ProcessorContext context) { store = context.getStateStore(storeName); @@ -390,8 +391,10 @@ public void init(final ProcessorContext context) { @Override public ValueAndTimestamp get(final Windowed key) { + final AggregationWithHeaders result = + store.fetchSession(key.key(), key.window().start(), key.window().end()); return ValueAndTimestamp.make( - store.fetchSession(key.key(), key.window().start(), key.window().end()), + AggregationWithHeaders.getAggregationOrNull(result), key.window().end()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeader.java similarity index 61% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeader.java index af69dbab67979..bc0f2a9915388 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeader.java @@ -16,30 +16,46 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.state.AggregationWithHeaders; import org.apache.kafka.streams.state.internals.CacheFlushListener; -class SessionCacheFlushListener implements CacheFlushListener, VOut> { +class SessionCacheFlushListenerWithHeader + implements CacheFlushListener, AggregationWithHeaders> { + private final InternalProcessorContext, Change> context; @SuppressWarnings("rawtypes") private final ProcessorNode myNode; - SessionCacheFlushListener(final ProcessorContext, Change> context) { + SessionCacheFlushListenerWithHeader(final ProcessorContext, Change> context) { this.context = (InternalProcessorContext, Change>) context; myNode = this.context.currentNode(); } @Override - public void apply(final Record, Change> record) { + public void apply(final Record, Change>> record) { @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { - context.forward(record.withTimestamp(record.key().window().end())); + final VOut newValue = AggregationWithHeaders.getAggregationOrNull(record.value().newValue); + final VOut oldValue = AggregationWithHeaders.getAggregationOrNull(record.value().oldValue); + + final Headers headers = record.value().newValue != null + ? record.value().newValue.headers() + : new RecordHeaders(); + + context.forward( + record + .withValue(new Change<>(newValue, oldValue, record.value().isLatest)) + .withTimestamp(record.key().window().end()) + .withHeaders(headers)); } finally { context.setCurrentNode(prev); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java index a5317f488809e..5f1f9e409e771 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java @@ -17,11 +17,13 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.DslStoreFormat; import org.apache.kafka.streams.kstream.EmitStrategy; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.state.DslSessionParams; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.SessionStoreWithHeaders; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -57,19 +59,21 @@ public SessionStoreMaterializer( } @Override - public StoreBuilder builder() { + public StoreBuilder> builder() { + final DslStoreFormat storeFormat = dslStoreFormat() == null ? DslStoreFormat.PLAIN : DslStoreFormat.HEADERS; final SessionBytesStoreSupplier supplier = materialized.storeSupplier() == null ? dslStoreSuppliers().sessionStore(new DslSessionParams( materialized.storeName(), Duration.ofMillis(retentionPeriod), - emitStrategy)) + emitStrategy, + storeFormat)) : (SessionBytesStoreSupplier) materialized.storeSupplier(); - final StoreBuilder> builder = Stores.sessionStoreBuilder( - supplier, - materialized.keySerde(), - materialized.valueSerde() - ); + final StoreBuilder> builder = Stores.sessionStoreBuilderWithHeaders( + supplier, + materialized.keySerde(), + materialized.valueSerde() + ); if (materialized.loggingEnabled()) { builder.withLoggingEnabled(materialized.logConfig()); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java index ef81c869379e8..dba06390a04f4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java @@ -95,12 +95,22 @@ public WindowBytesStoreSupplier windowStore(final DslWindowParams params) { @Override public SessionBytesStoreSupplier sessionStore(final DslSessionParams params) { if (params.emitStrategy().type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { + if (params.storeFormat() == DslStoreFormat.HEADERS) { + return new RocksDbTimeOrderedSessionBytesStoreSupplier( + params.name(), + params.retentionPeriod().toMillis(), + true, + true); + } return new RocksDbTimeOrderedSessionBytesStoreSupplier( params.name(), params.retentionPeriod().toMillis(), true); } + if (params.storeFormat() == DslStoreFormat.HEADERS) { + return Stores.persistentSessionStoreWithHeaders(params.name(), params.retentionPeriod()); + } return Stores.persistentSessionStore(params.name(), params.retentionPeriod()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java b/streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java index 97193de6884cc..57e9c1455b6fb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.streams.DslStoreFormat; import org.apache.kafka.streams.kstream.EmitStrategy; import java.time.Duration; @@ -30,6 +31,7 @@ public class DslSessionParams { private final String name; private final Duration retentionPeriod; private final EmitStrategy emitStrategy; + private final DslStoreFormat storeFormat; /** * @param name name of the store (cannot be {@code null}) @@ -38,15 +40,24 @@ public class DslSessionParams { * contain the inactivity gap of the session and the entire grace period.) * @param emitStrategy defines how to emit results */ + @Deprecated public DslSessionParams( final String name, final Duration retentionPeriod, final EmitStrategy emitStrategy ) { + this(name, retentionPeriod, emitStrategy, DslStoreFormat.PLAIN); + } + + public DslSessionParams(final String name, + final Duration retentionPeriod, + final EmitStrategy emitStrategy, + final DslStoreFormat storeFormat) { Objects.requireNonNull(name); this.name = name; this.retentionPeriod = retentionPeriod; this.emitStrategy = emitStrategy; + this.storeFormat = storeFormat == null ? DslStoreFormat.PLAIN : storeFormat; } public String name() { @@ -61,6 +72,10 @@ public EmitStrategy emitStrategy() { return emitStrategy; } + public DslStoreFormat storeFormat() { + return storeFormat; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -72,12 +87,13 @@ public boolean equals(final Object o) { final DslSessionParams that = (DslSessionParams) o; return Objects.equals(name, that.name) && Objects.equals(retentionPeriod, that.retentionPeriod) - && Objects.equals(emitStrategy, that.emitStrategy); + && Objects.equals(emitStrategy, that.emitStrategy) + && Objects.equals(storeFormat, that.storeFormat); } @Override public int hashCode() { - return Objects.hash(name, retentionPeriod, emitStrategy); + return Objects.hash(name, retentionPeriod, emitStrategy, storeFormat); } @Override @@ -86,6 +102,7 @@ public String toString() { "name='" + name + '\'' + ", retentionPeriod=" + retentionPeriod + ", emitStrategy=" + emitStrategy + + ", storeFormat=" + storeFormat + '}'; } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 53bf38cd777ba..db39656b83b53 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -672,14 +672,16 @@ public static StoreBuilder> sessionStoreBuilder(final * * @param supplier a {@link SessionBytesStoreSupplier} (cannot be {@code null}) * @param keySerde the key serde to use - * @param valueSerde the value serde to use + * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations, + * it is treated as delete * @param key type * @param value type * @return an instance of {@link StoreBuilder} than can build a {@link SessionStoreWithHeaders} */ - public static StoreBuilder> sessionStoreBuilderWithHeaders(final SessionBytesStoreSupplier supplier, - final Serde keySerde, - final Serde valueSerde) { + public static StoreBuilder> sessionStoreBuilderWithHeaders( + final SessionBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde) { Objects.requireNonNull(supplier, "supplier cannot be null"); return new SessionStoreBuilderWithHeaders<>(supplier, keySerde, valueSerde, Time.SYSTEM); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java index 8ae592e4b4bbd..f15c21be5c730 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.SessionStoreWithHeaders; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedWindowStore; @@ -48,6 +49,8 @@ public List stores(final String storeName, final QueryableStoreType qu return (List) Collections.singletonList(new ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore) store)); } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) { return (List) Collections.singletonList(new ReadOnlyWindowStoreFacade<>((TimestampedWindowStore) store)); + } else if (store instanceof SessionStoreWithHeaders && queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) { + return (List) Collections.singletonList(new ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders) store)); } return (List) Collections.singletonList(store); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index d27095e19fd44..223dcc2be8bfc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; +import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.PositionBound; @@ -145,11 +146,15 @@ public void recordRestoreTime(final long restoreTimeNs) { restoreSensor.record(restoreTimeNs); } + protected Serde prepareValueSerdeForStore(final Serde valueSerde, final SerdeGetter getter) { + return WrappingNullableUtils.prepareValueSerde(valueSerde, getter); + } + private void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); serdes = StoreSerdeInitializer.prepareStoreSerde( - context, storeName, changelogTopic, keySerde, valueSerde, WrappingNullableUtils::prepareValueSerde); + context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java index 444d29834bbbe..ffb395ab02463 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; @@ -52,6 +53,17 @@ public class MeteredSessionStoreWithHeaders super(inner, metricsScope, keySerde, aggSerde, time); } + @SuppressWarnings("unchecked") + @Override + protected Serde> prepareValueSerdeForStore( + final Serde> valueSerde, + final SerdeGetter getter) { + if (valueSerde == null) { + return new AggregationWithHeadersSerde<>((Serde) getter.valueSerde()); + } + return super.prepareValueSerdeForStore(valueSerde, getter); + } + @Override public void put(final Windowed sessionKey, final AggregationWithHeaders aggregate) { Objects.requireNonNull(sessionKey, "sessionKey can't be null"); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacade.java new file mode 100644 index 0000000000000..9e08ba951903c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacade.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.AggregationWithHeaders; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ReadOnlySessionStore; +import org.apache.kafka.streams.state.SessionStoreWithHeaders; + +public class ReadOnlySessionStoreFacade implements ReadOnlySessionStore { + protected final SessionStoreWithHeaders inner; + + protected ReadOnlySessionStoreFacade(final SessionStoreWithHeaders store) { + inner = store; + } + + @Override + public KeyValueIterator, V> findSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionStoreIteratorFacade<>(inner.findSessions(key, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator, V> backwardFindSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionStoreIteratorFacade<>(inner.backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator, V> findSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionStoreIteratorFacade<>(inner.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator, V> backwardFindSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionStoreIteratorFacade<>(inner.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public V fetchSession(final K key, + final long sessionStartTime, + final long sessionEndTime) { + return AggregationWithHeaders.getAggregationOrNull(inner.fetchSession(key, sessionStartTime, sessionEndTime)); + } + + @Override + public KeyValueIterator, V> fetch(final K key) { + return new SessionStoreIteratorFacade<>(inner.fetch(key)); + } + + @Override + public KeyValueIterator, V> backwardFetch(final K key) { + return new SessionStoreIteratorFacade<>(inner.backwardFetch(key)); + } + + @Override + public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { + return new SessionStoreIteratorFacade<>(inner.fetch(keyFrom, keyTo)); + } + + @Override + public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo) { + return new SessionStoreIteratorFacade<>(inner.backwardFetch(keyFrom, keyTo)); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 275123e32251f..f797ff0aadb3d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStoreQueryParameters; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.SessionStoreWithHeaders; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedWindowStore; @@ -107,6 +108,8 @@ private static T validateAndCastStores(final StateStore store, return (T) new ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore) store); } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) { return (T) new ReadOnlyWindowStoreFacade<>((TimestampedWindowStore) store); + } else if (store instanceof SessionStoreWithHeaders && queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) { + return (T) new ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders) store); } else { return (T) store; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index d6b94c7e7f92c..d459ce1a60cd2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -38,9 +38,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.state.AggregationWithHeaders; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.SessionStoreWithHeaders; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier; @@ -91,7 +93,7 @@ public class KStreamSessionWindowAggregateProcessorTest { private InternalMockProcessorContext, Change> mockContext; private KStreamSessionWindowAggregate sessionAggregator; private Processor, Change> processor; - private SessionStore sessionStore; + private SessionStore> sessionStore; public EmitStrategy.StrategyType type; @@ -147,13 +149,15 @@ public , V extends Change> void forward(final R processor.init(mockContext); } + @SuppressWarnings("unchecked") private void initStore(final boolean enableCaching) { final SessionBytesStoreSupplier supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ? new RocksDbTimeOrderedSessionBytesStoreSupplier(STORE_NAME, GAP_MS * 3, true) : Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)); - final StoreBuilder> storeBuilder = Stores.sessionStoreBuilder(supplier, Serdes.String(), Serdes.Long()) - .withLoggingDisabled(); + final StoreBuilder> storeBuilder = + Stores.sessionStoreBuilderWithHeaders(supplier, Serdes.String(), Serdes.Long()) + .withLoggingDisabled(); if (enableCaching && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { storeBuilder.withCachingEnabled(); @@ -162,7 +166,7 @@ private void initStore(final boolean enableCaching) { if (sessionStore != null) { sessionStore.close(); } - sessionStore = storeBuilder.build(); + sessionStore = (SessionStore>) (SessionStore) storeBuilder.build(); sessionStore.init(mockContext, sessionStore); } @@ -179,10 +183,10 @@ public void shouldCreateSingleSessionWhenWithinGap(final EmitStrategy.StrategyTy processor.process(new Record<>("john", "first", 0L)); processor.process(new Record<>("john", "second", 500L)); - try (final KeyValueIterator, Long> values = + try (final KeyValueIterator, AggregationWithHeaders> values = sessionStore.findSessions("john", 0, 2000)) { assertTrue(values.hasNext()); - assertEquals(Long.valueOf(2), values.next().value); + assertEquals(Long.valueOf(2), AggregationWithHeaders.getAggregationOrNull(values.next().value)); } } @@ -192,27 +196,27 @@ public void shouldMergeSessions(final EmitStrategy.StrategyType inputType) { setup(inputType, true); final String sessionId = "mel"; processor.process(new Record<>(sessionId, "first", 0L)); - try (final KeyValueIterator, Long> iterator = sessionStore.findSessions(sessionId, 0, 0)) { + try (final KeyValueIterator, AggregationWithHeaders> iterator = sessionStore.findSessions(sessionId, 0, 0)) { assertTrue(iterator.hasNext()); } // move time beyond gap processor.process(new Record<>(sessionId, "second", GAP_MS + 1)); - try (final KeyValueIterator, Long> iterator = sessionStore.findSessions(sessionId, GAP_MS + 1, GAP_MS + 1)) { + try (final KeyValueIterator, AggregationWithHeaders> iterator = sessionStore.findSessions(sessionId, GAP_MS + 1, GAP_MS + 1)) { assertTrue(iterator.hasNext()); } // should still exist as not within gap - try (final KeyValueIterator, Long> iterator = sessionStore.findSessions(sessionId, 0, 0)) { + try (final KeyValueIterator, AggregationWithHeaders> iterator = sessionStore.findSessions(sessionId, 0, 0)) { assertTrue(iterator.hasNext()); } // move time back processor.process(new Record<>(sessionId, "third", GAP_MS / 2)); - try (final KeyValueIterator, Long> iterator = + try (final KeyValueIterator, AggregationWithHeaders> iterator = sessionStore.findSessions(sessionId, 0, GAP_MS + 1)) { - final KeyValue, Long> kv = iterator.next(); + final KeyValue, AggregationWithHeaders> kv = iterator.next(); - assertEquals(Long.valueOf(3), kv.value); + assertEquals(Long.valueOf(3), AggregationWithHeaders.getAggregationOrNull(kv.value)); assertFalse(iterator.hasNext()); } } @@ -223,9 +227,9 @@ public void shouldUpdateSessionIfTheSameTime(final EmitStrategy.StrategyType inp setup(inputType, true); processor.process(new Record<>("mel", "first", 0L)); processor.process(new Record<>("mel", "second", 0L)); - try (final KeyValueIterator, Long> iterator = + try (final KeyValueIterator, AggregationWithHeaders> iterator = sessionStore.findSessions("mel", 0, 0)) { - assertEquals(Long.valueOf(2L), iterator.next().value); + assertEquals(Long.valueOf(2L), AggregationWithHeaders.getAggregationOrNull(iterator.next().value)); assertFalse(iterator.hasNext()); } } @@ -289,18 +293,22 @@ public void shouldRemoveMergedSessionsFromStateStore(final EmitStrategy.Strategy processor.process(new Record<>("a", "1", 0L)); // first ensure it is in the store - try (final KeyValueIterator, Long> a1 = + try (final KeyValueIterator, AggregationWithHeaders> a1 = sessionStore.findSessions("a", 0, 0)) { - assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), a1.next()); + final KeyValue, AggregationWithHeaders> next = a1.next(); + assertEquals(new Windowed<>("a", new SessionWindow(0, 0)), next.key); + assertEquals(1L, AggregationWithHeaders.getAggregationOrNull(next.value)); } processor.process(new Record<>("a", "2", 100L)); // a1 from above should have been removed // should have merged session in store - try (final KeyValueIterator, Long> a2 = + try (final KeyValueIterator, AggregationWithHeaders> a2 = sessionStore.findSessions("a", 0, 100)) { - assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 100)), 2L), a2.next()); + final KeyValue, AggregationWithHeaders> next = a2.next(); + assertEquals(new Windowed<>("a", new SessionWindow(0, 100)), next.key); + assertEquals(2L, AggregationWithHeaders.getAggregationOrNull(next.value)); assertFalse(a2.hasNext()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeaderTest.java similarity index 83% rename from streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java rename to streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeaderTest.java index 4d837f68fea8a..33487507a0760 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerWithHeaderTest.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.state.AggregationWithHeaders; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -33,7 +35,7 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) -public class SessionCacheFlushListenerTest { +public class SessionCacheFlushListenerWithHeaderTest { @Test public void shouldForwardKeyNewValueOldValueAndTimestamp() { @SuppressWarnings("unchecked") @@ -44,10 +46,12 @@ public void shouldForwardKeyNewValueOldValueAndTimestamp() { new Change<>("newValue", "oldValue"), 73L)); - new SessionCacheFlushListener<>(context).apply( + new SessionCacheFlushListenerWithHeader<>(context).apply( new Record<>( new Windowed<>("key", new SessionWindow(21L, 73L)), - new Change<>("newValue", "oldValue"), + new Change<>( + AggregationWithHeaders.make("newValue", new RecordHeaders()), + AggregationWithHeaders.make("oldValue", new RecordHeaders())), 42L)); verify(context, times(2)).setCurrentNode(null); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java index d3ad500ce2e03..6e44bcd0fd590 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.TopologyTestDriver; @@ -44,6 +45,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.Properties; @@ -83,6 +86,10 @@ public void setup() { windowedCogroupedStream = cogroupedStream.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(100), ofDays(1))); } + private void enableHeaders() { + props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } + @Test public void shouldNotHaveNullInitializerOnAggregate() { assertThrows(NullPointerException.class, () -> windowedCogroupedStream.aggregate(null, sessionMerger)); @@ -163,8 +170,12 @@ public void namedParamShouldSetName() { " <-- foo-cogroup-agg-0\n\n")); } - @Test - public void sessionWindowAggregateTest() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void sessionWindowAggregateTest(final boolean withHeaders) { + if (withHeaders) { + enableHeaders(); + } final KTable, String> customers = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER) .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500))) .aggregate(MockInitializer.STRING_INIT, sessionMerger, Materialized.with(Serdes.String(), Serdes.String())); @@ -187,8 +198,12 @@ public void sessionWindowAggregateTest() { } } - @Test - public void sessionWindowAggregate2Test() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void sessionWindowAggregate2Test(final boolean withHeaders) { + if (withHeaders) { + enableHeaders(); + } final KTable, String> customers = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER) .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500))) .aggregate(MockInitializer.STRING_INIT, sessionMerger, Materialized.with(Serdes.String(), Serdes.String())); @@ -214,8 +229,12 @@ public void sessionWindowAggregate2Test() { } - @Test - public void sessionWindowAggregateTest2StreamsTest() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void sessionWindowAggregateTest2StreamsTest(final boolean withHeaders) { + if (withHeaders) { + enableHeaders(); + } final KTable, String> customers = windowedCogroupedStream.aggregate( MockInitializer.STRING_INIT, sessionMerger, Materialized.with(Serdes.String(), Serdes.String())); customers.toStream().to(OUTPUT); @@ -252,8 +271,12 @@ public void sessionWindowAggregateTest2StreamsTest() { } } - @Test - public void sessionWindowMixAggregatorsTest() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void sessionWindowMixAggregatorsTest(final boolean withHeaders) { + if (withHeaders) { + enableHeaders(); + } final KTable, String> customers = windowedCogroupedStream.aggregate( MockInitializer.STRING_INIT, sessionMerger, Materialized.with(Serdes.String(), Serdes.String())); customers.toStream().to(OUTPUT); @@ -289,8 +312,12 @@ public void sessionWindowMixAggregatorsTest() { } - @Test - public void sessionWindowMixAggregatorsManyWindowsTest() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void sessionWindowMixAggregatorsManyWindowsTest(final boolean withHeaders) { + if (withHeaders) { + enableHeaders(); + } final KTable, String> customers = windowedCogroupedStream.aggregate( MockInitializer.STRING_INIT, sessionMerger, Materialized.with(Serdes.String(), Serdes.String())); customers.toStream().to(OUTPUT); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java index 6dbb9b8d1dbc7..42016f6e7df0f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java @@ -36,10 +36,13 @@ import org.apache.kafka.streams.kstream.SessionWindowedKStream; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore; +import org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStoreWithHeaders; import org.apache.kafka.streams.state.internals.MeteredSessionStore; -import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedSessionStore; +import org.apache.kafka.streams.state.internals.SessionToHeadersStoreAdapter; import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockApiProcessorSupplier; @@ -48,13 +51,16 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.stream.Stream; import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; @@ -76,10 +82,21 @@ public class SessionWindowedKStreamImplTest { private boolean emitFinal; - public void setup(final EmitStrategy.StrategyType inputType) { + static Stream emitStrategyAndHeaders() { + return Stream.of( + Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false), + Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, true), + Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_CLOSE, false), + Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_CLOSE, true) + ); + } + public void setup(final EmitStrategy.StrategyType inputType, final boolean withHeaders) { type = inputType; final EmitStrategy emitStrategy = EmitStrategy.StrategyType.forType(type); emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE); + if (withHeaders) { + props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } final KStream stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())); this.stream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) @@ -88,17 +105,17 @@ public void setup(final EmitStrategy.StrategyType inputType) { } @ParameterizedTest - @EnumSource(EmitStrategy.StrategyType.class) - public void shouldCountSessionWindowedWithCachingDisabled(final EmitStrategy.StrategyType inputType) { - setup(inputType); + @MethodSource("emitStrategyAndHeaders") + public void shouldCountSessionWindowedWithCachingDisabled(final EmitStrategy.StrategyType inputType, final boolean withHeaders) { + setup(inputType, withHeaders); props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); shouldCountSessionWindowed(); } @ParameterizedTest - @EnumSource(EmitStrategy.StrategyType.class) - public void shouldCountSessionWindowedWithCachingEnabled(final EmitStrategy.StrategyType inputType) { - setup(inputType); + @MethodSource("emitStrategyAndHeaders") + public void shouldCountSessionWindowedWithCachingEnabled(final EmitStrategy.StrategyType inputType, final boolean withHeaders) { + setup(inputType, withHeaders); shouldCountSessionWindowed(); } @@ -139,9 +156,9 @@ private void shouldCountSessionWindowed() { } @ParameterizedTest - @EnumSource(EmitStrategy.StrategyType.class) - public void shouldReduceWindowed(final EmitStrategy.StrategyType inputType) { - setup(inputType); + @MethodSource("emitStrategyAndHeaders") + public void shouldReduceWindowed(final EmitStrategy.StrategyType inputType, final boolean withHeaders) { + setup(inputType, withHeaders); final MockApiProcessorSupplier, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); stream.reduce(MockReducer.STRING_ADDER) .toStream() @@ -178,9 +195,9 @@ public void shouldReduceWindowed(final EmitStrategy.StrategyType inputType) { } @ParameterizedTest - @EnumSource(EmitStrategy.StrategyType.class) - public void shouldAggregateSessionWindowed(final EmitStrategy.StrategyType inputType) { - setup(inputType); + @MethodSource("emitStrategyAndHeaders") + public void shouldAggregateSessionWindowed(final EmitStrategy.StrategyType inputType, final boolean withHeaders) { + setup(inputType, withHeaders); final MockApiProcessorSupplier, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, @@ -219,15 +236,15 @@ public void shouldAggregateSessionWindowed(final EmitStrategy.StrategyType input } @ParameterizedTest - @EnumSource(EmitStrategy.StrategyType.class) - public void shouldMaterializeCount(final EmitStrategy.StrategyType inputType) { - setup(inputType); + @MethodSource("emitStrategyAndHeaders") + public void shouldMaterializeCount(final EmitStrategy.StrategyType inputType, final boolean withHeaders) { + setup(inputType, withHeaders); stream.count(Materialized.as("count-store")); try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { processData(driver); final SessionStore store = driver.getSessionStore("count-store"); - final List, Long>> data = StreamsTestUtils.toListAndCloseIterator(store.fetch("1", "2")); + final List, Long>> data = unwrapAggregations(store.fetch("1", "2")); if (!emitFinal) { assertThat( data, @@ -247,15 +264,15 @@ public void shouldMaterializeCount(final EmitStrategy.StrategyType inputType) { } @ParameterizedTest - @EnumSource(EmitStrategy.StrategyType.class) - public void shouldMaterializeReduced(final EmitStrategy.StrategyType inputType) { - setup(inputType); + @MethodSource("emitStrategyAndHeaders") + public void shouldMaterializeReduced(final EmitStrategy.StrategyType inputType, final boolean withHeaders) { + setup(inputType, withHeaders); stream.reduce(MockReducer.STRING_ADDER, Materialized.as("reduced")); try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { processData(driver); final SessionStore sessionStore = driver.getSessionStore("reduced"); - final List, String>> data = StreamsTestUtils.toListAndCloseIterator(sessionStore.fetch("1", "2")); + final List, String>> data = unwrapAggregations(sessionStore.fetch("1", "2")); if (!emitFinal) { assertThat( @@ -276,9 +293,9 @@ public void shouldMaterializeReduced(final EmitStrategy.StrategyType inputType) } @ParameterizedTest - @EnumSource(EmitStrategy.StrategyType.class) - public void shouldMaterializeAggregated(final EmitStrategy.StrategyType inputType) { - setup(inputType); + @MethodSource("emitStrategyAndHeaders") + public void shouldMaterializeAggregated(final EmitStrategy.StrategyType inputType, final boolean withHeaders) { + setup(inputType, withHeaders); stream.aggregate( MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, @@ -288,7 +305,7 @@ public void shouldMaterializeAggregated(final EmitStrategy.StrategyType inputTyp try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { processData(driver); final SessionStore sessionStore = driver.getSessionStore("aggregated"); - final List, String>> data = StreamsTestUtils.toListAndCloseIterator(sessionStore.fetch("1", "2")); + final List, String>> data = unwrapAggregations(sessionStore.fetch("1", "2")); if (!emitFinal) { assertThat( data, @@ -310,35 +327,35 @@ public void shouldMaterializeAggregated(final EmitStrategy.StrategyType inputTyp @ParameterizedTest @EnumSource(EmitStrategy.StrategyType.class) public void shouldThrowNullPointerOnAggregateIfInitializerIsNull(final EmitStrategy.StrategyType inputType) { - setup(inputType); + setup(inputType, false); assertThrows(NullPointerException.class, () -> stream.aggregate(null, MockAggregator.TOSTRING_ADDER, sessionMerger)); } @ParameterizedTest @EnumSource(EmitStrategy.StrategyType.class) public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull(final EmitStrategy.StrategyType inputType) { - setup(inputType); + setup(inputType, false); assertThrows(NullPointerException.class, () -> stream.aggregate(MockInitializer.STRING_INIT, null, sessionMerger)); } @ParameterizedTest @EnumSource(EmitStrategy.StrategyType.class) public void shouldThrowNullPointerOnAggregateIfMergerIsNull(final EmitStrategy.StrategyType inputType) { - setup(inputType); + setup(inputType, false); assertThrows(NullPointerException.class, () -> stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null)); } @ParameterizedTest @EnumSource(EmitStrategy.StrategyType.class) public void shouldThrowNullPointerOnReduceIfReducerIsNull(final EmitStrategy.StrategyType inputType) { - setup(inputType); + setup(inputType, false); assertThrows(NullPointerException.class, () -> stream.reduce(null)); } @ParameterizedTest @EnumSource(EmitStrategy.StrategyType.class) public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull(final EmitStrategy.StrategyType inputType) { - setup(inputType); + setup(inputType, false); assertThrows(NullPointerException.class, () -> stream.aggregate( null, MockAggregator.TOSTRING_ADDER, @@ -349,7 +366,7 @@ public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull(fin @ParameterizedTest @EnumSource(EmitStrategy.StrategyType.class) public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull(final EmitStrategy.StrategyType inputType) { - setup(inputType); + setup(inputType, false); assertThrows(NullPointerException.class, () -> stream.aggregate( MockInitializer.STRING_INIT, null, @@ -360,7 +377,7 @@ public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull(fina @ParameterizedTest @EnumSource(EmitStrategy.StrategyType.class) public void shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull(final EmitStrategy.StrategyType inputType) { - setup(inputType); + setup(inputType, false); assertThrows(NullPointerException.class, () -> stream.aggregate( MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, @@ -372,7 +389,7 @@ public void shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull(final Em @ParameterizedTest @EnumSource(EmitStrategy.StrategyType.class) public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull(final EmitStrategy.StrategyType inputType) { - setup(inputType); + setup(inputType, false); assertThrows(NullPointerException.class, () -> stream.aggregate( MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, @@ -383,7 +400,7 @@ public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull(fi @ParameterizedTest @EnumSource(EmitStrategy.StrategyType.class) public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull(final EmitStrategy.StrategyType inputType) { - setup(inputType); + setup(inputType, false); assertThrows(NullPointerException.class, () -> stream.reduce(null, Materialized.as("store"))); } @@ -391,28 +408,28 @@ public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull(final Emit @EnumSource(EmitStrategy.StrategyType.class) @SuppressWarnings("unchecked") public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull(final EmitStrategy.StrategyType inputType) { - setup(inputType); + setup(inputType, false); assertThrows(NullPointerException.class, () -> stream.reduce(MockReducer.STRING_ADDER, (Materialized) null)); } @ParameterizedTest @EnumSource(EmitStrategy.StrategyType.class) public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull(final EmitStrategy.StrategyType inputType) { - setup(inputType); + setup(inputType, false); assertThrows(NullPointerException.class, () -> stream.reduce(MockReducer.STRING_ADDER, (Named) null)); } @ParameterizedTest @EnumSource(EmitStrategy.StrategyType.class) public void shouldThrowNullPointerOnCountIfMaterializedIsNull(final EmitStrategy.StrategyType inputType) { - setup(inputType); + setup(inputType, false); assertThrows(NullPointerException.class, () -> stream.count((Materialized>) null)); } @ParameterizedTest - @EnumSource(EmitStrategy.StrategyType.class) - public void shouldNotEnableCachingWithEmitFinal(final EmitStrategy.StrategyType inputType) { - setup(inputType); + @MethodSource("emitStrategyAndHeaders") + public void shouldNotEnableCachingWithEmitFinal(final EmitStrategy.StrategyType inputType, final boolean withHeaders) { + setup(inputType, withHeaders); if (!emitFinal) return; @@ -423,11 +440,15 @@ public void shouldNotEnableCachingWithEmitFinal(final EmitStrategy.StrategyType Materialized.>as("aggregated").withValueSerde(Serdes.String())); try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { - final SessionStore store = driver.getSessionStore("aggregated"); + final StateStore store = driver.getAllStateStores().get("aggregated"); final WrappedStateStore changeLogging = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); assertThat(store, instanceOf(MeteredSessionStore.class)); - assertThat(changeLogging, instanceOf(ChangeLoggingSessionBytesStore.class)); - assertThat(changeLogging.wrapped(), instanceOf(RocksDBTimeOrderedSessionStore.class)); + if (withHeaders) { + assertThat(changeLogging, instanceOf(ChangeLoggingSessionBytesStoreWithHeaders.class)); + } else { + assertThat(changeLogging, instanceOf(ChangeLoggingSessionBytesStore.class)); + assertThat(changeLogging.wrapped(), instanceOf(SessionToHeadersStoreAdapter.class)); + } } } @@ -440,4 +461,15 @@ private void processData(final TopologyTestDriver driver) { inputTopic.pipeInput("2", "1", 600); inputTopic.pipeInput("2", "2", 599); } + + private List, V>> unwrapAggregations( + final KeyValueIterator, V> iterator) { + final List, V>> result = new ArrayList<>(); + while (iterator.hasNext()) { + final KeyValue, V> next = iterator.next(); + result.add(KeyValue.pair(next.key, next.value)); + } + iterator.close(); + return result; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacadeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacadeTest.java new file mode 100644 index 0000000000000..f0efd857f7103 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlySessionStoreFacadeTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.state.AggregationWithHeaders; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionStoreWithHeaders; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +public class ReadOnlySessionStoreFacadeTest { + @Mock + private SessionStoreWithHeaders mockedSessionStoreWithHeaders; + @Mock + private KeyValueIterator, AggregationWithHeaders> mockedIterator; + + private ReadOnlySessionStoreFacade readOnlySessionStoreFacade; + + @BeforeEach + public void setup() { + readOnlySessionStoreFacade = new ReadOnlySessionStoreFacade<>(mockedSessionStoreWithHeaders); + } + + @Test + public void shouldReturnPlainValueOnFetchSession() { + when(mockedSessionStoreWithHeaders.fetchSession("key", 10L, 20L)) + .thenReturn(AggregationWithHeaders.make("value", new RecordHeaders())); + + assertThat(readOnlySessionStoreFacade.fetchSession("key", 10L, 20L), is("value")); + } + + @Test + public void shouldReturnNullOnFetchSessionWhenNull() { + when(mockedSessionStoreWithHeaders.fetchSession("unknownKey", 10L, 20L)) + .thenReturn(null); + + assertNull(readOnlySessionStoreFacade.fetchSession("unknownKey", 10L, 20L)); + } + + @Test + public void shouldReturnStrippedKeyValuePairsOnFindSessions() { + when(mockedIterator.next()) + .thenReturn(KeyValue.pair( + new Windowed<>("key1", new SessionWindow(10L, 20L)), + AggregationWithHeaders.make("value1", new RecordHeaders()))) + .thenReturn(KeyValue.pair( + new Windowed<>("key2", new SessionWindow(30L, 40L)), + AggregationWithHeaders.make("value2", new RecordHeaders()))); + when(mockedSessionStoreWithHeaders.findSessions("key1", 10L, 40L)) + .thenReturn(mockedIterator); + + final KeyValueIterator, String> iterator = + readOnlySessionStoreFacade.findSessions("key1", 10L, 40L); + + assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new SessionWindow(10L, 20L)), "value1"))); + assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new SessionWindow(30L, 40L)), "value2"))); + } + + @Test + public void shouldReturnStrippedKeyValuePairsOnBackwardFindSessions() { + when(mockedIterator.next()) + .thenReturn(KeyValue.pair( + new Windowed<>("key1", new SessionWindow(30L, 40L)), + AggregationWithHeaders.make("value1", new RecordHeaders()))); + when(mockedSessionStoreWithHeaders.backwardFindSessions("key1", 10L, 40L)) + .thenReturn(mockedIterator); + + final KeyValueIterator, String> iterator = + readOnlySessionStoreFacade.backwardFindSessions("key1", 10L, 40L); + + assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new SessionWindow(30L, 40L)), "value1"))); + } + + @Test + public void shouldReturnStrippedKeyValuePairsOnFindSessionsWithKeyRange() { + when(mockedIterator.next()) + .thenReturn(KeyValue.pair( + new Windowed<>("key1", new SessionWindow(10L, 20L)), + AggregationWithHeaders.make("value1", new RecordHeaders()))); + when(mockedSessionStoreWithHeaders.findSessions("key1", "key2", 10L, 40L)) + .thenReturn(mockedIterator); + + final KeyValueIterator, String> iterator = + readOnlySessionStoreFacade.findSessions("key1", "key2", 10L, 40L); + + assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new SessionWindow(10L, 20L)), "value1"))); + } + + @Test + public void shouldReturnStrippedKeyValuePairsOnBackwardFindSessionsWithKeyRange() { + when(mockedIterator.next()) + .thenReturn(KeyValue.pair( + new Windowed<>("key2", new SessionWindow(30L, 40L)), + AggregationWithHeaders.make("value2", new RecordHeaders()))); + when(mockedSessionStoreWithHeaders.backwardFindSessions("key1", "key2", 10L, 40L)) + .thenReturn(mockedIterator); + + final KeyValueIterator, String> iterator = + readOnlySessionStoreFacade.backwardFindSessions("key1", "key2", 10L, 40L); + + assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new SessionWindow(30L, 40L)), "value2"))); + } + + @Test + public void shouldReturnStrippedKeyValuePairsOnFetch() { + when(mockedIterator.next()) + .thenReturn(KeyValue.pair( + new Windowed<>("key1", new SessionWindow(10L, 20L)), + AggregationWithHeaders.make("value1", new RecordHeaders()))) + .thenReturn(KeyValue.pair( + new Windowed<>("key1", new SessionWindow(30L, 40L)), + AggregationWithHeaders.make("value2", new RecordHeaders()))); + when(mockedSessionStoreWithHeaders.fetch("key1")).thenReturn(mockedIterator); + + final KeyValueIterator, String> iterator = + readOnlySessionStoreFacade.fetch("key1"); + + assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new SessionWindow(10L, 20L)), "value1"))); + assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new SessionWindow(30L, 40L)), "value2"))); + } + + @Test + public void shouldReturnStrippedKeyValuePairsOnBackwardFetch() { + when(mockedIterator.next()) + .thenReturn(KeyValue.pair( + new Windowed<>("key1", new SessionWindow(30L, 40L)), + AggregationWithHeaders.make("value1", new RecordHeaders()))); + when(mockedSessionStoreWithHeaders.backwardFetch("key1")).thenReturn(mockedIterator); + + final KeyValueIterator, String> iterator = + readOnlySessionStoreFacade.backwardFetch("key1"); + + assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new SessionWindow(30L, 40L)), "value1"))); + } + + @Test + public void shouldReturnStrippedKeyValuePairsOnFetchWithKeyRange() { + when(mockedIterator.next()) + .thenReturn(KeyValue.pair( + new Windowed<>("key1", new SessionWindow(10L, 20L)), + AggregationWithHeaders.make("value1", new RecordHeaders()))); + when(mockedSessionStoreWithHeaders.fetch("key1", "key2")).thenReturn(mockedIterator); + + final KeyValueIterator, String> iterator = + readOnlySessionStoreFacade.fetch("key1", "key2"); + + assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new SessionWindow(10L, 20L)), "value1"))); + } + + @Test + public void shouldReturnStrippedKeyValuePairsOnBackwardFetchWithKeyRange() { + when(mockedIterator.next()) + .thenReturn(KeyValue.pair( + new Windowed<>("key2", new SessionWindow(30L, 40L)), + AggregationWithHeaders.make("value2", new RecordHeaders()))); + when(mockedSessionStoreWithHeaders.backwardFetch("key1", "key2")).thenReturn(mockedIterator); + + final KeyValueIterator, String> iterator = + readOnlySessionStoreFacade.backwardFetch("key1", "key2"); + + assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new SessionWindow(30L, 40L)), "value2"))); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java index c306bab040c53..87770940cf82b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java @@ -161,7 +161,7 @@ private void verifyNormalQuery(final SessionStore stateStore) { expectedRecords.iterator() : expectedRecords.descendingIterator(); - TestUtils.checkEquals(scanIterator, dataIterator); + TestUtils.checkEquals(dataIterator, scanIterator); } try (final KeyValueIterator, Long> scanIterator = forward ? @@ -172,7 +172,7 @@ private void verifyNormalQuery(final SessionStore stateStore) { expectedRecords.iterator() : expectedRecords.descendingIterator(); - TestUtils.checkEquals(scanIterator, dataIterator); + TestUtils.checkEquals(dataIterator, scanIterator); } } @@ -185,7 +185,7 @@ private void verifyInfiniteQuery(final SessionStore stateStore) { expectedRecords.iterator() : expectedRecords.descendingIterator(); - TestUtils.checkEquals(scanIterator, dataIterator); + TestUtils.checkEquals(dataIterator, scanIterator); } try (final KeyValueIterator, Long> scanIterator = forward ? @@ -196,7 +196,7 @@ private void verifyInfiniteQuery(final SessionStore stateStore) { expectedRecords.iterator() : expectedRecords.descendingIterator(); - TestUtils.checkEquals(scanIterator, dataIterator); + TestUtils.checkEquals(dataIterator, scanIterator); } } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 20d81be52f99e..4fa9117cee759 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -76,12 +76,14 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.state.AggregationWithHeaders; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlySessionStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.SessionStoreWithHeaders; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; import org.apache.kafka.streams.state.TimestampedWindowStore; @@ -93,6 +95,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade; import org.apache.kafka.streams.state.internals.ReadOnlyWindowStoreFacade; +import org.apache.kafka.streams.state.internals.SessionStoreIteratorFacade; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.streams.test.TestRecord; @@ -1195,6 +1198,9 @@ public WindowStore> getTimestampedWindowStore @SuppressWarnings("unchecked") public SessionStore getSessionStore(final String name) { final StateStore store = getStateStore(name, false); + if (store instanceof SessionStoreWithHeaders) { + return new SessionStoreFacade<>((SessionStoreWithHeaders) store); + } return store instanceof SessionStore ? (SessionStore) store : null; } @@ -1436,4 +1442,115 @@ public Position getPosition() { } } + static class SessionStoreFacade implements SessionStore { + private final SessionStoreWithHeaders inner; + + SessionStoreFacade(final SessionStoreWithHeaders inner) { + this.inner = inner; + } + + @Override + public KeyValueIterator, V> findSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionStoreIteratorFacade<>(inner.findSessions(key, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator, V> backwardFindSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionStoreIteratorFacade<>(inner.backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator, V> findSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionStoreIteratorFacade<>(inner.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator, V> backwardFindSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionStoreIteratorFacade<>(inner.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public V fetchSession(final K key, + final long sessionStartTime, + final long sessionEndTime) { + return AggregationWithHeaders.getAggregationOrNull(inner.fetchSession(key, sessionStartTime, sessionEndTime)); + } + + @Override + public KeyValueIterator, V> findSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + return new SessionStoreIteratorFacade<>(inner.findSessions(earliestSessionEndTime, latestSessionEndTime)); + } + + @Override + public KeyValueIterator, V> fetch(final K key) { + return new SessionStoreIteratorFacade<>(inner.fetch(key)); + } + + @Override + public KeyValueIterator, V> backwardFetch(final K key) { + return new SessionStoreIteratorFacade<>(inner.backwardFetch(key)); + } + + @Override + public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { + return new SessionStoreIteratorFacade<>(inner.fetch(keyFrom, keyTo)); + } + + @Override + public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo) { + return new SessionStoreIteratorFacade<>(inner.backwardFetch(keyFrom, keyTo)); + } + + @Override + public void remove(final Windowed sessionKey) { + inner.remove(sessionKey); + } + + @Override + public void put(final Windowed sessionKey, final V aggregate) { + inner.put(sessionKey, AggregationWithHeaders.make(aggregate, new RecordHeaders())); + } + + @Override + public String name() { + return inner.name(); + } + + @Override + public void init(final StateStoreContext stateStoreContext, final StateStore root) { + inner.init(stateStoreContext, root); + } + + @Override + public void close() { + inner.close(); + } + + @Override + public boolean persistent() { + return inner.persistent(); + } + + @Override + public boolean isOpen() { + return inner.isOpen(); + } + + @Override + public Position getPosition() { + return inner.getPosition(); + } + } + }