Part II: Distributed Data Processing for AI
Chapter 7: Spark and Distributed DataFrames

DataFrames and Spark SQL

"I used to micromanage every map and filter by hand. Then the optimizer read my query, rearranged my whole life, and somehow I ran twice as fast doing half the work."

An RDD That Learned to Let Go
Big Picture

When you describe a computation as a declarative query over named, typed columns instead of a hand-written sequence of transformations, you hand the engine enough structure to rewrite your plan, and a good rewrite beats almost any code you would have written by hand. Section 7.2 gave us Resilient Distributed Datasets, where you spell out every map and filter in order and the cluster does exactly what you said. DataFrames invert that contract: you say what result you want, and Spark's Catalyst optimizer decides how to get it, pushing filters down to the data source, dropping columns nobody reads, and reordering joins before a single byte moves. This section shows why that structure is the source of Spark's speed, builds a miniature optimizer to see the rewrites happen on real numbers, and connects the structured table abstraction to the columnar storage of Chapter 8 and the feature tables that feed every model in this book.

The Resilient Distributed Dataset of Section 7.2 is a powerful and honest abstraction: a partitioned collection of arbitrary Python or Scala objects, transformed by functions you supply. Its honesty is also its limitation. Because the elements are opaque objects and the transformations are opaque closures, the engine cannot see inside them. It cannot know that your map only ever reads two of the twenty fields in each record, that your filter would eliminate ninety percent of the rows if it ran first, or that two of your joins commute. It must run your steps in the order you wrote them, materializing whatever you told it to materialize. The RDD does what you say, which means the quality of the plan is entirely on you.

The DataFrame changes the abstraction in one decisive way: the data now has a schema. A DataFrame is a distributed table of rows with named, typed columns, exactly the shape of the tabular features that classical models in Chapter 12 consume. Once Spark knows the columns and their types, and once your computation is expressed as relational operations over those columns (select, filter, join, group) rather than as opaque closures, the engine can read, analyze, and rewrite your query before running it. That rewrite engine is Catalyst, and the structure is what makes it possible.

[Figure 7.3.1 diagram: Declarative query (DataFrame / SQL) -> Logical plan -> Catalyst optimizer (predicate pushdown, column pruning, join reordering, constant folding, cost-based choices) -> Physical plan -> Tungsten codegen (compiled JVM bytecode)]
Figure 7.3.1: The structured-query pipeline. A declarative DataFrame or SQL query becomes a logical plan; Catalyst rewrites that plan with rules such as predicate pushdown, column pruning, and join reordering; the resulting physical plan is handed to the Tungsten engine, which generates compiled code that operates on a compact in-memory layout. The optimizer in the middle box is the reason structured Spark usually beats hand-written RDD code, and it is the box that subsection 2 of this section dissects.

1. From Imperative RDDs to Declarative DataFrames Beginner

The clearest way to feel the difference is to write the same computation twice. Suppose we have event records and we want, per country, the total spend of users on mobile devices. With RDDs you script the dataflow: parse each line, filter to mobile, project the fields you need, key by country, and reduce. Every decision about order and intermediate shape is yours, and Spark executes it verbatim.

# RDD style: you specify every step and its order.
rdd = sc.textFile("events.csv")
result = (rdd
    .map(lambda line: line.split(","))                       # parse every column
    .filter(lambda r: r[3] == "mobile")                      # keep mobile rows
    .map(lambda r: (r[1], float(r[5])))                       # (country, spend)
    .reduceByKey(lambda a, b: a + b))                         # sum per country
Code 7.3.1: The imperative RDD form. Spark parses all six columns of every row before the filter runs, because the closures are opaque and the engine cannot tell that only two columns survive or that filtering first would shrink the work.

Now the DataFrame form. You declare the table, the predicate, the grouping, and the aggregate, and you say nothing about order or intermediate representation. The equivalent Spark SQL string expresses the identical intent in the language a data analyst already knows.

# DataFrame style: you declare the result; Spark plans the execution.
from pyspark.sql import functions as F

df = spark.read.csv("events.csv", header=True, inferSchema=True)
result = (df
    .filter(df.device == "mobile")        # a relational predicate, not a closure
    .groupBy("country")
    .agg(F.sum("spend").alias("total_spend")))   # same output column name as the SQL form

# The same query as Spark SQL over the same engine and the same optimizer.
df.createOrReplaceTempView("events")
result = spark.sql("""
    SELECT country, SUM(spend) AS total_spend
    FROM events
    WHERE device = 'mobile'
    GROUP BY country
""")
Code 7.3.2: The declarative DataFrame and Spark SQL forms of the same query. Both compile to one logical plan and pass through the same Catalyst optimizer, so the choice between the fluent API and a SQL string is a matter of taste, not performance.

The two listings compute the same answer, but they are not the same instruction to the engine. Code 7.3.1 is a recipe Spark must follow literally. Code 7.3.2 is a description of the goal that Spark is free to achieve any correct way it likes, and "any correct way it likes" is precisely the freedom an optimizer needs. The DataFrame and the SQL string in Code 7.3.2 are interchangeable surfaces over one engine: both lower to the same logical plan, so everything we say about the optimizer applies equally to the analyst writing SQL and the engineer writing Python.

Key Insight: Structure Buys the Engine the Right to Rewrite You

An RDD transformation is an opaque closure; the engine must run it exactly as written. A DataFrame operation is a relational expression over a known schema; the engine can see what it reads, what it discards, and how it composes, and may therefore rewrite the plan into any equivalent that runs faster. You give up the ability to dictate execution order, and in exchange you gain an optimizer that almost always orders the work better than you would have. Declarative is not a stylistic preference here; it is what makes optimization legal.
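To make the contrast between a closure and a relational expression concrete, here is a minimal sketch in plain Python. The `Col`, `Lit`, and `Eq` classes are hypothetical stand-ins invented for this illustration, not Spark's API; they show only that an expression object can report which columns it reads, which a lambda cannot.

```python
"""Sketch: an expression tree exposes what it reads; a closure does not."""
from dataclasses import dataclass


@dataclass(frozen=True)
class Col:                        # hypothetical stand-in for a column reference
    name: str

    def references(self):
        return {self.name}

    def evaluate(self, row):
        return row[self.name]


@dataclass(frozen=True)
class Lit:                        # a literal constant reads no columns
    value: object

    def references(self):
        return set()

    def evaluate(self, row):
        return self.value


@dataclass(frozen=True)
class Eq:                         # equality predicate over two sub-expressions
    left: object
    right: object

    def references(self):
        return self.left.references() | self.right.references()

    def evaluate(self, row):
        return self.left.evaluate(row) == self.right.evaluate(row)


row = {"user_id": 7, "country": "US", "device": "mobile", "spend": 4.5}

closure = lambda r: r["device"] == "mobile"       # RDD style: a black box
predicate = Eq(Col("device"), Lit("mobile"))      # DataFrame style: inspectable

# Both give the same answer on a row...
same_answer = closure(row) == predicate.evaluate(row)
# ...but only the expression can tell a planner which columns it needs.
columns_read = predicate.references()
print(same_answer, columns_read)
```

A planner holding `predicate` can decide to read one column and push the test toward the scan; a planner holding `closure` can only call it.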

2. What Catalyst Actually Does Intermediate

Catalyst is a tree-rewriting system. It represents your query as a logical plan, a tree of relational operators (scan, filter, project, join, aggregate), and applies rules that transform the tree into an equivalent tree with lower cost. Three rewrites carry most of the benefit, and all three exploit the schema that RDDs lack.

Predicate pushdown moves filters as close to the data source as possible. If your query reads a table and then keeps only the rows where country = 'US', Catalyst pushes that predicate into the scan itself, so a Parquet or ORC reader skips entire row groups whose statistics prove no row can match, and the network and CPU never touch the rows in those skipped groups. Column pruning drops columns that no downstream operator references; if your final projection needs two of twenty columns, the scan reads only those two (plus any the pushed filter requires), which on columnar storage means reading a small fraction of the file. Join reordering rearranges a chain of joins so that the most selective joins run first and produce the smallest intermediate results, shrinking the data that flows into later joins; in Spark this rewrite is cost-based, so it depends on table statistics being available and cost-based optimization being turned on. Spark also folds constants, collapses adjacent projections, and makes size-based choices such as switching a shuffle join to a broadcast join when one side is estimated to be small.
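Join reordering is the one rewrite the toy optimizer in this section does not model, so a small sketch may help. It assumes a deliberately simple cost model: one fact table is joined to several already-filtered dimension tables, each join keeps a fixed percentage of the fact rows, and the cost of a plan is the total number of rows flowing into its joins. The table names and percentages are hypothetical, and the brute-force search over orderings is an illustration only, not Catalyst's algorithm.

```python
"""Sketch: pick a join order that minimizes rows flowing into later joins."""
from itertools import permutations

FACT_ROWS = 1_000_000                      # hypothetical fact table size
# Hypothetical percentage of fact rows that survive each dimension join.
RETENTION_PCT = {"users_active": 50, "countries_eu": 10, "devices_mobile": 30}


def rows_flowing(order):
    """Return (rows entering each join, final row count) for a left-deep plan."""
    rows, flowing = FACT_ROWS, []
    for dim in order:
        flowing.append(rows)                       # input to this join
        rows = rows * RETENTION_PCT[dim] // 100    # output feeds the next join
    return flowing, rows


def total_work(order):
    flowing, _ = rows_flowing(order)
    return sum(flowing)


written = ("users_active", "devices_mobile", "countries_eu")   # order as written
best = min(permutations(RETENTION_PCT), key=total_work)        # most selective first

for label, order in (("as written", written), ("reordered", best)):
    flowing, final = rows_flowing(order)
    print(f"{label:<11} {' -> '.join(order):<46} "
          f"join inputs={sum(flowing):>9,}  final={final:,}")
```

Both orderings return the same final rows; only the volume passing through the intermediate joins differs, which is the quantity a join-reordering rule tries to shrink.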

The crucial point is that pushdown and pruning change how much data is read and moved, and in a distributed system data movement is the dominant cost, the same shuffle cost we traced through MapReduce in Chapter 6 and will quantify against the alpha-beta model of Chapter 3. To make the rewrites concrete rather than abstract, the code below implements a miniature optimizer over a tiny logical plan. It applies predicate pushdown and column pruning to a declared query and reports the rows and cells scanned before and after, the same accounting Catalyst does at scale.

"""Tiny logical-plan optimizer: predicate pushdown + column pruning."""

TABLE = "events"
COLUMNS = ["user_id", "country", "ts", "device", "clicks", "spend"]
ROWS_ON_DISK = 1_000_000
FILTER_SELECTIVITY = 0.18                       # fraction surviving country == 'US'

# The DECLARED query, bottom-up: Scan -> Filter -> Project.
declared_plan = [
    ("Scan",    {"table": TABLE, "columns": COLUMNS}),
    ("Filter",  {"predicate": "country == 'US'", "selectivity": FILTER_SELECTIVITY}),
    ("Project", {"columns": ["user_id", "spend"]}),
]

def cost_of(plan):                              # rows and cells touched at the scan
    scan = plan[0][1]
    rows = scan.get("scan_rows", ROWS_ON_DISK)
    cols = len(scan["columns"])
    return rows, cols, rows * cols

def predicate_pushdown(plan):                   # let the scan read only matching rows
    out = [[name, dict(args)] for name, args in plan]   # copy each operator's args so the input plan is not mutated
    flt = next((args for name, args in out if name == "Filter"), None)
    if flt is not None:
        out[0][1]["scan_rows"] = int(ROWS_ON_DISK * flt["selectivity"])
        out[0][1]["pushed_predicate"] = flt["predicate"]
    return out

def column_pruning(plan):                       # keep only columns someone reads
    out = [[name, dict(args)] for name, args in plan]   # same defensive copy as above
    needed = set()
    for name, args in out:
        if name == "Project":
            needed.update(args["columns"])
        if name == "Filter":
            needed.add(args["predicate"].split(" ")[0])   # column in the predicate (toy parse: first token)
    out[0][1]["columns"] = [c for c in out[0][1]["columns"] if c in needed]
    return out

def report(label, plan):
    rows, cols, cells = cost_of(plan)
    print(f"{label:<26} rows={rows:>9,}  cols={cols}  cells={cells:>11,}")
    return cells

print("Query: SELECT user_id, spend FROM events WHERE country = 'US'\n")
base = report("unoptimized plan", declared_plan)
opt  = column_pruning(predicate_pushdown(declared_plan))
done = report("optimized plan", opt)
print(f"\npushed predicate : {opt[0][1].get('pushed_predicate')}")
print(f"pruned columns   : {opt[0][1]['columns']}")
print(f"cells scanned    : {base:,} -> {done:,}")
print(f"data movement cut : {100 * (1 - done / base):.1f}%")
Code 7.3.3: A from-scratch logical-plan optimizer. It rewrites a declared Scan -> Filter -> Project plan by pushing the predicate into the scan (fewer rows) and pruning unread columns (fewer columns), then reports the scan cost in cells, exactly the read-and-move cost Catalyst minimizes.
Query: SELECT user_id, spend FROM events WHERE country = 'US'

unoptimized plan           rows=1,000,000  cols=6  cells=  6,000,000
optimized plan             rows=  180,000  cols=3  cells=    540,000

pushed predicate : country == 'US'
pruned columns   : ['user_id', 'country', 'spend']
cells scanned    : 6,000,000 -> 540,000
data movement cut : 91.0%
Output 7.3.3: Two rewrites cut the scan from six million cells to roughly half a million, a 91 percent reduction, without changing the query's meaning. Predicate pushdown removed 82 percent of the rows; column pruning kept only the three columns the filter and projection actually touch.

Nothing about the answer changed; the query still returns the spend of every US user. What changed is the volume of data the engine reads and ships, and that volume fell by an order of magnitude from two rules a beginner would not think to apply by hand. This is the mechanism behind the headline claim of the section: because Catalyst applies these rewrites uniformly and the RDD programmer usually does not, the structured path tends to win.
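Code 7.3.3 models the plan as a flat list, whereas the start of this subsection describes Catalyst as a tree-rewriting system. The sketch below shows that shape on a nested plan: each rule is a small function from node to node, and a driver applies it bottom-up. The tuple-based node layout and the rule names `fold_filter` and `collapse_projects` are assumptions made for this example; they illustrate constant folding and the collapsing of adjacent projections, not Catalyst's actual classes.

```python
"""Sketch: rule-based rewriting of a nested logical plan (hypothetical node shapes)."""
import operator

ARITH = {"+": operator.add, "-": operator.sub, "*": operator.mul}


def fold_constants(expr):
    """Recursively replace arithmetic over literals with its value."""
    if not isinstance(expr, tuple):
        return expr                                  # a literal or a column name
    op, *args = expr
    args = [fold_constants(a) for a in args]
    if op in ARITH and all(isinstance(a, (int, float)) for a in args):
        return ARITH[op](*args)
    return (op, *args)


def transform_up(plan, rule):
    """Rewrite the child first, then apply the rule to the node itself."""
    if plan[0] != "Scan":                            # Scan is the only leaf
        plan = (*plan[:-1], transform_up(plan[-1], rule))
    return rule(plan)


def fold_filter(plan):
    if plan[0] == "Filter":
        return ("Filter", fold_constants(plan[1]), plan[2])
    return plan


def collapse_projects(plan):
    # Project(a, Project(b, child)) -> Project(a, child) when a is a subset of b
    if (plan[0] == "Project" and plan[2][0] == "Project"
            and set(plan[1]) <= set(plan[2][1])):
        return ("Project", plan[1], plan[2][2])
    return plan


plan = ("Project", ("user_id",),
        ("Project", ("user_id", "spend"),
         ("Filter", (">", ("col", "spend"), ("+", 10, 5)),
          ("Scan", "events"))))

optimized = plan
for rule in (fold_filter, collapse_projects):
    optimized = transform_up(optimized, rule)
print(optimized)
```

After both rules run, the predicate compares spend with the folded literal 15 and the two projections have merged into one, while the plan's meaning is unchanged.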

Fun Note: The Optimizer Reads Your Query Better Than You Wrote It

A reliable way to lose a benchmark is to "help" Spark by manually reordering a DataFrame query the way you think is fastest. Catalyst has already considered your ordering, and several you did not, and picked one with a cost model. The structured API quietly rewards the engineer who writes the clearest query rather than the cleverest one, which is a rare and pleasant thing in performance work.

3. Why DataFrames Usually Beat Hand-Written RDD Code Intermediate

Optimization is only half the story. Once Catalyst has chosen a physical plan, a second engine, Tungsten, executes it efficiently. Tungsten stores rows in a compact, cache-friendly binary layout rather than as boxed JVM objects, manages that memory explicitly instead of leaving it to the garbage collector (optionally off the JVM heap), and generates Java bytecode at runtime that fuses a whole chain of operators into a single tight loop (whole-stage code generation). The result is that a DataFrame pipeline often runs as a handful of compiled loops over packed memory, while the equivalent RDD pipeline runs as a chain of virtual function calls over scattered objects. Structure pays twice: once when Catalyst shrinks the work, and again when Tungsten executes what remains close to the metal.
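The difference between a chain of operators and a fused loop can be sketched in a few lines. This is a Python analogy only: Tungsten emits Java bytecode over a binary row format, and the function names and sample rows here are hypothetical. The point is the shape of the code, with one generator per operator in the first form and a single loop in the second.

```python
"""Sketch: operator-at-a-time iterators versus one fused loop (a Python analogy)."""

rows = [("US", "mobile", 4.0), ("DE", "desktop", 2.5),
        ("US", "desktop", 1.0), ("US", "mobile", 3.5)]


# Iterator style: each operator is a separate generator, so every row
# crosses one call boundary per operator on its way up the chain.
def scan(data):
    for r in data:
        yield r


def filter_mobile(child):
    for r in child:
        if r[1] == "mobile":
            yield r


def project_spend(child):
    for r in child:
        yield r[2]


def total_iterators(data):
    return sum(project_spend(filter_mobile(scan(data))))


# Fused style: the same three operators collapsed into one loop, roughly
# the shape of code a whole-stage code generator would produce.
def total_fused(data):
    total = 0.0
    for _country, device, spend in data:
        if device == "mobile":
            total += spend
    return total


print(total_iterators(rows), total_fused(rows))
```

Both functions compute the same total; the fused form simply does it without handing each row from one operator object to the next.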

There is a productivity argument layered on top of the performance one. The hand-written RDD in Code 7.3.1 hard-codes a column order and a parsing scheme; change the input and it breaks silently. The DataFrame in Code 7.3.2 carries a schema, so a type mismatch is caught at analysis time, before the job launches across the cluster. For the tabular feature engineering that precedes nearly every model, joining user tables to event tables, aggregating to per-entity features, encoding categoricals, this combination of safety and speed is why DataFrames, not RDDs, are the default surface for data preparation in production AI pipelines.
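What "caught at analysis time" means can be sketched as a check that runs against the schema alone, before any data is read. The `analyze` function and `PlanError` class below are hypothetical names for this illustration, and the rules are stricter than Spark's, which applies implicit casts in some comparisons.

```python
"""Sketch: resolve column names and types against a schema before running."""

SCHEMA = {"user_id": int, "country": str, "ts": str,
          "device": str, "clicks": int, "spend": float}


class PlanError(Exception):
    """Hypothetical error raised at analysis time, before any data is read."""


def analyze(filter_col, filter_value, sum_col, schema=SCHEMA):
    """Validate 'SUM(sum_col) WHERE filter_col = filter_value' using only the schema."""
    for col in (filter_col, sum_col):
        if col not in schema:
            raise PlanError(f"unknown column {col!r}; known: {sorted(schema)}")
    if not isinstance(filter_value, schema[filter_col]):
        raise PlanError(f"{filter_col!r} is {schema[filter_col].__name__}, "
                        f"compared with {type(filter_value).__name__}")
    if schema[sum_col] not in (int, float):
        raise PlanError(f"cannot SUM non-numeric column {sum_col!r}")
    return "ok"


checks = [("device", "mobile", "spend"),     # valid query
          ("devise", "mobile", "spend"),     # misspelled column
          ("device", "mobile", "country")]   # SUM over a string column
results = []
for args in checks:
    try:
        results.append(analyze(*args))
    except PlanError as err:
        results.append(f"rejected: {err}")
for line in results:
    print(line)
```

The index-based RDD code has no equivalent step: a wrong index or a non-numeric field surfaces only when a task hits the offending row, if it surfaces at all.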

Thesis Thread: The Same Shuffle, Now Optimized by a Planner

The expensive primitive under a groupBy or a join is the shuffle: the all-to-all repartition that moved data between mappers and reducers in Chapter 6. DataFrames do not abolish the shuffle; they let a planner decide how much of it is necessary. Pushing a filter below a join, pruning columns before a repartition, or broadcasting a small table to avoid a shuffle entirely are all ways the optimizer minimizes the same cross-machine data movement this book treats as the central tax on scale-out. The shuffle returns yet again as gradient all-reduce in data-parallel training (Chapter 15), where, lacking a query optimizer, engineers minimize it by hand. Spark's lesson is that a declarative interface lets a machine do that minimization for you.
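A back-of-the-envelope model shows why broadcasting a small table can avoid most of the data movement. The sketch assumes a shuffle join sends roughly every byte of both inputs across the network once, while a broadcast join copies only the small table to each executor; the sizes and executor count are hypothetical. Spark's own decision compares the estimated size of the smaller side with a configurable threshold rather than evaluating a formula like this one.

```python
"""Sketch: bytes moved by a shuffle join versus a broadcast join (toy model)."""

GB, MB = 10**9, 10**6
EXECUTORS = 100                                   # hypothetical cluster size


def shuffle_join_bytes(big, small):
    # Both inputs are repartitioned by key, so roughly every byte moves once.
    return big + small


def broadcast_join_bytes(small, executors):
    # The small table is copied to every executor; the big table stays put.
    return small * executors


def choose_join(big, small, executors):
    """Return the cheaper strategy and the bytes it moves under this model."""
    s = shuffle_join_bytes(big, small)
    b = broadcast_join_bytes(small, executors)
    return ("broadcast", b) if b < s else ("shuffle", s)


small_dim = choose_join(big=4000 * GB, small=50 * MB, executors=EXECUTORS)
large_dim = choose_join(big=4000 * GB, small=80 * GB, executors=EXECUTORS)
print(small_dim)
print(large_dim)
```

With a 50 MB dimension table the broadcast moves about 5 GB instead of about 4 TB; with an 80 GB table the copies cost more than the shuffle, so the shuffle join is the better choice.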

Library Shortcut: PySpark Plans and Runs the Whole Thing for You

Code 7.3.3 modeled two rewrites on a toy plan in about forty lines. In PySpark you write the query and the optimizer, the columnar reader, the shuffle, and the code generator are all already there. You can even inspect the plan Catalyst produced and confirm the pushdown and pruning happened, the same two rewrites our toy optimizer performed:

df = spark.read.parquet("events.parquet")          # columnar source with statistics
q  = (df.filter(df.country == "US")
        .select("user_id", "spend"))               # declare; do not micromanage

q.explain(mode="formatted")
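# In the printed physical plan you should see entries along these lines
# (exact text varies by Spark version; column types omitted here):
#   PushedFilters: [IsNotNull(country), EqualTo(country,US)]   <- predicate pushdown
#   ReadSchema: struct<user_id, country, spend>                <- column pruning (country is kept for the filter)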
Code 7.3.4: The same two optimizations as Output 7.3.3, now applied automatically. The roughly forty lines of Code 7.3.3 collapse to a query and an explain call; PySpark contributes Catalyst (the rewrites), the Parquet reader (statistics-based row-group skipping), and Tungsten (compiled execution).

4. Structured Tables Meet Columnar Storage Advanced

Predicate pushdown and column pruning deliver their full benefit only when the storage layer can act on them, and that is the bridge from this section to the next chapter. A row-oriented file (one row's fields stored contiguously) cannot read column spend without also reading every other field of that row, so column pruning saves little. A columnar file such as Parquet or ORC stores each column contiguously and ships per-column statistics (min, max, null counts) for each block, so pruning reads only the requested columns and pushdown skips whole blocks whose statistics rule out a match. The 91 percent reduction in Output 7.3.3 is a paper exercise on a row store; on columnar storage it is much closer to what you actually observe, provided the rows are sorted or clustered so that block statistics can rule blocks out, because the format was designed to let the optimizer's decisions translate into bytes never read from disk.
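The row-store versus column-store argument can be checked with a small count of cells read. The sketch assumes a row-oriented file with no index or statistics, a columnar file that keeps a min and max per block for the filter column, and data sorted by country so that those statistics are selective. The table contents and block size are hypothetical.

```python
"""Sketch: cells read by a row store versus a columnar store with block min/max stats."""

COLUMNS = ["user_id", "country", "ts", "device", "clicks", "spend"]
BLOCK_ROWS = 4

# Hypothetical data, sorted by country so that block statistics are selective.
countries = ["BR"] * 4 + ["DE"] * 4 + ["US"] * 8
table = [{"user_id": i, "country": c, "ts": i, "device": "mobile",
          "clicks": 1, "spend": 2.0} for i, c in enumerate(countries)]
blocks = [table[i:i + BLOCK_ROWS] for i in range(0, len(table), BLOCK_ROWS)]


def row_store_cells(blocks):
    # Every field of every row is read; the filter can only run afterwards.
    return sum(len(block) * len(COLUMNS) for block in blocks)


def column_store_cells(blocks, needed, column, value):
    cells = 0
    for block in blocks:
        lo = min(r[column] for r in block)       # per-block statistics, of the kind
        hi = max(r[column] for r in block)       # a columnar file keeps per block
        if lo <= value <= hi:                    # block might contain a match
            cells += len(block) * len(needed)    # read only the needed columns
    return cells


needed = ["user_id", "country", "spend"]
row_cells = row_store_cells(blocks)
col_cells = column_store_cells(blocks, needed, "country", "US")
print(f"row store: {row_cells} cells, column store: {col_cells} cells")
```

If the same rows were shuffled so that every block contained at least one US row, the min/max test would pass for every block and only the column pruning would still save work, which is why data layout matters as much as file format.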

This is why structured Spark and columnar storage are a matched pair, and why Chapter 8 takes up distributed storage and data loading directly after this chapter. The DataFrame schema, the Catalyst plan, and the Parquet layout are three views of the same idea: tell the system the structure of your data, and every layer from the query planner to the disk reader can exploit it. For AI workloads this matters twice over, because the feature tables you build here become the input pipeline that feeds the training loop, and a reader that skips 90 percent of the bytes can cut the I/O-bound part of data loading by nearly as much, so the training job spends less time waiting for input.

Research Frontier: Optimized Tabular and Vector Pipelines for AI (2024 to 2026)

The DataFrame-plus-columnar pattern is being pushed hard by the demands of modern AI data preparation. Apache Arrow has become the lingua franca for zero-copy columnar exchange between Spark, pandas, and GPU frameworks, and vectorized Python paths (Arrow-based pandas UDFs, the Spark Connect client introduced in Spark 3.4 and matured through the 3.5 and 4.0 line in 2024 to 2025) cut the serialization tax that once made Python on Spark slow. Lakehouse table formats (Delta Lake, Apache Iceberg, Apache Hudi) now carry richer statistics and data-skipping indexes that let Catalyst-style optimizers prune at file and even row-group granularity over petabyte tables. A parallel thread brings query optimization to AI-native operations: engines such as DuckDB, Polars, and Daft apply predicate pushdown and column pruning to pipelines that include embedding columns and similarity filters, and research systems explore pushing model-inference and vector-search predicates into the scan so an LLM or ANN filter executes with the same pushdown discipline as country = 'US'. The connection to distributed retrieval surfaces again in Chapter 25.

Practical Example: The Feature Job That Got Ten Times Faster Without New Hardware

Who: A data engineer on a recommendation team building daily training features from a clickstream.

Situation: A nightly Spark job joined a 4-terabyte event log to a user table and aggregated per-user features, taking just over three hours.

Problem: The job was written in the RDD style inherited from an older codebase: parse every record, then filter, then key and join.

Dilemma: Request a larger cluster to hit the deadline (more cost, same wasteful plan), or rewrite the job as DataFrames and trust the optimizer to do better on the existing cluster.

Decision: They rewrote the pipeline as DataFrame and Spark SQL operations over Parquet sources, changing the logic not at all, only the abstraction.

How: Reading Parquet gave Catalyst column statistics; the date and event-type filters pushed into the scan, the projection pruned the log from forty columns to six, and a small dimension table was broadcast instead of shuffled.

Result: Runtime fell from over three hours to about eighteen minutes on the same cluster, dominated by the pushdown and pruning that Output 7.3.3 illustrates in miniature, with byte-for-byte identical features.

Lesson: Before buying machines, give the engine structure. A declarative query over columnar storage often recovers an order of magnitude that hand-written RDD code left on the table.

We have now seen the structured API in full: the schema that makes optimization legal, the Catalyst rewrites that shrink the work, the Tungsten engine that runs the remainder close to the metal, and the columnar storage that turns the optimizer's decisions into bytes never read. One question we have deferred is when all this planning happens. Our toy optimizer in Code 7.3.3 was handed a finished plan to rewrite, but a real Spark job builds that plan lazily, accumulating transformations without running them until an action forces execution, which is exactly what gives Catalyst a complete query to rewrite. That lazy, plan-then-execute model and the directed acyclic graph it produces are the subject of Section 7.4.

Exercise 7.3.1: Read the Plan Conceptual

Consider the query SELECT user_id FROM events WHERE country = 'US' AND device = 'mobile' over the six-column table of Code 7.3.3. List, in order, the relational operators in its logical plan, then state for each of predicate pushdown and column pruning exactly which rows or columns it removes and which columns must survive the prune even though the final result does not return them. Explain in one sentence why both predicates can be pushed into a single scan.

Exercise 7.3.2: Extend the Optimizer Coding

Extend Code 7.3.3 with a second table and a join, so the plan is Scan(events) join Scan(users) on user_id -> Filter -> Project. Add a filter_below_join rule (predicate pushdown through a join, a close cousin of join reordering) that, given a selectivity for the filter, pushes the filter below the join when it applies to only one input, and report the rows flowing into the join before and after. Verify that pushing the filter first reduces the join's input on the affected side, and print the cells scanned in both orderings.

Exercise 7.3.3: Row Store Versus Column Store Analysis

Output 7.3.3 reports a 91 percent cut in cells scanned. Argue how much of that reduction a row-oriented store can actually realize versus a columnar store such as Parquet, treating the row count reduction (from pushdown) and the column count reduction (from pruning) separately. State which of the two rewrites a row store can honor and which it largely cannot, and use that to explain why Chapter 8 argues columnar formats and structured query engines must be designed together.