Part II: Distributed Data Processing for AI
Chapter 8: Distributed Storage and Data Loading

Data Leakage and Correctness in Distributed Pipelines

"My validation accuracy was magnificent. It turned out I had been quietly grading my own homework, on every machine, in parallel."

A Shard That Believes It Is the Whole Model
Big Picture

Distribution does not create data-leakage and reproducibility bugs, but it makes them dramatically easier to commit and far harder to notice, because the correctness of a split or a statistic now depends on machines agreeing about a global property they each see only a slice of. When preprocessing statistics, train/test boundaries, and shuffle order are all computed locally on independent shards, the pipeline can look perfectly healthy on every node while leaking future or held-out information into training. The damage is invisible offline: leakage and unstable ordering inflate validation metrics, the model ships looking excellent, and the gap appears only in production where the leaked information is genuinely absent. This section names the three traps that distribution sharpens (leakage, entity-crossing splits, and nondeterminism) and shows the one discipline that defeats all three: compute correctness from a stable global key, never from local shard state.

The previous section built distributed preprocessing: feature transforms, normalization, and tokenization spread across many workers so that terabyte-scale corpora can be prepared in parallel. That machinery is fast precisely because each worker acts on its own shard without consulting the others. The same independence that buys throughput is what makes correctness fragile here. A statistic fit on one shard is not the dataset statistic; a random split decided one row at a time does not respect the entities those rows belong to; an unordered parallel scan does not produce the same sequence twice. Each of these is a small local decision that is individually reasonable and collectively wrong. This section is about catching them before they reach a metric you trust.

Three traps recur often enough to name. The first is leakage: information that will not exist at prediction time leaks into training, usually through a statistic fit on data that includes the test rows or the future. The second is the entity-crossing split: a naive per-row random partition that scatters the same user, document, or near-duplicate across the train and test sides of the boundary. The third is nondeterminism: shuffles and orderings that change from run to run, so that a result cannot be reproduced and a regression cannot be attributed. We take them in turn, pausing after the second to demonstrate it on a runnable example.

1. Leakage: Fitting on Data You Will Not Have (Intermediate)

Leakage is the contamination of training with information that is unavailable at prediction time. The textbook instance is preprocessing fit on the wrong scope. Suppose you standardize a feature by subtracting its mean and dividing by its standard deviation. If you compute that mean and standard deviation over the entire dataset before you carve out the test set, then every training example has been nudged by a statistic that already knows the test values. The model never sees a test label, yet it trains in a coordinate system that the test set helped define, and your held-out estimate is optimistic by exactly the amount that contamination is worth. The cure is procedural and absolute: fit every preprocessing statistic on the training split alone, then apply the frozen statistic to validation and test. In a distributed pipeline this means the global mean must be an all-reduce over training shards only, a direct continuation of the partitioned-statistics machinery of Chapter 6, with the test shards excluded from the reduction.
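The train-only reduction can be sketched on one machine. The block below is a minimal illustration, not the pipeline of Chapter 6: the shard layout, the `split` tag on each shard, and the helper names `partial_stats` and `combine` are all assumptions made for the example, and `combine` merely stands in for a real all-reduce. Each shard contributes a triple of sufficient statistics, and only training shards enter the reduction.

```python
import math

def partial_stats(values):
    """Per-shard sufficient statistics: (count, sum, sum of squares)."""
    return (len(values), sum(values), sum(v * v for v in values))

def combine(partials):
    """Stand-in for an all-reduce: element-wise sum of the per-shard triples."""
    n = sum(p[0] for p in partials)
    s = sum(p[1] for p in partials)
    ss = sum(p[2] for p in partials)
    mean = s / n
    var = max(ss / n - mean * mean, 0.0)  # population variance, clamped against round-off
    return mean, math.sqrt(var)

def standardize(values, mean, std):
    """Apply a frozen (mean, std) pair; nothing is refit here."""
    return [(v - mean) / std for v in values]

# Hypothetical shards of a single feature; the split tag travels with the shard.
shards = [
    {"split": "train", "values": [1.0, 2.0, 3.0]},
    {"split": "train", "values": [4.0, 5.0]},
    {"split": "test", "values": [100.0, 110.0]},
]

# Correct scope: the reduction covers training shards only.
train_partials = [partial_stats(s["values"]) for s in shards if s["split"] == "train"]
train_mean, train_std = combine(train_partials)

# The frozen training statistics are then applied to every shard, test included.
scaled = [standardize(s["values"], train_mean, train_std) for s in shards]

# Leaky scope, for contrast: the test shard is allowed into the reduction.
leaky_mean, leaky_std = combine([partial_stats(s["values"]) for s in shards])

print("train-only mean/std :", train_mean, train_std)
print("leaky mean/std      :", leaky_mean, leaky_std)
```

The sum-of-squares form is used here only because it is the shortest thing that reduces correctly; on large or badly scaled data a merge of per-shard means and centered second moments is numerically safer, and the scoping rule is unchanged.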

The subtler and more dangerous form is temporal leakage, where future information sneaks backward into the past. It is the silent killer of time-series and recommendation systems. A feature like "this user's average rating" or "the rolling 7-day click-through rate" is fine only if every value used to train on a row at time $t$ was computable strictly before $t$. Compute that average over the whole history, including events after $t$, and the model learns to predict the present from the future, a skill it loses the instant it meets live traffic. Distribution makes this trap easy to spring because a windowed aggregation is naturally sharded by entity, and a worker holding one user's entire history has no built-in notion of "as of time $t$"; it sees the full timeline at once and will happily average across it unless the pipeline forces a causal cutoff. The rule is that every feature for a row must be a function only of information with a timestamp strictly earlier than that row's prediction time, and that rule has to be enforced shard by shard.
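A causal cutoff of this kind can be sketched for the "user's average rating" feature. The function below is an illustrative assumption, not a prescribed implementation: the event layout `(user, timestamp, rating)`, the name `as_of_user_mean`, and the `default` used when a user has no prior history are all invented for the example. It relies on each worker holding a user's full history, which matches the entity-sharded setting described above, and it treats events with equal timestamps as invisible to one another so that "strictly earlier" holds.

```python
def as_of_user_mean(events, default=0.0):
    """For each event, return the mean of that user's ratings with a strictly earlier timestamp.

    events: list of (user, timestamp, rating). The result is aligned with the input order.
    """
    order = sorted(range(len(events)), key=lambda i: (events[i][0], events[i][1]))
    features = [default] * len(events)
    totals = {}      # user -> (sum, count) over events strictly before the current timestamp
    pending = []     # ratings at the current (user, timestamp), not yet visible to anyone
    current = None   # the (user, timestamp) group being processed
    for i in order:
        user, ts, rating = events[i]
        if (user, ts) != current:
            # The previous group is now strictly in the past: make it visible.
            if current is not None:
                s, c = totals.get(current[0], (0.0, 0))
                totals[current[0]] = (s + sum(pending), c + len(pending))
            pending, current = [], (user, ts)
        s, c = totals.get(user, (0.0, 0))
        features[i] = s / c if c else default
        pending.append(rating)
    return features

# Hypothetical interaction log; two of user "a"'s events share timestamp 2.
events = [
    ("a", 1, 4.0),
    ("a", 2, 2.0),
    ("a", 2, 5.0),
    ("a", 3, 3.0),
    ("b", 1, 1.0),
]
features = as_of_user_mean(events)

# Leaky alternative for contrast: one whole-history mean per user, future included.
leaky_mean_a = sum(r for u, _, r in events if u == "a") / sum(1 for u, _, _ in events if u == "a")

print("as-of features       :", features)
print("whole-history mean(a):", leaky_mean_a)
```

The first event of each user receives the default because nothing precedes it, which is also what a live system would see for a brand-new user; the whole-history mean of 3.5 for user "a" is a value no row could have known at its own prediction time.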

Key Insight: Leakage Is a Scope Error, and Distribution Multiplies the Scopes

Every leak is the same mistake wearing a different costume: a statistic was computed over a scope wider than what the model will have at prediction time. Whole-dataset normalization widens the scope to include the test set; whole-history aggregation widens it to include the future. On one machine there is one scope to police. In a distributed pipeline each shard has its own local scope, and the global scope is whatever the reductions choose to combine, so correctness now depends on every worker agreeing about which rows and which timestamps are in-scope. Define the legal scope once, as a rule about training-only and as-of-time-$t$ data, and make every collective honor it.

2. Splitting Without Crossing Entities (Intermediate)

A train/test split is supposed to estimate how the model behaves on data it has never seen. That guarantee holds only if the test rows are genuinely independent of the training rows. The most common way to break it is to split by row when the rows are not the unit of independence. In a recommendation dataset, one user generates many rows; in a document corpus, one article generates many passages; on the open web, one viral post generates thousands of near-duplicate copies. Shuffle those rows and cut at 80 percent, and the same user, article, or duplicate lands on both sides of the line. The model then "generalizes" to a test user whose other rows it memorized during training. The metric measures memorization and calls it generalization.

The fix is a group split, also called an entity split: the unit of partition is the group key (user, document, cluster of near-duplicates), and a whole group goes entirely to train or entirely to test, never split between them. The challenge in a distributed pipeline is that the split decision must be identical on every machine without any machine seeing the whole dataset. A coordinator broadcasting a giant assignment table does not scale. The scalable answer is to make the decision a pure function of the group key, computed independently and identically everywhere: hash the stable key to a bucket, and route buckets to splits. Because $\text{bucket}(k) = h(k) \bmod B$ depends only on $k$, every worker that sees a row for group $k$ reaches the same verdict with no communication at all. This is the same stable-hashing idea that powered MinHash near-duplicate detection in Section 6.7; there it grouped duplicates together, and here it keeps every member of a group on the same side of the split.

[Figure 8.8.1, two panels. Left panel: "Naive per-row random split (leaks)", with user A (circles) and user B (squares) scattered across TRAIN and TEST. Right panel: "Group-wise hashed split (correct)", with bucket(key) = hash(key) mod B routing each user to a single side.]
Figure 8.8.1: Left, a naive per-row random split scatters the rows of user A (circles) and user B (squares) across both sides, so user A appears in train and test and the model is graded on data it memorized. Right, a group-wise split hashes the stable group key to a bucket; every row of a user follows its key to one side only, so no entity crosses the boundary. Because the bucket is a pure function of the key, every worker computes the identical split with no coordination, the property that makes it scale.
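The no-communication property can be made concrete with a toy simulation. In the sketch below, the two "workers" are just Python lists, and the key format, `NUM_BUCKETS`, `TEST_BUCKETS`, and the helper names are assumptions chosen for illustration. Each worker assigns its own rows using nothing but the key, and the final check confirms that no user received two different verdicts.

```python
import hashlib

NUM_BUCKETS = 100   # B in the text (illustrative value)
TEST_BUCKETS = 20   # buckets 0..19 are routed to test, roughly 20 percent of keys

def bucket(key: str) -> int:
    """Stable bucket for a group key: the same value in every process on every machine."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % NUM_BUCKETS

def split_of(key: str) -> str:
    return "test" if bucket(key) < TEST_BUCKETS else "train"

def assign(rows):
    """What one worker does with its shard: label each row from its key alone."""
    return {(user, row): split_of(user) for user, row in rows}

# Two hypothetical workers hold different rows; some users appear on both.
worker_0 = [("user-17", "row-a"), ("user-42", "row-b"), ("user-17", "row-c")]
worker_1 = [("user-42", "row-d"), ("user-99", "row-e"), ("user-17", "row-f")]

# Collect every verdict each user received across workers.
verdicts = {}
for rows in (worker_0, worker_1):
    for (user, _row), side in assign(rows).items():
        verdicts.setdefault(user, set()).add(side)

# Each user has exactly one verdict, although the workers never exchanged anything.
assert all(len(sides) == 1 for sides in verdicts.values())
print({user: next(iter(sides)) for user, sides in verdicts.items()})
```

MD5 is used here for its stability and spread, not for security; any fixed, well-mixed hash would serve. The realized test fraction is only approximately 20 percent, because it counts groups rather than rows and depends on how the keys happen to hash.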

Closely related is duplicate leakage across the train/test boundary in web-scale corpora. Even with a clean per-document split, two documents that are near-identical (a press release republished by fifty outlets, a Wikipedia mirror) can land on opposite sides, and testing on one after training on its twin again measures memorization. The defense is to resolve duplicates before splitting: assign every near-duplicate cluster a single stable cluster id with the MinHash and locality-sensitive-hashing pipeline of Section 6.7, then group-split on the cluster id rather than the raw document. The split key is whatever your true unit of independence is, and on the web that unit is the deduplicated cluster, not the URL.
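Splitting on a cluster id can be sketched as follows, under a strong assumption: the near-duplicate pairs are taken as given, standing in for the output of the MinHash and LSH stage, and the document ids are invented. A small union-find merges pairs into clusters and always keeps the smallest member id as the root, so the cluster id does not depend on the order in which pairs arrive. This is a single-process sketch; at web scale the connected-components step is itself a distributed job.

```python
import hashlib

def cluster_ids(doc_ids, duplicate_pairs):
    """Map each document to a stable cluster id: the smallest doc id in its cluster."""
    parent = {d: d for d in doc_ids}

    def find(d):
        while parent[d] != d:
            parent[d] = parent[parent[d]]  # path halving
            d = parent[d]
        return d

    for a, b in duplicate_pairs:
        ra, rb = find(a), find(b)
        if ra != rb:
            # Keep the smaller id as root so the cluster id is order-independent.
            lo, hi = (ra, rb) if ra < rb else (rb, ra)
            parent[hi] = lo
    return {d: find(d) for d in doc_ids}

def split_of(key: str, num_buckets: int = 100, test_buckets: int = 20) -> str:
    """Route a stable key to a split via a fixed hash (illustrative 80/20 routing)."""
    h = int.from_bytes(hashlib.md5(key.encode("utf-8")).digest(), "big")
    return "test" if h % num_buckets < test_buckets else "train"

# Hypothetical corpus and hypothetical near-duplicate pairs from an upstream stage.
docs = ["doc-a", "doc-b", "doc-c", "doc-d", "doc-e"]
pairs = [("doc-c", "doc-b"), ("doc-b", "doc-a"), ("doc-d", "doc-e")]

ids = cluster_ids(docs, pairs)
sides = {d: split_of(ids[d]) for d in docs}   # split on the CLUSTER id, not the doc id

# Every member of a cluster lands on the same side of the boundary.
for a, b in pairs:
    assert sides[a] == sides[b]
print(ids)
```

Hashing the raw document id instead of `ids[d]` would silently reintroduce the leak, since `doc-b` and `doc-c` would then be routed independently of their twin `doc-a`.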

3. The Leak You Can Run (Intermediate)

Abstract warnings about leakage rarely land until you watch a metric lie to your face. The program below builds a small recommendation-style dataset where each user contributes many rows and the label is mostly a property of the user, exactly the regime where row-level splitting leaks. It then compares two splits with an identical, deliberately memorizing model that predicts a test row from its user's training-set majority. The leaky split is a per-row random shuffle. The correct split hashes each user id to a bucket and routes whole users to train or test. Both splits, both accuracies, and the count of users straddling the boundary are printed so the inflation is impossible to miss.

import hashlib
import numpy as np

rng = np.random.default_rng(7)

# A recommendation-style dataset: G users, each appearing in several rows.
# The LABEL is essentially a property of the USER (a per-user bias), so any
# row from a user nearly determines the labels of that user's other rows.
G, ROWS_PER_USER, d = 600, 12, 6
N = G * ROWS_PER_USER

user_id = np.repeat(np.arange(G), ROWS_PER_USER)        # the group key
user_bias = rng.standard_normal(G)                       # one hidden bias per user
X = rng.standard_normal((N, d))
# label depends mostly on the user's bias, only weakly on the row features.
y = (user_bias[user_id] + 0.15 * X[:, 0] > 0).astype(int)

def fit_and_score(tr, te):
    # A memorizing model: predict each test row by its user's TRAIN-set majority.
    # If the user is unseen in train, fall back to the global majority.
    global_major = int(round(y[tr].mean()))
    per_user = {}
    for u in np.unique(user_id[tr]):
        rows = tr[user_id[tr] == u]
        per_user[u] = int(round(y[rows].mean()))
    pred = np.array([per_user.get(user_id[i], global_major) for i in te])
    return (pred == y[te]).mean()

idx = np.arange(N)

# ---- LEAKY split: naive per-row random shuffle, 80/20.
rng.shuffle(idx)
cut = int(0.8 * N)
acc_leaky = fit_and_score(idx[:cut], idx[cut:])
tr_users = set(user_id[idx[:cut]].tolist())
te_users = set(user_id[idx[cut:]].tolist())
overlap = len(tr_users & te_users)

# ---- CORRECT group-wise split: hash the STABLE group key, assign whole user.
# bucket = (md5(user_id) mod 100); users with bucket < 20 go to test.
def bucket(u):
    h = hashlib.md5(str(int(u)).encode()).hexdigest()
    return int(h, 16) % 100

is_test = np.array([bucket(u) < 20 for u in user_id])    # one decision per row, by user
all_rows = np.arange(N)
tr2, te2 = all_rows[~is_test], all_rows[is_test]
acc_group = fit_and_score(tr2, te2)
overlap2 = len(set(user_id[tr2].tolist()) & set(user_id[te2].tolist()))

print("rows N                         :", N)
print("distinct users G               :", G)
print("--- leaky per-row random split ---")
print("users in BOTH train and test   :", overlap, "of", G)
print("offline accuracy (looks great) : %.3f" % acc_leaky)
print("--- group-wise hashed split ------")
print("users in BOTH train and test   :", overlap2, "of", G)
print("offline accuracy (honest)      : %.3f" % acc_group)
print("inflation from leakage         : %.3f" % (acc_leaky - acc_group))
Code 8.8.1: A leaky per-row split versus a correct group-wise hashed split, scored by the same memorizing model. The hash bucket(u) depends only on the user id, so it is the single-machine stand-in for a decision every distributed worker would reach independently and identically.
rows N                         : 7200
distinct users G               : 600
--- leaky per-row random split ---
users in BOTH train and test   : 552 of 600
offline accuracy (looks great) : 0.922
--- group-wise hashed split ------
users in BOTH train and test   : 0 of 600
offline accuracy (honest)      : 0.526
inflation from leakage         : 0.396
Output 8.8.1: The leaky split puts 552 of 600 users on both sides and reports 92.2 percent accuracy; the group split lets zero users cross the boundary and reports the honest 52.6 percent. The leak inflated the headline metric by 39.6 percentage points, the gap that would reappear as a production collapse the moment the model met genuinely new users.

The two numbers tell the whole story. Nothing about the model changed between them; only the question changed. The leaky split asked "can you recall a user you have already seen?" and the group split asked "can you predict a user you have never met?", which is the question production will actually pose. A 92.2 percent that becomes 52.6 percent under an honest split is not a strong model with a measurement quirk; it is a barely-better-than-coin-flip model dressed up by leakage. The discipline that prevents it costs three lines: choose the group key, hash it, split on the bucket.

Fun Note: The Model That Was a Genius Until Payday

A recurring pattern in postmortems: a model posts a triumphant offline number, ships, and degrades the instant real traffic arrives, sometimes literally on the first day. The villain is almost never the architecture. It is a split that let the model peek, so the offline test was an open-book exam and production was the closed-book final. The model did not get worse on launch day; it just finally got asked a question it had not already seen the answer to.

Library Shortcut: scikit-learn GroupShuffleSplit Does the Group Split for You

Code 8.8.1 spelled out the hashing so the mechanism is visible. In practice you hand the group key to scikit-learn and let it guarantee that no group straddles the boundary. GroupShuffleSplit takes a groups array and produces train/test indices in which every group lands entirely on one side, and GroupKFold does the same for cross-validation folds:

from sklearn.model_selection import GroupShuffleSplit

# groups = the stable entity key for every row (user id, document id, cluster id)
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=0)
train_idx, test_idx = next(gss.split(X, y, groups=user_id))
# guarantee: set(user_id[train_idx]) and set(user_id[test_idx]) are disjoint
Code 8.8.2: The disjoint-group guarantee from Output 8.8.1 in two lines. For a multi-machine pipeline the hashing form of Code 8.8.1 is still preferred because it needs no shared index list; GroupShuffleSplit is the single-node tool, and its groups argument is exactly the stable key you would hash.

4. Nondeterminism: Results That Will Not Repeat (Advanced)

The third trap is subtler than a wrong number; it is a number you cannot reproduce. A distributed read returns shards in whatever order they finish, a parallel shuffle seeds itself from wall-clock time, a hash-based split uses a hash that is randomized per process. Run the pipeline twice and the train/test boundary moves, the batch order changes, and two training runs that should be identical diverge. The immediate harm is that you can no longer tell a real improvement from shuffle noise: a model that scores better might owe it entirely to a luckier split. The deeper harm is that a leakage or correctness bug becomes unfalsifiable, because you cannot reproduce the exact conditions that exposed it. Reproducibility is not a nicety here; it is the precondition for trusting any comparison, which is why Chapter 5 treats deterministic, seeded pipelines as a measurement requirement rather than an optional polish.

The defenses all share one shape: replace anything that depends on timing or process identity with something that depends only on stable data. Seed every random generator from a fixed value, and derive per-shard seeds deterministically from a global seed and the shard index so that shard $i$ always shuffles the same way. Make the split a pure function of a stable key, as in Code 8.8.1, rather than of arrival order, so the boundary is identical no matter which worker processes a row or in what sequence. Sort or canonicalize shard outputs by a stable key before any order-sensitive step. Note that Python's built-in hash() is salted per process for str and bytes keys unless PYTHONHASHSEED is pinned, so it must not be used for a split key; use a fixed hash such as the hashlib.md5 of Code 8.8.1, whose value is the same in every process on every machine. The test of a correct distributed pipeline is blunt: run it twice from the same seed and confirm the splits, the statistics, and the metrics are bit-for-bit identical.
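These defenses and the two-run test fit in a short sketch. Everything named here is an assumption for illustration: `GLOBAL_SEED`, the `shard_seed` derivation (a SHA-256 of the seed and shard index), the toy shards, and the `run_pipeline` fingerprint. The two runs deliberately differ in shard arrival order and in row order within each shard, which are exactly the things a correct pipeline must be insensitive to.

```python
import hashlib
import random

GLOBAL_SEED = 1234  # hypothetical fixed seed, recorded alongside the run

def shard_seed(global_seed: int, shard_index: int) -> int:
    """Derive a per-shard seed from stable inputs only (no clock, no process id)."""
    material = f"{global_seed}:{shard_index}".encode("utf-8")
    return int.from_bytes(hashlib.sha256(material).digest()[:8], "big")

def shuffled_shard(rows, shard_index, global_seed=GLOBAL_SEED):
    rows = sorted(rows)  # canonicalize first, so arrival order cannot matter
    random.Random(shard_seed(global_seed, shard_index)).shuffle(rows)
    return rows

def run_pipeline(shards, arrival_order):
    """Process shards in whatever order they 'finish'; return a digest of the result."""
    out = {}
    for shard_index in arrival_order:
        out[shard_index] = shuffled_shard(shards[shard_index], shard_index)
    fingerprint = hashlib.sha256()
    for shard_index in sorted(out):  # canonical order for the fingerprint itself
        for row in out[shard_index]:
            fingerprint.update(f"{shard_index}\t{row}\n".encode("utf-8"))
    return fingerprint.hexdigest()

# The same logical data, delivered with different row order and shard arrival order.
shards_run_a = {0: ["r3", "r1", "r2"], 1: ["r6", "r4", "r5"], 2: ["r9", "r7", "r8"]}
shards_run_b = {0: ["r1", "r2", "r3"], 1: ["r5", "r6", "r4"], 2: ["r8", "r9", "r7"]}

run_a = run_pipeline(shards_run_a, arrival_order=[0, 1, 2])
run_b = run_pipeline(shards_run_b, arrival_order=[2, 0, 1])

# The two-run test: identical fingerprints, or the pipeline is nondeterministic.
assert run_a == run_b
print("fingerprint:", run_a)
```

A worker that seeded its shuffle from wall-clock time would fail the final assertion on almost every pair of runs. The guarantee holds for a fixed Python version; shuffle sequences are not promised to match across interpreter releases, so the interpreter version belongs in the recorded run metadata as well.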

Practical Example: The Recommender That Aced Validation and Failed Launch

Who: A data scientist on the personalization team at a streaming-media company, owning the next-item recommendation model.

Situation: A new model reported 0.91 offline accuracy on a held-out set built by a Spark job that randomly split the interaction log 80/20 across the cluster.

Problem: In the first week of the A/B test the treatment group performed no better than the old model, and on brand-new users it performed clearly worse, nowhere near the 0.91 promise.

Dilemma: Trust the glowing offline number and blame the online infrastructure or a serving bug, or distrust the evaluation itself and rebuild the split, which would invalidate weeks of reported results.

Decision: They audited the split first and found that 92 percent of users in the test set also appeared in training, because the random split partitioned by interaction row, not by user, exactly the Figure 8.8.1 leak.

How: They switched to a user-level group split (hash the user id to a bucket, route whole users), and for repeat content they first deduplicated near-identical items with MinHash and split on the cluster id, then re-ran every offline number.

Result: Honest offline accuracy fell to 0.58, which finally matched the online behavior; with a truthful baseline the team could iterate on real generalization instead of chasing a memorization mirage, and the next model improved both numbers together.

Lesson: An offline metric that does not survive a group-wise split is not a strong result; it is a leak. Make the split honest before you make the model better, or you will optimize the wrong thing at full distributed throughput.

5. Why Leakage Inflates Offline Metrics and Collapses in Production (Intermediate)

It is worth being precise about the mechanism, because "leakage inflates metrics" is easy to repeat and easy to underestimate. An offline metric is an estimate of production performance, and that estimate is only as good as the resemblance between the offline test and the production query. Leakage breaks the resemblance in a specific direction: it gives the offline test access to information that production will not have, so the offline test is strictly easier than the real task. The model is rewarded for exploiting that surplus information, the optimizer steers toward exploiting it harder, and the offline metric climbs. None of that effort transfers, because in production the surplus information is simply gone: the test user is genuinely new, the future has not happened yet, the duplicate is not in the training set. The metric does not degrade gracefully; it falls back to the model's true generalization, which can be far lower, as Output 8.8.1 made vivid with a drop of 39.6 percentage points.

This is also why leakage is so corrosive to iteration, not just to a single launch. Every design decision you make, every hyperparameter you tune, every architecture you prefer, is chosen because it improved the offline metric. If that metric is inflated by leakage, you are optimizing for the ability to exploit leaked information, and the whole research loop tilts toward models that memorize better rather than generalize better. The damage compounds silently across an entire project. The remedy is the same discipline stated three ways across this section: train-only statistics, as-of-time-$t$ features, group-wise splits on a deduplicated stable key, all computed identically on every machine. Distribution did not invent these traps, but it spreads the surface area across many shards, so the only durable defense is to make correctness a property of the global key rather than of any local shard's view.

Research Frontier: Train/Test Contamination in LLM Benchmarks (2024 to 2026)

The entity-crossing split has a web-scale cousin that now dominates evaluation debates for large language models: benchmark contamination, where the test questions themselves leak into the pretraining corpus. Because frontier models train on enormous web crawls, popular benchmarks (GSM8K, MMLU, HumanEval, and their kin) are frequently present verbatim or paraphrased in the training data, so a high score can reflect memorization rather than reasoning. Work from 2024 to 2026 has built detectors for this leak: membership-inference and perplexity-gap probes, deliberately rewritten or freshly authored test variants that a contaminated model fails, and "living" benchmarks refreshed after a model's training cutoff to guarantee novelty. The same near-duplicate machinery used to deduplicate corpora is now turned inward to audit whether the evaluation set survived contact with the training set, making the group-split and deduplicate-before-split discipline of this section a frontier concern for the field's headline numbers, not just an engineering footnote. We return to evaluation integrity with the measurement tools of Chapter 5.

The throughline of the chapter so far has been moving data at scale: storing it, sharding it, loading it, preprocessing it across many machines. This section added the constraint that makes all of that trustworthy, namely that the splits and statistics computed on those shards must reflect a global truth no single shard can see. The natural next question is how to make a dataset, and the exact transforms applied to it, something you can name, pin, and reproduce, so that "the model trained on version 7 with these features" is a precise and recoverable statement. That is data versioning and lineage, the subject of Section 8.9.

Exercise 8.8.1: Name the Leak and the Scope (Conceptual)

For each pipeline, state whether it leaks, name the leak (whole-dataset preprocessing, temporal, entity-crossing, or duplicate), and give the scope the offending statistic should have been computed over: (a) a fraud model that standardizes each feature using the mean and variance of the full labeled history before splitting by time; (b) a churn model whose "average monthly spend" feature for a customer is computed over that customer's entire lifetime, including months after the prediction date; (c) a passage-retrieval model split per passage, where each Wikipedia article contributes dozens of passages; (d) a sentiment model whose web-scraped test set contains articles that also appear, lightly reworded, in training. Explain in each case which production guarantee the leak violates.

Exercise 8.8.2: Make the Inflation Worse, Then Cure It (Coding)

Start from Code 8.8.1. First strengthen the leak: increase ROWS_PER_USER and reduce the weight of the row feature so the label depends almost entirely on the user, and report how the leaky-versus-group accuracy gap responds. Then add a third split that is a per-row random split but stratified to keep the class balance equal across train and test, and show that stratification does not fix the leak (the gap survives), because balancing labels is unrelated to keeping entities apart. Finally, replace your hand-rolled hash split with scikit-learn GroupShuffleSplit from Code 8.8.2 and confirm it reproduces the disjoint-group result. Explain why class stratification and group separation are orthogonal concerns.

Exercise 8.8.3: Prove Your Split Is Deterministic Across Shards (Analysis)

You are given a dataset partitioned into 8 shards on 8 workers, with no worker seeing the whole dataset. Design a group-wise train/test split, computed independently on each shard, that is provably identical to the split a single machine would produce from the full dataset, and reproducible bit-for-bit across runs. Specify the hash function and why Python's built-in hash() is disqualified, the bucketing rule, and the seeding scheme for any randomness. Then describe a concrete two-run test that would catch a nondeterminism bug (for example, a worker that seeds its shuffle from wall-clock time), and state what global invariant the test checks. Relate your invariant to the reproducibility requirements of Chapter 5.