Part I: Foundations of Distributed AI
Chapter 2: Distributed Systems Concepts for AI

Data Locality and Compute Locality

"They asked me to read a petabyte across the country. I asked them to send the program instead; it was four kilobytes, and I was already sitting on the data."

A Worker That Refused to Download the Dataset
Big Picture

In a distributed system, the dominant cost is rarely the arithmetic; it is moving the bytes that the arithmetic consumes. Because data is large and programs are small, the cheap move is to send the computation to wherever the data already lives, not to drag the data to wherever a free processor happens to be. This is the locality principle, and it is the single idea behind why MapReduce schedules a task on the machine that holds its input split, why a Spark stage prefers a node-local read, and why a deep-learning data pipeline stages its shards on storage physically next to the accelerators it feeds. Underneath all three sits one stubborn fact of hardware: each step away from the processor, from registers to high-bandwidth memory to local disk to the rack network to another datacenter, is slower than the last, typically by an order of magnitude or more. This section makes that staircase concrete with measured numbers, then turns it into the scheduling rule that the rest of the book applies without restating.

Section 2.7 looked at what happens when one worker falls behind the others and the whole step waits on it. Locality is the upstream discipline that prevents a common cause of those stragglers in the first place: a worker that has to fetch its input across a slow link before it can even begin. The previous sections of this chapter gave us the vocabulary of nodes, partitioning, and replication; this one adds the cost geometry that decides where, among all the machines holding a copy of the data, a piece of work should actually run. The answer turns out to follow from a single comparison of sizes, and once you have seen it you will recognize it driving design decisions from the data-loading pipeline of a training job to the placement policy of a cluster scheduler.

1. The Locality Principle: Programs Are Small, Data Is Large (Beginner)

Start from an asymmetry that is almost always true in data-intensive computing. The program that processes a dataset, a map function, a tokenizer, a feature extractor, a forward and backward pass, is kilobytes to a few megabytes of code and parameters. The dataset it processes is gigabytes, terabytes, or petabytes. When a processor on machine A needs to apply a small program to a large block of data that currently sits on machine B, there are two ways to bring them together. You can move the data to the computation, copying the block from B to A across the network, or you can move the computation to the data, shipping the small program to B and running it there. The locality principle says: prefer the second, because you pay the network cost on the smaller of the two objects.

This is not a heuristic that sometimes pays off; it is forced by the ratio of the sizes. If an input shard is $S$ bytes and the program that processes it is $C$ bytes with $C \ll S$, and the link between the machines moves $B$ bytes per second, then moving the data costs $S / B$ seconds while moving the computation costs $C / B$ seconds. The ratio of the two is exactly $S / C$, independent of the link speed. A 256-megabyte shard and a 4-kilobyte task differ by a factor of $2^{16}$, so shipping the code rather than the data is sixty-five thousand times cheaper, on a fast rack link and on a slow wide-area link alike. The demo in this section computes that ratio directly so the number is not an assertion.

Key Insight: Move the Computation to the Data

Network cost is paid per byte moved, and the program is almost always the smaller object. So the cheap join of code and data happens at the data's location, not the processor's. Every locality-aware system in this book, from MapReduce to a GPU data loader, is an elaboration of this one sentence. The corollary is that a scheduler's first job is placement: decide where a task runs so that its input is already there, and only fall back to moving data when no processor near the data is free.

2. The Memory and Network Hierarchy (Beginner)

Locality matters because the cost of reaching a byte is not uniform; it depends entirely on how far that byte sits from the processor that wants it. Hardware is organized as a hierarchy of stores, and each level down is both larger and dramatically slower than the one above. A value already in a register is available in a fraction of a nanosecond. A value in high-bandwidth memory or DRAM is hundreds of times further away. A value on a local NVMe disk is about a thousand times further still. A value on another machine in the same rack adds a network hop, and a value in another datacenter adds wide-area latency that is measured in tens of milliseconds, roughly eight orders of magnitude slower than the register. Figure 2.8.1 draws this staircase; most treads sit two to three orders of magnitude below the last, the exception being the rack hop, which at these representative figures is only about twice the local-NVMe latency.

[Figure 2.8.1 diagram: latency and capacity grow downward]
  Tier                          Scope, capacity                    Latency    Slowdown vs. register
  Register / L1 cache           on-die, kilobytes                  ~0.3 ns    1x
  HBM / DRAM                    on-node, gigabytes                 ~100 ns    333x
  Local NVMe SSD                on-node, terabytes                 ~100 us    333,000x
  Same-rack network hop         rack, petabytes across nodes       ~200 us    666,000x
  Cross-datacenter round trip   WAN, exabytes globally             ~50 ms     166,000,000x
Figure 2.8.1: The locality staircase. Each tread is a tier of the memory and network hierarchy; the right-hand figure is its approximate latency to the first byte and its slowdown factor relative to a register access. The widening bars are a reminder that the slower tiers are also the larger ones, which is exactly why data lives there and computation has to be brought to it. The DRAM latency in the second tread is checked against a measurement on this machine in Output 2.8.1; the other treads use representative figures.

Two consequences follow from the shape of this staircase. First, an algorithm's performance is set far more by which tier its data lives in than by the instruction count of its inner loop; a cache-resident kernel and a disk-bound one can differ by a factor of a thousand while doing the identical arithmetic. Second, the jump from on-node tiers (registers through local disk) to off-node tiers (rack and beyond) is where the distributed-systems tax begins, because crossing it means a network is now in the path. The whole game of locality is to keep as much work as possible on the cheap side of that jump.
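To make the first consequence tangible, the short sketch below takes the representative latencies from Figure 2.8.1 and asks how long a chain of one million dependent reads (each waiting on the previous one) would take at each tier. The table `TIER_LATENCY_S` and the helper `chain_time_seconds` are illustrative names introduced here, and the latencies are the figure's order-of-magnitude values, not measurements.

```python
# Representative latencies to first byte, copied from Figure 2.8.1 (in seconds).
TIER_LATENCY_S = {
    "register / L1 cache": 0.3e-9,
    "HBM / DRAM": 100e-9,
    "local NVMe SSD": 100e-6,
    "same-rack network hop": 200e-6,
    "cross-datacenter round trip": 50e-3,
}

def chain_time_seconds(latency_s, dependent_reads=1_000_000):
    """Time for a chain of reads in which each read waits for the previous one."""
    return latency_s * dependent_reads

for tier, latency in TIER_LATENCY_S.items():
    print(f"{tier:28s} {chain_time_seconds(latency):12.4f} s")
```

Under these assumptions the same million-read chain finishes in well under a millisecond from registers, in about a tenth of a second from DRAM, in a few minutes from local disk or across the rack, and in roughly fourteen hours across datacenters, with identical arithmetic at every tier.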

3. A Runnable Demo: Measuring the Staircase and Settling the Trade-off (Intermediate)

The numbers above are easy to assert and easy to doubt, so the program below measures the on-node tiers it can actually touch from one Python process (DRAM latency and bandwidth, local-disk bandwidth), pairs them with representative published figures for the rack and wide-area tiers, and then does the arithmetic that the locality principle reduces to: for a fixed shard and task, how much faster is moving the computation than moving the data? The code uses only the standard library so that it runs anywhere without a GPU or a cluster.

import time, os, tempfile, random
MB = 1024 * 1024

def bench_ram_latency():
    """Pointer-chase a shuffled index array to defeat the prefetcher."""
    n = 1 << 20
    idx = list(range(n))
    random.Random(0).shuffle(idx)          # random order => cache/prefetch miss per hop
    p, hops = 0, 4_000_000
    t0 = time.perf_counter()
    for _ in range(hops):
        p = idx[p]                         # each hop waits on the previous load
    return (time.perf_counter() - t0) / hops * 1e9   # ns per hop

def move_costs(shard_bytes, task_bytes, bandwidth):
    """Seconds to ship the DATA vs to ship the COMPUTE over one link."""
    return shard_bytes / bandwidth, task_bytes / bandwidth

shard, task = 256 * MB, 4 * 1024           # 256 MB input split, 4 KB task binary
rack_bw, wan_bw = 1.25e9, 1.25e8           # ~10 Gb/s rack NIC, ~1 Gb/s shared WAN

ram_lat = bench_ram_latency()
d_rack, c_rack = move_costs(shard, task, rack_bw)
d_wan,  c_wan  = move_costs(shard, task, wan_bw)

print(f"measured DRAM random latency : {ram_lat:6.1f} ns/hop")
print(f"ship DATA to compute  (rack) : {d_rack*1e3:8.1f} ms")
print(f"ship COMPUTE to data  (rack) : {c_rack*1e3:8.3f} ms")
print(f"locality speedup      (rack) : {d_rack/c_rack:8.0f}x")
print(f"locality speedup      (WAN)  : {d_wan/c_wan:8.0f}x")
Code 2.8.1: A standard-library probe of the hierarchy. The pointer-chase defeats hardware prefetching so each hop pays close to the full random-access latency (plus a fixed interpreter cost per loop iteration); the move_costs helper turns the locality principle into two divisions whose ratio is the shard-to-task size ratio, independent of link speed.

The full script (saved alongside this chapter as _demo_locality.py, and adding the bandwidth and disk probes elided above) was run with C:\Python314\python.exe. Its real output on the author's machine follows.

Measured local tiers on this machine
----------------------------------------------------
RAM random-access latency   :   225.6 ns/hop
RAM sequential bandwidth    :    1.22 GB/s
Local disk write bandwidth  :    0.81 GB/s
Local disk read bandwidth   :    1.63 GB/s

The locality staircase (order-of-magnitude latency to first byte)
----------------------------------------------------
Register / L1 cache                          0.3 ns  ~           1x  [on-die]
HBM / DRAM (measured ~226 ns)              100.0 ns  ~         333x  [on-node]
Local NVMe SSD                          100000.0 ns  ~     333,333x  [on-node]
Same-rack network hop                   200000.0 ns  ~     666,667x  [rack]
Cross-datacenter RTT                  50000000.0 ns  ~ 166,666,667x  [WAN]

Scheduling question: move compute to data, or data to compute?
----------------------------------------------------
Input shard size            :      256 MB
Task binary size            :        4 KB
Ship DATA to compute (rack) :    214.7 ms
Ship COMPUTE to data (rack) :    0.003 ms
  locality speedup (rack)   :    65536x
Ship DATA to compute (WAN)  :   2147.5 ms
Ship COMPUTE to data (WAN)  :    0.033 ms
  locality speedup (WAN)    :    65536x
Output 2.8.1: Real run of _demo_locality.py under Python 3.14. The measured DRAM random-access latency (~226 ns) lands in the staircase's second tread; the scheduling block shows moving the 256 MB shard costs 215 ms on the rack while shipping the 4 KB task costs 3 microseconds, a $65{,}536\times$ ($2^{16}$) speedup that is identical on the wide-area link because, as Section 1 argued, the ratio is $S/C$ and the bandwidth cancels.

The 65,536 factor is not a benchmark artifact; it is the size ratio $S/C = 256\,\text{MB} / 4\,\text{KB} = 2^{16}$, and it is the same whether the link is fast or slow. That is the whole argument for locality in one number: the slower the link, the more locality saves you in absolute terms, but the relative advantage of shipping code over data is fixed by the asymmetry of their sizes. The DRAM bandwidth printed here reads low because a pure-Python byte-slicing loop is interpreter-bound rather than memory-bound. The latency figure also carries some interpreter overhead per hop, so read it as an upper bound of the right order of magnitude. The scheduling arithmetic, which is the load-bearing result, depends on neither measurement.

Fun Note: The Mountain Will Not Come

The locality principle is an old proverb with the costs reversed. If the mountain of data will not come to the program, the program, being light, walks to the mountain. The only times a sane scheduler makes the mountain move are when every machine near it is busy (so a remote read beats waiting) or when the data is small enough that it stopped being a mountain. Both exceptions are just the size ratio $S/C$ falling back toward one.

4. How Big-Data Systems Exploit Locality (Intermediate)

The first systems to industrialize the locality principle were the big-data engines, and they did it through the scheduler. In MapReduce, the input is split into blocks stored on a distributed file system, and each block is replicated on a few machines. When the scheduler assigns a map task, it does not pick a free worker at random; it asks where the replicas of that task's input split live and tries to place the task on one of those exact machines, so the map function reads its input from the local disk and no block crosses the network. Only when every machine holding a replica is busy does it settle for a same-rack worker, and only when no same-rack worker is free does it accept a remote one. This three-level preference, node-local, then rack-local, then any, is the locality staircase of Figure 2.8.1 turned into a placement policy. We develop the MapReduce execution model and this scheduling in Chapter 6.
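The three-level preference can be sketched in a few lines. This is a minimal illustration of the policy as described above, not the code of any real MapReduce scheduler; the function `pick_worker`, its arguments, and the node and rack names are all hypothetical.

```python
def pick_worker(replica_nodes, free_workers, rack_of):
    """Return (worker, locality_level) for one map task, or (None, None).

    replica_nodes: set of node names holding a replica of the input split
    free_workers:  list of node names that currently have a free task slot
    rack_of:       dict mapping node name -> rack name
    """
    # 1. Node-local: a free worker that already holds a replica.
    for worker in free_workers:
        if worker in replica_nodes:
            return worker, "node-local"
    # 2. Rack-local: a free worker that shares a rack with some replica.
    replica_racks = {rack_of[node] for node in replica_nodes}
    for worker in free_workers:
        if rack_of[worker] in replica_racks:
            return worker, "rack-local"
    # 3. Any: fall back to whichever worker is free.
    if free_workers:
        return free_workers[0], "any"
    return None, None

# Hypothetical five-node, three-rack cluster; the split has replicas on n1 and n3.
rack_of = {"n1": "r1", "n2": "r1", "n3": "r2", "n4": "r2", "n5": "r3"}
replicas = {"n1", "n3"}
print(pick_worker(replicas, ["n2", "n3"], rack_of))  # n3 holds a replica
print(pick_worker(replicas, ["n2"], rack_of))        # n2 shares rack r1 with n1
print(pick_worker(replicas, ["n5"], rack_of))        # no replica in rack r3
```

The order of the three checks is the whole policy: each fallback is tried only after the cheaper tier has been ruled out.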

Spark inherits the same idea and exposes it explicitly. Each partition of a distributed dataset carries a set of preferred locations, the nodes where its data already sits, and the Spark scheduler honors a configurable hierarchy of locality levels (process-local, node-local, rack-local, any) when it dispatches tasks, even waiting a short, tunable interval for a local slot to free up rather than immediately launching a remote read. The payoff follows from the staircase of Figure 2.8.1: a node-local read of a partition is a local disk access, while an "any" read pulls the same bytes across the network, adding at least a rack hop to every request and competing with every other remote read for shared link bandwidth. Spark's DataFrame execution and this locality-aware scheduling are the subject of Chapter 7.
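The "wait briefly for a local slot, then give up" behaviour can be sketched as a toy two-level model. This is not Spark's scheduler: `choose_offer`, the offer tuples, and the node names are hypothetical, and the sketch collapses Spark's several locality levels into just "node-local" and "any". In Spark the corresponding knob is the `spark.locality.wait` setting.

```python
def choose_offer(offers, preferred_nodes, locality_wait_s):
    """Pick the first acceptable slot offer for one task.

    offers: list of (time_s, node) pairs in arrival order, with time measured
            from the moment the task became runnable.
    Returns (time_s, node, level) for the accepted offer, or None if the
    offers run out before one is acceptable.
    """
    for time_s, node in offers:
        if node in preferred_nodes:
            return time_s, node, "node-local"
        if time_s >= locality_wait_s:
            # The task has waited long enough: accept a remote slot.
            return time_s, node, "any"
    return None

offers = [(0.5, "n7"), (1.2, "n2"), (4.0, "n9")]
print(choose_offer(offers, {"n2"}, locality_wait_s=3.0))  # local slot arrives in time
print(choose_offer(offers, {"n4"}, locality_wait_s=3.0))  # wait expires, runs remotely
print(choose_offer(offers, {"n4"}, locality_wait_s=0.0))  # no patience: first slot wins
```

Raising the wait favours locality at the price of idle time; lowering it keeps slots busy at the price of more remote reads.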

Practical Example: The Join That Stopped Saturating the Network

Who: A data engineer running nightly feature-engineering jobs for a recommendation pipeline.

Situation: A Spark job joining a 2-terabyte event log against a 4-gigabyte user table ran for ninety minutes, with cluster dashboards showing the rack network pinned near saturation the entire time.

Problem: The shuffle for the join was re-partitioning and moving the giant event log across the network so that matching keys met, paying the per-byte cost on the larger of the two tables.

Dilemma: Add more network-heavy executors and hope throughput scaled, an expensive move that fought the symptom, or restructure the join so the small table travelled instead of the large one.

Decision: They switched the join strategy to a broadcast join, shipping the 4-gigabyte user table once to every node and joining it locally against each node's resident slice of the event log.

How: A one-line broadcast hint on the small table told the planner to send the small object to the data rather than reshuffling the large one, exactly the move-compute-to-data inversion of Section 1.

Result: Wall-clock fell from ninety minutes to about twelve, and network utilization dropped to a fraction of its former level, because the bytes crossing the network shrank from two terabytes of log to one copy of the 4-gigabyte table per node.

Lesson: When two datasets must meet, move the smaller one. Locality is not only about disk reads; it is the same size-ratio argument applied to every byte a join would otherwise shuffle.
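A back-of-the-envelope model shows why the broadcast wins. The sketch below is an approximation under stated assumptions, not a description of Spark's internals: it assumes rows are spread uniformly so that a shuffle moves a fraction (nodes - 1) / nodes of both tables, and that a broadcast delivers one full copy of the small table to every other node. The cluster size of 50 nodes is hypothetical (the example does not state one), and the function names are introduced here for illustration.

```python
TB, GB = 10**12, 10**9

def shuffle_join_bytes(large_bytes, small_bytes, nodes):
    """Approximate network bytes when both sides are re-partitioned by key:
    a fraction (nodes - 1) / nodes of every table leaves its current node."""
    return (large_bytes + small_bytes) * (nodes - 1) / nodes

def broadcast_join_bytes(small_bytes, nodes):
    """Approximate network bytes when one full copy of the small table is
    delivered to every node other than the one that already holds it."""
    return small_bytes * (nodes - 1)

nodes = 50  # hypothetical cluster size, not given in the example
shuffled = shuffle_join_bytes(2 * TB, 4 * GB, nodes)
broadcasted = broadcast_join_bytes(4 * GB, nodes)
print(f"shuffle join   : {shuffled / GB:8.0f} GB over the network")
print(f"broadcast join : {broadcasted / GB:8.0f} GB over the network")
print(f"reduction      : {shuffled / broadcasted:8.1f}x")
```

Under this model the broadcast moves roughly a tenth of the bytes on 50 nodes. The advantage shrinks as the cluster grows, because every node receives its own copy; the two strategies move similar volumes once the node count approaches the size ratio of the tables, about 500 here.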

5. Locality in Deep Learning: Feeding the Accelerators (Intermediate)

Deep learning makes locality urgent in a way batch analytics does not, because the consumer of the data is an accelerator that costs a great deal per second and must never sit idle. A training step is a tight cycle: the data loader must read the next batch, decode and augment it, and have it resident in GPU memory by the time the previous step's backward pass finishes. If the input pipeline cannot keep up, the GPU starves, and an idle accelerator is the most expensive thing in a training cluster. Locality here means staging the dataset shards on storage physically close to the accelerators, on local NVMe or a fast node-local cache rather than a distant object store, so that the read tier in Figure 2.8.1 stays on the cheap, on-node side of the network jump. The design of distributed storage and the data-loading pipeline, including sharded readers, prefetching, and caching that overlap I/O with computation, is the whole subject of Chapter 8.
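The idea of overlapping reads with computation can be shown with a minimal prefetcher built from a background thread and a bounded queue. This is a sketch under simplifying assumptions, not a production loader: the generator `prefetching` and the stand-in `slow_reads` are hypothetical names, `time.sleep` stands in for both I/O and the accelerator step, and there is no error handling (an exception in the reader thread would leave the consumer waiting).

```python
import queue
import threading
import time

def prefetching(batches, depth=2):
    """Yield items from `batches` while a background thread reads ahead.

    `depth` bounds how many batches may be staged at once, which is the
    double-buffering idea: the reader fills one slot while the consumer
    works on another.
    """
    staged = queue.Queue(maxsize=depth)
    done = object()  # sentinel marking the end of the stream

    def reader():
        for batch in batches:
            staged.put(batch)   # blocks while `depth` batches are already staged
        staged.put(done)

    threading.Thread(target=reader, daemon=True).start()
    while True:
        batch = staged.get()    # blocks only if the reader has fallen behind
        if batch is done:
            return
        yield batch

def slow_reads(n, read_s=0.01):
    """Stand-in for a sharded reader: each batch costs `read_s` of I/O."""
    for i in range(n):
        time.sleep(read_s)
        yield i

for batch in prefetching(slow_reads(5)):
    time.sleep(0.01)            # stand-in for the accelerator's step
```

Because the reader thread is already fetching batch k+1 while the consumer processes batch k, the read time hides behind the step time whenever reads are no slower than steps. Real loaders typically use worker processes rather than threads so that CPU-bound decoding can also run in parallel.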

Locality also operates inside a single node, among its accelerators. When several GPUs on one machine must exchange tensors, as they do on every step of data-parallel and model-parallel training, the path between them matters just as much as the path to storage. Two GPUs linked by NVLink share data at hundreds of gigabytes per second; two GPUs that can only talk over the PCIe bus, or worse across a CPU socket, are far slower and can become the step's bottleneck. This intra-node device locality is why a topology-aware scheduler tries to place the GPUs of one job on NVLink-connected devices, and why the cost of a collective depends on which devices are paired. The communication primitives that ride these links, and the way their cost follows the device topology, are developed in Chapter 4.

Key Insight: Locality Is a Ladder, Not a Switch

It is tempting to treat locality as a binary, local or remote, but real systems optimize a whole ladder at once. For a training job that ladder runs from "tensor already in registers", through "in HBM on this GPU", "reachable over NVLink on a sibling GPU", "over PCIe on this node", "over the rack network on a peer node", to "in a remote object store". Every rung you can avoid descending is an order of magnitude reclaimed. A well-placed job keeps the hot path, the per-step data and gradient traffic, as high on this ladder as possible, and pushes only the cold, infrequent traffic to the lower rungs.

6. When Locality Bends: Replication and the Limits of the Principle (Advanced)

Locality is a default, not a law, and it is worth naming where it bends. First, you can only run a task near its data if a copy of that data is reachable from a free processor, which is why locality and replication are partners: distributed file systems keep several replicas of each block precisely so the scheduler has more than one machine on which a task can be node-local. Section 2.3 introduced that replication; locality is one of the reasons it pays for itself. Second, when every machine holding the data is busy, waiting for a local slot can cost more than a remote read, so schedulers cap the wait and then fall back, trading a little locality for lower latency, which is the straggler-versus-throughput tension of Section 2.7 in another guise.
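The partnership between replication and locality can be illustrated with a toy probability model. The sketch assumes, purely for illustration, that each machine holding a replica is busy independently with the same probability; real clusters have correlated load, so treat the numbers as indicative only. The function name `p_node_local` and the 80% busy fraction are hypothetical.

```python
def p_node_local(busy_fraction, replicas):
    """Probability that at least one replica holder has a free slot, assuming
    each holder is busy independently with probability `busy_fraction`."""
    return 1.0 - busy_fraction ** replicas

for r in (1, 2, 3):
    print(f"replicas={r}: P(node-local slot available) = {p_node_local(0.8, r):.3f}")
```

With each machine busy 80% of the time, going from one replica to three raises the chance of finding a node-local slot from 0.2 to about 0.49 under this model, which is one way to see why the extra copies help the scheduler as well as durability.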

Third, and most important for deep learning, locality is the data-plane default but not the only force on placement. Collective communication wants the workers of one job clustered on a fast, low-diameter slice of the network so their all-reduce is cheap, and that topology objective can compete with a pure data-locality objective. Reconciling the two, placing a job so that both its input reads and its gradient exchanges are cheap, is a scheduling problem we return to with the communication-cost machinery of Chapter 3 and the cluster schedulers of Part VII. The principle to carry forward is unchanged: move the small thing, keep the hot traffic high on the ladder, and only descend the hierarchy when something nearer is unavailable.

Research Frontier: The Data-Loading Bottleneck and Storage-Side Computation (2024 to 2026)

As accelerator throughput has grown much faster than storage and network bandwidth, the input pipeline has become a first-class bottleneck, and a vigorous research line attacks it from both ends of the locality ladder. On the loader side, systems work on caching and deduplicating shards across a cluster (in the lineage of Google's tf.data service and Meta's data-loading infrastructure) so that a hot shard is fetched once and then served node-locally, and recent studies quantify how often large training runs are I/O-bound rather than compute-bound. On the device side, GPUDirect Storage lets an NVMe drive stream directly into GPU memory over PCIe, removing the CPU bounce-buffer copy and pulling the read one rung higher on the locality ladder; libraries such as NVIDIA DALI and the Magnum IO stack push this into mainstream training. A complementary thread is computational and near-data storage, moving filtering and decompression into the storage layer so that only the bytes a model actually needs cross the wire, the ultimate expression of moving computation to data. We meet the storage and loading side of this work in Chapter 8 and the device-to-device side in Chapter 4.

Library Shortcut: Frameworks Make Locality the Default

You rarely hand-place tasks. In Spark you express data-side movement with a single hint, and the planner does the size-ratio reasoning of Section 4 for you; the broadcast join of the Practical Example is one method call:

from pyspark.sql.functions import broadcast

# Ship the SMALL table to every node's resident slice of the LARGE one,
# instead of shuffling the large table across the network.
result = big_events.join(broadcast(small_users), on="user_id", how="inner")
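Code 2.8.2: Spark's broadcast hint turns "move data to compute" into "move the small object to the data" in one call; the engine handles replicating the small table and the local join. An explicit hint takes precedence over the planner's automatic size threshold, so confirming that the table fits in each executor's memory remains the caller's responsibility.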

On the training side, locality is equally automatic. A PyTorch DataLoader with several worker processes and a prefetch factor overlaps reading and augmentation with the GPU's compute so the next batch is staged before the device asks for it, and pinned memory plus an asynchronous host-to-device copy keeps the transfer on the fast rung. Roughly fifty lines of hand-rolled prefetching and double-buffering collapse to a constructor call:

from torch.utils.data import DataLoader

loader = DataLoader(
    dataset,
    batch_size=256,
    num_workers=8,        # parallel reader processes stage shards off local disk
    pin_memory=True,      # page-locked host buffers enable async H2D copy
    prefetch_factor=4,    # each worker reads ahead so the GPU never waits on I/O
)
Code 2.8.3: A PyTorch DataLoader configured for locality. The library overlaps node-local reads with accelerator compute and keeps transfers on the fast rung; the from-scratch version (worker pool, ring buffer, pinned-memory copies) makes up the bulk of Chapter 8's pipeline.

Locality is the quiet discipline that makes every later distribution strategy affordable: data parallelism, model sharding, and fleet inference all assume that a worker's input is already nearby, and they pay dearly when it is not. With placement and cost geometry in hand, we are ready to assemble the recurring building blocks of distributed AI, the manager-worker, parameter-server, and dataflow patterns, in Section 2.9.

Exercise 2.8.1: Where Does the Break-Even Sit? (Conceptual)

Section 1 argued that the relative advantage of moving computation over moving data is the size ratio $S/C$, independent of link speed. (a) For what shard size $S$ does moving the data become cheaper than moving a 4-kilobyte task, if running the task next to its data would force a wait of $W$ seconds for a local slot to free up? Write the inequality. (b) Explain why a scheduler that caps its locality wait at a few seconds (as Spark does) is implicitly solving this inequality, and what it assumes about the cost of a remote read. (c) Give one realistic situation in deep-learning training where the data is small enough that locality stops mattering.

Exercise 2.8.2: Extend the Staircase to a Real Job (Coding)

Modify Code 2.8.1 so that instead of a single 256-megabyte shard it models a full epoch of a training dataset: $N = 10^{6}$ samples of 200 kilobytes each, read once per epoch over a given link. Compute the total time to stream the epoch over (a) local NVMe at 3 GB/s, (b) a 10 Gb/s rack link, and (c) a 1 Gb/s wide-area link. Then assume each accelerator step consumes one 256-sample batch in 80 milliseconds and report, for each link, whether the data pipeline can keep the accelerator fed or whether the GPU will starve. State which tier of Figure 2.8.1 you would need the dataset staged on to avoid starvation.

Exercise 2.8.3: NVLink Versus PCIe on the Hot Path (Analysis)

A data-parallel step exchanges a gradient of $P = 5 \times 10^{8}$ parameters (4 bytes each) between two GPUs on one node. Estimate the transfer time if the two GPUs are connected by NVLink at 300 GB/s versus by PCIe Gen4 at 25 GB/s, and express the difference as a slowdown factor. Now suppose the accelerator's compute for one step is 120 milliseconds. For each interconnect, argue whether the gradient exchange can be hidden behind compute or becomes the step's bottleneck, and connect your answer to why a topology-aware scheduler (Section 5) treats NVLink adjacency as a placement constraint. We make the collective-cost model behind this estimate precise in Chapter 4.