Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b71fc37
Add session store with headers infra and support for IQv2
bbejeck Mar 8, 2026
c460825
Clean up generics
bbejeck Mar 8, 2026
e596f0f
Add override for unknown query, remove support in IQv2 and position …
bbejeck Mar 9, 2026
edeb152
Reset error message not testing/supporting IQv2 for SessionStore with…
bbejeck Mar 10, 2026
96da383
Address review comments
bbejeck Mar 11, 2026
c21c66f
Add session store with headers infra and support for IQv2
bbejeck Mar 8, 2026
9b918b2
Add support for RocksDBSessionStores with headers
bbejeck Feb 24, 2026
506856f
KAFKA-20158: Add RocksDB session store upgrade path for headers support
bbejeck Feb 25, 2026
c30db5e
Fixed test and made it more effective
bbejeck Feb 25, 2026
6116037
Updates for changes from Segment class methods
bbejeck Feb 25, 2026
b13ada5
Address comments
bbejeck Feb 26, 2026
87b10ff
Renaming for usage clarity
bbejeck Feb 26, 2026
aaa9185
Adding DSL support for session windows with headers
bbejeck Feb 26, 2026
02638f6
Introduce wrapper to reduce duplicated code
bbejeck Feb 27, 2026
a7a5fa8
Updates for storing empty headers in merged session window aggregations
bbejeck Feb 28, 2026
8434166
Remove branching DSL works with Headers Store
bbejeck Mar 1, 2026
0ca7fdf
Missed one comment, side cleanup
bbejeck Mar 1, 2026
38e3216
Remove generics per comment remove method override
bbejeck Mar 1, 2026
e9b26df
Changes for correctly wiring up session stores with headers
bbejeck Mar 3, 2026
bc20f83
Updates per comments
bbejeck Mar 6, 2026
5d09ae3
Adding support for DSL integration and readonly
bbejeck Mar 11, 2026
279cabe
Remove session headers store from IQv2 test options, rebase cleanup
bbejeck Mar 11, 2026
b1c663e
More IQv2 and PositionRestart integration test cleanup
bbejeck Mar 11, 2026
cfd8b7a
Missed one more cleanup
bbejeck Mar 11, 2026
772e7f4
Adding parameters for testing with headers enabled
bbejeck Mar 12, 2026
716351b
Checkstyle fix
bbejeck Mar 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -1498,14 +1499,10 @@ private <T> void shouldHandleSessionKeyDSLQueries() {
throw new AssertionError(queryResult.toString());
}
assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
assertThat(partitionResult.getFailureMessage(), is(
"This store"
+ " (class org.apache.kafka.streams.state.internals.MeteredSessionStore)"
+ " doesn't know how to execute the given query"
+ " (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]})"
+ " because SessionStores only support WindowRangeQuery.withKey."
+ " Contact the store maintainer if you need support for a new query type."
));
assertThat(partitionResult.getFailureMessage(),
containsString("doesn't know how to execute the given query"));
assertThat(partitionResult.getFailureMessage(),
containsString("because SessionStores only support WindowRangeQuery.withKey."));
}
}
}
Expand Down Expand Up @@ -1571,14 +1568,10 @@ private <T> void shouldHandleSessionKeyPAPIQueries() {
throw new AssertionError(queryResult.toString());
}
assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
assertThat(partitionResult.getFailureMessage(), is(
"This store"
+ " (class org.apache.kafka.streams.state.internals.MeteredSessionStore)"
+ " doesn't know how to execute the given query"
+ " (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]})"
+ " because SessionStores only support WindowRangeQuery.withKey."
+ " Contact the store maintainer if you need support for a new query type."
));
assertThat(partitionResult.getFailureMessage(),
containsString("doesn't know how to execute the given query"));
assertThat(partitionResult.getFailureMessage(),
containsString("because SessionStores only support WindowRangeQuery.withKey."));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.apache.kafka.streams.integration;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
Expand All @@ -25,6 +28,7 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
Expand Down Expand Up @@ -58,6 +62,9 @@
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.consumer.ConsoleConsumer;
Expand All @@ -71,11 +78,15 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -718,8 +729,9 @@ public void shouldAggregateSlidingWindows(final TestInfo testInfo) throws Except
}
}

@Test
public void shouldCountSessionWindows() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldCountSessionWindows(final boolean withHeaders) throws Exception {
final long sessionGap = 5 * 60 * 1000L;
final List<KeyValue<String, String>> t1Messages = Arrays.asList(
new KeyValue<>("bob", "start"),
Expand Down Expand Up @@ -797,15 +809,31 @@ public void shouldCountSessionWindows() throws Exception {
final Map<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);

builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
final var groupedByKey = builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(sessionGap)))
.count()
.toStream()
.process(() -> (Processor<Windowed<String>, Long, Object, Object>) record -> {
results.put(record.key(), KeyValue.pair(record.value(), record.timestamp()));
latch.countDown();
});
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(sessionGap)));

if (withHeaders) {
final SessionBytesStoreSupplier supplier = Stores.persistentSessionStoreWithHeaders(
"CountSessionWithHeadersStore",
ofMillis(sessionGap + TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS) + 1)
);
groupedByKey
.count(Materialized.as(supplier))
.toStream()
.process(() -> (Processor<Windowed<String>, Long, Object, Object>) record -> {
results.put(record.key(), KeyValue.pair(record.value(), record.timestamp()));
latch.countDown();
});
} else {
groupedByKey
.count()
.toStream()
.process(() -> (Processor<Windowed<String>, Long, Object, Object>) record -> {
results.put(record.key(), KeyValue.pair(record.value(), record.timestamp()));
latch.countDown();
});
}

startStreams();
latch.await(30, TimeUnit.SECONDS);
Expand All @@ -819,88 +847,44 @@ public void shouldCountSessionWindows() throws Exception {
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair(1L, t3)));
}

@Test
public void shouldReduceSessionWindows() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldReduceSessionWindows(final boolean withHeaders) throws Exception {
final long sessionGap = 1000L; // something to do with time
final List<KeyValue<String, String>> t1Messages = Arrays.asList(
new KeyValue<>("bob", "start"),
new KeyValue<>("penny", "start"),
new KeyValue<>("jo", "pause"),
new KeyValue<>("emily", "pause")
);

final Properties producerConfig = TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
StringSerializer.class,
new Properties());

final long t1 = mockTime.milliseconds();
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
t1Messages,
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
StringSerializer.class,
new Properties()),
t1
);
final long t2 = t1 + (sessionGap / 2);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
Collections.singletonList(
new KeyValue<>("emily", "resume")
),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
StringSerializer.class,
new Properties()),
t2
);
final long t3 = t1 + sessionGap + 1;
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
Arrays.asList(
new KeyValue<>("bob", "pause"),
new KeyValue<>("penny", "stop")
),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
StringSerializer.class,
new Properties()),
t3
);
final long t4 = t3 + (sessionGap / 2);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
Arrays.asList(
new KeyValue<>("bob", "resume"), // bobs session continues
new KeyValue<>("jo", "resume") // jo's starts new session
),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
StringSerializer.class,
new Properties()),
t4
);
final long t5 = t4 - 1;
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
Collections.singletonList(
new KeyValue<>("jo", "late") // jo has late arrival
),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
StringSerializer.class,
new Properties()),
t5
);

produceSessionWindowData(producerConfig, withHeaders, t1, t2, t3, t4, t5, sessionGap);

final Map<Windowed<String>, KeyValue<String, Long>> results = new HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
final String userSessionsStore = "UserSessionsStore";

final Materialized<String, String, SessionStore<Bytes, byte[]>> materialized;
if (withHeaders) {
final SessionBytesStoreSupplier supplier = Stores.persistentSessionStoreWithHeaders(
userSessionsStore,
ofMillis(sessionGap + ofMinutes(1).toMillis() + 1)
);
materialized = Materialized.as(supplier);
} else {
materialized = Materialized.as(userSessionsStore);
}

builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(sessionGap), ofMinutes(1))) .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(sessionGap), ofMinutes(1)))
.reduce((value1, value2) -> value1 + ":" + value2, materialized)
.toStream()
.process(() -> (Processor<Windowed<String>, String, Object, Object>) record -> {
results.put(record.key(), KeyValue.pair(record.value(), record.timestamp()));
Expand All @@ -919,9 +903,61 @@ public void shouldReduceSessionWindows() throws Exception {
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair("pause:resume", t4)));
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair("stop", t3)));

// verify can query data via IQ
verifySessionStore(userSessionsStore, t1, t3, t4);

}

/**
 * Produces the five timestamped batches of user-session records consumed by the
 * session-window tests, optionally attaching test headers to every record.
 *
 * @param producerConfig producer properties reused for every batch
 * @param withHeaders    whether each batch carries the test headers
 * @param t1             timestamp of the initial four-user batch
 * @param t2             timestamp of emily's in-gap continuation
 * @param t3             timestamp of the post-gap bob/penny batch
 * @param t4             timestamp of the bob/jo batch
 * @param t5             timestamp of jo's late-arriving record
 * @param sessionGap     session inactivity gap; NOTE(review): currently unused in this
 *                       method — confirm whether it should influence the timestamps
 * @throws Exception if producing to the cluster fails
 */
private void produceSessionWindowData(final Properties producerConfig,
                                      final boolean withHeaders,
                                      final long t1, final long t2, final long t3,
                                      final long t4, final long t5,
                                      final long sessionGap) throws Exception {
    // Batch t1: four users each emit their first record.
    produceWithOptionalHeaders(
        Arrays.asList(
            new KeyValue<>("bob", "start"),
            new KeyValue<>("penny", "start"),
            new KeyValue<>("jo", "pause"),
            new KeyValue<>("emily", "pause")),
        producerConfig, withHeaders, "t1", t1);

    // Batch t2: emily continues her existing session.
    produceWithOptionalHeaders(
        Collections.singletonList(new KeyValue<>("emily", "resume")),
        producerConfig, withHeaders, "t2", t2);

    // Batch t3: bob and penny emit again after t1.
    produceWithOptionalHeaders(
        Arrays.asList(
            new KeyValue<>("bob", "pause"),
            new KeyValue<>("penny", "stop")),
        producerConfig, withHeaders, "t3", t3);

    // Batch t4: bobs session continues; jo's starts new session.
    produceWithOptionalHeaders(
        Arrays.asList(
            new KeyValue<>("bob", "resume"),
            new KeyValue<>("jo", "resume")),
        producerConfig, withHeaders, "t4", t4);

    // Batch t5: jo has late arrival (t5 precedes t4 at the call site).
    produceWithOptionalHeaders(
        Collections.singletonList(new KeyValue<>("jo", "late")),
        producerConfig, withHeaders, "t5", t5);
}

/**
 * Sends one batch of records to {@code userSessionsStream} at the given timestamp.
 * When {@code withHeaders} is set, every record in the batch carries two headers:
 * "batch" (the batch id) and "source" ("test").
 *
 * @param records        the key/value pairs to produce
 * @param producerConfig producer properties for the synchronous produce call
 * @param withHeaders    whether to attach the test headers
 * @param batchId        value of the "batch" header identifying this produce call
 * @param timestamp      record timestamp applied to the whole batch
 * @throws Exception if producing to the cluster fails
 */
private void produceWithOptionalHeaders(final Collection<KeyValue<String, String>> records,
                                        final Properties producerConfig,
                                        final boolean withHeaders,
                                        final String batchId,
                                        final long timestamp) throws Exception {
    // Plain path: no headers, use the timestamp-only overload.
    if (!withHeaders) {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            userSessionsStream, records, producerConfig, timestamp);
        return;
    }

    // Tag each record so tests can trace which produce call it originated from.
    final Headers recordHeaders = new RecordHeaders(Arrays.asList(
        new RecordHeader("batch", batchId.getBytes(StandardCharsets.UTF_8)),
        new RecordHeader("source", "test".getBytes(StandardCharsets.UTF_8))
    ));
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
        userSessionsStream, records, producerConfig, recordHeaders, timestamp, false);
}

private void verifySessionStore(final String storeName,
final long t1, final long t3, final long t4) throws Exception {
final ReadOnlySessionStore<String, String> sessionStore =
IntegrationTestUtils.getStore(userSessionsStore, kafkaStreams, QueryableStoreTypes.sessionStore());
IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.sessionStore());

try (final KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob")) {
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
Expand All @@ -930,6 +966,11 @@ public void shouldReduceSessionWindows() throws Exception {
}
}

/**
 * Asserts that {@code headers} contains exactly {@code expectedCount} entries.
 *
 * @param headers       the record headers under inspection
 * @param expectedCount the number of header entries expected
 */
private static void assertHeaderCount(final Headers headers, final int expectedCount) {
    final int actualCount = headers.toArray().length;
    assertThat("header count", actualCount, equalTo(expectedCount));
}


@Test
public void shouldCountUnlimitedWindows() throws Exception {
final long startTime = mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS) + 1;
Expand Down
Loading
Loading