diff --git a/changes/4040.feature.md b/changes/4040.feature.md new file mode 100644 index 0000000000..7e2813ed5d --- /dev/null +++ b/changes/4040.feature.md @@ -0,0 +1 @@ +`zarr.experimental.cache_store.CacheStore` now performs negative caching by default (`cache_missing=True`, opt-out). A full-key read that finds the key absent in the source store is remembered, so repeat reads of that absent key return immediately without a source round-trip — useful for sparse arrays where most chunks resolve to the fill value. Remembered misses respect `max_age_seconds` and are evicted when the key is written via `set`/`set_if_not_exists`. Negative-cache activity is reported as `negative_hits` in `cache_stats()` and `missing_keys` in `cache_info()`. Only full-key reads are affected (not byte-range reads or `exists`). Pass `cache_missing=False` to restore the previous behavior. Like the positive cache (unbounded when `max_size is None`), the negative cache is bounded only by `max_age_seconds`; set a finite TTL for scans over very large sparse key spaces. diff --git a/src/zarr/experimental/cache_store.py b/src/zarr/experimental/cache_store.py index 1535b42f67..4205686fb6 100644 --- a/src/zarr/experimental/cache_store.py +++ b/src/zarr/experimental/cache_store.py @@ -30,8 +30,14 @@ class _CacheState: hits: int = 0 misses: int = 0 evictions: int = 0 + negative_hits: int = 0 key_insert_times: dict[_CacheEntryKey, float] = field(default_factory=dict) range_cache: dict[str, dict[ByteRequest, Buffer]] = field(default_factory=dict) + # Negative cache: full keys known to be absent in the source store, mapped to their + # (monotonic) insertion time. Used to short-circuit repeat reads of absent keys. + # Entries carry no data, so they are kept out of the byte-size accounting above; + # staleness is bounded by ``max_age_seconds``. + missing_keys: dict[str, float] = field(default_factory=dict) class CacheStore(WrapperStore[Store]): @@ -62,6 +68,26 @@ class CacheStore(WrapperStore[Store]): Note: Individual values larger than max_size will not be cached. cache_set_data : bool, optional Whether to cache data when it's written to the store. Default is True. + cache_missing : bool, optional + Whether to remember full-key misses (negative caching). When True, a full-key + ``get`` that finds the key absent in the source store records that absence, so + subsequent ``get``s for the same key return ``None`` without a source round-trip. + This benefits repeated reads of sparse arrays (most chunks absent). Negative + entries respect ``max_age_seconds`` and are evicted when the key is written + (``set``/``set_if_not_exists``). Only full-key reads are affected (not byte-range + reads or ``exists``). Default is True. + + Notes: + + - With ``max_age_seconds="infinity"`` (the default) a remembered miss never + expires, so a key written to the source by another process stays invisible + through this cache. Pair ``cache_missing=True`` with a finite + ``max_age_seconds`` if the source may be written concurrently. + - Like the positive cache (which is unbounded when ``max_size is None``), the + negative cache is bounded only by ``max_age_seconds``. With an infinite TTL, + a scan over a very large sparse key space will accumulate one small entry per + absent key. Set a finite ``max_age_seconds`` (or ``cache_missing=False``) for + such workloads. Examples -------- @@ -91,6 +117,7 @@ class CacheStore(WrapperStore[Store]): max_age_seconds: int | Literal["infinity"] max_size: int | None cache_set_data: bool + cache_missing: bool _state: _CacheState def __init__( @@ -101,6 +128,7 @@ def __init__( max_age_seconds: int | str = "infinity", max_size: int | None = None, cache_set_data: bool = True, + cache_missing: bool = True, ) -> None: super().__init__(store) @@ -121,6 +149,7 @@ def __init__( self.max_age_seconds = max_age_seconds self.max_size = max_size self.cache_set_data = cache_set_data + self.cache_missing = cache_missing self._state = _CacheState() def _with_store(self, store: Store) -> Self: @@ -136,6 +165,7 @@ def with_read_only(self, read_only: bool = False) -> Self: max_age_seconds=self.max_age_seconds, max_size=self.max_size, cache_set_data=self.cache_set_data, + cache_missing=self.cache_missing, ) store._state = self._state return store @@ -151,6 +181,31 @@ def _is_key_fresh(self, entry_key: _CacheEntryKey) -> bool: elapsed = now - self._state.key_insert_times.get(entry_key, 0) return elapsed < self.max_age_seconds + def _is_missing_fresh(self, key: str) -> bool: + """Check if a negative (missing-key) entry is still fresh. + + Mirrors ``_is_key_fresh`` but reads the negative-cache insertion time. + """ + if self.max_age_seconds == "infinity": + return True + elapsed = time.monotonic() - self._state.missing_keys.get(key, 0.0) + return elapsed < self.max_age_seconds + + def _record_missing(self, key: str) -> None: + """Record *key* as known-missing (absent in the source store). + + Must be called while holding ``self._state.lock``. Staleness is bounded by + ``max_age_seconds`` via ``_is_missing_fresh``. + """ + self._state.missing_keys[key] = time.monotonic() + + def _evict_missing(self, key: str) -> None: + """Drop any negative entry for *key* (it is now present or being written). + + Must be called while holding ``self._state.lock``. + """ + self._state.missing_keys.pop(key, None) + async def _accommodate_value(self, value_size: int) -> None: """Ensure there is enough space in the cache for a new value. @@ -266,6 +321,10 @@ async def _cache_miss( await self._cache.delete(key) async with self._state.lock: self._remove_from_tracking(key) + # The key is absent in the source: remember the miss so a repeat + # read can short-circuit without a source round-trip. + if self.cache_missing: + self._record_missing(key) else: entry_key: _CacheEntryKey = (key, byte_range) async with self._state.lock: @@ -279,6 +338,10 @@ async def _cache_miss( if byte_range is None: await self._cache.set(key, result) await self._track_entry(key, result) + # A value now exists for this key: drop any stale negative entry. + if self.cache_missing: + async with self._state.lock: + self._evict_missing(key) else: entry_key = (key, byte_range) self._state.range_cache.setdefault(key, {})[byte_range] = result @@ -351,6 +414,16 @@ async def get( Buffer | None The retrieved data, or None if not found """ + # Negative cache fast-path (full-key reads only): a fresh "known absent" record + # short-circuits to None without consulting the positive cache or the source. + # Checked here, before the positive-entry freshness gate, because a negative-only + # key has no positive entry and would otherwise be routed straight to the source. + if self.cache_missing and byte_range is None: + async with self._state.lock: + if key in self._state.missing_keys and self._is_missing_fresh(key): + self._state.negative_hits += 1 + return None + entry_key: _CacheEntryKey = (key, byte_range) if byte_range is not None else key if not self._is_key_fresh(entry_key): return await self._get_no_cache(key, prototype, byte_range) @@ -369,9 +442,12 @@ async def set(self, key: str, value: Buffer) -> None: The data to store """ await super().set(key, value) - # Invalidate all cached byte-range entries (source data changed) + # Invalidate all cached byte-range entries (source data changed) and drop any + # negative entry — the key now has a value. async with self._state.lock: self._invalidate_range_entries(key) + if self.cache_missing: + self._evict_missing(key) if self.cache_set_data: await self._cache.set(key, value) await self._track_entry(key, value) @@ -380,6 +456,26 @@ async def set(self, key: str, value: Buffer) -> None: async with self._state.lock: self._remove_from_tracking(key) + async def set_if_not_exists(self, key: str, value: Buffer) -> None: + """ + Store data only if the key does not already exist in the source store. + + Parameters + ---------- + key : str + The key to store under + value : Buffer + The data to store + """ + await super().set_if_not_exists(key, value) + # Whether or not the write happened, any negative entry is now unsafe: either + # we just wrote the key, or it already existed (so the record was already + # wrong). Evicting unconditionally is always safe. We do not populate the + # positive cache here — there is no guaranteed-fresh value to store. + if self.cache_missing: + async with self._state.lock: + self._evict_missing(key) + async def delete(self, key: str) -> None: """ Delete data from both the underlying store and cache. @@ -407,18 +503,26 @@ def cache_info(self) -> dict[str, Any]: "max_size": self.max_size, "current_size": self._state.current_size, "cache_set_data": self.cache_set_data, + "cache_missing": self.cache_missing, "tracked_keys": len(self._state.key_insert_times), "cached_keys": len(self._state.cache_order), + "missing_keys": len(self._state.missing_keys), } def cache_stats(self) -> dict[str, Any]: - """Return cache performance statistics.""" + """Return cache performance statistics. + + ``hit_rate`` reflects positive-cache hits over positive lookups only; a + negative-cache hit (an absent key served from the negative cache) is reported + separately as ``negative_hits`` and is counted as neither a hit nor a miss. + """ total_requests = self._state.hits + self._state.misses hit_rate = self._state.hits / total_requests if total_requests > 0 else 0.0 return { "hits": self._state.hits, "misses": self._state.misses, "evictions": self._state.evictions, + "negative_hits": self._state.negative_hits, "total_requests": total_requests, "hit_rate": hit_rate, } @@ -435,7 +539,9 @@ async def clear_cache(self) -> None: self._state.cache_order.clear() self._state.key_sizes.clear() self._state.range_cache.clear() + self._state.missing_keys.clear() self._state.current_size = 0 + self._state.negative_hits = 0 def __repr__(self) -> str: """Return string representation of the cache store.""" diff --git a/tests/test_experimental/test_cache_store.py b/tests/test_experimental/test_cache_store.py index fc17ccd5e1..17ee32c5c4 100644 --- a/tests/test_experimental/test_cache_store.py +++ b/tests/test_experimental/test_cache_store.py @@ -298,8 +298,10 @@ async def test_cache_info(self, cached_store: CacheStore) -> None: "max_size", "current_size", "cache_set_data", + "cache_missing", "tracked_keys", "cached_keys", + "missing_keys", } assert set(info.keys()) == expected_keys @@ -1047,3 +1049,131 @@ async def test_delete_invalidates_cached_byte_ranges(self) -> None: # Key is gone from source result = await cached_store.get("key", proto) assert result is None + + +class TestCacheStoreNegativeCaching: + """Tests for opt-in negative (missing-key) caching (``cache_missing=True``).""" + + async def test_basic(self, monkeypatch: pytest.MonkeyPatch) -> None: + """A second get of an absent key is served from the negative cache without a + source round-trip.""" + source = MemoryStore() + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True) + proto = default_buffer_prototype() + + calls = {"n": 0} + orig_get = source.get + + async def counting_get(*args: object, **kwargs: object) -> object: + calls["n"] += 1 + return await orig_get(*args, **kwargs) # type: ignore[arg-type] + + monkeypatch.setattr(source, "get", counting_get) + + assert await cs.get("c/0", proto) is None + assert cs.cache_info()["missing_keys"] == 1 + after_first = calls["n"] + + assert await cs.get("c/0", proto) is None + assert calls["n"] == after_first # no further source access + assert cs.cache_stats()["negative_hits"] == 1 + + async def test_enabled_by_default(self) -> None: + """Negative caching is on by default (opt-out).""" + cs = CacheStore(MemoryStore(), cache_store=MemoryStore()) + proto = default_buffer_prototype() + assert cs.cache_missing is True + assert await cs.get("c/0", proto) is None + assert await cs.get("c/0", proto) is None + assert cs.cache_info()["missing_keys"] == 1 + assert cs.cache_stats()["negative_hits"] == 1 + + async def test_can_be_disabled(self) -> None: + """With ``cache_missing=False`` nothing is remembered.""" + cs = CacheStore(MemoryStore(), cache_store=MemoryStore(), cache_missing=False) + proto = default_buffer_prototype() + assert await cs.get("c/0", proto) is None + assert await cs.get("c/0", proto) is None + assert cs.cache_info()["missing_keys"] == 0 + assert cs.cache_stats()["negative_hits"] == 0 + + async def test_evicted_on_set(self) -> None: + source = MemoryStore() + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True) + proto = default_buffer_prototype() + assert await cs.get("c/0", proto) is None + assert cs.cache_info()["missing_keys"] == 1 + + await cs.set("c/0", CPUBuffer.from_bytes(b"value")) + assert cs.cache_info()["missing_keys"] == 0 + result = await cs.get("c/0", proto) + assert result is not None + assert result.to_bytes() == b"value" + + async def test_evicted_on_set_if_not_exists(self) -> None: + source = MemoryStore() + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True) + proto = default_buffer_prototype() + assert await cs.get("c/0", proto) is None + assert cs.cache_info()["missing_keys"] == 1 + + await cs.set_if_not_exists("c/0", CPUBuffer.from_bytes(b"value")) + assert cs.cache_info()["missing_keys"] == 0 + result = await cs.get("c/0", proto) + assert result is not None + assert result.to_bytes() == b"value" + + async def test_respects_ttl(self) -> None: + """A negative entry expires after ``max_age_seconds`` so a key written to the + source out-of-band becomes visible again.""" + source = MemoryStore() + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True, max_age_seconds=1) + proto = default_buffer_prototype() + assert await cs.get("c/0", proto) is None + + # an external writer adds the key directly to the source store + await source.set("c/0", CPUBuffer.from_bytes(b"late")) + + # before TTL: still reported missing from the negative cache + assert await cs.get("c/0", proto) is None + await asyncio.sleep(1.1) + + # after TTL: the stale negative entry is bypassed, source is consulted + result = await cs.get("c/0", proto) + assert result is not None + assert result.to_bytes() == b"late" + assert cs.cache_info()["missing_keys"] == 0 + + async def test_byte_range_unaffected(self) -> None: + """Byte-range misses do not populate the negative cache.""" + cs = CacheStore(MemoryStore(), cache_store=MemoryStore(), cache_missing=True) + proto = default_buffer_prototype() + assert await cs.get("c/0", proto, byte_range=RangeByteRequest(0, 4)) is None + assert cs.cache_info()["missing_keys"] == 0 + + async def test_stats_and_info(self) -> None: + """``negative_hits``/``missing_keys``/``cache_missing`` are surfaced and the + positive ``hit_rate`` is unaffected by negative hits.""" + source = MemoryStore() + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True) + proto = default_buffer_prototype() + + await cs.set("present", CPUBuffer.from_bytes(b"x")) + assert (await cs.get("present", proto)) is not None # positive hit + assert await cs.get("absent", proto) is None # records miss + assert await cs.get("absent", proto) is None # negative hit + + info = cs.cache_info() + stats = cs.cache_stats() + assert info["cache_missing"] is True + assert info["missing_keys"] == 1 + assert stats["negative_hits"] == 1 + assert stats["hits"] == 1 + assert stats["misses"] == 1 # negative hit counts as neither hit nor miss + assert stats["hit_rate"] == 0.5 + + async def test_delete_does_not_record(self) -> None: + """Deleting a key does not create a negative entry (deletion != checked-absent).""" + cs = CacheStore(MemoryStore(), cache_store=MemoryStore(), cache_missing=True) + await cs.delete("c/0") + assert cs.cache_info()["missing_keys"] == 0