Part II: Distributed Data Processing for AI
Chapter 8: Distributed Storage and Data Loading

Data Versioning and Lineage

"They asked me to reproduce last Tuesday's run. I had the code, the seed, and the config. I did not have last Tuesday's data, and so I had nothing at all."

A Worker That Trained on a Table That No Longer Exists
Big Picture

A model is a function of its data, so reproducing, debugging, or auditing a training run requires the exact dataset snapshot it consumed, identified as precisely as you identify a commit of source code. Versioning code while leaving the data unversioned is the most common reproducibility hole in a distributed AI pipeline: the code that built a training table is in Git, but the petabyte the table was distilled from has been overwritten, recompacted, or quietly relabeled, and the run can never be recreated. This closing section of Chapter 8 gives data the same discipline code already has. We content-address dataset snapshots so that identity follows bytes, lean on the lakehouse time travel of Section 8.3 and on tools like DVC and lakeFS to make that snapshot retrievable, and record lineage so that any number in a trained model can be traced back through its transformations to the raw sources that produced it. Versioning is the last layer of the storage stack, the one that makes every layer below it reproducible and auditable rather than merely fast.

Every prior section of this chapter made the storage and loading path faster: object storage that scales past one disk (Section 8.2), columnar lakehouse tables (Section 8.3), partitioning and compaction (Section 8.4), sharded loaders (Section 8.5), streaming pipelines (Section 8.6), distributed preprocessing (Section 8.7), and leakage-safe correctness (Section 8.8). None of that speed is worth anything if you cannot say, six months later, exactly which bytes a deployed model was trained on. That question is not academic. It is asked by a regulator auditing a credit model, by an engineer bisecting a sudden accuracy regression, and by a researcher trying to reproduce a published number. The answer requires that the training dataset have a stable, verifiable identity, and that the chain from raw source to final table be recorded. Those two capabilities, versioning and lineage, are the subject of this section, and together they convert the fast-but-forgetful pipeline of the earlier sections into one you can trust.

1. A Model Is a Function of Its Data Beginner

Write a trained model as the output of a training procedure applied to three inputs: the code, the configuration (hyperparameters and seed), and the data. Symbolically, $\theta = \mathcal{T}(\text{code}, \text{config}, D)$, where $D$ is the exact training set. Reproducibility, the property defined and measured in Section 5.7, is the guarantee that re-running $\mathcal{T}$ with the same three inputs yields the same $\theta$ up to controlled nondeterminism. Practitioners habitually pin the first two inputs, a Git commit for the code and a config file for the seed and hyperparameters, and then leave the third, $D$, as a mutable path like s3://data/train/ whose contents drift from day to day. Pinning two of three inputs and letting the third float does not give you two-thirds of reproducibility; it gives you none, because $\theta$ depends on all three and a single relabeled row changes the result.
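The claim that pinning two of three inputs reproduces nothing can be checked on a toy. The sketch below is a hypothetical stand-in for $\mathcal{T}$: a one-parameter linear model trained by seeded SGD. The function name train, the config fields, and the two datasets are all illustrative assumptions, not part of any real pipeline. Code and seed stay fixed, and one relabeled row is enough to change the learned weight.

```python
import random

def train(config, data):
    """Hypothetical toy T(code, config, D): seeded SGD on a 1-D linear model y ~ w * x."""
    rng = random.Random(config["seed"])            # pinned seed controls the visit order
    w = 0.0
    for _ in range(config["epochs"]):
        order = list(range(len(data)))
        rng.shuffle(order)
        for i in order:
            x, y = data[i]
            w -= config["lr"] * 2 * (w * x - y) * x   # gradient step on (w*x - y)^2
    return w

config = {"seed": 0, "epochs": 20, "lr": 0.05}
monday = [(1.0, 2.0), (2.0, 4.1), (3.0, 5.9)]
tuesday = [(1.0, 2.0), (2.0, 4.1), (3.0, 9.0)]     # same "path", one row silently relabeled

print(train(config, monday) == train(config, monday))    # True: all three inputs pinned
print(train(config, monday) == train(config, tuesday))   # False: code and seed unchanged
```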

The fix is to give $D$ an identity as precise as a Git SHA. The dataset must be addressable not by a path that points at whatever currently lives there, but by a name that is bound to specific bytes and can never silently change underneath you. This is the same insight that made source control useful: a commit hash names a tree of bytes, not a branch that moves. Applied to data, it means the training table referenced by a run is a frozen snapshot, and the reference is to that snapshot's fingerprint, so "the data this model saw" is a question with one unambiguous answer forever.
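The difference between a path and a name bound to bytes can be sketched with a tiny in-memory content-addressed store. ContentStore and the dictionary of mutable paths are hypothetical illustrations, not a real object-store API. Overwriting the path moves it to new bytes, but the fingerprint recorded earlier still resolves to exactly the snapshot it named, and every read re-verifies that the bytes match their name.

```python
import hashlib

class ContentStore:
    """Hypothetical content-addressed store: each blob is named by the SHA-256 of its bytes."""

    def __init__(self):
        self._blobs = {}

    def put(self, data: bytes) -> str:
        key = hashlib.sha256(data).hexdigest()
        self._blobs.setdefault(key, data)             # identical bytes are stored only once
        return key

    def get(self, key: str) -> bytes:
        data = self._blobs[key]
        if hashlib.sha256(data).hexdigest() != key:   # verify on read: the name must match the bytes
            raise ValueError(f"corrupt blob {key[:12]}")
        return data

store = ContentStore()
mutable_paths = {}                                    # a path points at whatever currently lives there

snap_id = store.put(b"id,label\n1,pos\n2,neg\n")       # the fingerprint a training run records
mutable_paths["s3://data/train/"] = snap_id
mutable_paths["s3://data/train/"] = store.put(b"id,label\n1,pos\n2,pos\n")  # next night's overwrite

print(store.get(snap_id))                             # the pinned snapshot still resolves exactly
print(mutable_paths["s3://data/train/"] == snap_id)   # False: the path has moved on
```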

Key Insight: Version the Data, Not Just the Code That Builds It

Because $\theta = \mathcal{T}(\text{code}, \text{config}, D)$ depends on the data $D$ as strongly as on the code, versioning code alone reproduces nothing. The training set needs an immutable, verifiable identity, a fingerprint bound to exact bytes, so that a run can name the data it consumed the way it names its commit. A pipeline that pins code and seed but points at a mutable data path has pinned two of three inputs and reproduced zero of three, because the unpinned one is enough to change every weight.

2. Content-Addressed Snapshots and Dataset Fingerprints Intermediate

The mechanism that gives data a stable identity is content addressing: name a blob by a cryptographic hash of its bytes rather than by a location. A snapshot of a dataset is the ordered collection of its records, and its fingerprint is a hash computed over the canonical bytes of every record in order. Two properties follow immediately, and they are exactly the two we need. First, identity is reproducible: rebuilding the same logical dataset from scratch, even on a different machine, yields the same fingerprint, because the hash sees only content. Second, identity is sensitive: changing a single label, dropping one row, or reordering records changes the fingerprint completely, so a snapshot's name is a tamper-evident witness to its exact contents. This is the content-addressing idea that underpins Git and content-addressable object stores (which organize such hashes into Merkle trees); here we point it at training data.

The program below builds three snapshots of a tiny labeled dataset and fingerprints each. Snapshot v1 is the baseline. Snapshot v2 rebuilds identical content independently (with record keys deliberately reordered, to show that the fingerprint depends on content, not on incidental serialization order). Snapshot v3 flips a single label. The program then writes a lineage record binding the inputs and the transform code to the output snapshot, the topic of Section 4.

import hashlib
import json

# A dataset snapshot is an ordered set of records. Its IDENTITY is a content hash:
# the same bytes always produce the same fingerprint, any change produces a new one.

def fingerprint(records):
    """Content-address a dataset snapshot: hash the canonical bytes of every record."""
    h = hashlib.sha256()
    for r in records:                                  # order-stable iteration
        canonical = json.dumps(r, sort_keys=True, separators=(",", ":"))
        h.update(canonical.encode("utf-8"))
        h.update(b"\n")                                # record delimiter
    return h.hexdigest()

# Snapshot v1: three labeled training rows.
v1 = [
    {"id": 1, "text": "ship the model", "label": "pos"},
    {"id": 2, "text": "the run crashed", "label": "neg"},
    {"id": 3, "text": "all-reduce converged", "label": "pos"},
]

# Snapshot v2: identical content, rebuilt independently (reproducible identity).
v2 = [
    {"label": "pos", "text": "ship the model", "id": 1},   # keys reordered on purpose
    {"id": 2, "text": "the run crashed", "label": "neg"},
    {"id": 3, "text": "all-reduce converged", "label": "pos"},
]

# Snapshot v3: ONE label flipped on row 2 (a single-cell change).
v3 = [dict(r) for r in v1]
v3[1]["label"] = "pos"

fp1, fp2, fp3 = fingerprint(v1), fingerprint(v2), fingerprint(v3)
print("snapshot v1 fingerprint :", fp1[:16], "...")
print("snapshot v2 fingerprint :", fp2[:16], "...")
print("snapshot v3 fingerprint :", fp3[:16], "...")
print("v1 == v2 (same content) :", fp1 == fp2)
print("v1 == v3 (one cell flip):", fp1 == fp3)

# A lineage record links INPUTS + CODE -> the OUTPUT snapshot it produced.
raw_sources = ["s3://corpus/reviews-2026-06.jsonl", "s3://corpus/labels-v4.csv"]
code_version = "git:9f3a1c2"                            # the transform that built it
lineage = {
    "output_snapshot": fp1,
    "inputs": raw_sources,
    "transform_code": code_version,
    "row_count": len(v1),
}
# The lineage record is itself content-addressed, so the (inputs, code) -> output
# binding is tamper-evident: change any field and its own id changes too.
lineage_id = hashlib.sha256(
    json.dumps(lineage, sort_keys=True).encode("utf-8")
).hexdigest()[:16]
print("lineage record id       :", lineage_id)
print("lineage record          :", json.dumps(lineage, indent=None))
Code 8.9.1: A dataset fingerprint and a lineage record from first principles, pure Python with only hashlib and json. The fingerprint function content-addresses a snapshot; the closing block binds raw inputs and transform code to the output snapshot and content-addresses that binding too.
snapshot v1 fingerprint : 3dd4f59af2233d0a ...
snapshot v2 fingerprint : 3dd4f59af2233d0a ...
snapshot v3 fingerprint : 38112a84caf59356 ...
v1 == v2 (same content) : True
v1 == v3 (one cell flip): False
lineage record id       : 8daa463f2483324b
lineage record          : {"output_snapshot": "3dd4f59af2233d0a45a0b32bc487a1b7badc99f74221c1ecda853e0cb6b1712e", "inputs": ["s3://corpus/reviews-2026-06.jsonl", "s3://corpus/labels-v4.csv"], "transform_code": "git:9f3a1c2", "row_count": 3}
Output 8.9.1: The independently rebuilt snapshot v2 shares v1's fingerprint exactly (3dd4f59a...), proving identity follows content, while the single flipped label in v3 produces a completely different fingerprint (38112a84...). The lineage record at the bottom binds the two raw sources and the transform commit to the snapshot they produced, and carries its own tamper-evident id.

The two outcomes in Output 8.9.1 are the whole argument. The same content gives the same name no matter where or when it was built, so a fingerprint is a stable handle you can record in a run's metadata and resolve forever. A one-cell change gives a different name, so the fingerprint cannot be fooled: if the data a model saw differs in any record from the data you are holding now, the mismatch is visible at a glance. A real implementation hashes file chunks or table manifests rather than in-memory dicts, and stores the bytes once under their hash in an object store, but the identity guarantee is precisely the one this toy demonstrates.
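Hashing chunks and a manifest, rather than every record in one stream, can be sketched as a two-level hash: one hash per shard file, then one root hash over the sorted list of (shard name, shard hash) pairs. This is a minimal one-level Merkle tree under assumed conditions. The shard names and the manifest text format are hypothetical, and real Delta or Iceberg manifests carry much more metadata. The payoff is locality: when one shard changes, the root changes, and comparing leaves pinpoints which shard it was.

```python
import hashlib

def sha(b: bytes) -> str:
    return hashlib.sha256(b).hexdigest()

def manifest_fingerprint(chunks: dict):
    """Hash each chunk, then hash the sorted (name, chunk-hash) manifest into one root id."""
    leaves = {name: sha(data) for name, data in chunks.items()}
    manifest = "\n".join(f"{name} {leaves[name]}" for name in sorted(leaves))
    return sha(manifest.encode("utf-8")), leaves

shards = {
    "part-000.jsonl": b'{"id":1,"label":"pos"}\n{"id":2,"label":"neg"}\n',
    "part-001.jsonl": b'{"id":3,"label":"pos"}\n',
}
root_a, leaves_a = manifest_fingerprint(shards)

edited = {**shards, "part-001.jsonl": b'{"id":3,"label":"neg"}\n'}   # one shard changes
root_b, leaves_b = manifest_fingerprint(edited)

changed = [name for name in leaves_a if leaves_a[name] != leaves_b[name]]
print(root_a != root_b, changed)   # True ['part-001.jsonl']
```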

Fun Note: The Dataset That Was Reproducible Until Someone Sorted It

A naive fingerprint that hashes records in whatever order the filesystem returns them will declare two identical datasets different the moment one is read from a freshly compacted partition. Teams discover this when a nightly job that "changed nothing" reports a brand-new dataset hash every morning. The cure is to define a canonical order (sort by a stable key) before hashing, so identity depends on content and not on the mood of the directory listing. Reproducibility, it turns out, is mostly the discipline of deciding what you are allowed to ignore.
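The cure described above, sorting by a stable key before hashing, can be sketched as a variant of the fingerprint from Code 8.9.1. The function name canonical_fingerprint and the choice of "id" as the key are assumptions for illustration. The sketch also assumes the key is unique; if two different records shared a key, their relative order would again depend on the input order.

```python
import hashlib
import json
import random

def canonical_fingerprint(records, key="id"):
    """Order-insensitive fingerprint: sort by a stable (assumed unique) key, then hash canonical JSON."""
    h = hashlib.sha256()
    for r in sorted(records, key=lambda rec: rec[key]):   # canonical order, not listing order
        h.update(json.dumps(r, sort_keys=True, separators=(",", ":")).encode("utf-8"))
        h.update(b"\n")                                   # record delimiter
    return h.hexdigest()

rows = [{"id": i, "label": "pos" if i % 2 else "neg"} for i in range(100)]
shuffled = rows[:]
random.Random(7).shuffle(shuffled)        # stands in for a freshly compacted partition

print(canonical_fingerprint(rows) == canonical_fingerprint(shuffled))   # True
```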

3. Snapshots in Practice: Time Travel, DVC, and lakeFS Intermediate

Hashing every record by hand, as in Code 8.9.1, explains the idea but does not scale to a multi-terabyte table that thousands of workers read concurrently. Three production mechanisms give you the same immutable-snapshot guarantee at scale, and they differ mainly in what granularity they version. The first is lakehouse time travel. The Delta Lake and Apache Iceberg table formats from Section 8.3 keep a transaction log of immutable snapshots; every commit produces a new table version, and a reader can ask for the table as of a specific version or timestamp. A training job records the table version it read, and that version is a frozen snapshot the format guarantees will never change, because new writes append new snapshots rather than mutating old ones. The second is file-level versioning with DVC, which stores large files in an object store keyed by content hash and keeps small pointer files in Git, so a dataset version is just a Git commit that resolves to exact bytes. The third is branch-and-commit versioning of an entire object-storage bucket with lakeFS, which gives a whole data lake Git-like semantics (branch, commit, merge, revert) so you can snapshot, branch, and roll back petabytes the way you branch code. Table 8.9.1 places the three against the from-scratch fingerprint so you can match a mechanism to a granularity.

Table 8.9.1: Four ways to give a dataset an immutable, retrievable identity, from the hand-rolled fingerprint of Code 8.9.1 to the production tools. Each gives the same guarantee (a name bound to exact bytes) at a different granularity and scale.
| Mechanism | Versioning granularity | How a snapshot is named | Best when |
| --- | --- | --- | --- |
| Content fingerprint (Code 8.9.1) | Whole dataset or chunk | SHA-256 of canonical bytes | Teaching, custom integrity checks |
| Lakehouse time travel (Delta / Iceberg) | Table commit | Table version number or timestamp | Tables already in a lakehouse (Section 8.3) |
| DVC | File / directory | Git commit resolving content hashes | File-based datasets versioned alongside code |
| lakeFS | Whole bucket / data lake | Commit id on a branch | Branch, merge, and revert across a petabyte lake |

All four entries in Table 8.9.1 deliver the identity guarantee of Section 2; they differ in granularity and operational fit, not in the underlying idea. Lakehouse time travel is the natural choice when the data already lives in Delta or Iceberg tables, because the snapshot is a free byproduct of the transaction log you are already keeping. DVC fits file-based corpora that you want to version in the same pull request as the code that consumes them. lakeFS fits an organization that wants Git-like branching over an entire lake, so that a risky relabeling can happen on a branch and be merged only once a model trained on it has been validated.
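The core of time travel, commits that append new versions and never mutate old ones, can be sketched as a toy append-only log. SnapshotLog is a hypothetical class, not the Delta or Iceberg API. Real formats record file-level add and remove actions in their transaction logs rather than copying rows, but the reader-facing guarantee is the same: a recorded version number keeps resolving to the same data.

```python
class SnapshotLog:
    """Hypothetical append-only table log: every commit creates a new immutable version."""

    def __init__(self):
        self._versions = []                      # version number = index into this list

    def commit(self, rows):
        self._versions.append(tuple(rows))       # freeze a copy; old versions are never mutated
        return len(self._versions) - 1

    def read(self, version=None):
        """Read the latest version, or time-travel to an exact earlier one."""
        if version is None:
            version = len(self._versions) - 1
        return list(self._versions[version])

log = SnapshotLog()
pinned = log.commit([("r1", "pos"), ("r2", "neg")])   # the version a training run records
log.commit([("r1", "pos"), ("r2", "pos")])            # a later relabeling commit

print(log.read())         # [('r1', 'pos'), ('r2', 'pos')]  latest
print(log.read(pinned))   # [('r1', 'pos'), ('r2', 'neg')]  the run's exact snapshot
```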

Library Shortcut: Pin an Exact Snapshot in Three Lines Each

The hand-rolled fingerprint of Code 8.9.1 becomes a one-liner with any of the production tools, and each returns a snapshot identity you store in your run's metadata. The same logical operation, "freeze this data and give me back its name", appears three ways below:

# 1) Lakehouse time travel: read a Delta table AS OF an exact version (deltalake).
from deltalake import DeltaTable
dt = DeltaTable("s3://lake/train_table")
version = dt.version()                         # the immutable snapshot this run reads
table = dt.to_pyarrow_table()                  # pin `version` in the run's metadata
# later, reproduce exactly:  DeltaTable("s3://lake/train_table", version=version)

# 2) DVC: snapshot a file dataset; its identity is the resulting Git commit.
#   $ dvc add data/train.parquet                       # content-hashes the file, writes a .dvc pointer
#   $ git add data/train.parquet.dvc data/.gitignore   # stage the new pointer (commit -a skips untracked files)
#   $ git commit -m "train snapshot"                   # the commit IS the dataset version

# 3) lakeFS: commit the whole bucket on a branch; the commit id names the snapshot.
import lakefs
branch = lakefs.repository("corpus").branch("main")
ref = branch.commit(message="frozen train snapshot")   # returns a reference to the new commit
snapshot_id = ref.get_commit().id                      # pin this id in the run's metadata
Code 8.9.2: The same immutable-snapshot guarantee as Code 8.9.1, expressed with deltalake, DVC, and lakeFS. The dozen lines of manual hashing collapse to one call per tool; each library handles content hashing, deduplicated storage, and concurrent-read consistency that a from-scratch fingerprint would leave to you.

4. Lineage: From Raw Source to Trained Model Intermediate

A snapshot tells you what the data was; lineage tells you where it came from. Lineage is the recorded chain linking each artifact to the inputs and the code that produced it: which raw sources, which transformations, which commit of which job built the training table that produced a given model. The lineage record at the bottom of Code 8.9.1 is the atomic unit, one output snapshot bound to its inputs and its transform code, and a real pipeline records one such edge per transformation, so the edges compose into a graph from raw sources all the way to the deployed model. This is the same idea you met as RDD lineage in Section 7.2, where Spark records how each partition was derived from its parents so a lost partition can be recomputed. There, lineage served fault tolerance (recompute what was lost). Here, the same recorded provenance serves reproducibility and audit: trace a data issue to its origin, and reproduce or roll back from there.

The payoff is concrete. When a model's accuracy drops on one customer segment, lineage lets you walk backward from the model to the training snapshot, from the snapshot to the transform that built it, and from the transform to the specific raw source, perhaps a relabeled file or a corrupted upstream feed, that introduced the defect. Without lineage you are reduced to guessing; with it, debugging a data problem becomes a graph traversal. And because every node in that graph is a content-addressed snapshot, you can not only find the bad input but reproduce the exact state of the world before it entered, retrain from the last good snapshot, and prove the fix. Figure 8.9.1 draws the graph this section has been building toward.
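The backward walk can be sketched as a breadth-first traversal over recorded edges, where each artifact maps to the edge that produced it. The graph below is hypothetical: the artifact names, the intermediate cleaning step, and the commits git:c0ffee1 and git:4b7e0d1 are illustrative, and only git:9f3a1c2 and the two raw file names echo Code 8.9.1. Any artifact with no recorded parent is treated as a raw source.

```python
from collections import deque

# Hypothetical lineage graph: artifact -> the edge (inputs + transform) that produced it.
lineage = {
    "model:m42": {"inputs": ["snap:train"], "transform": "git:c0ffee1"},
    "snap:train": {"inputs": ["snap:clean", "raw:labels-v4.csv"], "transform": "git:9f3a1c2"},
    "snap:clean": {"inputs": ["raw:reviews-2026-06.jsonl"], "transform": "git:4b7e0d1"},
}

def trace_back(artifact, lineage):
    """Walk every recorded edge upstream from an artifact; return the hops and the raw sources."""
    hops, raw_sources = [], []
    queue, seen = deque([artifact]), {artifact}
    while queue:
        node = queue.popleft()
        edge = lineage.get(node)
        if edge is None:                      # no recorded parent: a raw source
            raw_sources.append(node)
            continue
        for parent in edge["inputs"]:
            hops.append((node, edge["transform"], parent))
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return hops, sorted(raw_sources)

path, sources = trace_back("model:m42", lineage)
for child, transform, parent in path:
    print(f"{child} <- {parent} via {transform}")
print("raw sources:", sources)
```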

[Figure 8.9.1 diagram: raw sources reviews-2026-06.jsonl and labels-v4.csv (immutable inputs) plus transform code git:9f3a1c2 → versioned snapshot, fingerprint 3dd4f59a... (immutable, retrievable) → training T(code, config, D) → trained model θ = T(...); dashed back-edge labeled "trace a data issue to its origin: model → snapshot → transform → raw source"]
Figure 8.9.1: The lineage graph this section builds. Two immutable raw sources and a pinned transform commit (left) flow into a content-addressed dataset snapshot (center, the 3dd4f59a... fingerprint from Output 8.9.1), which feeds the training procedure $\mathcal{T}$ that produces the model $\theta$ (right). The dashed back-edge is the audit path: from a model defect you walk backward through the recorded edges to the exact raw source that caused it, then reproduce or roll back from the last good snapshot.
Practical Example: The Accuracy Drop Traced to One Relabeled File

Who: A data scientist on the trust-and-safety team of a content platform owning a toxicity classifier.

Situation: An overnight retrain shipped a model whose precision on one language fell six points, with no code change in the training repository.

Problem: The training table was rebuilt nightly from dozens of upstream feeds; the code was identical to the prior good run, so the defect had to be in the data, but the table at s3://data/train/ had already been overwritten by the next night's build.

Dilemma: Spend days re-deriving every upstream feed by hand to find the culprit, or roll back blindly to an older model and lose a week of legitimate improvements with it.

Decision: They used the lineage graph. The model's metadata recorded the Delta table version it trained on; the table's commit recorded the transform job and its inputs; one input was a relabeling feed whose snapshot fingerprint had changed.

How: They time-traveled the suspect feed to its previous snapshot, diffed it against the new one, and found a vendor had inverted the label convention on roughly two thousand rows. They retrained from the last good snapshot of that one feed while keeping every other improvement.

Result: Precision recovered on the first retrain, the bad feed was quarantined, and the whole investigation took an afternoon instead of a week, because every artifact had a name and a recorded parent.

Lesson: Lineage turns "something in the data broke" from an open-ended hunt into a backward graph traversal. The snapshot tells you what changed; the lineage edge tells you where it came from.
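The "diff it against the previous snapshot" step from the example can be sketched as a keyed row comparison between two versions of one feed. The function diff_snapshots, the id key, and the label values are hypothetical illustrations. Matching on a stable key separates added, removed, and changed rows, which is what exposes a flipped label convention.

```python
def diff_snapshots(old, new, key="id"):
    """Compare two snapshots of the same feed row by row, matched on a stable key."""
    old_by_key = {r[key]: r for r in old}
    new_by_key = {r[key]: r for r in new}
    added = sorted(new_by_key.keys() - old_by_key.keys())
    removed = sorted(old_by_key.keys() - new_by_key.keys())
    changed = sorted(k for k in old_by_key.keys() & new_by_key.keys()
                     if old_by_key[k] != new_by_key[k])
    return added, removed, changed

previous = [{"id": 1, "label": "toxic"}, {"id": 2, "label": "ok"}, {"id": 3, "label": "ok"}]
current = [{"id": 1, "label": "ok"}, {"id": 2, "label": "ok"}, {"id": 3, "label": "toxic"}]

added, removed, changed = diff_snapshots(previous, current)
print("added:", added, "removed:", removed, "changed:", changed)
# added: [] removed: [] changed: [1, 3]
```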

Thesis Thread: Lineage Is the Storage Layer's Half of Reproducibility

Distribution multiplies the number of places a dataset can silently change: many writers, many compaction jobs, many object-store buckets, many workers reading concurrently. The single-machine habit of "the data is just the file on disk" does not survive that multiplication, which is why a distributed storage stack must carry identity and provenance as first-class metadata, not as an afterthought. Versioning and lineage are the storage layer's contribution to the book-wide reproducibility thread that began with the gradient identity of Section 1.1 and the measurement discipline of Section 5.7: an exact, reproducible computation is worthless if its largest input cannot be named. This same provenance chain reappears, extended to models and experiments, in the MLOps tracking of Chapter 26.

5. From Data Lineage to Experiment Tracking Intermediate

The lineage record of Code 8.9.1 stops at the dataset snapshot, but the natural next edge in the graph runs from the snapshot to the model and from the model to the experiment that produced it. That extension is the province of MLOps experiment tracking, where each run logs its code commit, its config, its dataset snapshot id, and the resulting metrics and model artifact, so the entire chain from raw source to deployed model is queryable. The dataset fingerprint this section produces is exactly the field an experiment tracker needs in order to answer "which data did this production model see?", and pinning it is what makes the tracker's reproducibility claim true rather than aspirational. We develop that full model-and-experiment provenance, including model registries and the audit trails a deployed fleet must keep, in Chapter 26. For now the boundary is clean: this chapter versions the data and records the edges up to the training set; MLOps versions the model and the experiment and records the edges beyond it, and the dataset fingerprint is the handshake between the two.
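The handshake between the data layer and experiment tracking can be sketched as a guard that runs before training. The function start_run and the run-config fields are hypothetical, and no specific tracker API is assumed: the returned dictionary simply stands in for whatever record a tracker would store. The guard recomputes the live dataset's fingerprint, refuses to start if it differs from the pinned one, and otherwise emits a run record that carries the dataset id next to the code commit and config.

```python
import hashlib
import json

def fingerprint(records):
    """Same canonical content hash as Code 8.9.1."""
    h = hashlib.sha256()
    for r in records:
        h.update(json.dumps(r, sort_keys=True, separators=(",", ":")).encode("utf-8"))
        h.update(b"\n")
    return h.hexdigest()

def start_run(run_config, live_records):
    """Hypothetical guard: refuse to train unless live data matches the pinned snapshot."""
    live_fp = fingerprint(live_records)
    if live_fp != run_config["dataset_fingerprint"]:
        raise RuntimeError(f"data drift: pinned {run_config['dataset_fingerprint'][:12]}, "
                           f"live {live_fp[:12]}")
    # The run record a tracker would store; the dataset fingerprint is the handshake field.
    return {"code": run_config["code"], "config": run_config["config"],
            "dataset_fingerprint": live_fp}

data = [{"id": 1, "label": "pos"}, {"id": 2, "label": "neg"}]
cfg = {"code": "git:9f3a1c2", "config": {"seed": 0, "lr": 0.05},
       "dataset_fingerprint": fingerprint(data)}

print(start_run(cfg, data)["dataset_fingerprint"][:12])
data[1]["label"] = "pos"                  # a silent upstream relabel
try:
    start_run(cfg, data)
except RuntimeError as err:
    print("blocked:", err)
```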

Research Frontier: Provenance for Foundation-Model Data (2024 to 2026)

Versioning a few gigabytes is solved; versioning the multi-trillion-token corpora behind foundation models, with auditable provenance, is an active frontier. The Data Provenance Initiative (Longpre et al., 2024) audited the licenses and sources of more than 1,800 widely used fine-tuning datasets and found pervasive license omissions and miscategorizations, sharpening the demand for machine-verifiable lineage rather than trusted READMEs. Regulatory pressure is now explicit: the EU AI Act's 2024 to 2026 phase-in requires providers of general-purpose models to document training-data sources, turning lineage from good practice into a compliance artifact. On the systems side, content-defined chunking and Merkle-tree manifests (the design behind Hugging Face's Xet-backed storage, 2024 to 2025) make petabyte-scale corpora deduplicated and content-addressed, so that a dataset version is a verifiable hash tree rather than a mutable folder. The unifying thread is the message of this section pushed to web scale: a model is a function of its data, and at foundation-model size that function's largest argument must be named, hashed, and provenance-tracked, not merely pointed at.
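Why content-defined chunking helps deduplication can be shown with a toy: cut a chunk wherever a hash of the last few bytes matches a bit pattern, so boundaries depend only on local content. This is a simplified sketch, not Xet's algorithm. Production chunkers use fast rolling hashes (Rabin or gear hashes) with minimum and maximum chunk sizes, while this version recomputes SHA-256 per window for clarity, and the window and mask values are arbitrary assumptions. Inserting bytes at the front of a file invalidates at most the first chunk; every later chunk keeps its hash, whereas fixed-size chunking would shift every boundary.

```python
import hashlib
import random

def cdc_chunks(data: bytes, window: int = 16, mask: int = 63):
    """Toy content-defined chunking: cut where a hash of the last `window` bytes hits a pattern."""
    chunks, start = [], 0
    for i in range(window, len(data) + 1):
        h = int.from_bytes(hashlib.sha256(data[i - window:i]).digest()[:4], "big")
        if h & mask == 0:                   # boundary depends only on the local window
            chunks.append(data[start:i])
            start = i
    if start < len(data):
        chunks.append(data[start:])         # trailing bytes after the last boundary
    return chunks

rng = random.Random(0)
original = bytes(rng.randrange(256) for _ in range(20_000))
edited = b"NEW HEADER ROW\n" + original     # insert bytes at the front

old = {hashlib.sha256(c).hexdigest() for c in cdc_chunks(original)}
new = {hashlib.sha256(c).hexdigest() for c in cdc_chunks(edited)}
print(f"{len(old)} chunks before; {len(old - new)} no longer present after the insert")
```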

6. Chapter Summary and Project Ideas Beginner

This section closes Chapter 8, so it is worth tracing the spine the whole chapter built. The storage layer, not the training loop, is what determines whether an AI system scales (Section 8.1), because the bytes have to be read before any gradient can be computed. We built that layer from the bottom up: object storage and distributed filesystems that scale capacity past one disk (Section 8.2); columnar formats and the lakehouse, Parquet and Arrow under Delta and Iceberg, that make analytical reads and atomic table snapshots cheap (Section 8.3); data layout, partitioning, and compaction that decide how much of a table a query must touch (Section 8.4); sharded training data and the loader that keeps accelerators fed rather than starved (Section 8.5); streaming and WebDataset-style pipelines that read shards as a sequential stream (Section 8.6); distributed preprocessing for the transformations that must themselves scale out (Section 8.7); the leakage and correctness traps that a distributed pipeline introduces and must guard against (Section 8.8); and finally, in this section, the versioning and lineage that make every layer below reproducible and auditable. The thread tying them together is that storage is not a passive substrate but an active part of the distributed AI system, and that the same data which determines a model's behavior must therefore be sharded for throughput, kept correct under concurrency, and named precisely enough to reproduce.

Key Takeaway: Chapter 8 in One Breath

Storage determines scale: the bytes must be read before they can be learned from, so the data path is a first-class part of the system. Build it in layers. (1) Object storage and distributed filesystems give capacity beyond one disk. (2) Columnar formats (Parquet, Arrow) under a lakehouse (Delta, Iceberg) make reads cheap and table snapshots atomic. (3) Layout, partitioning, and compaction control how much data a query touches. (4) Sharded loaders keep accelerators fed instead of waiting on I/O. (5) Streaming pipelines read shards as a sequential feed. (6) Distributed preprocessing scales the transforms themselves. (7) Leakage and correctness must be guarded explicitly, because concurrency creates failure modes a single machine never sees. (8) Versioning and lineage give the data an immutable, content-addressed identity and a recorded provenance chain, so a model, which is a function of its data, can be reproduced, audited, and debugged back to the raw source. Speed without reproducible identity is a pipeline you cannot trust; this chapter built both.

Project Ideas

Carry one of these from a single-machine baseline to a distributed version, applying the full chapter. Each is sized to grow into a capstone (Chapter 41).

1. A versioned, sharded training-data pipeline. Take a public dataset of at least a few gigabytes, build it into a lakehouse table (Delta or Iceberg, Section 8.3), partition and compact it for fast reads (Section 8.4), shard it into a streaming loader (Sections 8.5 and 8.6), and record an immutable snapshot version plus a lineage record for every rebuild. Demonstrate that you can pin a run to an exact table version, reproduce its fingerprint on a second machine, and roll back to a prior snapshot after a deliberate bad relabel.

2. A lineage-driven data-bug bisector. Construct a pipeline of three or four chained transforms from two raw sources to a training table, recording a content-addressed lineage edge at each step. Inject a defect into one upstream source, then write a tool that walks the lineage graph backward from a model's degraded metric to the exact raw input and transform commit responsible, mirroring the practical example of Section 4. Measure how the time-to-root-cause compares with and without the lineage records.

3. A fingerprint-gated reproducibility check. Wrap a small distributed training job so that it refuses to start unless the dataset fingerprint recorded in the run config matches the live data, and logs the fingerprint, code commit, and config to an experiment tracker (foreshadowing Chapter 26). Show that the guard catches a silent data change that would otherwise have produced a different model from an unchanged config.

Exercise 8.9.1: Why Pinning the Seed Is Not Enough Conceptual

A colleague argues that since their training run pins the code commit and the random seed, the run is fully reproducible and versioning the data is redundant. Using $\theta = \mathcal{T}(\text{code}, \text{config}, D)$ from Section 1, construct a concrete two-line scenario in which the code and seed are byte-identical across two runs yet the trained models differ, and state exactly which property of the data reference caused it. Then explain why a content-addressed snapshot reference, rather than a path, removes the problem, and why "the file is in our data lake" is not the same guarantee.

Exercise 8.9.2: Break and Fix the Fingerprint Coding

Start from Code 8.9.1. First, modify fingerprint to iterate records in a nondeterministic order (for instance, hash a shuffled copy) and show that two runs over the same dataset now produce different fingerprints, reproducing the bug in the Fun Note. Then fix it by sorting records on a stable key before hashing, and verify that an independently shuffled rebuild recovers the original fingerprint. Finally, add a single new row to the dataset and confirm the fingerprint changes, then remove it and confirm the original fingerprint returns exactly, demonstrating that identity is both stable and sensitive.

Exercise 8.9.3: Design a Lineage Schema for a Rollback Analysis

You operate a pipeline that builds a training table nightly from five upstream feeds, each independently versioned. Design the minimal lineage record you must store per nightly build so that, given only a deployed model exhibiting a regression, you can (a) identify the exact training snapshot it used, (b) list the five upstream feed snapshots that built that table, and (c) roll back to the most recent build in which a named feed had a specified prior snapshot. State the fields, argue why each is necessary, and explain how time travel (Section 3) makes step (c) an $O(1)$ lookup rather than a rebuild from raw sources. Relate your schema to the RDD lineage of Section 7.2: what does each approach record per edge, and why?

That closes Chapter 8, and with it the storage and data-loading foundation of Part II. You arrived able to load a file; you leave able to build a distributed data path that scales in capacity, reads cheaply, feeds accelerators without starving them, stays correct under concurrency, and names its every snapshot precisely enough to reproduce and audit. The data so far has been at rest: bounded tables, read in full, versioned as static snapshots. The next chapter sets the data in motion. Chapter 9 asks what changes when the dataset is unbounded and arriving continuously, when "the training set" is a never-ending feed of events, and when a model must update and serve in real time rather than from a frozen snapshot. The storage discipline of this chapter becomes the durable log beneath that stream, and the versioning instinct you built here becomes the watermark and checkpoint machinery that keeps a streaming computation correct.