Skip to content

feat: pluggable RangeJob sources for repackage (cdx / sql{athena,duckdb} / csv)#73

Draft
malteos wants to merge 10 commits into
feat/warc-by-cdxfrom
feat/warc-range-sources
Draft

feat: pluggable RangeJob sources for repackage (cdx / sql{athena,duckdb} / csv)#73
malteos wants to merge 10 commits into
feat/warc-by-cdxfrom
feat/warc-range-sources

Conversation

@malteos

@malteos malteos commented Jun 12, 2026

Copy link
Copy Markdown
Collaborator

Summary

Makes the WARC-repackaging command's range-job stage pluggable behind a RangeJobSource
abstraction, and renames the command warc_by_cdxrepackage (it now repackages WARC ranges
from several sources, not just CDX).

Range jobs (a WARC file + byte range) can now come from:

--target-source --engine Where range jobs come from
cdx CDX index files (local or S3, via fsspec)
sql athena CC columnar index via AWS Athena
sql duckdb CC columnar index parquet on S3 via DuckDB (read_parquet)
csv a range-jobs CSV/TSV (e.g. produced by a previous run)

Athena and DuckDB share one SQL core (sources/sql_base.py); they differ only in the FROM clause
and execution. The pipeline orchestrator now owns queueing, the record limit, counting, and
stop-sentinel emission (in a finally) — which also fixes a latent bug where a source raising
mid-run left the WARC readers hung forever. Each source owns its own stage-1 client/connection;
WARCFilter manages only the read/write S3 clients.

It also adds multi-process fetching (--processes N): range jobs are sharded across worker
processes (one asyncio event loop per core) and the per-process shards are merged into a single
<prefix>.warc.gz
with one warcinfo record — ~2.6× throughput on a 4-vCPU node. See
Multi-core fetching & single merged output below.

Base branch: this PR targets feat/warc-by-cdx (PR #54), not main.

CLI usage

Global flags (--crawl, --limit) come before the repackage subcommand; source flags come after.

CDX files (default)

cdxt repackage \
    --target-source cdx \
    --cdx-path filtered_CC-MAIN-2024-30.cdx.gz \
    --prefix ./out/EXAMPLE \
    --warc-download-prefix https://data.commoncrawl.org
# multiple indices via glob:
cdxt repackage --target-source cdx --cdx-path s3://bucket/cdx/ --cdx-glob '*.cdx.gz' --prefix ./out/EXAMPLE

SQL — Athena (guided by hostnames/domains, pruned by crawl)

cdxt --crawl CC-MAIN-2026-17 repackage \
    --target-source sql --engine athena \
    --hostnames commoncrawl.org www.commoncrawl.org \
    --athena-database ccindex \
    --athena-s3-output s3://my-bucket/athena-results/ \
    --prefix s3://my-bucket/out/CC \
    --warc-download-prefix s3://commoncrawl

The guided filter matches on url_host_name (exact host) via --hostnames and/or
url_host_registered_domain via --domains (which also covers subdomains). They can be combined
(predicates are OR-ed):

# every host under the example.com registered domain, plus one exact extra host
cdxt --crawl CC-MAIN-2026-17 repackage \
    --target-source sql --engine duckdb \
    --domains example.com \
    --hostnames blog.example.org \
    --prefix ./out/EX

SQL — DuckDB (reads the public parquet directly; no Athena charge)

cdxt --crawl CC-MAIN-2026-17 repackage \
    --target-source sql --engine duckdb \
    --hostnames commoncrawl.org \
    --prefix ./out/CC
# requires the optional dependency:  pip install cdx_toolkit[duckdb]

SQL — raw query (power users)

The query must SELECT warc_filename, warc_record_offset, warc_record_length:

cdxt repackage \
    --target-source sql --engine athena \
    --query "SELECT warc_filename, warc_record_offset, warc_record_length
             FROM ccindex
             WHERE subset = 'warc'
               AND crawl = 'CC-MAIN-2026-17'
               AND url_host_registered_domain = 'commoncrawl.org'
               AND content_mime_type = 'application/pdf'
             LIMIT 5000" \
    --athena-database ccindex \
    --athena-s3-output s3://my-bucket/athena-results/ \
    --prefix ./out/PDFS \
    --confirm-cost
# or load it from a file with --query-file ./my_query.sql

CSV (consume a previously materialized range-jobs file)

cdxt repackage \
    --target-source csv \
    --csv-path ranges.csv \
    --prefix ./out/CC \
    --warc-download-prefix https://data.commoncrawl.org

Cost guard

SQL index scans bill by data scanned (Athena: ~$5/TB), so repackage prompts before a potentially
expensive query. It runs without a prompt only when the query is restricted to ≤ 10 crawls.
Otherwise (no --crawl → all crawls, > 10 crawls, or a raw --query whose pruning can't be
verified) it asks for confirmation; in a non-interactive shell it aborts unless --confirm-cost is
passed. cdx and csv sources never prompt.

WARNING  This duckdb query is not restricted to specific crawls ... and may scan ALL crawls.
Index SQL scans can be expensive (Athena bills per TB scanned). Proceed? [y/N]

Range-jobs CSV (materialization)

Any source can write its resolved range jobs to a CSV with --range-jobs-output. Add --no-fetch
to only produce the CSV (cheap — no WARC download), then consume it later. This decouples the
(possibly expensive) index query from extraction, makes runs reproducible/shareable, and lets the
whole reader/writer pipeline be tested without AWS.

# 1) produce the ranges once (cheap, single-crawl query, no WARC fetch)
cdxt --crawl CC-MAIN-2026-17 repackage \
    --target-source sql --engine duckdb --hostnames commoncrawl.org \
    --range-jobs-output ranges.csv --no-fetch

# 2) fetch + repackage from the CSV, as many times as you like
cdxt repackage --target-source csv --csv-path ranges.csv \
    --warc-download-prefix https://data.commoncrawl.org --prefix ./out/CC

CSV formats

Default (relative filename) — the consumer prepends --warc-download-prefix:

warc_filename,warc_record_offset,warc_record_length
crawl-data/CC-MAIN-2026-17/segments/1234567890.12/warc/CC-MAIN-20260101000000-20260101010000-00000.warc.gz,111440525,9754
crawl-data/CC-MAIN-2026-17/segments/1234567890.12/warc/CC-MAIN-20260101000000-20260101010000-00001.warc.gz,98231,12044

Self-contained (--csv-self-contained) — full URLs, used as-is (no prefix needed). The reader
auto-detects the mode from the header (warc_url vs warc_filename); if both are present it warns
and uses warc_url:

warc_url,warc_record_offset,warc_record_length
https://data.commoncrawl.org/crawl-data/CC-MAIN-2026-17/segments/1234567890.12/warc/CC-MAIN-...-00000.warc.gz,111440525,9754
s3://commoncrawl/crawl-data/CC-MAIN-2026-17/segments/1234567890.12/warc/CC-MAIN-...-00001.warc.gz,98231,12044

.tsv/.tsv.gz inputs are read as tab-delimited; .gz inputs are decompressed.

Extra columns from a raw --query — any column a raw --query/--query-file SELECTs beyond the
required warc_filename, warc_record_offset, warc_record_length is carried through to the
range-jobs CSV (handy for later analysis), e.g. adding content_languages:

warc_filename,warc_record_offset,warc_record_length,content_languages
crawl-data/CC-MAIN-2026-17/segments/1234567890.12/warc/CC-MAIN-...-00000.warc.gz,111440525,9754,eng

The fetch-job columns are named warc_* precisely so they never collide with the index's own
columns (e.g. the page-URL column url flows through as an ordinary extra column, distinct from
warc_url). These extra columns don't affect fetching.

Fetch-time ordering

Range jobs are sorted by (warc_filename, warc_record_offset) by default, grouping records of the
same WARC file with ascending offsets for better S3 range-read locality (and to enable coalescing
adjacent ranges later). This is applied as an ORDER BY on a guided SQL query (Athena/DuckDB) and as
an in-memory sort when loading a CSV source. A raw --query/--query-file is left untouched (order
it yourself), and cdx sources keep their native SURT order. Pass --no-sort-ranges to disable —
useful for an already-sorted or very large CSV (the CSV sort buffers all rows in memory), or to
preserve a source's original order.

Multi-core fetching & single merged output

The fetcher is asyncio-based — many concurrent range reads, but a single event loop on one CPU
core, so for large in-region jobs the limiter is request-rate on that core. repackage now takes
--processes N (set it to the vCPU count): it shards range jobs by warc_filename across N
worker processes, each with its own event loop and a single writer. --parallel_readers R sets the
async readers per process.

The per-process shards are then merged into a single <prefix>.warc.gz (server-side via S3
UploadPartCopy, or fsspec streaming locally) with one warcinfo record. Only the first shard's
warcinfo survives the merge, all shards share one canonical WARC-Record-ID, and the warcinfo
filename names the merged file. Pass --keep-shards to keep the per-process shards instead.

CDXT_UVLOOP=1 cdxt -v repackage \
    --target-source csv --csv-path range-jobs.csv \
    --prefix s3://my-bucket/path/homepages \
    --warc-download-prefix=s3://commoncrawl \
    --processes 4 --parallel_readers 48
# -> writes a single s3://my-bucket/path/homepages.warc.gz

On an AWS EC2 c5n.xlarge (4 vCPU) in us-east-1 reading from s3://commoncrawl, a ~1 GiB /
~35k-record homepages job goes from ~457 rec/s (1 core) to ~1130 rec/s (~2.6×, scaling linearly
with --processes). Setting CDXT_UVLOOP=1 uses the uvloop
event loop (install uvloop separately) for a small (
+8%) single-core speedup. See
docs/notes/warc-fetcher-performance.md for the full
analysis.

Output naming / removed flags: output is now a single merged <prefix>.warc.gz rather than
numbered per-writer files. The old per-process writer fan-out (--parallel_writers / num_writers
/ fetcher_to_consumer_ratio) and the writer_id segment in output filenames have been removed —
each process has exactly one writer. New modules: filter_warc/multiprocess.py,
filter_warc/merge.py.

Optional dependency

DuckDB is optional: pip install cdx_toolkit[duckdb]. Without it, the other sources work unchanged;
selecting --engine duckdb raises a clear error.

Testing

  • Unit (no AWS): SQL builder per engine (incl. --domains / combined host+domain),
    escape_sql_literal injection rejection, CSV
    reader/writer round-trip in both modes + header auto-detection, the make_source factory
    validation, the confirm_cost guard, --no-fetch, and a regression test that the producer still
    releases readers (_STOP) when a source raises.
  • CSV round-trip e2e (offline produce from the CDX fixture + HTTP consume), both modes.
  • Gated e2e (skipped in CI): Athena and DuckDB querying CC-MAIN-2026-17 / commoncrawl.org
    (single partition — cheap, never an all-crawls scan), including a DuckDB --domains run.
  • Multi-process / merge: tests/filter_warc/ updated for the single-writer model and the new
    merged <prefix>.warc.gz filename scheme.

Verified locally: full suite green; DuckDB e2e ran live and passed; all 6 S3
aioboto3 read/write tests pass
; flake8 clean. (Athena gated test skips where Athena permissions
aren't available; its code path is covered by unit tests and shares the orchestration/fetch path
proven by the DuckDB e2e.)

…}/csv)

Rename the warc_by_cdx command to 'repackage' and make the range-job stage
pluggable behind a RangeJobSource abstraction:

- sources/: base (RangeJobSource + CostEstimate), sql_base (shared query
  builder + crawl resolution), cdx, athena, duckdb (optional dep), csv
  (reader + RangeJobCsvWriter), and a make_source factory.
- CLI: --target-source {cdx,sql,csv} with --engine {athena,duckdb}; shared
  --hostnames/--query/--query-file; --duckdb-index-path; --csv-path; CSV
  materialization via --range-jobs-output/--no-fetch/--csv-self-contained;
  generalized --confirm-cost guard.
- WARCFilter: takes an injected source; orchestrator owns queueing, the
  record limit, counting, and _STOP emission in a finally (fixes the prior
  hung-readers bug when a source raised). Sources own their own stage-1
  client/connection; WARCFilter manages only read/write S3 clients.
- DuckDB reads the CC columnar parquet directly via read_parquet with
  per-crawl partition globbing; Athena unchanged in behaviour.
- setup.py: cdx_toolkit[duckdb] extra; conftest: requires_duckdb.

Tests: unit (sql_base, csv round-trip, make_source, confirm_cost, producer
stop-sentinel regression, --no-fetch); CSV round-trip e2e from the CDX
fixture; gated Athena/DuckDB e2e (CC-MAIN-2026-17/commoncrawl.org, single
partition). DuckDB e2e verified live; 205 passed.
@malteos malteos marked this pull request as draft June 12, 2026 10:58
malteos and others added 9 commits June 12, 2026 13:16
Extend the guided SQL filter (athena + duckdb) to match on
url_host_registered_domain in addition to url_host_name. --hostnames and
--domains can be combined (OR-ed); at least one (or a raw --query) is required.
TLD optimizer hint is derived from both. Also make the DuckDB source resilient
to transient S3 read timeouts (http_timeout/http_retries).

Tests: unit coverage for domain-only / combined host+domain query building
(athena + duckdb) and factory validation; gated DuckDB domain e2e
(CC-MAIN-2026-17 / commoncrawl.org, --limit 10) verified live.
A raw --query/--query-file can SELECT analysis columns (e.g.
content_languages) beyond the required warc_filename/offset/length;
those extra columns now flow through to the materialized range-jobs
CSV (RangeJob.extra -> lazy CSV header). Rename the CSV fetch-job
columns to the index's WARC names (warc_filename, warc_record_offset,
warc_record_length; warc_url for self-contained) so they never collide
with the index's own columns (e.g. the page-URL 'url' column). The CSV
reader auto-detects mode from warc_url/warc_filename and warns+prefers
warc_url if both are present.
… locality

Group records of the same WARC file with ascending offsets to improve
S3 range-read locality (and to enable later coalescing). Applied as an
ORDER BY on guided SQL queries (Athena + DuckDB) and as an in-memory
sort when loading a CSV source. Raw --query and cdx sources keep their
order. New --no-sort-ranges opts out (e.g. already-sorted / very large
CSV, or to preserve original order).
Add `cdxt repackage --processes N`: shard range jobs by warc_filename across N
worker processes (one asyncio event loop per CPU core), then merge the per-process
shards into a single <prefix>.warc.gz with one warcinfo record (server-side S3
UploadPartCopy, or fsspec streaming locally). Reads from s3://commoncrawl in-region.

The work is many small independent range-GETs; the limiter is request-rate on one
core. Multi-process gives ~2.6x on a 4-vCPU c5n.xlarge (~457 -> ~1130 rec/s).

- one writer per process; removed the multi-writer-per-process fan-out
  (num_writers / fetcher_to_consumer_ratio / --parallel_writers) and the now-vestigial
  writer_id segment from output filenames
- only the first shard writes the warcinfo; all shards share one canonical
  WARC-Record-ID and the warcinfo filename names the merged file
- optional uvloop event loop via CDXT_UVLOOP=1 (~+8% single-core)
- new modules: filter_warc/multiprocess.py, filter_warc/merge.py
- docs: README repackage section, CHANGELOG, docs/notes/warc-fetcher-performance.md
- tests updated for the new filename scheme and single-writer model

Claude-Session: https://claude.ai/code/session_011XtVGuu26tiQpAbdPhnixk
Add hf:// support to repackage:
- read WARC ranges from an HF Storage Bucket via --warc-download-prefix=hf://buckets/<ns>/<name>
  with two backends (--hf-reader): 'fsspec' (HfFileSystem.cat_file in a thread) and
  'cdn' (async aiohttp ranged GET of the CDN-fronted resolve URL).
- write repackage output back to an hf:// bucket: per-process shards are staged locally and
  merged into the bucket via fsspec (the WARC writer layer stays S3/local-only).
- new hf_utils module (is_hf_url, hf path/resolve-URL mapping, the two readers); reader
  threaded through filter_async/_run_filter_pipeline/read_warc_records via an AsyncExitStack.
- single-process hf:// output is routed through the multiprocess stage+merge path.
- optional 'hf' extra (huggingface_hub, aiohttp); offline unit tests for the URL helpers.

Byte-identical to s3:// reads (verified); both HF arms validated end-to-end.
The cdn branch of make_hf_reader yielded the reader inside the aiohttp
ClientSession context but never invoked reader.aclose(), so the per-process
"HF resolver throttling: N x 429, X.Xs total" summary line was lost. Tools
that grep for it (e.g. read-arms.sh's throttle_report) saw zero.
The S3 shard merge issued UploadPartCopy calls sequentially, one part per
shard. A single server-side copy runs at only ~20 MB/s, so merging 16 shards
of a 10 GiB job took ~9 min and dominated end-to-end wall (fetch was ~2 min).

Run the copies concurrently across one multipart upload (boto3 clients are
thread-safe; ThreadPoolExecutor.map preserves part order and re-raises the
first failure to trip the abort). Also slice each shard into _PART_TARGET-sized
ranged parts: raises concurrency further and keeps every part under the 5 GiB
UploadPartCopy ceiling, so shards >5 GiB (a 100 GiB job has ~6 GiB shards) now
merge instead of failing. Part size / concurrency tunable via
CDXT_MERGE_PART_MIB and CDXT_MERGE_CONCURRENCY.

Adds _plan_parts() with unit tests covering byte coverage, ordering, the
>=5 MiB non-final-part rule, small-tail folding, and the 5 GiB ceiling.
…ARCs)

Multi-process repackage hardcoded the worker to max_file_size=None ('never
rotate') and merged every shard into one <prefix>.warc.gz, so --size (default
1 GB) was ignored -- a 100 GiB job produced a single 100 GiB file. Single-process
mode rotates correctly into <prefix>-NNN.warc.gz, each <= --size.

Now multi-process matches: command.py passes args.size through; each worker
rotates at --size and writes self-contained WARCs (own warcinfo per file, fresh
unique WARC-Record-ID); after fetch, all rotated shard files are renumbered into
one global <prefix>-NNN.warc.gz series via a concurrent copy pass (merge.copy_object:
server-side S3 CopyObject, multipart for >5 GiB, fsspec for local/hf). The single
shared-warcinfo merge is gone. The last file of each shard may be short, so up to
N short files appear at shard boundaries (a target size, as single-process).

Verified end-to-end: 1 GiB job, 4 procs, --size 200 MB -> 8 files (4 @ ~200 MB +
4 shard-tail), each 1 warcinfo, 35,700 responses total. Adds unit tests for the
output naming and shard-file collection.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant