Skip to content

KAFKA-20257: GlobalStateManagerImpl delegates offset tracking to stores#21739

Open
nicktelford wants to merge 1 commit intoapache:trunkfrom
nicktelford:KAFKA-19712-CS1b
Open

KAFKA-20257: GlobalStateManagerImpl delegates offset tracking to stores#21739
nicktelford wants to merge 1 commit intoapache:trunkfrom
nicktelford:KAFKA-19712-CS1b

Conversation

@nicktelford
Copy link
Contributor

As part of KIP-1035, we want to transition away from task-specific .checkpoint files, and instead delegate offset management to StateStores.

We now have a LegacyCheckpointingStateStore wrapper to encapsulate the management of offsets for StateStore implementations that do not know how to manage their own offsets (i.e. for which managesOffsets() == false).

As of KAFKA-20212, RocksDBStore now knows how to manage its own offsets, so it will not be wrapped in a LegacyCheckpointingStateStore; only user-defined persistent stores will use this wrapper.

Corresponding changes to ProcessorStateManager have been submitted independently, as KAFKA-19712.

Until both ProcessorStateManager and GlobalStateManagerImpl have been updated, the StateManager interface must remain as-is. Therefore, the flush and checkpoint methods will not be consolidated until a later PR, which will clean up the interface and its usage by Task and friends.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@github-actions github-actions bot added triage PRs from the community streams labels Mar 12, 2026
Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @nicktelford - LGTM - just a couple minor questions

maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore);

// make sure each topic-partition from checkpointFileCache is associated with a global state store
checkpointFileCache.keySet().forEach(tp -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We removed this check an it makes sense since we're moving away from checkpoint files, with the new approach I'm thinking there's no way for this to get out sync now?

@Test
public void shouldReadCheckpointOffsets() throws IOException {
final Map<TopicPartition, Long> expected = writeCheckpoint();
writeCheckpoint();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is there a test for full migration path? i.e. legacy .checkpoint exists → stores get migrated offsets → legacy file deleted might be worth adding

@github-actions github-actions bot removed the triage PRs from the community label Mar 14, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants