Part IV: Parallel Deep Learning and Large Models
Chapter 19: Training Foundation Models at Scale

Distributed Dataset Construction

"They spent nine months building the cluster and one afternoon picking the data. The model remembers which decision they got wrong."

A Quality Classifier That Was Outvoted
Big Picture

Before a single gradient is computed, a foundation-model project must build its training corpus, and that construction is itself one of the largest distributed-data jobs the team will ever run: petabytes of raw web crawl turned into a few trillion clean, sharded, tokenizer-ready tokens. The scaling laws of Section 19.2 told you how many tokens a given compute budget wants; this section is about manufacturing those tokens. It is a MapReduce-shaped problem measured in petabytes, the place where everything Part II taught about distributed data processing pays off in a single pipeline. The pipeline reads web-scale sources, extracts text from messy HTML, filters by language and quality, and emits shuffled shards in a streaming-friendly layout. The recurring lesson, proven repeatedly between 2023 and 2025, is data-centric: at a fixed compute budget, better data beats more data, so the filtering stage is not janitorial work but a primary lever on final model quality.

A foundation model is, in a precise sense, a compression of its training corpus, so the corpus is the most important input the project controls. Yet the raw material is the open web: billions of pages of HTML, most of it boilerplate, spam, navigation chrome, machine translation, and duplicate text, in hundreds of languages and dozens of encodings. Turning that into a corpus a model should learn from is not a preprocessing afterthought; it is a distributed computation on the same scale as the training run itself, and frequently on the same cluster. A modern crawl such as Common Crawl is on the order of a hundred terabytes of compressed WARC files per monthly snapshot, and a competitive pretraining set draws on many snapshots. No single machine reads that, so the construction job is partitioned across hundreds or thousands of workers from its first byte.

This section walks the construction pipeline end to end: why each stage is a distributed job, how language and quality filtering work and why quality filtering dominates the others, and what the output format must look like so that Section 8.6's streaming data loaders can feed thousands of training workers without stalling. Deduplication, the single most impactful filter of all, is large enough to get its own treatment in Section 19.4; here we build the funnel around it.

[Figure 19.3.1 diagram: Crawl (raw HTML WARC files, ~petabytes) → Extract (text from markup, ~100s TB) → Filter (lang + quality, ~10s TB) → Dedup (near-dup removal, §19.4) → Shard (shuffled shards, ~trillions of tokens, tokenizer-ready) → train]
Figure 19.3.1: The data construction funnel. Volume shrinks at every stage: petabytes of raw crawl become hundreds of terabytes of extracted text, tens of terabytes after language and quality filtering, less still after deduplication (Section 19.4), and finally a few trillion shuffled, tokenizer-ready tokens. Each bar's height is proportional to the data that survives; the steepest single drop is usually filtering plus dedup.

1. Why Building the Corpus Is a Petabyte MapReduce Job Beginner

Every stage of corpus construction has the shape that Chapter 6 formalized: an embarrassingly parallel map over independent records, occasionally followed by a shuffle that groups records by a key. Extraction maps over individual web pages, each parsed without reference to any other, which is a pure map with no reduce. Language identification and quality scoring are likewise per-document maps. Deduplication is the one filtering stage that genuinely needs a shuffle, because deciding whether two documents are near-duplicates requires bringing candidate pairs together by a shared key (a hash or a MinHash band), which is exactly the group-by-key that the MapReduce shuffle and Chapter 7's Spark joins provide. The document-level shuffle that produces the final shards also moves data between workers, but it is an output step rather than a filter.
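The difference between a pure map and a map followed by a shuffle can be sketched in a few lines. This is a minimal single-process illustration, not the dedup algorithm of Section 19.4: the key here is an exact hash of whitespace- and case-normalized text, a simplifying assumption standing in for the MinHash band keys mentioned above, and the names `map_phase` and `shuffle` are hypothetical.

```python
import hashlib
from collections import defaultdict

def map_phase(doc_id, text):
    """Per-record map: emit (key, value) without looking at any other record."""
    normalized = " ".join(text.lower().split())
    key = hashlib.sha1(normalized.encode("utf-8")).hexdigest()
    return key, doc_id

def shuffle(pairs):
    """Group values by key, the step a MapReduce shuffle performs across machines."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

docs = {
    "a": "Data flows through the cluster.",
    "b": "data   flows through the CLUSTER.",
    "c": "Workers reduce gradients.",
}

# The map touches each document independently, so it parallelizes trivially.
pairs = [map_phase(doc_id, text) for doc_id, text in docs.items()]

# The shuffle is the only step that needs records from different workers together.
groups = shuffle(pairs)
duplicate_sets = sorted(sorted(ids) for ids in groups.values() if len(ids) > 1)
print(duplicate_sets)  # [['a', 'b']]
```

On a cluster the `shuffle` step is where data crosses the network, because all records sharing a key must land on the same worker. The map step never needs that.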

The scale is what forces distribution. A single monthly Common Crawl snapshot is on the order of $90\,\text{TB}$ compressed and several times that uncompressed; a pretraining corpus draws on many snapshots plus curated sources, so the input routinely runs to a petabyte or more. Reading a petabyte at a generous $1\,\text{GB/s}$ per worker takes one worker about $10^6$ seconds, roughly eleven and a half days, before any actual processing. Spread across $W$ workers the wall-clock divides by $W$, so a thousand-worker job reads it in about a quarter of an hour of raw I/O. The map work is perfectly parallel, the storage is shared object storage (Section 8.6), and the only synchronization is the dedup shuffle. This is the canonical case for Spark or a MapReduce-style framework, and it is the moment in the book where the data-processing machinery of Part II is applied at its full intended scale.
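The read-time arithmetic above is easy to reproduce for other budgets. The helper below is a back-of-the-envelope sketch that assumes decimal units, a perfectly even split across workers, and no contention on the object store; the function name `read_time_seconds` is ours.

```python
def read_time_seconds(total_bytes, per_worker_bytes_per_s, workers):
    """Wall-clock seconds to read total_bytes when the read splits evenly over workers."""
    if workers < 1:
        raise ValueError("workers must be at least 1")
    return total_bytes / (per_worker_bytes_per_s * workers)

PETABYTE = 1e15   # decimal petabyte, in bytes
GB_PER_S = 1e9    # 1 GB/s per worker, as in the text

for w in (1, 100, 1000):
    t = read_time_seconds(PETABYTE, GB_PER_S, w)
    print(f"W={w:5d}: {t:12,.0f} s  = {t / 3600:8.2f} h  = {t / 86400:6.2f} days")
```

With one worker the result is $10^6$ seconds (about 11.6 days), and with a thousand workers it is 1,000 seconds (about 17 minutes), matching the figures in the paragraph. Real jobs fall short of this ideal once the object store or the network becomes the shared bottleneck.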

Key Insight: The Corpus Is Built by the Same Cluster That Trains the Model

Corpus construction is not a tidy preprocessing step that finishes before the GPUs arrive; it is a first-class distributed-data job, often the largest one a foundation-model team runs, executed on petabyte inputs with the MapReduce and Spark tooling of Part II. Extraction, language ID, and quality scoring are parallel maps; deduplication is the one stage needing a shuffle. Treating data construction as an afterthought, rather than as a distributed system to be engineered and measured, is the most common and most expensive mistake in a pretraining project.

2. Crawl and Extract: Text Out of Markup Beginner

The pipeline begins with ingestion. Most teams do not crawl the web themselves; they start from Common Crawl's WARC (Web ARChive) files, which package raw HTTP responses, and read them in parallel from object storage. Each worker streams a slice of the WARC shards, and for each captured page it must turn HTML into the visible text a human would read. That extraction is harder than deleting tags. Boilerplate (navigation bars, cookie banners, footers, ad templates) repeats across millions of pages and carries no learnable signal, so good extractors estimate the main content region and discard the chrome. Production pipelines use trafilatura or resiliparse for this, tuned to keep article text while dropping menus.

Extraction is also where encoding and structure are normalized: character sets are unified to UTF-8, HTML entities are resolved, and the document is reduced to plain text plus a little metadata (source URL, language guess, timestamp). Because every page is processed independently, extraction is the purest map in the pipeline and scales linearly with worker count. It is also where the first large volume drop happens: pages that are pure markup, that fail to parse, or that yield only a few words of body text are dropped immediately, so the hundreds of terabytes of extracted text are already much smaller than the petabytes of raw HTML they came from.
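To make the normalization steps concrete, here is a deliberately simple extractor built on the standard library's `html.parser`. It is a sketch under stated assumptions, not a substitute for trafilatura or resiliparse: the `SKIP_TAGS` list is a hand-picked guess at what counts as chrome, the charset is assumed to be known in advance, and malformed nesting is not handled.

```python
from html.parser import HTMLParser

# Assumed list of elements treated as boilerplate; real extractors infer this.
SKIP_TAGS = {"script", "style", "nav", "header", "footer", "aside", "title"}

class MainTextExtractor(HTMLParser):
    def __init__(self):
        super().__init__(convert_charrefs=True)  # resolves entities such as &amp;
        self._skip_depth = 0
        self._parts = []

    def handle_starttag(self, tag, attrs):
        if tag in SKIP_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in SKIP_TAGS and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth == 0:          # keep text only outside skipped regions
            self._parts.append(data)

    def text(self):
        return " ".join(" ".join(self._parts).split())  # collapse whitespace

def extract_main_text(raw, declared_charset="utf-8"):
    """Decode bytes to Unicode, drop chrome, resolve entities, return plain text."""
    html = raw.decode(declared_charset, errors="replace")
    parser = MainTextExtractor()
    parser.feed(html)
    parser.close()
    return parser.text()

page = ("<html><head><title>Caf\u00e9 news</title></head><body>"
        "<nav>home about</nav><p>Fish &amp; chips caf\u00e9</p>"
        "<footer>&copy; 2024</footer></body></html>").encode("latin-1")
print(extract_main_text(page, "latin-1"))
```

The Latin-1 bytes come out as a Unicode string with the entity resolved and the title, nav, and footer text gone. A regex that only deletes tags, as in a quick prototype, would have kept all three.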

Fun Note: The Web Is Mostly Not Prose

When teams first measure what fraction of raw crawled pages survive to a clean training corpus, the number is sobering: it is common for well under ten percent of extracted documents to clear the full filtering and deduplication gauntlet. The other ninety-plus percent is boilerplate, spam, near-duplicates, machine-generated filler, and pages in languages the run does not target. A foundation model is trained on the thin, carefully sieved residue of the open web, not the web itself.

3. Filtering: Language, Heuristics, and a Model That Judges Data Intermediate

Filtering decides what the model learns from, and it works in layers of increasing sophistication. The first layer is language identification: a fast classifier (commonly a fastText model) labels each document, and the pipeline keeps only the target languages. The second layer is heuristic quality filtering, a set of cheap rules drawn from the Gopher and C4 recipes: drop documents that are too short, that have an unusual ratio of symbols to words, that consist mostly of lists or bullet fragments, that repeat lines or n-grams excessively, or that lack the stopwords a fluent passage would contain. These rules are crude individually but remove a large mass of obvious junk at almost no cost.
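The heuristic rules listed above translate almost directly into code. The sketch below is in the spirit of the Gopher and C4 rules but is not a reproduction of either: every threshold and the stopword set are illustrative assumptions, and the n-gram repetition rule is omitted for brevity.

```python
# Illustrative stopword set and thresholds; tune on real data before use.
STOPWORDS = {"the", "be", "to", "of", "and", "that", "have", "with"}

def heuristic_ok(text, min_words=50, max_symbol_ratio=0.1,
                 max_bullet_frac=0.9, max_dup_line_frac=0.3, min_stopwords=2):
    """Return (keep, reason). Rules run cheapest-first and stop at the first failure."""
    words = text.split()
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]

    if len(words) < min_words:
        return False, "too_short"

    # Tokens with no letter or digit at all count as symbols.
    symbol_words = sum(1 for w in words if not any(ch.isalnum() for ch in w))
    if symbol_words / len(words) > max_symbol_ratio:
        return False, "symbol_heavy"

    bullets = sum(1 for ln in lines if ln.startswith(("-", "*", "\u2022")))
    if lines and bullets / len(lines) > max_bullet_frac:
        return False, "mostly_bullets"

    dup_lines = len(lines) - len(set(lines))
    if lines and dup_lines / len(lines) > max_dup_line_frac:
        return False, "repeated_lines"

    lowered = {w.lower().strip(".,;:!?") for w in words}
    if len(STOPWORDS & lowered) < min_stopwords:
        return False, "few_stopwords"

    return True, "ok"

prose = ("The model learns from the text that survives, and the rest of "
         "the crawl is dropped with little loss to the corpus.")
print(heuristic_ok(prose, min_words=10))             # (True, 'ok')
print(heuristic_ok("buy now\n" * 10, min_words=10))  # (False, 'repeated_lines')
print(heuristic_ok("click here", min_words=10))      # (False, 'too_short')
```

Returning the reason alongside the decision is a small design choice that pays off in practice: per-rule rejection counts are how operators notice that one threshold is discarding far more than intended.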

The third and most consequential layer is classifier-based quality scoring: a model judges the data. A lightweight classifier, often a linear model or small neural net over n-gram or embedding features, is trained to distinguish a reference set of high-quality text (for example, pages linked from curated sources) from random web text, then scores every candidate document. Documents below a threshold are dropped. Formally, if a learned scorer $q(d) \in [0,1]$ estimates the probability that document $d$ resembles the high-quality reference distribution, the pipeline keeps the set $$\mathcal{D}_{\text{keep}} = \{\, d : \text{lang}(d) \in \mathcal{L} \;\wedge\; h(d) \;\wedge\; q(d) \ge \tau \,\},$$ where $\mathcal{L}$ is the target language set, $h(d)$ is the conjunction of heuristic rules, and $\tau$ is a tunable threshold trading corpus size against average quality. Choosing $\tau$ is a real decision: too high and you discard usable tokens the scaling laws of Section 19.2 say you need, too low and low-grade text dilutes the signal.
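The keep set $\mathcal{D}_{\text{keep}}$ is a conjunction of three predicates, and writing it that way in code makes the cost structure visible. In this sketch `lang`, `h`, and `q` are caller-supplied callables; the toy stand-ins below simply read precomputed fields from a dictionary and are assumptions for illustration, not real classifiers.

```python
def keep_set(docs, lang, h, q, languages, tau):
    """D_keep = { d : lang(d) in L  and  h(d)  and  q(d) >= tau }."""
    # Python's `and` short-circuits, so the expensive scorer q runs only on
    # documents that already passed the cheap language and heuristic checks.
    return [d for d in docs if lang(d) in languages and h(d) and q(d) >= tau]

docs = [
    {"id": 1, "lang": "en", "n_words": 300, "score": 0.91},
    {"id": 2, "lang": "en", "n_words": 300, "score": 0.40},
    {"id": 3, "lang": "es", "n_words": 300, "score": 0.95},  # wrong language
    {"id": 4, "lang": "en", "n_words": 5,   "score": 0.88},  # fails the heuristic
]

lang = lambda d: d["lang"]             # stand-in for a language-ID model
h = lambda d: d["n_words"] >= 50       # stand-in for the heuristic conjunction
q = lambda d: d["score"]               # stand-in for the learned quality scorer

strict = [d["id"] for d in keep_set(docs, lang, h, q, {"en"}, tau=0.5)]
loose = [d["id"] for d in keep_set(docs, lang, h, q, {"en"}, tau=0.3)]
print(strict, loose)  # [1] [1, 2]
```

Lowering $\tau$ from 0.5 to 0.3 admits document 2 and nothing else, because documents 3 and 4 are rejected before the scorer is consulted. Ordering the predicates from cheapest to most expensive is what keeps a model-based filter affordable at corpus scale.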

The 2024 FineWeb and DataComp-for-Language-Models (DCLM) results made the stakes of this layer unambiguous. Both showed that a model-based quality filter, applied at scale, produces a corpus on which a fixed-compute model trains to materially higher benchmark accuracy than the same model trained on a larger but less-filtered corpus. The filtering stage, in other words, moved more benchmark points than most architectural tweaks. That is the empirical heart of the data-centric view, and we state it as the section's thesis.

Thesis Thread: Distributed Data Processing Becomes a Model-Quality Lever

Parts II and III treated distributed data processing as plumbing: how to move and group petabytes correctly and cheaply. Here that plumbing becomes a direct lever on intelligence. The quality classifier $q(d)$ is itself a model, run as a parallel map over the entire corpus, and the threshold $\tau$ it enforces shifts final benchmark accuracy more than a comparable spend on extra compute. Scale-out data processing is not a prerequisite the foundation-model team tolerates; it is one of the highest-leverage parts of the system, which is why the MapReduce shuffle of Chapter 6 and the Spark joins of Chapter 7 return here doing load-bearing work, not bookkeeping.

4. A Distributed-Style Pipeline End to End Intermediate

The code below runs the construction pipeline in miniature on a synthetic crawl, using only the Python standard library so it executes anywhere. It builds twenty thousand HTML-ish "pages" containing a realistic mix (genuine English prose, non-English text, spam boilerplate, and near-empty stubs), then runs the three real stages: a parallel extract that strips markup, a filter combining language identification with a heuristic-plus-classifier quality gate, and a shard step that routes survivors to balanced output shards by a stable content hash. The map phase runs across eight worker processes with ProcessPoolExecutor, the same partition-and-map structure a Spark job uses at petabyte scale. The point is to watch how much each stage removes and what token yield remains, the numbers a real pipeline reports to its operators.

import re
import random
import hashlib
from concurrent.futures import ProcessPoolExecutor
from collections import Counter

# ---- 1. Build a synthetic web crawl: a list of raw HTML-ish records. ----
ENGLISH = ("the quick brown fox jumps over lazy dogs while data flows through "
           "the cluster and workers reduce gradients across the network").split()
SPANISH = "el rapido zorro marron salta sobre perros perezosos mientras datos".split()
BOILER = "click here subscribe now buy cheap pills free download winner".split()

def make_doc(i):
    r = random.random()
    if r < 0.18:                     # boilerplate / spam page
        body = " ".join(random.choices(BOILER, k=random.randint(4, 30)))
    elif r < 0.34:                   # non-English page
        body = " ".join(random.choices(SPANISH, k=random.randint(20, 120)))
    elif r < 0.42:                   # near-empty page
        body = " ".join(random.choices(ENGLISH, k=random.randint(1, 6)))
    else:                            # genuine English prose
        body = " ".join(random.choices(ENGLISH, k=random.randint(60, 400)))
    nav = "<nav>home about contact</nav>"
    return f"<html><head><title>doc {i}</title></head><body>{nav}<p>{body}</p></body></html>"

# ---- 2. EXTRACT: strip tags, keep visible text. Runs in parallel workers. ----
TAG = re.compile(r"<[^>]+>")
def extract(doc):
    text = TAG.sub(" ", doc)               # drop all HTML tags
    text = re.sub(r"\s+", " ", text).strip()
    return text

# ---- 3. FILTER: language id + heuristic quality + a tiny quality classifier. ----
EN_VOCAB = set(ENGLISH) | {"home", "about", "contact", "doc"}
def english_fraction(tokens):
    if not tokens:
        return 0.0
    return sum(t in EN_VOCAB for t in tokens) / len(tokens)

SPAM_VOCAB = frozenset(BOILER)             # built once, not once per token
def quality_score(tokens):
    """Cheap stand-in for a learned quality classifier: rewards length and
    lexical diversity, penalizes spam-like repetition."""
    if not tokens:
        return 0.0
    diversity = len(set(tokens)) / len(tokens)
    spam = sum(t in SPAM_VOCAB for t in tokens) / len(tokens)
    length_ok = min(len(tokens), 50) / 50.0
    return 0.5 * diversity + 0.5 * length_ok - spam

def keep(text):
    toks = text.split()
    if len(toks) < 10:                     # heuristic: too short
        return False, toks
    if english_fraction(toks) < 0.5:       # language filter
        return False, toks
    if quality_score(toks) < 0.35:         # classifier-based quality gate
        return False, toks
    return True, toks

def process_chunk(chunk):
    """One 'worker': extract then filter its shard, return surviving token lists."""
    kept, n_extracted = [], 0
    for doc in chunk:
        text = extract(doc)
        n_extracted += 1
        ok, toks = keep(text)
        if ok:
            kept.append(toks)
    return n_extracted, kept

def pct(x, base):
    return f"{100.0 * x / base:5.1f}%"

if __name__ == "__main__":
    random.seed(0)
    N_DOCS = 20_000
    crawl = [make_doc(i) for i in range(N_DOCS)]
    raw_bytes = sum(len(d) for d in crawl)

    # Split the crawl across K workers, MapReduce-style.
    K = 8
    chunks = [crawl[i::K] for i in range(K)]
    extracted_total, survivors = 0, []
    with ProcessPoolExecutor(max_workers=K) as ex:
        for n_extracted, kept in ex.map(process_chunk, chunks):
            extracted_total += n_extracted
            survivors.extend(kept)

    # ---- 4. SHARD: route survivors to balanced shards by a stable hash. ----
    N_SHARDS = 4
    shard_counts, shard_tokens = Counter(), Counter()
    for toks in survivors:
        h = hashlib.blake2b(" ".join(toks).encode(), digest_size=8).digest()
        sid = int.from_bytes(h, "big") % N_SHARDS   # stable content-hash routing
        shard_counts[sid] += 1
        shard_tokens[sid] += len(toks)

    total_tokens, kept_docs = sum(shard_tokens.values()), len(survivors)
    print("STAGE                         docs       retained   tokens")
    print(f"crawl (raw HTML)          {N_DOCS:8d}    100.0%   ({raw_bytes/1e6:.1f} MB raw)")
    print(f"extract (text from HTML)  {extracted_total:8d}  {pct(extracted_total, N_DOCS)}")
    print(f"filter (lang+qual)        {kept_docs:8d}  {pct(kept_docs, N_DOCS)}   {total_tokens:8d}")
    print(f"\nfinal corpus: {kept_docs} docs, {total_tokens} tokens across {N_SHARDS} shards")
    for sid in range(N_SHARDS):
        print(f"  shard {sid}: {shard_counts[sid]:5d} docs  {shard_tokens[sid]:8d} tokens")
    print(f"\nyield: {pct(kept_docs, N_DOCS)} of crawled docs survived to training")
Code 19.3.1: A distributed-style construction pipeline in pure Python. Eight worker processes each extract and filter their slice of the crawl (the parallel map), survivors are routed to balanced output shards by a content hash, and the script reports per-stage retention and final token yield. The if __name__ == "__main__" guard is required on platforms whose process start method re-imports the main module (spawn, the default on Windows and macOS); without it each fresh worker would re-run the script body.
STAGE                         docs       retained   tokens
crawl (raw HTML)             20000    100.0%   (19.7 MB raw)
extract (text from HTML)     20000  100.0%
filter (lang+qual)           12079   60.4%    2705564

final corpus: 12079 docs, 2705564 tokens across 4 shards
  shard 0:  2892 docs    650410 tokens
  shard 1:  3110 docs    698546 tokens
  shard 2:  3047 docs    678772 tokens
  shard 3:  3030 docs    677836 tokens

yield:  60.4% of crawled docs survived to training
Output 19.3.1: The pipeline keeps $60.4\%$ of synthetic documents; the filter removed the spam and non-English pages and most of the near-empty stubs (a stub with five or six body words still reaches the ten-token floor, because the title and nav words survive this simple extractor), and the four shards came out within about eight percent of each other in both document and token counts. On a real crawl the surviving fraction is far smaller, but the shape, a steep filter drop and near-balanced hash-routed shards, is exactly what production pipelines aim for.

Two properties of Output 19.3.1 carry over directly to real pipelines. First, the per-shard balance: routing by a content hash gives shards of nearly equal size without any coordination between workers, which matters because the training loaders of Section 8.6 assign shards to workers and any imbalance becomes a straggler. Second, content-hash routing is stable and idempotent: rerun the pipeline and a document lands in the same shard, so incremental crawls append cleanly and reproducibility is free. The synthetic yield of sixty percent is generous by design; the structural lesson, not the percentage, is the takeaway.
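Both properties can be checked directly. The sketch below reuses the same `blake2b` routing as Code 19.3.1 on a made-up set of document strings, and confirms that the shard assignment does not depend on processing order and that the shards come out roughly even.

```python
import hashlib
import random
from collections import Counter

def shard_id(text, n_shards):
    """Route a document by a stable hash of its content."""
    digest = hashlib.blake2b(text.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big") % n_shards

# Hypothetical documents; any distinct strings would do.
docs = [f"document number {i} about topic {i % 7}" for i in range(10_000)]

first_run = {d: shard_id(d, 4) for d in docs}

# Second "run": same documents, different arrival order.
reordered = docs[:]
random.Random(1).shuffle(reordered)
second_run = {d: shard_id(d, 4) for d in reordered}

print("same assignment on rerun:", first_run == second_run)
counts = Counter(first_run.values())
print("docs per shard:", [counts[s] for s in range(4)])
```

One caveat worth stating: the assignment is stable only while `n_shards` is fixed. With plain modulo routing, changing the shard count remaps most documents, so a pipeline that expects to grow usually fixes a generous shard count up front.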

5. The Output Format: Sharded, Shuffled, Streaming-Friendly Intermediate

The funnel's output is not one giant file; it is many shards, each a sequence of documents in a binary or compressed-line format, sized so that one shard streams comfortably from object storage while a worker trains on the previous one. Three properties matter. The data must be sharded so that thousands of training workers each read a disjoint slice in parallel, exactly the partitioning arc that runs from Section 8.6 through the training loop. It must be shuffled at the document level so that consecutive training examples are not all from one website or one crawl date, which would bias the gradient; global shuffling at petabyte scale is itself a distributed shuffle, done once at construction time rather than per-epoch. And it must be deduplicated, the subject of Section 19.4, because duplicate documents waste compute and, worse, encourage memorization.
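One common way to approximate a global document-level shuffle without holding the corpus in memory is a two-pass scheme: scatter each document to a randomly chosen bucket, then shuffle each bucket locally. The sketch below runs in a single process with in-memory lists standing in for on-disk buckets; it is an illustration of the idea under those assumptions, not the method any particular pipeline uses.

```python
import random

def two_pass_shuffle(docs, n_buckets, seed=0):
    """Scatter documents to random buckets, then shuffle within each bucket."""
    rng = random.Random(seed)              # seeded so the shuffle is reproducible
    buckets = [[] for _ in range(n_buckets)]

    # Pass 1 (the distributed shuffle): each document goes to a random bucket.
    for doc in docs:
        buckets[rng.randrange(n_buckets)].append(doc)

    # Pass 2 (local, per worker): each bucket is small enough to shuffle in memory.
    for bucket in buckets:
        rng.shuffle(bucket)
    return buckets

# Input sorted by "site", the worst case for a training loader.
docs = [f"site{i // 100:02d}/page{i % 100:02d}" for i in range(1000)]
buckets = two_pass_shuffle(docs, n_buckets=8, seed=0)
print([len(b) for b in buckets])
print(buckets[0][:3])   # neighbouring documents now come from unrelated sites
```

Pass 1 is the only step that moves data between workers, which is why the cost is paid once at construction time. Fixing the seed keeps the output reproducible, in the same spirit as the stable hash routing of Code 19.3.1.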

The layout is deliberately streaming-friendly: shards are written so a loader can fetch the next one over the network while the current one feeds the GPUs, hiding I/O latency behind computation in the same way the collective overlap of Chapter 16 hides communication behind the backward pass. Whether the corpus is finally tokenized at construction time or streamed as text and tokenized on the fly (Section 19.5), the shard boundaries and the shuffle are fixed here, because re-shuffling a multi-trillion-token corpus mid-training is not something anyone wants to do twice.
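The overlap of fetch and compute can be sketched with a single background thread that always holds the next shard in flight. Everything here is a toy under stated assumptions: `fake_load` and `FAKE_STORE` are hypothetical stand-ins for an object-store read, and a production loader (Section 8.6) would add bounded buffering, retries, and per-worker shard assignment.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def stream_shards(shard_names, load_shard):
    """Yield records shard by shard, fetching shard i+1 while shard i is consumed."""
    if not shard_names:
        return
    with ThreadPoolExecutor(max_workers=1) as pool:
        pending = pool.submit(load_shard, shard_names[0])
        for next_name in list(shard_names[1:]) + [None]:
            records = pending.result()     # blocks only if the fetch is still running
            if next_name is not None:
                pending = pool.submit(load_shard, next_name)  # prefetch next shard
            yield from records             # the consumer trains while the fetch runs

# Hypothetical in-memory "object store" with a simulated network delay.
FAKE_STORE = {"s0": ["a", "b"], "s1": ["c"], "s2": ["d", "e"]}

def fake_load(name):
    time.sleep(0.01)                       # stand-in for download latency
    return FAKE_STORE[name]

print(list(stream_shards(["s0", "s1", "s2"], fake_load)))  # ['a', 'b', 'c', 'd', 'e']
```

If consuming a shard takes at least as long as fetching one, the consumer never waits after the first shard. That is the condition shard sizes are chosen to satisfy.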

Library Shortcut: datatrove and NeMo Curator Run the Whole Funnel

Code 19.3.1 hand-built extract, filter, and shard in roughly a hundred lines, and a real pipeline adds robust extraction, model-based filtering, fuzzy dedup, and distributed orchestration on top. Two open libraries package the entire funnel. Hugging Face's datatrove (the toolkit behind the FineWeb corpus) expresses the pipeline as a list of composable blocks, runs locally or on a Slurm cluster with the same code, and ships the exact Gopher, C4, and FineWeb filters as building blocks:

# pip install datatrove[all]
from datatrove.pipeline.readers import WarcReader
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.filters import (
    LanguageFilter, GopherQualityFilter, GopherRepetitionFilter)
from datatrove.pipeline.writers import JsonlWriter
from datatrove.executor import LocalPipelineExecutor

pipeline = [
    WarcReader("s3://commoncrawl/.../segment/"),  # ingest raw crawl
    Trafilatura(),                                 # HTML -> main text
    LanguageFilter(languages=["en"]),              # keep target languages
    GopherRepetitionFilter(),                      # drop repetitive junk
    GopherQualityFilter(),                         # heuristic quality gate
    JsonlWriter("output/shards/"),                 # sharded, streaming-ready
]
LocalPipelineExecutor(pipeline=pipeline, tasks=64).run()  # 64 parallel workers
Code 19.3.2: The full construction funnel in datatrove. The same block list scales from a laptop to thousands of Slurm tasks by swapping the executor; NVIDIA's NeMo Curator offers an equivalent GPU-accelerated pipeline with model-based quality classifiers and fuzzy deduplication built in. The library handles WARC parsing, sharded output, checkpointing, and distributed orchestration that Code 19.3.1 only sketched.

Practical Example: The Filter That Beat a Bigger Cluster

Who: A data engineer on a team pretraining a mid-size open language model on a fixed GPU budget.

Situation: The first model was trained on a lightly filtered Common Crawl extract, deduplicated but with only heuristic quality rules, and it lagged published baselines of the same size on reasoning benchmarks.

Problem: The obvious fix, buying more GPU hours to train longer on more tokens, was not available; the compute budget was capped for the quarter.

Dilemma: Spend the limited engineering time squeezing the training loop for a few percent more throughput, or invest it in the data pipeline by adding a model-based quality classifier and re-running construction, which would shrink the corpus and risk discarding useful tokens.

Decision: They rebuilt the corpus, adding a fastText quality classifier trained to separate curated reference text from raw web text, following the FineWeb and DCLM recipe, and tuned the threshold $\tau$ on a small validation pretraining run.

How: They expressed the new funnel in datatrove, ran it as a few hundred Slurm tasks over the existing extracted text, and produced a corpus roughly forty percent smaller but markedly cleaner, re-sharded and re-shuffled for the loaders.

Result: At the same compute budget and fewer total tokens, the model trained on the filtered corpus beat the previous version on the reasoning benchmarks by several points, confirming on their own stack that at fixed compute, better data beats more data.

Lesson: When compute is capped, the data pipeline is often the cheapest place to buy accuracy. The quality filter is a model-quality lever, not housekeeping.

6. The Data-Centric View: Better Data Beats More Data Advanced

The scaling laws of Section 19.2 are stated for a fixed data distribution: given that distribution, more compute and more tokens predict lower loss. What they do not say is that all tokens are equal, and they are not. Two corpora of identical token count can produce models that differ by many benchmark points, because the loss the model minimizes is the loss on its data, and a model trained on cleaner data minimizes a more useful objective. The data-centric reframing of the last few years says: at a fixed compute budget, the highest-return move is frequently to improve the data distribution, not to add parameters or steps. Filtering, deduplication, and careful source mixing change the distribution the scaling law operates over, effectively shifting the whole loss curve down.

This is why the construction pipeline of this section deserves the same engineering rigor as the training loop. Every decision in the funnel, the extractor's boilerplate cutoff, the language threshold, the quality classifier's reference set, the dedup aggressiveness of Section 19.4, the mixing weights across web, code, and books, reshapes the distribution the model compresses. The team that treats these as a distributed-data engineering problem, measured and ablated like any other system parameter, builds better models at the same cost than the team that treats data as a fixed input and pours all its effort into the GPUs.

Research Frontier: Data Curation as a First-Class Research Problem (2024 to 2026)

Data curation moved from folklore to benchmarked science between 2024 and 2026. FineWeb (Penedo et al., 2024) released a fifteen-trillion-token corpus and, crucially, the ablations showing which filtering choices moved downstream accuracy, then FineWeb-Edu distilled it further with an education-quality classifier. DataComp-for-Language-Models (DCLM, Li et al., 2024) framed curation as a competition with the model and compute fixed and only the data varying, demonstrating that a strong model-based filter on Common Crawl beats much larger unfiltered baselines at equal compute. Active lines now include learned data selection and reweighting (DoReMi-style domain mixtures), influence-function and proxy-model methods that score a document by its predicted effect on downstream loss, and synthetic-data generation to fill gaps the web does not cover. The unifying message is the one this section makes operational: the data pipeline is a primary research lever, and the distributed-processing tools of Chapter 7 are how that lever is pulled at petabyte scale.

We now have the corpus as a funnel: ingested, extracted, filtered by language and by a model that judges quality, and emitted as balanced, shuffled shards ready for the loaders. One stage was deferred because it is large and subtle enough to stand alone: deduplication, which removes the near-duplicate documents that waste compute and drive memorization, and which is the single most impactful filter of all. That is the subject of Section 19.4.

Exercise 19.3.1: Where Does the Volume Go? Conceptual

Using Figure 19.3.1 and the funnel stages, argue for each of the following whether it is a pure map (no shuffle) or requires a shuffle, and why: (a) extracting text from HTML; (b) language identification; (c) classifier-based quality scoring; (d) near-duplicate removal; (e) global document-level shuffling of the final corpus. For the stages that need a shuffle, name the grouping key and connect it to the MapReduce shuffle of Chapter 6. Then explain why, if filtering removes most of the data, you still want extraction to run first rather than filtering raw HTML directly.

Exercise 19.3.2: Tune the Quality Threshold Coding

Modify Code 19.3.1 so the quality gate threshold $\tau$ (the 0.35 in keep) is swept over several values, for example $\{0.20, 0.35, 0.50, 0.65\}$. For each $\tau$, report the surviving document count, the total token yield, and the average quality_score of the survivors. Plot or tabulate the trade-off between corpus size and average quality, and identify the value of $\tau$ where further raising the bar removes many tokens for little quality gain. Relate your curve to the FineWeb and DCLM finding that, at fixed compute, a smaller cleaner corpus can beat a larger one.

Exercise 19.3.3: Read Time and Worker Count Analysis

A pretraining corpus draws on $1.2\,\text{PB}$ of raw crawl. Each worker reads from object storage at a sustained $800\,\text{MB/s}$, and the extract-plus-filter map work is fully parallel and I/O-bound. Estimate the wall-clock time to read and map the whole corpus with $W = 500$ and $W = 2000$ workers, ignoring the dedup shuffle. Then suppose deduplication's shuffle moves $20\%$ of the post-filter data (assume the post-filter data is $30\,\text{TB}$) across a network at an aggregate $50\,\text{GB/s}$; estimate its time and state whether the job is dominated by the parallel read or the shuffle. Connect your reasoning to the communication-cost models of Chapter 3.