"They told me I had near-infinite capacity. They did not mention that every single one of my answers would arrive twenty milliseconds late."
An Object Store With Excellent Aggregate Throughput
A petabyte-scale training corpus does not live on a hard drive; it lives in a storage system that is itself distributed across thousands of machines, and the shape of that system dictates how you must read from it. Two designs dominate. Object storage (Amazon S3, Google Cloud Storage, Azure Blob) is a flat key-to-blob map with effectively unbounded capacity, low cost, and very high aggregate throughput, but it charges a stiff latency toll on every individual request. Parallel POSIX filesystems (Lustre, GPFS, and their cloud cousins) give training jobs low-latency file access at a higher price per byte. The recurring lesson of this section is that object storage rewards a very specific access pattern: a small number of large, sequential reads rather than a flood of small random ones. Get that pattern right and one cheap object store can feed a thousand-GPU cluster; get it wrong and your accelerators starve while waiting on round trips.
Section 8.1 argued that the storage layer, not the accelerator, often sets the ceiling on how large a training job can grow. This section opens the layer and looks inside. The question is concrete: when a dataset is too large for any single disk, where do its bytes physically reside, what does it cost to fetch them, and what access pattern keeps the cost low? The answer shapes everything that follows in this chapter, because the format choices of Section 8.3 and the loader pipelines of later sections all exist to serve one master, the storage system's preference for big sequential reads.
1. Object Storage: A Flat Map From Keys to Blobs Beginner
An object store presents a deceptively simple model. There is no directory tree, no file handle, no notion of appending a byte to the middle of a file. There is a bucket, and inside it a flat set of objects, each named by a string key and holding an opaque blob of bytes plus a little metadata. The core interface comes down to four verbs: PUT an object under a key, GET an object by key (optionally a byte range of it), DELETE a key, and LIST keys that share a prefix. The slashes in a key like data/2026/shard-0007.tar are pure convention; the store sees one flat string, and the apparent folders are an illusion that the LIST-by-prefix operation maintains. This flatness is exactly what lets the design scale: with no tree to lock and no parent directories to update, the system can spread keys across thousands of storage nodes by hashing, and capacity grows by adding nodes rather than by enlarging any single one.
That scalability is purchased with two concessions the storage layer of Section 8.1 warned about. The first is latency. Every GET travels through a request gateway that authenticates the caller, looks up which node holds the key, and only then begins streaming bytes, so the first byte arrives after a fixed delay measured in tens of milliseconds regardless of how small the object is. The second, historically, was consistency: for years the major object stores offered only eventual consistency, meaning a freshly written key might not appear in a LIST or might return a stale blob for a short window. Amazon S3 moved to strong read-after-write consistency for all operations in 2020, and the other major providers offer comparable guarantees today, so the modern worry is far smaller, but any pipeline that writes a shard and immediately re-lists the prefix should still know whether its store promises to show that write at once.
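Leaving latency and consistency aside, the four-verb interface is small enough to sketch. The toy in-memory store below is hypothetical: its class and method names do not come from any provider's SDK. Its only purpose is to show that a "folder" listing is prefix filtering over flat keys. One assumption to note: its byte range uses Python's end-exclusive slicing, whereas the HTTP `Range` header that real stores accept is end-inclusive.

```python
# A toy, in-memory model of the object-store interface (hypothetical class).
# Real stores add authentication, metadata, multipart uploads, and more.
from typing import Dict, List, Optional


class ToyObjectStore:
    """Flat key -> bytes map exposing PUT, GET (with byte ranges), DELETE, LIST."""

    def __init__(self) -> None:
        self._blobs: Dict[str, bytes] = {}  # one flat namespace, no directories

    def put(self, key: str, data: bytes) -> None:
        self._blobs[key] = bytes(data)  # whole-object write; no in-place edits

    def get(self, key: str, start: int = 0, end: Optional[int] = None) -> bytes:
        # Byte-range GET; end is exclusive here (HTTP Range is inclusive).
        return self._blobs[key][start:end]

    def delete(self, key: str) -> None:
        self._blobs.pop(key, None)

    def list(self, prefix: str = "") -> List[str]:
        # "Folders" exist only because flat keys are filtered by a string prefix.
        return sorted(k for k in self._blobs if k.startswith(prefix))

    def common_prefixes(self, prefix: str = "", delimiter: str = "/") -> List[str]:
        # Emulate a directory listing by grouping keys at the next delimiter.
        groups = set()
        for key in self.list(prefix):
            rest = key[len(prefix):]
            if delimiter in rest:
                groups.add(prefix + rest.split(delimiter, 1)[0] + delimiter)
        return sorted(groups)


store = ToyObjectStore()
store.put("data/2026/shard-0007.tar", b"0123456789")
store.put("data/2026/shard-0008.tar", b"abcdefghij")
store.put("data/2025/shard-0001.tar", b"xyz")
print(store.list("data/2026/"))
print(store.get("data/2026/shard-0007.tar", 2, 5))
print(store.common_prefixes("data/"))
```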
A single object GET is slow: a fixed latency floor sits in front of every request no matter how few bytes you ask for. But the store imposes almost no ceiling on how many GETs can run at once, across thousands of parallel streams and thousands of back-end nodes. So the way to extract real bandwidth is never to make one request faster; it is to make each request move many megabytes and to keep many such requests in flight. Latency you hide; aggregate throughput you harvest. Every data-loading decision in this chapter follows from that single asymmetry.
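Before the next section formalizes the asymmetry, here is a rough numerical sketch. The model assumes the per-connection figures used in the next section (a 20 ms floor and 200 MB/s per stream) and adds a hypothetical 100 Gbit/s client network cap. It ignores server-side throttling and request-rate limits. Its point is that keeping enough GETs in flight saturates the link, and larger objects need far fewer concurrent streams to get there.

```python
import math

# Assumed parameters: single-connection figures plus a hypothetical client
# network cap. Server-side throttling and request-rate limits are ignored.
LATENCY_S = 0.020     # alpha: fixed per-GET latency floor (20 ms)
STREAM_BPS = 200e6    # beta: bytes/s one stream moves once data starts flowing
LINK_BPS = 12.5e9     # hypothetical 100 Gbit/s client NIC, in bytes/s


def per_stream_bps(object_bytes: float) -> float:
    """Average throughput of one stream that pays the floor before every object."""
    return object_bytes / (LATENCY_S + object_bytes / STREAM_BPS)


def aggregate_bps(object_bytes: float, streams: int) -> float:
    """Throughput with `streams` GETs kept in flight, capped by the client link."""
    return min(streams * per_stream_bps(object_bytes), LINK_BPS)


def streams_to_saturate(object_bytes: float) -> int:
    """Smallest number of concurrent streams whose combined rate reaches the cap."""
    return math.ceil(LINK_BPS / per_stream_bps(object_bytes))


for size in (64 * 1024, 8 * 1024**2, 128 * 1024**2):
    print(f"{size / 1024**2:8.2f} MiB objects: "
          f"{per_stream_bps(size) / 1e6:6.1f} MB/s per stream, "
          f"{streams_to_saturate(size):5d} streams to fill the link")
```

Concurrency and object size are complementary levers. Concurrency hides the floor, and large objects shrink how much concurrency you need to hide it.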
2. The Latency Tax, Measured Intermediate
The asymmetry is easy to assert and easy to underestimate, so we make it quantitative. Model the time to read one object as a latency floor plus the bytes divided by the bandwidth of a single stream. If a request pays a fixed latency $\alpha$ seconds and then streams at $\beta$ bytes per second, reading an object of $b$ bytes costs
$$t_{\text{obj}}(b) = \alpha + \frac{b}{\beta}.$$

To read a whole dataset of $D$ bytes as $n$ equal objects over a single stream, you pay the floor $n$ times, so the total is $t(n) = n\alpha + D/\beta$. The second term is fixed: it is the pure bandwidth cost of moving $D$ bytes and cannot be avoided. The first term, $n\alpha$, is the latency tax, and you control it entirely through one choice: how many objects you split the data into. Cut $n$ by reading larger objects and the tax falls in proportion; inflate $n$ by reading millions of tiny files and the tax can dwarf the data itself. The code below sweeps $n$ across more than four orders of magnitude, from two million objects down to 64, for a fixed 8 GiB dataset. It uses a 20 ms latency floor and a 200 MB/s per-stream bandwidth, both representative of a single object-store connection.
```python
# Model object-store access as a latency + bandwidth pipeline.
# Per request: a fixed latency floor (RTT + auth + lookup), then bytes / bandwidth.
LATENCY_S = 0.020                        # 20 ms per-GET latency floor (typical S3 first-byte)
BANDWIDTH_BPS = 200e6                    # 200 MB/s sustained per stream after first byte
DATASET_BYTES = 8 * 1024 * 1024 * 1024   # 8 GiB of training shards


def transfer_time(n_objects):
    """Wall-clock to read the whole dataset as n_objects equal GETs, one stream."""
    bytes_each = DATASET_BYTES / n_objects
    per_obj = LATENCY_S + bytes_each / BANDWIDTH_BPS
    return n_objects * per_obj


# Floor: pure bandwidth if latency were zero.
bw_floor = DATASET_BYTES / BANDWIDTH_BPS

print(f"{'objects':>12} {'bytes/obj':>12} {'latency tax':>12} {'total (s)':>10} {'MB/s eff':>9}")
for n in (2_000_000, 200_000, 20_000, 2_000, 200, 64):
    t = transfer_time(n)
    lat = n * LATENCY_S                  # the latency tax, n * alpha
    eff = DATASET_BYTES / t / 1e6        # effective throughput in decimal MB/s
    bpo = DATASET_BYTES / n
    bpo_s = f"{bpo/1024:.1f} KiB" if bpo < 1024*1024 else f"{bpo/1024/1024:.1f} MiB"
    print(f"{n:>12,} {bpo_s:>12} {lat:>11.1f}s {t:>9.1f} {eff:>9.1f}")
print()
print(f"bandwidth floor (zero latency): {bw_floor:.1f}s, {DATASET_BYTES/bw_floor/1e6:.0f} MB/s")
small = transfer_time(2_000_000)
large = transfer_time(64)
print(f"2M tiny GETs vs 64 large reads : {small/large:.0f}x slower")
```
transfer_time pays the latency floor once per object and adds the unavoidable bandwidth term; sweeping the object count exposes the latency tax directly, with no network or cloud account required.

```text
     objects    bytes/obj  latency tax  total (s)  MB/s eff
   2,000,000      4.2 KiB     40000.0s   40042.9       0.2
     200,000     41.9 KiB      4000.0s    4042.9       2.1
      20,000    419.4 KiB       400.0s     442.9      19.4
       2,000      4.1 MiB        40.0s      82.9     103.6
         200     41.0 MiB         4.0s      46.9     183.0
          64    128.0 MiB         1.3s      44.2     194.2

bandwidth floor (zero latency): 42.9s, 200 MB/s
2M tiny GETs vs 64 large reads : 905x slower
```
The lesson is stark. The bottom two rows sit within a few seconds of the 42.9-second bandwidth floor because their latency tax has shrunk to a few seconds. The top row is buried under a forty-thousand-second tax that has nothing to do with the data and everything to do with the request count. The same bytes, the same network, the same store: the only thing that changed was whether the data was packed into a few large objects or scattered across millions of small ones. This is why a corpus of ten million individual JPEG files is a performance disaster on an object store, while the same images packed into a few thousand shard files stream at near line rate. The packing is not a micro-optimization; it is the difference between a job that finishes and one that never does.
A reliable way to inflate a cloud bill is to store a training set as one object per training example. The bytes themselves are cheap, but object stores also charge per request, so reading two million objects per epoch means two million billable GETs. The per-request price is small: on typical price sheets it is tens of cents per million GETs, with PUTs and LISTs costing more. But it is paid again on every epoch, by every job, and on every re-read, so an otherwise inexpensive dataset quietly accrues a recurring serving cost. The tiny-file anti-pattern costs you throughput, latency, and money in a single stroke; consolidating into shards fixes all three at once.
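The per-epoch arithmetic is worth making explicit. This minimal sketch assumes a placeholder price of 0.40 USD per million GET requests; replace it with your provider's current rate for your storage class. It also assumes a hypothetical sweep of 20 jobs that each train for 100 epochs. The function name is illustrative.

```python
# Hypothetical per-request cost estimate. The price is an assumed placeholder,
# not a quote: replace it with your provider's current GET price.
PRICE_PER_MILLION_GETS_USD = 0.40


def read_request_cost_usd(n_objects: int, epochs: int, jobs: int = 1,
                          price_per_million: float = PRICE_PER_MILLION_GETS_USD) -> float:
    """GET-request charges when each job reads every object once per epoch."""
    requests = n_objects * epochs * jobs
    return requests * price_per_million / 1e6


# A hypothetical sweep: 20 jobs, 100 epochs each, over the same 8 GiB dataset.
tiny = read_request_cost_usd(n_objects=2_000_000, epochs=100, jobs=20)
sharded = read_request_cost_usd(n_objects=64, epochs=100, jobs=20)
print(f"one object per example: {tiny:10.2f} USD")
print(f"64 large shards:        {sharded:10.4f} USD")
```

The ratio matters more than the absolute price. Request charges scale with object count, so consolidating two million objects into 64 shards cuts them by a factor of 31,250.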
3. Why Training Reads in Large Sequential Chunks Intermediate
Output 8.2.1 explains a design choice that pervades every modern training pipeline: data is stored and read in large, sequential shards, not as many small random objects. A training epoch must visit every example, ideally in a shuffled order, and the naive way to shuffle is to fetch examples individually in random key order. On an object store that is the two-million-GET catastrophe from the top row. The standard fix is to pack thousands of examples into each shard (a tar file in the WebDataset convention, or a Parquet file in the lakehouse convention of Section 8.3), read whole shards sequentially with a handful of large GETs, and recover randomness by shuffling the order of shards across workers and shuffling examples within a buffer in memory. The store sees big sequential reads; the model still sees a well-mixed stream. Randomness moves from the storage layer, where it is ruinously expensive, to the loader's memory, where it is nearly free.
This is the same partitioning-and-sharding idea introduced as a distributed-systems concept in Chapter 2, now applied to bytes on disk: the dataset is split into shards, the shards are assigned to workers, and each worker streams its assigned shards independently and in parallel. Because the workers do not coordinate to read, aggregate bandwidth grows roughly in proportion to their number until the network or the store's request-rate limits bind. This is the parallel-clients picture of Figure 8.2.1. The MapReduce input splits of Chapter 6 and the Spark partitions of Chapter 7 are the same construct seen from the compute side; a shard is just an input split that a data loader, rather than a mapper, will consume.
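The sharded pipeline of the last two paragraphs fits in a few dozen lines. This sketch rests on three assumptions. Each worker knows its rank, the world size, and a shared seed. A shard can be read as a sequential iterator of examples. And the function names are hypothetical, not WebDataset's or any other library's API. Level 1 shuffles shard order with a permutation every worker can recompute independently, then strides that order by rank. Level 2 mixes examples in a bounded in-memory buffer.

```python
import random
from typing import Dict, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def shards_for_worker(shard_keys: List[str], rank: int, world_size: int,
                      epoch: int, seed: int = 0) -> List[str]:
    """Split shards across workers for one epoch with no coordination.

    Every worker derives the same permutation from (seed, epoch), then takes
    every world_size-th shard, so the slices are disjoint and cover all shards.
    """
    if not 0 <= rank < world_size:
        raise ValueError("rank must be in [0, world_size)")
    order = sorted(shard_keys)                               # identical start everywhere
    random.Random(seed * 1_000_003 + epoch).shuffle(order)   # level 1: shard order
    return order[rank::world_size]


def shuffle_buffer(stream: Iterable[T], buffer_size: int, seed: int) -> Iterator[T]:
    """Level 2: approximate shuffle of a sequential stream in bounded memory."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    rng = random.Random(seed)
    buf: List[T] = []
    for item in stream:
        if len(buf) < buffer_size:       # fill phase
            buf.append(item)
            continue
        i = rng.randrange(buffer_size)   # emit a random buffered item,
        yield buf[i]                     # then reuse its slot for the new item
        buf[i] = item
    rng.shuffle(buf)                     # drain the remainder in random order
    yield from buf


# Toy data: 10 shards of 4 examples, each stored in order inside its shard.
SHARDS: Dict[str, List[str]] = {
    f"data/shard-{s:04d}.tar": [f"s{s}-ex{j}" for j in range(4)] for s in range(10)
}


def read_sequentially(keys: List[str]) -> Iterator[str]:
    for key in keys:
        yield from SHARDS[key]   # stands in for one large sequential GET per shard


WORLD_SIZE = 4
per_worker = {
    rank: list(shuffle_buffer(
        read_sequentially(shards_for_worker(list(SHARDS), rank, WORLD_SIZE, epoch=0)),
        buffer_size=6, seed=rank))
    for rank in range(WORLD_SIZE)
}
for rank, examples in per_worker.items():
    print(rank, examples)
```

Mixing quality depends on buffer_size relative to the number of examples per shard. A buffer much smaller than a shard mostly reorders neighboring examples. When the shard count is not a multiple of the world size, some workers receive one extra shard. Distributed samplers commonly handle that imbalance by padding or dropping.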
Code 8.2.1 modeled object-store timing; reading real objects from Python is handled by fsspec and its S3 backend s3fs, which expose a bucket through an ordinary file-like API and add transparent read-ahead, byte-range GETs, and local caching. What would be dozens of lines of boto3 pagination, byte-range requests, and buffering becomes a path string:
```python
import fsspec

# Open each 128 MiB shard straight from S3 as if it were a local file.
fs = fsspec.filesystem("s3", anon=False)       # s3fs backend, using your AWS credentials
for key in fs.glob("s3://my-bucket/data/shard-*.tar"):  # LIST by prefix
    with fs.open(key, "rb", block_size=8 * 1024 * 1024) as f:
        chunk = f.read()                       # one large sequential GET
        # ... hand `chunk` to the loader / tar reader ...
```
fsspec/s3fs. The block_size argument controls read-ahead so each underlying GET moves megabytes, not kilobytes. The library wraps byte-range requests, read-ahead buffering, and listing pagination behind an ordinary file-like interface; with raw boto3 calls you would assemble that plumbing yourself.

4. Parallel Filesystems: Low-Latency POSIX at a Price Advanced
Object storage is not the only home for AI data. High-performance computing centers, and the large GPU clusters that increasingly resemble them, often place training data on a parallel POSIX filesystem such as Lustre or IBM Spectrum Scale (GPFS). The Hadoop Distributed File System (HDFS) played a similar role for an earlier generation of big-data workloads, though its write-once, append-only files fall short of full POSIX semantics. Lustre and GPFS present a real directory tree with real file handles, so code that expects open() and read() works unchanged. Crucially, they deliver per-operation latency in the hundreds of microseconds rather than tens of milliseconds, roughly two orders of magnitude below an object store. They achieve this by separating a metadata server, which resolves paths and permissions, from many data servers across which each file is striped, so a single large file is read in parallel from many disks at once (the right side of Figure 8.2.1). For a workload that must do small random reads, or that runs unmodified POSIX code, this low latency is decisive.
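Striping comes down to a simple address calculation. Below is a minimal sketch of round-robin (RAID-0 style) striping of the kind Lustre applies. The 1 MiB stripe size and the stripe count of 4 data servers are assumed illustration values. Real layouts add features this ignores, and the function names are hypothetical.

```python
from typing import List, Tuple

STRIPE_SIZE = 1 * 1024 * 1024   # assumed 1 MiB stripe unit
STRIPE_COUNT = 4                # assumed: file striped over 4 data servers


def locate(offset: int, stripe_size: int = STRIPE_SIZE,
           stripe_count: int = STRIPE_COUNT) -> Tuple[int, int]:
    """Map a file byte offset to (data server index, offset within that server's part)."""
    stripe_index = offset // stripe_size     # which stripe unit, counting from 0
    server = stripe_index % stripe_count     # units are dealt round-robin to servers
    row = stripe_index // stripe_count       # full passes over all servers so far
    return server, row * stripe_size + offset % stripe_size


def servers_touched(offset: int, length: int, stripe_size: int = STRIPE_SIZE,
                    stripe_count: int = STRIPE_COUNT) -> List[int]:
    """Data servers that a contiguous read of `length` bytes hits, in first-use order."""
    first = offset // stripe_size
    last = (offset + length - 1) // stripe_size
    seen: List[int] = []
    for stripe_index in range(first, last + 1):
        server = stripe_index % stripe_count
        if server not in seen:
            seen.append(server)
    return seen


print(locate(0))
print(locate(5 * 1024 * 1024 + 10))
print(servers_touched(0, 8 * 1024 * 1024))
```

An 8 MiB sequential read touches all four servers. That is why one large file can be read at several times the bandwidth of any single disk.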
The price is literal and architectural. Parallel filesystems cost substantially more per terabyte than object storage, scale capacity less gracefully (the metadata server can become a bottleneck under millions of small files), and demand more operational care. The common pattern in practice is therefore a two-tier arrangement: the durable, cheap, near-infinite copy of the dataset lives in object storage, and a fast parallel filesystem or local NVMe acts as a cache that holds the working set for the current training run. Table 8.2.1 lays the two designs side by side so the choice rests on which property actually binds for a given job.
| Property | Object storage (S3 / GCS / Blob) | Parallel filesystem (Lustre / GPFS) |
|---|---|---|
| Access model | GET/PUT/LIST by key, byte ranges | Full POSIX: open, read, seek, append |
| Namespace | Flat key-to-blob map | Directory tree with metadata server |
| Per-request latency | Tens of milliseconds | Hundreds of microseconds |
| Capacity scaling | Effectively unbounded, add nodes | Large but bounded; metadata can bottleneck |
| Cost per terabyte | Lowest | Several times higher |
| Best access pattern | Few large sequential reads | Tolerates small random reads |
| Typical role | Durable system of record | Hot cache for the active run |
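The two-tier arrangement can be sketched as a read-through cache. This minimal sketch assumes three things. `fetch` is any function that returns an object's bytes, for example a thin wrapper around a GET. The cache directory lives on local NVMe or a parallel filesystem. Capacity limits and eviction are omitted. `LocalShardCache` is a hypothetical name.

```python
import hashlib
import os
import tempfile
from typing import Callable


class LocalShardCache:
    """Read-through cache: serve shards from fast local storage and fetch
    misses from the durable object store. Hypothetical helper, not a library API."""

    def __init__(self, cache_dir: str, fetch: Callable[[str], bytes]) -> None:
        self.cache_dir = cache_dir
        self.fetch = fetch          # any callable that returns one object's bytes
        self.hits = 0
        self.misses = 0
        os.makedirs(cache_dir, exist_ok=True)

    def _path(self, key: str) -> str:
        # Hash the key so arbitrary object keys map to flat, filesystem-safe names.
        return os.path.join(self.cache_dir, hashlib.sha256(key.encode()).hexdigest())

    def local_path(self, key: str) -> str:
        path = self._path(key)
        if os.path.exists(path):
            self.hits += 1          # hot tier: no object-store request at all
            return path
        self.misses += 1
        data = self.fetch(key)      # cold tier: one large GET for the whole shard
        fd, tmp = tempfile.mkstemp(dir=self.cache_dir)
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        os.replace(tmp, path)       # atomic rename: readers never see a partial file
        return path


# Usage with a dict standing in for the bucket.
remote = {"data/shard-0000.tar": b"x" * 1024}
with tempfile.TemporaryDirectory() as d:
    cache = LocalShardCache(d, fetch=lambda key: remote[key])
    p1 = cache.local_path("data/shard-0000.tar")   # miss: fetched, then written locally
    p2 = cache.local_path("data/shard-0000.tar")   # hit: served from local disk
    with open(p2, "rb") as f:
        cached = f.read()
print(cache.hits, cache.misses, len(cached))
```

The cache writes each shard to a temporary file and then renames it with os.replace, so concurrent readers never see a half-written shard. If two workers miss at once, both fetch, and the second rename simply overwrites identical bytes.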
5. How the Bytes Survive: Replication and Erasure Coding Intermediate
A storage system spread across thousands of disks is, at any moment, a system in which some disk has just failed. Durability cannot rest on any single copy, so distributed stores keep redundant encodings, and the two main strategies trade storage overhead against repair cost in a way worth understanding. The simpler is replication, the same recovery mechanism introduced for distributed systems in Section 2.3: keep $r$ identical copies of each object on $r$ distinct machines, and the data survives as long as any one copy remains. Three-way replication ($r = 3$), long the HDFS default, tolerates two simultaneous failures but stores three bytes for every useful byte, a 200% overhead.
Erasure coding achieves comparable durability at a fraction of that overhead. Split an object into $k$ data fragments, compute $m$ parity fragments with a Reed-Solomon code, and scatter all $k + m$ across distinct machines. Any $k$ of the $k + m$ fragments suffice to reconstruct the object, so the system tolerates $m$ failures while storing only a $(k + m)/k$ multiple of the data. A common cloud configuration, $k = 10$ data and $m = 4$ parity fragments, tolerates four failures at just 40% storage overhead: far cheaper than the 200% of triple replication, and with stronger failure tolerance. The catch is repair cost. Rebuilding one lost fragment requires reading $k$ surviving fragments and recomputing, whereas a lost replica is restored by copying a single intact replica. Erasure coding therefore favors cold, rarely read archival data, while replication suits hot data that is read heavily and must be repaired quickly. We can state the comparison precisely:
$$s_{\text{rep}} = r, \qquad s_{\text{ec}} = \frac{k + m}{k}, \qquad f_{\text{rep}} = r - 1, \qquad f_{\text{ec}} = m,$$

where $s$ is the number of bytes stored per useful byte and $f$ is the number of simultaneous losses tolerated. The overhead percentages quoted above are $s - 1$: 200% for $r = 3$, and $m/k$ = 40% for $(k, m) = (10, 4)$. For a training pipeline the practical consequence is reassuring. The object store you read from has already made these durability choices for you, typically with erasure coding under the hood, and it presents a single reliable GET. Your job is not to replicate the data yourself but to read it in the large sequential chunks that the encoding and the latency model both reward.
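To see "any $k$ of $k + m$" and the repair cost concretely, here is a minimal sketch of the simplest erasure code: a single XOR parity fragment ($m = 1$, the idea behind RAID-5 parity). This is a deliberate substitution for illustration. Production stores use Reed-Solomon codes to get $m > 1$, and this code does not implement Reed-Solomon.

```python
from functools import reduce
from typing import List, Optional


def xor_bytes(a: bytes, b: bytes) -> bytes:
    return bytes(x ^ y for x, y in zip(a, b))


def encode(data: bytes, k: int) -> List[bytes]:
    """Split data into k equal fragments (zero-padded) plus one XOR parity fragment."""
    size = -(-len(data) // k)                    # ceiling division
    padded = data.ljust(size * k, b"\0")
    fragments = [padded[i * size:(i + 1) * size] for i in range(k)]
    parity = reduce(xor_bytes, fragments)        # XOR of all data fragments
    return fragments + [parity]                  # k + m fragments, with m = 1


def decode(fragments: List[Optional[bytes]], k: int, length: int) -> bytes:
    """Rebuild the original bytes when at most one of the k + 1 fragments is lost."""
    missing = [i for i, f in enumerate(fragments) if f is None]
    if len(missing) > 1:
        raise ValueError("single parity tolerates only one lost fragment")
    if missing:
        survivors = [f for f in fragments if f is not None]  # repair reads k fragments
        fragments = list(fragments)
        fragments[missing[0]] = reduce(xor_bytes, survivors)
    return b"".join(fragments[:k])[:length]      # drop the padding


blob = b"training shard bytes, spread across machines"
frags = encode(blob, k=4)
damaged: List[Optional[bytes]] = list(frags)
damaged[2] = None                                # the machine holding fragment 2 fails
restored = decode(damaged, k=4, length=len(blob))
print(restored == blob)
```

Repairing the lost fragment reads all $k$ survivors, which is the repair-cost side of the trade-off. A replicated copy would need only one read.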
Who: A computer-vision team training a large image classifier on a 64-GPU cluster.
Situation: A 40-million-image dataset sat in an S3 bucket as one JPEG object per image, the way it had arrived from the annotation vendor.
Problem: The GPUs sat at under 15% utilization; profiling showed every worker blocked on object-store GETs, each fetching a single small image.
Dilemma: Move the whole dataset onto an expensive Lustre filesystem to cut per-request latency, or restructure how the data was packed and leave it on cheap object storage.
Decision: They repacked rather than re-homed, because the access pattern, not the storage tier, was the true fault, and Output 8.2.1 made the size effect unambiguous.
How: A one-time Spark job grouped the images into 4,000 shards of roughly 10,000 images each (about 100 MiB per shard) in the WebDataset tar layout, written back to the same bucket; the loader switched to streaming whole shards with shuffled order.
Result: Per-request latency stopped dominating, effective read throughput rose by more than an order of magnitude, and GPU utilization climbed past 90%. The storage bill also fell, because the request count per epoch collapsed from 40 million to a few thousand.
Lesson: When an object store starves your accelerators, suspect the object size before the storage tier. Repacking small files into large shards is cheaper than buying a faster filesystem and usually fixes the same symptom.
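The repack in this case study can be sketched on one machine with Python's standard tarfile module. This is an illustrative sketch, not the team's Spark job. The function names, the .jpg/.cls member extensions, and the fixed examples-per-shard count are all assumptions. A production job would write shards straight to the bucket and size them by bytes rather than by count.

```python
import io
import os
import tarfile
import tempfile
from typing import Iterable, List, Optional, Tuple


def _add_member(tar: tarfile.TarFile, name: str, payload: bytes) -> None:
    info = tarfile.TarInfo(name=name)
    info.size = len(payload)              # tarfile needs the size before the data
    tar.addfile(info, io.BytesIO(payload))


def write_shards(examples: Iterable[Tuple[str, bytes, int]], out_dir: str,
                 examples_per_shard: int) -> List[str]:
    """Pack (key, image_bytes, label) triples into sequentially numbered tar shards.

    Files sharing a basename (key.jpg, key.cls) form one sample, the grouping
    convention that WebDataset-style readers rely on. Returns the shard paths.
    """
    if examples_per_shard < 1:
        raise ValueError("examples_per_shard must be at least 1")
    paths: List[str] = []
    tar: Optional[tarfile.TarFile] = None
    for i, (key, image, label) in enumerate(examples):
        if i % examples_per_shard == 0:   # current shard is full: start the next one
            if tar is not None:
                tar.close()
            paths.append(os.path.join(out_dir, f"shard-{len(paths):06d}.tar"))
            tar = tarfile.open(paths[-1], "w")
        _add_member(tar, f"{key}.jpg", image)
        _add_member(tar, f"{key}.cls", str(label).encode())
    if tar is not None:
        tar.close()
    return paths


# Usage: 25 fake examples packed 10 per shard gives 3 shards (10, 10, 5).
fake = ((f"img{i:07d}", b"\xff\xd8 placeholder bytes", i % 10) for i in range(25))
with tempfile.TemporaryDirectory() as d:
    shard_paths = write_shards(fake, d, examples_per_shard=10)
    shard_count = len(shard_paths)
    with tarfile.open(shard_paths[0]) as t:
        first_names = t.getnames()
print(shard_count, first_names[:2])
```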
6. Choosing the Layer for the Job Beginner
The decision tree is short. If the dataset is enormous, append-mostly, read in large sequential passes, and cost-sensitive, which describes the vast majority of training corpora, object storage is the right system of record, and the work lies in packing the data into shards and hiding latency with prefetching. If the workload genuinely needs small random reads or runs unmodified POSIX code, or if the active working set is small enough to cache, a parallel filesystem or local NVMe earns its higher price as a hot tier in front of the durable object store. The honest answer is usually both, layered, with object storage as the foundation. The sharded, large-read access pattern that object storage demands is also what the columnar file formats of Section 8.3 are engineered to provide, and the prefetching loaders that hide the remaining latency are the subject of the sections that follow it. The storage layer sets the rules; the rest of the chapter learns to play by them.
As clusters grow past thousands of accelerators, the object-store-to-GPU path has become an active systems frontier. The Mountpoint for Amazon S3 client (AWS, generally available since 2023) mounts a bucket as a local file system tuned for the large sequential reads of training. It narrows the gap between object and file semantics without a separate filesystem, though it deliberately supports only a subset of POSIX operations. NVIDIA's GPUDirect Storage and the Storage-Next direction move data from storage into GPU memory without staging it in a host-memory bounce buffer, attacking the copy overhead that the bandwidth term of Code 8.2.1 hides. On the data side, WebDataset standardizes a tar-based shard layout, and MosaicML Streaming adds deterministic, resumable shuffling directly over object storage, so a job can restart mid-epoch without re-reading from the start. The shared thread is unchanged from this section's model: every advance is a new way to feed the accelerator large sequential reads while hiding the latency floor, the quantity Output 8.2.1 makes concrete.
Using the model $t(n) = n\alpha + D/\beta$ behind Code 8.2.1, explain in words why the bottom two rows of Output 8.2.1 sit within a few seconds of the 42.9 s bandwidth floor while the top row explodes to over 40,000 seconds. At what object count does the latency tax $n\alpha$ first exceed the fixed bandwidth term $D/\beta = 42.9$ s, and what does that crossover tell a pipeline designer about the minimum sensible shard size? State the rule of thumb you would give a teammate in one sentence.
Extend Code 8.2.1 with a function repack_time(n_small, examples_per_shard) that models the one-time cost of reading n_small tiny objects once (paying the full latency tax) and writing them back as consolidated shards, then the per-epoch cost of reading those shards. For a dataset of two million 4 KiB examples, compute how many epochs of training are needed before the repack pays for itself versus reading the tiny files every epoch. Plot or print the break-even epoch count and explain why repacking is almost always worth it for any job that trains for more than one pass.
A team must store a 2 PB durable copy of a training corpus that tolerates up to four simultaneous machine failures. Compare three-way replication ($r = 3$, which tolerates two failures, so consider $r = 5$ for four) against a Reed-Solomon $(k = 10, m = 4)$ erasure code using the overhead formulas in Section 5. Compute the raw stored bytes and the relative cost of each, then argue which you would choose given that this archive is read in full only a few times per quarter. Reference the repair-cost trade-off, and connect your reasoning to the durability mechanisms of Section 2.3.