Part II: Distributed Data Processing for AI
Chapter 8: Distributed Storage and Data Loading

Sharded Training Data and the DataLoader Bottleneck

"I am a very expensive accelerator, and I have spent the last forty milliseconds watching a JPEG decode itself. Send help, or at least send a worker."

A GPU Idling on a Communication Barrier
Big Picture

An accelerator can only train as fast as data arrives to feed it, so the input pipeline is a distributed-systems problem in miniature: read, decode, augment, collate, and transfer must finish before the model step needs the batch, or the most expensive hardware in the building sits idle. Section 8.1 argued that the storage layer sets the ceiling for the whole cluster; this section brings that argument down to a single node and a single training step. Each example must be pulled off storage, turned into a tensor, and copied onto the device, and every one of those stages can be the slow one. The remedy is the same overlap idea that runs through the book: hide loading behind compute with many worker processes and a prefetch buffer, and split the dataset so each of the $K$ data-parallel workers reads a disjoint, balanced slice. When the pipeline is faster than the model step, the accelerators never wait; when it is not, no amount of compute helps.

The previous section laid out how data is partitioned and compacted on storage so that a reader can fetch large contiguous chunks efficiently. That work pays off only if the bytes actually reach the accelerator in time. A training step is a small assembly line. A batch of examples is read from storage, decoded from its on-disk encoding (a JPEG, a tokenized record, a Parquet row group) into raw arrays, augmented or normalized, collated into a single padded tensor, and finally transferred across the PCIe or NVLink bus into device memory. The model's forward and backward pass then consumes that tensor. If the assembly line cannot keep one batch ready at all times, the model step blocks, and the accelerator, which may cost several dollars an hour, computes nothing while it waits. This is the per-node echo of the cluster-wide storage argument from Section 8.1: a fast worker fed by a slow pipeline is no faster than the pipeline.

[Figure 8.5.1 diagram. Top panel: the full training dataset ($N$ examples on storage) is split by a DistributedSampler into disjoint shards, one per worker (Worker 0 with shard 0 through Worker $K-1$ with shard $K-1$), each reading a balanced slice. Bottom panel, inside one worker: read (fetch chunk), decode (bytes to arrays), augment (normalize, crop), collate (pad into a batch), prefetch queue (double-buffered batches, filled in parallel by loader workers), pinned memory (async host to device), accelerator step (forward + backward). While the accelerator runs step $i$, the workers are already building step $i+1$.]
Figure 8.5.1: Two views of the data path. Top: a distributed sampler splits the $N$ examples into $K$ disjoint, balanced shards, one per data-parallel worker, so no example is trained on twice per epoch. Bottom: inside one worker, the read, decode, augment, and collate stages feed a prefetch queue that loader processes fill in parallel; pinned memory enables an asynchronous host-to-device copy so the next batch is ready before the current model step finishes.

1. The Input Pipeline Is an Assembly Line Beginner

Think of the data path as five stages, each consuming the output of the one before it. Reading pulls raw bytes off storage (local disk, a distributed filesystem, or object storage, as covered in Section 8.2). Decoding turns those bytes into usable arrays, which for images means JPEG decompression and for text means tokenization. Augmentation applies the random crops, flips, and normalizations that regularize training. Collation stacks a list of examples into one dense, padded tensor. Transfer copies that tensor across the bus into device memory. Only after all five finish can the model step begin, and the model step is the only stage that uses the accelerator.

The throughput of an assembly line is set by its slowest stage, not its average stage. A pipeline that decodes quickly but reads from a cold object store one tiny file at a time will be read-bound; a pipeline that reads fast from a local cache but applies heavy per-image augmentation on a single CPU core will be augment-bound. The first diagnostic skill is to measure each stage in isolation and find which one dominates, because the fix differs by stage: read-bound pipelines want larger contiguous chunks and caching, decode-bound and augment-bound pipelines want more CPU parallelism or accelerated kernels, and collate-bound pipelines want a cheaper batch layout.
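One way to measure each stage in isolation is to wrap it in a timer and accumulate per-stage totals. The sketch below is illustrative only: `StageTimer` and the four stand-in stage functions are hypothetical names introduced here, not part of any library, and real read, decode, augment, and collate code would replace the stand-ins.

```python
import time
from collections import defaultdict
from contextlib import contextmanager


class StageTimer:
    """Accumulate wall-clock seconds per named pipeline stage (hypothetical helper)."""

    def __init__(self):
        self.seconds = defaultdict(float)

    @contextmanager
    def stage(self, name):
        start = time.perf_counter()
        try:
            yield
        finally:
            self.seconds[name] += time.perf_counter() - start

    def bottleneck(self):
        """Name of the stage with the largest accumulated time."""
        return max(self.seconds, key=self.seconds.get)


# Stand-in stages: placeholders for the real pipeline code.
def read(i):
    return bytes(64)

def decode(raw):
    return list(raw)

def augment(x):
    return [v + 1 for v in x]

def collate(xs):
    return [list(x) for x in xs]      # stack examples into one batch (list of rows)


timer = StageTimer()
examples = []
for i in range(32):
    with timer.stage("read"):
        raw = read(i)
    with timer.stage("decode"):
        arr = decode(raw)
    with timer.stage("augment"):
        arr = augment(arr)
    examples.append(arr)
with timer.stage("collate"):
    batch = collate(examples)

# Report stages from slowest to fastest, then name the one to fix first.
for name, secs in sorted(timer.seconds.items(), key=lambda kv: -kv[1]):
    print(f"{name:>8}: {secs * 1e3:.3f} ms")
print("slowest stage:", timer.bottleneck())
```

The stand-ins here are far too cheap to say anything meaningful about timing; the point is the measurement pattern, which stays the same when real stages are substituted.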

Key Insight: The Input Pipeline and the Model Race, and Only the Slower One Sets the Pace

Let $t_\text{load}$ be the time to prepare one batch and $t_\text{compute}$ the time for one model step. If the two run on separate resources (CPU workers and the accelerator) and overlap perfectly, the wall-clock time per step is $\max(t_\text{load}, t_\text{compute})$, not their sum. When $t_\text{load} \le t_\text{compute}$ the loading is fully hidden and the accelerator is the bottleneck, which is exactly where you want to be. When $t_\text{load} > t_\text{compute}$ the accelerator stalls for $t_\text{load} - t_\text{compute}$ every step, and buying a faster accelerator makes the gap worse, not better. The entire art of data loading is driving $t_\text{load}$ below $t_\text{compute}$ and keeping it there.
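The arithmetic is easy to check by hand or in a few lines of code. The helpers below are a minimal sketch: the names `step_time` and `stall` are introduced here for illustration, the millisecond figures are arbitrary example values, and perfect overlap is an idealization.

```python
def step_time(t_load, t_compute, overlapped=True):
    """Wall-clock per step: max() when loading overlaps compute, sum when it does not."""
    return max(t_load, t_compute) if overlapped else t_load + t_compute


def stall(t_load, t_compute):
    """Time the accelerator waits per step under perfect overlap."""
    return max(0.0, t_load - t_compute)


# Illustrative numbers in milliseconds, chosen only for the example.
print(step_time(28.0, 10.0, overlapped=False))  # 38.0: load, then compute, in sequence
print(step_time(28.0, 10.0))                    # 28.0: loading sets the pace
print(stall(28.0, 10.0))                        # 18.0: accelerator idle per step
print(stall(28.0, 5.0))                         # 23.0: a 2x faster accelerator waits longer
print(stall(8.0, 10.0))                         # 0.0: loading is fully hidden
```

The fourth line is the counterintuitive case from the text: halving $t_\text{compute}$ leaves the step time at 28 ms and simply converts the saved compute time into additional idle time.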

2. Overlap: Workers, Prefetching, and Double-Buffering Intermediate

A single process that loads a batch and then trains on it does the two jobs in sequence, paying $t_\text{load} + t_\text{compute}$ every step. The cure is to run loading and compute concurrently so the next batch is built while the current one trains. This is the identical overlap principle that hides communication behind computation in distributed training, where an all-reduce is launched during the backward pass so the network transfer finishes before the optimizer needs the result (Section 4.10). Here the rival to compute is not the network but the CPU-bound input pipeline, and the tool is the same: do the slow thing in the background.

Two mechanisms make this concrete. Multiple worker processes parallelize the pipeline itself: with $w$ loader processes, up to $w$ batches are read, decoded, and augmented at once, cutting the effective per-batch load time toward $t_\text{load}/w$ until some shared resource (disk bandwidth, the number of CPU cores) saturates. Prefetching with double-buffering decouples the loaders from the model: the workers push finished batches into a small queue, and the training loop pulls from the queue, so the accelerator finds its next batch already waiting instead of triggering a load on demand. The two together mean that to hide a load that is $r = t_\text{load}/t_\text{compute}$ times slower than compute, you need roughly $\lceil r \rceil$ workers; beyond that, extra workers only deepen the buffer.
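The queue half of this design can be sketched with the Python standard library alone. In the example below a background thread stands in for the loader workers and a bounded `queue.Queue` plays the prefetch buffer. The names `prefetch` and `workers_needed` are introduced here for illustration, and a real loader would typically use processes rather than a thread, since decode and augmentation are CPU-bound.

```python
import math
import queue
import threading


def workers_needed(t_load, t_compute):
    """Rule of thumb from the text: ceil(t_load / t_compute) loaders hide the load."""
    return max(1, math.ceil(t_load / t_compute))


def prefetch(batches, depth=2):
    """Yield items from `batches`, built ahead of time by a background thread.

    depth=2 is double-buffering: at most two finished batches wait in the queue.
    """
    q = queue.Queue(maxsize=depth)
    done = object()                      # sentinel marking the end of the stream

    def producer():
        for b in batches:
            q.put(b)                     # blocks while the buffer is full
        q.put(done)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = q.get()                   # blocks only if no batch is ready yet
        if item is done:
            return
        yield item


print(workers_needed(28.0, 10.0))        # 3
print(list(prefetch(range(5))))          # [0, 1, 2, 3, 4]
```

The bounded queue is what keeps memory in check: once `depth` batches are waiting, the producer blocks instead of racing arbitrarily far ahead of the consumer. This sketch omits error handling, so an exception in the producer would leave the consumer waiting.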

The simulation below models a training loop with these mechanics. A pool of loader workers fills a prefetch queue while a single accelerator consumes batches, each load taking $28$ ms and each model step $10$ ms, so loading is initially the slower stage. It reports accelerator utilization as the worker count rises.

COMPUTE_MS = 10.0     # one model step on the accelerator
LOAD_MS    = 28.0     # one batch through read + decode + augment + collate
STEPS      = 200

def simulate(num_workers):
    """Return (wall_clock_ms, accelerator_busy_ms) for one epoch."""
    worker_free_at = [0.0] * num_workers          # when each loader becomes idle
    batch_ready_at = []
    for i in range(STEPS):
        w = min(range(num_workers), key=lambda j: worker_free_at[j])  # earliest-free worker
        worker_free_at[w] += LOAD_MS
        batch_ready_at.append(worker_free_at[w])  # this batch is ready at that time

    accel_free_at = 0.0
    for i in range(STEPS):
        step_start = max(accel_free_at, batch_ready_at[i])  # wait if the batch is not ready
        accel_free_at = step_start + COMPUTE_MS
    return accel_free_at, STEPS * COMPUTE_MS

print(f"compute/step = {COMPUTE_MS:.0f} ms   load/batch = {LOAD_MS:.0f} ms   steps = {STEPS}")
print(f"{'workers':>8} {'wall (ms)':>11} {'util %':>8} {'verdict':>22}")
ideal_wall = LOAD_MS + STEPS * COMPUTE_MS    # one batch of startup latency, then pure compute
for k in (1, 2, 3, 4, 6, 8):
    wall, busy = simulate(k)
    util = 100.0 * busy / wall
    verdict = "load hidden" if wall <= ideal_wall * 1.001 else "starved by input"
    print(f"{k:>8} {wall:>11.0f} {util:>7.1f}% {verdict:>22}")
Code 8.5.1: A pure-Python model of prefetch overlap. Loader workers fill a queue (the batch_ready_at times); the accelerator consumes a batch only once it is ready, stalling otherwise. Utilization is the fraction of wall-clock the accelerator spent computing rather than waiting.
compute/step = 10 ms   load/batch = 28 ms   steps = 200
 workers   wall (ms)   util %                verdict
       1        5610    35.7%       starved by input
       2        2820    70.9%       starved by input
       3        2028    98.6%            load hidden
       4        2028    98.6%            load hidden
       6        2028    98.6%            load hidden
       8        2028    98.6%            load hidden
Output 8.5.1: Accelerator utilization climbs from 35.7% with one worker to 98.6% with three, then flattens. Since one load is $28/10 = 2.8$ compute steps long, $\lceil 2.8 \rceil = 3$ workers suffice to hide it; the residual 1.4% is the startup latency of the first batch, and more workers only enlarge the buffer.

The shape of that table is the whole lesson. With one worker the accelerator is starved two-thirds of the time, computing for only 36% of the wall-clock; the hardware bill is mostly paying for idle silicon. Adding workers parallelizes the input pipeline until, at three workers, loading is fully hidden behind compute and the accelerator runs essentially flat-out. Past that point the model step is the bottleneck, exactly the regime the Chapter 3 performance models tell us to aim for, and extra workers waste memory without buying speed. Finding the knee of this curve by measurement, not by guessing, is the practical job.

Fun Note: The Two-Dollar-an-Hour Spectator

An idle high-end accelerator does not save money; it burns it at full price while doing nothing. At 36% utilization, nearly two-thirds of the rental cost in Output 8.5.1 buys a very expensive seat from which to watch JPEGs decode. The cheapest performance win in deep learning is often not a faster chip but two more CPU workers and a prefetch queue, a fix that costs almost nothing and, in Output 8.5.1, nearly triples effective throughput (from 35.7% to 98.6% utilization).

3. Pinned Memory and Asynchronous Transfer Advanced

Hiding the read, decode, and augment stages still leaves the final transfer across the bus, and that stage has its own trick. By default a batch lives in ordinary pageable host memory, which the operating system may relocate at any time; a copy to the device from such memory must first stage through a hidden pinned buffer, and the copy cannot overlap with compute. If the batch is allocated in page-locked (pinned) memory instead, the driver can issue an asynchronous copy on a separate stream, so the host-to-device transfer of batch $i+1$ proceeds while the accelerator computes on batch $i$. This is the last layer of the same overlap idea: the transfer hides behind compute just as the loading did.

Pinned memory is not free; it cannot be paged out, so over-allocating it starves the rest of the system. The standard recipe is to pin only the prefetched batches and to combine pinning with an asynchronous copy and the worker pool, so that read, decode, augment, and transfer all overlap the model step. When every non-compute stage is hidden, the wall-clock per step collapses to $t_\text{compute}$ and the accelerator utilization approaches the 98.6% ceiling seen above, limited only by the startup latency of the first batch and the occasional refill of the queue.

Library Shortcut: torch DataLoader Wires Up Overlap for You

Code 8.5.1 modeled the worker pool and queue by hand. PyTorch's DataLoader implements exactly this machinery and exposes it as constructor arguments: num_workers forks the loader processes, prefetch_factor sets the queue depth per worker, and pin_memory=True turns on page-locked staging for the asynchronous device copy. The dozens of lines of process management, queue handling, and stream coordination collapse to a single call.

from torch.utils.data import DataLoader

loader = DataLoader(
    dataset,
    batch_size=256,
    num_workers=8,        # 8 loader processes fill the prefetch queue (Section 2)
    prefetch_factor=2,    # each worker keeps 2 batches ready: double-buffering
    pin_memory=True,      # page-locked batches enable async host-to-device copy
)
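for batch in loader:                      # batches arrive already built and pinned
    batch = batch.to("cuda", non_blocking=True)   # async transfer overlaps the step
    loss = model(batch).mean()            # placeholder scalar loss; substitute your own criterion
    loss.backward()                       # backward() returns None, so call it on its own line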
Code 8.5.2: The same overlap as Code 8.5.1, now as DataLoader arguments. The library handles process spawning, the prefetch queue, pinned-buffer allocation, and the non_blocking transfer that the from-scratch model only approximated.

4. Sharding the Dataset Across Data-Parallel Workers Intermediate

Everything so far concerned one node feeding one accelerator. Data-parallel training, the exact-gradient method introduced in Section 1.1 and built into a full loop in Chapter 15, runs $K$ replicas of the model at once, and each replica needs its own stream of batches. The correctness requirement is precise: within one epoch the $K$ workers together must cover the dataset exactly once, with no example seen by two workers and none skipped. If two replicas trained on the same batch, that example would be over-weighted in the averaged gradient and the run would silently drift from the single-machine answer that data parallelism is supposed to reproduce exactly.

The mechanism is a sampler that partitions the index set. Worker $k$ out of $K$ is assigned the examples whose index satisfies $i \bmod K = k$ (or an equivalent contiguous slice), giving $K$ disjoint shards that tile the dataset. Two further requirements matter at scale. The shards must be balanced, because synchronous data parallelism waits at an all-reduce barrier every step, so the slowest worker sets the pace; a worker handed 10% more examples drags the whole group, the per-step version of the straggler problem. And the shuffle must be coordinated by a shared epoch seed, so that all workers reshuffle consistently and the union of their shards is still a clean permutation of the dataset each epoch rather than a fixed split that never mixes.
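The three requirements (disjoint, balanced, consistently shuffled) fit in a few lines of plain Python. The function below is a minimal sketch similar in spirit to what a distributed sampler does, not a copy of any library's implementation; the name `shard_indices`, the `seed + epoch` convention, and the pad-by-repeating policy are assumptions made for illustration. Note that the stride is applied to positions in the shuffled order, so "index mod $K$" here means position in the epoch's permutation.

```python
import random


def shard_indices(n, rank, world_size, epoch, seed=0):
    """Indices for one rank: shared shuffle, pad to a multiple of K, then stride."""
    order = list(range(n))
    random.Random(seed + epoch).shuffle(order)   # same permutation on every rank
    pad = (-n) % world_size                      # slots needed for equal-length shards
    order += order[:pad]                         # assumption: pad by repeating a few examples
    return order[rank::world_size]               # position p goes to rank p mod K


n, K = 12, 4
shards = [shard_indices(n, r, K, epoch=0) for r in range(K)]
print([len(s) for s in shards])                          # [3, 3, 3, 3]
print(sorted(i for s in shards for i in s) == list(range(n)))   # True: disjoint and complete
```

Because every rank seeds the shuffle with the same `seed + epoch`, all ranks agree on the permutation without communicating, and changing `epoch` changes which examples land in which shard.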

Practical Example: The Eighth Worker That Trained on Nothing

Who: An ML platform engineer scaling an image-classification run from four GPUs to eight.

Situation: The four-GPU job hit 95% GPU utilization and reproduced the single-GPU accuracy exactly, so eight GPUs were expected to roughly halve wall-clock.

Problem: At eight GPUs the throughput barely improved and validation accuracy dropped, even though every GPU reported high utilization.

Dilemma: Blame the model and tune hyperparameters, blame the interconnect and profile the all-reduce, or audit the data path that had just changed shape under the new worker count.

Decision: They audited the data path first, logging exactly which example indices each of the eight workers received across an epoch.

How: The custom sampler had been written for four workers and used a hardcoded stride; at eight workers it handed many examples to two different ranks and left an eighth of the data untouched, so much of each batch was duplicated across ranks and the epoch covered only seven-eighths of the dataset.

Result: Replacing the homemade sampler with DistributedSampler, which derives disjoint balanced shards from the rank and world size, restored exact dataset coverage; throughput scaled and accuracy matched the four-GPU run.

Lesson: Data parallelism is only exact when the shards are disjoint, balanced, and cover the dataset once. Let the framework's sampler compute the partition from the world size rather than hardcoding strides that silently break when the worker count changes.
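The audit in this story amounts to counting how often each index appears across ranks. The sketch below shows one way to do that; `audit_coverage` is a name introduced here, and the hardcoded-stride bug is a hypothetical stand-in for illustration, not a reconstruction of the sampler in the story.

```python
from collections import Counter


def audit_coverage(per_rank_indices, n):
    """Return (duplicated, missing) example indices for one epoch's assignment."""
    seen = Counter(i for shard in per_rank_indices for i in shard)
    duplicated = sorted(i for i, c in seen.items() if c > 1)
    missing = sorted(set(range(n)) - set(seen))
    return duplicated, missing


n = 16
# Hypothetical bug: a stride hardcoded for 4 ranks, reused unchanged with 8 ranks.
buggy = [list(range(rank, n, 4)) for rank in range(8)]
# Stride derived from the actual world size.
fixed = [list(range(rank, n, 8)) for rank in range(8)]

dup, miss = audit_coverage(buggy, n)
print(len(dup), len(miss))        # 12 0: most indices are assigned to two ranks
print(audit_coverage(fixed, n))   # ([], [])
```

A check like this is cheap enough to run once whenever the world size changes, before any accelerator time is spent.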

Library Shortcut: DistributedSampler Computes the Disjoint Shards

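DistributedSampler reads the process rank and the world size from the distributed group and hands each worker a disjoint, balanced index slice automatically, including the consistent per-epoch reshuffle. When the dataset size is not a multiple of the world size, it pads the index list with a few repeated examples to keep the shards equal, or drops the tail instead if drop_last=True is set. Pairing it with the DataLoader from Code 8.5.2 gives a fully sharded, fully overlapped input pipeline.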

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

sampler = DistributedSampler(dataset)     # reads rank + world_size; disjoint balanced shards
loader = DataLoader(dataset, batch_size=256, sampler=sampler,
                    num_workers=8, pin_memory=True)

for epoch in range(num_epochs):
    sampler.set_epoch(epoch)              # reshuffle consistently across all K workers
    for batch in loader:
        ...                               # this rank sees only its 1/K of the data
Code 8.5.3: Sharding the loader across $K$ data-parallel workers. The set_epoch call supplies the shared per-epoch seed that keeps every rank's reshuffle in step, so the union of the shards is a clean permutation of the dataset each epoch.

Thesis Thread: Sharding Returns, This Time on the Input

Splitting an average across disjoint shards was the seed of the whole book in Section 1.1: the gradient is a sum, a sum regroups, and $K$ workers each computing over a slice recover the exact answer when combined by all-reduce. Here the same sharding moves one layer earlier, onto the data loader that feeds those workers. The all-reduce makes data parallelism exact only if every replica reads a disjoint, balanced slice; the DistributedSampler is what guarantees the partition that the gradient identity assumes. Sharding the compute and sharding the input are two halves of one idea, and they must agree on the same $K$.
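As a brief reminder of the identity this relies on, here is a sketch under stated assumptions: the shards $S_0, \dots, S_{K-1}$ are disjoint, cover all $N$ examples, and have equal size $|S_k| = N/K$. The symbols $\ell_i$ (per-example loss) and $\theta$ (parameters) are notation introduced here for illustration.

```latex
\nabla L(\theta)
  = \frac{1}{N} \sum_{i=0}^{N-1} \nabla \ell_i(\theta)
  = \frac{1}{K} \sum_{k=0}^{K-1}
      \underbrace{\left( \frac{K}{N} \sum_{i \in S_k} \nabla \ell_i(\theta) \right)}_{\text{mean gradient on shard } k}
```

The outer average over $k$ is what the all-reduce computes. The second equality holds only under the assumptions above: an example that appears in two shards is counted twice, and a shard larger than $N/K$ has its examples under-weighted when the per-shard means are averaged equally.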

5. Diagnosing and Fixing a Starved Pipeline Intermediate

When utilization is low, the diagnostic order follows the assembly line. First confirm the symptom: high accelerator idle time between steps with low GPU utilization points at the input pipeline rather than the model. Then locate the slow side by timing each in isolation: load without training to measure pure $t_\text{load}$, and train on a single cached batch to measure pure $t_\text{compute}$. If $t_\text{load} > t_\text{compute}$ with few workers, add workers until the knee in Output 8.5.1 appears. If more workers stop helping before utilization is high, a shared resource has saturated, usually storage read bandwidth, and the fix moves upstream to the layout and caching of Section 8.4 or to a streaming format that reads large sequential chunks, which is the subject of Section 8.6.
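The decision at the end of that procedure can be written down as a small check over a measured worker sweep. The sketch below is illustrative: `find_knee` and `diagnose` are names introduced here, the 0.5-point tolerance and 90% target are arbitrary thresholds, and the second sweep is made-up data showing what a saturated shared resource might look like.

```python
def find_knee(util_by_workers, tolerance=0.5):
    """Smallest worker count whose utilization is within `tolerance` points of the best."""
    best = max(util_by_workers.values())
    return min(w for w, u in util_by_workers.items() if u >= best - tolerance)


def diagnose(util_by_workers, target=90.0):
    """Classify a worker sweep; `target` is an arbitrary illustrative threshold."""
    knee = find_knee(util_by_workers)
    if util_by_workers[knee] >= target:
        return f"load hidden at {knee} workers"
    return f"plateau at {knee} workers below target: suspect a saturated shared resource"


# Utilization (%) per worker count, copied from Output 8.5.1.
sweep = {1: 35.7, 2: 70.9, 3: 98.6, 4: 98.6, 6: 98.6, 8: 98.6}
print(diagnose(sweep))    # load hidden at 3 workers

# Hypothetical sweep in which the gain stops early, well short of full utilization.
capped = {1: 35.7, 2: 70.9, 3: 71.0, 4: 71.0, 8: 71.0}
print(diagnose(capped))   # plateau at 2 workers below target: suspect a saturated shared resource
```

The second case is the one where adding workers is the wrong lever and attention should move to the storage layout.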

The pathological case is many tiny files. Reading a million small images one open-and-close at a time spends most of its time in filesystem metadata operations rather than moving bytes, and no number of workers rescues a pipeline that is latency-bound on storage. The standard answer is to pack examples into a few large sequential archives so the reader streams big contiguous chunks, which is precisely the WebDataset-style sharding that the next section builds. Once reads are sequential and large, the worker pool and prefetch queue of this section can finally do their job.
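To make the packing idea concrete, the sketch below uses Python's standard `tarfile` module as a stand-in archive format: many small records go into one archive, which is then read back strictly front to back. The helper names `pack_shard` and `stream_shard` and the fake payloads are introduced here for illustration, and the format built in the next section may differ in its details.

```python
import io
import tarfile


def pack_shard(records):
    """Pack (name, bytes) records into one in-memory tar archive."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, payload in records:
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    buf.seek(0)                          # rewind so the shard can be read from the start
    return buf


def stream_shard(fileobj):
    """Read records back in order; "r|" is tarfile's non-seeking streaming mode."""
    with tarfile.open(fileobj=fileobj, mode="r|") as tar:
        for member in tar:
            yield member.name, tar.extractfile(member).read()


records = [(f"{i:06d}.jpg", bytes([i]) * 8) for i in range(4)]   # stand-in payloads
shard = pack_shard(records)
print(list(stream_shard(shard)) == records)   # True
```

Streaming mode never seeks backward, so the same reader works over a pipe or a network stream, and the per-file open-and-close cost is paid once per shard rather than once per example.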

Research Frontier: Data Loading at Scale (2024 to 2026)

As accelerators get faster, the input pipeline becomes the binding constraint for an ever larger share of training jobs, and recent work attacks it from several directions. GPU-resident loading moves decode and augmentation onto the accelerator itself: NVIDIA DALI runs JPEG decoding and image augmentation as device kernels, and the nvImageCodec and nvJPEG libraries push hardware-accelerated decode further, freeing the CPU from a stage that often dominated $t_\text{load}$. On the storage side, GPUDirect Storage establishes a direct path from NVMe or network storage into device memory that bypasses a staging copy through host RAM, shortening the transfer stage for the largest datasets. At cluster scale, shared caching layers and node-local cache tiers (in the lineage of systems like Alluxio and the caching paths in recent large-model data stacks) keep hot shards close to the accelerators so repeated epochs do not re-read cold object storage. The common thread is that the data path is now engineered with the same seriousness as the model kernels, because at current accelerator speeds an unoptimized loader is the first thing to fall behind.

We now have a node that keeps its accelerator fed (workers, prefetching, pinned transfer) and a cluster whose $K$ workers each read a disjoint balanced shard. The remaining weakness is the storage access pattern itself: random reads of small files defeat even a well-tuned loader. Section 8.6 closes that gap with streaming, WebDataset-style pipelines that pack examples into large sequential shards built to be read straight through, turning the loader's worst case into its best one.

Exercise 8.5.1: Which Stage Is the Bottleneck? Conceptual

For each pipeline, name the stage (read, decode, augment, collate, transfer) most likely to dominate $t_\text{load}$, and state whether adding loader workers will help: (a) a dataset of ten million 4 KB JPEG thumbnails on a network filesystem, trained with light augmentation; (b) a dataset of a few thousand 4K-resolution images on a fast local NVMe, with heavy random-crop-and-color-jitter augmentation; (c) a tokenized text corpus already stored as packed integer arrays in large Parquet files, with no augmentation. Explain why the remedy differs across the three.

Exercise 8.5.2: Find the Knee Coding

Extend Code 8.5.1 so each loader incurs a fixed per-batch shared-resource cost (model disk bandwidth: no more than two loads may overlap at once, regardless of worker count). Re-run the sweep over worker counts and plot or print utilization. Show that beyond two effective loaders extra workers no longer raise utilization, and relate the resulting ceiling to the saturation case described in Section 5. Then vary LOAD_MS and confirm that the number of workers at the knee tracks $\lceil t_\text{load}/t_\text{compute}\rceil$ until the shared-resource cap binds first.

Exercise 8.5.3: The Cost of an Unbalanced Shard Analysis

Suppose $K = 8$ data-parallel workers train synchronously, barrier-synchronizing at an all-reduce every step, and the per-step compute is identical across workers except that one worker was handed 20% more examples per batch by a buggy sampler. Model the per-epoch wall-clock as set by the slowest worker and estimate the throughput loss relative to a balanced split. Now argue why this straggler effect is strictly worse than the same imbalance would be in an asynchronous parameter-server scheme, and connect your answer to the balance requirement that DistributedSampler enforces in Section 4.