"I normalized my shard by its own mean and felt magnificent, right up until validation, which had never heard of my mean, returned numbers from a different planet."
A Worker That Fit On Itself
Preprocessing for AI at scale has two distinct jobs, and only one of them is genuinely distributed: learning the constants that describe the data (the fit), and applying those constants to every example (the transform). The transform is embarrassingly parallel; each example is processed alone, so it runs anywhere, on any shard, in any order. The fit is the hard part, because a statistic like a mean, a standard deviation, or a vocabulary must reflect the entire training distribution, which is spread across many machines. Computing it correctly is a distributed aggregation, and computing it incorrectly, by letting each worker fit on its own shard, silently corrupts the model. This section shows where each job runs, proves that the fit is the same mergeable aggregation you have already met, and weighs the cost of materializing the result once against recomputing it every epoch.
By this point in the chapter the data already lives in shards on distributed storage (building on the aggregation patterns of Chapter 6) and a sharded loader feeds it into training, as Section 8.5 set up. What we have not yet asked is what happens to each example between the raw bytes on disk and the tensor the model consumes. Numerical features get normalized, categorical features get encoded against a vocabulary, text gets tokenized, images get resized and scaled. That transformation pipeline is preprocessing, and the part of it that genuinely scales out, as opposed to the generic feature engineering you would do on a laptop, is the subject here. The distinction this section keeps insisting on is between work that is per-example and therefore trivially parallel, and work that depends on the whole dataset and therefore demands coordination.
1. Two Places Preprocessing Runs: Offline and Online (Beginner)
The first design decision is when the transform happens relative to training. In offline preprocessing, a batch job reads the raw dataset, applies the transform, and writes preprocessed shards back to storage; training then reads those finished shards directly. This is the natural home of a distributed DataFrame engine, so a normalization-and-encoding pass over a multi-terabyte table is exactly the kind of wide transformation that Spark and distributed DataFrames (Chapter 7) exist to run, with the fit expressed as a grouped aggregation and the transform as a column map. In online preprocessing, the transform runs inside the training data loader (the per-worker pipeline of Section 8.5), turning raw examples into tensors on the fly as each batch is assembled, often overlapped with computation on the accelerator.
Neither is universally correct, and the right answer depends on how often the transform changes and how expensive it is. A fixed, cheap transform (a known normalization, a frozen tokenizer) is a fine candidate for the loader, because recomputing it costs little and keeps the storage footprint small. An expensive, stable transform (heavy text cleaning, audio feature extraction, a resize-and-decode that dominates the loader budget) is better materialized once offline, so that no epoch pays for it twice. The cost model in Section 4 of this section makes that trade-off concrete. What both modes share, and what the rest of this section is about, is that the constants the transform depends on must be learned from the whole training set before either mode can run a single example correctly.
Every preprocessing step splits into a fit that learns constants from data (a mean, a standard deviation, a vocabulary, a quantile boundary) and a transform that applies those constants to one example at a time. The transform needs nothing but the example and the constants, so it parallelizes perfectly across shards, machines, epochs, training, validation, and serving. The fit is the only step that must see the whole training distribution, which means it is a distributed aggregation, and it is the only step where getting the distribution wrong, by fitting per shard instead of globally, quietly poisons the model. Distribute your attention accordingly: the fit is where the correctness lives.
2. The Fit/Transform Split, Done Right Across Machines (Intermediate)
Consider the most ordinary preprocessing step of all, standardizing a numeric feature to zero mean and unit variance. The transform is $z = (x - \mu)/\sigma$, applied to each value independently. The fit is the pair $(\mu, \sigma)$, and the entire correctness question is: which examples does that pair summarize? The answer must be the training distribution, the same $\mu$ and $\sigma$ for the first shard, the last shard, the validation set, and every production request, because the model learned weights that assume inputs were centered and scaled by those exact numbers. If each worker instead standardizes its own shard with that shard's local mean, every shard arrives at the model on a slightly different scale, the validation set uses yet another, and serving uses one more still. Section 8.8 treats this family of mistakes as data leakage and correctness failures; here we focus on computing the shared statistic right.
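To see how far a per-shard fit can drift, the sketch below standardizes the same raw value against each shard's local statistics and against one global pair. The four shard means, the probe value, and the region-style partitioning are all hypothetical, chosen only to make the effect visible. For brevity the global statistics are computed by concatenating the shards; a real pipeline would merge per-shard partials rather than move the raw data.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical setup: 4 shards whose local distributions differ,
# for example because the data was partitioned by region.
shard_means = [10.0, 20.0, 50.0, 100.0]
shards = [rng.normal(loc=m, scale=5.0, size=10_000) for m in shard_means]

# The same raw value, as it might appear on any shard.
probe = 30.0

# WRONG: each worker standardizes with its own shard's statistics.
local_z = [(probe - s.mean()) / s.std() for s in shards]

# RIGHT: one global (mu, sigma) describing the whole training set.
all_x = np.concatenate(shards)
mu, sigma = all_x.mean(), all_x.std()
global_z = (probe - mu) / sigma

print("per-shard z for x=30:", np.round(local_z, 2))
print("global z for x=30   :", round(float(global_z), 2))
```

The per-shard values disagree in sign as well as magnitude, so the model would see one raw input as several unrelated features depending on which worker handled it.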
Computing $(\mu, \sigma)$ over data spread across $K$ machines without shipping the raw data anywhere is a mergeable aggregation, the same idea that made distributed aggregation work in Chapter 6 (aggregation and combiners). The trick is to choose per-shard partial summaries that combine by simple addition. Mean and variance both fall out of three additive quantities, the count, the sum, and the sum of squares,
$$n = \sum_k n_k, \qquad S_1 = \sum_k \sum_{i \in k} x_i, \qquad S_2 = \sum_k \sum_{i \in k} x_i^2,$$
from which $\mu = S_1/n$ and $\sigma^2 = S_2/n - \mu^2$. Each worker computes its own triple $(n_k, S_{1,k}, S_{2,k})$ over its shard alone, the triples are summed in a single all-reduce, and the merged triple yields exactly the statistic a single machine would have computed over all the data at once. Nothing but three numbers per worker crosses the network, no matter how many billions of examples each shard holds. The streaming mean and variance formula above is the foundation, and a numerically stabler version (Welford's update) is used in libraries when the subtraction $S_2/n - \mu^2$ would lose precision to cancellation or $S_2$ itself would overflow, but the mergeable, reduce-once structure is the same.
The code below makes the equivalence concrete on a skewed feature column. It computes the mean and standard deviation the single-pass way over the whole column, then again as $K$ shard-local triples merged by addition, and finally applies the shared transform to confirm the standardized data has zero mean and unit variance everywhere.
import numpy as np
rng = np.random.default_rng(7)
N, K = 1_000_000, 16 # examples, shards (one per worker)
x = rng.gamma(shape=2.0, scale=3.0, size=N) + 5.0 # skewed feature column
shards = np.array_split(x, K) # K disjoint shards (equal-sized here, since K divides N)
# --- Single-pass "ground truth" statistic on the whole training column ---
true_mean = x.mean()
true_std = x.std() # population std (ddof=0)
# --- Distributed fit: each worker emits a MERGEABLE triple (count, sum, sumsq) ---
def local_partial(col):
    return np.array([col.size, col.sum(), np.square(col).sum()])
partials = [local_partial(s) for s in shards] # K independent map outputs
# --- Reduce step: triples are additive, so merging is a plain elementwise sum ---
n, s1, s2 = np.sum(partials, axis=0) # the all-reduce
merged_mean = s1 / n
merged_var = s2 / n - merged_mean * merged_mean # E[x^2] - E[x]^2
merged_std = np.sqrt(merged_var)
print("shards (workers) :", K)
print("single-pass mean :", f"{true_mean:.10f}")
print("merged mean :", f"{merged_mean:.10f}")
print("single-pass std :", f"{true_std:.10f}")
print("merged std :", f"{merged_std:.10f}")
print("max abs diff (mean,std) :",
      f"{max(abs(merged_mean-true_mean), abs(merged_std-true_std)):.2e}")
# --- Transform: apply the SHARED stats identically on every shard ---
def transform(col, mu, sd):
    return (col - mu) / sd
z_shards = [transform(s, merged_mean, merged_std) for s in shards]
z_all = np.concatenate(z_shards)
print("post-transform mean :", f"{z_all.mean():+.2e}") # ~0
print("post-transform std :", f"{z_all.std():.6f}") # ~1
shards (workers) : 16
single-pass mean : 10.9935636126
merged mean : 10.9935636126
single-pass std : 4.2374883765
merged std : 4.2374883765
max abs diff (mean,std) : 3.55e-15
post-transform mean : -2.14e-16
post-transform std : 1.000000
The agreement is not approximate; it is exact up to the rounding of floating-point addition, exactly as the data-parallel gradient was in Section 1.1. This is the same lesson in a new costume: the operation that looks like it should require seeing all the data at once actually decomposes into additive partials plus one all-reduce. A vocabulary is the same story with a different summary, where each shard emits a partial token-count dictionary and the reduce merges them by summing counts, after which a global frequency threshold selects the shared vocabulary. Quantile boundaries for bucketization follow the same pattern with mergeable quantile sketches: each shard builds a sketch, the reduce merges the sketches, and the boundaries are read off the merged result, exact only to within the sketch's error bound. In every case the fit is a distributed aggregation and the transform is a per-example map.
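The numerically stabler variant mentioned alongside Welford's update can be sketched in the same map-and-reduce shape. Instead of (count, sum, sumsq), each shard emits (count, mean, M2), where M2 is the sum of squared deviations from the shard's own mean, and two partials are combined with the pairwise formula of Chan, Golub and LeVeque. The function names and the test data (a large offset with a small spread, picked to provoke cancellation) are assumptions of this sketch, not part of the chapter's listing.

```python
import numpy as np

def local_moments(col):
    """Per-shard partial: (count, mean, M2) with M2 = sum((x - mean)^2)."""
    n = col.size
    mean = col.mean()
    m2 = np.square(col - mean).sum()
    return n, mean, m2

def merge(a, b):
    """Combine two partials with the pairwise (Chan et al.) update."""
    n_a, mean_a, m2_a = a
    n_b, mean_b, m2_b = b
    n = n_a + n_b
    delta = mean_b - mean_a
    mean = mean_a + delta * n_b / n
    m2 = m2_a + m2_b + delta * delta * n_a * n_b / n
    return n, mean, m2

rng = np.random.default_rng(0)
# Large offset, small spread: the regime where S2/n - mu^2 cancels badly.
x = 1e9 + rng.normal(0.0, 1.0, size=200_000)
shards = np.array_split(x, 8)

# Reduce: fold the per-shard partials together one at a time.
n, mean, m2 = local_moments(shards[0])
for s in shards[1:]:
    n, mean, m2 = merge((n, mean, m2), local_moments(s))
stable_std = np.sqrt(m2 / n)

# The plain additive triple on the same data, for comparison.
s1 = sum(s.sum() for s in shards)
s2 = sum(np.square(s).sum() for s in shards)
naive_var = s2 / x.size - (s1 / x.size) ** 2

print("reference std (single pass) :", x.std())
print("merged (n, mean, M2) std    :", stable_std)
print("naive S2/n - mu^2 variance  :", naive_var)
```

The merge is no longer a plain elementwise sum, but it is still associative, so it fits the same single-reduce pattern. The price is a slightly larger combine step in exchange for not subtracting two nearly equal large numbers.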
3. Tokenization and Encoding at Scale (Intermediate)
Text and categorical features sharpen the fit/transform split because the "constant" being learned is a whole vocabulary or a subword merge table. Training a tokenizer (learning byte-pair-encoding merges, or counting token frequencies for a fixed vocabulary) is a fit: it must see a representative sample of the corpus, and at web scale that sample is itself sharded, so the counting is a distributed aggregation that merges per-shard count tables exactly as the mean did. Crucially, the tokenizer is fit once and then frozen. Applying it, mapping a string to a sequence of token ids, is a pure per-example transform with no cross-machine dependency, which is why tokenization parallelizes cleanly across every loader worker and every serving replica.
The discipline that matters at scale is that the frozen vocabulary travels as a small artifact (a file of merges or a token-to-id map) and is loaded identically everywhere. A token id is only meaningful relative to the vocabulary that defined it; if training and serving disagree on the mapping by even one inserted or dropped entry, the ids after that entry shift and the model receives gibberish. The same holds for categorical encoders: the learned category-to-index map is the shared artifact, and unseen categories at serving time must be routed to a reserved out-of-vocabulary slot rather than silently assigned a fresh index that the model never trained on. The fit produces a portable constant; the transform is stateless given that constant.
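A minimal sketch of that contract for a categorical encoder follows. The category names, the choice of index 0 for the out-of-vocabulary slot, and the helper names fit_encoder and encode are all illustrative assumptions; a JSON string stands in for the artifact file.

```python
import json
from collections import Counter

OOV_INDEX = 0  # reserved slot; using index 0 is an arbitrary choice here

def fit_encoder(merged_counts, min_count=1):
    """Build a frozen category -> index map from globally merged counts."""
    kept = sorted(c for c, n in merged_counts.items() if n >= min_count)
    # Sorting makes the mapping independent of shard or arrival order.
    return {cat: i + 1 for i, cat in enumerate(kept)}

def encode(value, mapping):
    """Stateless transform: unseen categories fall into the OOV slot."""
    return mapping.get(value, OOV_INDEX)

# Per-shard partial counts, merged by summation (the distributed fit).
shard_counts = [
    Counter(["visa", "amex", "visa"]),
    Counter(["visa", "mastercard"]),
]
merged = sum(shard_counts, Counter())

mapping = fit_encoder(merged)

# Ship the mapping as a small artifact and load it on the serving side.
artifact = json.dumps(mapping, sort_keys=True)
serving_mapping = json.loads(artifact)

print("training id for visa :", encode("visa", mapping))
print("serving id for visa  :", encode("visa", serving_mapping))
print("serving id for unseen:", encode("diners", serving_mapping))
```

Assigning indices from a sorted list is one simple way to make the mapping reproducible; any deterministic rule works as long as training and serving load the same artifact rather than rebuilding it.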
Code 8.7.1 spelled out the mergeable triple by hand. A distributed DataFrame engine hides the partials entirely: the fit is a single aggregation over the column and the transform is a single expression, and the engine schedules the partial-and-combine across the cluster for you. The line count for the whole standardize-a-column job collapses from a dozen to two:
from pyspark.sql import functions as F
# FIT: one distributed aggregation returns the shared constants (one all-reduce internally)
mu, sd = df.select(F.mean("x"), F.stddev_pop("x")).first()
# TRANSFORM: one per-row expression, applied across every partition, no extra shuffle
df_z = df.withColumn("x_z", (F.col("x") - F.lit(mu)) / F.lit(sd))
The aggregation returns the shared constants mu and sd, so the column map in the withColumn step applies identical constants everywhere. For a learned multi-column pipeline, Spark ML's StandardScaler wraps the same fit/transform contract.
Who: A data platform engineer at a payments company training a fraud model on sixty-four data-parallel workers.
Situation: A new transaction-amount feature was added to a loader that standardized each batch as it arrived, using convenient per-worker statistics.
Problem: Offline accuracy looked strong, but the model degraded sharply in production and behaved differently across regions, with no obvious bug in the model code.
Dilemma: Keep the fast in-loader standardization that scaled effortlessly, or move the fit to a coordinated pre-pass that required a cluster-wide aggregation before training could start.
Decision: They moved the fit out of the loader, computing one global $(\mu, \sigma)$ with a mergeable count-sum-sumsq aggregation and freezing it as a shared artifact.
How: A short pre-pass emitted the triple per shard, merged them in one all-reduce, and wrote the constants next to the dataset; the loader and the serving path both read that file and applied the identical transform.
Result: Every worker, the validation set, and production now scaled inputs by the same numbers; accuracy recovered and the regional discrepancy vanished, because each region was no longer normalized by its own local mean.
Lesson: A per-shard fit is sixty-four subtly different transforms wearing one name. Fit once, globally, with a mergeable aggregation, and ship the constants as a single shared artifact.
4. Materialize Once or Recompute Every Epoch (Intermediate)
Once the constants are fixed, the transform still has to be applied, and there are two moments to apply it: once, writing preprocessed shards to storage that every epoch then reads directly, or every epoch, recomputing the transform inside the loader as the raw data streams by. Materializing once turns an $E$-epoch training run from $E$ applications of the transform into one, at the cost of extra storage for the preprocessed copy and a loss of flexibility, since changing the transform means regenerating the whole materialized set. Recomputing keeps storage lean and the pipeline flexible, but pays the transform cost $E$ times and can make the loader the bottleneck if the transform is heavy, the exact loader-starvation failure mode of Section 8.5.
The decision reduces to comparing two costs over the life of the run. Let $C_t$ be the per-example transform cost, $E$ the number of epochs, $N$ the dataset size, and $C_{io}$ the extra per-example cost of reading a (typically larger) materialized example versus a raw one. Materializing once costs roughly $N \cdot C_t + E \cdot N \cdot C_{io}$ of added work; recomputing costs $E \cdot N \cdot C_t$. Materializing wins when
$$N \cdot C_t + E \cdot N \cdot C_{io} \;<\; E \cdot N \cdot C_t \quad\Longleftrightarrow\quad C_t \cdot \left(1 - \tfrac{1}{E}\right) \;>\; C_{io},$$
which says: materialize when the transform is expensive relative to the extra input/output it creates and you run more than a couple of epochs, since the saved recomputation $C_t(1 - 1/E)$ then exceeds the added read cost $C_{io}$. Cheap transforms over few epochs favor recomputation; expensive transforms over many epochs favor materialization. Many production pipelines split the difference: materialize the expensive, stable steps (decode, resize, heavy text cleaning) offline once, and leave the cheap, varying steps (a final normalization, random augmentation) in the loader, getting the compute savings of materialization for the heavy part while keeping the flexibility of in-loader recomputation for the rest.
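The inequality is easy to turn into a small calculator. The sketch below encodes the two cost expressions and solves for the smallest epoch count at which materializing wins; the function names and the sample costs (in milliseconds per example) are hypothetical.

```python
import math

def materialize_cost(n, epochs, c_t, c_io):
    """Added work of materializing once: N*C_t + E*N*C_io."""
    return n * c_t + epochs * n * c_io

def recompute_cost(n, epochs, c_t):
    """Work of recomputing in the loader every epoch: E*N*C_t."""
    return epochs * n * c_t

def break_even_epochs(c_t, c_io):
    """Smallest integer E with C_t * (1 - 1/E) > C_io, or None if none exists."""
    if c_t <= c_io:
        # The saved compute is always below C_t, so it can never beat C_io.
        return None
    # C_t * (1 - 1/E) > C_io  <=>  E > C_t / (C_t - C_io)
    return math.floor(c_t / (c_t - c_io)) + 1

# Hypothetical per-example costs in milliseconds.
for c_t, c_io in [(5.0, 2.0), (2.0, 1.5), (1.0, 2.0)]:
    print(f"C_t={c_t}, C_io={c_io}: break-even E = {break_even_epochs(c_t, c_io)}")
```

Note that the dataset size $N$ cancels out of the decision. It only scales the absolute savings, which is why break_even_epochs takes no n argument.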
Augmentation is the deliberate exception to "materialize once". You want a different random crop, flip, or noise draw each epoch, so freezing augmented examples to disk would defeat the purpose and quietly shrink your effective dataset to a single fixed version of itself. The rule is not "always materialize" but "materialize the deterministic parts; keep the randomness live."
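One way to keep the randomness live while still reading materialized data is sketched below: the deterministic output is stored once, and a random horizontal flip is redrawn each epoch from a generator seeded by the epoch number. The tiny random arrays standing in for images, the augment helper, and the seed value are assumptions made for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)

# Deterministic part, computed once: stands in for decoded, resized images
# that would be read back from materialized shards.
materialized = rng.random((4, 8, 8))  # 4 tiny single-channel "images"

def augment(batch, epoch, seed=1234):
    """Random horizontal flip, redrawn every epoch from (seed, epoch)."""
    epoch_rng = np.random.default_rng([seed, epoch])
    flip = epoch_rng.random(len(batch)) < 0.5
    out = batch.copy()                 # never mutate the materialized data
    out[flip] = out[flip][:, :, ::-1]  # reverse the width axis
    return out, flip

for epoch in range(3):
    _, flip = augment(materialized, epoch)
    print(f"epoch {epoch}: flipped examples {np.flatnonzero(flip).tolist()}")
```

Seeding by epoch keeps each epoch's draws reproducible for debugging without freezing a single augmented copy to disk.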
5. Beyond Training: Feature Stores and Serving Consistency (Advanced)
The fit/transform discipline does not end at training. A model in production receives raw features at request time and must transform them with the same constants it trained on, or it silently sees inputs from a different distribution than the one it learned. The system that enforces this is a feature store: a service that holds feature definitions and their learned constants, computes features offline in batch for training, and serves the identical computation online at low latency for inference, so the offline and online paths cannot drift apart. This online/offline consistency guarantee is the production-grade form of "fit once, apply everywhere", and it is developed in depth where it matters most, in the distributed recommendation case study (Chapter 38), whose feature pipelines fan out across hundreds of features and machines.
The thread that connects this section to that one is exactly the artifact we have been tracking: the shared constant produced by a distributed fit. Whether it is a mean, a vocabulary, or a learned bucket boundary, it is computed once by a mergeable aggregation across the training shards and then must be applied byte-for-byte identically in every later context. The feature store is the machinery that makes "byte-for-byte identically" a system guarantee rather than a hope.
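As a small-scale illustration of turning "byte-for-byte identically" into a check rather than a hope, the sketch below serializes the fitted constants canonically, records a hash at training time, and has the serving side refuse to load a payload whose hash differs. This is one possible mechanism, not a description of how any particular feature store works; the feature name amount, the helper names, and the use of SHA-256 over JSON are assumptions.

```python
import hashlib
import json

def serialize_constants(constants):
    """Canonical bytes for fitted constants (sorted keys, fixed separators)."""
    text = json.dumps(constants, sort_keys=True, separators=(",", ":"))
    return text.encode("utf-8")

def fingerprint(payload):
    """Hex SHA-256 digest of the serialized constants."""
    return hashlib.sha256(payload).hexdigest()

def load_constants(payload, expected_fingerprint):
    """Serving side: reject constants that differ from the trained ones."""
    if fingerprint(payload) != expected_fingerprint:
        raise ValueError("preprocessing constants do not match the trained model")
    return json.loads(payload.decode("utf-8"))

# Training side: fit once, serialize, and record the fingerprint with the model.
train_constants = {"amount": {"mean": 10.9935636126, "std": 4.2374883765}}
payload = serialize_constants(train_constants)
expected = fingerprint(payload)

# Serving side: load the same bytes and verify before transforming anything.
serving_constants = load_constants(payload, expected)
print("constants match:", serving_constants == train_constants)
```

A mismatch then fails loudly at startup instead of surfacing later as an unexplained drop in accuracy.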
The distributed fit in Code 8.7.1 is the all-reduce of Section 1.1 and the combiner-backed aggregation of Chapter 6, applied to feature statistics instead of gradients. A count-sum-sumsq triple merges by addition just as gradient partials do, which is why a single collective suffices to fit a statistic over a petabyte-scale corpus. Each time this book reaches for "summarize the whole distributed dataset", the answer is the same shape: choose mergeable partials, reduce them once, broadcast the result. Preprocessing is one more place that primitive returns, scaled out.
For foundation models, the heaviest distributed preprocessing is no longer normalization but corpus-scale curation: deduplication, quality filtering, and decontamination over trillions of tokens. The 2024 to 2026 literature treats these as first-class distributed-data problems. The FineWeb and FineWeb-Edu pipelines (Penedo et al., 2024) showed that distributed quality filtering and near-duplicate removal across a 15-trillion-token web crawl materially improve downstream accuracy, and the DataComp and DCLM benchmarks (Li et al., 2024) reframed dataset construction itself as the optimization target, holding the model fixed and searching over preprocessing recipes. Semantic and embedding-based deduplication, run as distributed nearest-neighbor passes, extends the MinHash and LSH machinery of Chapter 6. The shared lesson: at frontier scale, the preprocessing pass is a distributed-systems workload in its own right, and the quality of its fit (which documents to keep, which duplicates to drop) can matter as much as the training algorithm.
We have separated preprocessing into a distributed fit and a parallel transform, computed the fit as an exact mergeable aggregation, weighed materializing against recomputing, and followed the shared constant all the way to the serving boundary. The one thing we have treated only by warning is what goes wrong when these boundaries blur, when the fit accidentally peeks at validation data, or when training and serving transforms drift apart. Those correctness failures have names, and Section 8.8 takes them up in its treatment of data leakage and correctness in distributed pipelines.
For each preprocessing step, state (i) whether its fit is trivial, a distributed aggregation, or absent, and (ii) whether you would run its transform offline (materialized) or online (in the loader), with a one-sentence justification: (a) clip a numeric feature to the global 1st and 99th percentiles; (b) lowercase a text field; (c) one-hot encode a categorical column with 50,000 categories; (d) apply a random horizontal flip to an image; (e) subtract a per-channel mean from an image. Explain why exactly one of these must never be materialized once.
Adapt Code 8.7.1 to fit a vocabulary instead of a mean. Generate a sharded corpus of word tokens, have each shard emit a partial count dictionary (its mergeable partial), merge the dictionaries by summing counts in a single reduce, and keep only tokens whose global count exceeds a threshold. Verify the merged vocabulary equals the one you would get by counting the whole corpus at once, then show that a token's id changes if you instead build the vocabulary per shard. Explain why the per-shard vocabulary breaks training/serving consistency.
A transform costs $C_t = 8$ milliseconds per example and turns a 4-kilobyte raw example into a 40-kilobyte preprocessed one; reading from storage costs 0.1 millisecond per kilobyte, so the extra read cost is $C_{io} = 0.1 \times (40 - 4) = 3.6$ milliseconds per example. Using the inequality from Section 4, find the smallest number of epochs $E$ for which materializing once is cheaper than recomputing. Then redo the analysis if the transform is sped up to $C_t = 4$ milliseconds and explain, in terms of $C_t$ versus $C_{io}$, why materialization can stop paying off even across many epochs.