Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/3826.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a `subchunk_write_order` option to `ShardingCodec` to allow for `morton`, `unordered`, `lexicographic`, and `colexicographic` subchunk orderings.
7 changes: 7 additions & 0 deletions docs/user-guide/performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ bytes within chunks of an array may improve the compression ratio, depending on
the structure of the data, the compression algorithm used, and which compression
filters (e.g., byte-shuffle) have been applied.

### Subchunk memory layout

The order of chunks **within each shard** can be changed via the `subchunk_write_order` parameter of the `ShardingCodec`. That parameter is a string which must be one of `["morton", "lexicographic", "colexicographic", "unordered"]`.

By default, [`morton`](https://en.wikipedia.org/wiki/Z-order_curve) order is used, which provides good spatial locality. However, [`lexicographic` (i.e., row-major)](https://en.wikipedia.org/wiki/Row-_and_column-major_order) order, for example, may be better suited to "batched" workflows where some form of sequential reading through a fixed number of outer dimensions is desired. The options are `lexicographic`, `morton`, `unordered` (i.e., random), and `colexicographic`.


### Empty chunks

It is possible to configure how Zarr handles the storage of chunks that are "empty"
Expand Down
3 changes: 2 additions & 1 deletion src/zarr/codecs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
Zlib,
Zstd,
)
from zarr.codecs.sharding import ShardingCodec, ShardingCodecIndexLocation
from zarr.codecs.sharding import ShardingCodec, ShardingCodecIndexLocation, SubchunkWriteOrder
from zarr.codecs.transpose import TransposeCodec
from zarr.codecs.vlen_utf8 import VLenBytesCodec, VLenUTF8Codec
from zarr.codecs.zstd import ZstdCodec
Expand All @@ -43,6 +43,7 @@
"GzipCodec",
"ShardingCodec",
"ShardingCodecIndexLocation",
"SubchunkWriteOrder",
"TransposeCodec",
"VLenBytesCodec",
"VLenUTF8Codec",
Expand Down
63 changes: 48 additions & 15 deletions src/zarr/codecs/sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from enum import Enum
from functools import lru_cache
from operator import itemgetter
from typing import TYPE_CHECKING, Any, NamedTuple, cast
from typing import TYPE_CHECKING, Any, Literal, NamedTuple, cast

import numpy as np
import numpy.typing as npt
Expand Down Expand Up @@ -47,8 +47,6 @@
BasicIndexer,
ChunkProjection,
SelectorTuple,
_morton_order,
_morton_order_keys,
c_order_iter,
get_indexer,
morton_order_iter,
Expand All @@ -59,7 +57,7 @@

if TYPE_CHECKING:
from collections.abc import Iterator
from typing import Self
from typing import Final, Self

from zarr.core.common import JSON
from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar, ZDType
Expand All @@ -78,6 +76,15 @@ class ShardingCodecIndexLocation(Enum):
end = "end"


# Valid orderings for laying out subchunks within a shard.
# NOTE: keep the runtime tuple below in sync with this Literal alias — the
# tuple is used for validation in ShardingCodec.__init__ and for sampling in
# the hypothesis testing strategies.
SubchunkWriteOrder = Literal["morton", "unordered", "lexicographic", "colexicographic"]
SUBCHUNK_WRITE_ORDER: Final[tuple[str, str, str, str]] = (
    "morton",
    "unordered",
    "lexicographic",
    "colexicographic",
)


def parse_index_location(data: object) -> ShardingCodecIndexLocation:
    # Coerce a raw value (e.g. the string "start"/"end" or an existing enum
    # member) into a ShardingCodecIndexLocation member.
    return parse_enum(data, ShardingCodecIndexLocation)

Expand Down Expand Up @@ -283,15 +290,13 @@ def to_dict_vectorized(
dict mapping chunk coordinate tuples to Buffer or None
"""
starts, ends, valid = self.index.get_chunk_slices_vectorized(chunk_coords_array)
chunks_per_shard = tuple(self.index.offsets_and_lengths.shape[:-1])
chunk_coords_keys = _morton_order_keys(chunks_per_shard)

result: dict[tuple[int, ...], Buffer | None] = {}
for i, coords in enumerate(chunk_coords_keys):
for i, coords in enumerate(chunk_coords_array):
if valid[i]:
result[coords] = self.buf[int(starts[i]) : int(ends[i])]
result[tuple(coords.ravel())] = self.buf[int(starts[i]) : int(ends[i])]
else:
result[coords] = None
result[tuple(coords.ravel())] = None

return result

Expand All @@ -305,7 +310,9 @@ class ShardingCodec(
chunk_shape: tuple[int, ...]
codecs: tuple[Codec, ...]
index_codecs: tuple[Codec, ...]
rng: np.random.Generator | None
index_location: ShardingCodecIndexLocation = ShardingCodecIndexLocation.end
subchunk_write_order: SubchunkWriteOrder = "morton"

def __init__(
self,
Expand All @@ -314,16 +321,24 @@ def __init__(
codecs: Iterable[Codec | dict[str, JSON]] = (BytesCodec(),),
index_codecs: Iterable[Codec | dict[str, JSON]] = (BytesCodec(), Crc32cCodec()),
index_location: ShardingCodecIndexLocation | str = ShardingCodecIndexLocation.end,
subchunk_write_order: SubchunkWriteOrder = "morton",
rng: np.random.Generator | None = None,
) -> None:
chunk_shape_parsed = parse_shapelike(chunk_shape)
codecs_parsed = parse_codecs(codecs)
index_codecs_parsed = parse_codecs(index_codecs)
index_location_parsed = parse_index_location(index_location)
if subchunk_write_order not in SUBCHUNK_WRITE_ORDER:
raise ValueError(
f"Unrecognized subchunk write order: {subchunk_write_order}. Only {SUBCHUNK_WRITE_ORDER} are allowed."
)

object.__setattr__(self, "chunk_shape", chunk_shape_parsed)
object.__setattr__(self, "codecs", codecs_parsed)
object.__setattr__(self, "index_codecs", index_codecs_parsed)
object.__setattr__(self, "index_location", index_location_parsed)
object.__setattr__(self, "subchunk_write_order", subchunk_write_order)
object.__setattr__(self, "rng", rng)

# Use instance-local lru_cache to avoid memory leaks

Expand All @@ -336,14 +351,15 @@ def __init__(

# todo: typedict return type
def __getstate__(self) -> dict[str, Any]:
return self.to_dict()
return {"rng": self.rng, **self.to_dict()}

def __setstate__(self, state: dict[str, Any]) -> None:
    """Restore the codec's frozen fields from its pickled dict form.

    Counterpart of ``__getstate__``; fields are set via ``object.__setattr__``
    because the dataclass is frozen.
    """
    config = state["configuration"]
    object.__setattr__(self, "chunk_shape", parse_shapelike(config["chunk_shape"]))
    object.__setattr__(self, "codecs", parse_codecs(config["codecs"]))
    object.__setattr__(self, "index_codecs", parse_codecs(config["index_codecs"]))
    object.__setattr__(self, "index_location", parse_index_location(config["index_location"]))
    # NOTE(review): `subchunk_write_order` is not restored here, so an
    # unpickled codec appears to lose any non-default ordering — confirm
    # whether to_dict() serializes it into `configuration` and restore it
    # here if so.
    object.__setattr__(self, "rng", state["rng"])

    # Use instance-local lru_cache to avoid memory leaks
    # object.__setattr__(self, "_get_chunk_spec", lru_cache()(self._get_chunk_spec))
Expand Down Expand Up @@ -523,6 +539,24 @@ async def _decode_partial_single(
else:
return out

def _subchunk_order_iter(
self, chunks_per_shard: tuple[int, ...], subchunk_write_order: SubchunkWriteOrder
) -> Iterable[tuple[int, ...]]:
match subchunk_write_order:
case "morton":
subchunk_iter = morton_order_iter(chunks_per_shard)
case "lexicographic":
subchunk_iter = np.ndindex(chunks_per_shard)
case "colexicographic":
subchunk_iter = (c[::-1] for c in np.ndindex(chunks_per_shard[::-1]))
case "unordered":
subchunk_list = list(np.ndindex(chunks_per_shard))
(self.rng if self.rng is not None else np.random.default_rng()).shuffle(
subchunk_list
)
subchunk_iter = iter(subchunk_list)
return subchunk_iter

async def _encode_single(
self,
shard_array: NDBuffer,
Expand All @@ -540,8 +574,7 @@ async def _encode_single(
chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape),
)
)

shard_builder = dict.fromkeys(morton_order_iter(chunks_per_shard))
shard_builder = dict.fromkeys(self._subchunk_order_iter(chunks_per_shard, "lexicographic"))

await self.codec_pipeline.write(
[
Expand Down Expand Up @@ -582,7 +615,7 @@ async def _encode_partial_single(
)

if self._is_complete_shard_write(indexer, chunks_per_shard):
shard_dict = dict.fromkeys(morton_order_iter(chunks_per_shard))
shard_dict = dict.fromkeys(self._subchunk_order_iter(chunks_per_shard, "lexicographic"))
else:
shard_reader = await self._load_full_shard_maybe(
byte_getter=byte_setter,
Expand All @@ -592,7 +625,7 @@ async def _encode_partial_single(
shard_reader = shard_reader or _ShardReader.create_empty(chunks_per_shard)
# Use vectorized lookup for better performance
shard_dict = shard_reader.to_dict_vectorized(
np.asarray(_morton_order(chunks_per_shard))
np.array(list(self._subchunk_order_iter(chunks_per_shard, "lexicographic")))
)

await self.codec_pipeline.write(
Expand Down Expand Up @@ -631,7 +664,7 @@ async def _encode_shard_dict(

template = buffer_prototype.buffer.create_zero_length()
chunk_start = 0
for chunk_coords in morton_order_iter(chunks_per_shard):
for chunk_coords in self._subchunk_order_iter(chunks_per_shard, self.subchunk_write_order):
value = map.get(chunk_coords)
if value is None:
continue
Expand Down
24 changes: 21 additions & 3 deletions src/zarr/testing/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import zarr
from zarr.abc.store import RangeByteRequest, Store
from zarr.codecs.bytes import BytesCodec
from zarr.core.array import Array
from zarr.codecs.crc32c_ import Crc32cCodec
from zarr.codecs.sharding import SUBCHUNK_WRITE_ORDER, ShardingCodec, SubchunkWriteOrder
from zarr.core.array import Array, CompressorsLike, SerializerLike
from zarr.core.chunk_grids import RegularChunkGrid
from zarr.core.chunk_key_encodings import DefaultChunkKeyEncoding
from zarr.core.common import JSON, ZarrFormat
Expand Down Expand Up @@ -128,6 +130,9 @@ def dimension_names(draw: st.DrawFn, *, ndim: int | None = None) -> list[None |
return draw(st.none() | st.lists(st.none() | simple_text, min_size=ndim, max_size=ndim)) # type: ignore[arg-type]


# Hypothesis strategy that draws one of the valid subchunk write orders
# (sampled from the runtime tuple mirroring the SubchunkWriteOrder Literal).
subchunk_write_orders: st.SearchStrategy[SubchunkWriteOrder] = st.sampled_from(SUBCHUNK_WRITE_ORDER)


@st.composite
def array_metadata(
draw: st.DrawFn,
Expand Down Expand Up @@ -249,6 +254,7 @@ def arrays(
arrays: st.SearchStrategy | None = None,
attrs: st.SearchStrategy = attrs,
zarr_formats: st.SearchStrategy = zarr_formats,
subchunk_write_orders: SearchStrategy[SubchunkWriteOrder] = subchunk_write_orders,
) -> AnyArray:
store = draw(stores, label="store")
path = draw(paths, label="array parent")
Expand All @@ -260,12 +266,22 @@ def arrays(
nparray = draw(arrays, label="array data")
chunk_shape = draw(chunk_shapes(shape=nparray.shape), label="chunk shape")
dim_names: None | list[str | None] = None
serializer: SerializerLike = "auto"
compressors_unsearched: CompressorsLike = "auto"
if zarr_format == 3 and all(c > 0 for c in chunk_shape):
shard_shape = draw(
st.none() | shard_shapes(shape=nparray.shape, chunk_shape=chunk_shape),
label="shard shape",
)
dim_names = draw(dimension_names(ndim=nparray.ndim), label="dimension names")
subchunk_write_order = draw(subchunk_write_orders)
serializer = ShardingCodec(
subchunk_write_order=subchunk_write_order,
codecs=[BytesCodec()],
index_codecs=[BytesCodec(), Crc32cCodec()],
chunk_shape=chunk_shape,
)
compressors_unsearched = None
else:
shard_shape = None
# test that None works too.
Expand All @@ -284,9 +300,10 @@ def arrays(
shards=shard_shape,
dtype=nparray.dtype,
attributes=attributes,
# compressor=compressor, # FIXME
compressors=compressors_unsearched, # FIXME
fill_value=fill_value,
dimension_names=dim_names,
serializer=serializer,
)

assert isinstance(a, Array)
Expand All @@ -298,7 +315,8 @@ def arrays(
assert isinstance(root[array_path], Array)
assert nparray.shape == a.shape
assert chunk_shape == a.chunks
assert shard_shape == a.shards
if shard_shape is not None:
assert shard_shape == a.shards
assert a.basename == name, (a.basename, name)
assert dict(a.attrs) == expected_attrs

Expand Down
Loading
Loading