"They asked me to shuffle the dataset. I can see eight thousand of its ten billion examples at a time. I do my best, and I have made my peace with that."
A Shuffle Buffer of Bounded Ambition
When a training set is too large to fit on any single worker's local disk, you stop copying the data to the trainer and instead stream it past the trainer, reading straight from object storage in big sequential chunks while it trains. Two facts from earlier in this chapter force the design. Object storage rewards a few large reads and punishes many small ones (Section 8.2), and a dataset of millions of tiny per-sample files is the worst possible shape to read at scale (Section 8.4). The streaming answer packs many samples into large sequential shards, pulls those shards over the network, and decodes samples on the fly. The price you pay is that a stream cannot be randomly indexed, so the perfect shuffle of a map-style dataset becomes an approximate shuffle through a bounded buffer. This section explains the shard-of-tar pattern, the shuffle-buffer trade-off, and how to keep streaming deterministic and resumable across workers and epochs.
In Section 8.5 we sharded the training data and met the data-loader bottleneck: when the dataset lives on fast local disk, the loader's job is to keep the accelerator fed. This section removes the comfortable assumption that the data is on local disk at all. Foundation-model corpora are measured in tens of terabytes to petabytes, far past what you would replicate to every training node, and copying the whole set to each worker before training even starts can take longer than an epoch. The streaming pattern reads the canonical copy in place from object storage (S3, GCS, Azure Blob), so the only data ever on a worker is the small window currently in flight. The challenge is to make that streaming read fast, correctly shuffled, evenly divided across workers, and resumable after a crash, all without random access to the underlying bytes.
1. Map-Style Versus Iterable Datasets Beginner
Every training data pipeline is one of two shapes, and the difference decides everything that follows. A map-style dataset is an indexed collection: it knows its length $N$ and can return sample $i$ for any $i$ on demand, the way a Python list answers data[i]. Map-style datasets make shuffling trivial, because to shuffle you simply permute the index set $\{0, 1, \dots, N-1\}$ and read samples in the permuted order. PyTorch's classic Dataset with a __getitem__ method is map-style, and it is the right tool whenever the data fits where random access is cheap, such as local NVMe.
An iterable dataset is a stream: it yields samples one after another and does not support data[i]. You can ask it for the next sample, but you cannot ask it for the 4,000,000th sample without reading the first 3,999,999. This is exactly the shape of data arriving over a network connection from object storage, and it is the shape this section is about. The cost of giving up indexing is that shuffling is no longer a permutation of an index set, because there is no index set to permute; we will have to approximate it. The benefit is that the dataset can be unbounded, can live entirely on remote storage, and never needs a single machine large enough to hold it. Table 8.6.1 contrasts the two.
| Property | Map-style (indexed) | Iterable (streaming) |
|---|---|---|
| Access | data[i] for any $i$ | next(it) only, in order |
| Knows length | Yes ($N$ fixed) | Often no (possibly unbounded) |
| Shuffle | Exact: permute $\{0,\dots,N-1\}$ | Approximate: shuffle buffer |
| Storage fit | Must support random access (local disk) | Sequential reads from object storage |
| Best when | Data fits on fast local disk | Data exceeds local disk, lives remote |
| PyTorch base | Dataset + __getitem__ | IterableDataset + __iter__ |
Random access and arbitrary dataset size are in tension. A map-style dataset can shuffle perfectly but only because it can address every sample, which requires the data to sit somewhere random access is cheap. The moment the data is too big for that place and must be streamed from object storage, you lose the index, and with it the exact shuffle. Streaming pipelines do not fight this; they accept sequential-only access as the cost of unbounded scale and then recover an approximate shuffle cheaply. The whole design of WebDataset-style pipelines follows from taking that trade seriously.
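The two shapes can be sketched in plain Python without any framework. The class names ListDataset and StreamDataset and the helper exact_shuffle are hypothetical stand-ins, not PyTorch classes; they only mirror the __getitem__ versus __iter__ split described above.

```python
import random


class ListDataset:
    """Map-style: knows its length and answers data[i] for any i."""

    def __init__(self, samples):
        self._samples = list(samples)

    def __len__(self):
        return len(self._samples)

    def __getitem__(self, i):
        return self._samples[i]


class StreamDataset:
    """Iterable: yields samples in storage order; no length, no indexing."""

    def __init__(self, make_stream):
        self._make_stream = make_stream  # callable returning a fresh iterator

    def __iter__(self):
        return self._make_stream()


def exact_shuffle(dataset, seed):
    """Exact shuffle of a map-style dataset: permute the index set, then read."""
    order = list(range(len(dataset)))
    random.Random(seed).shuffle(order)
    return [dataset[i] for i in order]


indexed = ListDataset(f"sample-{i}" for i in range(10))
shuffled = exact_shuffle(indexed, seed=0)  # any order is reachable

streamed = StreamDataset(lambda: (f"sample-{i}" for i in range(10)))
first_three = [s for _, s in zip(range(3), streamed)]  # only "next" is available
```

The map-style object can be read in any permuted order because every index is addressable; the stream can only be consumed from the front, which is why its shuffle has to be approximated downstream.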
2. The Shard-of-Tar Pattern Beginner
If you store a ten-million-image dataset as ten million separate object-storage keys, you have built the small-files disaster of Section 8.4 at web scale: every sample is its own network round trip, per-object request overhead dominates, and throughput collapses to a tiny fraction of the link's capacity. Object storage, as Section 8.2 showed, wants a few large sequential reads, not millions of tiny ones. The WebDataset pattern resolves this by packing many samples into large sequential archives called shards, each a plain POSIX tar file holding hundreds or thousands of samples concatenated back to back.
The tar format is deliberately humble. Reading a tar is purely sequential: you stream bytes from the front and the archive hands you one member file after another, which is precisely the access pattern object storage is fastest at and exactly what an iterable dataset needs. Inside a shard, the member files for one training example share a common basename and differ only by extension, so 000017.jpg, 000017.cls, and 000017.json are grouped into a single sample {"jpg": ..., "cls": ..., "json": ...}. Decoding happens in memory as bytes arrive; nothing is unpacked to disk. A typical shard is sized so that one sequential read is large enough to amortize request overhead, commonly a few hundred megabytes to a gigabyte, which is the large-read sweet spot from Section 8.2.
Figure: Tar shards live in object storage; the loader streams a shard, decodes samples from raw bytes as they arrive, passes them through a bounded shuffle buffer of capacity $B$, and emits batches to the trainer. The buffer is the only place samples accumulate; its size $B$ trades shuffle quality against memory, as the demo in Section 4 measures.

This single decision, packing samples into large tar shards, fixes both problems at once. It turns millions of small reads into a handful of large sequential reads (matching object storage), and it makes the dataset naturally iterable (matching the stream). The shard is also the unit of parallelism: with $S$ shards and $W$ workers, you assign disjoint subsets of shards to workers, and within each shard the reads stay sequential. We return to how foundation-model pretraining corpora are built and consumed this way in Chapter 19.
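To make the shard layout concrete, here is a minimal sketch that uses only Python's standard tarfile module. The helpers write_shard and read_shard are hypothetical names, not part of the webdataset library. The shard is built in memory so the example is self-contained, and splitting each member name at its first dot is a simplifying assumption.

```python
import io
import tarfile


def write_shard(samples):
    """Pack (basename, {extension: bytes}) samples into an in-memory tar."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tf:
        for key, fields in samples:
            for ext, payload in fields.items():
                info = tarfile.TarInfo(name=f"{key}.{ext}")
                info.size = len(payload)
                tf.addfile(info, io.BytesIO(payload))
    buf.seek(0)
    return buf


def read_shard(fileobj):
    """Stream a tar front to back, grouping consecutive members by basename."""
    current_key, current = None, {}
    # Mode "r|" is tarfile's forward-only stream mode: no seeking, like a network read.
    with tarfile.open(fileobj=fileobj, mode="r|") as tf:
        for member in tf:
            if not member.isfile():
                continue
            key, _, ext = member.name.partition(".")
            if key != current_key and current:
                yield current          # basename changed: the previous sample is complete
                current = {}
            current_key = key
            current[ext] = tf.extractfile(member).read()
    if current:
        yield current                  # flush the final sample


shard = write_shard([
    ("000017", {"jpg": b"<jpeg bytes>", "cls": b"3", "json": b'{"w": 64}'}),
    ("000018", {"jpg": b"<jpeg bytes>", "cls": b"7", "json": b'{"w": 32}'}),
])
samples = list(read_shard(shard))
```

The reader never seeks and never touches disk, so the same loop would work over bytes arriving from a network connection; grouping relies only on the members of one sample being adjacent in the archive.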
You do not write the tar reader, the sample grouper, the decoders, or the shuffle buffer by hand. The webdataset library expresses the whole pipeline as a chain of composable stages, and the Hugging Face datasets library exposes the same streaming behavior with a single flag:
```python
import webdataset as wds

# Read shards straight from S3 by URL; {0000..1023} is a brace expansion over shards.
url = "pipe:aws s3 cp s3://my-bucket/imagenet-train-{0000..1023}.tar -"
ds = (
    wds.WebDataset(url, shardshuffle=True)  # shuffle the ORDER of shards each epoch
    .shuffle(10000)                         # the sample-level shuffle buffer, B = 10000
    .decode("pil")                          # decode .jpg bytes to PIL images on the fly
    .to_tuple("jpg", "cls")                 # group members into (image, label)
)
loader = wds.WebLoader(ds, batch_size=64, num_workers=8)

# Hugging Face datasets: the SAME idea behind one argument.
from datasets import load_dataset

# "allenai/c4" is the current Hub id for C4; the bare "c4" name is deprecated.
hf = load_dataset("allenai/c4", "en", split="train", streaming=True)  # never downloads the whole set
hf = hf.shuffle(buffer_size=10000, seed=42)                           # the same bounded buffer
```
The sample-level shuffle buffer is the .shuffle(10000) stage here; streaming=True on Hugging Face is the same construction. Roughly two hundred lines of tar parsing, decoding, buffering, and worker sharding collapse to a dozen, and the library handles byte-range streaming, retries, and per-worker shard assignment internally.

3. Approximate Shuffle With a Bounded Buffer Intermediate
A stream arrives in whatever order the shards were written, which for a corpus assembled from sorted crawls or grouped sources is highly correlated: nearby samples in the stream tend to be similar. Training on that order is harmful, because mini-batch stochastic gradient descent assumes each batch is a roughly independent sample of the data; feed it a long run of near-duplicates and the gradient estimates become biased and the optimization wobbles. With a map-style dataset you would simply permute the index set. With a stream you cannot, so you approximate.
The standard approximation is a shuffle buffer. You keep a reservoir of $B$ samples in memory. To produce the next output sample, you pick a uniformly random slot of the buffer, emit the sample there, and immediately refill that slot with the next sample pulled from the stream. A sample that entered the buffer can leave on the very next step or linger for many steps, so the output order is a randomized version of the input order, with randomness reaching roughly $B$ positions deep. The cost is $B$ samples of memory and nothing else; there is no second pass and no index.
The quality of the shuffle is governed entirely by $B$. If $B = 1$ the buffer is a pass-through and the output equals the input order. As $B$ grows the output decorrelates from the input, and when $B$ reaches the full dataset size the shuffle becomes exact, equivalent to a map-style permutation. Concretely, a sample at input position $t$ cannot be emitted more than $B$ positions early, and once in the buffer it waits about $B$ steps on average before leaving, so its typical displacement is on the order of $B$ and the rank correlation between input and output order decays as $B$ grows. We measure exactly this next.
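The reach of the buffer can be made quantitative under one simplifying assumption: the stream is long enough that the buffer stays full, so the fill and drain phases are ignored. Each emit picks one of the $B$ slots uniformly, so a resident sample leaves with probability $1/B$ per step, and the number of steps $K$ it waits is geometric:

$$
P(K = k) = \frac{1}{B}\left(1 - \frac{1}{B}\right)^{k-1}, \qquad \mathbb{E}[K] = B, \qquad P(K > k) = \left(1 - \frac{1}{B}\right)^{k} \approx e^{-k/B}.
$$

A sample therefore waits about $B$ steps on average, and only about 5 percent of samples wait longer than $3B$ steps, since $e^{-3} \approx 0.05$. Mixing is local, on a scale of order $B$: two samples that sit much farther apart than a few multiples of $B$ in the input almost never trade places in the output.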
4. Demo: How Buffer Size Sets Shuffle Quality Intermediate
The code below implements the shuffle buffer in pure Python over a perfectly sorted input stream, where the original index equals the value, so any residual order in the output is visible directly. We measure shuffle quality as the Pearson correlation between each emitted sample's original index and its output position: correlation near $1$ means the output still follows the input order (a poor shuffle), and correlation near $0$ means output position is independent of input order (a good shuffle). The peak memory is simply the buffer capacity $B$.
```python
import random


def shuffle_buffer(stream, buffer_size, seed=0):
    """Emit samples from an iterable in approximately shuffled order using B slots."""
    rng = random.Random(seed)
    buf = []
    for item in stream:
        if len(buf) < buffer_size:       # fill phase: load the reservoir
            buf.append(item)
            continue
        j = rng.randrange(buffer_size)   # steady state: emit a random slot ...
        yield buf[j]
        buf[j] = item                    # ... and refill it from the stream
    rng.shuffle(buf)                     # drain whatever remains
    yield from buf


def order_correlation(order):
    """Pearson corr between original index (the value) and output position."""
    n = len(order)
    pos = range(n)
    mx, my = sum(order) / n, sum(pos) / n
    num = sum((order[i] - mx) * (i - my) for i in range(n))
    dx = sum((v - mx) ** 2 for v in order) ** 0.5
    dy = sum((p - my) ** 2 for p in pos) ** 0.5
    return num / (dx * dy)


N = 100_000
print(f"{'buffer_size B':>14} | {'corr(out,in)':>12} | {'peak items':>10}")
print("-" * 44)
for B in [1, 10, 100, 1000, 10000, 100000]:
    out = list(shuffle_buffer(iter(range(N)), B, seed=0))  # stream is sorted: index == value
    print(f"{B:>14} | {order_correlation(out):>12.4f} | {B:>10}")
```
Code 8.6.2: The input range(N) is perfectly sorted, so the correlation between output position and original index isolates exactly how much input order survives for each buffer size $B$.

```
 buffer_size B | corr(out,in) | peak items
--------------------------------------------
             1 |       1.0000 |          1
            10 |       1.0000 |         10
           100 |       1.0000 |        100
          1000 |       0.9994 |       1000
         10000 |       0.9482 |      10000
        100000 |      -0.0046 |     100000
```
The numbers make the trade-off concrete. With $B$ in the hundreds, the output is indistinguishable from the sorted input by this measure: the buffer is too shallow to break the long-range order, so consecutive batches still draw near-duplicate samples and the independence that mini-batch SGD relies on is violated. The correlation only begins to fall once $B$ reaches roughly a tenth of the dataset and vanishes only when $B$ equals the whole dataset, recovering the exact permutation a map-style loader would give for free. This is the memory/quality trade-off of the shuffle buffer: a larger $B$ buys a better-mixed stream at a memory cost that grows linearly, one buffered sample at a time. In practice you do not need $B = N$; you choose $B$ large enough that no single source or duplicate run is longer than the buffer, then lean on shard-order shuffling (shuffling which shards you read, each epoch) to break the remaining correlation cheaply, since reordering whole shards costs no extra memory.
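The effect of shard-order shuffling can be checked with a small experiment. This is a minimal sketch, not a benchmark: the sizes (100 shards of 1,000 samples, $B = 1{,}000$), the seeds, and the helper names stream_shards and pearson are illustrative assumptions. It reads the same sorted data twice, once with shards in written order and once with the shard order permuted, through the same small buffer.

```python
import random


def shuffle_buffer(stream, buffer_size, rng):
    """Bounded shuffle buffer: emit a random slot, refill it from the stream."""
    buf = []
    for item in stream:
        if len(buf) < buffer_size:
            buf.append(item)
            continue
        j = rng.randrange(buffer_size)
        yield buf[j]
        buf[j] = item
    rng.shuffle(buf)
    yield from buf


def pearson(xs, ys):
    """Pearson correlation between two equal-length sequences."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    num = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    dx = sum((x - mx) ** 2 for x in xs) ** 0.5
    dy = sum((y - my) ** 2 for y in ys) ** 0.5
    return num / (dx * dy)


def stream_shards(shards, shard_order):
    """Read whole shards sequentially, in the given shard order."""
    for s in shard_order:
        yield from shards[s]


N, SHARD_SIZE, B = 100_000, 1_000, 1_000      # illustrative sizes (assumptions)
shards = [range(i, i + SHARD_SIZE) for i in range(0, N, SHARD_SIZE)]

written_order = list(range(len(shards)))
shuffled_order = written_order[:]
random.Random(0).shuffle(shuffled_order)      # costs no sample memory at all

buffer_only = list(
    shuffle_buffer(stream_shards(shards, written_order), B, random.Random(1)))
with_shard_shuffle = list(
    shuffle_buffer(stream_shards(shards, shuffled_order), B, random.Random(1)))

corr_buffer_only = pearson(buffer_only, range(N))
corr_with_shard_shuffle = pearson(with_shard_shuffle, range(N))
print(f"buffer only:          {corr_buffer_only:.4f}")
print(f"shard order + buffer: {corr_with_shard_shuffle:.4f}")
```

With the shard order permuted, the long-range order that the small buffer could not break should largely disappear, while memory use stays at $B$ samples. The buffer still matters: samples within one shard stay close together in the output, so the buffer should span more than one shard if batches are to mix shards.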
A shuffle buffer is the data pipeline's most honest component. It never pretends to deliver a perfect shuffle; it delivers exactly as much randomness as its capacity allows and not one sample more. When someone reports that their streamed model trained worse than the same model on local disk, the buffer is the usual suspect: set too small, it quietly served the trainer a near-sorted stream while reporting that everything was shuffled. The fix is rarely subtle. Make the buffer bigger, or shuffle the shard order too.
5. Deterministic Sharding, Resumability, and Epochs Advanced
Streaming across many workers raises three correctness questions that local map-style loaders answer for free. First, no sample twice, no sample missed: with $W$ data-loading workers reading the same shard list, each shard must be claimed by exactly one worker, or the epoch silently double-counts some samples and skips others, biasing the gradient. The clean rule is to assign shards by a deterministic function of a global seed and the worker rank, for instance worker $w$ takes shards $\{ \pi(s) : s \equiv w \pmod{W} \}$ under a seeded permutation $\pi$ of shard indices. Because every worker computes the same $\pi$ from the same seed, the partition is consistent without any worker coordinating with another.
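The assignment rule above fits in a few lines. This is a minimal sketch under stated assumptions: shards_for_worker is a hypothetical helper, not a library call, and the values of $S$, $W$, and the seed are illustrative.

```python
import random


def shards_for_worker(num_shards, world_size, rank, seed):
    """Return the shard indices owned by `rank` under a seeded permutation pi."""
    perm = list(range(num_shards))
    random.Random(seed).shuffle(perm)   # every worker computes the same pi
    return perm[rank::world_size]       # pi(s) for s = rank, rank + W, rank + 2W, ...


S, W, SEED = 16, 4, 1234                # illustrative values (assumptions)
assignment = {w: shards_for_worker(S, W, w, SEED) for w in range(W)}
for w, owned in assignment.items():
    print(f"worker {w}: shards {owned}")
```

Each worker runs the same function with its own rank and needs no communication with its peers. When $S$ is not a multiple of $W$, some workers receive one more shard than others, so a real pipeline has to decide how to even out the final batches.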
Second, a different order each epoch: reusing the identical sample order every epoch lets the model memorize the sequence and weakens generalization. You re-seed the shard permutation and the buffer from a function of (base seed, epoch number) at the start of each epoch, which gives a fresh order that is still fully reproducible from the two integers.
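One way to derive the per-epoch seed is sketched below. Hashing the pair with SHA-256 is an assumption of this sketch, chosen so that nearby pairs such as (42, 1) and (43, 0) do not collide the way base_seed + epoch would; epoch_seed and shard_order are hypothetical helper names.

```python
import hashlib
import random


def epoch_seed(base_seed, epoch):
    """Derive a stable integer seed from (base_seed, epoch) via SHA-256."""
    digest = hashlib.sha256(f"{base_seed}:{epoch}".encode()).digest()
    return int.from_bytes(digest[:8], "big")


def shard_order(num_shards, base_seed, epoch):
    """Shard permutation for one epoch; reproducible from the two integers."""
    order = list(range(num_shards))
    random.Random(epoch_seed(base_seed, epoch)).shuffle(order)
    return order


epoch0 = shard_order(64, base_seed=42, epoch=0)
epoch1 = shard_order(64, base_seed=42, epoch=1)
epoch0_again = shard_order(64, base_seed=42, epoch=0)   # same integers, same order
```

The same derived seed can also initialize the shuffle buffer's random generator, so the full sample order of an epoch is a function of the base seed and the epoch number alone.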
Third, resumability: a training run that loses a worker after twenty hours must resume mid-epoch without replaying samples it already trained on, and without a worker re-reading shards a peer already consumed. Because the order is a deterministic function of (seed, epoch, rank), the loader's position is captured by a small amount of state: which shards remain, and how far into the current shard the cursor sits. Checkpointing that state alongside the model weights lets the run resume the stream exactly where it stopped. This is the data-side half of the elastic, fault-tolerant training story that Chapter 18 develops on the model side; a checkpoint is only correct if both the weights and the data cursor are restored together.
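The cursor idea can be sketched as a small reader with explicit state. This is an illustration under loud assumptions: ResumableShardReader is a hypothetical class, shards are in-memory lists so that an offset can be indexed directly, and the shuffle buffer is left out. A real streaming loader would have to skip forward inside the tar to reach the offset, and would also need to save or rebuild the buffer contents and the random generator state.

```python
import random


class ResumableShardReader:
    """Iterate samples shard by shard, tracking a (shard position, offset) cursor."""

    def __init__(self, shards, seed, epoch, rank=0, world_size=1):
        self.shards = shards
        order = list(range(len(shards)))
        random.Random(f"{seed}:{epoch}").shuffle(order)   # deterministic shard order
        self.my_shards = order[rank::world_size]          # this worker's shards
        self.shard_pos = 0   # index into self.my_shards
        self.offset = 0      # samples already emitted from the current shard

    def state_dict(self):
        return {"shard_pos": self.shard_pos, "offset": self.offset}

    def load_state_dict(self, state):
        self.shard_pos = state["shard_pos"]
        self.offset = state["offset"]

    def __iter__(self):
        while self.shard_pos < len(self.my_shards):
            shard = self.shards[self.my_shards[self.shard_pos]]
            while self.offset < len(shard):
                sample = shard[self.offset]
                self.offset += 1     # advance the cursor before handing out the sample
                yield sample
            self.shard_pos += 1
            self.offset = 0


shards = [[f"s{s}-{i}" for i in range(5)] for s in range(4)]
full = list(ResumableShardReader(shards, seed=7, epoch=0))   # uninterrupted run

reader = ResumableShardReader(shards, seed=7, epoch=0)
it = iter(reader)
before_crash = [next(it) for _ in range(7)]
checkpoint = reader.state_dict()          # saved alongside the model weights

resumed = ResumableShardReader(shards, seed=7, epoch=0)
resumed.load_state_dict(checkpoint)
after_resume = list(resumed)              # continues with the eighth sample
```

Because the shard order is recomputed from the seed and epoch, the checkpoint only has to store two integers per worker; the samples seen before the crash plus those read after the resume reproduce the uninterrupted order with nothing replayed or skipped.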
Who: A data infrastructure engineer at a startup pretraining a small language model on a 40-terabyte filtered web corpus.
Situation: The corpus lived in S3 as millions of small per-document JSON files, and the training nodes had 2 terabytes of local NVMe each, far too little to hold the set.
Problem: The first pipeline downloaded documents individually at training time; S3 request overhead throttled throughput so badly that the eight GPUs sat idle most of the time, and a full copy-to-disk was impossible anyway.
Dilemma: Buy nodes with 50 terabytes of local disk each to hold the corpus (expensive, and it grows monthly), or restructure the data so it could be streamed in place from object storage with no local copy.
Decision: They repacked the corpus into 4,000 tar shards of about 10 gigabytes each and switched the loader to WebDataset streaming with a 50,000-sample shuffle buffer and per-epoch shard-order shuffling.
How: A one-time Spark job (Chapter 7) grouped documents into shards; the loader assigned shards to workers by seeded rank, and the data cursor was checkpointed with the model.
Result: Sequential shard reads saturated the network link, GPU utilization rose from under 30 percent to over 90 percent, no local copy was needed, and a mid-epoch crash resumed without replaying or skipping data.
Lesson: The fix for a streaming bottleneck is usually upstream in the data layout, not in the loader. Pack small files into large shards, then stream.
As pretraining corpora pushed past the petabyte mark, the streaming loader itself became a research and engineering frontier. MosaicML's Streaming (the StreamingDataset library) introduced a shard format (MDS) with elastic determinism: any number of nodes can resume the exact same global sample order mid-run, and its shuffle algorithm bounds host-memory use while keeping shuffle quality high across thousands of GPUs. NVIDIA's Energon (part of Megatron-Energon) targets multimodal pretraining, blending many heterogeneous WebDataset sources with reproducible weighting and cross-node deterministic sampling. The Hugging Face datasets streaming path and webdataset itself continue to harden around resumable state and byte-range reads. The common thread is that the loader is now a distributed system in its own right: it must shuffle approximately under a memory budget, divide shards across a changing set of workers, and resume the global order exactly after preemption. This section's primitives (shards, the shuffle buffer, and deterministic seeded sharding) are the building blocks for all three. We meet these loaders again as the data backbone of Chapter 19.
With streaming in place, the data can be arbitrarily larger than any node, read in place from object storage, shuffled well enough for SGD, divided cleanly across workers, and resumed exactly after a failure. What we have assumed so far is that the shards already exist in a clean, decode-ready form. Producing them, parsing, filtering, deduplicating, and tokenizing raw data at a scale that itself exceeds one machine, is the distributed-preprocessing problem, and it is where Section 8.7 goes next.
For each scenario, decide whether you would use a map-style or an iterable/streaming dataset, and justify it from the properties in Table 8.6.1: (a) a 5-gigabyte image classification set on a node with 2 terabytes of local NVMe; (b) a 30-terabyte multimodal corpus in S3 trained on nodes with 1 terabyte of local disk; (c) an online system that trains continuously on an unbounded event stream that never ends. Explain specifically what would go wrong if you chose the other shape in each case.
Extend Code 8.6.2 so the input stream is not a single sorted run but $C$ contiguous class blocks (for example $N = 100{,}000$ samples in $C = 100$ blocks of one class each, in block order). For a range of buffer sizes $B$, draw consecutive mini-batches of size 64 from the shuffled output and measure the average number of distinct classes per batch. Plot distinct-classes-per-batch against $B$ and explain why a buffer smaller than one class block leaves nearly every batch single-class, and what that implies for the gradient mini-batch SGD computes.
A corpus has $N = 2 \times 10^{9}$ samples averaging 8 kilobytes each, stored as tar shards. Object storage delivers 1 gigabyte per second per connection but charges a fixed 20-millisecond overhead per object request. (a) Choose a shard size so that request overhead is under 1 percent of the time spent reading each shard, and compute how many shards result. (b) If each sample held in the shuffle buffer costs about 8 kilobytes of host memory, how large can $B$ be on a node with 64 gigabytes of host RAM reserved for buffering, and what fraction of $N$ is that? (c) Argue from part (b) why per-epoch shard-order shuffling is necessary in addition to the sample buffer, rather than relying on the buffer alone.