"They promised me a thousand nodes. I started with four processes on a laptop, and that is where I learned everything the thousand nodes would later break."
A Worker That Started Small and Stayed Humble
You do not need a real cluster to run the demos in this book; you need a way to make one machine behave like several, and a path that scales the very same code from that laptop up to many real nodes. Every distributed-training example in the book is written against one abstraction, the process group: a fixed set of $K$ worker processes that each own a rank, agree on a world size, and exchange tensors through collectives. That abstraction does not care whether the $K$ processes live on one laptop, on eight GPUs in one server, on sixty-four GPUs across eight nodes, or on a fleet of preemptible cloud instances. This appendix walks the four tiers in order, with runnable launch commands at each, then closes with the reproducibility discipline that lets a result you got on tier one survive the move to tier four. Read it once end to end, then return to the tier you actually have.
The single most useful idea in this appendix is that the launch mechanism changes between tiers but the program does not. A data-parallel training script written for one laptop with four CPU processes is, line for line, the same script you submit to a Slurm queue for sixty-four GPUs; only the launcher, the backend string, and the device placement differ, and even those are usually three lines. Figure B.1 shows the four tiers as a single ladder. The rest of the appendix climbs it one rung at a time, and the reproducibility checklist in Section B.5 is what keeps the rungs connected, so that a number you measured on the bottom rung still means something on the top.
B.1 Local Multi-Process Setup (Simulating a Cluster) Beginner
The cheapest cluster you own is the machine in front of you, running several processes at once. A distributed-training job is, at bottom, $K$ operating-system processes that have joined one process group and can call collectives on each other; nothing about that requires more than one host or any GPU at all. PyTorch ships the gloo backend precisely for this case: it implements the collectives over plain CPU sockets, so you can launch $K$ worker processes on one laptop, have them all-reduce real tensors, and watch the exact averaging identity from Section 1.1 hold across processes rather than across machines. This is where you should run, debug, and read every data-parallel demo in the book before you spend a cent on hardware.
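The averaging identity mentioned above can be checked by hand before any process is launched. The sketch below assumes the identity is the usual equal-shard one: when every worker holds the same number of examples, the mean of the per-worker mean gradients equals the mean gradient over the full batch. The scalar "gradients" and the helper names are invented for illustration.

```python
def shard(values, world_size):
    """Split values into world_size equal, contiguous shards."""
    if len(values) % world_size != 0:
        raise ValueError("this sketch assumes equal-size shards")
    size = len(values) // world_size
    return [values[r * size:(r + 1) * size] for r in range(world_size)]


def mean(xs):
    return sum(xs) / len(xs)


# Hypothetical per-example gradients (scalars, to keep the arithmetic visible).
per_example_grads = [0.5, 1.5, 2.0, 4.0, -1.0, 3.0, 0.0, 2.0]
K = 4

shards = shard(per_example_grads, K)          # what each worker sees
worker_means = [mean(s) for s in shards]      # each worker's local gradient
averaged = mean(worker_means)                 # what all_reduce(SUM) / K recovers
full_batch = mean(per_example_grads)          # what one big machine would compute
```

With unequal shards the two quantities differ unless each worker's mean is weighted by its shard size, which is why the sketch rejects uneven splits instead of silently averaging them.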
The standard launcher is torchrun, which spawns the worker processes, assigns each a RANK and a LOCAL_RANK, sets the WORLD_SIZE, and wires up the rendezvous environment variables that init_process_group reads. To simulate $K$ workers locally you simply ask for $K$ processes on the one node you have, as in Code B.1.1; the same command with a larger number is how you scale a demo to as many simulated workers as your cores allow.
# Simulate an 8-worker cluster on ONE machine, no GPU required.
# torchrun starts 8 processes, each with its own RANK in {0..7} and WORLD_SIZE=8.
torchrun --standalone --nproc-per-node=8 train_data_parallel.py --epochs 1
# The same script, real or simulated, never changes; only --nproc-per-node does.
# --standalone picks a free rendezvous port on localhost so you need no MASTER_ADDR.
Code B.1.1: The --standalone flag handles localhost rendezvous, so this command runs unchanged on any laptop; raising --nproc-per-node simulates more workers up to the number of cores you have.
Inside the script, every worker runs the same body: it joins the group, builds its local tensor (here standing in for a shard's partial gradient), calls all_reduce with a sum, and divides by the world size to recover the mean. Code B.1.2 is a complete, self-contained example that uses the gloo CPU backend, so it runs on a machine with no accelerator. Launch it with the command in Code B.1.1 (substituting the script name and adjusting the worker count) and every rank will print the same averaged vector.
# File: allreduce_demo.py
# Launch with: torchrun --standalone --nproc-per-node=4 allreduce_demo.py
import os
import torch
import torch.distributed as dist

def main():
    # torchrun has set RANK, WORLD_SIZE, LOCAL_RANK, MASTER_ADDR, MASTER_PORT.
    dist.init_process_group(backend="gloo")       # CPU collectives; no GPU needed
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # Each worker holds a different local vector: pretend it is this worker's
    # partial gradient, computed over a shard the other workers never see.
    local = torch.ones(4) * (rank + 1)            # rank 0 -> 1s, rank 1 -> 2s, ...

    dist.all_reduce(local, op=dist.ReduceOp.SUM)  # every rank now holds the SUM
    local /= world_size                           # divide by K to get the mean

    print(f"[rank {rank}/{world_size}] averaged vector = {local.tolist()}")
    dist.destroy_process_group()

if __name__ == "__main__":
    main()
Code B.1.2: A complete all-reduce example on the gloo CPU backend. Each rank starts with a different vector and ends holding the identical mean, the multi-process form of the gradient identity proved in Section 1.1.
To confirm the identity on a machine where the gloo backend is not available, the same logic runs as a pure-standard-library simulation in which a shared list plays the role of the network: each of four processes posts its partial vector, waits until all four are present, then sums and divides. Output B.1.1 is the real output of that simulation. All four workers, each blind to the others' starting vector, end on the same mean $2.5$, which is exactly $(1 + 2 + 3 + 4)/4$.
world_size (K) : 4
rank 0: local before = [1.0, 1.0, 1.0, 1.0] -> after mean = [2.5, 2.5, 2.5, 2.5]
rank 1: local before = [2.0, 2.0, 2.0, 2.0] -> after mean = [2.5, 2.5, 2.5, 2.5]
rank 2: local before = [3.0, 3.0, 3.0, 3.0] -> after mean = [2.5, 2.5, 2.5, 2.5]
rank 3: local before = [4.0, 4.0, 4.0, 4.0] -> after mean = [2.5, 2.5, 2.5, 2.5]
reference mean : [2.5, 2.5, 2.5, 2.5]
all workers agree : True
Output B.1.1: Output of the standard-library simulation with four workers. With a working gloo backend, Code B.1.2 produces the identical mean through a true collective.
A distributed-training program is written against a fixed set of $K$ ranked processes that exchange tensors through collectives, and that contract is identical whether the processes share one laptop, one server's GPUs, or a thousand machines. This is why you can develop and debug the entire book on a single host: a bug in the gradient-averaging logic surfaces with four gloo processes exactly as it would with four thousand NCCL ranks, but it costs you seconds instead of a cluster reservation. Get the program right at tier one, then change only the launcher and the backend.
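The standard-library simulation described above is not reproduced in this appendix. The following is one possible sketch of it, not the book's own script: it uses threads and a threading.Barrier instead of separate processes so that the block stays self-contained, and the function name simulate_allreduce_mean is made up for the example.

```python
import threading


def simulate_allreduce_mean(world_size=4, dim=4):
    """Each worker posts its vector, waits for all peers, then sums and divides."""
    posted = [None] * world_size               # shared list standing in for the network
    results = [None] * world_size
    barrier = threading.Barrier(world_size)    # "wait until all K vectors are present"

    def worker(rank):
        local = [float(rank + 1)] * dim        # rank 0 -> 1s, rank 1 -> 2s, ...
        posted[rank] = local                   # post the partial vector
        barrier.wait()                         # block until every rank has posted
        summed = [sum(vec[i] for vec in posted) for i in range(dim)]
        results[rank] = [s / world_size for s in summed]   # the mean, computed locally

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


results = simulate_allreduce_mean()
all_agree = all(r == results[0] for r in results)
```

The barrier is the part that matters: without it a fast worker could sum before a slow one has posted, which is the simulated form of reading a gradient before the collective has completed.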
B.2 Single-GPU and Multi-GPU Beginner
Once the program is correct on CPU processes, the move to GPUs is a change of backend and a change of device placement, not a rewrite. On one node with one or more GPUs you switch the backend string from gloo to nccl, the collective library NVIDIA provides for GPU-to-GPU transfer over NVLink and PCIe, and you bind each worker to its own GPU using the LOCAL_RANK that torchrun already set. The standard wrapper for the model is DistributedDataParallel (DDP), which fires the all-reduce of Section B.1 automatically during the backward pass, so the only lines you add are the three that place the worker on its device and wrap the model.
# File: train_ddp.py
# One node, N GPUs: torchrun --standalone --nproc-per-node=N train_ddp.py
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
dist.init_process_group(backend="nccl") # GPU collectives over NVLink/PCIe
local_rank = int(os.environ["LOCAL_RANK"]) # set by torchrun: which GPU am I?
torch.cuda.set_device(local_rank) # bind this process to one GPU
model = build_model().to(local_rank)              # build_model() stands in for your own constructor; .to() moves it onto that GPU
model = DDP(model, device_ids=[local_rank]) # DDP all-reduces grads for you
# ... ordinary training loop; loss.backward() triggers the gradient all-reduce ...
dist.destroy_process_group()
Code B.2.1: The GPU-specific lines: the nccl backend, the set_device(local_rank) binding, and the DDP wrapper that performs the gradient all-reduce during backward().
The launch command is the one from Code B.1.1 with --nproc-per-node set to the number of GPUs in the box, so one process drives one GPU. For a single-GPU run you set it to 1 and the same script runs with no inter-GPU traffic; for an eight-GPU server you set it to 8 and the GPUs all-reduce over the node's internal interconnect. The device-placement line, set_device(local_rank), is the one piece of GPU-specific bookkeeping, and getting it wrong (two processes on one GPU, or a tensor left on the CPU) is the most common first-run error at this tier.
In Code B.1.2 you called all_reduce and divided by the world size by hand. In a real GPU training loop you never write that line: wrapping the model in DistributedDataParallel registers backward hooks that bucket the gradients and fire the NCCL all-reduce automatically, overlapping the communication with the rest of the backward pass. The roughly ten lines of manual gather-average-scatter you would otherwise maintain collapse to the single DDP(model, ...) wrap, and the library handles gradient bucketing, the all-reduce schedule, and the overlap with computation that Chapter 15 unpacks.
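One way to keep a single script across the first two tiers is to decide the backend string and the device at start-up instead of hard-coding them. The helper below is a sketch under stated assumptions: the name select_backend_and_device is hypothetical, and it takes the CUDA facts as plain arguments so that it can be exercised on a machine without PyTorch installed.

```python
import os


def select_backend_and_device(cuda_available, device_count, env=None):
    """Return (backend, device) for this worker from the launcher's environment."""
    env = os.environ if env is None else env
    local_rank = int(env.get("LOCAL_RANK", 0))   # set by torchrun; 0 if run directly

    if not cuda_available:
        return "gloo", "cpu"                      # tier one: CPU processes

    # Guard against the common first-run mistake of more processes than GPUs,
    # which would otherwise put two workers on one device or fail later.
    if local_rank >= device_count:
        raise RuntimeError(
            f"LOCAL_RANK={local_rank} but only {device_count} GPU(s) are visible"
        )
    return "nccl", f"cuda:{local_rank}"           # tier two: one process per GPU
```

In a training script the two facts would typically come from torch.cuda.is_available() and torch.cuda.device_count(), with the returned backend passed to init_process_group and the device used for set_device and .to().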
B.3 Multi-Node Setup (NCCL, torchrun, Slurm) Intermediate
When one node runs out of GPUs, the work spreads across several nodes, and the only genuinely new ingredient is rendezvous: the workers on different machines must find each other before they can form a process group. They do this by agreeing on a single coordinator address and port, the MASTER_ADDR and MASTER_PORT, which every rank uses to join. With torchrun you declare the node count with --nnodes, the GPUs per node with --nproc-per-node, this node's index with --node-rank, and the rendezvous endpoint, then run the same command on every node, changing only --node-rank. The collective primitives this builds on are exactly the ones introduced in Chapter 4, now carried over the network instead of a bus.
# Run this on EACH of 2 nodes, changing only --node-rank (0 on the first, 1 on the second).
# 2 nodes x 4 GPUs = a world size of 8 NCCL ranks.
torchrun \
--nnodes=2 \
--nproc-per-node=4 \
--node-rank=0 \
--rdzv-id=saibook-job \
--rdzv-backend=c10d \
--rdzv-endpoint=10.0.0.1:29400 \
train_ddp.py --epochs 1
# --rdzv-endpoint is the MASTER_ADDR:MASTER_PORT every node dials in to.
Code B.3.1: A two-node launch with torchrun. The script train_ddp.py is unchanged from Code B.2.1; only the launcher gains --nnodes, --node-rank, and the --rdzv-endpoint that all nodes use to rendezvous.
Typing that command on every node by hand does not scale past a handful of machines, which is why a batch scheduler runs it for you. Slurm is the standard scheduler on research and HPC clusters; you describe the job's shape in an sbatch script (how many nodes, how many GPUs per node, a time limit) and Slurm allocates the nodes and launches one task per node. Code B.3.2 is a complete sbatch script that reserves four nodes of four GPUs each and starts the same DDP program across all sixteen ranks. The cluster scheduling that turns such a request into running processes is the subject of Chapter 33.
#!/bin/bash
#SBATCH --job-name=saibook-ddp
#SBATCH --nodes=4 # 4 nodes ...
#SBATCH --ntasks-per-node=1 # ... one torchrun launcher per node
#SBATCH --gpus-per-node=4 # 4 GPUs each: world size = 16
#SBATCH --cpus-per-task=16
#SBATCH --time=02:00:00
#SBATCH --output=logs/%x-%j.out
# The first node in the allocation is the rendezvous coordinator.
export MASTER_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n1)
export MASTER_PORT=29400
# NCCL environment: pick the right network interface and surface errors early.
export NCCL_DEBUG=INFO # log the chosen transport (NVLink, IB, socket)
export NCCL_SOCKET_IFNAME=^lo,docker0 # ignore loopback and docker interfaces
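# srun launches one torchrun per node; SLURM_* vars feed the rendezvous.
# The single quotes defer expansion of $SLURM_NODEID to each node's own task;
# unquoted, the batch shell would expand it once, to the first node's value.
srun bash -c 'torchrun \
    --nnodes=$SLURM_NNODES \
    --nproc-per-node=4 \
    --node-rank=$SLURM_NODEID \
    --rdzv-id=$SLURM_JOB_ID \
    --rdzv-backend=c10d \
    --rdzv-endpoint=$MASTER_ADDR:$MASTER_PORT \
    train_ddp.py --epochs 1'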
Code B.3.2: A complete sbatch script for a 4-node, 16-GPU job. Slurm allocates the nodes and runs one torchrun per node; the NCCL_* variables select the network interface and turn on transport logging, the two settings that most often decide whether a multi-node job runs fast or stalls.
The single most common multi-node failure is not a crash but a silent slowdown: NCCL picks a slow or wrong network interface (a management link, a docker bridge) and the all-reduce crawls. Set NCCL_DEBUG=INFO on the first run of any new cluster and read the log line that names the transport it selected; if it says Socket where you expected NVLink or IB (InfiniBand), fix it with NCCL_SOCKET_IFNAME or NCCL_IB_HCA before you benchmark anything. Treat the first multi-node run as a transport audit, not a training run, and you will not chase phantom scaling losses for a week.
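The rank bookkeeping behind the launches in this section can be written down as three small functions. This is a mental model only: it assumes the conventional contiguous layout in which node 0 holds the first block of ranks, and the function names are invented. With a dynamic rendezvous backend the launcher may assign ranks itself, so a real script should read RANK and WORLD_SIZE from the environment, not recompute them.

```python
def world_size(nnodes, nproc_per_node):
    """Total number of ranks: one process per GPU on every node."""
    return nnodes * nproc_per_node


def global_rank(node_rank, nproc_per_node, local_rank):
    """Rank of a worker under the contiguous, node-major layout."""
    if not 0 <= local_rank < nproc_per_node:
        raise ValueError("local_rank must be in [0, nproc_per_node)")
    return node_rank * nproc_per_node + local_rank


def locate(rank, nproc_per_node):
    """Inverse mapping: (node_rank, local_rank) for a global rank."""
    return divmod(rank, nproc_per_node)


# The 4-node, 4-GPU job of Code B.3.2.
total = world_size(nnodes=4, nproc_per_node=4)
third_gpu_on_second_node = global_rank(node_rank=1, nproc_per_node=4, local_rank=2)
```

The same arithmetic explains the comments in Code B.3.1 and Code B.4.1: two nodes of four GPUs give a world size of eight.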
B.4 Cloud and Spot-Instance Setup Intermediate
Most readers will not own a cluster, and they do not need to: the cloud rents the same nodes by the minute. Provisioning a GPU job in the cloud means choosing an instance type (the number and model of GPUs per node), requesting one or more of them, installing the environment, and launching the identical torchrun command from Section B.3. The new decision is whether to rent on-demand instances, which run until you stop them, or spot (preemptible) instances, which cost a fraction as much but can be reclaimed by the provider on short notice, typically somewhere between a few seconds and a couple of minutes depending on the provider. For training, spot is usually the right economic choice, on the condition that the job can survive being interrupted, which means checkpointing. The cost and elasticity trade-offs here are exactly those analyzed in Chapter 33.
Tools like SkyPilot turn the whole provision-install-launch sequence into one declarative file and one command, and they will request spot instances and re-provision them after a preemption automatically. Code B.4.1 is a SkyPilot-style task definition that asks for two nodes of four GPUs each on the spot market and runs the multi-node launcher; the launcher and the training script are the ones you already have from Section B.3.
# File: cluster.sky.yaml (launch with: sky launch -c saibook cluster.sky.yaml)
resources:
  accelerators: A100:4            # 4 x A100 per node
  use_spot: true                  # preemptible: cheap, can be reclaimed any time
  spot_recovery: auto             # re-provision and resume automatically on preemption
num_nodes: 2                      # 2 nodes x 4 GPUs = world size 8
file_mounts:
  /ckpt: s3://my-bucket/saibook-ckpt   # durable checkpoint store, survives preemption
run: |
  # SKYPILOT_NODE_IPS lists one IP per line in node-rank order; the first is the head node.
  HEAD_IP=$(echo "$SKYPILOT_NODE_IPS" | head -n1)
  torchrun \
    --nnodes=$SKYPILOT_NUM_NODES \
    --nproc-per-node=4 \
    --node-rank=$SKYPILOT_NODE_RANK \
    --rdzv-backend=c10d \
    --rdzv-endpoint=$HEAD_IP:29400 \
    train_ddp.py --epochs 50 --ckpt-dir /ckpt --resume
Code B.4.1: A SkyPilot-style task definition for a two-node spot cluster. The use_spot and spot_recovery lines buy the cheap preemptible capacity and automate re-provisioning; the durable /ckpt mount and the --resume flag are what let the job pick up where a preemption left it.
A preemptible instance can be reclaimed mid-step, so a spot training job without checkpointing is not cheap; it is a loss waiting to be billed. The discipline that makes spot economical is writing the model, optimizer, and data-loader state to durable storage on a fixed cadence and resuming from the latest checkpoint on restart. With that in place, a preemption costs at most the work since the last checkpoint; without it, a preemption costs the entire run. Checkpoint frequency is a tuning knob: too rare and a preemption is expensive, too frequent and the writes themselves slow the job, and the elastic, fault-tolerant training of Chapter 18 turns this knob into a first-class part of the training loop.
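The checkpoint-and-resume discipline described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the book's training loop: the "model state" is a running total stored as JSON, the function names are hypothetical, and the atomic-rename trick assumes a POSIX-like filesystem (a mounted object store may not give the same guarantee). A real job would typically serialize the model, optimizer, and data-loader state instead.

```python
import json
import os
import tempfile


def save_checkpoint(ckpt_dir, step, state):
    """Write a checkpoint atomically: temp file first, then rename into place."""
    os.makedirs(ckpt_dir, exist_ok=True)
    final_path = os.path.join(ckpt_dir, f"step_{step:08d}.json")
    fd, tmp_path = tempfile.mkstemp(dir=ckpt_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump({"step": step, "state": state}, f)
        f.flush()
        os.fsync(f.fileno())               # make sure the bytes reach the disk
    os.replace(tmp_path, final_path)       # a reader never sees a half-written file
    return final_path


def load_latest(ckpt_dir):
    """Return the newest checkpoint dict, or None if there is nothing to resume."""
    if not os.path.isdir(ckpt_dir):
        return None
    names = sorted(n for n in os.listdir(ckpt_dir)
                   if n.startswith("step_") and n.endswith(".json"))
    if not names:
        return None
    with open(os.path.join(ckpt_dir, names[-1])) as f:
        return json.load(f)


def train(ckpt_dir, total_steps, every, stop_after=None):
    """Toy loop; stop_after simulates a preemption at that step."""
    ckpt = load_latest(ckpt_dir)
    step = ckpt["step"] if ckpt else 0
    total = ckpt["state"]["total"] if ckpt else 0
    while step < total_steps:
        if stop_after is not None and step >= stop_after:
            return step, total             # reclaimed: unsaved work is lost
        step += 1
        total += step                      # stand-in for one optimizer step
        if step % every == 0:
            save_checkpoint(ckpt_dir, step, {"total": total})
    return step, total
```

Calling train once with stop_after set and then again without it reproduces the cost model in the text: the second call restarts from the last saved step, so only the steps since that checkpoint are repeated.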
B.5 Reproducibility Checklist Intermediate
A result is only as trustworthy as your ability to reproduce it, and distribution multiplies every source of irreproducibility: more machines, more library versions, more sources of nondeterminism, more ways for "it worked yesterday" to mean nothing. The remedy is to capture, alongside every result, the exact environment, seeds, configuration, and cluster shape that produced it, so that re-running is one command rather than an archaeology project. This discipline is the operational core of Chapter 26 and the standard that Chapter 19 holds large training runs to; here it is a checklist you apply to every experiment in the companion lab.
The first item is a pinned environment. Record exact versions, not ranges, so that the same wheels install months later; the cleanest form is a container image whose digest you log, but a fully pinned requirements file is the minimum. Code B.5.1 shows the pin; the seeding call that removes the most common source of run-to-run drift follows in Code B.5.2.
# 1. PINNED ENVIRONMENT: exact versions, captured from the run that produced the result.
# requirements.lock (generated with `pip freeze`, committed alongside results)
torch==2.4.1+cu121
numpy==2.1.1
# ... every transitive dependency pinned to an exact version, no ranges ...
# A container is stronger than a lock file: log its immutable digest, not its tag.
# docker run --gpus all ghcr.io/myorg/saibook@sha256:9f2c... torchrun ...
Code B.5.1: An exact-version lock file and a container referenced by digest (its sha256 digest rather than a mutable tag) make the software side of a result reproducible months after the fact.
# 2. SEEDS + 3. CONFIG LOGGING + 4. CLUSTER SPEC: capture them inside the run.
import json
import os
import random
import subprocess

import numpy as np
import torch

def set_all_seeds(seed: int = 0):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.use_deterministic_algorithms(True, warn_only=True)   # flag nondeterminism

def capture_run(config: dict, out="run_manifest.json"):
    manifest = {
        "config": config,                                       # 3. every hyperparameter
        "world_size": int(os.environ.get("WORLD_SIZE", 1)),     # 4. cluster shape
        "gpus_per_node": torch.cuda.device_count(),
        "git_commit": subprocess.check_output(
            ["git", "rev-parse", "HEAD"]).decode().strip(),     # exact code version
        "torch": torch.__version__, "cuda": torch.version.cuda,
    }
    with open(out, "w") as f:
        json.dump(manifest, f, indent=2)                        # one artifact per run
Code B.5.2: set_all_seeds removes seed-driven run-to-run drift, and capture_run writes a single manifest recording the configuration, world size, GPUs per node, git commit, and library versions that produced the result.
The final item is the one-command reproduction. Every result in the lab should be re-runnable by a single entry point that reads the manifest, rebuilds the pinned environment, and launches the same job, so that reproduction does not depend on anyone remembering the flags. Output B.5.1 shows the shape of the manifest that Code B.5.2 writes; that file, the lock file, and the launch command together are the complete record of a run.
$ python -c "from repro import capture_run; capture_run({'lr': 3e-4, 'epochs': 50})"
$ cat run_manifest.json
{
"config": {"lr": 0.0003, "epochs": 50},
"world_size": 16,
"gpus_per_node": 4,
"git_commit": "a1b9f4c7e2d8...",
"torch": "2.4.1+cu121",
"cuda": "12.1"
}
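The launch half of the one-command reproduction might look like the sketch below, which turns a manifest of the shape shown above into a torchrun argument list. Several things here are assumptions, not the lab's actual entry point: the function name build_launch_command, the default script name, and the convention that every config key maps to a command-line flag of the same name. A multi-node run would also need the rendezvous flags from Section B.3, which the manifest as written does not record.

```python
def build_launch_command(manifest, script="train_ddp.py"):
    """Rebuild a torchrun argv list from a run manifest."""
    world = manifest["world_size"]
    per_node = manifest["gpus_per_node"]
    if per_node <= 0 or world % per_node != 0:
        raise ValueError("world_size must be a positive multiple of gpus_per_node")

    cmd = [
        "torchrun",
        f"--nnodes={world // per_node}",        # cluster shape recovered from the manifest
        f"--nproc-per-node={per_node}",
        script,
    ]
    # Hypothetical convention: each hyperparameter becomes --<name> <value>.
    for key, value in sorted(manifest["config"].items()):
        cmd += [f"--{key.replace('_', '-')}", str(value)]
    return cmd


# The manifest fields from Output B.5.1 that the launcher needs.
manifest = {
    "config": {"lr": 0.0003, "epochs": 50},
    "world_size": 16,
    "gpus_per_node": 4,
}
command = build_launch_command(manifest)
```

Returning a list rather than a shell string keeps the command safe to hand to subprocess.run without quoting problems.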
Who: A graduate student preparing a distributed-training result for a paper.
Situation: A speedup measured on a 16-GPU spot allocation looked excellent and went into a draft table.
Problem: Two weeks later, on a fresh allocation, the same script ran 30 percent slower and the accuracy shifted, with no code change to explain it.
Dilemma: Trust the original number and risk an unreproducible claim, or invest in re-running and risk finding the first result was an artifact.
Decision: They reconstructed the environment, only to discover the first run had floated to a newer CUDA wheel and a different NCCL transport, neither of which had been recorded.
How: They adopted the checklist: a digest-pinned container, the seeding and manifest capture of Code B.5.2, NCCL_DEBUG=INFO in the logs, and a one-command launcher that reads the manifest.
Result: The re-run matched within noise, the corrected number went into the paper, and every later experiment shipped with a manifest that made reproduction a single command.
Lesson: On a cluster, an unpinned environment is a silent variable. Capture the environment, seeds, config, and cluster spec with the result, or the result is a story, not a measurement.
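The kind of drift in this case study is easy to surface mechanically once both runs have a manifest. The sketch below compares two manifests field by field; the function name diff_manifests is made up, and the "current" values are hypothetical numbers chosen to mimic a floated CUDA wheel. The NCCL transport is not part of the manifest that Code B.5.2 writes, so catching that change would require adding such a field by hand.

```python
def diff_manifests(recorded, current, ignore=()):
    """Return {field: (recorded_value, current_value)} for every field that differs."""
    keys = (set(recorded) | set(current)) - set(ignore)
    return {
        key: (recorded.get(key), current.get(key))
        for key in sorted(keys)
        if recorded.get(key) != current.get(key)
    }


recorded = {"world_size": 16, "gpus_per_node": 4,
            "torch": "2.4.1+cu121", "cuda": "12.1"}
# Hypothetical fresh allocation whose environment floated to a newer wheel.
current = {"world_size": 16, "gpus_per_node": 4,
           "torch": "2.4.1+cu124", "cuda": "12.4"}

drift = diff_manifests(recorded, current)
```

An empty result is the precondition for comparing two timings; a non-empty one names the silent variables before anyone starts debugging the model.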
With these four tiers and the checklist in hand you can take any data-parallel demo in the book from a laptop to a real cluster without rewriting it: simulate the workers locally (Section B.1), bind them to GPUs on one node (Section B.2), spread them across nodes with torchrun or Slurm (Section B.3), rent that capacity cheaply on spot instances with checkpointing (Section B.4), and capture every run so the result survives the journey (Section B.5). The companion lab is deliberately the same program at every rung of Figure B.1; the notation those programs use is collected in Appendix C.