feat: pluggable RangeJob sources for repackage (cdx / sql{athena,duckdb} / csv)#73
Draft
malteos wants to merge 10 commits into
Draft
feat: pluggable RangeJob sources for repackage (cdx / sql{athena,duckdb} / csv)#73malteos wants to merge 10 commits into
malteos wants to merge 10 commits into
Conversation
…}/csv)
Rename the warc_by_cdx command to 'repackage' and make the range-job stage
pluggable behind a RangeJobSource abstraction:
- sources/: base (RangeJobSource + CostEstimate), sql_base (shared query
builder + crawl resolution), cdx, athena, duckdb (optional dep), csv
(reader + RangeJobCsvWriter), and a make_source factory.
- CLI: --target-source {cdx,sql,csv} with --engine {athena,duckdb}; shared
--hostnames/--query/--query-file; --duckdb-index-path; --csv-path; CSV
materialization via --range-jobs-output/--no-fetch/--csv-self-contained;
generalized --confirm-cost guard.
- WARCFilter: takes an injected source; orchestrator owns queueing, the
record limit, counting, and _STOP emission in a finally (fixes the prior
hung-readers bug when a source raised). Sources own their own stage-1
client/connection; WARCFilter manages only read/write S3 clients.
- DuckDB reads the CC columnar parquet directly via read_parquet with
per-crawl partition globbing; Athena unchanged in behaviour.
- setup.py: cdx_toolkit[duckdb] extra; conftest: requires_duckdb.
Tests: unit (sql_base, csv round-trip, make_source, confirm_cost, producer
stop-sentinel regression, --no-fetch); CSV round-trip e2e from the CDX
fixture; gated Athena/DuckDB e2e (CC-MAIN-2026-17/commoncrawl.org, single
partition). DuckDB e2e verified live; 205 passed.
Extend the guided SQL filter (athena + duckdb) to match on url_host_registered_domain in addition to url_host_name. --hostnames and --domains can be combined (OR-ed); at least one (or a raw --query) is required. TLD optimizer hint is derived from both. Also make the DuckDB source resilient to transient S3 read timeouts (http_timeout/http_retries). Tests: unit coverage for domain-only / combined host+domain query building (athena + duckdb) and factory validation; gated DuckDB domain e2e (CC-MAIN-2026-17 / commoncrawl.org, --limit 10) verified live.
A raw --query/--query-file can SELECT analysis columns (e.g. content_languages) beyond the required warc_filename/offset/length; those extra columns now flow through to the materialized range-jobs CSV (RangeJob.extra -> lazy CSV header). Rename the CSV fetch-job columns to the index's WARC names (warc_filename, warc_record_offset, warc_record_length; warc_url for self-contained) so they never collide with the index's own columns (e.g. the page-URL 'url' column). The CSV reader auto-detects mode from warc_url/warc_filename and warns+prefers warc_url if both are present.
… locality Group records of the same WARC file with ascending offsets to improve S3 range-read locality (and to enable later coalescing). Applied as an ORDER BY on guided SQL queries (Athena + DuckDB) and as an in-memory sort when loading a CSV source. Raw --query and cdx sources keep their order. New --no-sort-ranges opts out (e.g. already-sorted / very large CSV, or to preserve original order).
Add `cdxt repackage --processes N`: shard range jobs by warc_filename across N worker processes (one asyncio event loop per CPU core), then merge the per-process shards into a single <prefix>.warc.gz with one warcinfo record (server-side S3 UploadPartCopy, or fsspec streaming locally). Reads from s3://commoncrawl in-region. The work is many small independent range-GETs; the limiter is request-rate on one core. Multi-process gives ~2.6x on a 4-vCPU c5n.xlarge (~457 -> ~1130 rec/s). - one writer per process; removed the multi-writer-per-process fan-out (num_writers / fetcher_to_consumer_ratio / --parallel_writers) and the now-vestigial writer_id segment from output filenames - only the first shard writes the warcinfo; all shards share one canonical WARC-Record-ID and the warcinfo filename names the merged file - optional uvloop event loop via CDXT_UVLOOP=1 (~+8% single-core) - new modules: filter_warc/multiprocess.py, filter_warc/merge.py - docs: README repackage section, CHANGELOG, docs/notes/warc-fetcher-performance.md - tests updated for the new filename scheme and single-writer model Claude-Session: https://claude.ai/code/session_011XtVGuu26tiQpAbdPhnixk
Add hf:// support to repackage: - read WARC ranges from an HF Storage Bucket via --warc-download-prefix=hf://buckets/<ns>/<name> with two backends (--hf-reader): 'fsspec' (HfFileSystem.cat_file in a thread) and 'cdn' (async aiohttp ranged GET of the CDN-fronted resolve URL). - write repackage output back to an hf:// bucket: per-process shards are staged locally and merged into the bucket via fsspec (the WARC writer layer stays S3/local-only). - new hf_utils module (is_hf_url, hf path/resolve-URL mapping, the two readers); reader threaded through filter_async/_run_filter_pipeline/read_warc_records via an AsyncExitStack. - single-process hf:// output is routed through the multiprocess stage+merge path. - optional 'hf' extra (huggingface_hub, aiohttp); offline unit tests for the URL helpers. Byte-identical to s3:// reads (verified); both HF arms validated end-to-end.
The cdn branch of make_hf_reader yielded the reader inside the aiohttp ClientSession context but never invoked reader.aclose(), so the per-process "HF resolver throttling: N x 429, X.Xs total" summary line was lost. Tools that grep for it (e.g. read-arms.sh's throttle_report) saw zero.
The S3 shard merge issued UploadPartCopy calls sequentially, one part per shard. A single server-side copy runs at only ~20 MB/s, so merging 16 shards of a 10 GiB job took ~9 min and dominated end-to-end wall (fetch was ~2 min). Run the copies concurrently across one multipart upload (boto3 clients are thread-safe; ThreadPoolExecutor.map preserves part order and re-raises the first failure to trip the abort). Also slice each shard into _PART_TARGET-sized ranged parts: raises concurrency further and keeps every part under the 5 GiB UploadPartCopy ceiling, so shards >5 GiB (a 100 GiB job has ~6 GiB shards) now merge instead of failing. Part size / concurrency tunable via CDXT_MERGE_PART_MIB and CDXT_MERGE_CONCURRENCY. Adds _plan_parts() with unit tests covering byte coverage, ordering, the >=5 MiB non-final-part rule, small-tail folding, and the 5 GiB ceiling.
…ARCs)
Multi-process repackage hardcoded the worker to max_file_size=None ('never
rotate') and merged every shard into one <prefix>.warc.gz, so --size (default
1 GB) was ignored -- a 100 GiB job produced a single 100 GiB file. Single-process
mode rotates correctly into <prefix>-NNN.warc.gz, each <= --size.
Now multi-process matches: command.py passes args.size through; each worker
rotates at --size and writes self-contained WARCs (own warcinfo per file, fresh
unique WARC-Record-ID); after fetch, all rotated shard files are renumbered into
one global <prefix>-NNN.warc.gz series via a concurrent copy pass (merge.copy_object:
server-side S3 CopyObject, multipart for >5 GiB, fsspec for local/hf). The single
shared-warcinfo merge is gone. The last file of each shard may be short, so up to
N short files appear at shard boundaries (a target size, as single-process).
Verified end-to-end: 1 GiB job, 4 procs, --size 200 MB -> 8 files (4 @ ~200 MB +
4 shard-tail), each 1 warcinfo, 35,700 responses total. Adds unit tests for the
output naming and shard-file collection.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Makes the WARC-repackaging command's range-job stage pluggable behind a
RangeJobSourceabstraction, and renames the command
warc_by_cdx→repackage(it now repackages WARC rangesfrom several sources, not just CDX).
Range jobs (a WARC file + byte range) can now come from:
--target-source--enginecdxsqlathenasqlduckdbread_parquet)csvAthena and DuckDB share one SQL core (
sources/sql_base.py); they differ only in theFROMclauseand execution. The pipeline orchestrator now owns queueing, the record limit, counting, and
stop-sentinel emission (in a
finally) — which also fixes a latent bug where a source raisingmid-run left the WARC readers hung forever. Each source owns its own stage-1 client/connection;
WARCFiltermanages only the read/write S3 clients.It also adds multi-process fetching (
--processes N): range jobs are sharded across workerprocesses (one asyncio event loop per core) and the per-process shards are merged into a single
<prefix>.warc.gzwith onewarcinforecord — ~2.6× throughput on a 4-vCPU node. SeeMulti-core fetching & single merged output below.
CLI usage
Global flags (
--crawl,--limit) come before therepackagesubcommand; source flags come after.CDX files (default)
cdxt repackage \ --target-source cdx \ --cdx-path filtered_CC-MAIN-2024-30.cdx.gz \ --prefix ./out/EXAMPLE \ --warc-download-prefix https://data.commoncrawl.org # multiple indices via glob: cdxt repackage --target-source cdx --cdx-path s3://bucket/cdx/ --cdx-glob '*.cdx.gz' --prefix ./out/EXAMPLESQL — Athena (guided by hostnames/domains, pruned by crawl)
cdxt --crawl CC-MAIN-2026-17 repackage \ --target-source sql --engine athena \ --hostnames commoncrawl.org www.commoncrawl.org \ --athena-database ccindex \ --athena-s3-output s3://my-bucket/athena-results/ \ --prefix s3://my-bucket/out/CC \ --warc-download-prefix s3://commoncrawlThe guided filter matches on
url_host_name(exact host) via--hostnamesand/orurl_host_registered_domainvia--domains(which also covers subdomains). They can be combined(predicates are OR-ed):
# every host under the example.com registered domain, plus one exact extra host cdxt --crawl CC-MAIN-2026-17 repackage \ --target-source sql --engine duckdb \ --domains example.com \ --hostnames blog.example.org \ --prefix ./out/EXSQL — DuckDB (reads the public parquet directly; no Athena charge)
cdxt --crawl CC-MAIN-2026-17 repackage \ --target-source sql --engine duckdb \ --hostnames commoncrawl.org \ --prefix ./out/CC # requires the optional dependency: pip install cdx_toolkit[duckdb]SQL — raw query (power users)
The query must
SELECT warc_filename, warc_record_offset, warc_record_length:cdxt repackage \ --target-source sql --engine athena \ --query "SELECT warc_filename, warc_record_offset, warc_record_length FROM ccindex WHERE subset = 'warc' AND crawl = 'CC-MAIN-2026-17' AND url_host_registered_domain = 'commoncrawl.org' AND content_mime_type = 'application/pdf' LIMIT 5000" \ --athena-database ccindex \ --athena-s3-output s3://my-bucket/athena-results/ \ --prefix ./out/PDFS \ --confirm-cost # or load it from a file with --query-file ./my_query.sqlCSV (consume a previously materialized range-jobs file)
cdxt repackage \ --target-source csv \ --csv-path ranges.csv \ --prefix ./out/CC \ --warc-download-prefix https://data.commoncrawl.orgCost guard
SQL index scans bill by data scanned (Athena: ~$5/TB), so
repackageprompts before a potentiallyexpensive query. It runs without a prompt only when the query is restricted to ≤ 10 crawls.
Otherwise (no
--crawl→ all crawls, > 10 crawls, or a raw--querywhose pruning can't beverified) it asks for confirmation; in a non-interactive shell it aborts unless
--confirm-costispassed.
cdxandcsvsources never prompt.Range-jobs CSV (materialization)
Any source can write its resolved range jobs to a CSV with
--range-jobs-output. Add--no-fetchto only produce the CSV (cheap — no WARC download), then consume it later. This decouples the
(possibly expensive) index query from extraction, makes runs reproducible/shareable, and lets the
whole reader/writer pipeline be tested without AWS.
CSV formats
Default (relative filename) — the consumer prepends
--warc-download-prefix:Self-contained (
--csv-self-contained) — full URLs, used as-is (no prefix needed). The readerauto-detects the mode from the header (
warc_urlvswarc_filename); if both are present it warnsand uses
warc_url:.tsv/.tsv.gzinputs are read as tab-delimited;.gzinputs are decompressed.Extra columns from a raw
--query— any column a raw--query/--query-fileSELECTs beyond therequired
warc_filename, warc_record_offset, warc_record_lengthis carried through to therange-jobs CSV (handy for later analysis), e.g. adding
content_languages:The fetch-job columns are named
warc_*precisely so they never collide with the index's owncolumns (e.g. the page-URL column
urlflows through as an ordinary extra column, distinct fromwarc_url). These extra columns don't affect fetching.Fetch-time ordering
Range jobs are sorted by
(warc_filename, warc_record_offset)by default, grouping records of thesame WARC file with ascending offsets for better S3 range-read locality (and to enable coalescing
adjacent ranges later). This is applied as an
ORDER BYon a guided SQL query (Athena/DuckDB) and asan in-memory sort when loading a CSV source. A raw
--query/--query-fileis left untouched (orderit yourself), and
cdxsources keep their native SURT order. Pass--no-sort-rangesto disable —useful for an already-sorted or very large CSV (the CSV sort buffers all rows in memory), or to
preserve a source's original order.
Multi-core fetching & single merged output
The fetcher is asyncio-based — many concurrent range reads, but a single event loop on one CPU
core, so for large in-region jobs the limiter is request-rate on that core.
repackagenow takes--processes N(set it to the vCPU count): it shards range jobs bywarc_filenameacross Nworker processes, each with its own event loop and a single writer.
--parallel_readers Rsets theasync readers per process.
The per-process shards are then merged into a single
<prefix>.warc.gz(server-side via S3UploadPartCopy, or fsspec streaming locally) with onewarcinforecord. Only the first shard'swarcinfosurvives the merge, all shards share one canonicalWARC-Record-ID, and the warcinfofilename names the merged file. Pass
--keep-shardsto keep the per-process shards instead.CDXT_UVLOOP=1 cdxt -v repackage \ --target-source csv --csv-path range-jobs.csv \ --prefix s3://my-bucket/path/homepages \ --warc-download-prefix=s3://commoncrawl \ --processes 4 --parallel_readers 48 # -> writes a single s3://my-bucket/path/homepages.warc.gzOn an AWS EC2
c5n.xlarge(4 vCPU) in us-east-1 reading froms3://commoncrawl, a ~1 GiB /~35k-record homepages job goes from ~457 rec/s (1 core) to ~1130 rec/s (~2.6×, scaling
linearly+8%) single-core speedup. Seewith
--processes). SettingCDXT_UVLOOP=1uses the uvloopevent loop (install
uvloopseparately) for a small (docs/notes/warc-fetcher-performance.md for the full
analysis.
Optional dependency
DuckDB is optional:
pip install cdx_toolkit[duckdb]. Without it, the other sources work unchanged;selecting
--engine duckdbraises a clear error.Testing
--domains/ combined host+domain),escape_sql_literalinjection rejection, CSVreader/writer round-trip in both modes + header auto-detection, the
make_sourcefactoryvalidation, the
confirm_costguard,--no-fetch, and a regression test that the producer stillreleases readers (
_STOP) when a source raises.CC-MAIN-2026-17/commoncrawl.org(single partition — cheap, never an all-crawls scan), including a DuckDB
--domainsrun.tests/filter_warc/updated for the single-writer model and the newmerged
<prefix>.warc.gzfilename scheme.Verified locally: full suite green; DuckDB e2e ran live and passed; all 6 S3
aioboto3 read/write tests pass; flake8 clean. (Athena gated test skips where Athena permissions
aren't available; its code path is covered by unit tests and shares the orchestration/fetch path
proven by the DuckDB e2e.)