KAFKA-20287 : Fix CF handle leaks #21751

Merged
mjsax merged 4 commits into apache:trunk from muralibasani:fix-cf-leaks
Mar 27, 2026

Contributor

@muralibasani muralibasani commented Mar 13, 2026

  • RocksDBStore.java — openRocksDB() base method: Added finally block
    after RocksDB.open() to close all CF handles and db if
    createColumnFamilies() or mergeColumnFamilyHandleLists() fails.

  • RocksDBStore.java — openDB(): Added finally block with
    closeNativeResources() to close all partially-initialized native
    resources if openRocksDB() or cfAccessor.open() fails.

  • RocksDBTimestampedStore.java: Changed RocksIterator from manual
    close() to try-with-resources, and added finally block to close all CF
    handles if an exception occurs before the accessor takes ownership.

  • RocksDBMigratingSessionStoreWithHeaders.java: Same as above —
    try-with-resources for RocksIterator and finally block for CF handle
    cleanup on failure.

  • RocksDBTimestampedStoreWithHeaders.java — openFromDefaultStore():
    Added finally block to close all CF handles if an exception occurs
    before the accessor takes ownership.

  • RocksDBTimestampedStoreWithHeaders.java — openFromTimestampedStore():
    Replaced manual per-handle close() calls (which missed
    columnFamilies.get(3)) with a single finally block that loops over all
    handles on failure.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Alieh Saeedi <asaeedi@confluent.io>
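
The last bullet above replaces per-index close() calls with one loop over all handles. A minimal sketch of that idea follows. `FakeHandle` and `CloseAllOnFailureSketch` are hypothetical stand-ins so the example runs without RocksDB; they are not the classes changed in this PR.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseAllOnFailureSketch {

    // Hypothetical stand-in for a native handle such as ColumnFamilyHandle.
    static final class FakeHandle implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Simulates opening a store over the given handles. If setup fails before an
    // accessor takes ownership, every handle in the list is closed in one loop.
    static void open(final List<FakeHandle> columnFamilies, final boolean failDuringSetup) {
        boolean success = false;
        try {
            if (failDuringSetup) {
                throw new IllegalStateException("simulated failure during setup");
            }
            // On success an accessor would take ownership of the handles here.
            success = true;
        } finally {
            if (!success) {
                for (final FakeHandle handle : columnFamilies) {
                    handle.close();
                }
            }
        }
    }

    // Builds four handles, runs open(), and reports how many ended up closed.
    static long closedCount(final boolean failDuringSetup) {
        final List<FakeHandle> handles = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            handles.add(new FakeHandle());
        }
        try {
            open(handles, failDuringSetup);
        } catch (final IllegalStateException expected) {
            // The simulated failure is expected in this sketch.
        }
        return handles.stream().filter(h -> h.closed).count();
    }

    public static void main(final String[] args) {
        System.out.println("closed after failure: " + closedCount(true));
        System.out.println("closed after success: " + closedCount(false));
    }
}
```

Looping over the list means a handle added later (such as a fourth column family) cannot be forgotten the way a hand-written `get(3)` call can.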

@github-actions github-actions bot added the triage (PRs from the community) and streams labels on Mar 13, 2026
throw new ProcessorStateException(fatalMessage, fatal);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name, e);
openRocksDB(dbOptions, columnFamilyOptions);
Contributor Author

This method internally calls createColumnFamilies() and mergeColumnFamilyHandleLists(), either of which might fail.

Member

I am wondering if we should do the necessary exception-handling inside openRocksDB instead?

Contributor Author

openRocksDB() already handles its own cleanup (CF handles + db), but the closeNativeResources() call in openDB() covers a different failure scope that openRocksDB() can't handle.

These 6 native resources (userSpecifiedOptions, cache, filter, wOptions, fOptions, statistics) are all created in openDB() before openRocksDB() is ever called. openRocksDB() doesn't own them and has no references with which to clean them up.

If openRocksDB() itself fails, its finally block cleans up everything it owns.

If openRocksDB() succeeds but cfAccessor.open() fails, db is already open, so closeNativeResources() handles the full teardown.

The other option is to move everything into openRocksDB(), but that would change the method signature across all overrides.
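
The two failure scopes described above can be sketched roughly as follows. This is a simplified illustration under assumptions, not the actual RocksDBStore code: `Resource`, `openInner()`, and the three fields are hypothetical, and the inner method here cleans up only the handle it created.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoScopeCleanupSketch {

    // Hypothetical native resource that records its name when closed.
    static final class Resource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Resource(final String name, final List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add(name);
        }
    }

    private final List<String> closeLog = new ArrayList<>();
    private Resource options;  // created in openDb(), before the inner open
    private Resource db;       // created in openInner()
    private Resource cfHandle; // created in openInner(), kept only on success

    // Inner scope: cleans up only the handle it created itself.
    private void openInner(final boolean failInner) {
        db = new Resource("db", closeLog);
        final Resource handle = new Resource("cfHandle", closeLog);
        try {
            if (failInner) {
                throw new IllegalStateException("simulated inner failure");
            }
            cfHandle = handle;
        } catch (final RuntimeException e) {
            handle.close();
            throw e;
        }
    }

    // Outer scope: owns the options and tears everything down on any failure.
    void openDb(final boolean failInner, final boolean failAccessor) {
        options = new Resource("options", closeLog);
        try {
            openInner(failInner);
            if (failAccessor) {
                throw new IllegalStateException("simulated accessor failure");
            }
        } catch (final RuntimeException e) {
            closeNativeResources();
            throw e;
        }
    }

    // Null-safe teardown, since some fields may not have been initialized yet.
    private void closeNativeResources() {
        if (cfHandle != null) {
            cfHandle.close();
            cfHandle = null;
        }
        if (db != null) {
            db.close();
            db = null;
        }
        if (options != null) {
            options.close();
            options = null;
        }
    }

    // Runs one scenario and returns the order in which resources were closed.
    static List<String> run(final boolean failInner, final boolean failAccessor) {
        final TwoScopeCleanupSketch sketch = new TwoScopeCleanupSketch();
        try {
            sketch.openDb(failInner, failAccessor);
        } catch (final IllegalStateException expected) {
            // The simulated failure is expected in this sketch.
        }
        return sketch.closeLog;
    }

    public static void main(final String[] args) {
        System.out.println(run(true, false));
        System.out.println(run(false, true));
    }
}
```

In both failure scenarios the outer method releases what the inner method could not reach, which is the reason given above for keeping a separate teardown helper in the outer scope.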

success = true;
} finally {
if (!success) {
closeNativeResources();
Contributor Author

Covers the case where openRocksDB() or cfAccessor.open() fails.

noTimestampColumnFamily.close();
boolean success = false;
try {
try (final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily)) {
Contributor Author

Updating to try-with-resources.
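
As a rough illustration of why this change matters, the sketch below compares a manual close() with try-with-resources when the read in between throws. `FakeIterator` is a hypothetical stand-in for RocksIterator, and the `fail` parameter exists only to simulate an error.

```java
final class IteratorTryWithResourcesSketch {

    // Hypothetical stand-in for RocksIterator; only tracks whether it was closed.
    static final class FakeIterator implements AutoCloseable {
        boolean closed = false;

        boolean isValid(final boolean fail) {
            if (fail) {
                throw new IllegalStateException("simulated failure while reading");
            }
            return false; // models an empty column family
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Manual close: the close() call is skipped when isValid() throws.
    static boolean manualClose(final FakeIterator iter, final boolean fail) {
        final boolean hasData = iter.isValid(fail);
        iter.close();
        return hasData;
    }

    // try-with-resources: close() runs on both the normal and the exceptional path.
    static boolean tryWithResources(final FakeIterator iter, final boolean fail) {
        try (final FakeIterator it = iter) {
            return it.isValid(fail);
        }
    }

    // Returns whether the iterator was closed after a simulated failure.
    static boolean closedAfterFailure(final boolean useTryWithResources) {
        final FakeIterator iter = new FakeIterator();
        try {
            if (useTryWithResources) {
                tryWithResources(iter, true);
            } else {
                manualClose(iter, true);
            }
        } catch (final IllegalStateException expected) {
            // The simulated failure is expected in this sketch.
        }
        return iter.closed;
    }

    public static void main(final String[] args) {
        System.out.println("manual close, closed after failure: " + closedAfterFailure(false));
        System.out.println("try-with-resources, closed after failure: " + closedAfterFailure(true));
    }
}
```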

boolean success = false;
try {
// Check if default CF has data (plain store upgrade)
try (final RocksIterator defaultIter = db.newIterator(defaultCf)) {
Contributor Author

Also updating to try-with-resources.

boolean success = false;
try {
// verify and close empty Default ColumnFamily
try (final RocksIterator defaultIter = db.newIterator(columnFamilies.get(0))) {
Contributor Author

Updating here as well, in the same way, to try-with-resources.

noHeadersColumnFamily.close();
boolean success = false;
try {
try (final RocksIterator noHeadersIter = db.newIterator(noHeadersColumnFamily)) {
Contributor Author

Updating here to try-with-resources.

@muralibasani muralibasani changed the title Fix CF handle leaks KAFKA-20287 : Fix CF handle leaks Mar 13, 2026
@muralibasani muralibasani marked this pull request as ready for review March 13, 2026 19:56
@github-actions github-actions bot removed the triage (PRs from the community) label on Mar 14, 2026
@muralibasani
Contributor Author

@mjsax do you think this PR addresses the CF handle leaks? Would you be able to review it?

Member

@mjsax mjsax left a comment

Thanks for the PR. Made a first pass. -- In the meantime, we added RocksDBMigratingWindowStoreWithHeaders that we should include in this PR.

* Used only by the error cleanup path in {@link #openDB} where some resources
* may not have been initialized yet.
*/
private void closeNativeResources() {
Member

We already have code in close() that closes all these resources -- is this new helper necessary? I would believe that the runtime ensures that we call RocksDBStore.close() if init() (which calls openDB()) fails.

Contributor Author

The close() method checks isOpen(), which in turn checks open.get().

That flag only becomes true after openDB()/openRocksDB() succeed; if either fails, it stays false.

With the flag false, close() returns early and does not close these resources.
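
The effect described here can be sketched as follows. This is a simplified model, not the RocksDBStore implementation: `GuardedCloseSketch` and its `allocated`/`released` fields are hypothetical and only track whether a resource was created and later freed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class GuardedCloseSketch {

    private final AtomicBoolean open = new AtomicBoolean(false);
    private boolean allocated = false; // models native resources created during init
    private boolean released = false;  // models those resources being closed

    // Allocates resources first; the open flag is only set if nothing fails.
    void init(final boolean failDuringOpen) {
        allocated = true;
        if (failDuringOpen) {
            throw new IllegalStateException("simulated failure while opening");
        }
        open.set(true);
    }

    boolean isOpen() {
        return open.get();
    }

    // Guarded close: returns early when the store never finished opening.
    void close() {
        if (!isOpen()) {
            return;
        }
        released = true;
        open.set(false);
    }

    // Returns true if resources were allocated but never released.
    static boolean leaksAfterInit(final boolean failDuringOpen) {
        final GuardedCloseSketch store = new GuardedCloseSketch();
        try {
            store.init(failDuringOpen);
        } catch (final IllegalStateException expected) {
            // The simulated failure is expected in this sketch.
        }
        store.close();
        return store.allocated && !store.released;
    }

    public static void main(final String[] args) {
        System.out.println("leak after failed init: " + leaksAfterInit(true));
        System.out.println("leak after successful init: " + leaksAfterInit(false));
    }
}
```

Under these assumptions, calling close() after a failed init releases nothing, which is why a separate cleanup on the failure path is needed.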

mergeColumnFamilyHandleLists(existingColumnFamilies, createdColumnFamilies, allDescriptors);
openSuccess = true;
return result;
} finally {
Member

Why are we not using catch(Exception e)? That would allow us to drop the openSuccess flag.

Contributor Author

Agreed on removing the success flag. Updated to use a catch block.
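
For comparison, here is a minimal sketch of the two shapes being discussed: a finally block guarded by a success flag, and a catch block that cleans up and rethrows. The method names and the `cleanup` callback are hypothetical.

```java
final class CatchInsteadOfFlagSketch {

    // Original shape: a flag tells the finally block whether cleanup is needed.
    static void withFlag(final Runnable body, final Runnable cleanup) {
        boolean success = false;
        try {
            body.run();
            success = true;
        } finally {
            if (!success) {
                cleanup.run();
            }
        }
    }

    // Reviewed shape: the catch block runs only on failure, so no flag is needed.
    static void withCatch(final Runnable body, final Runnable cleanup) {
        try {
            body.run();
        } catch (final RuntimeException e) {
            cleanup.run();
            throw e;
        }
    }

    // Runs one variant and returns how many times cleanup was invoked.
    static int cleanupsFor(final boolean useCatch, final boolean fail) {
        final int[] cleanups = {0};
        final Runnable body = () -> {
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
        };
        final Runnable cleanup = () -> cleanups[0]++;
        try {
            if (useCatch) {
                withCatch(body, cleanup);
            } else {
                withFlag(body, cleanup);
            }
        } catch (final IllegalStateException expected) {
            // The simulated failure is expected in this sketch.
        }
        return cleanups[0];
    }

    public static void main(final String[] args) {
        System.out.println("flag, failure: " + cleanupsFor(false, true));
        System.out.println("catch, failure: " + cleanupsFor(true, true));
    }
}
```

One difference worth keeping in mind: the flag variant also cleans up when an Error is thrown, while a catch of RuntimeException does not.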

for (final ColumnFamilyHandle handle : createdColumnFamilies) {
handle.close();
}
db.close();
Member

I don't think we need to close the db -- this should happen in RocksDBStore.close() already?

Contributor Author

Yes, db.close() is not required again. Removed.

.collect(Collectors.toList());
final List<ColumnFamilyHandle> existingColumnFamilies = new ArrayList<>(existingDescriptors.size());
final List<ColumnFamilyHandle> createdColumnFamilies = new ArrayList<>();
db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, existingColumnFamilies);
Member

I think this call should also go inside the try-catch block?

Contributor Author

Yes. The RocksDB.open() call is now inside the try-catch block, so if createColumnFamilies() or mergeColumnFamilyHandleLists() throws, the existingColumnFamilies handles populated by RocksDB.open() are properly cleaned up.
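
A rough sketch of that arrangement follows, assuming an open call that fills a caller-supplied list with handles. `fakeOpen()` and `FakeHandle` are hypothetical stand-ins and do not reflect the RocksDB API.

```java
import java.util.ArrayList;
import java.util.List;

final class OpenInsideTrySketch {

    // Hypothetical stand-in for a column family handle.
    static final class FakeHandle implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Stand-in for an open call that fills an out-parameter list with handles.
    static void fakeOpen(final List<FakeHandle> out, final int count) {
        for (int i = 0; i < count; i++) {
            out.add(new FakeHandle());
        }
    }

    // The lists are declared by the caller; the open call sits inside the try so
    // that handles it already produced are closed if a later step fails.
    static List<FakeHandle> openAll(final List<FakeHandle> existing,
                                    final List<FakeHandle> created,
                                    final boolean failOnCreate) {
        try {
            fakeOpen(existing, 2);
            if (failOnCreate) {
                throw new IllegalStateException("simulated failure creating column families");
            }
            fakeOpen(created, 1);
            final List<FakeHandle> merged = new ArrayList<>(existing);
            merged.addAll(created);
            return merged;
        } catch (final RuntimeException e) {
            for (final FakeHandle handle : existing) {
                handle.close();
            }
            for (final FakeHandle handle : created) {
                handle.close();
            }
            throw e;
        }
    }

    // Returns how many existing handles are closed after a failed create step.
    static long closedExistingAfterFailure() {
        final List<FakeHandle> existing = new ArrayList<>();
        final List<FakeHandle> created = new ArrayList<>();
        try {
            openAll(existing, created, true);
        } catch (final IllegalStateException expected) {
            // The simulated failure is expected in this sketch.
        }
        return existing.stream().filter(h -> h.closed).count();
    }

    public static void main(final String[] args) {
        System.out.println("existing handles closed: " + closedExistingAfterFailure());
    }
}
```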

boolean openSuccess = false;
try {
createdColumnFamilies.addAll(db.createColumnFamilies(toCreate));
final List<ColumnFamilyHandle> result =
Member

Suggested change
- final List<ColumnFamilyHandle> result =
+ final List<ColumnFamilyHandle> allColumnFamilies =

Contributor Author

Now returning it inline instead of introducing a new variable.

}
}
success = true;
} finally {
Member

Why not use catch(Exception) and get rid of success flag?

Contributor Author

Done. I also had to replace Exception with RuntimeException (SpotBugs).

}
}
success = true;
} finally {
Member

Same

}
}
success = true;
} finally {
Member

same

Contributor Author

Applied the above change here too.

}
}
success = true;
} finally {
Member

We should use catch(Exception) instead

Contributor Author

Done.

@mjsax mjsax added the kip (Requires or implements a KIP) and ci-approved labels on Mar 20, 2026
Contributor

@aliehsaeedii aliehsaeedii left a comment

Thanks @muralibasani. Good catch.
Just needs a rebase (and the earlier comments from Matthias to be addressed).

@muralibasani
Contributor Author

@mjsax @aliehsaeedii Updated PR based on comments.
Pls take another look.

noHeadersColumnFamily.close();
try {
// Check if DEFAULT CF has data (upgrade from old format without headers)
try (final RocksIterator noHeadersIter = db.newIterator(noHeadersColumnFamily)) {
Contributor Author

Moving newIterator() into try-with-resources.

Member

@mjsax mjsax left a comment

Thanks. A few more cosmetics. Overall LGTM.

setupStatistics(configs, dbOptions);
openRocksDB(dbOptions, columnFamilyOptions);
dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
boolean success = false;
Member

Seems you missed this one for removal?

Contributor Author

Ah yes. Done.

throw new ProcessorStateException("Error opening store " + name, e);
}
success = true;
} finally {
Member

Should we use catch() instead?

Contributor Author

Agree, removed.

withHeadersColumnFamily,
HeadersBytesStore::convertToHeaderFormat,
this,
open
Member

Suggested change
open
open

noTimestampColumnFamily,
withTimestampColumnFamily,
TimestampedBytesStore::convertToTimestampedFormat,
this, open
Member

Suggested change
- this, open
+ this,
+ open

headersCf,
HeadersBytesStore::convertFromPlainToHeaderFormat,
this,
open
Member

Suggested change
open
open

log.info("Opening store {} in regular mode", name);
cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, withHeadersColumnFamily);
noHeadersColumnFamily.close();
try {
Member

Why do we need to nest two try-catch blocks? With the success flag being removed, isn't one sufficient? (Also elsewhere.)

Contributor Author

Indeed not needed. Removed from others too.
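
A single try can cover both concerns because a try-with-resources statement may carry its own catch clause, and the resource is closed before that catch clause runs. A minimal sketch, with `Tracked` as a hypothetical resource type:

```java
import java.util.ArrayList;
import java.util.List;

final class SingleTrySketch {

    // Hypothetical resource that records an event when it is closed.
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> events;

        Tracked(final String name, final List<String> events) {
            this.name = name;
            this.events = events;
        }

        @Override
        public void close() {
            events.add(name + " closed");
        }
    }

    // One try statement: the iterator is managed as a resource, and the catch
    // clause closes the handle only when something inside the block failed.
    static void open(final List<String> events, final boolean fail) {
        final Tracked handle = new Tracked("handle", events);
        try (final Tracked iterator = new Tracked("iterator", events)) {
            if (fail) {
                throw new IllegalStateException("simulated failure while checking for data");
            }
            // On success the handle would be passed on to its new owner here.
        } catch (final RuntimeException e) {
            handle.close();
            throw e;
        }
    }

    // Runs open() and returns the recorded close events in order.
    static List<String> eventsFor(final boolean fail) {
        final List<String> events = new ArrayList<>();
        try {
            open(events, fail);
        } catch (final IllegalStateException expected) {
            // The simulated failure is expected in this sketch.
        }
        return events;
    }

    public static void main(final String[] args) {
        System.out.println(eventsFor(true));
        System.out.println(eventsFor(false));
    }
}
```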

@muralibasani muralibasani requested a review from mjsax March 21, 2026 17:32
@muralibasani
Contributor Author

muralibasani commented Mar 21, 2026

> Thanks. A few more cosmetics. Overall LGTM.

Thank you @mjsax . Pls take another look.

@mjsax mjsax merged commit 5e509dc into apache:trunk Mar 27, 2026
26 checks passed
mjsax pushed a commit that referenced this pull request Mar 27, 2026
@mjsax
Member

mjsax commented Mar 27, 2026

Thanks for the PR!

Merged to trunk and cherry-picked to 4.3 branch.

Shekharrajak pushed a commit to Shekharrajak/kafka that referenced this pull request Mar 31, 2026
nileshkumar3 pushed a commit to nileshkumar3/kafka that referenced this pull request Apr 15, 2026