"They spent nine months building the cluster and one afternoon picking the data. The model remembers which decision they got wrong."
A Quality Classifier That Was Outvoted
Before a single gradient is computed, a foundation-model project must build its training corpus, and that construction is itself one of the largest distributed-data jobs the team will ever run: petabytes of raw web crawl turned into a few trillion clean, sharded, tokenizer-ready tokens. The scaling laws of Section 19.2 told you how many tokens a given compute budget wants; this section is about manufacturing those tokens. It is a MapReduce-shaped problem measured in petabytes, the place where everything Part II taught about distributed data processing pays off in a single pipeline. The pipeline reads web-scale sources, extracts text from messy HTML, filters by language and quality, and emits shuffled shards in a streaming-friendly layout. The recurring lesson, proven repeatedly between 2023 and 2025, is data-centric: at a fixed compute budget, better data beats more data, so the filtering stage is not janitorial work but a primary lever on final model quality.
A foundation model is, in a precise sense, a compression of its training corpus, so the corpus is the most important input the project controls. Yet the raw material is the open web: trillions of pages of HTML, most of it boilerplate, spam, navigation chrome, machine translation, and duplicate text, in hundreds of languages and dozens of encodings. Turning that into a corpus a model should learn from is not a preprocessing afterthought; it is a distributed computation on the same scale as the training run itself, and frequently on the same cluster. A modern crawl such as Common Crawl is on the order of a hundred terabytes of compressed WARC files per monthly snapshot, and a competitive pretraining set draws on many snapshots. No single machine reads that, so the construction job is partitioned across hundreds or thousands of workers from its first byte.
This section walks the construction pipeline end to end: why each stage is a distributed job, how language and quality filtering work and why quality filtering dominates the others, and what the output format must look like so that Section 8.6's streaming data loaders can feed thousands of training workers without stalling. Deduplication, the single most impactful filter of all, is large enough to get its own treatment in Section 19.4; here we build the funnel around it.
1. Why Building the Corpus Is a Petabyte MapReduce Job (Beginner)
Every stage of corpus construction has the shape that Chapter 6 formalized: an embarrassingly parallel map over independent records, occasionally followed by a shuffle that groups records by a key. Extraction maps over individual web pages, each parsed without reference to any other, which is a pure map with no reduce. Language identification and quality scoring are likewise per-document maps. Deduplication is the one stage that genuinely needs a shuffle, because deciding whether two documents are near-duplicates requires bringing candidate pairs together by a shared key (a hash or a MinHash band), which is exactly the group-by-key that the MapReduce shuffle and Chapter 7's Spark joins provide.
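The difference between a pure map and a map followed by a shuffle can be sketched in a few lines. This is a toy under stated assumptions, not the dedup method of Section 19.4: it groups by an exact content hash (a real pipeline would use MinHash bands to catch near-duplicates), and the names map_extract and shuffle_by_key are hypothetical.

```python
import hashlib
from collections import defaultdict

def map_extract(record: str) -> str:
    """Pure map: each record is normalized with no reference to any other."""
    return " ".join(record.lower().split())

def shuffle_by_key(texts):
    """Shuffle: bring together records that share a key (an exact content hash)."""
    groups = defaultdict(list)
    for idx, text in enumerate(texts):
        key = hashlib.sha1(text.encode("utf-8")).hexdigest()
        groups[key].append(idx)
    return groups

records = ["Hello  World", "hello world", "Something else"]
texts = [map_extract(r) for r in records]   # map: trivially parallel
groups = shuffle_by_key(texts)              # shuffle: needs co-location by key
duplicate_sets = [ids for ids in groups.values() if len(ids) > 1]
print(duplicate_sets)  # [[0, 1]]
```

The map step could run on any worker in any order; only the grouping step requires that records with the same key end up in the same place, which is the work a framework's shuffle does.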
The scale is what forces distribution. A single monthly Common Crawl snapshot is on the order of $90\,\text{TB}$ compressed and several times that uncompressed; a pretraining corpus draws on many snapshots plus curated sources, so the input routinely runs to a petabyte or more. Reading a petabyte at a generous $1\,\text{GB/s}$ per worker takes one worker about $10^6$ seconds, roughly eleven and a half days, before any actual processing. Spread across $W$ workers the wall-clock divides by $W$, so a thousand-worker job reads it in about a quarter of an hour of raw I/O. The map work is perfectly parallel, the storage is shared object storage (Section 8.6), and the only synchronization is the dedup shuffle. This is the canonical case for Spark or a MapReduce-style framework, and it is the moment in the book where the data-processing machinery of Part II is applied at its full intended scale.
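The read-time arithmetic above is easy to reproduce. The helper below is a back-of-envelope sketch that assumes a perfectly parallel, I/O-bound read with no stragglers and decimal units (1 PB = 10^15 bytes); the function name read_seconds is illustrative.

```python
def read_seconds(total_bytes: float, per_worker_bytes_per_s: float, workers: int) -> float:
    """Ideal wall-clock for a perfectly parallel, I/O-bound read."""
    return total_bytes / (per_worker_bytes_per_s * workers)

PB = 1e15  # decimal petabyte, in bytes
GB = 1e9   # decimal gigabyte, in bytes

one = read_seconds(PB, 1 * GB, workers=1)
many = read_seconds(PB, 1 * GB, workers=1000)
print(f"1 worker:     {one:,.0f} s = {one / 86400:.1f} days")
print(f"1000 workers: {many:,.0f} s = {many / 60:.1f} minutes")
```

Real jobs fall short of this ideal because of stragglers, object-store throttling, and task scheduling overhead, so the figure is a lower bound rather than a forecast.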
Corpus construction is not a tidy preprocessing step that finishes before the GPUs arrive; it is a first-class distributed-data job, often the largest one a foundation-model team runs, executed on petabyte inputs with the MapReduce and Spark tooling of Part II. Extraction, language ID, and quality scoring are parallel maps; deduplication is the one stage needing a shuffle. Treating data construction as an afterthought, rather than as a distributed system to be engineered and measured, is the most common and most expensive mistake in a pretraining project.
2. Crawl and Extract: Text Out of Markup (Beginner)
The pipeline begins with ingestion. Most teams do not crawl the web themselves; they start from Common Crawl's WARC (Web ARChive) files, which package raw HTTP responses, and read them in parallel from object storage. Each worker streams a slice of the WARC shards, and for each captured page it must turn HTML into the visible text a human would read. That extraction is harder than deleting tags. Boilerplate (navigation bars, cookie banners, footers, ad templates) repeats across millions of pages and carries no learnable signal, so good extractors estimate the main content region and discard the chrome. Production pipelines use trafilatura or resiliparse for this, tuned to keep article text while dropping menus.
Extraction is also where encoding and structure are normalized: character sets are unified to UTF-8, HTML entities are resolved, and the document is reduced to plain text plus a little metadata (source URL, language guess, timestamp). Because every page is processed independently, extraction is the purest map in the pipeline and scales linearly with worker count. It is also where the first large volume drop happens: pages that are pure markup, that fail to parse, or that yield only a few words of body text are dropped immediately, so the hundreds of terabytes of extracted text are already much smaller than the petabytes of raw HTML they came from.
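The normalization steps just described (decode to Unicode, resolve entities, drop chrome, collapse whitespace) can be sketched with the standard library alone. This is a minimal illustration, not how trafilatura or resiliparse work internally: the SKIP_TAGS set is an assumed stand-in for real boilerplate detection, and the fallback decoding policy is one possible choice.

```python
import re
from html.parser import HTMLParser

SKIP_TAGS = {"script", "style", "nav", "header", "footer"}  # assumed boilerplate tags

class VisibleText(HTMLParser):
    """Collect text outside the skipped tags; entities are resolved by the parser."""
    def __init__(self):
        super().__init__(convert_charrefs=True)
        self.skip_depth = 0
        self.parts = []

    def handle_starttag(self, tag, attrs):
        if tag in SKIP_TAGS:
            self.skip_depth += 1

    def handle_endtag(self, tag):
        if tag in SKIP_TAGS and self.skip_depth > 0:
            self.skip_depth -= 1

    def handle_data(self, data):
        if self.skip_depth == 0:
            self.parts.append(data)

def extract_text(raw: bytes, declared_charset: str = "utf-8") -> str:
    """Decode bytes to Unicode, strip markup and chrome, collapse whitespace."""
    try:
        markup = raw.decode(declared_charset)
    except (UnicodeDecodeError, LookupError):
        markup = raw.decode("utf-8", errors="replace")  # fallback for bad charsets
    parser = VisibleText()
    parser.feed(markup)
    parser.close()
    return re.sub(r"\s+", " ", " ".join(parser.parts)).strip()

page = "<html><body><nav>home about</nav><p>Caf\u00e9 &amp; bar</p></body></html>"
text = extract_text(page.encode("latin-1"), "latin-1")
print(text)
```

The example page is encoded as Latin-1 and contains an HTML entity; the extracted string is Unicode text with the navigation bar removed and the entity resolved.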
When teams first measure what fraction of raw crawled pages survive to a clean training corpus, the number is sobering: it is common for well under ten percent of extracted documents to clear the full filtering and deduplication gauntlet. The other ninety-plus percent is boilerplate, spam, near-duplicates, machine-generated filler, and pages in languages the run does not target. A foundation model is trained on the thin, carefully sieved residue of the open web, not the web itself.
3. Filtering: Language, Heuristics, and a Model That Judges Data (Intermediate)
Filtering decides what the model learns from, and it works in layers of increasing sophistication. The first layer is language identification: a fast classifier (commonly a fastText model) labels each document, and the pipeline keeps only the target languages. The second layer is heuristic quality filtering, a set of cheap rules drawn from the Gopher and C4 recipes: drop documents that are too short, that have an unusual ratio of symbols to words, that consist mostly of lists or bullet fragments, that repeat lines or n-grams excessively, or that lack the stopwords a fluent passage would contain. These rules are crude individually but remove a large mass of obvious junk at almost no cost.
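The heuristic layer can be sketched as one function applying the rules listed above. The thresholds and the stopword list below are illustrative values loosely modeled on published Gopher-style rules; treat every number as an assumption to be tuned, and the function name passes_heuristics as hypothetical.

```python
from collections import Counter

STOPWORDS = {"the", "be", "to", "of", "and", "that", "have", "with"}

def passes_heuristics(text: str,
                      min_words: int = 50,
                      max_symbol_ratio: float = 0.1,
                      max_bullet_line_frac: float = 0.9,
                      max_dup_line_frac: float = 0.3,
                      min_stopwords: int = 2) -> bool:
    words = text.split()
    if len(words) < min_words:                        # too short
        return False
    symbols = sum(w.count("#") + w.count("...") for w in words)
    if symbols / len(words) > max_symbol_ratio:       # symbol-heavy text
        return False
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    bullets = sum(ln.startswith(("-", "*", "\u2022")) for ln in lines)
    if lines and bullets / len(lines) > max_bullet_line_frac:   # mostly list fragments
        return False
    dup_lines = sum(c - 1 for c in Counter(lines).values())
    if lines and dup_lines / len(lines) > max_dup_line_frac:    # repeated lines
        return False
    vocab = {w.lower().strip(".,;:!?") for w in words}
    return len(vocab & STOPWORDS) >= min_stopwords    # fluent prose uses stopwords

prose = ("The model learns from text that people have written, "
         "and the quality of that text matters. ") * 6
spam = "- buy now\n" * 40
print(passes_heuristics(prose), passes_heuristics(spam), passes_heuristics("too short"))
```

Each rule is a cheap per-document check with no shared state, so the whole function runs as a pure map and can sit in front of the more expensive classifier.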
The third and most consequential layer is classifier-based quality scoring: a model judges the data. A lightweight classifier, often a linear model or small neural net over n-gram or embedding features, is trained to distinguish a reference set of high-quality text (for example, pages linked from curated sources) from random web text, then scores every candidate document. Documents below a threshold are dropped. Formally, if a learned scorer $q(d) \in [0,1]$ estimates the probability that document $d$ resembles the high-quality reference distribution, the pipeline keeps the set $$\mathcal{D}_{\text{keep}} = \{\, d : \text{lang}(d) \in \mathcal{L} \;\wedge\; h(d) \;\wedge\; q(d) \ge \tau \,\},$$ where $\mathcal{L}$ is the target language set, $h(d)$ is the conjunction of heuristic rules, and $\tau$ is a tunable threshold trading corpus size against average quality. Choosing $\tau$ is a real decision: too high and you discard usable tokens the scaling laws of Section 19.2 say you need, too low and low-grade text dilutes the signal.
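The keep-set definition translates directly into a predicate over per-document fields. The sketch below assumes the language label, the heuristic verdict $h(d)$, and the score $q(d)$ have already been computed by earlier stages; the Doc record, its field names, and the toy scores are hypothetical.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Doc:
    doc_id: int
    lang: str            # output of the language-ID stage
    heuristics_ok: bool  # h(d): conjunction of the cheap rules
    q: float             # q(d): classifier score in [0, 1]

def keep_set(docs, target_langs, tau):
    """D_keep = { d : lang(d) in L  and  h(d)  and  q(d) >= tau }."""
    return [d for d in docs
            if d.lang in target_langs and d.heuristics_ok and d.q >= tau]

docs = [
    Doc(0, "en", True, 0.91),
    Doc(1, "en", True, 0.42),
    Doc(2, "es", True, 0.88),   # fails the language test
    Doc(3, "en", False, 0.95),  # fails a heuristic rule
    Doc(4, "en", True, 0.67),
]
for tau in (0.4, 0.6):
    kept = keep_set(docs, {"en"}, tau)
    print(tau, [d.doc_id for d in kept])
```

Raising $\tau$ from 0.4 to 0.6 drops document 1 and leaves documents 0 and 4, a small instance of the size-versus-quality trade the threshold controls.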
The 2024 FineWeb and DataComp-for-Language-Models (DCLM) results made the stakes of this layer unambiguous. Both showed that a model-based quality filter, applied at scale, produces a corpus on which a fixed-compute model trains to materially higher benchmark accuracy than the same model trained on a larger but less-filtered corpus. The filtering stage, in other words, moved more benchmark points than most architectural tweaks. That is the empirical heart of the data-centric view, and we state it as the section's thesis.
Parts II and III treated distributed data processing as plumbing: how to move and group petabytes correctly and cheaply. Here that plumbing becomes a direct lever on intelligence. The quality classifier $q(d)$ is itself a model, run as a parallel map over the entire corpus, and the threshold $\tau$ it enforces shifts final benchmark accuracy more than a comparable spend on extra compute. Scale-out data processing is not a prerequisite the foundation-model team tolerates; it is one of the highest-leverage parts of the system, which is why the MapReduce shuffle of Chapter 6 and the Spark joins of Chapter 7 return here doing load-bearing work, not bookkeeping.
4. A Distributed-Style Pipeline End to End (Intermediate)
The code below runs the construction pipeline in miniature on a synthetic crawl, using only the Python standard library so it executes anywhere. It builds twenty thousand HTML-ish "pages" containing a realistic mix (genuine English prose, non-English text, spam boilerplate, and near-empty stubs), then runs the three real stages: a parallel extract that strips markup, a filter combining language identification with a heuristic-plus-classifier quality gate, and a shard step that routes survivors to balanced output shards by a stable content hash. The map phase runs across eight worker processes with ProcessPoolExecutor, the same partition-and-map structure a Spark job uses at petabyte scale. The point is to watch how much each stage removes and what token yield remains, the numbers a real pipeline reports to its operators.
```python
import re
import random
import hashlib
from concurrent.futures import ProcessPoolExecutor
from collections import Counter

# ---- 1. Build a synthetic web crawl: a list of raw HTML-ish records. ----
ENGLISH = ("the quick brown fox jumps over lazy dogs while data flows through "
           "the cluster and workers reduce gradients across the network").split()
SPANISH = "el rapido zorro marron salta sobre perros perezosos mientras datos".split()
BOILER = "click here subscribe now buy cheap pills free download winner".split()

def make_doc(i):
    r = random.random()
    if r < 0.18:                      # boilerplate / spam page
        body = " ".join(random.choices(BOILER, k=random.randint(4, 30)))
    elif r < 0.34:                    # non-English page
        body = " ".join(random.choices(SPANISH, k=random.randint(20, 120)))
    elif r < 0.42:                    # near-empty page
        body = " ".join(random.choices(ENGLISH, k=random.randint(1, 6)))
    else:                             # genuine English prose
        body = " ".join(random.choices(ENGLISH, k=random.randint(60, 400)))
    nav = "<nav>home about contact</nav>"
    return f"<html><head><title>doc {i}</title></head><body>{nav}<p>{body}</p></body></html>"

# ---- 2. EXTRACT: strip tags, keep visible text. Runs in parallel workers. ----
TAG = re.compile(r"<[^>]+>")

def extract(doc):
    text = TAG.sub(" ", doc)          # drop all HTML tags
    text = re.sub(r"\s+", " ", text).strip()
    return text

# ---- 3. FILTER: language id + heuristic quality + a tiny quality classifier. ----
EN_VOCAB = set(ENGLISH) | {"home", "about", "contact", "doc"}

def english_fraction(tokens):
    if not tokens:
        return 0.0
    return sum(t in EN_VOCAB for t in tokens) / len(tokens)

def quality_score(tokens):
    """Cheap stand-in for a learned quality classifier: rewards length and
    lexical diversity, penalizes spam-like repetition."""
    if not tokens:
        return 0.0
    diversity = len(set(tokens)) / len(tokens)
    spam = sum(t in set(BOILER) for t in tokens) / len(tokens)
    length_ok = min(len(tokens), 50) / 50.0
    return 0.5 * diversity + 0.5 * length_ok - spam

def keep(text):
    toks = text.split()
    if len(toks) < 10:                    # heuristic: too short
        return False, toks
    if english_fraction(toks) < 0.5:      # language filter
        return False, toks
    if quality_score(toks) < 0.35:        # classifier-based quality gate
        return False, toks
    return True, toks

def process_chunk(chunk):
    """One 'worker': extract then filter its shard, return surviving token lists."""
    kept, n_extracted = [], 0
    for doc in chunk:
        text = extract(doc)
        n_extracted += 1
        ok, toks = keep(text)
        if ok:
            kept.append(toks)
    return n_extracted, kept

def pct(x, base):
    return f"{100.0 * x / base:5.1f}%"

if __name__ == "__main__":
    random.seed(0)
    N_DOCS = 20_000
    crawl = [make_doc(i) for i in range(N_DOCS)]
    raw_bytes = sum(len(d) for d in crawl)

    # Split the crawl across K workers, MapReduce-style.
    K = 8
    chunks = [crawl[i::K] for i in range(K)]
    extracted_total, survivors = 0, []
    with ProcessPoolExecutor(max_workers=K) as ex:
        for n_extracted, kept in ex.map(process_chunk, chunks):
            extracted_total += n_extracted
            survivors.extend(kept)

    # ---- 4. SHARD: route survivors to balanced shards by a stable hash. ----
    N_SHARDS = 4
    shard_counts, shard_tokens = Counter(), Counter()
    for toks in survivors:
        h = hashlib.blake2b(" ".join(toks).encode(), digest_size=8).digest()
        sid = int.from_bytes(h, "big") % N_SHARDS   # stable content-hash routing
        shard_counts[sid] += 1
        shard_tokens[sid] += len(toks)

    total_tokens, kept_docs = sum(shard_tokens.values()), len(survivors)
    print("STAGE docs retained tokens")
    print(f"crawl (raw HTML) {N_DOCS:8d} 100.0% ({raw_bytes/1e6:.1f} MB raw)")
    print(f"extract (text from HTML) {extracted_total:8d} {pct(extracted_total, N_DOCS)}")
    print(f"filter (lang+qual) {kept_docs:8d} {pct(kept_docs, N_DOCS)} {total_tokens:8d}")
    print(f"\nfinal corpus: {kept_docs} docs, {total_tokens} tokens across {N_SHARDS} shards")
    for sid in range(N_SHARDS):
        print(f" shard {sid}: {shard_counts[sid]:5d} docs {shard_tokens[sid]:8d} tokens")
    print(f"\nyield: {pct(kept_docs, N_DOCS)} of crawled docs survived to training")
```
The if __name__ == "__main__" guard is required because ProcessPoolExecutor spawns fresh processes.

```
STAGE docs retained tokens
crawl (raw HTML) 20000 100.0% (19.7 MB raw)
extract (text from HTML) 20000 100.0%
filter (lang+qual) 12079 60.4% 2705564

final corpus: 12079 docs, 2705564 tokens across 4 shards
 shard 0: 2892 docs 650410 tokens
 shard 1: 3110 docs 698546 tokens
 shard 2: 3047 docs 678772 tokens
 shard 3: 3030 docs 677836 tokens

yield: 60.4% of crawled docs survived to training
```
Two properties of Output 19.3.1 carry over directly to real pipelines. First, the per-shard balance: routing by a content hash gives shards of nearly equal size without any coordination between workers, which matters because the training loaders of Section 8.6 assign shards to workers and any imbalance becomes a straggler. Second, content-hash routing is stable and idempotent as long as the shard count is fixed: rerun the pipeline and a document lands in the same shard, so incremental crawls append cleanly and reproducibility comes for free. The synthetic yield of about sixty percent is generous by design; the structural lesson, not the percentage, is the takeaway.
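Both properties can be checked in isolation. The sketch below reuses the blake2b routing from Code 19.3.1 on made-up document strings; the helper name shard_id is illustrative. It also shows a caveat of plain modulo routing: the assignment is stable only while the number of shards stays the same.

```python
import hashlib
from collections import Counter

def shard_id(text: str, n_shards: int) -> int:
    """Route a document to a shard by a stable hash of its content."""
    digest = hashlib.blake2b(text.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big") % n_shards

docs = [f"document number {i}" for i in range(10_000)]

first = [shard_id(d, 4) for d in docs]
second = [shard_id(d, 4) for d in docs]
assert first == second          # idempotent: a rerun gives identical routing

counts = Counter(first)         # balance: each shard gets roughly a quarter
moved = sum(shard_id(d, 4) != shard_id(d, 5) for d in docs)
print(sorted(counts.items()))
print(f"{moved} of {len(docs)} documents change shard when going from 4 to 5 shards")
```

Because most documents move when the modulus changes, a pipeline that expects to grow typically fixes the shard count up front or switches to a scheme such as consistent hashing.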
5. The Output Format: Sharded, Shuffled, Streaming-Friendly (Intermediate)
The funnel's output is not one giant file; it is many shards, each a sequence of documents in a binary or compressed-line format, sized so that one shard streams comfortably from object storage while a worker trains on the previous one. Three properties matter. The data must be sharded so that thousands of training workers each read a disjoint slice in parallel, exactly the partitioning arc that runs from Section 8.6 through the training loop. It must be shuffled at the document level so that consecutive training examples are not all from one website or one crawl date, which would bias the gradient; global shuffling at petabyte scale is itself a distributed shuffle, done once at construction time rather than per-epoch. And it must be deduplicated, the subject of Section 19.4, because duplicate documents waste compute and, worse, encourage memorization.
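One way to run a global document-level shuffle as a distributed job is to give every document a pseudo-random key, route by that key, and order each shard by it. The sketch below is an illustration of that idea, not a description of any particular pipeline; shuffle_key, global_shuffle, and the seed handling are assumptions.

```python
import hashlib
from collections import defaultdict

def shuffle_key(doc_id: str, seed: int) -> int:
    """Stateless pseudo-random key: any worker computes it without coordination."""
    h = hashlib.blake2b(f"{seed}:{doc_id}".encode("utf-8"), digest_size=8)
    return int.from_bytes(h.digest(), "big")

def global_shuffle(doc_ids, n_shards: int, seed: int = 0):
    shards = defaultdict(list)
    for doc_id in doc_ids:                        # map: tag each document with a key
        key = shuffle_key(doc_id, seed)
        shards[key % n_shards].append((key, doc_id))   # shuffle: route by key
    # Per-shard sort by key gives a pseudo-random order inside each shard.
    return [[d for _, d in sorted(shards[s])] for s in range(n_shards)]

# Input ordered by site, the way a crawl typically arrives.
docs = [f"{site}/{i}" for site in ("a.com", "b.org", "c.net") for i in range(1000)]
shards = global_shuffle(docs, n_shards=4)
for s, shard in enumerate(shards):
    print(s, len(shard), shard[:3])
```

Each output shard mixes documents from all three sites even though the input was grouped by site, and changing the seed produces a different but equally reproducible order.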
The layout is deliberately streaming-friendly: shards are written so a loader can fetch the next one over the network while the current one feeds the GPUs, hiding I/O latency behind computation in the same way the collective overlap of Chapter 16 hides communication behind the backward pass. Whether the corpus is finally tokenized at construction time or streamed as text and tokenized on the fly (Section 19.5), the shard boundaries and the shuffle are fixed here, because re-shuffling a multi-trillion-token corpus mid-training is not something anyone wants to do twice.
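The overlap of fetching and training can be sketched with a background thread and a bounded queue. This is a minimal illustration, not the loader of Section 8.6: fetch_shard and train_on are hypothetical stand-ins that sleep instead of doing network reads or GPU work.

```python
import queue
import threading
import time

def fetch_shard(shard_id: int) -> list:
    """Stand-in for a network read from object storage (hypothetical)."""
    time.sleep(0.05)
    return [f"shard{shard_id}-doc{i}" for i in range(3)]

def prefetching_reader(shard_ids, depth: int = 1):
    """Yield shards in order while a background thread fetches ahead."""
    buf = queue.Queue(maxsize=depth)
    done = object()  # sentinel marking the end of the stream

    def producer():
        for sid in shard_ids:
            buf.put(fetch_shard(sid))  # blocks once `depth` shards are waiting
        buf.put(done)

    threading.Thread(target=producer, daemon=True).start()
    while (item := buf.get()) is not done:
        yield item

def train_on(shard) -> int:
    time.sleep(0.05)  # stand-in for GPU compute on one shard
    return len(shard)

start = time.perf_counter()
seen = sum(train_on(s) for s in prefetching_reader(range(4)))
elapsed = time.perf_counter() - start
print(f"{seen} documents in {elapsed:.2f}s")
```

Run serially, four fetches and four training steps would take about 0.4 seconds here; with the fetch of the next shard overlapping the current training step, the total is closer to one fetch plus four steps.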
Code 19.3.1 hand-built extract, filter, and shard in roughly a hundred lines, and a real pipeline adds robust extraction, model-based filtering, fuzzy dedup, and distributed orchestration on top. Two open libraries package the entire funnel. Hugging Face's datatrove (the toolkit behind the FineWeb corpus) expresses the pipeline as a list of composable blocks, runs locally or on a Slurm cluster with the same code, and ships the exact Gopher, C4, and FineWeb filters as building blocks:
```python
# pip install datatrove[all]
from datatrove.pipeline.readers import WarcReader
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.filters import (
    LanguageFilter, GopherQualityFilter, GopherRepetitionFilter)
from datatrove.pipeline.writers import JsonlWriter
from datatrove.executor import LocalPipelineExecutor

pipeline = [
    WarcReader("s3://commoncrawl/.../segment/"),   # ingest raw crawl
    Trafilatura(),                                 # HTML -> main text
    LanguageFilter(languages=["en"]),              # keep target languages
    GopherRepetitionFilter(),                      # drop repetitive junk
    GopherQualityFilter(),                         # heuristic quality gate
    JsonlWriter("output/shards/"),                 # sharded, streaming-ready
]
LocalPipelineExecutor(pipeline=pipeline, tasks=64).run()   # 64 parallel workers
```
The same datatrove block list scales from a laptop to thousands of Slurm tasks by swapping the executor; NVIDIA's NeMo Curator offers an equivalent GPU-accelerated pipeline with model-based quality classifiers and fuzzy deduplication built in. The library handles WARC parsing, sharded output, checkpointing, and distributed orchestration that Code 19.3.1 only sketched.

Who: A data engineer on a team pretraining a mid-size open language model on a fixed GPU budget.
Situation: The first model was trained on a lightly filtered Common Crawl extract, deduplicated but with only heuristic quality rules, and it lagged published baselines of the same size on reasoning benchmarks.
Problem: The obvious fix, buying more GPU hours to train longer on more tokens, was not available; the compute budget was capped for the quarter.
Dilemma: Spend the limited engineering time squeezing the training loop for a few percent more throughput, or invest it in the data pipeline by adding a model-based quality classifier and re-running construction, which would shrink the corpus and risk discarding useful tokens.
Decision: They rebuilt the corpus, adding a fastText quality classifier trained to separate curated reference text from raw web text, following the FineWeb and DCLM recipe, and tuned the threshold $\tau$ on a small validation pretraining run.
How: They expressed the new funnel in datatrove, ran it as a few hundred Slurm tasks over the existing extracted text, and produced a corpus roughly forty percent smaller but markedly cleaner, re-sharded and re-shuffled for the loaders.
Result: At the same compute budget and fewer total tokens, the model trained on the filtered corpus beat the previous version on the reasoning benchmarks by several points, confirming on their own stack that at fixed compute, better data beats more data.
Lesson: When compute is capped, the data pipeline is often the cheapest place to buy accuracy. The quality filter is a model-quality lever, not housekeeping.
6. The Data-Centric View: Better Data Beats More Data (Advanced)
The scaling laws of Section 19.2 are stated for a fixed data distribution: given that distribution, more compute and more tokens predict lower loss. What they do not say is that all tokens are equal, and they are not. Two corpora of identical token count can produce models that differ by many benchmark points, because the loss the model minimizes is the loss on its data, and a model trained on cleaner data minimizes a more useful objective. The data-centric reframing of the last few years says: at a fixed compute budget, the highest-return move is frequently to improve the data distribution, not to add parameters or steps. Filtering, deduplication, and careful source mixing change the distribution the scaling law operates over, effectively shifting the whole loss curve down.
This is why the construction pipeline of this section deserves the same engineering rigor as the training loop. Every decision in the funnel (the extractor's boilerplate cutoff, the language threshold, the quality classifier's reference set, the dedup aggressiveness of Section 19.4, the mixing weights across web, code, and books) reshapes the distribution the model compresses. The team that treats these as a distributed-data engineering problem, measured and ablated like any other system parameter, builds better models at the same cost than the team that treats data as a fixed input and pours all its effort into the GPUs.
Data curation moved from folklore to benchmarked science between 2024 and 2026. FineWeb (Penedo et al., 2024) released a fifteen-trillion-token corpus and, crucially, the ablations showing which filtering choices moved downstream accuracy, then FineWeb-Edu distilled it further with an education-quality classifier. DataComp-for-Language-Models (DCLM, Li et al., 2024) framed curation as a competition with the model and compute fixed and only the data varying, demonstrating that a strong model-based filter on Common Crawl beats much larger unfiltered baselines at equal compute. Active lines now include learned data selection and reweighting (DoReMi-style domain mixtures), influence-function and proxy-model methods that score a document by its predicted effect on downstream loss, and synthetic-data generation to fill gaps the web does not cover. The unifying message is the one this section makes operational: the data pipeline is a primary research lever, and the distributed-processing tools of Chapter 7 are how that lever is pulled at petabyte scale.
We now have the corpus as a funnel: ingested, extracted, filtered by language and by a model that judges quality, and emitted as balanced, shuffled shards ready for the loaders. One stage was deferred because it is large and subtle enough to stand alone: deduplication, which removes the near-duplicate documents that waste compute and drive memorization, and which is the single most impactful filter of all. That is the subject of Section 19.4.
Using Figure 19.3.1 and the funnel stages, argue for each of the following whether it is a pure map (no shuffle) or requires a shuffle, and why: (a) extracting text from HTML; (b) language identification; (c) classifier-based quality scoring; (d) near-duplicate removal; (e) global document-level shuffling of the final corpus. For the stages that need a shuffle, name the grouping key and connect it to the MapReduce shuffle of Chapter 6. Then explain why, if filtering removes most of the data, you still want extraction to run first rather than filtering raw HTML directly.
Modify Code 19.3.1 so the quality gate threshold $\tau$ (the 0.35 in keep) is swept over several values, for example $\{0.20, 0.35, 0.50, 0.65\}$. For each $\tau$, report the surviving document count, the total token yield, and the average quality_score of the survivors. Plot or tabulate the trade-off between corpus size and average quality, and identify the value of $\tau$ where further raising the bar removes many tokens for little quality gain. Relate your curve to the FineWeb and DCLM finding that, at fixed compute, a smaller cleaner corpus can beat a larger one.
A pretraining corpus draws on $1.2\,\text{PB}$ of raw crawl. Each worker reads from object storage at a sustained $800\,\text{MB/s}$ and the extract-plus-filter map work is fully parallel and I/O-bound. Estimate the wall-clock to read and map the whole corpus with $W = 500$ and $W = 2000$ workers, ignoring the dedup shuffle. Then suppose deduplication's shuffle moves $20\%$ of the post-filter data (assume $30\,\text{TB}$) across a network at an aggregate $50\,\text{GB/s}$; estimate its time and state whether the job is dominated by the parallel read or the shuffle. Connect your reasoning to the communication-cost models of Chapter 3.