
perf: use sync methods for chunk encoding / decoding #3885

Open
d-v-b wants to merge 77 commits into zarr-developers:main from d-v-b:perf/prepared-write-v2

@d-v-b d-v-b commented Apr 8, 2026

Contributor

This PR defines a new codec pipeline class called PhasedCodecPipeline that enables much higher performance for chunk encoding and decoding than the current BatchedCodecPipeline.

The approach here is to completely ignore how the v3 spec defines array -> bytes codecs 😆. Instead of treating codecs as functions that mix IO and compute, we treat codec encoding and decoding as a sequence:

  1. Preparatory IO (async): fetch exactly what we need from storage, given the codecs we have. If there's a sharding codec in the first array -> bytes position, the codec pipeline knows it must fetch the shard index, then fetch the involved subchunks, before passing them to compute.
  2. Pure compute (sync): apply filters and compressors. This is safe to parallelize over chunks.
  3. Final IO (async, only if writing): reconcile the in-memory compressed chunks against our model of the stored chunk, then write out the bytes.

Basically, we use the first array -> bytes codec to figure out what kind of preparatory IO and final IO we need to perform, and the rest of the codecs to figure out what kind of chunk encoding we need to do. Separating IO from compute in different phases makes things simpler and faster.
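The read side of this phasing can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the pipeline in this PR: `STORE`, `fetch`, `decode`, and `read_chunks` are hypothetical names, a plain dict stands in for storage, and zlib stands in for the codec chain.

```python
from __future__ import annotations

import asyncio
import zlib
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory "store": key -> compressed chunk bytes.
STORE: dict[str, bytes] = {
    "c/0": zlib.compress(b"a" * 8),
    "c/1": zlib.compress(b"b" * 8),
}


async def fetch(key: str) -> bytes | None:
    """Phase 1: preparatory IO (async). Returns None for a missing chunk."""
    await asyncio.sleep(0)  # stand-in for real storage latency
    return STORE.get(key)


def decode(raw: bytes | None) -> bytes | None:
    """Phase 2: pure compute (sync). No IO, so it can run once per chunk in a thread."""
    return None if raw is None else zlib.decompress(raw)


async def read_chunks(keys: list[str]) -> list[bytes | None]:
    # All IO happens first ...
    raw = await asyncio.gather(*(fetch(k) for k in keys))
    # ... then all compute. pool.map blocks the event loop here, which is
    # acceptable in a sketch but would need care in real async code.
    with ThreadPoolExecutor(max_workers=2) as pool:
        return list(pool.map(decode, raw))


result = asyncio.run(read_chunks(["c/0", "c/1", "c/missing"]))
```

A write would add a third step after the compute phase that sends the encoded bytes back to storage.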

Happy to chat more about this direction. IMO the spec should be re-written with this framing, because it makes much more sense than trying to shoe-horn sharding in as a codec.

I don't want to make our benchmarking suite any bigger, but on my laptop this codec pipeline is 2-5x faster than the BatchedCodecPipeline for a lot of workloads. I can include some of those benchmarks later.

This was mostly written by Claude, based on previous work in #3719. All these changes should be non-breaking, so I think this is in principle safe for us to play around with in a patch or minor release.

Edit: this PR depends on changes submitted in #3907 and #3908

Another edit: the big pitch of this PR (separating IO from compute) did not end up being valuable, because we ran into a large amount of overhead from indexing and Python object creation. It turns out the vast majority of the speed benefits can be had simply by avoiding async for storage backends that don't need it. See #3885 (comment) for a detailed summary of the current state of things here.

d-v-b added 4 commits April 7, 2026 10:38
`PreparedWrite` models a set of per-chunk changes that would be applied to a stored chunk. `SupportsChunkPacking`
is a protocol for array -> bytes codecs that can use `PreparedWrite` objects to update an existing chunk.
@github-actions github-actions Bot added the `needs release notes` label (automatically applied to PRs which haven't added release notes) Apr 8, 2026
@codecov

codecov Bot commented Apr 8, 2026


Codecov Report

❌ Patch coverage is 94.94681% with 38 lines in your changes missing coverage. Please review.
✅ Project coverage is 93.76%. Comparing base (ea76ec8) to head (0879b40).
⚠️ Report is 3 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/zarr/core/codec_pipeline.py | 95.00% | 18 Missing ⚠️ |
| src/zarr/codecs/sharding.py | 97.00% | 9 Missing ⚠️ |
| src/zarr/codecs/numcodecs/_codecs.py | 66.66% | 8 Missing ⚠️ |
| src/zarr/abc/store.py | 91.89% | 3 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3885      +/-   ##
==========================================
+ Coverage   93.53%   93.76%   +0.22%     
==========================================
  Files          89       89              
  Lines       11901    12499     +598     
==========================================
+ Hits        11132    11720     +588     
- Misses        769      779      +10     
| Files with missing lines | Coverage Δ |
|---|---|
| src/zarr/codecs/_v2.py | 94.11% <100.00%> (+0.50%) ⬆️ |
| src/zarr/core/array.py | 97.89% <100.00%> (+0.01%) ⬆️ |
| src/zarr/core/config.py | 100.00% <ø> (ø) |
| src/zarr/storage/_fsspec.py | 91.32% <ø> (ø) |
| src/zarr/testing/buffer.py | 93.18% <100.00%> (-6.82%) ⬇️ |
| src/zarr/abc/store.py | 95.31% <91.89%> (-0.82%) ⬇️ |
| src/zarr/codecs/numcodecs/_codecs.py | 93.18% <66.66%> (-3.21%) ⬇️ |
| src/zarr/codecs/sharding.py | 97.20% <97.00%> (+5.68%) ⬆️ |
| src/zarr/core/codec_pipeline.py | 94.22% <95.00%> (+2.05%) ⬆️ |

... and 5 files with indirect coverage changes


@d-v-b

d-v-b commented Apr 9, 2026

Contributor Author

@TomAugspurger how would this design work with CUDA codecs?

@d-v-b d-v-b force-pushed the perf/prepared-write-v2 branch from 5d3064e to b67a5a0 Compare April 15, 2026 09:51
@github-actions github-actions Bot removed the `needs release notes` label (automatically applied to PRs which haven't added release notes) Apr 15, 2026
@d-v-b d-v-b force-pushed the perf/prepared-write-v2 branch 2 times, most recently from a84a15a to 68a7cdc Compare April 17, 2026 10:41
Comment thread src/zarr/core/codec_pipeline.py Outdated
Comment on lines +943 to +962
# Phase 1: fetch all chunks (IO, sequential)
raw_buffers: list[Buffer | None] = [
    bg.get_sync(prototype=cs.prototype)  # type: ignore[attr-defined]
    for bg, cs, *_ in batch
]

# Phase 2: decode (compute, optionally threaded)
def _decode_one(raw: Buffer | None, chunk_spec: ArraySpec) -> NDBuffer | None:
    if raw is None:
        return None
    return transform.decode_chunk(raw, chunk_spec)

specs = [cs for _, cs, *_ in batch]
if n_workers > 0 and len(batch) > 1:
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        decoded_list = list(pool.map(_decode_one, raw_buffers, specs))
else:
    decoded_list = [
        _decode_one(raw, spec) for raw, spec in zip(raw_buffers, specs, strict=True)
    ]

@ilan-gold ilan-gold Apr 17, 2026

Contributor

Why isn't this all multi-threaded i.e., the I/O as well?

Contributor Author

I should benchmark this, but my expectation was that IO against memory storage and local storage is not compute-limited, so threads wouldn't remove a real bottleneck. For memory storage I'm sure this is true; I'm not sure about local storage, though.

d-v-b and others added 6 commits April 17, 2026 22:51
Adds a SupportsSetRange protocol to zarr.abc.store for stores that
allow overwriting a byte range within an existing value. Implementations
are added for LocalStore (using file-handle seek+write) and MemoryStore
(in-memory bytearray slice assignment).

This is the prerequisite for the partial-shard write fast path in
ShardingCodec, which can patch individual inner-chunk slots without
rewriting the entire shard blob when the inner codec chain is fixed-size.
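A rough sketch of the two byte-range overwrite strategies named in this commit message follows. The `set_range` signature and the `ToyMemoryStore` / `ToyLocalStore` classes are assumptions for illustration; the real `SupportsSetRange` protocol in `zarr.abc.store` may be shaped differently.

```python
from __future__ import annotations

import os
import tempfile
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsSetRange(Protocol):
    # Hypothetical signature, for illustration only.
    def set_range(self, key: str, value: bytes, start: int) -> None: ...


class ToyMemoryStore:
    """Values are bytearrays, patched by same-length slice assignment."""

    def __init__(self) -> None:
        self._data: dict[str, bytearray] = {}

    def set(self, key: str, value: bytes) -> None:
        self._data[key] = bytearray(value)

    def get(self, key: str) -> bytes:
        return bytes(self._data[key])

    def set_range(self, key: str, value: bytes, start: int) -> None:
        buf = self._data[key]
        if start < 0 or start + len(value) > len(buf):
            raise ValueError("byte range must lie within the existing value")
        buf[start : start + len(value)] = value


class ToyLocalStore:
    """Values are files, patched with seek + write on an open handle."""

    def __init__(self, root: str) -> None:
        self._root = root

    def _path(self, key: str) -> str:
        return os.path.join(self._root, key)

    def set(self, key: str, value: bytes) -> None:
        with open(self._path(key), "wb") as f:
            f.write(value)

    def get(self, key: str) -> bytes:
        with open(self._path(key), "rb") as f:
            return f.read()

    def set_range(self, key: str, value: bytes, start: int) -> None:
        if start < 0 or start + len(value) > os.path.getsize(self._path(key)):
            raise ValueError("byte range must lie within the existing value")
        # "r+b" opens for update without truncating the file.
        with open(self._path(key), "r+b") as f:
            f.seek(start)
            f.write(value)


def patch_demo(store: SupportsSetRange) -> bytes:
    store.set("shard", b"0123456789")
    store.set_range("shard", b"AB", start=3)
    return store.get("shard")


mem_result = patch_demo(ToyMemoryStore())
with tempfile.TemporaryDirectory() as tmp:
    local_result = patch_demo(ToyLocalStore(tmp))
```

Both toy stores reject writes that would grow the value, which keeps the "overwrite within an existing value" contract explicit.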

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
V2Codec, BytesCodec, BloscCodec, etc. previously only implemented the
async _decode_single / _encode_single methods. Add their sync
counterparts (_decode_sync / _encode_sync) so that the upcoming
SyncCodecPipeline can dispatch through them without spinning up an
event loop.

For codecs that wrap external compressors (numcodecs.Zstd, numcodecs.Blosc,
the V2 fallback chain), the sync versions just call the underlying
compressor's blocking API directly instead of routing through
asyncio.to_thread.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…arallelism

Adds SyncCodecPipeline alongside BatchedCodecPipeline. The new pipeline
runs codecs through their sync entry points (_decode_sync / _encode_sync)
and dispatches per-chunk work to a module-level thread pool sized by
the codec_pipeline.max_workers config (default = os.cpu_count()).

Each chunk's full lifecycle (fetch + decode + scatter for reads;
get-existing + merge + encode + set/delete for writes) runs as one
pool task — overlapping IO of one chunk with compute of another.
Scatter into the shared output buffer is thread-safe because chunks
have non-overlapping output selections.

The async wrappers (read/write) detect SupportsGetSync/SupportsSetSync
stores and dispatch to the sync fast path, passing the configured
max_workers. Other stores fall through to the async path, which still
uses concurrent_map, bounded by the async.concurrency setting.
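The dispatch described here can be sketched as a structural check on the store. The protocol below borrows the `SupportsGetSync` name from the commit message, but its method signature and the two toy store classes are assumptions made for this example.

```python
from __future__ import annotations

import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsGetSync(Protocol):
    # Hypothetical signature, for illustration only.
    def get_sync(self, key: str) -> bytes | None: ...


class MemoryLikeStore:
    """A store whose reads never need to wait, so it offers a sync method."""

    def __init__(self, data: dict[str, bytes]) -> None:
        self._data = dict(data)

    def get_sync(self, key: str) -> bytes | None:
        return self._data.get(key)

    async def get(self, key: str) -> bytes | None:
        return self.get_sync(key)


class RemoteLikeStore:
    """A store that only offers an async read."""

    def __init__(self, data: dict[str, bytes]) -> None:
        self._data = dict(data)

    async def get(self, key: str) -> bytes | None:
        await asyncio.sleep(0)  # stand-in for network latency
        return self._data.get(key)


async def read(store, keys: list[str]) -> tuple[str, list[bytes | None]]:
    if isinstance(store, SupportsGetSync):
        # Fast path: plain function calls, no coroutine per chunk.
        return "sync", [store.get_sync(k) for k in keys]
    # Fallback: one coroutine per chunk, gathered concurrently.
    return "async", list(await asyncio.gather(*(store.get(k) for k in keys)))


fast = asyncio.run(read(MemoryLikeStore({"a": b"1"}), ["a", "b"]))
slow = asyncio.run(read(RemoteLikeStore({"a": b"1"}), ["a", "b"]))
```

The returned tag only exists so the example can show which branch ran.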

Notes on perf:
- Default (None → cpu_count) is tuned for chunks ≥ ~512 KB.
- Small chunks (≤ 64 KB) regress 1.5-3x because pool dispatch overhead
  (~30-50 µs/task) dominates per-chunk work. Workaround:
  zarr.config.set({"codec_pipeline.max_workers": 1}).
- For large chunks on local/memory stores, IO+compute parallelism
  yields 1.7-2.5x over BatchedCodecPipeline on direct-API reads and
  ~2.5x on roundtrip.

ChunkTransform encapsulates the sync codec chain. It caches resolved
ArraySpecs across calls with the same chunk_spec — combined with the
constant-ArraySpec optimization in indexing, hot-path overhead is
minimized.

Includes test scaffolding for the new pipeline (test_sync_codec_pipeline)
and config plumbing for the max_workers key.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds _encode_partial_sync and _decode_partial_sync to ShardingCodec.
For fixed-size inner codec chains and stores that implement
SupportsSetRange, partial writes patch individual inner-chunk slots
in-place instead of rewriting the whole shard:

  - Reads existing shard index (one byte-range get).
  - For each affected inner chunk: decodes the slot, merges the new
    region, re-encodes.
  - Writes each modified slot at its deterministic byte offset, then
    rewrites just the index.

For variable-size inner codecs (e.g. with compression) or stores that
don't support byte-range writes, falls through to a full-shard rewrite
matching BatchedCodecPipeline semantics.

The partial-decode path computes a ReadPlan from the shard index and
issues one byte-range get per overlapping chunk, decoding only what
the read selection touches.

Both paths are dispatched from SyncCodecPipeline via the existing
supports_partial_decode / supports_partial_encode protocol checks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two new test files:

  test_codec_invariants — asserts contract-level properties that every
  codec / shard / buffer combination must satisfy: round-trip exactness,
  prototype propagation, fill-value handling, all-empty shard handling.

  test_pipeline_parity — exhaustive matrix asserting that
  SyncCodecPipeline and BatchedCodecPipeline produce semantically
  identical results across codec configs, layouts (including
  nested sharding), write sequences, and write_empty_chunks settings.
  Three checks per cell:
    1. Same array contents on read.
    2. Same set of store keys after writes.
    3. Each pipeline reads the other's output identically (catches
       layout-divergence bugs).

These tests pinned the design throughout the SyncCodecPipeline +
partial-shard development.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds .gitignore entries for .claude/, CLAUDE.md, and docs/superpowers/
so local IDE/agent planning artifacts don't get committed by accident.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@d-v-b d-v-b force-pushed the perf/prepared-write-v2 branch from aa111a2 to 1be5563 Compare April 17, 2026 21:04
Comment thread src/zarr/core/codec_pipeline.py Outdated
selected = decoded[chunk_selection]
if drop_axes:
    selected = selected.squeeze(axis=drop_axes)
out[out_selection] = selected

@ilan-gold ilan-gold Apr 18, 2026

Contributor

It might be worth experimenting with moving the assignment `out[out_selection] = selected` outside the thread pool execution since, IIRC, it holds the GIL and is probably non-trivial time-wise.

Contributor

The memory usage will probably go up a bit though....

@d-v-b

d-v-b commented Jun 7, 2026

Contributor Author

@ilan-gold this is ready for you to dig into

@floriankrb


@d-v-b This looks promising. Is this branch ready to be tested against the regression we saw in ecmwf/anemoi-datasets#486?

@d-v-b

d-v-b commented Jun 9, 2026

Contributor Author

@floriankrb yes if you have time to test it, that would be great

d-v-b and others added 9 commits June 10, 2026 15:15
The pool dispatch in read_sync/write_sync (codec_pipeline.max_workers > 1) had
zero functional test coverage — only config-default assertions existed — even
though threading is the opt-in we point users at. Adds:

- an end-to-end multi-chunk read/write roundtrip with max_workers=4 (verified
  the pool dispatch actually fires, not the sequential branch);
- worker-exception propagation tests for both write_sync (list-consumed
  pool.map) and read_sync (tuple-consumed pool.map): a store error raised in a
  pool worker must surface to the caller;
- a concurrent-decode test: transpose filter (so ChunkTransform._resolve_specs
  cache traffic actually occurs — with no AA codecs the cache is bypassed),
  pool workers decoding concurrently, plus an outer thread pool issuing
  overlapping reads. Pins correctness under concurrency around the shared
  transform's mutable cache.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
zarr_format=2 appeared in none of the pipeline test files: v2 arrays were only
exercised implicitly through whichever pipeline is the global default. v2 goes
through the V2Codec wrapper (numcodecs filters + compressor) — a different
codec path than the v3 AA/AB/BB chain, with its own _encode_sync/_decode_sync
under FusedCodecPipeline — so it deserves explicit coverage on BOTH pipelines
and BOTH store kinds (sync fast path + async fallback).

Adds v2-roundtrip (uncompressed) and v2-gzip-roundtrip (numcodecs.GZip — the v2
compressor spelling; v3 codec configs are rejected for v2 arrays) to SCENARIOS.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Both _encode_shard_dict_sync and the async _encode_shard_dict encoded the shard
index TWICE when index_location=start: encode to learn the length, shift the
present chunks' offsets by it, then re-encode with corrected offsets. The index
size is knowable without encoding — _shard_index_size() is already the byte-
exact contract every index read path relies on (reads slice exactly that many
bytes) — so the layout can use absolute offsets from the start and the index is
encoded once. Saves a full index encode (including its crc32c over the offsets
array) per shard write with index_location=start.

The layout loop was also duplicated between the sync and async versions — the
same drift surface that produced the evolve_from_array_spec endian bug. Both now
delegate to a shared pure _build_shard_layout (offset math lives once) and
_assemble_shard. A runtime guard verifies the encoded index length matches
_shard_index_size rather than silently corrupting offsets if someone ever
configures variable-size index codecs.
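The single-encode layout can be sketched as follows. This assumes the default index encoding (one little-endian uint64 offset/length pair per chunk plus a 4-byte checksum, with absent chunks marked by 2^64 - 1 in both fields); `zlib.crc32` stands in for crc32c, and `index_size`, `encode_index`, and `build_shard` are hypothetical names, not the PR's helpers.

```python
from __future__ import annotations

import struct
import zlib

MAX_UINT64 = 2**64 - 1  # marks an absent chunk in the index


def index_size(n_chunks: int) -> int:
    """Byte length of the index, known without encoding anything."""
    return 16 * n_chunks + 4  # (offset, nbytes) as two uint64, plus checksum


def encode_index(entries: list[tuple[int, int]]) -> bytes:
    body = b"".join(struct.pack("<QQ", off, n) for off, n in entries)
    # Assumption: zlib.crc32 is a stand-in for the crc32c checksum codec.
    return body + struct.pack("<I", zlib.crc32(body))


def build_shard(chunks: list[bytes | None], index_at_start: bool) -> bytes | None:
    # With the index at the start, chunk offsets begin after the index, so
    # they are absolute from the first pass and the index is encoded once.
    offset = index_size(len(chunks)) if index_at_start else 0
    entries: list[tuple[int, int]] = []
    payload: list[bytes] = []
    for chunk in chunks:
        if chunk is None:
            entries.append((MAX_UINT64, MAX_UINT64))
        else:
            entries.append((offset, len(chunk)))
            payload.append(chunk)
            offset += len(chunk)
    if not payload:
        return None  # all-empty shard: nothing to store
    index = encode_index(entries)
    # Guard: fail loudly instead of writing offsets that point at the wrong bytes.
    if len(index) != index_size(len(chunks)):
        raise ValueError("encoded index length does not match the expected size")
    data = b"".join(payload)
    return index + data if index_at_start else data + index


shard = build_shard([b"abc", None, b"de"], index_at_start=True)
```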

Verified: 606 tests pass including the pipeline-parity byte-identical shard
assertions across index_location=start/end and every subchunk_write_order, and
the sharded reopen tests — the on-disk layout is unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
read_missing_chunks exists to help consumers distinguish a transport error from
a truly missing chunk. That distinction is a STORE-KEY-level concept: a missing
shard key raises ChunkNotFoundError. It does not cleanly apply to inner
subchunks of a shard that was fetched successfully — there is no transport
ambiguity there; the shard index simply records the subchunk as absent — so
those fill with the fill value rather than raising.

Both pipelines already implement exactly this (verified empirically), but
nothing pinned it, so the asymmetry vs unsharded arrays read as a bug in review.
This adds a shared-suite test (both pipelines x sync/async stores) asserting
both sides: missing shard key raises; missing inner subchunk of an existing
shard fills.
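The asymmetry can be shown with a toy model in which a store maps shard keys to dicts of inner chunks. The `ChunkNotFoundError` class below is a local stand-in for the error named above, and `read_inner_chunk` is a hypothetical helper written only for this sketch.

```python
from __future__ import annotations


class ChunkNotFoundError(KeyError):
    """Local stand-in for the error raised on a missing store key."""


def read_inner_chunk(
    store: dict[str, dict[int, bytes]],
    shard_key: str,
    inner: int,
    fill: bytes,
    read_missing_chunks: bool = False,
) -> bytes:
    shard = store.get(shard_key)
    if shard is None:
        # Store-key level: the caller cannot tell "absent" from "failed fetch"
        # unless this is reported.
        if not read_missing_chunks:
            raise ChunkNotFoundError(shard_key)
        return fill
    # Inner level: the shard was fetched, so absence is unambiguous and the
    # subchunk is simply filled.
    return shard.get(inner, fill)


store = {"s0": {0: b"x"}}
present = read_inner_chunk(store, "s0", 0, fill=b"-")
filled = read_inner_chunk(store, "s0", 1, fill=b"-")
try:
    read_inner_chunk(store, "s1", 0, fill=b"-")
    raised = False
except ChunkNotFoundError:
    raised = True
```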

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
* chore: add parallelism TODOs

* perf: non-sharded reads

* chore: code duplication

* fix: arguments dont spread themselves!

* fix: bring in suggested guard

* perf: don't block on pool ops

* chore: docstring + materialize early

---------

Co-authored-by: ilan-gold <ilanbassgold@gmail.com>
…line

Addresses the open question on this PR about sync/async byte getters,
benchmark-guided as discussed.

_ShardingByteGetter/_ShardingByteSetter are in-memory dict wrappers but
presented only an async API, so the nested codec_pipeline.read over inner
chunks fell to the async fallback: one concurrent_map coroutine per inner chunk
for a dict lookup (~2.1 us/chunk pure asyncio overhead, ~8.8 ms per 4096-chunk
shard). On top of that, the nested pipeline (ShardingCodec.codec_pipeline) is
built by from_codecs and never evolved, so its sync transform was always None —
inner chunks also paid per-chunk AsyncChunkTransform coroutines, and the
decode_sync/encode_sync fallback improvements in this PR could not reach them.

Changes:
- SyncByteGetter / SyncByteSetter runtime protocols in zarr.abc.store
  (resurrecting the design from the original perf/prepared-write experiments,
  same names and shape). StorePath matches structurally but is still gated on
  its STORE's sync support; the protocols gate non-StorePath byte getters.
- _ShardingByteGetter/Setter implement get_sync/set_sync/delete_sync; the async
  methods delegate to the sync ones (single implementation).
- ShardingCodec._get_inner_pipeline(shard_spec): the nested pipeline evolved
  against the inner chunk spec (threads specs through the inner chain AND
  builds the sync transform). The four nested read/write call sites use it.
- FusedCodecPipeline.read/write gates accept non-StorePath SyncByteGetter/
  SyncByteSetter, so nested inner-chunk IO takes read_sync/write_sync.
- _decode_shard_index/_encode_shard_index delegate to their sync twins (pure
  compute; kills a per-shard AsyncChunkTransform round-trip and a sync/async
  duplication).

Benchmark (4096 inner chunks per shard, LatencyStore@0 i.e. the async-fallback
path = sharded data on remote stores), vs this PR's head:

  uncompressed read: 44.2 -> 28.9 ms (1.53x)
  gzip         read: 182.0 -> 50.9 ms (3.6x)
  writes: unchanged (~34 / ~70 ms) — already optimized by this PR's
  encode_sync fallback (whole-shard sync encode, no byte setters involved).

Adds a regression test asserting sharded fallback reads route inner chunks
through the sync fast path (read_sync engaged, zero AsyncChunkTransform calls);
verified it fails if the gate is removed. Full sharding + parity + pipeline
suites pass (619).

Assisted-by: ClaudeCode:claude-fable-5
… cache inner pipeline

The pipeline test suites have failed intermittently all along on an arbitrary
test that passes in isolation. Root cause (allocation site verified with
PYTHONTRACEMALLOC): pytest-asyncio implicitly creates an event loop in
_get_event_loop_no_warn during fixture setup/teardown and never closes it. When
GC reclaims that loop — or its self-pipe socketpair — mid-test, pytest's
unraisable hook converts the ResourceWarning into a failure of whichever
unrelated test happens to be running. The sync-bytegetter change increased
per-shard-op allocation churn enough to make this near-deterministic, which is
how it was finally traced.

Two changes:
- pyproject filterwarnings: narrowly ignore the two unraisable shapes
  (BaseEventLoop.__del__, AF_UNIX socketpair), mirroring the existing
  s3fs/aiobotocore entry. Not zarr's loops.
- ShardingCodec._get_inner_pipeline is now memoized per (pipeline class,
  shard_spec) — evolving built a ChunkTransform on every shard operation. The
  pipeline class is part of the key so codec_pipeline.path config changes are
  still honored.

Battery that previously failed ~every run now passes 3x consecutively (635).

Assisted-by: ClaudeCode:claude-fable-5
…hain evolve

Continues the anti-skew work: where the sync and async sharding paths
implemented the same logic twice, extract a single source of truth so the
copies cannot drift (the mechanism behind the pipeline-level endian bug).

- _get_inner_chunk_transform / _get_index_chunk_transform now evolve their
  codec chains via evolve_codecs (spec THREADED forward). Both previously
  evolved every codec against the same unthreaded spec — the exact bug shape
  that stripped BytesCodec.endian at the pipeline level, latent here for any
  spec-changing inner codec. Both are also now actually memoized (the inner
  transform's docstring claimed a cache that did not exist; transforms were
  rebuilt per call).
- New regression test (dependency-free dtype-widening stub codec) asserting
  the inner serializer keeps its endian; verified it fails on the unthreaded
  version.
- _shard_index_byte_range(): the index-location byte-range arithmetic existed
  verbatim in both _load_shard_index_maybe and its _sync twin; now one helper.
- _pair_chunks_with_byte_ranges(): the chunk-coord/byte-range pairing loop
  existed verbatim in both _load_partial_shard_maybe and its _sync twin; now
  one helper.
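The difference between threaded and unthreaded evolution can be reproduced with two toy codecs. Everything here is hypothetical: `Spec`, `Widen`, and `Bytes` are stand-ins, and the rule that a bytes codec drops its endianness for single-byte items is an assumption used to trigger the bug shape described above.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Spec:
    itemsize: int


@dataclass(frozen=True)
class Widen:
    """Toy array -> array codec that doubles the item size."""

    def evolve(self, spec: Spec) -> "Widen":
        return self

    def resolve(self, spec: Spec) -> Spec:
        return Spec(spec.itemsize * 2)


@dataclass(frozen=True)
class Bytes:
    """Toy serializer that only needs an endianness for multi-byte items."""

    endian: str | None = "little"

    def evolve(self, spec: Spec) -> "Bytes":
        return replace(self, endian=None) if spec.itemsize == 1 else self

    def resolve(self, spec: Spec) -> Spec:
        return spec


def evolve_unthreaded(codecs, spec: Spec) -> list:
    # Bug shape: every codec sees the ORIGINAL spec.
    return [c.evolve(spec) for c in codecs]


def evolve_threaded(codecs, spec: Spec) -> list:
    # Each codec sees the spec as transformed by the codecs before it.
    out = []
    for codec in codecs:
        codec = codec.evolve(spec)
        out.append(codec)
        spec = codec.resolve(spec)
    return out


chain = [Widen(), Bytes()]
buggy = evolve_unthreaded(chain, Spec(itemsize=1))
fixed = evolve_threaded(chain, Spec(itemsize=1))
```

In the unthreaded version the serializer sees 1-byte items and strips its endianness, even though it will actually receive 2-byte items from the widening codec.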

Deliberately NOT unified: the small hand-rolled loops remaining in
_decode_sync/_encode_sync vs their async twins. Post sync-bytegetter work the
async versions are thin delegations to the (shared, evolved) nested pipeline,
so the heavy machinery — evolve, transforms, layout, index codecs — is already
single-sourced; force-merging the residual loops would couple different
missing-chunk/concurrency semantics for little drift-surface gain. The
pipeline-parity suite guards their behavioral equivalence.

Full battery passes twice (636); ruff + mypy clean.

Assisted-by: ClaudeCode:claude-fable-5
…erge copy

Prototype of the "chunk state algebra" direction: the write-side state logic
(maybe-read existing -> merge -> empty-normalize -> encode-or-elide) existed in
four places with divergent inline conventions. It is now three canonical
functions in codec_pipeline.py:

- chunk_is_empty(): THE write_empty_chunks normalization rule (all-fill chunk
  normalizes to missing), previously five scattered inline all_equal checks.
- encode_or_elide_chunk(): normalize-empty + encode; None = must not be stored.
- merge_and_encode_chunk(): the full single-chunk write transition. Used by the
  fused _write_one and both branches of the sharding _encode_partial_sync loop
  (including the scalar-broadcast memoization, which now memoizes the canonical
  function's result). _encode_sync uses encode_or_elide_chunk.
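A toy version of the three write-side functions, using lists of small integers as chunks and zlib as the encoder, might look like this. The function names come from the commit message, but the signatures are assumptions made for the sketch and will not match `codec_pipeline.py`.

```python
from __future__ import annotations

import zlib


def chunk_is_empty(chunk: list[int], fill: int, write_empty_chunks: bool) -> bool:
    """An all-fill chunk normalizes to "missing" unless empty chunks are kept."""
    return (not write_empty_chunks) and all(v == fill for v in chunk)


def encode_or_elide_chunk(
    chunk: list[int], fill: int, write_empty_chunks: bool
) -> bytes | None:
    """Return encoded bytes, or None when the chunk must not be stored."""
    if chunk_is_empty(chunk, fill, write_empty_chunks):
        return None
    return zlib.compress(bytes(chunk))


def merge_and_encode_chunk(
    existing: list[int] | None,
    value: list[int],
    selection: slice,
    chunk_len: int,
    fill: int,
    write_empty_chunks: bool,
) -> bytes | None:
    """Full single-chunk write: start from existing data or fill, merge, encode."""
    chunk = [fill] * chunk_len if existing is None else list(existing)
    chunk[selection] = value
    if len(chunk) != chunk_len:
        raise ValueError("value does not match the selected region")
    return encode_or_elide_chunk(chunk, fill, write_empty_chunks)


elided = merge_and_encode_chunk(None, [0, 0], slice(0, 2), 4, 0, False)
kept = merge_and_encode_chunk(None, [0, 0], slice(0, 2), 4, 0, True)
merged = merge_and_encode_chunk([1, 2, 3, 4], [5], slice(1, 2), 4, 0, False)
```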

Unification found a real perf bug: _merge_chunk_array's complete-chunk early
return required value.shape == chunk_spec.shape, which never holds for
multi-chunk writes — so every complete chunk of every multi-chunk fused write
paid a create+fill+copy. (Sharding's hand-rolled loop bypassed this with a
view, which is itself how the two copies had drifted.) The guard now returns
value[out_selection] whenever it is exactly chunk-shaped. Measured, two A/B
passes: unsharded full write (1000 chunks) ~1.5x faster; bulk partial shard
write (900 complete inner chunks) ~1.4x faster; sharded single-chunk write
unchanged. Callers pass existing=None for complete chunks so fully-overwritten
data is never decoded.

Also removed the last divergent missing-chunk conventions: _decode_sync's
try/except KeyError is now .get() -> None (None is the single "missing"
convention), and the dead skip_empty/fill_value prologues are gone.

1945 tests pass including pipeline-parity byte-identical assertions; ruff +
mypy clean.

Assisted-by: ClaudeCode:claude-fable-5
@d-v-b d-v-b added and then removed the `benchmark` label (code will be benchmarked in a CI job) Jun 10, 2026
d-v-b added 2 commits June 10, 2026 21:59
…r_chunk)

The read twin of merge_and_encode_chunk: the fill-on-missing scatter logic
existed in five places (fused _read_one, partial-decode _read_one, the async
fallback scatter loop, and the two sharding decode tails) with two different
missing conventions (None vs try/except KeyError) and three fill spellings
(precomputed batch fill, raw shard_spec.fill_value, inline or-default).

Now two canonical functions in codec_pipeline.py:
- scatter_chunk(): scatter an already-selected region; None = missing ->
  scatter fill, return a "missing" GetResult. POLICY-FREE on purpose: whether
  missing is an error (read_missing_chunks=False) is decided at the array
  layer from the top-level statuses — which is exactly what makes missing
  INNER chunks of a present shard fill rather than raise (the sharding codec
  discards the nested statuses). That semantic, previously implicit in which
  loop happened to run, is now written down where the rule lives.
- decode_and_scatter_chunk(): decode (None = missing) -> select -> scatter.
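The read-side pair can be sketched the same way, with a list as the output buffer. The names follow the commit message; the signatures and the string status (in place of a `GetResult`) are assumptions for illustration.

```python
from __future__ import annotations

import zlib


def scatter_chunk(
    out: list[int], out_selection: slice, selected: list[int] | None, fill: int
) -> str:
    """Write an already-selected region into out; None means missing, so fill."""
    if selected is None:
        n = len(range(*out_selection.indices(len(out))))
        out[out_selection] = [fill] * n
        return "missing"  # reported, not judged: policy lives with the caller
    out[out_selection] = selected
    return "present"


def decode_and_scatter_chunk(
    raw: bytes | None,
    chunk_selection: slice,
    out: list[int],
    out_selection: slice,
    fill: int,
) -> str:
    """Decode (None = missing), select the requested region, then scatter."""
    selected = None if raw is None else list(zlib.decompress(raw))[chunk_selection]
    return scatter_chunk(out, out_selection, selected, fill)


out = [9] * 6
first = decode_and_scatter_chunk(
    zlib.compress(bytes([1, 2, 3])), slice(1, 3), out, slice(0, 2), fill=0
)
second = decode_and_scatter_chunk(None, slice(0, 3), out, slice(2, 5), fill=0)
```

Because the function only returns a status, the caller decides whether a missing chunk is an error.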

All five sites converted; the last try/except KeyError missing-convention is
gone (Mapping.get works for both _ShardReader and plain dicts). Read
benchmarks A/B neutral-to-marginally-better on all paths (unsharded full,
sharded full/partial, half-missing fill).

786 tests pass including pipeline-parity and indexing; ruff + mypy clean.

Assisted-by: ClaudeCode:claude-fable-5
@d-v-b d-v-b added and then removed the `run-downstream` label (run the tests of downstream libraries, e.g. xarray, against zarr) Jun 10, 2026
d-v-b added 4 commits June 11, 2026 07:43
Add tests/test_fastpath_equivalence.py with four hypothesis properties,
one per fast path on the branch:

- _merge_chunk_array complete-chunk view == general merge path (and
  independent of existing chunk content)
- ShardingCodec._decode_full_shard_bulk == _decode_sync for dense
  uncompressed shards across dtypes, endianness, subchunk write order,
  and index location (asserts the bulk path actually applies, so the
  test cannot pass vacuously)
- scalar writes leave the store byte-identical to equivalent broadcast
  array writes (pins the sharded scalar-broadcast memoization)
- Store.get_ranges_sync coalesced reads == one get_sync per range, for
  arbitrary gap/coalesce limits and Range/Offset/Suffix/None requests

Each was verified to catch its bug class by temporary fault injection:
dropping the bulk decode's endian handling and shifting the coalesce
re-slice offset by one both produced shrunk falsifying examples.
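The coalesced-read property (merged fetches must return the same bytes as one fetch per range) can be illustrated on a plain bytes object. The `coalesce` and `get_ranges` helpers and their `max_gap` / `max_size` parameters are hypothetical and only model half-open `(start, end)` ranges, not the Range/Offset/Suffix request types.

```python
from __future__ import annotations


def coalesce(
    ranges: list[tuple[int, int]], max_gap: int, max_size: int
) -> list[tuple[int, int, list[int]]]:
    """Group ranges into merged fetches: (start, end, indices of member ranges)."""
    groups: list[tuple[int, int, list[int]]] = []
    for i in sorted(range(len(ranges)), key=lambda j: ranges[j][0]):
        start, end = ranges[i]
        if groups:
            g_start, g_end, members = groups[-1]
            close_enough = start - g_end <= max_gap
            small_enough = max(end, g_end) - g_start <= max_size
            if close_enough and small_enough:
                groups[-1] = (g_start, max(g_end, end), members + [i])
                continue
        groups.append((start, end, [i]))
    return groups


def get_ranges(
    blob: bytes, ranges: list[tuple[int, int]], max_gap: int = 4, max_size: int = 1 << 20
) -> list[bytes]:
    out = [b""] * len(ranges)
    for g_start, g_end, members in coalesce(ranges, max_gap, max_size):
        fetched = blob[g_start:g_end]  # one "request" per group
        for i in members:
            start, end = ranges[i]
            # Re-slice relative to the group start; an off-by-one here is the
            # kind of fault the property test is meant to catch.
            out[i] = fetched[start - g_start : end - g_start]
    return out


blob = bytes(range(100))
requests = [(10, 20), (22, 30), (60, 70), (15, 25)]
coalesced = get_ranges(blob, requests)
```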

Assisted-by: ClaudeCode:claude-fable-5
Code fixes:
- ShardingCodec.evolve_from_array_spec now threads the spec through the
  inner chain via evolve_codecs. The unthreaded evolve survived on the
  real array-creation path after the transform builders were fixed,
  baking an endian-stripped BytesCodec into the evolved instance behind
  any dtype-changing inner codec. Regression test goes through
  evolve_from_array_spec and then builds the inner transform.
- Async _decode_shard_index/_encode_shard_index fall back to the async
  pipeline when an index codec is not sync-capable, instead of failing
  every path for third-party async-only index codecs.
- Removed the redundant hand-rolled dict caches inside the chunk
  transform builders; the instance-local lru_cache wrappers are the
  single memoization mechanism. _shard_index_size is now lru_cached too.
- The encoded-index-size guard lives once, in _assemble_shard, instead
  of duplicated in the sync and async encoders.
- _get_inner_pipeline cache key includes codec_pipeline.batch_size,
  which from_codecs captures at construction.
- Coordinate arrays built via np.indices instead of
  np.array(list(np.ndindex(...))) in the partial-write loaders.
- _merge_chunk_array docstring states the view-aliasing contract; the
  guard comment sits on the check it annotates.
- The socketpair unraisable filter covers family=(1|2) (Windows
  emulates socketpair with AF_INET), and its comment owns the tradeoff
  that the patterns cannot scope to pytest-asyncio.

Test hardening:
- Pool tests assert the pool branch actually fires (_resolve_max_workers
  + a _get_pool spy) instead of silently degrading to the sequential
  branch on a config regression; the concurrent-read test re-opens the
  array each round so readers race a cold spec cache.
- New direct test pins the SyncByteSetter write gate in
  FusedCodecPipeline.write (verified to fail with the gate removed).
- New test pins the merge fast path's view-aliasing + source-unmutated
  contract end-to-end on both pipelines.
- read_missing_chunks=False sharded test asserts both halves of the
  asymmetry against the same partially-written array.
- v2 scenario with a numcodecs Delta filter covers the V2Codec filter
  branch; _chunk_keys recognizes v2 metadata keys.

Assisted-by: ClaudeCode:claude-fable-5
# Conflicts:
#	tests/test_store/test_local.py
#	tests/test_store/test_memory.py
Rename the fused-default placeholder to the PR-numbered 3885.feature.md and
add a second feature entry for the new SyncByteGetter/SyncByteSetter protocols
and Store.get_ranges_sync.

Assisted-by: ClaudeCode:claude-opus-4.8
@d-v-b d-v-b removed the `benchmark` label (code will be benchmarked in a CI job) Jun 11, 2026
@d-v-b d-v-b changed the title perf: phased codecpipeline perf: sync codec pipeline Jun 11, 2026
@d-v-b d-v-b changed the title perf: sync codec pipeline perf: use sync methods for chunk encoding / decoding Jun 11, 2026
d-v-b added 2 commits June 11, 2026 16:17
Making FusedCodecPipeline the default left the async (Batched) mirror
paths and the new sync-IO error paths exercised only narrowly, dropping
project coverage. Add targeted tests for the reachable gaps:

- AsyncChunkTransform.decode_chunk/encode_chunk == ChunkTransform across
  aa/ab/bb codec combinations (the async per-chunk chain the default
  sync path never runs), plus FusedCodecPipeline.encode/decode
  None-chunk passthrough.
- ShardingCodec._decode_single/_encode_single whole-shard round-trip and
  all-empty branches. The codec advertises partial decode/encode, so the
  pipeline always picks the partial methods; these whole-shard async
  methods are reached only via the direct ArrayBytesCodec API.
- Async-only index codec fallback in _decode_shard_index/
  _encode_shard_index (zarr-developers#269), via a test-only async-only ArrayBytesCodec
  stub that is not SupportsSyncCodec.
- Store.get_ranges_sync happy path, missing-key BaseExceptionGroup, and
  the non-sync-store TypeError.

Local merged coverage on the touched files: codec_pipeline.py
85.4->92.2%, sharding.py 92.4->96.7%, abc/store.py 93.8->95.3%.

Assisted-by: ClaudeCode:claude-opus-4.8

@ilan-gold ilan-gold left a comment

Contributor

A few small comments, may do them myself.

"""Encode a full shard synchronously.

Sync counterpart to `_encode_single`. This is reached when a
`ShardingCodec` is an *inner* codec of another sharding codec (nested

Contributor

Is this true? I hit this path when I ran `test_create_array_with_data_num_gets`, which looks pretty un-nested.

}

def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
# Thread the spec through the inner chain (`evolve_codecs`): each codec

Contributor

Should this and others be real docstrings?

Contributor Author

yeah I think it should be

else:
    shard_dict = dict.fromkeys(morton_order_iter(chunks_per_shard))

from zarr.core.codec_pipeline import merge_and_encode_chunk

Contributor

What do you think about moving the shared helpers that currently live in codec_pipeline into their own file?

Contributor Author

I think that's a good idea


Returns `None` for an all-empty shard (no chunks present).
"""
layout = self._build_shard_layout(shard_dict, chunks_per_shard)

Contributor

Instead of materializing layout as a list, does it make sense to have it be an iterator? This could help save on memory, but not sure.

Contributor Author

as long as we are sure we aren't consuming the iterator, then passing the (dead, empty) iterator off to a consumer, then sure
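The hazard mentioned here is easy to reproduce with a generator. `build_layout` below is a hypothetical stand-in that yields `(offset, nbytes)` pairs; it is not the PR's `_build_shard_layout`.

```python
def build_layout(sizes):
    """Yield (offset, nbytes) pairs lazily."""
    offset = 0
    for size in sizes:
        yield (offset, size)
        offset += size


layout = build_layout([3, 5, 2])
total = sum(nbytes for _, nbytes in layout)  # first consumer exhausts the generator
leftover = list(layout)                      # second consumer silently gets nothing

# Materializing once lets any number of consumers see every entry.
materialized = list(build_layout([3, 5, 2]))
```

An iterator saves memory only if exactly one consumer walks the layout; with two consumers, the second one fails silently rather than loudly.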

raise ValueError(f"Unrecognized subchunk write order: {subchunk_write_order!r}.")
return subchunk_iter

def _decode_full_shard_bulk(

Contributor

Suggested change
- def _decode_full_shard_bulk(
+ def _decode_full_shard_bulk_if_uncompressed(

or something. Then you probably don't need such a giant comment in _decode_partial_sync

Contributor Author

lgtm!


Labels

`performance` (potential issues with Zarr performance: I/O, memory, etc.), `run-downstream` (run the tests of downstream libraries, e.g. xarray, against zarr)


4 participants