
Fix IcebergIO conn pool crash by moving FileIO lifecycle to @Teardown#38149

Open
dejii wants to merge 3 commits intoapache:masterfrom
dejii:fix/iceberg-fileio-lifecycle

Conversation

@dejii (Contributor) commented Apr 13, 2026

Problem

IcebergIO write pipelines fail with "connection pool shut down" errors after processing more than one bundle on the same worker thread. The root cause:

  • The catalog is @MonotonicNonNull on the DoFn: it is created once and reused across all bundles on that DoFn instance
  • RecordWriterManager.close() (called in @FinishBundle) was calling table.io().close() to release FileIO resources
  • RESTSessionCatalog shares a single FileIO across all tables. Closing it per bundle permanently kills the catalog's connection pool, so all subsequent bundles fail
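The failure mode can be reproduced with a minimal stand-in (toy classes, not the real Beam or Iceberg API): the catalog hands every table the same FileIO, so the first bundle's @FinishBundle kills the pool for every bundle after it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model: one FileIO shared by all tables of a catalog.
class SharedFileIO {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    void newInputFile(String path) {
        if (closed.get()) {
            throw new IllegalStateException("connection pool shut down");
        }
    }

    void close() { closed.set(true); }
}

public class PerBundleCloseBug {
    public static void main(String[] args) {
        SharedFileIO io = new SharedFileIO(); // shared across all tables and bundles

        // Bundle 1: writes succeed, then the per-bundle close (what
        // RecordWriterManager.close() used to do) shuts the shared FileIO.
        io.newInputFile("table-a/data-0.parquet");
        io.close();

        // Bundle 2 on the same DoFn instance: the pool is already dead.
        try {
            io.newInputFile("table-a/data-1.parquet");
        } catch (IllegalStateException e) {
            System.out.println("bundle 2 failed: " + e.getMessage());
        }
    }
}
```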

Fix

Remove the FileIO close from RecordWriterManager: it must not manage the lifecycle of a resource it does not own.

Add @Teardown to all IcebergIO write DoFns, so the catalog (and its underlying FileIO) is closed exactly once, when the runner destroys the DoFn instance, rather than after each bundle:

  • WriteUngroupedRowsToFilesDoFn
  • WriteGroupedRowsToFilesDoFn
  • WriteDirectRowsToFilesDoFn
  • AppendFilesToTablesDoFn
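The corrected lifecycle can be sketched with simplified stand-ins (not the real Beam classes): @FinishBundle only flushes writers, and the catalog is closed once in @Teardown.

```java
import java.io.Closeable;
import java.io.IOException;

// Toy catalog: closing it is what releases the shared FileIO.
class CatalogSketch implements Closeable {
    int fileIoCloses = 0;
    @Override public void close() { fileIoCloses++; }
}

public class TeardownLifecycle {
    private Object catalog; // @MonotonicNonNull in the real DoFn: created once, reused

    Object getCatalog() {
        if (catalog == null) {
            catalog = new CatalogSketch(); // catalogConfig.catalog() in the real code
        }
        return catalog;
    }

    void finishBundle() {
        getCatalog();
        // RecordWriterManager.close(): flush/roll data files only; no table.io().close()
    }

    // @Teardown in the real DoFns: runs once when the runner destroys the instance.
    void teardown() throws IOException {
        if (catalog instanceof Closeable) {
            ((Closeable) catalog).close();
        }
    }

    public static void main(String[] args) throws IOException {
        TeardownLifecycle fn = new TeardownLifecycle();
        fn.finishBundle();                                 // bundle 1
        fn.finishBundle();                                 // bundle 2: same catalog, pool still alive
        CatalogSketch cat = (CatalogSketch) fn.getCatalog();
        fn.teardown();
        System.out.println(cat.fileIoCloses);              // prints 1: closed exactly once
    }
}
```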

Catalog compatibility

All major Iceberg catalogs verified against source:

| Catalog | FileIO closed in `close()`? |
| --- | --- |
| RESTSessionCatalog | ✅ via CloseableGroup + FileIOTracker (1, 2) |
| GlueCatalog | ✅ via CloseableGroup + FileIOTracker (1, 2) |
| JdbcCatalog | ✅ via CloseableGroup (1, 2) |
| NessieCatalog | ✅ via CloseableGroup (1, 2) |
| HadoopCatalog | N/A: defaults to HadoopFileIO, which has no close() (HadoopFileIO.java) |
| HiveCatalog | N/A: defaults to HadoopFileIO, which has no close() (HadoopFileIO.java) |

Tests

  • testRecordWriterManagerDoesNotCloseSharedFileIO — verifies FileIO survives RecordWriterManager.close()
  • testFileIOSurvivesAcrossBundles — two sequential bundles succeed with shared FileIO (core regression test)
  • testFullLifecycleBundlesThenCatalogClose — full DoFn lifecycle: multiple bundles + catalog.close() at teardown
  • testNonCloseableCatalogSkippedOnTeardown — verifies non-Closeable catalog (HadoopCatalog) is safely skipped

Also verified in production on Google Cloud Dataflow using a custom build of this fix. The pipeline writes to dynamic Iceberg destinations (multiple tables from a single pipeline). No connection pool errors observed with this fix.

Follow-up on #37782.



@gemini-code-assist (Contributor)

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical issue where IcebergIO write pipelines fail due to premature connection pool closure. By shifting responsibility for closing the FileIO from the per-bundle RecordWriterManager to the DoFn's @Teardown lifecycle, we ensure that shared resources remain active across multiple bundles, preventing 'connection pool shut down' errors while maintaining proper resource cleanup.

Highlights

  • Resource Lifecycle Management: Removed FileIO closure from RecordWriterManager.close() to prevent premature connection pool termination.
  • Teardown Implementation: Added @Teardown methods to IcebergIO write DoFns to ensure the catalog is closed exactly once when the DoFn instance is destroyed.
  • Regression Testing: Added comprehensive tests to verify FileIO persistence across bundles and proper cleanup during DoFn teardown.


@dejii (Contributor Author) commented Apr 13, 2026

@ahmedabu98 following up on #37782 - that fix correctly moved FileIO close from RecordWriter to RecordWriterManager, but it turns out there's a deeper issue that only manifests under high write volume to dynamic destinations (many bundles per worker).

The root cause: the catalog is @MonotonicNonNull on the DoFn and reused across all bundles on the same instance. RecordWriterManager.close() is called per bundle (@FinishBundle), so closing FileIO there, even deduplicated, kills the catalog's shared connection pool for all subsequent bundles on that DoFn.

This PR removes FileIO close from RecordWriterManager entirely and adds @Teardown to all four IcebergIO write DoFns, so the catalog (and its FileIO) is closed exactly once when the DoFn instance is destroyed.

Would appreciate your review here.

@dejii (Contributor Author) commented Apr 13, 2026

The alternative from Beam's side would create brittle coupling to Iceberg internals. @Teardown is a clean boundary: Beam manages the catalog lifecycle, the catalog manages everything it owns.

@github-actions (Contributor)

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`.

@dejii (Contributor Author) commented Apr 13, 2026

The check failures are not related to the code changes, e.g.:

```
:sdks:java:maven-archetypes:examples:generateSources: generate-sources.sh exited with code 127 (command not found)
```

@dejii (Contributor Author) commented Apr 13, 2026

assign set of reviewers

@github-actions (Contributor)

Assigning reviewers:

R: @chamikaramj for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@ahmedabu98 (Contributor)

Thanks @dejii, overall this looks good. I'm wondering, though, whether our assumption holds that closing the Catalog will indeed close all underlying FileIOs. Did you check whether this is true for the different catalog implementations?

@stankiewicz (Contributor) left a comment

I'm a bit lost on the overall lifecycle of the catalog here.

```java
@Teardown
public void teardown() throws IOException {
  if (catalog instanceof Closeable) {
    ((Closeable) catalog).close();
  }
}
```
Contributor:

This catalog is set via catalog = catalogConfig.catalog(). catalogConfig is a single object that is passed to multiple transforms and returns cachedCatalog if one exists.

What happens to the catalog instance in another transform if cachedCatalog is closed?

Contributor Author:

cachedCatalog is transient, so it's not serialized. Each DoFn gets its own IcebergCatalogConfig instance with cachedCatalog = null on deserialization. There is no sharing across transforms at execution time, so closing one DoFn's catalog cannot affect another.

```java
private transient @MonotonicNonNull Catalog cachedCatalog;
```
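The transient behavior is easy to verify with a minimal serialization round trip (toy class, not the real IcebergCatalogConfig): Java's default deserialization leaves transient fields at their default value, null here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Toy stand-in for a config object with a cached, non-serialized catalog.
class ConfigSketch implements Serializable {
    transient Object cachedCatalog = new Object();
}

public class TransientDemo {
    static ConfigSketch roundTrip(ConfigSketch in) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream is =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ConfigSketch) is.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ConfigSketch original = new ConfigSketch();
        ConfigSketch copy = roundTrip(original);
        System.out.println(original.cachedCatalog != null); // true
        System.out.println(copy.cachedCatalog == null);     // true: no sharing after deserialization
    }
}
```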

Contributor:

You are right: each DoFn gets its own IcebergCatalogConfig during deserialization, but that is an implementation detail that happens to work with Dataflow Runner v2. Per the DoFn lifecycle, a DoFn can also be passed by instance, and during expansion icebergCatalogConfig is a single object.

Contributor Author:

Ah I see - I'm using Dataflow Runner v2.

Contributor Author:

What do you think is the way forward here? The current state, i.e. closing FileIO in @FinishBundle via RecordWriterManager.close(), is worse. Should we move catalog initialization to @Setup and close it in @Teardown? That would make the lifecycle explicit and address the IcebergCatalogConfig sharing concern for the passed-by-instance case as well.

Contributor:

I will check how v1 behaves, but to make this work for runners that pass the DoFn by instance, consider:

  • changing IcebergCatalogConfig to be purely config, with a getCatalog() that does no caching
  • having each DoFn store the IcebergCatalogConfig reference from its constructor
  • obtaining the catalog via getCatalog() in setup()
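The refactor proposed in the bullets above could be sketched as follows (hypothetical, simplified names; the real class is IcebergCatalogConfig): the config carries only settings and builds a fresh catalog on demand, so each DoFn instance owns, and can safely close, its own catalog.

```java
import java.io.Serializable;

// Config with no cachedCatalog field: purely settings plus a factory method.
class CatalogConfigSketch implements Serializable {
    private final String uri;

    CatalogConfigSketch(String uri) { this.uri = uri; }

    Object newCatalog() {
        // Would build a real Catalog from the stored settings; no caching, no sharing.
        return new Object();
    }
}

class DoFnSketch {
    private final CatalogConfigSketch config; // kept from the constructor
    private Object catalog;

    DoFnSketch(CatalogConfigSketch config) { this.config = config; }

    void setup() { catalog = config.newCatalog(); }  // @Setup in the real DoFn

    Object catalog() { return catalog; }
}
```

With this shape, even two DoFns constructed from the same config instance get distinct catalogs, so one closing in @Teardown cannot affect the other.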

Contributor:

On closing, I guess this PR takes one of the valid approaches, under the assumption that the catalog is unique to the DoFn instance.

One problem: RecordWriterManager still has a static cache, where getOrCreateTable may return a closed IO.

Contributor Author:

Sounds good - I'll take a look at that, thanks!

@stankiewicz (Contributor)

Hey,

I understand the need to close the catalog.

Doing it per bundle is overkill: if each bundle writes to many tables, the catalog wastes plenty of time reloading table credentials. But not doing it at all, especially with dynamic destinations and vended credentials, means fileIOTracker will grow without limit and we may keep refreshing credentials for tables that no longer need them.

Contributor:

Maybe add .removalListener(notification -> notification.getValue().io().close()) if the config has vended credentials, to close the per-table IO.
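The suggestion above refers to Guava's CacheBuilder removalListener. As a self-contained stdlib analogue of the same idea (close a table's IO when it is evicted from a bounded cache), an access-order LinkedHashMap can run the cleanup in its eviction hook. Names like TableEntry are illustrative, not Beam or Iceberg API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy per-table entry whose IO must be released on eviction.
class TableEntry {
    boolean ioClosed = false;
    void closeIo() { ioClosed = true; } // stands in for table.io().close()
}

// LRU-style cache that closes the evicted table's IO, like a removal listener.
class TableCache extends LinkedHashMap<String, TableEntry> {
    private final int maxSize;

    TableCache(int maxSize) {
        super(16, 0.75f, true); // access order: least-recently-used entry is eldest
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, TableEntry> eldest) {
        if (size() > maxSize) {
            eldest.getValue().closeIo(); // release vended credentials for this table
            return true;
        }
        return false;
    }
}
```

Guava's removalListener additionally fires on explicit invalidation and expiry, which the real fix would want; this sketch only covers size-based eviction.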

@dejii (Contributor Author) commented Apr 13, 2026

Hey @stankiewicz - thanks for the review.

> Doing it per bundle is an overkill, especially if for each bundle there are many tables to write to, catalog will waste plenty of time on loading table credentials.

Just to clarify - closing FileIO per bundle is the current behavior, and it's not wasted time but thrown errors when using dynamic destinations: the catalog tries to reuse a dead connection pool on every subsequent bundle, and the pipeline ultimately fails.

@dejii (Contributor Author) commented Apr 13, 2026

> Thanks @dejii, overall this looks good. I'm wondering though, if our assumption that closing the Catalog will indeed close all underlying FileIOs. Did you check if this is true for different catalog implementations?

@ahmedabu98 Yes, confirmed: verified against source, with links included in the PR description. The REST, Glue, JDBC, and Nessie catalogs all properly close their FileIO via CloseableGroup in close(). The Hadoop and Hive catalogs do not close their FileIO, but this is likely intentional: they default to HadoopFileIO, which has no close() implementation, since Hadoop's FileSystem manages its own lifecycle.

@dejii (Contributor Author) commented Apr 14, 2026

@stankiewicz Thanks for the review - addressed all the feedback from our discussion. Here's what changed:

  • Catalog lifecycle: Each DoFn now creates its own catalog via catalogConfig.newCatalog() in @Setup and closes it in @Teardown. This removes the reliance on the shared cachedCatalog in IcebergCatalogConfig, so the fix works correctly regardless of whether the DoFn is passed as instance or deserialized. The existing catalog() method is preserved for driver-side operations (pipeline construction, schema inference, etc.).

  • Static table cache: LAST_REFRESHED_TABLE_CACHE is no longer static - each DoFn owns its own cache instance and passes it to RecordWriterManager via the constructor. This prevents a closed catalog's dead Table objects from poisoning other DoFn instances.
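The second bullet can be sketched with simplified names (the real classes are RecordWriterManager and the write DoFns): the table cache moves from a class-wide static to a per-DoFn field injected through the constructor, so one DoFn closing its catalog can no longer leave stale Table handles in a cache that other DoFns read.

```java
import java.util.HashMap;
import java.util.Map;

// Manager that looks tables up in an injected cache rather than a static one.
class RecordWriterManagerSketch {
    private final Map<String, Object> tableCache;

    RecordWriterManagerSketch(Map<String, Object> tableCache) {
        this.tableCache = tableCache;
    }

    Object getOrCreateTable(String name) {
        return tableCache.computeIfAbsent(name, n -> new Object());
    }
}

class WriteDoFnSketch2 {
    // One cache per DoFn instance, passed to each RecordWriterManager it creates.
    private final Map<String, Object> myTableCache = new HashMap<>();

    RecordWriterManagerSketch newManager() {
        return new RecordWriterManagerSketch(myTableCache);
    }
}
```

Managers created by the same DoFn still share a cache (preserving the refresh optimization within an instance), while two DoFn instances never see each other's entries.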

@dejii dejii requested a review from stankiewicz April 14, 2026 00:01
