"They keep calling me rank 0 like it is an honor. It just means I am the one who has to wait for everybody else to check in."
A Coordinator That Counted to World Size
A distributed AI job is a fixed set of processes, spread across machines and attached to accelerators, that all run the same program and tell themselves apart by a single integer called their rank. Before any chapter can teach a collective, a shard, or a recovery protocol, we need an exact vocabulary for the things being coordinated. A node is a machine; a device is an accelerator inside it; a process is one running instance of your program; a worker is a process doing the computation; a coordinator is the one process that bootstraps and oversees the group. The previous chapter argued that the gradient splits exactly across machines; this section names the machines, the processes on them, and the three identifiers (world size, rank, local rank) that turn "many copies of one script" into a single coordinated job.
Chapter 1 established the thesis: once a ceiling binds, the work is split across many machines that must communicate to act as one. To make that split operational we need words that do not blur together. In casual speech "machine", "worker", "GPU", and "process" are used interchangeably, and that imprecision is harmless until you try to debug why rank 5 hangs on a barrier while rank 5's GPU sits idle. The distinction between the process and the device it drives, between the node and the workers on it, is the difference between a diagnosis and a guess. This section fixes the vocabulary, then shows the exact mechanism, the SPMD model and its rank identifiers, that every distributed training launcher in this book relies on.
1. Four Words That Are Not Synonyms (Beginner)
Start with the physical layer and move up. A node is one machine: a host with its own operating system, memory, network interface, and some number of accelerators plugged into it. A device is one of those accelerators, a single GPU (or TPU core, or other) with its own memory; a node with eight GPUs has eight devices. Nodes are the unit of failure and of network distance, because traffic inside a node crosses a fast internal link (NVLink or PCIe) while traffic between nodes crosses a slower data-center network. That gap, intra-node versus inter-node bandwidth, shapes nearly every placement decision in the book, and Chapter 4 builds its collective-communication cost model directly on it.
Above the hardware sits the software. A process is one running instance of your program, with its own Python interpreter, its own memory, and its own copy of the model. A process is not a machine and not a device; it is a tenant. The standard arrangement in deep learning is one process per device, so a node with three GPUs runs three processes, each pinned to one GPU. A worker is a process whose job is to do the actual computation (read data, run forward and backward passes, update parameters). A coordinator is the one distinguished process, conventionally rank $0$, that bootstraps the group, holds the canonical copy of shared state such as the global step, writes checkpoints, and logs. The coordinator is itself usually also a worker; "coordinator" names a role, not a separate idle machine. Figure 2.1.1 shows all four levels at once: two nodes, three devices each, one process per device, with rank $0$ wearing the coordinator hat.
Distributed training coordinates processes, not machines and not GPUs. A collective such as all-reduce is an agreement among processes; a barrier blocks processes; a rank names a process. The machine and the device matter for performance and placement, but the object that joins the group, sends a tensor, and can hang on a barrier is always a process. Keeping this straight is what lets you reason about a job where one node holds eight processes and a hang on "rank 5" is a software event you can attach a debugger to, not a dead machine.
2. SPMD: One Program, Many Ranks (Beginner)
The dominant structure for distributed training is SPMD, single program, multiple data. Every process runs the byte-for-byte identical script; what differs is a handful of environment variables that tell each process who it is, and therefore which slice of the data it should read and which device it should use. There is no separate "driver script" and "worker script", no master program that spawns subordinates with different code. You write one program, the launcher starts $W$ copies of it, and each copy specializes its behavior from its rank. This is why distributed training code can look deceptively like single-machine code: the parallelism lives in the launch and in three integers, not in a different program.
Those three integers are the vocabulary every launcher and every collective library shares. The world size $W$ is the total number of processes in the job, across all nodes. The rank is a process's unique global identifier, an integer in $\{0, 1, \dots, W-1\}$; it is the name the process answers to in any collective. The local rank is the process's index within its own node, in $\{0, \dots, L-1\}$ where $L$ is the number of processes on that node; its job is to pick the device, almost always with torch.device(f"cuda:{local_rank}"). With $L$ processes per node the relationship is simply

$$\text{rank} = \text{node index} \times L + \text{local rank},$$

so the node index is $\lfloor \text{rank} / L \rfloor$ and the local rank is $\text{rank} \bmod L$.
Rank is global and unique across the whole job; local rank is per-node and repeats on every node (each node has a local rank $0$). The script below makes the SPMD idea concrete without needing a GPU: it spawns six processes, hands each its rank, and lets each derive its node, local rank, device string, and role exactly as a real launcher would. We run it and read the assignment straight off the output.
```python
import os
from multiprocessing import Process

WORLD_SIZE = 6        # total ranks across the whole job
LOCAL_WORLD_SIZE = 3  # ranks (devices) per node, so 6 ranks -> 2 nodes


def worker(rank):
    node_id = rank // LOCAL_WORLD_SIZE    # which machine this rank runs on
    local_rank = rank % LOCAL_WORLD_SIZE  # which device on that machine
    # torchrun sets these same environment variables (among others)
    # before your script runs.
    os.environ["RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(WORLD_SIZE)
    os.environ["LOCAL_RANK"] = str(local_rank)
    role = "coordinator" if rank == 0 else "worker"
    print(f"rank={rank} node={node_id} local_rank={local_rank} "
          f"device=cuda:{local_rank} role={role}")


if __name__ == "__main__":
    procs = [Process(target=worker, args=(r,)) for r in range(WORLD_SIZE)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```
Code 2.1.1: Each spawned process derives its node, local rank, and device from its rank and sets the RANK, WORLD_SIZE, and LOCAL_RANK environment variables.

```text
rank=0 node=0 local_rank=0 device=cuda:0 role=coordinator
rank=1 node=0 local_rank=1 device=cuda:1 role=worker
rank=2 node=0 local_rank=2 device=cuda:2 role=worker
rank=3 node=1 local_rank=0 device=cuda:0 role=worker
rank=4 node=1 local_rank=1 device=cuda:1 role=worker
rank=5 node=1 local_rank=2 device=cuda:2 role=worker
```
Read the output against Figure 2.1.1 and the two agree line for line: global rank is unique across all six processes, node index is the rank divided by three and rounded down, and the local rank cycles $0,1,2$ on each machine to pick cuda:0, cuda:1, cuda:2. Rank $0$ alone is tagged the coordinator. Nothing here is specific to multiprocessing; replace the spawn loop with a real launcher and the body of worker is unchanged. That invariance is the whole point of SPMD.
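Under a real launcher the worker does not compute its identity; it reads it from the environment. The sketch below shows one way to do that without any GPU or PyTorch dependency. The `Identity` class and `read_identity` helper are hypothetical names introduced for illustration, and the single-process fallback values (rank 0, world size 1) are an assumption, not something the launcher guarantees.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Identity:
    """The three SPMD identifiers for one process (hypothetical helper)."""
    rank: int
    world_size: int
    local_rank: int

    @property
    def device(self) -> str:
        # Local rank picks the device on this node.
        return f"cuda:{self.local_rank}"

    @property
    def is_coordinator(self) -> bool:
        # By convention rank 0 plays the coordinator role.
        return self.rank == 0


def read_identity(env=os.environ) -> Identity:
    """Read RANK, WORLD_SIZE, LOCAL_RANK; assume a one-process job if unset."""
    return Identity(
        rank=int(env.get("RANK", 0)),
        world_size=int(env.get("WORLD_SIZE", 1)),
        local_rank=int(env.get("LOCAL_RANK", 0)),
    )


# Example: the environment that rank 4 of Code 2.1.1 would see.
ident = read_identity({"RANK": "4", "WORLD_SIZE": "6", "LOCAL_RANK": "1"})
print(ident, ident.device, ident.is_coordinator)
```

Passing the environment as a parameter keeps the helper testable: a plain dictionary can stand in for the variables a launcher would set.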
By unwritten convention rank $0$ is the one that prints the progress bar, saves the checkpoint, and uploads the metrics, because if all $W$ processes did it you would get $W$ copies of every log line and a checkpoint race. So the "coordinator" is less a commander than the process that drew the short straw for housekeeping. Guard such code with if rank == 0: and the other ranks happily skip the chores.
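One common way to package the rank-zero guard is a small decorator, so that housekeeping functions skip themselves on every other rank. This is a minimal sketch: `coordinator_only` and `save_checkpoint` are hypothetical names, the rank is assumed to come from the RANK environment variable, and the checkpoint function only builds a file name rather than writing anything.

```python
import functools
import os


def coordinator_only(fn):
    """Run fn only on rank 0; every other rank returns None without calling it."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Assumption: the launcher exported RANK; default to 0 for a solo run.
        if int(os.environ.get("RANK", "0")) == 0:
            return fn(*args, **kwargs)
        return None
    return wrapper


@coordinator_only
def save_checkpoint(step: int) -> str:
    # Placeholder for real checkpoint I/O: just returns the file name.
    return f"checkpoint_step{step}.pt"


os.environ["RANK"] = "0"
print(save_checkpoint(100))   # the coordinator does the chore
os.environ["RANK"] = "3"
print(save_checkpoint(100))   # any other rank skips it
```

A guard like this is only safe around code that contains no collective call. If the guarded function includes a barrier or an all-reduce, the ranks that skip it never join, and the coordinator waits forever.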
3. How a Training Job Maps Onto Ranks (Intermediate)
The reason every process needs a unique rank is that the rank decides which slice of work it owns. In data-parallel training (the workhorse of Chapter 15), every process holds a full copy of the model but reads a disjoint shard of the data, and the rank is exactly the key that selects the shard. With $N$ examples and world size $W$, process $r$ takes a contiguous block of about $N/W$ examples, and the union of all blocks covers the dataset once with no overlap. Getting this partition right is what makes the gradient identity from Chapter 1 hold: each example must be owned by exactly one rank, or the averaged gradient is silently biased. The function below assigns a near-equal contiguous shard from rank and world size, and we check that the shards tile the dataset exactly.
```python
import numpy as np


def shard_for_rank(n_items, world_size, rank):
    # Contiguous, near-equal partition: rank r owns indices [start, end).
    base, extra = divmod(n_items, world_size)
    start = rank * base + min(rank, extra)
    count = base + (1 if rank < extra else 0)
    return start, start + count


N, WORLD_SIZE = 1000, 6
covered = np.zeros(N, dtype=int)
for rank in range(WORLD_SIZE):
    start, end = shard_for_rank(N, WORLD_SIZE, rank)
    covered[start:end] += 1
    print(f"rank={rank} owns indices [{start:4d}, {end:4d}) -> {end-start} examples")
print("every example owned exactly once:", bool(np.all(covered == 1)))
```
Code 2.1.2: shard_for_rank turns a process's rank and the world size into a disjoint, near-equal index range; the covered check confirms the shards partition the data with no gap and no overlap.

```text
rank=0 owns indices [   0,  167) -> 167 examples
rank=1 owns indices [ 167,  334) -> 167 examples
rank=2 owns indices [ 334,  501) -> 167 examples
rank=3 owns indices [ 501,  668) -> 167 examples
rank=4 owns indices [ 668,  834) -> 166 examples
rank=5 owns indices [ 834, 1000) -> 166 examples
every example owned exactly once: True
```
Notice that $1000$ is not a multiple of $6$, so the shards are not all the same size; four ranks carry one extra example. This unevenness is normal and is precisely why the correct combine step in distributed training divides the summed gradient by the global example count rather than averaging per-rank means, the subtlety that Exercise 1.1.2 of the previous chapter asked you to break and then fix. The control plane that hands ranks their shards, restarts a process that dies, and elects the rank that holds the canonical state is the subject of the rest of this chapter and returns, scaled to thousands of nodes, in Chapter 33 on cluster scheduling.
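The difference between the two combine rules can be checked numerically. The sketch below is a toy under stated assumptions: example $i$ is given the scalar "gradient" $i$ so the arithmetic is easy to follow, and `combine` is a hypothetical helper that applies both rules to the shards from `shard_for_rank`.

```python
import numpy as np


def shard_for_rank(n_items, world_size, rank):
    # Same contiguous, near-equal partition as Code 2.1.2.
    base, extra = divmod(n_items, world_size)
    start = rank * base + min(rank, extra)
    return start, start + base + (1 if rank < extra else 0)


def combine(per_example_grads, world_size):
    """Return (sum-then-divide, mean-of-per-rank-means) over the shards."""
    n = len(per_example_grads)
    local_sums, local_means = [], []
    for rank in range(world_size):
        start, end = shard_for_rank(n, world_size, rank)
        shard = per_example_grads[start:end]
        local_sums.append(shard.sum())    # what each rank would contribute
        local_means.append(shard.mean())  # the tempting but biased alternative
    sum_then_divide = sum(local_sums) / n          # divide by global count
    mean_of_means = sum(local_means) / world_size  # weights every rank equally
    return sum_then_divide, mean_of_means


grads = np.arange(1000, dtype=float)  # toy data: example i has gradient i
correct, biased = combine(grads, world_size=6)
print("true mean over all examples:", grads.mean())
print("sum, then divide by N:      ", correct)
print("mean of per-rank means:     ", biased)
```

With these toy values the true mean is 499.5, sum-then-divide reproduces it, and the mean of per-rank means comes out near 500.17, because the two smaller shards are given the same weight as the four larger ones.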
Code 2.1.1 spawned processes and computed ranks by hand to expose the mechanism. In practice you never do this: PyTorch's torchrun launcher spawns one process per device, sets RANK, WORLD_SIZE, LOCAL_RANK, and the rendezvous address in each process's environment, and your script just reads them. The roughly twenty lines of spawn-and-assign collapse to a one-line launch plus a few reads:
```python
# Launch on each node with, e.g.:
#   torchrun --nnodes=2 --nproc_per_node=3 --rdzv_backend=c10d \
#            --rdzv_endpoint=HOST:29400 train.py
import os

import torch
import torch.distributed as dist

dist.init_process_group("nccl")             # joins the group; reads RANK/WORLD_SIZE
rank = dist.get_rank()                      # global id, 0..W-1
world_size = dist.get_world_size()          # W
local_rank = int(os.environ["LOCAL_RANK"])  # device index on this node
torch.cuda.set_device(local_rank)           # pin this process to its GPU
```
The same setup under torchrun: the launcher owns process spawning, environment setup, and the rendezvous handshake that init_process_group completes; your code reads three values and pins its device.

Who: An ML platform engineer onboarding a research team's first multi-GPU training script.
Situation: A model that trained correctly on one GPU was launched with torchrun --nproc_per_node=8 and ran without errors, but validation accuracy was no better than the single-GPU baseline despite eight times the compute.
Problem: Throughput had scaled, yet the model learned nothing extra. Something was wrong with what each rank actually consumed.
Dilemma: Suspect the optimizer, the learning-rate schedule, or the model, all plausible, or suspect the most basic thing of all, the data each rank was reading.
Decision: They logged, on every rank, the first ten example indices the data loader returned. All eight ranks reported the identical indices.
How: The loader had been built without a rank-aware sampler, so every process iterated the full dataset in the same order. Eight processes were redundantly training on identical batches, which is mathematically a single-GPU run paying for eight. Wrapping the dataset in a DistributedSampler (the production form of shard_for_rank in Code 2.1.2) gave each rank its disjoint shard keyed by rank and world size.
Result: With true data sharding, the eight ranks covered the dataset once per epoch, effective batch size grew eightfold, and accuracy and wall-clock both improved as expected.
Lesson: SPMD means every process runs the same code, so the data split must be derived from the rank explicitly. "It launched on eight GPUs" is not "it used eight GPUs"; the rank has to reach the data loader, or you have paid for parallelism you did not get.
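The diagnosis in this case study can be reproduced without a GPU. The sketch below contrasts a loader that ignores its rank with a rank-aware one and applies the same check the engineer used: print the first few indices each rank would read. All function names are hypothetical, and the strided split shown is only an illustration of a rank-keyed partition; it is not PyTorch's DistributedSampler, which also handles shuffling and padding.

```python
def naive_indices(n_items, rank, world_size):
    # The bug: rank and world_size are accepted but never used.
    return list(range(n_items))


def rank_aware_indices(n_items, rank, world_size):
    # Strided split: rank r reads r, r + W, r + 2W, ...
    return list(range(rank, n_items, world_size))


def first_k_per_rank(index_fn, n_items, world_size, k=10):
    """The case study's diagnostic: first k indices seen by every rank."""
    return {r: index_fn(n_items, r, world_size)[:k] for r in range(world_size)}


def ranks_overlap(index_fn, n_items, world_size):
    """True if any example index is read by more than one rank."""
    seen = set()
    for r in range(world_size):
        mine = set(index_fn(n_items, r, world_size))
        if seen & mine:
            return True
        seen |= mine
    return False


for name, fn in [("naive", naive_indices), ("rank-aware", rank_aware_indices)]:
    print(name, "overlap:", ranks_overlap(fn, 1000, 8))
    for r, idx in first_k_per_rank(fn, 1000, 8, k=4).items():
        print(f"  rank={r} first indices {idx}")
```

Whether the split is contiguous, as in shard_for_rank, or strided, as here, matters less than the property being checked: every index is owned by exactly one rank.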
4. Workers, Coordinators, and the Control Plane (Intermediate)
It helps to separate two planes that share the same processes. The data plane is the high-bandwidth path along which workers exchange tensors: gradients flowing through all-reduce, activations through point-to-point sends. It is symmetric: in pure data-parallel training there is no boss; every rank performs the same all-reduce and ends with the same averaged gradient. The control plane is the low-bandwidth path that decides membership and shared state: who is in the group, what the current global step is, where to write the checkpoint, and which process to promote if one dies. The control plane is where the coordinator role lives, and where, at cluster scale, genuine consensus protocols appear.
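The two planes can be told apart in a few lines of simulation. In this sketch the data plane is a pretend all-reduce over plain Python lists, in which every rank contributes a vector and every rank receives the same result, and the control plane is a single step counter of the kind rank 0 would hold. `ControlState`, `all_reduce_sum`, and `training_step` are hypothetical names, and no real communication takes place.

```python
from dataclasses import dataclass


@dataclass
class ControlState:
    """Control-plane state: small, and held by the coordinator."""
    global_step: int = 0


def all_reduce_sum(per_rank_vectors):
    """Simulated all-reduce: every rank receives the same elementwise sum."""
    total = [sum(column) for column in zip(*per_rank_vectors)]
    return [list(total) for _ in per_rank_vectors]


def training_step(per_rank_grad_sums, global_batch_size, control):
    # Data plane: symmetric, no rank is special.
    reduced = all_reduce_sum(per_rank_grad_sums)
    averaged = [[g / global_batch_size for g in vec] for vec in reduced]
    # Control plane: one small piece of bookkeeping per step.
    control.global_step += 1
    return averaged


control = ControlState()
# Two ranks, each holding the summed gradient of its two local examples.
result = training_step([[1.0, 2.0], [3.0, 6.0]], global_batch_size=4,
                       control=control)
print(result, control.global_step)
```

Every entry of the result is identical, which is the symmetry the data plane provides; the only state that lives in one place is the step counter.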
For the data-parallel jobs of Part IV the control plane is light: rank $0$ holds the canonical step counter and writes checkpoints, the launcher's rendezvous service tracks membership, and the heavy lifting is all symmetric collectives on the data plane. This is a deliberate design choice that Section 2.6 defends: the training data plane uses collectives precisely so it does not need a consensus protocol on the critical path, because running Raft for every gradient step would be ruinous. The asymmetry between a featherweight control plane and a heavyweight symmetric data plane is a recurring pattern, and naming the worker and coordinator roles now lets us talk about it precisely throughout the book.
Classical SPMD fixes the world size for the life of the job: lose one process and the whole group must restart. Recent work treats world size as elastic. PyTorch's torchrun with the c10d rendezvous backend and TorchElastic let a job shrink when a node fails and grow when one returns, re-deriving ranks on the fly, and projects like Oobleck and Bamboo (2023 to 2024) push resilient pipeline-parallel training on preemptible and spot instances where the rank set changes mid-run. The hard part is keeping the data partition correct and the optimizer state consistent as the world size changes under the job, which is the explicit subject of elastic and fault-tolerant training in Chapter 18. The vocabulary of this section, rank, world size, rendezvous, is exactly what those systems renegotiate at runtime.
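To make "re-deriving ranks on the fly" concrete, the sketch below replans a job after one process disappears: the survivors receive dense ranks $0$ through $W'-1$ and the data partition is recomputed for the new world size. This is an illustration only. The worker identifiers, the `reassign_ranks` and `replan` helpers, and the sorted-order assignment policy are all assumptions; real elastic systems negotiate membership through a rendezvous service and may order ranks differently.

```python
def shard_for_rank(n_items, world_size, rank):
    # Same contiguous, near-equal partition as Code 2.1.2.
    base, extra = divmod(n_items, world_size)
    start = rank * base + min(rank, extra)
    return start, start + base + (1 if rank < extra else 0)


def reassign_ranks(surviving_ids):
    """Give the survivors dense ranks 0..W'-1 (assumed policy: sorted order)."""
    return {pid: rank for rank, pid in enumerate(sorted(surviving_ids))}


def replan(n_items, surviving_ids):
    """Recompute every survivor's shard for the new world size."""
    ranks = reassign_ranks(surviving_ids)
    world_size = len(ranks)
    return {pid: shard_for_rank(n_items, world_size, rank)
            for pid, rank in ranks.items()}


before = replan(1000, ["w0", "w1", "w2", "w3", "w4", "w5"])
after = replan(1000, ["w0", "w1", "w2", "w4", "w5"])  # w3 was lost
for pid in sorted(after):
    print(pid, "before:", before[pid], "after:", after[pid])
```

Note that losing one process changes the shard of nearly every survivor, not just its neighbors. That is part of why keeping the data partition correct under a changing world size is hard.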
We now have the nouns the rest of distributed AI is spoken in: nodes and the devices inside them, the processes that drive those devices, the worker and coordinator roles those processes play, and the world-size, rank, and local-rank identifiers that let one program behave as many. With the actors named, the next question is how they actually talk: the messages, the synchronization barriers, and the coordination patterns that turn a set of ranks into a system that agrees on a result. That is the subject of Section 2.2, Communication, Synchronization, and Coordination.
For each statement, say whether it is about a node, a device, a process, or a role (worker or coordinator), and justify the choice: (a) "rank 5 is stuck on a barrier"; (b) "we lost the host with GPUs 4 through 7"; (c) "only this one writes the checkpoint"; (d) "set this to cuda:{local_rank}"; (e) "out-of-memory at 40 GB of the 80 available". Then explain why "the machine crashed" and "the process hung" call for different responses from the launcher.
Extend Code 2.1.1 to a job with world size $32$ and $8$ processes per node. Print, for each rank, its node index, local rank, and device string, and verify there are exactly $4$ nodes with local ranks $0$ through $7$ on each. Then add an assertion that no two ranks share the same (node index, local rank) pair, and explain in one sentence why that pair must be globally unique even though local rank alone is not.
Using Code 2.1.2, suppose the world size is $7$ and $N = 1000$. Compute each rank's shard size and identify the largest and smallest. If every rank must finish its shard before the all-reduce can begin (a synchronous step), argue from these sizes which rank the whole group waits on and by how much, in fraction of a step, the slowest rank delays the fastest. Connect your answer to the notion of a straggler, which Chapter 3 quantifies with Amdahl's law.