Part V: Distributed Inference and Serving
Chapter 26: MLOps for Distributed AI

Distributed Data and Training Pipelines

"I am not one job. I am six jobs in a trench coat, holding hands, and if the third one trips we all go down together. Please cache me."

A Training Pipeline Between Two Failed Runs
Big Picture

A trained model is not the output of a single program; it is the output of a directed acyclic graph of distributed jobs, each running on its own cluster, each depending on the artifacts of the ones before it. Ingesting raw data, validating it, preprocessing it on a Spark cluster, training on a GPU fleet, evaluating, and registering the result are separate distributed computations chained into one pipeline. The job of a workflow orchestrator is to schedule that graph across shared compute, run each stage only when its dependencies are ready, retry the stages that fail on flaky hardware, skip the stages whose inputs have not changed, and do all of this reproducibly so that the same inputs always yield the same model. This section builds the pipeline-as-DAG mental model, shows the orchestrator's core responsibilities (dependency ordering, validation gates, caching, retries), and demonstrates the caching idea with a runnable graph runner that skips work it has already done.

The previous section framed MLOps as operating an AI system across a fleet rather than babysitting one model on one box. The first concrete instance of that fleet-wide view is the pipeline that produces the model in the first place. On a single machine, producing a model is a script: load data, preprocess, fit, save. At scale, every one of those verbs is its own distributed job with its own resource profile. Data ingestion touches a distributed object store; preprocessing runs on a Spark cluster (Chapter 7); training claims dozens of GPUs for hours; evaluation and registration are short but gate everything downstream. Stitching these heterogeneous jobs into one reliable, repeatable workflow is what this section is about, and the central abstraction is the DAG.

[Figure 26.2.1 diagram. Orchestrator band: "Workflow orchestrator (Airflow / Kubeflow / Flyte): schedules the DAG, provisions compute per stage, retries failures, caches unchanged stages." Pipeline nodes, left to right: ingest (object store), validate (schema gate), preprocess (Spark cluster), train (GPU cluster), evaluate (metric gate), register (model registry).]
Figure 26.2.1: The model-production pipeline as a directed acyclic graph. Solid arrows are data dependencies between stages; each stage is a distinct distributed job on its own cluster. The green band is the orchestrator, which schedules every node (dashed control links), provisions the right compute for each, and owns retries and caching. The validate and evaluate nodes are gates: the graph does not advance past them unless their checks pass.

1. The Pipeline Is a DAG of Distributed Jobs (Beginner)

A directed acyclic graph is the right abstraction because it captures exactly the two facts that matter: which stages must finish before which others can start, and which stages have no ordering between them and may run in parallel. In Figure 26.2.1 the chain ingest, validate, preprocess, train, evaluate, register is linear, but real pipelines fan out (preprocess images and text on separate clusters) and fan in (a training stage that consumes both), and the DAG encodes all of it without the orchestrator having to understand what any stage does. Each node is a black box that reads upstream artifacts and writes its own; each edge is a dependency. This is the same dataflow-graph idea that Spark (Chapter 7) uses inside a single job, lifted one level up to coordinate whole jobs.
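To make the two facts a DAG encodes concrete (what must wait, and what may run side by side), here is a minimal sketch using Python's standard-library graphlib. The fan-out stage names (preprocess_images, preprocess_text) and the helpers parallel_waves and is_acyclic are hypothetical, chosen to mirror the fan-out and fan-in described above; a real orchestrator would dispatch each wave to clusters rather than collect names in a list.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical fan-out / fan-in pipeline: stage -> set of upstream stages.
DEPS = {
    "ingest": set(),
    "validate": {"ingest"},
    "preprocess_images": {"validate"},
    "preprocess_text": {"validate"},
    "train": {"preprocess_images", "preprocess_text"},   # fan-in
    "evaluate": {"train"},
    "register": {"evaluate"},
}

def parallel_waves(deps):
    """Group stages into waves; stages in the same wave have no ordering between them."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()                      # raises CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())   # every stage whose upstreams are all done
        waves.append(ready)
        sorter.done(*ready)                  # mark the wave finished, unlocking successors
    return waves

def is_acyclic(deps):
    """True when the dependency map is a valid DAG."""
    try:
        parallel_waves(deps)
        return True
    except CycleError:
        return False

WAVES = parallel_waves(DEPS)
for i, wave in enumerate(WAVES):
    print(f"wave {i}: {wave}")
```

The two preprocess stages land in the same wave, so they may run in parallel on separate clusters, while train waits for both. Note that the sorter never inspects what a stage does; it only sees names and edges, which is the black-box property the text describes.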

What makes these nodes hard is that they are not equal. The preprocess node may run hundreds of Spark executors for twenty minutes; the train node may hold a 64-GPU gang for nine hours; the register node finishes in a second. The orchestrator must provision a different shape of distributed compute for each step and tear it down afterward, which is why scheduling a long GPU training step is itself a placement problem against a shared cluster, the subject of Chapter 33. The pipeline layer decides when the training job should run and on what inputs; the cluster scheduler decides where its GPUs come from.

Key Insight: The Orchestrator Coordinates Jobs, Not Computations

A distributed-training framework coordinates workers inside one job (it owns the all-reduce). A workflow orchestrator coordinates whole jobs across the lifecycle (it owns the edges between them). These are different layers and they compose: the train node of the DAG is, internally, a data-parallel job running its own collectives. Keeping the two layers distinct is what lets you swap a single-GPU training stage for a 512-GPU one without rewriting the pipeline; only the contents of one node change, not the graph.

2. Validation Gates: Catch Bad Data Before It Costs a GPU-Day (Beginner)

The most expensive failure in a training pipeline is the one you discover after the train node has run. A corrupted feature column, a silently shifted label distribution, or a schema change upstream can produce a model that trains to convergence and is quietly worthless. The remedy is a validation gate: a cheap stage placed immediately after ingestion that checks the data against an expected schema and against statistical expectations (column types, null rates, value ranges, distribution drift versus the last accepted batch) and refuses to let the graph proceed if the checks fail. In Figure 26.2.1 this is the validate node, and its placement before the costly Spark and GPU stages is deliberate: failing fast is the entire point.

Gates are where data engineering and reliability meet. The logic that lets a distributed system tolerate a bad node (fail the unit of work and re-run it rather than corrupt the whole) applies here at the data level: a batch that fails validation is rejected, the pipeline halts or routes to a remediation branch, and no GPU is spent on it. We made the general case for this fail-and-recover discipline in Section 2.4; a validation gate is that discipline applied to data quality.
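As a rough illustration of what such a gate checks, the sketch below tests a toy batch against per-column expectations for type, null rate, and value range. Everything here is an assumption made for the example: the ColumnSpec and GateFailure names, the column names, and the thresholds. A production gate would compute these statistics in a distributed scan rather than over a Python list.

```python
from dataclasses import dataclass

@dataclass
class ColumnSpec:
    dtype: type            # expected Python type of non-null values
    max_null_rate: float   # tolerated fraction of missing values
    lo: float              # inclusive lower bound
    hi: float              # inclusive upper bound

class GateFailure(Exception):
    """Raised to halt the pipeline before any expensive stage starts."""

# Hypothetical expectations for a toy batch.
SCHEMA = {
    "user_id": ColumnSpec(int, 0.0, 1, 10**9),
    "age": ColumnSpec(int, 0.05, 0, 120),
}

def check_batch(rows, schema):
    """Return a list of violations; an empty list means the gate passes."""
    if not rows:
        return ["empty batch"]
    problems = []
    for col, spec in schema.items():
        values = [row.get(col) for row in rows]
        non_null = [v for v in values if v is not None]
        null_rate = 1 - len(non_null) / len(rows)
        if null_rate > spec.max_null_rate:
            problems.append(f"{col}: null rate {null_rate:.2f} exceeds {spec.max_null_rate}")
        if any(not isinstance(v, spec.dtype) for v in non_null):
            problems.append(f"{col}: expected type {spec.dtype.__name__}")
        elif any(not (spec.lo <= v <= spec.hi) for v in non_null):
            problems.append(f"{col}: value outside [{spec.lo}, {spec.hi}]")
    return problems

def gate(rows, schema):
    """Pass the batch through unchanged, or fail loudly."""
    problems = check_batch(rows, schema)
    if problems:
        raise GateFailure("; ".join(problems))
    return rows

GOOD = [{"user_id": 1, "age": 34}, {"user_id": 2, "age": 51}]
BAD = [{"user_id": 3, "age": 240}, {"user_id": None, "age": 29}]

print(check_batch(GOOD, SCHEMA))
print(check_batch(BAD, SCHEMA))
```

Returning the full list of violations, rather than stopping at the first, gives whoever is paged a complete picture of the rejected batch in one run.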

Practical Example: The Schema Change That Almost Shipped a Dead Model

Who: An ML platform engineer at a payments company maintaining a nightly fraud-model retrain.

Situation: An upstream team renamed a transaction-amount field and changed its units from cents to dollars in the same release.

Problem: The preprocess stage happily cast the new column, training ran for its full six GPU-hours, and the model converged on a feature that was now off by a factor of one hundred.

Dilemma: Add a heavyweight data-contract system across every upstream producer, slow and political, or put a lightweight statistical gate in the pipeline itself, fast to ship but only catching what it is told to check.

Decision: They added a validate node that compared each batch's per-column summary statistics against the last accepted batch and failed the run on a mean shift beyond a set threshold.

How: The gate computed column means and null rates during ingestion (cheap, already a distributed scan) and asserted them against a stored baseline before the Spark and GPU stages were allowed to start.

Result: The next night's run halted at the gate in under a minute, flagging the amount column's hundredfold mean shift, and no GPU time was spent on the broken batch.

Lesson: A cheap gate that runs before the expensive stages converts a six-GPU-hour silent failure into a one-minute loud one. Validate where it is cheap, fail before it is expensive.
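The kind of check described in this example can be sketched in a few lines. The baseline values, column names, and the 0.5 relative-shift threshold below are invented for illustration and are not taken from the team's actual system; the point is only that a cents-to-dollars change shows up as a relative mean shift near 0.99, far above any reasonable threshold.

```python
from statistics import fmean

def mean_shift(baseline_mean, batch_mean):
    """Relative change of the batch mean versus the last accepted baseline."""
    return abs(batch_mean - baseline_mean) / max(abs(baseline_mean), 1e-12)

def drift_gate(batch, baseline_means, max_rel_shift=0.5):
    """Return {column: relative shift} for every column past the threshold."""
    flagged = {}
    for col, base in baseline_means.items():
        shift = mean_shift(base, fmean(batch[col]))
        if shift > max_rel_shift:
            flagged[col] = shift
    return flagged

# Made-up numbers: the stored baseline has amounts in cents ...
BASELINE = {"amount": 4200.0, "items": 2.0}
# ... and tonight's batch arrives with amounts in dollars.
TONIGHT = {"amount": [40.0, 45.0, 41.0], "items": [2, 1, 3]}

FLAGGED = drift_gate(TONIGHT, BASELINE)
print(FLAGGED)   # only the amount column is flagged
```

A gate like this catches only what it is told to compare, which is the trade-off named in the Dilemma: a field renamed with its statistics intact would pass it and would need a schema check instead.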

3. Caching and Incremental Recomputation (Intermediate)

Most pipeline runs are not the first run. You change a hyperparameter and rerun; you fix a bug in the eval stage and rerun; the nightly schedule fires on data that is mostly unchanged. Recomputing the twenty-minute Spark preprocess and the nine-hour train every time would be ruinous, and most of it is wasted because the inputs to those stages did not change. The fix is content-addressed caching: fingerprint each stage's inputs (the outputs of its upstream stages plus its own configuration), and if a stage has already produced an output for that exact fingerprint, reuse the cached output and skip the work. A stage reruns only when something it actually depends on changes, and the change propagates downstream to exactly the stages it touches.

This is the same lineage idea that underpins distributed dataflow systems. Spark tracks the lineage of every partition so it can recompute only what a failure lost (Section 7.2), and distributed data loaders cache and reuse materialized shards rather than recomputing them every epoch (Section 8.9). A pipeline cache lifts lineage to the granularity of whole stages: the fingerprint is the lineage key, and a cache hit is the proof that recomputation is unnecessary. Formally, if stage $s$ depends on inputs $I_s$ and we write its fingerprint as a hash $h(I_s)$, then a run reruns $s$ if and only if $h(I_s)$ differs from the fingerprint stored at the last successful run, and a change to a stage propagates to a descendant $d$ exactly when it alters $h(I_d)$.

The runner below implements this in pure Python: a DAG of six stages, content fingerprinting, skip-on-unchanged caching, and retry-on-failure. The train stage is deliberately flaky on its first attempt (simulating a preempted GPU node) so the retry path is exercised, and we run the pipeline three times to watch caching at work.

import hashlib, json

CACHE = {}                       # stage_name -> (input_fingerprint, output)
ATTEMPTS = {}                    # stage_name -> how many times the body actually ran

def fingerprint(obj):
    """Stable content hash of a stage's inputs (upstream outputs + config)."""
    blob = json.dumps(obj, sort_keys=True, default=repr).encode()
    return hashlib.sha256(blob).hexdigest()[:12]

def run_stage(name, body, inputs, max_retries=3):
    """Run one stage unless its input fingerprint is unchanged from last run."""
    fp = fingerprint(inputs)
    if name in CACHE and CACHE[name][0] == fp:                # cache hit -> skip
        print(f"  [skip ] {name:<11} inputs unchanged (fp={fp})")
        return CACHE[name][1]
    for attempt in range(1, max_retries + 1):                 # retry-on-failure
        ATTEMPTS[name] = ATTEMPTS.get(name, 0) + 1
        try:
            out = body(inputs, attempt)
            print(f"  [run  ] {name:<11} attempt {attempt} ok (fp={fp})")
            CACHE[name] = (fp, out)
            return out
        except Exception as e:
            print(f"  [retry] {name:<11} attempt {attempt} failed: {e}")
    raise RuntimeError(f"stage {name} exhausted {max_retries} retries")

# Each body is a distributed job in production; here it is pure Python.
def ingest(inp, attempt):     return {"rows": list(range(inp["n_rows"]))}
def validate(inp, attempt):                                  # data-validation gate
    rows = inp["upstream"]["rows"]
    if any(r < 0 for r in rows): raise ValueError("negative ids in batch")
    return {"rows": rows, "schema_ok": True}
def preprocess(inp, attempt): return {"features": [r * 2 for r in inp["upstream"]["rows"]]}
def train(inp, attempt):                                     # long GPU step; flaky once
    if attempt == 1: raise TimeoutError("GPU node preempted")
    return {"model": sum(inp["upstream"]["features"]), "epochs": inp["epochs"]}
def evaluate(inp, attempt):   return {"metric": inp["upstream"]["model"] % 97}
def register(inp, attempt):   return {"registered": True, "metric": inp["upstream"]["metric"]}

def build_pipeline(n_rows, epochs):
    """Topologically ordered DAG: ingest->validate->preprocess->train->eval->register."""
    o = run_stage("ingest",     ingest,     {"n_rows": n_rows})
    o = run_stage("validate",   validate,   {"upstream": o})
    o = run_stage("preprocess", preprocess, {"upstream": o})
    o = run_stage("train",      train,      {"upstream": o, "epochs": epochs})
    o = run_stage("evaluate",   evaluate,   {"upstream": o})
    return run_stage("register",   register,   {"upstream": o})

print("=== Run 1: cold cache (train retries once after a preemption) ===")
print("  result:", build_pipeline(n_rows=1000, epochs=3))
print("\n=== Run 2: same inputs, warm cache (every stage skipped) ===")
print("  result:", build_pipeline(n_rows=1000, epochs=3))
print("\n=== Run 3: epochs changed 3 -> 5 (only train and its descendants rerun) ===")
print("  result:", build_pipeline(n_rows=1000, epochs=5))
print("\nstage bodies actually executed:", dict(sorted(ATTEMPTS.items())))
Code 26.2.1: A from-scratch pipeline-DAG runner with content-addressed caching and retry-on-failure. The fingerprint of a stage's inputs is its cache key; run_stage skips any stage whose fingerprint matches the last successful run, and retries a failing body up to max_retries times before giving up.
=== Run 1: cold cache (train retries once after a preemption) ===
  [run  ] ingest      attempt 1 ok (fp=e6e4ed2805fa)
  [run  ] validate    attempt 1 ok (fp=d5002d909fb4)
  [run  ] preprocess  attempt 1 ok (fp=c14d31e0b46f)
  [retry] train       attempt 1 failed: GPU node preempted
  [run  ] train       attempt 2 ok (fp=dffbe2f8953e)
  [run  ] evaluate    attempt 1 ok (fp=8058b7afd310)
  [run  ] register    attempt 1 ok (fp=84a7343cf56e)
  result: {'registered': True, 'metric': 94}

=== Run 2: same inputs, warm cache (every stage skipped) ===
  [skip ] ingest      inputs unchanged (fp=e6e4ed2805fa)
  [skip ] validate    inputs unchanged (fp=d5002d909fb4)
  [skip ] preprocess  inputs unchanged (fp=c14d31e0b46f)
  [skip ] train       inputs unchanged (fp=dffbe2f8953e)
  [skip ] evaluate    inputs unchanged (fp=8058b7afd310)
  [skip ] register    inputs unchanged (fp=84a7343cf56e)
  result: {'registered': True, 'metric': 94}

=== Run 3: epochs changed 3 -> 5 (only train and its descendants rerun) ===
  [skip ] ingest      inputs unchanged (fp=e6e4ed2805fa)
  [skip ] validate    inputs unchanged (fp=d5002d909fb4)
  [skip ] preprocess  inputs unchanged (fp=c14d31e0b46f)
  [retry] train       attempt 1 failed: GPU node preempted
  [run  ] train       attempt 2 ok (fp=2a7c63d4f364)
  [run  ] evaluate    attempt 1 ok (fp=50cfbcbf92dc)
  [skip ] register    inputs unchanged (fp=84a7343cf56e)
  result: {'registered': True, 'metric': 94}

stage bodies actually executed: {'evaluate': 2, 'ingest': 1, 'preprocess': 1, 'register': 1, 'train': 4, 'validate': 1}
Output 26.2.1: Caching and incremental recomputation in action. Run 1 executes the whole DAG and retries train once after the simulated preemption. Run 2 hits the cache on every stage and runs no work at all. Run 3 changes only epochs, so train's input fingerprint changes and, through train's new output, so does evaluate's: ingest, validate, and preprocess stay cached while train and evaluate rerun. The register stage is skipped because evaluate emits the same metric as before, which leaves register's input fingerprint unchanged. The final tally confirms that ingest, validate, preprocess, and register each ran exactly once across all three invocations.

Three behaviors in Output 26.2.1 are worth naming. First, the retry: train failed on its first attempt in both runs that executed it and recovered on the second, which is why its body count is four (two attempts in Run 1, two in Run 3) while every other stage costs one attempt each time it runs. Second, the skip: Run 2 does zero work because nothing changed, exactly what you want when you rerun a pipeline to fix something downstream. Third, the propagation boundary: in Run 3 the change to epochs invalidates train and then evaluate, whose input includes train's output (epochs field and all). But evaluate computes its metric from the model value alone, so it emits the same metric as before, register's input fingerprint is unchanged, and register stays cached even though it is a descendant of train. That last point is the whole value of content addressing: the cache reacts to what the data actually is, not to which stages a human guessed were affected.
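The propagation boundary can be checked in isolation. The sketch below reuses the fingerprint function and the stage arithmetic of Code 26.2.1 (the dictionary names are ours) to compare the inputs that evaluate and register would see under epochs=3 and epochs=5.

```python
import hashlib, json

def fingerprint(obj):
    """Same content hash as Code 26.2.1."""
    blob = json.dumps(obj, sort_keys=True, default=repr).encode()
    return hashlib.sha256(blob).hexdigest()[:12]

features = [r * 2 for r in range(1000)]          # preprocess output for n_rows=1000

# What train, evaluate, and register would produce or consume for each epochs value.
train_out = {e: {"model": sum(features), "epochs": e} for e in (3, 5)}
eval_in   = {e: {"upstream": train_out[e]} for e in (3, 5)}
eval_out  = {e: {"metric": train_out[e]["model"] % 97} for e in (3, 5)}
reg_in    = {e: {"upstream": eval_out[e]} for e in (3, 5)}

# evaluate sees a different input (the epochs field differs), so it must rerun.
EVAL_INPUT_CHANGED = fingerprint(eval_in[3]) != fingerprint(eval_in[5])
# register sees byte-identical input (same metric), so its cache entry is still valid.
REGISTER_INPUT_CHANGED = fingerprint(reg_in[3]) != fingerprint(reg_in[5])

print("evaluate input changed:", EVAL_INPUT_CHANGED)
print("register input changed:", REGISTER_INPUT_CHANGED)
```

Build systems sometimes call this behavior early cutoff: a rerun that reproduces its previous output stops the invalidation from travelling further. In this toy it happens only because the stand-in train ignores epochs when computing model; a real training stage would almost certainly yield a different model and metric.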

Thesis Thread: Lineage, Scaled Out to Whole Jobs

The fingerprint-and-skip logic in Code 26.2.1 is the same lineage primitive that recovers lost partitions inside a single Spark job (Section 7.2), now lifted to coordinate whole distributed jobs across a fleet. The book's recurring move is to take a mechanism that works inside one machine or one job and scale it out to span many, and that move applies to operations themselves: a pipeline orchestrator is lineage tracking for the model-production graph. When you meet a model registry in Section 26.3, notice that it is the persistent endpoint this DAG writes to, and that the same fingerprint that decided caching here becomes the provenance record there.

4. Orchestrators: Scheduling the Graph on Shared Compute (Intermediate)

Code 26.2.1 ran in one process; a production orchestrator runs each node as a separate distributed job, often in its own container, on a cluster it shares with every other team. The orchestrator's responsibilities are exactly the ones the toy runner sketched, scaled up and made durable: parse the DAG, schedule each node when its upstream dependencies have succeeded, provision the right compute for that node (a Spark pool here, a GPU gang there), persist artifacts and the cache between runs, retry transient failures with backoff, and surface the state of every run for humans. The dominant tools (Apache Airflow, Kubeflow Pipelines, Argo Workflows, Flyte, Metaflow, Prefect) differ in how you express the DAG and how tightly they integrate with the cluster, but they share this core contract.

The retry behavior matters most for the train node, because the GPU stage is both the longest and the most exposed to hardware failure: a node preempted on a spot instance, an NCCL timeout, a failed health check. The orchestrator's retry is the coarse-grained outer loop that re-runs a whole failed stage, while elastic training frameworks provide the fine-grained inner loop that survives a worker loss without restarting the job (Chapter 18). A robust pipeline uses both: the inner loop absorbs the failures it can, the outer loop catches the ones it cannot, and checkpointing means an outer retry resumes rather than starting the nine-hour run from zero.
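A minimal sketch of that outer loop follows, assuming exponential backoff and a checkpoint kept in a plain dictionary that stands in for durable storage. The step counts, the preemption point, and the function names are all made up for the illustration; the sleep is injected as a no-op so the example finishes instantly.

```python
CHECKPOINT = {"step": 0}      # stand-in for a durable checkpoint store
STEPS_EXECUTED = []           # every training step that actually ran, in order

def train(attempt, total_steps=9, ckpt_every=3, preempt_at=5):
    """Toy training loop that resumes from the last checkpoint and is preempted once."""
    step = CHECKPOINT["step"]                     # resume, do not restart from zero
    while step < total_steps:
        if attempt == 1 and step == preempt_at:
            raise TimeoutError("GPU node preempted")
        STEPS_EXECUTED.append(step)
        step += 1
        if step % ckpt_every == 0:
            CHECKPOINT["step"] = step             # progress up to here survives a crash
    return {"final_step": step}

def run_with_backoff(body, max_retries=4, base_delay=1.0, sleep=lambda s: None):
    """Outer retry loop: wait base_delay * 2**k between attempts, then give up."""
    delays = []
    for attempt in range(1, max_retries + 1):
        try:
            return body(attempt), delays
        except TimeoutError as err:
            if attempt == max_retries:
                raise RuntimeError("retries exhausted") from err
            delay = base_delay * 2 ** (attempt - 1)
            delays.append(delay)
            sleep(delay)                          # pass time.sleep for real waiting

RESULT, DELAYS = run_with_backoff(train)
print(RESULT, DELAYS)
print("steps run:", STEPS_EXECUTED)
```

The first attempt runs steps 0 through 4 and is preempted; the retry resumes from the checkpoint at step 3, so only steps 3 and 4 are repeated instead of the whole run. Shrinking ckpt_every shrinks that repeated work at the price of more frequent checkpoint writes.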

Library Shortcut: Kubeflow, Airflow, and Flyte Express the DAG Declaratively

The dependency wiring, caching, and retry logic of Code 26.2.1 (about sixty lines, and still missing distributed execution, durable artifacts, and a UI) collapse to a decorated function per stage in a modern orchestrator. The framework infers the DAG from data dependencies, runs each stage as a containerized job on the cluster, caches by content fingerprint, and retries on your policy:

# Flyte: each @task is a containerized distributed job; the @workflow is the DAG.
from typing import List

from flytekit import Resources, task, workflow

@task(cache=True, cache_version="1.0", retries=3)
def ingest(n_rows: int) -> List[int]:
    return list(range(n_rows))                       # stand-in for an object-store read

@task(cache=True, cache_version="1.0", retries=3)   # content cache + retry built in
def preprocess(rows: List[int]) -> List[int]:
    return [r * 2 for r in rows]                     # runs as its own containerized job

@task(cache=True, cache_version="1.0", retries=3, requests=Resources(gpu="8"))
def train(features: List[int], epochs: int) -> int:
    return sum(features)                             # requests 8 GPUs from the cluster

@workflow                                            # dependencies inferred from args
def pipeline(n_rows: int, epochs: int) -> int:
    feats = preprocess(rows=ingest(n_rows=n_rows))
    return train(features=feats, epochs=epochs)
Code 26.2.2: A three-stage slice of the Code 26.2.1 DAG (ingest, preprocess, train) with the same caching and retries, expressed in Flyte. The cache=True flag, together with cache_version, replaces the entire fingerprint/CACHE machinery, retries=3 replaces the manual retry loop, and requests=Resources(gpu="8") hands the placement problem to the cluster scheduler. Kubeflow Pipelines and Airflow offer comparable declarative DAGs; the from-scratch runner exists only to show what these flags do underneath.
Fun Note: The Pipeline That Was Faster When You Did Nothing

A team once celebrated cutting their retrain time from nine hours to forty seconds. The secret was not a faster GPU; it was that someone had finally turned on stage caching, and the "retrain" they were timing changed nothing upstream of train, so the whole DAG was a cache hit. The lesson landed twice: caching is a real speedup, and a benchmark that measures a no-op measures your caching, not your training.

5. Reproducibility of the Whole Pipeline (Advanced)

Caching is only sound if a stage is a deterministic function of its inputs, which is the same property that makes a pipeline reproducible: given the same data, code, configuration, and environment, it must produce the same model. Reproducibility at the pipeline level is more demanding than at the script level because there are more moving parts to pin: the input data version, every stage's code version, the container image and library versions, the random seeds, and the resource topology that can change numerics (a different GPU count changes the reduction order of an all-reduce). The fingerprint that drives caching in Code 26.2.1 is the seed of this discipline: if the fingerprint captures everything that can change the output, then an unchanged fingerprint is a guarantee of an unchanged result, and a changed one is an honest signal that the result may differ.

In practice you make a pipeline reproducible by versioning each of those inputs and recording the exact set used by each run: an immutable data snapshot, a git commit per stage, a pinned image digest, and the recorded seeds. The orchestrator stores this provenance alongside the artifacts so that any registered model can be traced back to the precise pipeline run, code, and data that produced it. That provenance record is what the model registry in Section 26.3 persists, and it is what turns "the model from last Tuesday" into a fully reconstructible object rather than a hope.
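One way to picture such a provenance record is as a small immutable structure whose hash is the run's fingerprint. The sketch below is an assumption-laden illustration: the RunProvenance class, its field names, and every value in it (snapshot name, commit, digest) are placeholders, not the format of any particular registry or orchestrator.

```python
import hashlib, json
from dataclasses import asdict, dataclass, field, replace

@dataclass(frozen=True)
class RunProvenance:
    data_snapshot: str      # immutable data version
    git_commit: str         # code version
    image_digest: str       # pinned container image
    seed: int               # recorded random seed
    gpu_count: int          # topology can change reduction order, hence numerics
    config: dict = field(default_factory=dict)

    def fingerprint(self) -> str:
        """Hash of everything that can change the output of the run."""
        blob = json.dumps(asdict(self), sort_keys=True).encode()
        return hashlib.sha256(blob).hexdigest()[:12]

# Placeholder values, for illustration only.
BASE = RunProvenance(
    data_snapshot="snapshot-0001",
    git_commit="0123abc",
    image_digest="sha256:feedface",
    seed=17,
    gpu_count=64,
    config={"epochs": 3, "lr": 0.001},
)
SAME = replace(BASE, config={"lr": 0.001, "epochs": 3})   # same content, different key order
RESEEDED = replace(BASE, seed=18)
RESCALED = replace(BASE, gpu_count=128)

print(BASE.fingerprint() == SAME.fingerprint())
print(BASE.fingerprint() == RESEEDED.fingerprint())
print(BASE.fingerprint() == RESCALED.fingerprint())
```

Including gpu_count in the record is a deliberate choice here: if topology can alter numerics, leaving it out would let two runs share a fingerprint while producing different models.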

Research Frontier: Declarative, Incremental, and Data-Centric Pipelines (2024 to 2026)

Pipeline orchestration is moving from imperative DAG scripts toward declarative, data-aware systems. Airflow 2.x introduced data-aware scheduling on datasets, and the Airflow 3 line, released in 2025, recasts it as asset-based scheduling, where stages fire when the datasets they depend on are updated rather than on a clock, making incremental recomputation a first-class concept rather than a bolt-on cache. Flyte and Kubeflow have hardened content-addressed caching and typed, versioned artifacts so that provenance and skip-on-unchanged are native. On the data-validation front, the lineage of TensorFlow Data Validation and Great Expectations has been joined by contract-style checks and learned drift detectors that gate training automatically. A parallel current is data-centric and incremental learning, where the pipeline retrains only on the slice of data that changed rather than the full corpus, pushing the caching idea of Code 26.2.1 down to the example level and connecting it to the streaming and online-learning machinery of Chapter 9. The throughline is that the orchestrator is becoming a reactive function of data state, not a timer.
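The difference between a timer and a reactive function of data state fits in a few lines of plain Python. This sketch deliberately does not mirror the API of Airflow or any other tool; the asset names, version counters, and helper functions are hypothetical.

```python
ASSET_VERSION = {"raw_events": 3, "labels": 7}   # producers bump these on every update
CONSUMED = {}                                     # stage -> {asset: version it last used}

def stale_assets(stage, depends_on):
    """Assets whose current version differs from what the stage last consumed."""
    seen = CONSUMED.get(stage, {})
    return [a for a in depends_on if ASSET_VERSION[a] != seen.get(a)]

def maybe_run(stage, depends_on, body):
    """Fire the stage only if an upstream asset changed; return None when skipped."""
    changed = stale_assets(stage, depends_on)
    if not changed:
        return None
    result = body(changed)
    CONSUMED[stage] = {a: ASSET_VERSION[a] for a in depends_on}
    return result

DEPS = ["raw_events", "labels"]
FIRST = maybe_run("preprocess", DEPS, sorted)    # nothing consumed yet: runs
SECOND = maybe_run("preprocess", DEPS, sorted)   # no asset updated: skipped
ASSET_VERSION["labels"] += 1                     # an upstream producer publishes new labels
THIRD = maybe_run("preprocess", DEPS, sorted)    # runs again, told which asset changed

print(FIRST, SECOND, THIRD)
```

Handing the stage the list of changed assets is what opens the door to incremental work: a body that knows only labels moved can reprocess that slice instead of the full corpus.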

We now have the production pipeline as a DAG of distributed jobs, the gates that stop bad data early, the caching that makes reruns cheap, the orchestrator that schedules it all on shared compute, and the provenance that makes the whole thing reproducible. The pipeline's final node, register, hands the model to a system whose job is to version, stage, and serve every model the pipeline ever produces. That system, the model registry, is where Section 26.3 begins.

Exercise 26.2.1: Where Does the Gate Belong? (Conceptual)

In Figure 26.2.1 the validate node sits immediately after ingest and before preprocess and train. Argue from cost why this placement is correct, and describe one check that genuinely cannot run there and must instead run after preprocess (for example, a property of the engineered features rather than the raw columns). Then explain what the orchestrator should do when a post-preprocess gate fails: halt, branch to remediation, or proceed with a warning, and why the right answer depends on whether the pipeline is a one-off research run or a scheduled production retrain.

Exercise 26.2.2: Make Caching Honest (Coding)

Code 26.2.1 fingerprints only a stage's inputs, not its code. Introduce a bug by changing the preprocess body (say, multiply by 3 instead of 2) and rerun with the same n_rows in the same process: observe that the stage is wrongly skipped because its input fingerprint did not change. Fix it by folding a code version (a string or a hash of the function source) into the fingerprint so that editing a stage's body invalidates its cache and propagates downstream. Verify that your fix reruns preprocess, train, and evaluate but still skips ingest and validate, and check whether register reruns too: it will whenever the new features change the metric that evaluate emits. Explain why a real orchestrator uses a git commit or image digest for this rather than hashing source text.

Exercise 26.2.3: Cost of the Outer Retry (Analysis)

Suppose the train stage runs for nine hours on a 64-GPU gang and a spot preemption hits with probability $p = 0.15$ in any given run, restarting the stage from its last checkpoint. If checkpoints are written every $c$ hours and a restart loses on average $c/2$ hours of work, write the expected wasted GPU-hours per run as a function of $c$ and $p$. Evaluate it for $c \in \{0.5, 1, 3\}$ hours, and discuss the trade-off: frequent checkpoints cut restart waste but add checkpoint-write overhead and storage traffic. Connect your answer to why the orchestrator's coarse outer retry and the framework's fine-grained elastic recovery (Chapter 18) are complementary rather than redundant.