Part III: Distributed Machine Learning
Chapter 11: Parameter Servers and Distributed Embeddings

Fault Tolerance in Parameter Servers

"I am the only copy of these weights on the entire cluster. Sleep well, everyone."

A Parameter Shard With No Replica
Big Picture

A parameter server holds the authoritative copy of the model, so a parameter server that fails does not slow the job down, it deletes part of the model. This is the asymmetry that governs fault tolerance in the push-pull architecture: workers are stateless and disposable, but the server shards carry irreplaceable state. The defenses follow directly from that single fact. Replicate every shard so a primary and its replicas never die together; checkpoint the parameters to durable storage so a correlated failure still has a floor; and recover through a consistent control-plane protocol so the cluster agrees on exactly who now owns each shard. A failed worker is just restarted and re-pulls; a failed server is a small consensus problem wearing a training-job costume.

Up to this section the chapter has treated the parameter server as a reliable backplane: workers push gradients and pull parameters (Section 11.2), the table is split across shards (Section 11.3), and updates flow synchronously or asynchronously (Section 11.4) under a staleness bound (Section 11.5). We assumed the shards stay up. On a real cluster of hundreds of machines, at any given moment something is broken, and the question stops being whether a server fails and becomes what happens to the model when one does. This section answers that, building on the general recovery machinery of Section 2.4 and specializing it to the one component that cannot afford to forget.

1. The Asymmetry: Stateful Servers, Stateless Workers Beginner

In the push-pull architecture two roles fail in completely different ways, and conflating them is the first mistake. A worker pulls the current parameters, computes a gradient on its data shard, and pushes that gradient back. It keeps nothing that the cluster cannot regenerate: its model copy is a cache of the server's state, and its in-flight gradient is recomputable from the same minibatch. When a worker dies, the loss is one minibatch of partially-done arithmetic. You restart the process, it re-pulls the latest parameters, and it rejoins the round as if it had merely been slow. Nothing about the model is at risk.

The server is the opposite. Each shard holds the authoritative value of a slice of the parameters, the one true copy that every worker reads from and writes to. There is no upstream source to re-pull it from, because the shard is the source. When a server shard dies without a replica, the parameters it owned are gone, and the only way to get them back is to recompute them from data, which for a model trained over days is not a recovery, it is starting over. This is why fault tolerance in a parameter server is fundamentally a story about protecting state, not about restarting compute.

Key Insight: Protect the State, Not the Computation

Worker failure costs you computation, which is cheap and regenerable: restart and re-pull. Server failure costs you state, which is irreplaceable: the shard is the only copy of its parameters. Every parameter-server fault-tolerance mechanism, replication, checkpointing, consensus-backed recovery, exists to make sure no single failure is the last copy of any parameter. Match the mechanism to the role, and never spend a replication budget protecting the stateless half of the system.

2. Replication: A Primary and Its Replicas Intermediate

The direct defense against losing a shard is to keep more than one copy of it. Each logical shard becomes a small replica group: one primary that workers talk to, plus one or more replicas that hold the same parameters. Every write the primary applies is forwarded to the replicas, so that if the primary dies, a replica already holds the up-to-date state and can be promoted to primary. With a replication factor of $r$, the parameters of a shard survive as long as at least one of its $r$ copies survives. If each machine fails independently with probability $p$ in a recovery window, the chance that a specific shard loses every copy drops from $p$ to $p^{r}$, the familiar exponential payoff of redundancy.

The cheapest way to keep replicas current is chain replication: arrange the copies in a line, send each write to the head, and let it flow primary to replica to replica down the chain, with reads served from the tail once the write has reached it. The structure makes the consistency argument easy, because a value is only acknowledged after it has traversed the whole chain, so the tail never serves a value the head has not committed. Figure 11.8.1 shows the replica group before and after a primary failure, with the promotion that keeps the parameters available.

Before: primary serves the chain Worker push grad Primary params Replica 1 params Replica 2 params write flows head to tail After: replica promoted, no loss Worker re-route Primary failed Replica 1 now primary Replica 2 params replica held every committed write, so promotion loses zero parameters
Figure 11.8.1: A replicated parameter shard under chain replication. On the left, a worker's push flows head to tail through the primary and its two replicas, so every replica holds each committed write. On the right, the primary has failed; because Replica 1 already held the up-to-date parameters, promoting it to primary keeps the shard available and loses nothing. Without replicas, the crossed-out box would simply take its parameters with it.

3. Checkpointing and the Recovery Protocol Intermediate

Replication defends against independent failures, but failures are not always independent. A rack loses power, a cloud zone reboots, a bad deploy crashes every server process at once; correlated failures can take down a whole replica group together. The floor under that worst case is the periodic checkpoint: every so often, each shard writes its parameters to durable storage (a distributed file system or object store), and the job records which step the checkpoint corresponds to. If every live copy of a shard is gone, recovery loads the last checkpoint and replays from there, trading some lost steps for a guarantee that the model can always be reconstructed. Checkpoint frequency is the usual tension: more often means less lost work on recovery but more bytes written during normal training, the same write-amplification bound the elastic-training machinery turns into a budget in Section 2.4.

Replication and checkpointing supply the redundant copies; the recovery protocol is what turns those copies back into a single coherent model. When a primary fails, the cluster must agree on three things: that the primary is actually dead (not merely slow), which replica becomes the new primary, and that workers re-route their pushes and pulls to it. If two replicas each believe they are the new primary, they will accept divergent writes and the shard's state forks, the classic split-brain failure. Avoiding that is a consensus problem, solved by the control plane exactly as in Section 2.6: a coordination service backed by a consensus algorithm holds the authoritative shard-to-primary assignment, and a promotion is only valid once that service has committed it. The training data plane stays fast and lock-free; the rare, slow, must-be-correct decision of who owns a shard is delegated to the consensus layer built for exactly that.

Practical Example: The Checkpoint Cadence That Paid for Itself

Who: A platform engineer running a week-long parameter-server training job for a large recommendation model on a few hundred spot instances.

Situation: The embedding shards held terabytes of parameters; spot reclamation killed several server machines per day, and each replica group had a replication factor of two.

Problem: Twice the team watched a rack-level reclamation take both copies of a shard within the same minute, and with no recent checkpoint the only option was to restart the run from scratch.

Dilemma: Checkpoint every few minutes and pay a heavy, constant write load to the object store, or checkpoint hourly and risk losing an hour of progress on every correlated failure.

Decision: They raised the replication factor to three for the embedding shards and set an adaptive checkpoint cadence: frequent during the volatile daytime spot market, sparse overnight when reclamation was rare.

How: A consensus-backed coordinator tracked primaries and triggered promotion on failure; checkpoints were written incrementally so only changed embedding rows hit the store.

Result: Over the next month no run was lost. Correlated failures still happened, but a surviving replica or a recent incremental checkpoint always covered them, and the extra write load was a small fraction of the gradient traffic.

Lesson: Replication handles the common independent failure; checkpointing is the floor under the rare correlated one. You need both, and the cadence of each should track how the failures actually arrive.

4. Consistency or Availability: The Store Must Choose Advanced

A replicated parameter store is a distributed store, so it inherits the trade-off of every distributed store. When the network partitions a primary from its replicas, the system faces a choice it cannot dodge: keep accepting writes on the reachable side and risk the copies diverging, or refuse writes until the partition heals and stay consistent at the cost of availability. This is the CAP trade-off applied to the parameter store, the same tension formalized in Section 2.5, and the right answer depends on how the training loop consumes the parameters.

The pleasant surprise is that machine-learning training is unusually tolerant of the weak side of this trade-off. The optimization is already robust to staleness (Section 11.5): a worker that reads parameters a few steps behind, or whose push is briefly delayed, still moves the model in a useful direction, because stochastic gradient descent is a noisy process to begin with. So a parameter store can lean toward availability, serving slightly stale reads during a partition and reconciling when it heals, where a bank ledger never could. This is also why asynchronous training tolerates a transiently missing worker gracefully: in async mode no worker waits on any specific peer, so a worker that vanishes for a few seconds, or forever, simply stops contributing, and the others march on without a barrier to stall them. A synchronous step, by contrast, blocks on the slowest participant, which is why elastic and asynchronous schemes are the natural partners of fault tolerance, a thread picked up for full training jobs in Chapter 10.

Thesis Thread: Recovery Is a Consensus Problem in Disguise

The recurring move of this book is that scaling out forces general distributed-systems primitives into the heart of an AI workload. Here the primitive is consensus. Deciding which replica owns a shard after a failure is the same agreement problem that runs MapReduce re-execution in Chapter 6 and that elastic deep-learning training will generalize to entire worker groups joining and leaving mid-run. A parameter server does not get to invent its own notion of "who is in charge"; it borrows the control-plane consensus of Section 2.6 and spends it sparingly, on the rare decisions that must be exactly right.

5. A Failure, Simulated Intermediate

The code below makes the asymmetry concrete with a pure-Python parameter store. Sixteen parameters are spread across four shards by a hash; gradients are pushed for five hundred steps; and partway through, the primary for shard 2 crashes. We run it twice. In the replicated cluster, shard 2's replica already holds every committed write, so the control plane promotes it and training continues. In the unreplicated cluster, shard 2's parameters simply vanish. A third, failure-free run gives the ground-truth model to measure drift against.

import hashlib

NUM_SHARDS = 4

def owner(key):
    """Deterministic shard placement, like consistent hashing in a real PS."""
    h = int(hashlib.sha1(str(key).encode()).hexdigest(), 16)
    return h % NUM_SHARDS

class Shard:
    """One server shard: an authoritative slice of the model, plus an optional
    chained replica that receives every write the primary applies."""
    def __init__(self, shard_id, replica=None):
        self.shard_id = shard_id
        self.params = {}           # parameter_id -> value (the authoritative state)
        self.replica = replica     # next link in the replication chain, or None
        self.alive = True

    def push(self, key, grad, lr=0.1):
        if not self.alive:
            raise RuntimeError(f"shard {self.shard_id} is down")
        self.params[key] = self.params.get(key, 0.0) - lr * grad
        if self.replica is not None:                 # chain the write downstream
            self.replica.params[key] = self.params[key]

    def pull(self, key):
        if not self.alive:
            raise RuntimeError(f"shard {self.shard_id} is down")
        return self.params.get(key, 0.0)

def build_cluster(replicated):
    replicas = [Shard(f"{i}-replica") for i in range(NUM_SHARDS)] if replicated else [None]*NUM_SHARDS
    primaries = [Shard(i, replicas[i]) for i in range(NUM_SHARDS)]
    return primaries, replicas

def train(primaries, replicas, replicated, kill_after=200):
    """Push gradients for 16 parameters across 500 steps; shard 2 dies mid-run."""
    keys = list(range(16))
    failed = False
    for step in range(500):
        if step == kill_after and not failed:
            primaries[2].alive = False               # primary for shard 2 crashes
            if replicated:
                # control-plane recovery: promote the replica to primary
                promoted = replicas[2]
                promoted.alive = True
                primaries[2] = promoted
            failed = True
        for key in keys:
            s = owner(key)
            grad = 0.01 * ((step % 7) - 3)            # a deterministic pseudo-gradient
            try:
                primaries[s].push(key, grad)
            except RuntimeError:
                pass                                  # write lost: no replica to take over
    recovered, lost = {}, 0
    for key in keys:
        s = owner(key)
        try:
            recovered[key] = primaries[s].pull(key)
        except RuntimeError:
            lost += 1
    return recovered, lost

# Reference run: no failure at all, gives the ground-truth model.
ref_primaries, ref_replicas = build_cluster(replicated=True)
reference, _ = train(ref_primaries, ref_replicas, replicated=True, kill_after=10**9)

# Replicated cluster: shard 2 primary fails, its replica is promoted.
rep_primaries, rep_replicas = build_cluster(replicated=True)
rep_model, rep_lost = train(rep_primaries, rep_replicas, replicated=True)

# Unreplicated cluster: shard 2 fails and its parameters are gone.
bare_primaries, bare_replicas = build_cluster(replicated=False)
bare_model, bare_lost = train(bare_primaries, bare_replicas, replicated=False)

def max_drift(model):
    return max((abs(model[k] - reference[k]) for k in reference if k in model), default=0.0)

print(f"parameters in model              : {len(reference)}")
print(f"shard 2 owns parameter ids       : {[k for k in range(16) if owner(k) == 2]}")
print()
print("REPLICATED  (replica promoted on failure)")
print(f"  parameters lost                : {rep_lost}")
print(f"  max drift vs no-failure run     : {max_drift(rep_model):.2e}")
print(f"  training continued             : yes")
print()
print("UNREPLICATED  (shard 2 had no replica)")
print(f"  parameters lost                : {bare_lost}")
print(f"  parameters still present       : {len(bare_model)}")
print(f"  fraction of model lost          : {bare_lost / len(reference):.0%}")
Code 11.8.1: A pure-Python parameter store in which one shard's primary fails mid-training. The only difference between the two clusters is whether each shard carries a chained replica; everything else, the data, the gradients, the failure step, is identical.
parameters in model              : 16
shard 2 owns parameter ids       : [4, 7, 15]

REPLICATED  (replica promoted on failure)
  parameters lost                : 0
  max drift vs no-failure run     : 0.00e+00
  training continued             : yes

UNREPLICATED  (shard 2 had no replica)
  parameters lost                : 3
  parameters still present       : 13
  fraction of model lost          : 19%
Output 11.8.1: The replicated cluster loses zero parameters and its final model is bit-for-bit identical to the failure-free run; the unreplicated cluster permanently loses the three parameters shard 2 owned, 19% of the model. The failure was the same in both cases; only the replica decided whether it mattered.

The two runs differ in exactly one design choice, and that choice is the whole subject of this section. The replicated cluster treats a server crash as a routing event: the control plane promotes a replica, workers re-route, and the model is untouched. The unreplicated cluster treats the same crash as data loss, because the shard truly was the last copy of its parameters. Note also what the workers never had to do; they are stateless, so none of this recovery touched them, exactly the asymmetry Section 1 set up.

Fun Note: The Server That Was Too Important to Crash

There is a folk failure mode where a team, proud that their single parameter server has never gone down, treats its uptime as a feature rather than a ticking clock. The model lives entirely in that one box's memory. The day the box finally reboots, for a kernel patch nobody scheduled, a week of training evaporates in the time it takes a fan to spin down. A replica is not a vote of no confidence in your hardware; it is an admission that hardware is mortal and the model would prefer to outlive it.

Library Shortcut: Replication and Checkpointing You Do Not Hand-Roll

Code 11.8.1 wires up replication and promotion by hand to expose the mechanism. In production you lean on systems that already solve it. A coordination service such as etcd or ZooKeeper holds the authoritative shard-to-primary assignment behind a Raft or ZAB consensus core, so promotion never forks; you register a shard and watch a key, you do not implement leader election. For the checkpoint floor, frameworks expose a one-line save of the entire distributed parameter state to durable storage:

# Periodic durable checkpoint of the sharded parameter state.
# The framework gathers every shard (and skips redundant replicas) and writes
# one consistent snapshot tagged with the training step.
if step % checkpoint_every == 0:
    server.save_checkpoint(path=f"s3://ckpts/model-step-{step}",
                           include_optimizer_state=True)   # resume exactly here
Code 11.8.2: The checkpoint floor as one call. The dozens of lines of gather, dedup, and consistent-snapshot logic that a hand-rolled version needs collapse into a framework method; the consensus-backed coordinator (etcd, ZooKeeper) separately handles the promotion that Code 11.8.1 simulated.

6. Worker Failure: The Easy Half Beginner

For completeness, the other side of the asymmetry deserves an explicit statement, because it is the half people over-engineer. A worker carries no authoritative state, so its failure has a one-line recovery: detect that it stopped sending heartbeats, restart the process (or hand its data shard to a spare), and let it re-pull the current parameters and resume. In synchronous training the coordinator either waits briefly for the restart or drops the worker from the round; in asynchronous training nothing waits at all, since no step is gated on any particular worker. Either way no checkpoint, no consensus, and no replica is involved, because there is nothing to protect. The entire fault-tolerance budget of a parameter-server system should flow to the stateful servers, where a single loss is permanent, and not to the workers, where every loss is a quick restart.

Research Frontier: Cheaper Reliable State (2024 to 2026)

The cost of protecting parameter-server state is now itself a research target. Recent checkpointing systems push toward making snapshots nearly free: asynchronous and tiered checkpointing in the lineage of CheckFreq and Gemini overlap the write with computation and stage snapshots in host or peer memory before they reach durable storage, so a terabyte-scale model can checkpoint far more often without stalling the step. In parallel, the elastic-training stacks that grew out of TorchElastic and Ray treat a server or worker leaving as a membership change to be reconciled rather than a crash to be feared, leaning on the same consensus-backed coordination this section describes. A third thread, sharded and in-memory replication that survives single-host loss without a full durable write, is converging parameter-server reliability with the failure model of large foundation-model training, where the cost of losing even minutes of a thousand-GPU run makes correlated-failure recovery a first-class design concern. The common direction is clear: keep the authoritative state continuously protected while paying as little as possible of the training step's time to do it.

With the model now able to survive the loss of the servers that hold it, the chapter has a complete picture of the parameter-server architecture: how it stores parameters, shards them, updates them, bounds their staleness, scales them to terabytes, and keeps them alive through failure. The natural next question is whether this centralized-state design is even the right one for modern systems, or whether the all-reduce of Chapter 10 has quietly made it obsolete for many workloads. That comparison is the subject of Section 11.9.

Exercise 11.8.1: Where Does the Budget Go? Conceptual

A team protects every component of their parameter-server job equally: replication factor three on the workers and replication factor three on the server shards. Explain why half of that budget is wasted, what failure each half is actually guarding against, and rewrite the policy so the redundancy lands where state lives. Then describe one scenario in which a worker does hold state worth protecting, and what minimal mechanism (not full replication) would suffice for it.

Exercise 11.8.2: Make the Loss Worse, Then Fix It Coding

Extend Code 11.8.1 so that two shards fail during training, and add a replication factor parameter $r$ (a chain of $r$ copies per shard). Sweep $r$ from one to three and, for each, kill a random subset of primaries; report the fraction of parameters lost. Then add a periodic checkpoint: every fifty steps, copy each shard's parameters to a separate dictionary, and on a total replica-group loss, restore the affected parameters from the last checkpoint. Measure how many steps of progress a checkpoint-based recovery costs versus the zero-loss replica promotion, and state when each mechanism is the one that saves you.

Exercise 11.8.3: Consistency Versus Availability Under Partition Analysis

A primary is partitioned from its replicas for $T$ seconds while workers keep pushing. Argue, using the staleness tolerance of Section 11.5 and the CAP trade-off of Section 2.5, for an availability-leaning policy that keeps serving slightly stale parameters during the partition, and identify the one property of stochastic gradient descent that makes this safe for training but unsafe for, say, a financial ledger. Then state a concrete bound on $T$ beyond which even SGD's tolerance breaks down, and explain what observable training symptom would tell you the partition has lasted too long.