Bug report
Bug description:
When showwarning is patched (most commonly by calling logging.captureWarnings(True)), the warnings.catch_warnings context manager still isn't thread safe with context_aware_warnings enabled.
In Python 3.14, even with the context_aware_warnings flag enabled, there is a bug in asyncio / multithreaded contexts when using warnings.catch_warnings together with logging.captureWarnings(True), since both patch warnings.showwarning. Write me a minimal reproducible example of this issue to share with the cpython team.
"""
MRE: catch_warnings + captureWarnings race on warnings.showwarning
Python 3.14+ with context_aware_warnings enabled.
Both warnings.catch_warnings() and logging.captureWarnings(True) save/restore
warnings.showwarning by *value at call time* — a plain reference copy. Under
concurrent execution the save/restore pairs interleave, leaking the wrong
showwarning into the global module state.
Scenario that demonstrates the interleaving:
T1 catch_warnings.__enter__ → saved = showwarning (= warn_orig)
T1 captureWarnings(True) → saved = showwarning (= warn_orig)
showwarning = log_showwarning
T2 catch_warnings.__enter__ → saved = showwarning (= log_showwarning ← T1's patch!)
T2 captureWarnings(True) → showwarning = log_showwarning
T1 captureWarnings(False) → showwarning = warn_orig (correct)
T1 catch_warnings.__exit__ → showwarning = warn_orig (correct, saved in step 1)
T2 captureWarnings(False) → no-op: T1 already cleared logging's saved hook, so showwarning stays warn_orig
T2 catch_warnings.__exit__ → showwarning = log_showwarning (BUG — restores T1's patch saved on __enter__, and it persists)
After all threads exit, warnings.showwarning is still logging's hook
(logging._showwarning) even though every thread called captureWarnings(False).
Two reproductions:
1. ThreadPoolExecutor (preemptive — natural race)
2. asyncio with explicit yield points (cooperative — deterministic)
"""
import asyncio
import concurrent.futures
import logging
import threading
import warnings
import sys
# Context-aware warnings are controlled by an interpreter flag
# (``-X context_aware_warnings=1`` or ``PYTHON_CONTEXT_AWARE_WARNINGS=1``) that
# is exposed read-only as ``sys.flags.context_aware_warnings``.  Assigning an
# attribute on the ``warnings`` module never raises AttributeError, so it can
# neither enable the feature nor detect an interpreter that lacks it; inspect
# the real flag instead.
_context_aware = getattr(sys.flags, "context_aware_warnings", None)
if _context_aware is None:
    sys.exit("Python 3.14+ required for context_aware_warnings")
if not _context_aware:
    sys.exit(
        "context_aware_warnings is disabled; re-run with "
        "`python -X context_aware_warnings=1 <this script>`"
    )

# force=True replaces any handlers already attached to the root logger so the
# captured warnings are emitted through a known configuration.
logging.basicConfig(level=logging.WARNING, force=True)

# Baseline hook, captured before anything in this script patches it.  Every
# leak check below compares warnings.showwarning against this object.
ORIGINAL_SHOWWARNING = warnings.showwarning

# ── Reproduction 1: threads (preemptive) ──────────────────────────────
# Messages recorded by the thread workers; appended to under _thread_lock.
_thread_errors: list[str] = []
_thread_lock = threading.Lock()
def _thread_worker(n: int) -> None:
    """Run one catch_warnings/captureWarnings cycle and record a mismatch.

    Inside ``warnings.catch_warnings()`` the worker turns log capture on,
    emits a ``UserWarning`` and turns capture off again.  After the context
    manager has exited it compares ``warnings.showwarning`` with
    ``ORIGINAL_SHOWWARNING`` and, if they differ, appends a message to
    ``_thread_errors`` while holding ``_thread_lock``.

    NOTE(review): other workers run concurrently and also call
    ``captureWarnings(True)``, so a mismatch seen here can be another worker's
    still-active patch rather than a leak.  The state printed after all
    workers have finished is the reliable indicator.

    Args:
        n: Worker index; used only in the warning text and the error message.
    """
    with warnings.catch_warnings():
        warnings.simplefilter("always")
        logging.captureWarnings(True)
        warnings.warn(f"thread worker {n}", UserWarning)
        logging.captureWarnings(False)

    # Read the hook once so the condition and the recorded message describe
    # the same object even if another thread swaps it in between.
    current = warnings.showwarning
    if current is not ORIGINAL_SHOWWARNING:
        with _thread_lock:
            _thread_errors.append(
                f"worker {n}: showwarning is {current!r} "
                f"instead of {ORIGINAL_SHOWWARNING!r}"
            )
def reproduce_with_threads() -> None:
    """Hammer ``_thread_worker`` from a thread pool to provoke the race.

    Runs 200 rounds; each round creates a fresh pool of 8 threads and maps
    ``_thread_worker`` over the indices 0..31.  Leaving the ``with`` block
    waits for the pool to shut down before the next round starts.
    """
    rounds = 200
    pool_size = 8
    jobs_per_round = 32
    for _round in range(rounds):
        with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as pool:
            # Drain the result iterator so an exception raised inside a
            # worker is re-raised here instead of being dropped.
            for _result in pool.map(_thread_worker, range(jobs_per_round)):
                pass
# ── Reproduction 2: asyncio (cooperative with explicit yield points) ──
# Messages recorded by the asyncio scenario when the hook differs from the baseline.
_async_errors: list[str] = []
# A threading.Lock is a synchronous lock: it works with `with`, not `async with`.
_async_lock = threading.Lock()
# Barriers to orchestrate the interleaving shown at the top of this file.
# Task A plays the role of T1 and task B the role of T2 in that scenario.
_barrier_1 = asyncio.Event() # T2 waits for T1 to finish captureWarnings(True)
_barrier_2 = asyncio.Event() # T1 waits for T2 to finish catch_warnings __enter__
_barrier_3 = asyncio.Event() # T2 waits for T1 to exit catch_warnings
async def _async_worker_a(n: int) -> None:
    """Task A: the "early" task that sets captureWarnings before B enters.

    Plays T1 in the scenario described in the module docstring: it enables
    log capture inside ``catch_warnings``, lets task B enter its own
    ``catch_warnings`` while the patch is active, then disables capture and
    leaves the context manager before signalling B to carry on.

    Args:
        n: Accepted for symmetry with the thread worker; not used in the body.
    """
    with warnings.catch_warnings():
        warnings.simplefilter("always")
        logging.captureWarnings(True)
        _barrier_1.set() # T1: captureWarnings(True) done
        await asyncio.sleep(0) # yield so B enters catch_warnings
        await _barrier_2.wait() # wait until B has done __enter__
        logging.captureWarnings(False)
    # __exit__ restores showwarning to what it was on __enter__ (= original,
    # assuming the hook was unpatched when this task started)
    _barrier_3.set() # signal B to continue
async def _async_worker_b(n: int) -> None:
    """Task B: enters catch_warnings while A's captureWarnings(True) is active.

    Plays T2 in the scenario described in the module docstring.  B waits until
    A has patched ``warnings.showwarning``, enters ``catch_warnings`` (which
    therefore saves the patched hook), lets A finish completely, and then runs
    its own capture on/off cycle.  After leaving the context manager it
    compares ``warnings.showwarning`` with ``ORIGINAL_SHOWWARNING`` and records
    a message in ``_async_errors`` if they differ.

    Args:
        n: Accepted for symmetry with the thread worker; not used in the body.
    """
    await _barrier_1.wait()  # wait for A's captureWarnings(True)
    with warnings.catch_warnings():
        # __enter__ saved showwarning, which is currently the hook installed
        # by A's captureWarnings(True) (logging._showwarning).
        _barrier_2.set()  # signal A to continue
        await asyncio.sleep(0)  # yield so A calls captureWarnings(False) + __exit__
        await _barrier_3.wait()  # A is done
        logging.captureWarnings(True)
        await asyncio.sleep(0)
        logging.captureWarnings(False)
    # __exit__ put back what it saved on __enter__, i.e. the logging hook,
    # even though captureWarnings(False) was called.  This is the leak.

    # Read the hook once so the check and the message agree.
    current = warnings.showwarning
    if current is not ORIGINAL_SHOWWARNING:
        # _async_lock is a threading.Lock, which has no __aenter__/__aexit__;
        # `async with` on it raises TypeError exactly when the bug is found.
        # A plain `with` is safe here because nothing awaits while it is held.
        with _async_lock:
            _async_errors.append(
                f"async scenario: showwarning is {current!r} "
                f"instead of {ORIGINAL_SHOWWARNING!r}"
            )
async def reproduce_with_asyncio() -> None:
    """Run one orchestrated A/B interleaving from a clean baseline.

    The hook state is global, so a leak left behind by an earlier run (or by
    the thread reproduction) would otherwise be reported again as if this run
    had produced it.  Resetting first makes every run an independent
    reproduction; the leak produced by this run is still in place on return.
    """
    # Let logging drop any hook it is still holding, then force the baseline.
    # Order matters: captureWarnings(False) may itself assign showwarning.
    logging.captureWarnings(False)
    warnings.showwarning = ORIGINAL_SHOWWARNING

    # Re-arm the barriers; they are module-level and reused across runs.
    for barrier in (_barrier_1, _barrier_2, _barrier_3):
        barrier.clear()

    # A is listed first so it is scheduled first and patches the hook before
    # B enters catch_warnings.
    await asyncio.gather(_async_worker_a(0), _async_worker_b(0))
# ── Report ────────────────────────────────────────────────────────────
def _report(backend: str, errors: list[str]) -> None:
    """Print the outcome of one reproduction to stdout.

    Emits a ruled header with the backend label, then either every recorded
    error or a "no bug detected" line, and finally the current
    ``warnings.showwarning`` next to ``ORIGINAL_SHOWWARNING`` with a
    match/leak verdict.

    Args:
        backend: Label printed in the header.
        errors: Messages collected by the reproduction; may be empty.
    """
    rule = "─" * 70
    print(f"\n{rule}")
    print(f"[{backend}]")
    print(rule)

    if not errors:
        print(" No bug detected (race is timing-dependent; try again)")
    else:
        print(f" BUG REPRODUCED — {len(errors)} occurrence(s):")
        for message in errors:
            print(f" • {message}")

    # Independent of the recorded errors: show what the hook is right now.
    current = warnings.showwarning
    verdict = "✓ match" if current is ORIGINAL_SHOWWARNING else "✗ LEAKED"
    print(f" warnings.showwarning: original={ORIGINAL_SHOWWARNING!r}, current={current!r}")
    print(f" {verdict}")
def main() -> None:
    """Run the thread reproduction, then the asyncio one, reporting each."""
    thread_label = "ThreadPoolExecutor (200×32 workers)"
    async_label = "asyncio (20 deterministic interleavings)"

    # Phase 1: preemptive threads.
    reproduce_with_threads()
    _report(thread_label, _thread_errors)

    # Phase 2: cooperative asyncio.  Start with an empty error list.
    _async_errors.clear()
    for _attempt in range(20):
        # asyncio.run() creates and closes a fresh event loop per attempt.
        asyncio.run(reproduce_with_asyncio())
    _report(async_label, _async_errors)


if __name__ == "__main__":
    main()
CPython versions tested on:
3.14
Operating systems tested on:
macOS
Bug report
Bug description:
When `showwarning` is patched (most commonly by calling `logging.captureWarnings(True)`), the `warnings.catch_warnings` context manager still isn't thread safe with `context_aware_warnings` enabled.
CPython versions tested on:
3.14
Operating systems tested on:
macOS