-
Notifications
You must be signed in to change notification settings - Fork 489
fix(glue): Support create_table for S3 Tables federated databases #3058
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
9708909
4f9299e
de9c1b2
355f242
420da60
d3644b5
510d49d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ | |
| # under the License. | ||
|
|
||
|
|
||
| import logging | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
|
|
@@ -120,13 +121,16 @@ | |
| ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional" | ||
| ICEBERG_FIELD_CURRENT = "iceberg.field.current" | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| GLUE_PROFILE_NAME = "glue.profile-name" | ||
| GLUE_REGION = "glue.region" | ||
| GLUE_ACCESS_KEY_ID = "glue.access-key-id" | ||
| GLUE_SECRET_ACCESS_KEY = "glue.secret-access-key" | ||
| GLUE_SESSION_TOKEN = "glue.session-token" | ||
| GLUE_MAX_RETRIES = "glue.max-retries" | ||
| GLUE_RETRY_MODE = "glue.retry-mode" | ||
| GLUE_CONNECTION_S3_TABLES = "aws:s3tables" | ||
|
|
||
| MAX_RETRIES = 10 | ||
| STANDARD_RETRY_MODE = "standard" | ||
|
|
@@ -417,6 +421,116 @@ def _get_glue_table(self, database_name: str, table_name: str) -> "TableTypeDef" | |
| except self.glue.exceptions.EntityNotFoundException as e: | ||
| raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e | ||
|
|
||
def _is_s3tables_database(self, database_name: str) -> bool:
    """Report whether a Glue database is federated with S3 Tables.

    The database is looked up through the Glue client; it counts as an
    S3 Tables database when its ``FederatedDatabase`` entry carries a
    ``ConnectionType`` that equals ``aws:s3tables`` (compared
    case-insensitively).

    Args:
        database_name: The name of the Glue database.

    Returns:
        True if the database is an S3 Tables federated database. False if
        the database does not exist, has no federation entry, or has a
        different (or empty) connection type.
    """
    try:
        response = self.glue.get_database(Name=database_name)
    except self.glue.exceptions.EntityNotFoundException:
        # A missing database is reported as "not S3 Tables" rather than
        # raised from this helper.
        return False

    federation_info = response["Database"].get("FederatedDatabase", {})
    connection_type = federation_info.get("ConnectionType")
    if not connection_type:
        # Absent or empty connection type can never match the constant.
        return False
    return connection_type.lower() == GLUE_CONNECTION_S3_TABLES
|
|
||
def _create_table_s3tables(
    self,
    # NOTE(review): a reviewer flagged redundancy among the parameters passed
    # to this helper, and the author replied that it was addressed -- confirm
    # against the latest revision of the change.
    identifier: str | Identifier,
    schema: Union[Schema, "pa.Schema"],
    location: str | None,
    partition_spec: PartitionSpec,
    sort_order: SortOrder,
    properties: Properties,
) -> Table:
    """Create an Iceberg table in a Glue database federated with S3 Tables.

    S3 Tables manages storage internally, so the table location is only
    known once the table exists in the service. The sequence is:

    1. Create a minimal Glue table entry (``format=ICEBERG``) so that
       S3 Tables allocates storage.
    2. Read the entry back and take the managed location from its
       ``StorageDescriptor``.
    3. Stage the Iceberg metadata against that location and write it.
    4. Update the Glue entry so it points at the written metadata.

    If anything after step 1 fails, a best-effort delete of the Glue entry
    is attempted and the original exception is re-raised.

    Args:
        identifier: Table identifier.
        schema: Table's schema.
        location: Must be None; S3 Tables assigns the location itself.
        partition_spec: PartitionSpec for the table.
        sort_order: SortOrder for the table.
        properties: Table properties.

    Returns:
        The newly created table, bound to this catalog.

    Raises:
        ValueError: If a location is supplied, or if the created Glue entry
            carries no storage location.
        CommitFailedException: If the created Glue entry has no version id.
    """
    database_name, table_name = self.identifier_to_database_and_table(identifier)

    # Reject a caller-supplied location before anything is created in Glue.
    if location is not None:
        raise ValueError(
            f"Cannot specify a location for S3 Tables table {database_name}.{table_name}. "
            "S3 Tables manages the storage location automatically."
        )

    # Step 1: a placeholder entry whose only purpose is to trigger storage allocation.
    placeholder_input = {
        "Name": table_name,
        "Parameters": {"format": "ICEBERG"},
    }
    self._create_glue_table(
        database_name=database_name,
        table_name=table_name,
        table_input=placeholder_input,
    )

    try:
        # Step 2: read the entry back to learn the managed storage location.
        glue_entry = self._get_glue_table(database_name=database_name, table_name=table_name)
        managed_location = glue_entry.get("StorageDescriptor", {}).get("Location")
        if not managed_location:
            raise ValueError(f"S3 Tables did not assign a storage location for {database_name}.{table_name}")

        # Step 3: stage the metadata against the managed location and write it.
        staged = self._create_staged_table(
            identifier=identifier,
            schema=schema,
            location=managed_location,
            partition_spec=partition_spec,
            sort_order=sort_order,
            properties=properties,
        )
        # NOTE(review): a reviewer pointed out that S3 Tables managed storage
        # does not support ListObjectsV2 and that this write path performs an
        # existence check before writing, which should be skippable. The author
        # reported fixing it in a later revision -- confirm against current code.
        self._write_metadata(staged.metadata, staged.io, staged.metadata_location)

        # Step 4: swap the placeholder for the full table input. The update is
        # keyed on the version id of the entry read in step 2.
        full_input = _construct_table_input(table_name, staged.metadata_location, properties, staged.metadata)
        version_id = glue_entry.get("VersionId")
        if not version_id:
            raise CommitFailedException(
                f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
            )
        self._update_glue_table(
            database_name=database_name,
            table_name=table_name,
            table_input=full_input,
            version_id=version_id,
        )
    except Exception:
        # Best-effort removal of the entry created in step 1. A cleanup failure
        # is only logged so that the original error is the one that propagates.
        try:
            self.glue.delete_table(DatabaseName=database_name, Name=table_name)
        except Exception:
            logger.warning(
                f"Failed to clean up S3 Tables table {database_name}.{table_name}",
                exc_info=logger.isEnabledFor(logging.DEBUG),
            )
        raise

    metadata = staged.metadata
    metadata_location = staged.metadata_location
    return Table(
        identifier=self.identifier_to_tuple(identifier),
        metadata=metadata,
        metadata_location=metadata_location,
        io=self._load_file_io(metadata.properties, metadata_location),
        catalog=self,
    )
|
|
||
| def create_table( | ||
| self, | ||
| identifier: str | Identifier, | ||
|
|
@@ -433,6 +547,7 @@ def create_table( | |
| identifier: Table identifier. | ||
| schema: Table's schema. | ||
| location: Location for the table. Optional Argument. | ||
| Must not be set for S3 Tables, which manage their own storage. | ||
| partition_spec: PartitionSpec for the table. | ||
| sort_order: SortOrder for the table. | ||
| properties: Table properties that can be a string based dictionary. | ||
|
|
@@ -442,9 +557,22 @@ def create_table( | |
|
|
||
| Raises: | ||
| AlreadyExistsError: If a table with the name already exists. | ||
| ValueError: If the identifier is invalid, or no path is given to store metadata. | ||
| ValueError: If the identifier is invalid, no path is given to store metadata, | ||
| or a location is specified for an S3 Tables table. | ||
|
|
||
| """ | ||
| database_name, table_name = self.identifier_to_database_and_table(identifier) | ||
|
|
||
| if self._is_s3tables_database(database_name): | ||
| return self._create_table_s3tables( | ||
| identifier=identifier, | ||
| schema=schema, | ||
| location=location, | ||
| partition_spec=partition_spec, | ||
| sort_order=sort_order, | ||
| properties=properties, | ||
| ) | ||
|
|
||
| staged_table = self._create_staged_table( | ||
| identifier=identifier, | ||
| schema=schema, | ||
|
|
@@ -453,13 +581,18 @@ def create_table( | |
| sort_order=sort_order, | ||
| properties=properties, | ||
| ) | ||
| database_name, table_name = self.identifier_to_database_and_table(identifier) | ||
|
|
||
| self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location) | ||
| table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata) | ||
| self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input) | ||
|
|
||
| return self.load_table(identifier=identifier) | ||
| return Table( | ||
| identifier=self.identifier_to_tuple(identifier), | ||
| metadata=staged_table.metadata, | ||
| metadata_location=staged_table.metadata_location, | ||
| io=self._load_file_io(staged_table.metadata.properties, staged_table.metadata_location), | ||
| catalog=self, | ||
| ) | ||
|
|
||
| def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table: | ||
| """Register a new table using existing metadata. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.