perf: use sync methods for chunk encoding / decoding#3885
Conversation
`PreparedWrite` models a set of per-chunk changes that would be applied to a stored chunk. `SupportsChunkPacking` is a protocol for array -> bytes codecs that can use `PreparedWrite` objects to update an existing chunk.
…into perf/prepared-write-v2
…into perf/prepared-write-v2
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3885 +/- ##
==========================================
+ Coverage 93.53% 93.76% +0.22%
==========================================
Files 89 89
Lines 11901 12499 +598
==========================================
+ Hits 11132 11720 +588
- Misses 769 779 +10
🚀 New features to boost your workflow:
|
…into perf/prepared-write-v2
|
@TomAugspurger how would this design work with CUDA codecs? |
5d3064e to
b67a5a0
Compare
a84a15a to
68a7cdc
Compare
| # Phase 1: fetch all chunks (IO, sequential) | ||
| raw_buffers: list[Buffer | None] = [ | ||
| bg.get_sync(prototype=cs.prototype) # type: ignore[attr-defined] | ||
| for bg, cs, *_ in batch | ||
| ] | ||
|
|
||
| # Phase 2: decode (compute, optionally threaded) | ||
| def _decode_one(raw: Buffer | None, chunk_spec: ArraySpec) -> NDBuffer | None: | ||
| if raw is None: | ||
| return None | ||
| return transform.decode_chunk(raw, chunk_spec) | ||
|
|
||
| specs = [cs for _, cs, *_ in batch] | ||
| if n_workers > 0 and len(batch) > 1: | ||
| with ThreadPoolExecutor(max_workers=n_workers) as pool: | ||
| decoded_list = list(pool.map(_decode_one, raw_buffers, specs)) | ||
| else: | ||
| decoded_list = [ | ||
| _decode_one(raw, spec) for raw, spec in zip(raw_buffers, specs, strict=True) | ||
| ] |
There was a problem hiding this comment.
Why isn't this all multi-threaded i.e., the I/O as well?
There was a problem hiding this comment.
I should benchmark this, but my expectation was that IO against memory storage and local storage is not compute-limited, and so threads wouldn't remove a real bottleneck. for memory storage i'm sure this is true, not sure about local storage though
Adds a SupportsSetRange protocol to zarr.abc.store for stores that allow overwriting a byte range within an existing value. Implementations are added for LocalStore (using file-handle seek+write) and MemoryStore (in-memory bytearray slice assignment). This is the prerequisite for the partial-shard write fast path in ShardingCodec, which can patch individual inner-chunk slots without rewriting the entire shard blob when the inner codec chain is fixed-size. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
V2Codec, BytesCodec, BloscCodec, etc. previously only implemented the async _decode_single / _encode_single methods. Add their sync counterparts (_decode_sync / _encode_sync) so that the upcoming SyncCodecPipeline can dispatch through them without spinning up an event loop. For codecs that wrap external compressors (numcodecs.Zstd, numcodecs.Blosc, the V2 fallback chain), the sync versions just call the underlying compressor's blocking API directly instead of routing through asyncio.to_thread. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…arallelism
Adds SyncCodecPipeline alongside BatchedCodecPipeline. The new pipeline
runs codecs through their sync entry points (_decode_sync / _encode_sync)
and dispatches per-chunk work to a module-level thread pool sized by
the codec_pipeline.max_workers config (default = os.cpu_count()).
Each chunk's full lifecycle (fetch + decode + scatter for reads;
get-existing + merge + encode + set/delete for writes) runs as one
pool task — overlapping IO of one chunk with compute of another.
Scatter into the shared output buffer is thread-safe because chunks
have non-overlapping output selections.
The async wrappers (read/write) detect SupportsGetSync/SupportsSetSync
stores and dispatch to the sync fast path, passing the configured
max_workers. Other stores fall through to the async path, which still
uses asyncio.concurrent_map at async.concurrency.
Notes on perf:
- Default (None → cpu_count) is tuned for chunks ≥ ~512 KB.
- Small chunks (≤ 64 KB) regress 1.5-3x because pool dispatch overhead
(~30-50 µs/task) dominates per-chunk work. Workaround:
zarr.config.set({"codec_pipeline.max_workers": 1}).
- For large chunks on local/memory stores, IO+compute parallelism
yields 1.7-2.5x over BatchedCodecPipeline on direct-API reads and
~2.5x on roundtrip.
ChunkTransform encapsulates the sync codec chain. It caches resolved
ArraySpecs across calls with the same chunk_spec — combined with the
constant-ArraySpec optimization in indexing, hot-path overhead is
minimized.
Includes test scaffolding for the new pipeline (test_sync_codec_pipeline)
and config plumbing for the max_workers key.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds _encode_partial_sync and _decode_partial_sync to ShardingCodec.
For fixed-size inner codec chains and stores that implement
SupportsSetRange, partial writes patch individual inner-chunk slots
in-place instead of rewriting the whole shard:
- Reads existing shard index (one byte-range get).
- For each affected inner chunk: decodes the slot, merges the new
region, re-encodes.
- Writes each modified slot at its deterministic byte offset, then
rewrites just the index.
For variable-size inner codecs (e.g. with compression) or stores that
don't support byte-range writes, falls through to a full-shard rewrite
matching BatchedCodecPipeline semantics.
The partial-decode path computes a ReadPlan from the shard index and
issues one byte-range get per overlapping chunk, decoding only what
the read selection touches.
Both paths are dispatched from SyncCodecPipeline via the existing
supports_partial_decode / supports_partial_encode protocol checks.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two new test files:
test_codec_invariants — asserts contract-level properties that every
codec / shard / buffer combination must satisfy: round-trip exactness,
prototype propagation, fill-value handling, all-empty shard handling.
test_pipeline_parity — exhaustive matrix asserting that
SyncCodecPipeline and BatchedCodecPipeline produce semantically
identical results across codec configs, layouts (including
nested sharding), write sequences, and write_empty_chunks settings.
Three checks per cell:
1. Same array contents on read.
2. Same set of store keys after writes.
3. Each pipeline reads the other's output identically (catches
layout-divergence bugs).
These tests pinned the design throughout the SyncCodecPipeline +
partial-shard development.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds .gitignore entries for .claude/, CLAUDE.md, and docs/superpowers/ so local IDE/agent planning artifacts don't get committed by accident. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
aa111a2 to
1be5563
Compare
| selected = decoded[chunk_selection] | ||
| if drop_axes: | ||
| selected = selected.squeeze(axis=drop_axes) | ||
| out[out_selection] = selected |
There was a problem hiding this comment.
It might be worth experimenting with moving this setting operation out[out_selection] = selected outside the threadpool execution since, IIRC, it holds the GIL and is probably non-trivial time-wise.
There was a problem hiding this comment.
The memory usage will probably go up a bit though....
|
@ilan-gold this is ready for you to dig into |
|
@d-v-b This looks promising. Is this branch ready to be tested regarding the regression we saw here ecmwf/anemoi-datasets#486? |
|
@floriankrb yes if you have time to test it, that would be great |
The pool dispatch in read_sync/write_sync (codec_pipeline.max_workers > 1) had zero functional test coverage — only config-default assertions existed — even though threading is the opt-in we point users at. Adds: - an end-to-end multi-chunk read/write roundtrip with max_workers=4 (verified the pool dispatch actually fires, not the sequential branch); - worker-exception propagation tests for both write_sync (list-consumed pool.map) and read_sync (tuple-consumed pool.map): a store error raised in a pool worker must surface to the caller; - a concurrent-decode test: transpose filter (so ChunkTransform._resolve_specs cache traffic actually occurs — with no AA codecs the cache is bypassed), pool workers decoding concurrently, plus an outer thread pool issuing overlapping reads. Pins correctness under concurrency around the shared transform's mutable cache. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
zarr_format=2 appeared in none of the pipeline test files: v2 arrays were only exercised implicitly through whichever pipeline is the global default. v2 goes through the V2Codec wrapper (numcodecs filters + compressor) — a different codec path than the v3 AA/AB/BB chain, with its own _encode_sync/_decode_sync under FusedCodecPipeline — so it deserves explicit coverage on BOTH pipelines and BOTH store kinds (sync fast path + async fallback). Adds v2-roundtrip (uncompressed) and v2-gzip-roundtrip (numcodecs.GZip — the v2 compressor spelling; v3 codec configs are rejected for v2 arrays) to SCENARIOS. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Both _encode_shard_dict_sync and the async _encode_shard_dict encoded the shard index TWICE when index_location=start: encode to learn the length, shift the present chunks' offsets by it, then re-encode with corrected offsets. The index size is knowable without encoding — _shard_index_size() is already the byte- exact contract every index read path relies on (reads slice exactly that many bytes) — so the layout can use absolute offsets from the start and the index is encoded once. Saves a full index encode (including its crc32c over the offsets array) per shard write with index_location=start. The layout loop was also duplicated between the sync and async versions — the same drift surface that produced the evolve_from_array_spec endian bug. Both now delegate to a shared pure _build_shard_layout (offset math lives once) and _assemble_shard. A runtime guard verifies the encoded index length matches _shard_index_size rather than silently corrupting offsets if someone ever configures variable-size index codecs. Verified: 606 tests pass including the pipeline-parity byte-identical shard assertions across index_location=start/end and every subchunk_write_order, and the sharded reopen tests — the on-disk layout is unchanged. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
read_missing_chunks exists to help consumers distinguish a transport error from a truly missing chunk. That distinction is a STORE-KEY-level concept: a missing shard key raises ChunkNotFoundError. It does not cleanly apply to inner subchunks of a shard that was fetched successfully — there is no transport ambiguity there; the shard index simply records the subchunk as absent — so those fill with the fill value rather than raising. Both pipelines already implement exactly this (verified empirically), but nothing pinned it, so the asymmetry vs unsharded arrays read as a bug in review. This adds a shared-suite test (both pipelines x sync/async stores) asserting both sides: missing shard key raises; missing inner subchunk of an existing shard fills. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
* chore: add parallelism TODOs * perf: non-sharded reads * chore: code duplication * fix: arguments dont spread themselves! * fix: bring in suggested guard * perf: don't block on pool ops * chore: docstring + materialize early --------- Co-authored-by: ilan-gold <ilanbassgold@gmail.com>
…line Addresses the open question on this PR about sync/async byte getters, benchmark-guided as discussed. _ShardingByteGetter/_ShardingByteSetter are in-memory dict wrappers but presented only an async API, so the nested codec_pipeline.read over inner chunks fell to the async fallback: one concurrent_map coroutine per inner chunk for a dict lookup (~2.1 us/chunk pure asyncio overhead, ~8.8 ms per 4096-chunk shard). On top of that, the nested pipeline (ShardingCodec.codec_pipeline) is built by from_codecs and never evolved, so its sync transform was always None — inner chunks also paid per-chunk AsyncChunkTransform coroutines, and the decode_sync/encode_sync fallback improvements in this PR could not reach them. Changes: - SyncByteGetter / SyncByteSetter runtime protocols in zarr.abc.store (resurrecting the design from the original perf/prepared-write experiments, same names and shape). StorePath matches structurally but is still gated on its STORE's sync support; the protocols gate non-StorePath byte getters. - _ShardingByteGetter/Setter implement get_sync/set_sync/delete_sync; the async methods delegate to the sync ones (single implementation). - ShardingCodec._get_inner_pipeline(shard_spec): the nested pipeline evolved against the inner chunk spec (threads specs through the inner chain AND builds the sync transform). The four nested read/write call sites use it. - FusedCodecPipeline.read/write gates accept non-StorePath SyncByteGetter/ SyncByteSetter, so nested inner-chunk IO takes read_sync/write_sync. - _decode_shard_index/_encode_shard_index delegate to their sync twins (pure compute; kills a per-shard AsyncChunkTransform round-trip and a sync/async duplication). Benchmark (4096 inner chunks per shard, LatencyStore@0 i.e. the async-fallback path = sharded data on remote stores), vs this PR's head: uncompressed read: 44.2 -> 28.9 ms (1.53x) gzip read: 182.0 -> 50.9 ms (3.6x) writes: unchanged (~34 / ~70 ms) — already optimized by this PR's encode_sync fallback (whole-shard sync encode, no byte setters involved). Adds a regression test asserting sharded fallback reads route inner chunks through the sync fast path (read_sync engaged, zero AsyncChunkTransform calls); verified it fails if the gate is removed. Full sharding + parity + pipeline suites pass (619). Assisted-by: ClaudeCode:claude-fable-5
… cache inner pipeline The pipeline test suites have failed intermittently all along on an arbitrary test that passes in isolation. Root cause (allocation site verified with PYTHONTRACEMALLOC): pytest-asyncio implicitly creates an event loop in _get_event_loop_no_warn during fixture setup/teardown and never closes it. When GC reclaims that loop — or its self-pipe socketpair — mid-test, pytest's unraisable hook converts the ResourceWarning into a failure of whichever unrelated test happens to be running. The sync-bytegetter change increased per-shard-op allocation churn enough to make this near-deterministic, which is how it was finally traced. Two changes: - pyproject filterwarnings: narrowly ignore the two unraisable shapes (BaseEventLoop.__del__, AF_UNIX socketpair), mirroring the existing s3fs/aiobotocore entry. Not zarr's loops. - ShardingCodec._get_inner_pipeline is now memoized per (pipeline class, shard_spec) — evolving built a ChunkTransform on every shard operation. The pipeline class is part of the key so codec_pipeline.path config changes are still honored. Battery that previously failed ~every run now passes 3x consecutively (635). Assisted-by: ClaudeCode:claude-fable-5
…hain evolve Continues the anti-skew work: where the sync and async sharding paths implemented the same logic twice, extract a single source of truth so the copies cannot drift (the mechanism behind the pipeline-level endian bug). - _get_inner_chunk_transform / _get_index_chunk_transform now evolve their codec chains via evolve_codecs (spec THREADED forward). Both previously evolved every codec against the same unthreaded spec — the exact bug shape that stripped BytesCodec.endian at the pipeline level, latent here for any spec-changing inner codec. Both are also now actually memoized (the inner transform's docstring claimed a cache that did not exist; transforms were rebuilt per call). - New regression test (dependency-free dtype-widening stub codec) asserting the inner serializer keeps its endian; verified it fails on the unthreaded version. - _shard_index_byte_range(): the index-location byte-range arithmetic existed verbatim in both _load_shard_index_maybe and its _sync twin; now one helper. - _pair_chunks_with_byte_ranges(): the chunk-coord/byte-range pairing loop existed verbatim in both _load_partial_shard_maybe and its _sync twin; now one helper. Deliberately NOT unified: the small hand-rolled loops remaining in _decode_sync/_encode_sync vs their async twins. Post sync-bytegetter work the async versions are thin delegations to the (shared, evolved) nested pipeline, so the heavy machinery — evolve, transforms, layout, index codecs — is already single-sourced; force-merging the residual loops would couple different missing-chunk/concurrency semantics for little drift-surface gain. The pipeline-parity suite guards their behavioral equivalence. Full battery passes twice (636); ruff + mypy clean. Assisted-by: ClaudeCode:claude-fable-5
…erge copy Prototype of the "chunk state algebra" direction: the write-side state logic (maybe-read existing -> merge -> empty-normalize -> encode-or-elide) existed in four places with divergent inline conventions. It is now three canonical functions in codec_pipeline.py: - chunk_is_empty(): THE write_empty_chunks normalization rule (all-fill chunk normalizes to missing), previously five scattered inline all_equal checks. - encode_or_elide_chunk(): normalize-empty + encode; None = must not be stored. - merge_and_encode_chunk(): the full single-chunk write transition. Used by the fused _write_one and both branches of the sharding _encode_partial_sync loop (including the scalar-broadcast memoization, which now memoizes the canonical function's result). _encode_sync uses encode_or_elide_chunk. Unification found a real perf bug: _merge_chunk_array's complete-chunk early return required value.shape == chunk_spec.shape, which never holds for multi-chunk writes — so every complete chunk of every multi-chunk fused write paid a create+fill+copy. (Sharding's hand-rolled loop bypassed this with a view, which is itself how the two copies had drifted.) The guard now returns value[out_selection] whenever it is exactly chunk-shaped. Measured, two A/B passes: unsharded full write (1000 chunks) ~1.5x faster; bulk partial shard write (900 complete inner chunks) ~1.4x faster; sharded single-chunk write unchanged. Callers pass existing=None for complete chunks so fully-overwritten data is never decoded. Also removed the last divergent missing-chunk conventions: _decode_sync's try/except KeyError is now .get() -> None (None is the single "missing" convention), and the dead skip_empty/fill_value prologues are gone. 1945 tests pass including pipeline-parity byte-identical assertions; ruff + mypy clean. Assisted-by: ClaudeCode:claude-fable-5
…r_chunk) The read twin of merge_and_encode_chunk: the fill-on-missing scatter logic existed in five places (fused _read_one, partial-decode _read_one, the async fallback scatter loop, and the two sharding decode tails) with two different missing conventions (None vs try/except KeyError) and three fill spellings (precomputed batch fill, raw shard_spec.fill_value, inline or-default). Now two canonical functions in codec_pipeline.py: - scatter_chunk(): scatter an already-selected region; None = missing -> scatter fill, return a "missing" GetResult. POLICY-FREE on purpose: whether missing is an error (read_missing_chunks=False) is decided at the array layer from the top-level statuses — which is exactly what makes missing INNER chunks of a present shard fill rather than raise (the sharding codec discards the nested statuses). That semantic, previously implicit in which loop happened to run, is now written down where the rule lives. - decode_and_scatter_chunk(): decode (None = missing) -> select -> scatter. All five sites converted; the last try/except KeyError missing-convention is gone (Mapping.get works for both _ShardReader and plain dicts). Read benchmarks A/B neutral-to-marginally-better on all paths (unsharded full, sharded full/partial, half-missing fill). 786 tests pass including pipeline-parity and indexing; ruff + mypy clean. Assisted-by: ClaudeCode:claude-fable-5
Add tests/test_fastpath_equivalence.py with four hypothesis properties, one per fast path on the branch: - _merge_chunk_array complete-chunk view == general merge path (and independent of existing chunk content) - ShardingCodec._decode_full_shard_bulk == _decode_sync for dense uncompressed shards across dtypes, endianness, subchunk write order, and index location (asserts the bulk path actually applies, so the test cannot pass vacuously) - scalar writes leave the store byte-identical to equivalent broadcast array writes (pins the sharded scalar-broadcast memoization) - Store.get_ranges_sync coalesced reads == one get_sync per range, for arbitrary gap/coalesce limits and Range/Offset/Suffix/None requests Each was verified to catch its bug class by temporary fault injection: dropping the bulk decode's endian handling and shifting the coalesce re-slice offset by one both produced shrunk falsifying examples. Assisted-by: ClaudeCode:claude-fable-5
Code fixes: - ShardingCodec.evolve_from_array_spec now threads the spec through the inner chain via evolve_codecs. The unthreaded evolve survived on the real array-creation path after the transform builders were fixed, baking an endian-stripped BytesCodec into the evolved instance behind any dtype-changing inner codec. Regression test goes through evolve_from_array_spec and then builds the inner transform. - Async _decode_shard_index/_encode_shard_index fall back to the async pipeline when an index codec is not sync-capable, instead of failing every path for third-party async-only index codecs. - Removed the redundant hand-rolled dict caches inside the chunk transform builders; the instance-local lru_cache wrappers are the single memoization mechanism. _shard_index_size is now lru_cached too. - The encoded-index-size guard lives once, in _assemble_shard, instead of duplicated in the sync and async encoders. - _get_inner_pipeline cache key includes codec_pipeline.batch_size, which from_codecs captures at construction. - Coordinate arrays built via np.indices instead of np.array(list(np.ndindex(...))) in the partial-write loaders. - _merge_chunk_array docstring states the view-aliasing contract; the guard comment sits on the check it annotates. - The socketpair unraisable filter covers family=(1|2) (Windows emulates socketpair with AF_INET), and its comment owns the tradeoff that the patterns cannot scope to pytest-asyncio. Test hardening: - Pool tests assert the pool branch actually fires (_resolve_max_workers + a _get_pool spy) instead of silently degrading to the sequential branch on a config regression; the concurrent-read test re-opens the array each round so readers race a cold spec cache. - New direct test pins the SyncByteSetter write gate in FusedCodecPipeline.write (verified to fail with the gate removed). - New test pins the merge fast path's view-aliasing + source-unmutated contract end-to-end on both pipelines. - read_missing_chunks=False sharded test asserts both halves of the asymmetry against the same partially-written array. - v2 scenario with a numcodecs Delta filter covers the V2Codec filter branch; _chunk_keys recognizes v2 metadata keys. Assisted-by: ClaudeCode:claude-fable-5
# Conflicts: # tests/test_store/test_local.py # tests/test_store/test_memory.py
Rename the fused-default placeholder to the PR-numbered 3885.feature.md and add a second feature entry for the new SyncByteGetter/SyncByteSetter protocols and Store.get_ranges_sync. Assisted-by: ClaudeCode:claude-opus-4.8
Making FusedCodecPipeline the default left the async (Batched) mirror paths and the new sync-IO error paths exercised only narrowly, dropping project coverage. Add targeted tests for the reachable gaps: - AsyncChunkTransform.decode_chunk/encode_chunk == ChunkTransform across aa/ab/bb codec combinations (the async per-chunk chain the default sync path never runs), plus FusedCodecPipeline.encode/decode None-chunk passthrough. - ShardingCodec._decode_single/_encode_single whole-shard round-trip and all-empty branches. The codec advertises partial decode/encode, so the pipeline always picks the partial methods; these whole-shard async methods are reached only via the direct ArrayBytesCodec API. - Async-only index codec fallback in _decode_shard_index/ _encode_shard_index (zarr-developers#269), via a test-only async-only ArrayBytesCodec stub that is not SupportsSyncCodec. - Store.get_ranges_sync happy path, missing-key BaseExceptionGroup, and the non-sync-store TypeError. Local merged coverage on the touched files: codec_pipeline.py 85.4->92.2%, sharding.py 92.4->96.7%, abc/store.py 93.8->95.3%. Assisted-by: ClaudeCode:claude-opus-4.8
ilan-gold
left a comment
There was a problem hiding this comment.
A few small comments, may do them myself.
| """Encode a full shard synchronously. | ||
|
|
||
| Sync counterpart to `_encode_single`. This is reached when a | ||
| `ShardingCodec` is an *inner* codec of another sharding codec (nested |
There was a problem hiding this comment.
Is this true? I hit this path when I ran test_create_array_with_data_num_gets which looks pretty un-nested
| } | ||
|
|
||
| def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: | ||
| # Thread the spec through the inner chain (`evolve_codecs`): each codec |
There was a problem hiding this comment.
Should this and others be real docstrings?
There was a problem hiding this comment.
yeah I think it should be
| else: | ||
| shard_dict = dict.fromkeys(morton_order_iter(chunks_per_shard)) | ||
|
|
||
| from zarr.core.codec_pipeline import merge_and_encode_chunk |
There was a problem hiding this comment.
What do you think about moving these helpers that currently exist in and are shared from codec_pipeline into their own file?
There was a problem hiding this comment.
I think that's a good idea
|
|
||
| Returns `None` for an all-empty shard (no chunks present). | ||
| """ | ||
| layout = self._build_shard_layout(shard_dict, chunks_per_shard) |
There was a problem hiding this comment.
Instead of materializing layout as a list, does it make sense to have it be an iterator? This could help save on memory, but not sure.
There was a problem hiding this comment.
as long as we are sure we aren't consuming the iterator, then passing the (dead, empty) iterator off to a consumer, then sure
| raise ValueError(f"Unrecognized subchunk write order: {subchunk_write_order!r}.") | ||
| return subchunk_iter | ||
|
|
||
| def _decode_full_shard_bulk( |
There was a problem hiding this comment.
| def _decode_full_shard_bulk( | |
| def _decode_full_shard_bulk_if_uncompressed( |
or something. Then you probably don't need such a giant comment in _decode_partial_sync
This PR defines a new codec pipeline class called
PhasedCodecPipelinethat enables much higher performance for chunk encoding and decoding than the currentBatchedCodecPipeline.The approach here is to completely ignore how the v3 spec defines array -> bytes codecs 😆. Instead of treating codecs as functions that mix IO and compute, we treat codec encoding and decoding as a sequence:
fetch exactly what we need to fetch from storage, given the codecs we have. So if there's a sharding codec in the first array->bytes position, the codec pipeline knows it must fetch the shard index, then fetch the involved subchunks, before passing them to compute.
Basically, we use the first array -> bytes codec to figure out what kind of preparatory IO and final IO we need to perform, and the rest of the codecs to figure out what kind of chunk encoding we need to do. Separating IO from compute in different phases makes things simpler and faster.
Happy to chat more about this direction. IMO the spec should be re-written with this framing, because it makes much more sense than trying to shoe-horn sharding in as a codec.
I don't want to make our benchmarking suite any bigger but on my laptop this codec pipeline is 2-5x faster than the batchedcodec pipeline for a lot of workloads. I can include some of those benchmarks later.
This was mostly written by claude, based on previous work in #3719. All these changes should be non-breaking, so I think this is in principle safe for us to play around with in a patch or minor release.
Edit: this PR depends on changes submitted in #3907 and #3908
Another edit: the big pitch of this PR -- separating IO from compute -- didn't end up valuable, because we ran into a large amount of overhead due to indexing / python object creation. It turns out the vast majority of the speed benefits can be had simply by avoiding async for storage backends that don't need it. see #3885 (comment) for a detailed summary of the current state of things here.