Part II: Distributed Data Processing for AI
Chapter 8: Distributed Storage and Data Loading

Columnar Formats and the Lakehouse

"They asked for two columns. I, a humble row, handed over all twenty and apologized for the wait."

A Record That Reads Everything to Answer Anything
Big Picture

The bytes you scan, not the rows you keep, decide what an analytical or feature query costs at scale, so the layout of data on disk is a first-class systems decision. The previous section put the dataset on cheap object storage as opaque blobs. This section gives those blobs a shape. Columnar formats (Parquet, ORC) store each column contiguously, so a query reads only the columns it needs and compresses each column far better than a row store ever could. Apache Arrow carries that same column-at-a-time layout in memory, letting independent systems share data without re-serializing it. On top of both sits the lakehouse: a transactional table layer (Delta Lake, Apache Iceberg, Hudi) that adds ACID commits, schema evolution, time travel, and efficient upserts to object storage. For AI, the payoff is concrete: feature reads that touch a tenth of the bytes, and training tables you can pin to an exact version and reproduce months later.

A training pipeline and an analytics pipeline ask the same physical question of storage thousands of times a day: read these few attributes from these many records. A churn-feature job wants two columns out of forty; a label-distribution check wants one column out of two hundred. How the bytes are arranged on the object store decides whether that query reads what it asked for or drags the entire dataset off disk to discard most of it. Section 8.2 settled where the data lives (object storage, distributed filesystems). This section settles how it is encoded there, and that single choice moves analytical and feature-read cost by an order of magnitude.

1. Row Layout Versus Columnar Layout Beginner

A table is a logical grid, but disk is one-dimensional, so the grid must be flattened into a byte sequence, and there are two natural ways to do it. A row store writes record 1 in full (all its columns), then record 2 in full, and so on; the fields of one row sit physically adjacent. This is ideal when you fetch whole records by key, which is why transactional databases default to it. A columnar store does the opposite: it writes all values of column 1 together, then all values of column 2, and so on; the values of one column sit physically adjacent. Parquet and ORC are the dominant columnar formats on object storage, and the layout difference is the entire story of why analytical scans are cheap on them.

Two consequences follow directly from putting a column's values next to each other. The first is column pruning: a query that needs two of twenty columns reads only those two contiguous regions and skips the rest, so bytes scanned fall in proportion to columns touched, not rows kept. The second is compression: values within one column share a type and a distribution, and once similar values are adjacent, dictionary encoding, run-length encoding, and bit packing all become dramatically effective, far more than they ever are on the mixed-type byte soup of a row. Figure 8.3.1 shows both layouts side by side and marks the bytes a two-column query must read in each.

[Figure 8.3.1 diagram: row store (A1 B1 C1 D1, A2 B2 C2 D2, ..., read row by row) versus columnar store (A1 to A4, B1 to B4, C1 to C4, D1 to D4, read column by column); shaded cells mark the bytes a query for columns A and C must read.]
Figure 8.3.1: The same four-by-four table flattened two ways. In the row store (left) the fields of each row are adjacent, so a query for columns A and C still streams past B and D in every record. In the columnar store (right) each column's values are adjacent, so the same query reads two contiguous runs and skips the other two columns entirely. Shaded cells are the bytes the query must read; the unshaded cells are what columnar pruning saves.
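The figure's byte accounting can be reproduced in a few lines. The following is a minimal sketch, assuming every cell is a fixed 8-byte value (real formats use variable-width encodings). It flattens the four-by-four table both ways and merges the cells a query for A and C needs into contiguous byte runs. The helper names are illustrative and are not part of any library.

```python
CELL = 8                      # assumption: every cell is a fixed 8-byte value
COLS = ["A", "B", "C", "D"]
N_ROWS = 4


def row_major_offset(row, col):
    """Byte offset of (row, col) when whole rows are written one after another."""
    return (row * len(COLS) + COLS.index(col)) * CELL


def col_major_offset(row, col):
    """Byte offset of (row, col) when whole columns are written one after another."""
    return (COLS.index(col) * N_ROWS + row) * CELL


def byte_runs(offsets):
    """Merge cell offsets into contiguous (start, end) byte ranges."""
    runs = []
    for off in sorted(offsets):
        if runs and runs[-1][1] == off:
            runs[-1] = (runs[-1][0], off + CELL)     # extends the previous run
        else:
            runs.append((off, off + CELL))           # a gap: start a new run
    return runs


wanted = ["A", "C"]
row_runs = byte_runs(row_major_offset(r, c) for r in range(N_ROWS) for c in wanted)
col_runs = byte_runs(col_major_offset(r, c) for r in range(N_ROWS) for c in wanted)
row_span = row_runs[-1][1] - row_runs[0][0]          # bytes streamed in one sequential pass

print("row store   :", len(row_runs), "runs; one sequential pass covers", row_span, "bytes")
print("column store:", len(col_runs), "runs:", col_runs)
```

Both layouts need the same 64 useful bytes. The row store scatters them across eight tiny runs, so a reader either issues eight small reads or streams 120 bytes and discards B and D. The column store gets them in two contiguous runs.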
Key Insight: Cost Tracks Bytes Scanned, and Layout Decides Bytes Scanned

An analytical or feature query is paid for in bytes read off storage, not rows in the table. A columnar layout lets the reader scan only the columns the query names, and read each of them in its compressed form. For a feature job that selects a handful of attributes from a wide table, this is the difference between reading the whole dataset and reading a tenth of it. The table did not get smaller; the query learned to touch less of it.

2. Measuring the Two Wins: Pruning and Compression Intermediate

The two benefits are easy to state and easy to measure, so we measure them. The code below builds a 200,000-row table of twenty columns: nineteen noisy float columns plus one low-cardinality categorical column (a region code with five distinct values), arranged in clustered runs the way a real table sorted on a key would be. It then computes, from an explicit byte model, how much a row store must read for a full scan versus how much a columnar store reads when a query asks for just two of the twenty columns. It measures the columnar compression ratio on the clustered categorical column. Finally, when pyarrow is installed, it writes a real Parquet file and reads back the compressed size of only the two requested column chunks, so the from-scratch model can be checked against a production encoder.

import io, random

random.seed(7)
N_ROWS, N_COLS, READ_COLS = 200_000, 20, 2     # a query touching 2 of 20 columns

cols = {f"f{c}": [random.gauss(0, 1) for _ in range(N_ROWS)] for c in range(N_COLS - 1)}
categories = ["US", "EU", "APAC", "LATAM", "MEA"]
region = []                                     # low-cardinality key in clustered runs
while len(region) < N_ROWS:
    region.extend([categories[random.randint(0, 4)]] * random.randint(200, 2000))
cols["region"] = region[:N_ROWS]

BF, BC = 8, 8                                   # bytes per float, per category slot
row_bytes = (N_COLS - 1) * BF + BC              # a full row record
row_store_full_scan = row_bytes * N_ROWS       # row store: a query must read every row

want = ["f0", "region"]                         # columnar store reads only these two
columnar_pruned_scan = BF * N_ROWS + BC * N_ROWS
print("row-store full read (MB)   :", f"{row_store_full_scan / 1e6:.1f}")
print("columnar pruned read (MB)  :", f"{columnar_pruned_scan / 1e6:.1f}")
print("bytes-scanned reduction    :", f"{row_store_full_scan / columnar_pruned_scan:.1f}x")

# Compression: similar values adjacent => dictionary + run-length collapses the column.
raw = BC * N_ROWS                                # region stored as fixed 8-byte slots
codes = [categories.index(v) for v in cols["region"]]
runs = 1 + sum(codes[i] != codes[i - 1] for i in range(1, len(codes)))
dict_bytes = sum(len(s) for s in categories) + len(categories)  # labels + 1-byte codes
rle = dict_bytes + runs * 5                      # each run: 1-byte code + 4-byte length
print("region raw (MB)            :", f"{raw / 1e6:.2f}")
print("region dict+RLE (KB)       :", f"{rle / 1e3:.2f}")
print("columnar compression ratio :", f"{raw / rle:.1f}x")

try:                                            # cross-check against a real Parquet writer
    import pyarrow as pa
    import pyarrow.parquet as pq
    HAVE_PYARROW = True
except ImportError:
    HAVE_PYARROW = False
    print("pyarrow not available: skipping the Parquet cross-check")

if HAVE_PYARROW:
    table = pa.table(cols)
    buf = io.BytesIO()
    pq.write_table(table, buf, compression="snappy")
    md = pq.ParquetFile(io.BytesIO(buf.getvalue())).metadata
    idx = {name: i for i, name in enumerate(table.schema.names)}
    # Sum the compressed size of only the requested column chunks in every row group.
    pruned = sum(md.row_group(g).column(idx[n]).total_compressed_size
                 for g in range(md.num_row_groups) for n in want)
    total = buf.getbuffer().nbytes
    print("parquet total (MB)         :", f"{total / 1e6:.2f}")
    print("parquet 2-col chunks (MB)  :", f"{pruned / 1e6:.2f}")
    print("parquet column-prune ratio :", f"{total / pruned:.1f}x")
Code 8.3.1: A byte-level model of column pruning and columnar compression, cross-checked against a real Parquet file written by pyarrow. The model reads only the two requested columns; the Parquet cross-check sums the compressed size of just those two column chunks.
row-store full read (MB)   : 32.0
columnar pruned read (MB)  : 3.2
bytes-scanned reduction    : 10.0x
region raw (MB)            : 1.60
region dict+RLE (KB)       : 0.77
columnar compression ratio : 2075.2x
parquet total (MB)         : 35.57
parquet 2-col chunks (MB)  : 1.87
parquet column-prune ratio : 19.0x
Output 8.3.1: Reading two of twenty columns cuts the scan by $10\times$ in the byte model, and the real Parquet file does better still at $19\times$ because the region chunk compresses to almost nothing, leaving the two-column read about the size of a single float column. The clustered region column collapses by over three orders of magnitude under run-length encoding, a gain a row store can never realize because its region values are never adjacent.

The numbers separate the two effects cleanly. Column pruning alone gives the $10\times$ scan reduction in the byte model: two columns instead of twenty, with no compression assumed. The real Parquet file reaches $19\times$ because compression acts unevenly across columns. The random Gaussian floats barely compress at all, while the clustered region chunk collapses to almost nothing, so the two-column read costs little more than a single float column. That clustered categorical column shows why compression is a columnar privilege, collapsing by more than $2000\times$ once its repeated values lie next to one another. A row store cannot reach that ratio on the same column, because in a row layout each region value is wedged between nineteen unrelated floats and never forms a run. Real workloads sit between these extremes, but the direction is invariant: the wider the table and the more selective the query, the more a columnar layout wins.

Fun Note: The Column That Compressed Itself Into Almost Nothing

A region column of 200,000 values shrank to under a kilobyte in the byte model because the data was clustered: the same five labels arrived in long runs. The lesson cuts both ways: clustered data compresses spectacularly, and shuffled data does not. This is why sorting a table on a low-cardinality key before writing it can shrink the file more than picking a fancier codec, a layout trick Section 8.4 turns into a deliberate compaction strategy.
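The "cuts both ways" claim is easy to check by counting runs. This is a minimal sketch, assuming the same toy 5-byte (code, run-length) encoding as Code 8.3.1. It compares a clustered region column with a shuffled copy of the same values, alongside a fixed-width bit-packing of the dictionary codes, which does not depend on order. Parquet's dictionary indices use a hybrid of run-length encoding and bit packing, so a real encoder can fall back toward the bit-packed cost when runs are short. The numbers below come from this toy model, not from a Parquet writer.

```python
import math
import random

random.seed(7)
N = 200_000
categories = ["US", "EU", "APAC", "LATAM", "MEA"]

# Clustered: codes arrive in long runs, as in a table sorted or clustered on region.
clustered = []
while len(clustered) < N:
    clustered.extend([random.randrange(len(categories))] * random.randint(200, 2000))
clustered = clustered[:N]

# Shuffled: exactly the same codes, in random order.
shuffled = clustered[:]
random.shuffle(shuffled)


def count_runs(codes):
    """Number of maximal runs of equal adjacent values."""
    return 1 + sum(codes[i] != codes[i - 1] for i in range(1, len(codes)))


# Dictionary codes bit-packed at a fixed width: ceil(log2(5)) = 3 bits, order-independent.
bits = math.ceil(math.log2(len(categories)))
bitpacked = math.ceil(N * bits / 8)

for name, codes in [("clustered", clustered), ("shuffled", shuffled)]:
    runs = count_runs(codes)
    print(f"{name:9s}: runs={runs:6d}  RLE~{runs * 5:7d} B  bit-packed={bitpacked} B")
```

Shuffling leaves the dictionary and the 3-bit packing untouched, but it multiplies the run count by roughly three orders of magnitude. That makes run-length encoding worse than plain bit packing, which is why write-time ordering matters as much as codec choice.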

Column pruning is not a trick the storage layer pulls off alone; the query engine has to know which columns the query actually needs and push that knowledge down to the reader. This is exactly the projection pushdown that Spark's Catalyst optimizer performs: it inspects the logical plan, determines the minimal set of columns required, and instructs the Parquet reader to skip the rest before a single irrelevant byte leaves the object store. The columnar format and the optimizer are partners; we built the optimizer side of this in Section 7.3, and Parquet is the storage side that makes its pruning pay off.

Library Shortcut: pyarrow Prunes Columns in One Argument

Code 8.3.1 modeled pruning by hand to expose the byte accounting. In practice you never compute byte offsets yourself; you name the columns you want and the Parquet reader skips everything else, fetching only the requested column chunks from the object store:

import pyarrow.parquet as pq

# Read ONLY two columns from a wide Parquet file. The reader fetches just those
# column chunks; the other eighteen columns are never decoded or transferred.
table = pq.read_table("features.parquet", columns=["f0", "region"])

# Push a predicate down too: row groups whose min/max statistics rule out a match
# are skipped without being read; rows in the remaining groups are filtered after decoding.
table = pq.read_table("features.parquet", columns=["f0", "region"],
                      filters=[("region", "=", "EU")])
Code 8.3.2: The columns= argument is column pruning and filters= is predicate pushdown, the two optimizations the manual model in Code 8.3.1 spelled out. The roughly twenty lines of byte bookkeeping collapse to two calls, and pyarrow handles row-group statistics, chunk seeking, and decompression internally.

3. Apache Arrow: The Columnar Layout, In Memory Intermediate

Parquet is a columnar layout on disk, optimized for compact storage and fast scans. Apache Arrow is the same columnar idea in memory, optimized for fast processing and, above all, for sharing. Arrow defines a standard in-memory layout for columnar data: a column is a contiguous buffer of values plus a validity bitmap that marks nulls (variable-length types such as strings add an offsets buffer). The byte specification is precise enough that any system implementing it lays the bytes out identically. That shared specification is what makes Arrow valuable. When two systems both speak Arrow, one can hand a table to the other by passing pointers to its buffers, within one process or through shared memory, with no serialization, no copy, and no per-row translation. This is the zero-copy exchange that lets a Parquet reader, a query engine, a Python dataframe, and a GPU library pass data among themselves at memory bandwidth rather than parsing speed.

For AI pipelines the consequence is direct. A feature-engineering step in Spark can produce Arrow batches that a training data loader consumes without a serialize-then-deserialize round trip, and a vectorized user-defined function can run Python code over Arrow batches at near-native speed because the data arrives in the layout NumPy and pandas already use. That vectorized-UDF path, where Spark ships Arrow batches to Python workers instead of pickling row by row, is exactly the mechanism we examined in Section 7.8; Arrow is the wire format and the in-memory format that makes it fast. The throughline is that columnar is not only a disk decision: it is the shape data should keep as it crosses every boundary in the pipeline.

Fun Note: A Standard So Boring It Became Universal

Arrow's superpower is that it is deliberately unexciting: a frozen, exhaustively specified byte layout that no single system owns. The payoff arrives precisely because it refuses to be clever. Twenty libraries agreeing to put a column's bytes in the same order is worth more than any one of them inventing a faster encoding nobody else can read, because agreement is what turns a copy into a pointer pass.

4. The Lakehouse: A Transactional Table Over Object Storage Intermediate

Columnar files on object storage are fast to scan but, by themselves, a loose pile of immutable blobs. They have no notion of a table version, no atomic way to add or replace rows, and no protection against a reader seeing a half-written update. The data lake gave us cheap, scalable storage and surrendered the guarantees a warehouse always had. The lakehouse closes that gap: it is a transactional table layer placed over the same columnar files on the same object storage, adding the warehouse's guarantees without giving up the lake's cost and scale. Delta Lake, Apache Iceberg, and Apache Hudi are the three dominant implementations, and although they differ in detail they share one mechanism: a metadata log of commits that defines, at every point in time, exactly which data files constitute the table.

That commit log is what turns a directory of Parquet files into a table with real semantics. ACID transactions mean a writer's changes appear all at once or not at all, so a reader never sees a partial commit even while a write is in flight. Schema evolution means columns can be added, renamed, or retyped (within safe widenings such as int to long) through metadata, with old files reinterpreted under the new schema rather than rewritten. Time travel means every commit is a queryable version, so you can read the table exactly as it stood at a past commit or timestamp. Efficient upserts (merge, update, delete) mean you can change a few rows without rewriting the dataset, by writing new files and recording in the log which old files they supersede. Figure 8.3.2 shows the layering: immutable columnar files at the bottom, a metadata commit log defining table versions in the middle, and the engines that read and write through that log on top.

[Figure 8.3.2 diagram: compute engines (Spark / SQL engine, training data loader, batch / stream writer) read and write through the log; lakehouse metadata commit log (Delta / Iceberg / Hudi), where each version pins a file set: v1 = files {a, b}, v2 (upsert) = files {a, c}, v3 (schema + col) = files {a, c, d}; object storage holds immutable Parquet data files a.parquet, b.parquet, c.parquet, d.parquet, never mutated in place.]
Figure 8.3.2: The lakehouse as three layers. At the bottom, immutable Parquet files sit on object storage and are never edited in place. In the middle, a metadata commit log records versions, where each version (v1, v2, v3) pins the exact set of files that make up the table at that point; an upsert writes new files and commits a new version that supersedes the old ones, and a schema change is recorded as metadata. At the top, engines read and write only through the log, so every reader sees a consistent, versioned table.
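The all-or-nothing commit in the middle layer rests on one primitive: publishing version N only if nobody else has published it. This is a hypothetical, minimal sketch loosely modeled on Delta Lake's log of numbered JSON commit files. It is not the real format of Delta, Iceberg, or Hudi. Each commit is written to a temporary file and then published with os.link, which fails if the target exists, so readers see either a complete commit or none. Real systems rely on whatever atomic put-if-absent or rename the underlying store offers, or on a catalog service.

```python
import json
import os
import tempfile


class ToyCommitLog:
    """Hypothetical commit log: one numbered JSON file of add/remove actions per version."""

    def __init__(self, root):
        self.root = root
        os.makedirs(root, exist_ok=True)

    def _path(self, version):
        return os.path.join(self.root, f"{version:020d}.json")

    def latest_version(self):
        versions = [int(n[:-5]) for n in os.listdir(self.root) if n.endswith(".json")]
        return max(versions, default=-1)

    def commit(self, version, add=(), remove=()):
        """Publish commit `version` all at once, or not at all if it already exists."""
        fd, tmp = tempfile.mkstemp(dir=self.root, suffix=".tmp")
        with os.fdopen(fd, "w") as f:
            json.dump({"add": sorted(add), "remove": sorted(remove)}, f)
        try:
            os.link(tmp, self._path(version))    # atomic put-if-absent on POSIX filesystems
            return True
        except FileExistsError:
            return False                         # another writer won: re-read and retry
        finally:
            os.remove(tmp)

    def files_at(self, version):
        """Replay commits 0..version to recover the file set that is the table."""
        files = set()
        for v in range(version + 1):
            with open(self._path(v)) as f:
                actions = json.load(f)
            files = (files - set(actions["remove"])) | set(actions["add"])
        return files


with tempfile.TemporaryDirectory() as d:
    log = ToyCommitLog(os.path.join(d, "_log"))
    log.commit(0, add={"a.parquet", "b.parquet"})
    log.commit(1, add={"c.parquet"}, remove={"b.parquet"})   # an upsert
    raced = log.commit(1, add={"x.parquet"})                 # a conflicting writer
    latest = log.latest_version()
    v0, v1 = sorted(log.files_at(0)), sorted(log.files_at(1))

print("latest:", latest, "| v0:", v0, "| v1:", v1, "| conflicting commit accepted:", raced)
```

The losing writer does not corrupt anything; it learns that version 1 is taken and must re-read the log, check for conflicts, and try version 2. This optimistic-concurrency loop is the general shape that lakehouse writers follow, though their conflict rules are far more detailed.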

The mechanism behind every one of these features is the same idea this chapter keeps returning to. The data files are immutable shards on object storage; the table is the set of shards a given commit points to. An upsert does not edit a file in place, which object stores do not support anyway (an object can only be replaced whole); instead it writes new shards and atomically commits a new version of the file set. Time travel is just reading the file set named by an older commit. This is partitioning and sharding wearing a transactional hat, and it is the same arc that began as data shards in this chapter and returns as sharded parameters and model shards later in the book.
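The "write new shards, commit a new file set" mechanism can be shown with a dict standing in for object storage. The following is a hypothetical sketch of copy-on-write upsert semantics; all names are illustrative. A file containing any changed key is rewritten in full under a new name, keys found in no file become a new insert file, and superseded files are left untouched, so older versions stay readable. Merge-on-read, the alternative weighed in Exercise 8.3.3, would instead write small delta files and reconcile them at read time.

```python
import itertools

# Hypothetical in-memory object store: file name -> immutable list of (key, value) rows.
store = {
    "a.parquet": [(1, "x"), (2, "y")],
    "b.parquet": [(3, "z"), (4, "w")],
}
versions = [frozenset(store)]          # version 0 pins the file set {a, b}
_file_ids = itertools.count()


def copy_on_write_upsert(current, changes):
    """Return the next version's file set; never edits an existing file."""
    pending = dict(changes)
    added, removed = set(), set()
    for name in sorted(current):
        rows = store[name]
        if any(k in pending for k, _ in rows):
            new_name = f"part-{next(_file_ids)}.parquet"
            store[new_name] = [(k, pending.pop(k, v)) for k, v in rows]  # full rewrite
            added.add(new_name)
            removed.add(name)          # superseded, but kept intact for older versions
    if pending:                        # keys found in no file become inserts
        new_name = f"part-{next(_file_ids)}.parquet"
        store[new_name] = sorted(pending.items())
        added.add(new_name)
    return (current - removed) | added


def read(version):
    """Time travel: materialize the rows of whichever file set a version pins."""
    return dict(row for name in versions[version] for row in store[name])


versions.append(copy_on_write_upsert(versions[-1], {3: "Z", 5: "new"}))
print("v0 files:", sorted(versions[0]), "->", read(0))
print("v1 files:", sorted(versions[1]), "->", read(1))
```

Untouched files such as a.parquet are shared by both versions, which is why versioning costs only the changed files rather than a full copy of the table.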

Thesis Thread: Sharding Returns as a Versioned, Transactional Table

Chapter 2 introduced partitioning and sharding as a coordination concept, and Section 8.2 made it physical with data shards on object storage. The lakehouse adds a thin, powerful layer on top: a metadata log that names which shards constitute the table at each version. Nothing about the shards changed; what changed is that an atomic pointer swap over a set of immutable shards now buys ACID commits, time travel, and upserts. The very same sharding idea reappears as sharded parameter tables in Chapter 11 and as model and index shards in Chapter 15 and Chapter 25. Once you see "a table is a committed set of shards," you have seen the structural pattern that organizes distributed data, models, and indexes alike.

5. Why This Matters for AI: Reproducible, Versioned Training Tables Advanced

A model is only as reproducible as the data it was trained on, and "the data" on a plain data lake is a moving target: files get appended, corrected, and backfilled, so the dataset you trained on last month may no longer exist in the same form. The lakehouse fixes this by making the training set an explicit, immutable version. You pin training to a specific table commit and record that version id alongside the model checkpoint. Months later you can read the table exactly as it stood, re-run the pipeline, and get a dataset identical to the original byte for byte, provided the table's retention settings have kept the files that version references (cleanup operations such as Delta Lake's VACUUM or Iceberg's snapshot expiration eventually delete them). Time travel turns "which data produced this model?" from an archaeology problem into a single version number, and it does so on cheap object storage rather than an expensive warehouse. This data-versioning story is foundational enough that Section 8.9 develops lineage and reproducible dataset versioning in full.
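Recording the version next to the checkpoint can be as simple as a small manifest. The following is a library-free, hypothetical sketch. It assumes the job already knows which version it read and which files that version pins, as a commit log would report. It stores the version with an order-independent fingerprint of the file set, so a later re-read can confirm nothing drifted. Because lakehouse data files are immutable, file names are enough to identify content here. In Delta Lake, for instance, the recorded version can be passed back through the versionAsOf read option; the table name and file names below are illustrative.

```python
import hashlib
import json


def file_set_digest(files):
    """Order-independent fingerprint of the data files a table version pins."""
    h = hashlib.sha256()
    for name in sorted(files):
        h.update(name.encode())
        h.update(b"\0")                # separator so names cannot run together
    return h.hexdigest()


def training_manifest(table_name, version, files):
    """Hypothetical record saved alongside a model checkpoint."""
    return {"table": table_name, "version": version, "files_sha256": file_set_digest(files)}


# Suppose the job read version 2 of a feature table whose log pins these files.
pinned = {"a.parquet", "c.parquet", "d.parquet"}
saved = json.dumps(training_manifest("features.fraud", 2, pinned))

# Months later: re-resolve the same version and check that its file set is unchanged.
later = json.loads(saved)
still_same = later["files_sha256"] == file_set_digest({"d.parquet", "a.parquet", "c.parquet"})
print("version:", later["version"], "| file set unchanged:", still_same)
```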

The efficient-upsert capability matters just as much for the daily reality of feature pipelines. Training and feature tables are rarely static; labels get corrected, late-arriving events backfill yesterday's partition, and a feature definition changes and must be recomputed for a subset of entities. On raw Parquet, any of these forces a full rewrite of the affected partitions. On a lakehouse table, a MERGE rewrites only the files that contain changed rows (or, under merge-on-read, writes small delta files) and commits a new version that retires the superseded ones. A correction therefore touches kilobytes of metadata and a handful of files rather than the whole dataset. The same transactional table that ingests a stream of corrections is also the one a training job reads from at a pinned version, which is why the lakehouse sits at the seam between batch and streaming ingestion that Chapter 9 takes up.

Practical Example: The Feature Table That Could Reproduce Any Model

Who: A data platform engineer at a fintech company supporting a fraud-model team.

Situation: Features for millions of accounts lived as daily Parquet dumps on object storage, regenerated nightly by overwriting the previous day's files.

Problem: When a model misbehaved in production, the team could not reconstruct the exact feature values it had been trained on, because the underlying files had since been overwritten by later runs.

Dilemma: Copy the entire feature set into a dated snapshot every day, simple but storage-heavy and still without atomic updates, or move to a transactional table layer that versions the data in place and supports row-level corrections.

Decision: They converted the feature directory into a Delta Lake table, leaving the Parquet files where they were and adding only the commit log on top.

How: Nightly jobs switched from overwrite to MERGE, so corrections and backfills updated only changed rows; each training run recorded the table version it read, and time travel let any past version be queried on demand.

Result: Reproducing a model's training data became a one-line read at a pinned version, backfills stopped rewriting whole partitions, and storage grew slowly because only changed files were added rather than full daily copies.

Lesson: A transactional table layer over the same object storage buys reproducibility and cheap corrections at once, turning "which bytes trained this model?" into a recorded version number.

Research Frontier: Open Table Formats and ML-Native Columnar Layouts (2024 to 2026)

Two currents are reshaping this layer. The first is consolidation around open table formats: Apache Iceberg has become a de facto interoperability standard that multiple engines and managed catalogs read and write, and the 2024 to 2025 wave of catalog services (the open-sourcing of efforts like Unity Catalog and Polaris, and broad vendor support for Iceberg's REST catalog) is pushing the lakehouse toward a single table layer many engines share rather than one vendor's silo. The second current is columnar formats designed for machine learning rather than analytics. Lance is a columnar format built for fast random access and vector data, targeting the point-lookup and embedding-retrieval patterns that Parquet's scan-optimized layout handles poorly, and Nimble (from Meta) revisits the on-disk encoding for wide, deeply nested feature tables. The open question these share: Parquet was designed for scan-heavy SQL analytics, and AI workloads (random-access feature lookups, shuffled training reads, vector columns) stress different access patterns, so the format that wins for training data may not be the one that won for the warehouse. We meet the vector-retrieval side of this story in Chapter 25.

Columnar encoding decides what a scan costs, Arrow decides what crossing a system boundary costs, and the lakehouse decides what a table version costs, and together they turn cheap object storage into a reproducible, queryable, transactional foundation for training and feature data. What none of them decides yet is how the files are arranged within a table: how rows are grouped into partitions, how small files are compacted into large ones, and how data is sorted so that pruning and predicate pushdown actually skip the bytes they could. That layout question is where the costs in this section are won or lost in practice, and it is the subject of Section 8.4.

Exercise 8.3.1: When Columnar Loses Conceptual

Columnar layout wins decisively for analytical scans that touch few columns of many rows. Describe two access patterns where a row store beats a columnar store, and explain each in terms of bytes scanned and the cost of reassembling a record. Specifically address: (a) an online service that fetches a single complete user record by primary key thousands of times per second, and (b) a workload that inserts one row at a time with strict latency. For each, state which physical property of the columnar layout (column contiguity, per-column compression, large immutable files) works against the pattern.

Exercise 8.3.2: Measure Pruning and Compression on Your Own Schema Coding

Adapt Code 8.3.1 to a wide table that mixes column types: several high-cardinality float columns, one near-constant boolean column, one timestamp column that increases monotonically, and one shuffled (not clustered) low-cardinality string column. Write it to Parquet with pyarrow and report, per column, the compressed-to-raw ratio. Then sort the table on the low-cardinality string column, write it again, and re-measure. Explain why the monotonic timestamp and the near-constant boolean compress well even unsorted, why the shuffled string compresses poorly, and how sorting changes its ratio. Finally, compute the column-prune ratio for a query that selects only the boolean and timestamp columns.

Exercise 8.3.3: The Cost of an Upsert Analysis

A lakehouse table holds $10^9$ rows spread across 10,000 Parquet files of roughly equal size. A nightly correction updates 50,000 rows that happen to be scattered across 2,000 of those files. Under copy-on-write semantics (a file containing any changed row is rewritten in full), estimate how many bytes the upsert rewrites and compare it to rewriting the entire table. Now consider merge-on-read semantics (changed rows are written as small delete-and-insert deltas, merged at read time): argue qualitatively how it shifts cost from write time to read time, and state one workload where copy-on-write is the better choice and one where merge-on-read is. Connect your reasoning to the file-count and file-size trade-offs that Section 8.4 formalizes.