"They gave me thirty million rows and asked one day's worth. I read all thirty million. Nobody told me the answer was sitting in one file the whole time."
A Full Scan That Could Have Been a Lookup
A distributed table is not an abstract relation; it is a concrete set of files on object storage, and where you put the bytes decides how fast every reader can find them. The previous section gave us columnar files and a lakehouse table format that tracks which files belong to a table. This section is about the layout decision underneath: how to group rows into directories and files so that a query, or a training epoch, touches as few bytes as possible. Two levers do almost all the work. Partitioning sorts rows into directories by a column so a reader can skip whole directories it does not need (partition pruning), and per-file statistics let it skip individual files whose values cannot match (data skipping). Both levers are defeated by the same mistake: splitting the data into too many tiny files, where the per-file open cost swamps any saving. The craft of layout is choosing a partition column, a target file size, and a sort order so that pruning and skipping fire while the file count stays small, and running compaction to keep it that way.
In Section 8.3 we settled on columnar files (Parquet) gathered into a lakehouse table that tracks its own file list and statistics. That gives a reader the ability to read one column without the others, and to consult a manifest instead of listing a directory. What it does not yet give is a reason to read only some of the files. A table stored as one giant heap of files still forces every query and every training loader to scan everything, because nothing tells the reader which files are irrelevant. Physical layout is the missing ingredient: by deciding which rows land in which files, and recording cheap summaries of each file, we let readers prove that most files cannot contain what they want and skip them without opening them. The payoff is large and the failure modes are sharp, so this section treats layout as a first-class engineering decision rather than a default.
The stakes are highest for AI workloads precisely because they read the same table over and over. A SQL analyst runs a query once; a training job sweeps the entire dataset every epoch, for dozens or hundreds of epochs, with many parallel workers each opening files. A layout that wastes a factor of ten on bytes read costs that factor on every epoch and every worker, turning a layout mistake into the dominant line in the training bill. The same partitioning idea that makes a query prune directories is the one that, in the next section, lets each DataLoader worker claim a disjoint set of shards without coordinating with its peers.
1. Partitioning: Sort Rows Into Directories So Readers Can Skip Them Beginner
Partitioning is the oldest and bluntest layout lever: choose a column, and write all rows that share a value of that column into the same directory. A table of training events partitioned by event_date becomes a tree of directories, event_date=2026-01-01/, event_date=2026-01-02/, and so on, each holding the files for that day. The value is encoded in the directory path itself, which is why this scheme is sometimes called Hive-style partitioning after the system that popularized it. The encoding is what makes it cheap: a reader that wants only January 1st does not open a single file from any other day, because it can see from the path alone that those directories are irrelevant. Eliminating whole directories before reading their files is called partition pruning, and it is the single most effective layout optimization there is when the query filters on the partition column.
This is the same partitioning concept introduced abstractly in Section 2.3, where partitioning a key space across nodes let a request go straight to the node that owns its key. Here the "nodes" are directories on object storage and the "request" is a query predicate, but the principle is identical: a good partition function turns a search over everything into a lookup over a known subset. Choose the partition column to match how the data is actually filtered. Event data is almost always filtered by time, so partitioning by date is the canonical choice; a multi-region service that always queries one region at a time might partition by region, or by both (region=eu/event_date=2026-01-01/) to prune along two axes at once.
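The path-based pruning described above can be sketched in a few lines of Python. This is a minimal illustration, not how any particular engine implements it: the helper names `partition_path`, `parse_partition`, and `prune`, and the sample rows, are hypothetical, and a real lakehouse reader would consult the table manifest rather than parse strings.

```python
def partition_path(row, partition_cols):
    """Build a Hive-style directory path such as region=eu/event_date=2026-01-01/."""
    return "".join(f"{col}={row[col]}/" for col in partition_cols)


def parse_partition(path):
    """Recover the partition values from the path alone, without opening any file."""
    return dict(part.split("=", 1) for part in path.strip("/").split("/"))


def prune(paths, **wanted):
    """Keep only the directories whose path values match every requested filter."""
    return [p for p in paths
            if all(parse_partition(p).get(k) == v for k, v in wanted.items())]


# Hypothetical rows; only the partition columns matter for the layout.
rows = [
    {"region": "eu", "event_date": "2026-01-01", "user_id": 7},
    {"region": "eu", "event_date": "2026-01-02", "user_id": 42},
    {"region": "us", "event_date": "2026-01-01", "user_id": 42},
    {"region": "us", "event_date": "2026-01-02", "user_id": 9},
]
dirs = sorted({partition_path(r, ["region", "event_date"]) for r in rows})

one_day_one_region = prune(dirs, region="eu", event_date="2026-01-01")
one_day = prune(dirs, event_date="2026-01-01")
by_user = prune(dirs)  # user_id is not in the path, so no directory can be pruned

print(one_day_one_region)
print(one_day)
print(len(by_user), "of", len(dirs), "directories must still be read")
```

Filtering on both partition columns leaves one directory, filtering on the date alone leaves one per region, and a filter on user_id leaves all four, because the path carries no information about that column.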
The danger is the mirror image of the benefit. Partition by a high-cardinality column, say user_id with millions of distinct values, and you create millions of directories, most holding a handful of rows in a tiny file. The pruning still works in principle, but the table has been shattered into so many fragments that listing and opening them costs more than the scan you were trying to avoid. This is over-partitioning, and it is the most common way a well-intentioned layout makes reads slower. The rule of thumb is to partition only on low-to-moderate cardinality columns that appear in filters, and to keep each partition large enough to hold files of a healthy size, the topic of the next subsection.
Partition pruning is free speed when the query filters on the partition column; when it does not, pruning buys nothing, and a badly chosen partition column can cost you dearly. A table partitioned by date answers "give me yesterday" by touching one directory and "give me everything for user 42" by touching every directory, because nothing in the path tells it where user 42 lives. You get at most one or two cheap pruning axes, so spend them on the columns your readers filter most. Everything else is handled by the finer-grained, statistics-based skipping we add next, not by adding more partition columns.
2. The Small-Files Problem and the Target File Size Beginner
Object stores and distributed schedulers both have a strong opinion about file size, and it is the opposite of what a naive writer produces. Every file carries fixed overhead independent of its contents: a request round-trip to open it (tens of milliseconds of latency on an object store), metadata to track it in the table manifest, and a unit of work for the scheduler to assign. When files are large, that overhead is amortized over millions of rows and disappears. When files are tiny, the overhead is paid per handful of rows and dominates everything. A dataset stored as one million 1-megabyte files and the same dataset stored as a few thousand 256-megabyte files hold identical bytes, but the first is slower to read by orders of magnitude because the reader spends its life opening files instead of scanning rows. This is the small-files problem, and it is the layout failure that bites distributed AI pipelines hardest, because both object storage and the data loader punish file count directly.
The remedy is to aim for a target file size, typically in the range of one hundred megabytes to a gigabyte, with a few hundred megabytes a common default. Files in this range are large enough that open overhead is negligible against scan time, and small enough that a single worker can read one without blowing its memory budget or becoming a straggler that holds up the others. Writers should be configured to roll over to a new file when the current one reaches the target, and to coalesce small outputs rather than emit one file per input partition. Let $B$ be the total bytes in a table and $s$ the target file size; then the file count is roughly $B / s$, and the per-query fixed cost scales with the number of those files a reader must open. Shrinking $s$ to chase finer pruning raises that count in inverse proportion (halve $s$ and the file count doubles), which is why "more, smaller files" is almost never the right move past the target.
The cost model makes this concrete. If opening a file costs a fixed $c_{\text{open}}$ and scanning a row costs $c_{\text{row}}$, a query that must read $f$ files holding $r$ rows in total takes
$$T \approx f \cdot c_{\text{open}} + r \cdot c_{\text{row}}.$$
Pruning and skipping shrink both $f$ and $r$ together by eliminating whole files. Over-partitioning shrinks $r$ per file but multiplies $f$, and once $f \cdot c_{\text{open}}$ exceeds the scan it was meant to save, the layout has made things worse. The demo in Section 4 evaluates exactly this expression on a synthetic table to show both effects with real numbers.
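As a quick worked instance of the cost model, the sketch below prices a full read of the earlier example: the same bytes stored as one million 1-megabyte files versus 256-megabyte files. The constants are assumptions, not measurements: `C_OPEN_MS` and `C_ROW_MS` reuse the values from this section's demo, `ROW_BYTES` is a hypothetical average row size, and the model is a single sequential reader with no parallelism.

```python
C_OPEN_MS = 12.0   # assumed cost to open one file, in ms (same value as the demo)
C_ROW_MS = 4e-6    # assumed cost to scan one row, in ms (same value as the demo)
ROW_BYTES = 100    # hypothetical average row size
MB = 10**6


def read_cost_ms(total_bytes, file_bytes):
    """Return (f, open term, scan term) of T = f * c_open + r * c_row for a full read."""
    f = -(-total_bytes // file_bytes)   # ceiling division: number of files
    r = total_bytes // ROW_BYTES        # number of rows, independent of file size
    return f, f * C_OPEN_MS, r * C_ROW_MS


total = 1_000_000 * MB                  # one million megabytes in both layouts
tiny = read_cost_ms(total, 1 * MB)
large = read_cost_ms(total, 256 * MB)

for label, (f, open_ms, scan_ms) in [("1 MB files", tiny), ("256 MB files", large)]:
    print(f"{label:<13}{f:>10,} files   open {open_ms / 1000:>9,.0f} s   scan {scan_ms / 1000:>4,.0f} s")

ratio = (tiny[1] + tiny[2]) / (large[1] + large[2])
print(f"total cost ratio: {ratio:.0f}x")
```

Under these assumed constants the scan term is the same 40 seconds in both layouts, while the open term is about 12,000 seconds for the 1-megabyte layout against about 47 seconds for the 256-megabyte one. The exact ratio depends entirely on the constants chosen, but the shape of the result does not: the row term is fixed by the data, and only the file count moves.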
3. Compaction, Clustering, and File Statistics Intermediate
Real tables drift toward small files whether you want them to or not. Streaming ingestion appends a little file every few minutes; many parallel writers each emit their own output; incremental updates leave behind fragments. Left alone, a healthy table slowly degrades into the small-files problem. Compaction is the lakehouse maintenance operation that fixes this: a background job periodically reads the small files in a partition, merges them, and rewrites the data as a smaller number of files at the target size, then atomically swaps the new files for the old in the table manifest. Because the lakehouse table format from Section 8.3 tracks the file list transactionally, readers see either the old layout or the new one, never a half-rewritten mess, and the compaction can run concurrently with reads and writes. Compaction is to a lakehouse table what defragmentation was to a disk: a routine you schedule, not a one-time setup.
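The planning half of compaction, deciding which small files get merged into which output file, can be sketched as a greedy bin-pack. This is only an illustration under simplifying assumptions: `plan_compaction` is a hypothetical helper, sizes are in whole megabytes, and the rewrite itself and the atomic manifest swap that a real table format performs are left out.

```python
def plan_compaction(file_sizes_mb, target_mb=256):
    """Greedily group small files into bins of at most target_mb each.

    Returns a list of bins; each bin lists the input sizes that one
    rewritten output file would absorb. A single input larger than the
    target ends up alone in its own bin.
    """
    bins, current, current_size = [], [], 0
    for size in file_sizes_mb:
        # Close the current bin if adding this file would overshoot the target.
        if current and current_size + size > target_mb:
            bins.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return bins


# Hypothetical partition: 720 micro-batch files of 3 MB each.
small_files = [3] * 720
plan = plan_compaction(small_files, target_mb=256)

print(len(small_files), "files ->", len(plan), "files")
print("largest output:", max(sum(b) for b in plan), "MB")
```

With these inputs the plan merges 720 files into 9, eight of 255 MB and one smaller remainder. The sketch decides only the grouping; it says nothing about the order of rows inside each output file.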
While compaction is rewriting files anyway, it is the natural moment to also fix the order of rows within them, and the order is what makes file statistics useful. Every columnar file records cheap per-column statistics, at minimum the minimum and maximum value of each column in that file. A reader can consult these stats and skip any file whose range cannot satisfy the predicate: a query for user_id = 42 opens only files whose recorded [min, max] for user_id straddles 42. This is data skipping, the finer-grained cousin of partition pruning that works on any column with statistics, not just the partition column. Crucially, skipping only helps if values that are queried together also live together. If every file's user_id range is [0, 1,000,000], no file can be skipped; if the table is sorted so that each file owns a narrow band, almost all of them can.
Arranging rows so co-queried values are co-located is clustering, and its simplest form is a plain sort order: sort the table by user_id and each file ends up with a tight, non-overlapping range, making min/max skipping devastatingly effective. A plain sort optimizes one column at the expense of the others. When readers filter on several columns, Z-ordering (a space-filling-curve interleaving of multiple columns) gives every clustering column a usefully narrow per-file range at once, so skipping fires whichever of those columns a query happens to filter on. Partition pruning, clustering, and file statistics form a layered defense: pruning eliminates directories, clustering tightens the per-file ranges, and statistics turn those tight ranges into skipped files. Figure 8.4.1 shows the layered structure and the contrast with a shattered, un-compacted table.
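To make the trade-off between a plain sort and Z-ordering concrete, the sketch below interleaves the bits of two columns into a Morton key and compares the per-file [min, max] ranges each ordering produces. It is a toy under stated assumptions: the column names `x` and `y`, the 8-by-8 grid of values, and the helpers `z_value`, `file_ranges`, and `files_opened` are all hypothetical, and production engines use their own variants of the curve.

```python
def z_value(x, y, bits=16):
    """Interleave the low `bits` bits of x and y into one Morton (Z-order) key."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)        # x takes the even bit positions
        z |= ((y >> i) & 1) << (2 * i + 1)    # y takes the odd bit positions
    return z


def file_ranges(rows, rows_per_file):
    """Cut already-ordered rows into files and record (min, max) per column."""
    stats = []
    for i in range(0, len(rows), rows_per_file):
        chunk = rows[i:i + rows_per_file]
        xs = [x for x, _ in chunk]
        ys = [y for _, y in chunk]
        stats.append({"x": (min(xs), max(xs)), "y": (min(ys), max(ys))})
    return stats


def files_opened(stats, col, value):
    """Count files whose recorded range for `col` could contain `value`."""
    return sum(1 for s in stats if s[col][0] <= value <= s[col][1])


points = [(x, y) for x in range(8) for y in range(8)]   # 64 rows, 4 files of 16
by_x = file_ranges(sorted(points), 16)                  # plain sort on x
by_z = file_ranges(sorted(points, key=lambda p: z_value(*p)), 16)

for col in ("x", "y"):
    print(f"{col} = 5 opens {files_opened(by_x, col, 5)} of 4 files sorted by x, "
          f"{files_opened(by_z, col, 5)} of 4 files Z-ordered")
```

On this grid the plain sort opens one file for a filter on `x` but all four for a filter on `y`, while the Z-ordered layout opens two files for either. That is the compromise in miniature: Z-ordering gives up some skipping on the leading column to gain skipping on the others.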
Figure 8.4.1: On the left, a reader uses [min, max] statistics on a clustered user_id column to open just one file out of four. On the right, the same partition stored well (four files at the target size, few opens) versus shattered into many tiny files, where compaction is the operation that merges the bottom layout back into the top.
4. A Cost Model You Can Run Intermediate
The argument so far is quantitative, so let us quantify it. The program below builds a synthetic lakehouse table of thirty million training events, partitioned by date, with a range-clustered user_id column so each file records a non-overlapping [min, max] band. It then prices a single query, "one day, a quarter of the user-id space", under three strategies using the cost model $T \approx f \cdot c_{\text{open}} + r \cdot c_{\text{row}}$ from Section 2: a full scan, partition pruning alone, and pruning plus min/max data skipping. Finally it rebuilds the same bytes as an over-partitioned table with thousands of tiny files per day and reprices the pruned query, isolating the small-files penalty. No external libraries are used; the file descriptors and their statistics are plain Python objects, so the logic of pruning and skipping is visible rather than hidden inside an engine.
import random

random.seed(7)
N_DATES, N_FILES, ROWS_FILE = 30, 4, 250_000   # 30 days, 4 files/day, ~rows/file
OPEN_COST, ROW_COST = 12.0, 4e-6               # ms to open a file; ms to scan a row


def build_layout(files_per_day):
    """Files carry a partition (date) and a clustered user_id [min,max] band."""
    # Hold the total row count fixed while varying how many files hold it.
    rows = (N_DATES * N_FILES * ROWS_FILE) // (N_DATES * files_per_day)
    files = []
    for d in range(N_DATES):
        for f in range(files_per_day):          # range-clustered band
            lo = f * (1_000_000 // files_per_day)
            hi = (f + 1) * (1_000_000 // files_per_day) - 1
            files.append(dict(date=d, lo=lo, hi=hi, rows=rows))
    return files


def query_cost(files, want_date, uid_lo, uid_hi, use_pruning, use_skipping):
    """Return (files opened, rows scanned, cost in ms) for one query."""
    opened = rows_scanned = 0
    for fl in files:
        if use_pruning and fl["date"] != want_date:   # partition pruning
            continue
        if use_skipping and (fl["hi"] < uid_lo or fl["lo"] > uid_hi):   # data skipping
            continue
        opened += 1
        rows_scanned += fl["rows"]
    return opened, rows_scanned, opened * OPEN_COST + rows_scanned * ROW_COST


good = build_layout(N_FILES)                    # compacted: 4 files/day
qd, lo, hi = 17, 250_000, 499_999               # one day, a quarter of the keys
scan = query_cost(good, qd, lo, hi, False, False)
prune = query_cost(good, qd, lo, hi, True, False)
both = query_cost(good, qd, lo, hi, True, True)

print(f"layout: {len(good)} files, {len(good)*ROWS_FILE:,} rows total")
print(f"{'strategy':<26}{'files':>7}{'rows read':>14}{'cost ms':>11}")   # table header
for name, r in [("full scan", scan), ("partition pruning", prune),
                ("pruning + data skipping", both)]:
    print(f"{name:<26}{r[0]:>7}{r[1]:>14,}{r[2]:>11.0f}")
print(f"speedup (both vs scan) : {scan[2]/both[2]:.0f}x")

tiny = build_layout(4000)                       # over-partitioned: 4000 files/day
t = query_cost(tiny, qd, lo, hi, True, False)   # same pruned query
print(f"\nsmall-files layout : {len(tiny):,} files for the same data")
print(f"pruned query opens : {t[0]:,} files, cost {t[2]:.0f} ms")
print(f"open-overhead penalty : {t[2]/prune[2]:.0f}x slower than compacted")
Code 8.4.1: build_layout gives each file a date partition and a user_id [lo, hi] statistic; query_cost applies partition pruning and min/max data skipping by simply not counting files it can prove are irrelevant, then prices the survivors with the open-plus-scan model.
Output 8.4.1:
layout: 120 files, 30,000,000 rows total
strategy                    files     rows read    cost ms
full scan                     120    30,000,000       1560
partition pruning               4     1,000,000         52
pruning + data skipping         1       250,000         13
speedup (both vs scan) : 120x

small-files layout : 120,000 files for the same data
pruned query opens : 4,000 files, cost 48004 ms
open-overhead penalty : 923x slower than compacted
Two lessons fall straight out of Output 8.4.1. First, pruning and skipping compound: directories drop the file count by the number of partitions, and clustering plus statistics drop it again within the surviving partition, so the final read touches a single file instead of the whole table. Second, the small-files penalty is not a rounding error but the dominant cost; the over-partitioned table reads the right rows yet pays nearly a thousand times more because it opens four thousand files to do it. Compaction exists precisely to keep a table on the fast row of that output and off the slow one.
Who: A data platform engineer maintaining the event lakehouse behind a recommendation model's nightly training.
Situation: Clickstream events streamed into a Parquet table partitioned by event_date, with a micro-batch landing a new file every two minutes around the clock.
Problem: Each partition accumulated roughly 700 files of a few megabytes each, and the nightly training loader, sweeping the last 30 days, spent most of its wall-clock opening files rather than reading rows.
Dilemma: Slow the ingestion to write larger files (hurting freshness for the online features), or leave the layout alone and throw more loader workers at the open latency (more cost, same waste).
Decision: Keep ingestion fast and small, but add a daily compaction job that merges each closed day's micro-files into a handful of target-size files and sorts them by user_id so min/max skipping would fire.
How: A scheduled lakehouse maintenance task ran OPTIMIZE with a Z-order on user_id over partitions older than one day, swapping the compacted files into the manifest atomically while readers kept running.
Result: File count per partition fell from about 700 to 5, the nightly loader's file-open time dropped by more than 100x, and per-user evaluation queries gained data skipping for free; freshness was untouched because only sealed partitions were compacted.
Lesson: Fast ingestion and read-friendly layout are not in conflict if you separate them in time: write small for freshness, compact later for reads.
Code 8.4.1 modeled the effect of layout; it did not perform it. In a real lakehouse you never write a compaction loop by hand. The table format from Section 8.3 exposes compaction and clustering as a single maintenance command that reads the small files, rewrites them at the target size, optionally Z-orders the rows, recomputes the file statistics, and commits the new file list transactionally, all while concurrent readers and writers continue:
# Delta Lake on Spark: compact a partition and cluster it for data skipping.
# Assumes an active SparkSession named `spark` with the delta-spark package installed.
from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "s3://lake/events")
(dt.optimize()
   .where("event_date < '2026-01-01'")   # only sealed partitions, before the cutoff date
   .executeZOrderBy("user_id"))          # bin-pack to target size + cluster

# Apache Iceberg exposes the same operation as a maintenance action:
# CALL catalog.system.rewrite_data_files(table => 'events',
#     strategy => 'sort', sort_order => 'user_id')
Compaction and clustering in a single call, optimize().executeZOrderBy(...); the engine handles bin-packing files to the target size, recomputing min/max statistics, and the atomic manifest swap that keeps readers consistent.
5. Layout Is the Contract Between the Writer and Every Reader Advanced
A layout decision made once at write time is paid back, or punished, on every read for the life of the table. That asymmetry is what makes layout worth careful thought in an AI pipeline: the writer is a single job, but the readers are every training epoch, every evaluation pass, every feature-backfill, each fanned out across many workers. A partition column that matches the dominant filter, a target file size that amortizes open cost, a sort order that tightens min/max ranges, and a compaction schedule that holds the line: these four choices together decide whether the table is a fast lookup or a slow scan for thousands of reads to come. None of them changes a single value in the data; they change only where the bytes sit, which is exactly why they are pure engineering leverage.
This layout also sets up the next section directly. Once a training table is partitioned and compacted into target-size files, those files are the natural unit of sharding: each DataLoader worker can claim a disjoint subset of files and read them independently, no two workers touching the same bytes, no coordination needed beyond the initial assignment. The same partitioning that prunes a query is what lets a distributed loader split an epoch without overlap or omission, the partitioning-and-sharding arc that runs from Section 2.3 through this chapter and on into model sharding later in the book. Good layout is not only about reading less; it is about reading in parallel without stepping on your peers.
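The disjoint-claim idea can be previewed with a small sketch. It is only an illustration of the principle, not the loader mechanism the next section develops: `assign_files` and the file names are hypothetical, and the assignment is a plain round-robin over the table's file list.

```python
def assign_files(files, num_workers):
    """Round-robin assignment: worker w reads files[w], files[w + num_workers], ..."""
    return [files[w::num_workers] for w in range(num_workers)]


# Hypothetical file list for a compacted table: 30 days x 4 files per day.
files = [f"day={d:02d}/part-{i}.parquet" for d in range(30) for i in range(4)]
shards = assign_files(files, num_workers=8)

# Every file is claimed exactly once: no overlap between workers, no omission.
claimed = [f for shard in shards for f in shard]
assert len(claimed) == len(set(claimed)) == len(files)

print([len(s) for s in shards])
```

Because each worker derives its shard from the shared file list and its own index, the only coordination needed is agreeing on that list and on the number of workers. With 120 files and 8 workers each shard holds 15 files; when files are close to the target size, equal file counts also mean roughly equal bytes per worker.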
Static partitioning forces you to pick the partition column and file size up front, before you know the query and training-access patterns the table will actually face. Recent lakehouse work makes layout adaptive instead. Databricks' liquid clustering (general availability progressing through 2024 to 2025) replaces fixed Hive-style partitioning and rigid Z-order with a clustering scheme that lets the engine re-cluster incrementally as data and access patterns shift, avoiding both over-partitioning and full rewrites. Apache Iceberg's continued evolution of hidden partitioning and partition evolution lets a table change its partition spec without rewriting history, so a layout mistake is no longer permanent. In parallel, a research thread on learned data layout and instance-optimized storage (in the lineage of learned-index and cracking work, extended to columnar lakes through 2024 to 2026) trains lightweight models or uses query feedback to choose clustering keys and file boundaries that minimize bytes scanned for the observed workload. The common direction is the same: treat layout as a continuously optimized, workload-aware decision rather than a one-time guess, which matters precisely because AI training reads the same table thousands of times.
A classic way to discover the small-files problem is to try to delete the table. A pipeline that partitioned by user_id once produced a few million single-row files, and the cleanup job that tried to LIST the prefix before deleting timed out, repeatedly, because listing several million keys on an object store is itself a slow paginated crawl. The data was a few gigabytes; the file count made it un-listable. The fix, ironically, was a compaction job to merge the files down to a few hundred before anything could be done with them at all, including throwing them away.
You are laying out a 5-terabyte table of model-inference logs with columns request_ts (timestamp), region (5 distinct values), model_id (40 distinct values), and request_id (unique per row). Two query patterns dominate: "all requests in one region on one day" and "trace one request_id". Propose a partition scheme (which columns, in which order) and justify it using partition pruning. Explain specifically why partitioning by request_id would be a disaster, and how you would still make the single-request_id trace reasonably fast without partitioning on it.
Extend Code 8.4.1 to sweep files_per_day over a range (for example 1, 4, 16, 64, 256, 1024, 4096) while holding the total bytes fixed, and for each value compute the cost of the pruned-and-skipped query. Plot or print cost against file count and identify the file size at which open overhead starts to dominate scan time. Then change OPEN_COST from 12 ms to 1 ms (a warm local cache instead of cold object storage) and rerun. Explain how the break-even target file size shifts with open latency, and what that implies for choosing a target on fast local NVMe versus on S3.
In Code 8.4.1 the user_id column is perfectly range-clustered, so each file owns a disjoint band and skipping eliminates all but one file. Modify build_layout so each file instead gets a random [min, max] band that spans most of the key space (simulating an unsorted table), and rerun the skipping query. Quantify how many files now survive skipping and why. Use the result to argue, in two or three sentences, why compaction must usually sort or cluster the rows it merges, not merely bin-pack them, for data skipping to be worth its statistics.