Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2039,3 +2039,87 @@ DataFrame()
| 3 | 6 |
+---+---+
```

## Type mapping

### PyArrow

The Iceberg specification defines type mappings only for Avro, Parquet, and ORC:

- [Iceberg to Avro](https://iceberg.apache.org/spec/#avro)

- [Iceberg to Parquet](https://iceberg.apache.org/spec/#parquet)

- [Iceberg to ORC](https://iceberg.apache.org/spec/#orc)

The following tables describe the type mappings between PyIceberg and PyArrow. In the tables below, `pa` refers to the `pyarrow` module:

```python
import pyarrow as pa
```

#### PyIceberg to PyArrow type mapping
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code reference:

class _ConvertToArrowSchema(SchemaVisitorPerPrimitiveType[pa.DataType]):
    """Schema visitor that translates an Iceberg schema into the equivalent PyArrow types.

    Field IDs and docs are carried along as PyArrow field metadata; the metadata key
    used for field IDs depends on the target file format (ORC vs. Parquet).
    """

    _metadata: dict[bytes, bytes]

    def __init__(
        self, metadata: dict[bytes, bytes] = EMPTY_DICT, include_field_ids: bool = True, file_format: FileFormat | None = None
    ) -> None:
        self._metadata = metadata
        self._include_field_ids = include_field_ids
        self._file_format = file_format

    def schema(self, _: Schema, struct_result: pa.StructType) -> pa.schema:
        # Unwrap the top-level struct into a pa.schema, attaching schema-level metadata.
        return pa.schema(list(struct_result), metadata=self._metadata)

    def struct(self, _: StructType, field_results: builtins.list[pa.DataType]) -> pa.DataType:
        return pa.struct(field_results)

    def field(self, field: NestedField, field_result: pa.DataType) -> pa.Field:
        field_metadata: dict[str, str] = {}
        if field.doc:
            field_metadata[PYARROW_FIELD_DOC_KEY] = field.doc
        targets_orc = self._file_format == FileFormat.ORC
        if self._include_field_ids:
            # ORC and Parquet store the Iceberg field ID under different metadata keys;
            # Parquet is the default for backward compatibility.
            id_key = ORC_FIELD_ID_KEY if targets_orc else PYARROW_PARQUET_FIELD_ID_KEY
            field_metadata[id_key] = str(field.field_id)
        if targets_orc:
            field_metadata[ORC_FIELD_REQUIRED_KEY] = str(field.required).lower()
        return pa.field(
            name=field.name,
            type=field_result,
            nullable=field.optional,
            metadata=field_metadata,
        )

    def list(self, list_type: ListType, element_result: pa.DataType) -> pa.DataType:
        elem = self.field(list_type.element_field, element_result)
        return pa.large_list(elem)

    def map(self, map_type: MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType:
        k_field = self.field(map_type.key_field, key_result)
        v_field = self.field(map_type.value_field, value_result)
        return pa.map_(k_field, v_field)

    def visit_fixed(self, fixed_type: FixedType) -> pa.DataType:
        return pa.binary(len(fixed_type))

    def visit_decimal(self, decimal_type: DecimalType) -> pa.DataType:
        # It looks like decimal{32,64} is not fully implemented:
        # https://github.com/apache/arrow/issues/25483
        # https://github.com/apache/arrow/issues/43956
        # However, if we keep it as 128 in memory, and based on the
        # precision/scale Arrow will map it to INT{32,64}
        # https://github.com/apache/arrow/blob/598938711a8376cbfdceaf5c77ab0fd5057e6c02/cpp/src/parquet/arrow/schema.cc#L380-L392
        return pa.decimal128(decimal_type.precision, decimal_type.scale)

    def visit_boolean(self, _: BooleanType) -> pa.DataType:
        return pa.bool_()

    def visit_integer(self, _: IntegerType) -> pa.DataType:
        return pa.int32()

    def visit_long(self, _: LongType) -> pa.DataType:
        return pa.int64()

    def visit_float(self, _: FloatType) -> pa.DataType:
        # 32-bit IEEE 754 floating point
        return pa.float32()

    def visit_double(self, _: DoubleType) -> pa.DataType:
        # 64-bit IEEE 754 floating point
        return pa.float64()

    def visit_date(self, _: DateType) -> pa.DataType:
        # Date encoded as an int
        return pa.date32()

    def visit_time(self, _: TimeType) -> pa.DataType:
        return pa.time64("us")

    def visit_timestamp(self, _: TimestampType) -> pa.DataType:
        return pa.timestamp(unit="us")

    def visit_timestamp_ns(self, _: TimestampNanoType) -> pa.DataType:
        return pa.timestamp(unit="ns")

    def visit_timestamptz(self, _: TimestamptzType) -> pa.DataType:
        return pa.timestamp(unit="us", tz="UTC")

    def visit_timestamptz_ns(self, _: TimestamptzNanoType) -> pa.DataType:
        return pa.timestamp(unit="ns", tz="UTC")

    def visit_string(self, _: StringType) -> pa.DataType:
        return pa.large_string()

    def visit_uuid(self, _: UUIDType) -> pa.DataType:
        return pa.uuid()

    def visit_unknown(self, _: UnknownType) -> pa.DataType:
        """Type `UnknownType` can be promoted to any primitive type in V3+ tables per the Iceberg spec."""
        return pa.null()

    def visit_binary(self, _: BinaryType) -> pa.DataType:
        return pa.large_binary()

    def visit_geometry(self, geometry_type: GeometryType) -> pa.DataType:
        """Convert geometry type to PyArrow type.

        When geoarrow-pyarrow is available, returns a GeoArrow WKB extension type
        with CRS metadata. Otherwise, falls back to large_binary which stores WKB bytes.
        """
        try:
            import geoarrow.pyarrow as ga

            return ga.wkb().with_crs(geometry_type.crs)
        except ImportError:
            # Optional dependency missing: WKB bytes still round-trip via large_binary.
            return pa.large_binary()

    def visit_geography(self, geography_type: GeographyType) -> pa.DataType:
        """Convert geography type to PyArrow type.

        When geoarrow-pyarrow is available, returns a GeoArrow WKB extension type
        with CRS and edge type metadata. Otherwise, falls back to large_binary which stores WKB bytes.
        """
        try:
            import geoarrow.pyarrow as ga

            geo_type = ga.wkb().with_crs(geography_type.crs)
            # Map Iceberg algorithm to GeoArrow edge type; "planar" is GeoArrow's
            # default edge type, so only "spherical" needs to be set explicitly.
            if geography_type.algorithm == "spherical":
                geo_type = geo_type.with_edge_type(ga.EdgeType.SPHERICAL)
            return geo_type
        except ImportError:
            return pa.large_binary()


| PyIceberg type class | PyArrow type |
|---------------------------------|-------------------------------------|
| `BooleanType` | `pa.bool_()` |
| `IntegerType` | `pa.int32()` |
| `LongType` | `pa.int64()` |
| `FloatType` | `pa.float32()` |
| `DoubleType` | `pa.float64()` |
| `DecimalType(p, s)` | `pa.decimal128(p, s)` |
| `DateType` | `pa.date32()` |
| `TimeType` | `pa.time64("us")` |
| `TimestampType` | `pa.timestamp("us")` |
| `TimestampNanoType` (format version 3 only) | `pa.timestamp("ns")` [[2]](#notes) |
| `TimestamptzType` | `pa.timestamp("us", tz="UTC")` [[1]](#notes) |
| `TimestamptzNanoType` (format version 3 only) | `pa.timestamp("ns", tz="UTC")` [[1]](#notes) [[2]](#notes) |
| `StringType` | `pa.large_string()` |
| `UUIDType` | `pa.uuid()` |
| `BinaryType` | `pa.large_binary()` |
| `FixedType(L)` | `pa.binary(L)` |
| `StructType` | `pa.struct()` |
| `ListType(e)` | `pa.large_list(e)` |
| `MapType(k, v)` | `pa.map_(k, v)` |
| `UnknownType` (format version 3 only) | `pa.null()` [[2]](#notes) |

---

#### PyArrow to PyIceberg type mapping
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code reference:

class _ConvertToIceberg(PyArrowSchemaVisitor[IcebergType | Schema]):
    """Converts PyArrowSchema to Iceberg Schema. Applies the IDs from name_mapping if provided."""

    # Stack of field names tracking the current position while walking the schema,
    # used to build a readable path in error messages.
    _field_names: builtins.list[str]

    def __init__(
        self, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
    ) -> None:  # noqa: F821
        self._field_names = []
        self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
        self._format_version = format_version

    def _field_id(self, field: pa.Field) -> int:
        # Field IDs are mandatory: without one the field cannot be mapped to Iceberg.
        field_id = _get_field_id(field)
        if field_id is None:
            raise ValueError(f"Cannot convert {field} to Iceberg Field as field_id is empty.")
        return field_id

    def schema(self, schema: pa.Schema, struct_result: StructType) -> Schema:
        return Schema(*struct_result.fields)

    def struct(self, struct: pa.StructType, field_results: builtins.list[NestedField]) -> StructType:
        return StructType(*field_results)

    def field(self, field: pa.Field, field_result: IcebergType) -> NestedField:
        field_id = self._field_id(field)
        doc = None
        if field.metadata and (raw_doc := field.metadata.get(PYARROW_FIELD_DOC_KEY)):
            doc = raw_doc.decode()
        return NestedField(field_id, field.name, field_result, required=not field.nullable, doc=doc)

    def list(self, list_type: pa.ListType, element_result: IcebergType) -> ListType:
        elem_field = list_type.value_field
        self._field_names.append(LIST_ELEMENT_NAME)
        elem_id = self._field_id(elem_field)
        self._field_names.pop()
        return ListType(elem_id, element_result, element_required=not elem_field.nullable)

    def map(self, map_type: pa.MapType, key_result: IcebergType, value_result: IcebergType) -> MapType:
        k_field = map_type.key_field
        self._field_names.append(MAP_KEY_NAME)
        k_id = self._field_id(k_field)
        self._field_names.pop()
        v_field = map_type.item_field
        self._field_names.append(MAP_VALUE_NAME)
        v_id = self._field_id(v_field)
        self._field_names.pop()
        return MapType(k_id, key_result, v_id, value_result, value_required=not v_field.nullable)

    def primitive(self, primitive: pa.DataType) -> PrimitiveType:
        """Map a PyArrow primitive type onto its Iceberg counterpart, or raise TypeError."""
        if pa.types.is_boolean(primitive):
            return BooleanType()
        elif pa.types.is_integer(primitive):
            width = primitive.bit_width
            if width <= 32:
                return IntegerType()
            elif width <= 64:
                return LongType()
            else:
                # Does not exist (yet)
                raise TypeError(f"Unsupported integer type: {primitive}")
        elif pa.types.is_float32(primitive):
            return FloatType()
        elif pa.types.is_float64(primitive):
            return DoubleType()
        elif isinstance(primitive, pa.Decimal128Type):
            return DecimalType(primitive.precision, primitive.scale)
        elif pa.types.is_string(primitive) or pa.types.is_large_string(primitive) or pa.types.is_string_view(primitive):
            return StringType()
        elif pa.types.is_date32(primitive):
            return DateType()
        elif isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
            return TimeType()
        elif pa.types.is_timestamp(primitive):
            ts = cast(pa.TimestampType, primitive)
            if ts.unit == "ns":
                if self._downcast_ns_timestamp_to_us:
                    # Downcasting itself happens on write; here we just fall through
                    # to the microsecond mapping below.
                    logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
                elif self._format_version >= 3:
                    # Nanosecond precision is a format-version-3 feature.
                    if ts.tz in UTC_ALIASES:
                        return TimestamptzNanoType()
                    if ts.tz is None:
                        return TimestampNanoType()
                    # Non-UTC zones fall through to the final "Unsupported type" error.
                else:
                    raise TypeError(
                        "Iceberg does not yet support 'ns' timestamp precision. "
                        "Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically "
                        "downcast 'ns' to 'us' on write.",
                    )
            elif ts.unit not in ("s", "ms", "us"):
                raise TypeError(f"Unsupported precision for timestamp type: {ts.unit}")
            # "s"/"ms"/"us" are upcast automatically to 'us'.
            if ts.tz in UTC_ALIASES:
                return TimestamptzType()
            if ts.tz is None:
                return TimestampType()
        elif pa.types.is_binary(primitive) or pa.types.is_large_binary(primitive) or pa.types.is_binary_view(primitive):
            return BinaryType()
        elif pa.types.is_fixed_size_binary(primitive):
            return FixedType(cast(pa.FixedSizeBinaryType, primitive).byte_width)
        elif pa.types.is_null(primitive):
            # PyArrow null type (pa.null()) is converted to Iceberg UnknownType.
            # UnknownType can be promoted to any primitive type in V3+ tables per the Iceberg spec.
            if self._format_version < 3:
                field_path = ".".join(self._field_names) or "<root>"
                raise ValueError(
                    "Null type (pa.null()) is not supported in Iceberg format version "
                    f"{self._format_version}. Field: {field_path}. "
                    "Requires format-version=3+ or use a concrete type (string, int, boolean, etc.)."
                )
            return UnknownType()
        elif isinstance(primitive, pa.UuidType):
            return UUIDType()
        raise TypeError(f"Unsupported type: {primitive}")

    def before_field(self, field: pa.Field) -> None:
        self._field_names.append(field.name)

    def after_field(self, field: pa.Field) -> None:
        self._field_names.pop()

    def before_list_element(self, element: pa.Field) -> None:
        self._field_names.append(LIST_ELEMENT_NAME)

    def after_list_element(self, element: pa.Field) -> None:
        self._field_names.pop()

    def before_map_key(self, key: pa.Field) -> None:
        self._field_names.append(MAP_KEY_NAME)

    def after_map_key(self, element: pa.Field) -> None:
        self._field_names.pop()

    def before_map_value(self, value: pa.Field) -> None:
        self._field_names.append(MAP_VALUE_NAME)

    def after_map_value(self, element: pa.Field) -> None:
        self._field_names.pop()


| PyArrow type | PyIceberg type class |
|------------------------------------|-----------------------------|
| `pa.bool_()` | `BooleanType` |
| `pa.int8()` / `pa.int16()` / `pa.int32()` | `IntegerType` |
| `pa.int64()` | `LongType` |
| `pa.float32()` | `FloatType` |
| `pa.float64()` | `DoubleType` |
| `pa.decimal128(p, s)` | `DecimalType(p, s)` |
| `pa.decimal256(p, s)` | Unsupported |
| `pa.date32()` | `DateType` |
| `pa.date64()` | Unsupported |
| `pa.time64("us")` | `TimeType` |
| `pa.timestamp("s")` / `pa.timestamp("ms")` / `pa.timestamp("us")` | `TimestampType` |
| `pa.timestamp("ns")` | `TimestampNanoType` (format version 3 only) [[2]](#notes) |
| `pa.timestamp("s", tz="UTC")` / `pa.timestamp("ms", tz="UTC")` / `pa.timestamp("us", tz="UTC")` | `TimestamptzType` [[1]](#notes) |
| `pa.timestamp("ns", tz="UTC")` | `TimestamptzNanoType` (format version 3 only) [[1]](#notes) [[2]](#notes) |
| `pa.string()` / `pa.large_string()` / `pa.string_view()` | `StringType` |
| `pa.uuid()` | `UUIDType` |
| `pa.binary()` / `pa.large_binary()` / `pa.binary_view()` | `BinaryType` |
| `pa.binary(L)` | `FixedType(L)` |
| `pa.struct([...])` | `StructType` |
| `pa.list_(e)` / `pa.large_list(e)` / `pa.list_(e, list_size)` | `ListType(e)` |
| `pa.map_(k, v)` | `MapType(k, v)` |
| `pa.null()` | `UnknownType` (format version 3 only) [[2]](#notes) |

---

#### Notes

[1] Only the `UTC` timezone and its aliases are supported for PyArrow-to-PyIceberg timestamp-with-timezone conversion.

[2] The PyArrow-to-PyIceberg mappings for `pa.timestamp("ns")`, `pa.timestamp("ns", tz="UTC")`, and `pa.null()` require Iceberg format version 3. By default, `pyarrow_to_schema()` uses format version 2. `TimestampNanoType`, `TimestamptzNanoType`, and `UnknownType` are likewise format-version-3-only Iceberg types.

[3] For nanosecond Iceberg timestamp types (`TimestampNanoType` and `TimestamptzNanoType`), writing in format version 3 is not yet implemented (see [GitHub issue #1551](https://github.com/apache/iceberg-python/issues/1551)).

[4] The mappings are not fully symmetric. On read, PyArrow normalizes some families of types into a single Iceberg type, and on write PyIceberg emits a canonical PyArrow type: for example, `pa.int8()` and `pa.int16()` read as `IntegerType` and write back as `pa.int32()`, `pa.string()` reads as `StringType` and writes back as `pa.large_string()`, `pa.binary()` reads as `BinaryType` and writes back as `pa.large_binary()`, `pa.list_(...)` writes back as `pa.large_list(...)`, and `pa.timestamp("s")` / `pa.timestamp("ms")` read as `TimestampType` and write back as `pa.timestamp("us")`.
Loading