Part II: Distributed Data Processing for AI
Chapter 7: Spark and Distributed DataFrames

Lazy Evaluation and DAG Execution

"They kept handing me transformations and I kept saying 'noted.' Only when they finally asked for an answer did I bother to figure out how to get one. Turns out waiting let me skip half the work."

A Scheduler That Refuses to Hurry
Big Picture

Spark does not run your transformations when you write them; it records them into a directed acyclic graph and runs nothing until an action demands a result, at which point it plans the entire job at once, fuses operations that can stream together, and cuts the graph into stages only where data must cross the network. This deferral is not laziness for its own sake. Seeing the whole computation before executing any of it lets the engine pick a global plan: combine adjacent row-at-a-time operations into a single pass over each partition, push filters earlier, and avoid materializing intermediate datasets that no one will ever read. The price of distribution is the shuffle, and the structure of a Spark job is built entirely around where the shuffles fall. This section shows how a chain of transformations becomes a staged execution plan, and why the boundary between stages is always a shuffle.

In the previous section we built DataFrames and queried them with Spark SQL, and you may have noticed something curious: a line like df.filter(...).select(...) returns instantly even on a terabyte of data, while a later df.count() takes minutes. That gap is the whole subject of this section. The fast calls did almost nothing; they appended a node to a plan. The slow call was an action, and an action is the only thing that makes Spark execute. Understanding this split, and the graph that sits between writing code and running it, is what lets you read a Spark job's behavior instead of being surprised by it.

Every transformation you chain onto a DataFrame (a filter, a select, a groupBy, a join) produces a new logical node and returns immediately. Spark accumulates these into a directed acyclic graph, a DAG, whose nodes are datasets and whose edges are the transformations that derive one from another. The graph is acyclic because each transformation produces a fresh dataset from existing ones; nothing ever loops back. Nothing in this graph computes a single row until you call an action, an operation that must return a concrete value to your program or write data out: count(), collect(), show(), write.parquet(...). We separate the two categories formally in Section 7.5; here we focus on what the deferral buys and how the graph is turned into work.

1. Lazy Evaluation: Record Now, Run Later Beginner

Lazy evaluation means a transformation returns a description of work, not the work's result. When you write df2 = df.filter(cond), no rows are read and no predicate is tested; df2 is simply a handle that says "the rows of df for which cond holds." Chain ten transformations and you have built a ten-node plan while touching zero data. Only an action forces Spark to walk back through the plan, decide how to execute it, and produce the demanded value. This is the opposite of the eager, line-by-line execution of an ordinary Python script, where every statement runs the moment the interpreter reaches it.
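Python's own built-in map and filter are lazy in the same sense, which makes a handy mental model. The sketch below is an analogy, not Spark code: a hypothetical counter named calls records how often the row-level function runs, and it stays at zero until something consumes the pipeline, which plays the role of an action.

```python
calls = 0  # hypothetical counter: how many times the row-level function has run

def normalize(word):
    global calls
    calls += 1
    return word.strip().lower()

words = ["Spark ", "DAG", "", "lazy"]

# "Transformations": in Python 3, built-in map and filter return lazy iterators.
pipeline = filter(None, map(normalize, words))   # filter(None, ...) drops empty strings
calls_before_action = calls
print("calls after building the pipeline:", calls_before_action)  # 0

# "Action": list() pulls every row through the chain.
result = list(pipeline)
print("calls after the action:", calls)   # 4, one per input row
print(result)                              # ['spark', 'dag', 'lazy']
```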

The payoff of waiting is that Spark gets to see the complete computation before committing to any of it. An eager engine that ran each transformation immediately would have to materialize every intermediate dataset, write it somewhere, and read it back for the next step, even when the next step is a filter that throws most of it away. A lazy engine, holding the whole plan, can do something far better: it can rewrite the plan. It can push a filter ahead of earlier steps so fewer rows flow through them, prune columns that no later step reads, collapse two consecutive maps into one pass, and recognize that an intermediate result is never needed on its own and so never build it. The optimizer that performs these rewrites is Catalyst, which we met through Spark SQL in Section 7.3; laziness is precisely what gives Catalyst a whole plan to optimize rather than one isolated operation at a time.
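The materialization argument can be made concrete with a small, Spark-free sketch. It assumes that an "eager" engine stores every intermediate result as a full list while a "lazy" one chains generators, and it counts how many rows each approach holds in materialized lists. The step functions and the data are hypothetical.

```python
def eager(data, steps):
    """Run each step to completion, keeping every intermediate list."""
    materialized = 0
    rows = list(data)
    for step in steps:
        rows = list(step(rows))          # a full dataset after every step
        materialized += len(rows)
    return rows, materialized

def lazy(data, steps):
    """Chain steps as generators; only the final result is materialized."""
    rows = iter(data)
    for step in steps:
        rows = step(rows)                # wires up a generator, runs nothing yet
    final = list(rows)
    return final, len(final)

steps = [
    lambda rows: (r * 2 for r in rows),            # map
    lambda rows: (r for r in rows if r % 3 == 0),  # filter
    lambda rows: (r + 1 for r in rows),            # map
]
data = range(1_000)
eager_rows, eager_count = eager(data, steps)
lazy_rows, lazy_count = lazy(data, steps)
assert eager_rows == lazy_rows           # same answer either way
print(eager_count, lazy_count)           # 1668 334
```

Both engines compute the same rows; the eager one simply holds about five times as many in intermediate lists, and on a cluster each of those lists would be a dataset written out and read back.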

Key Insight: Laziness Is What Makes Whole-Plan Optimization Possible

An engine that executes each operation as it is written can only optimize that one operation. An engine that records operations and defers them until an action sees the entire job at once, and a whole-job view is what unlocks the optimizations that matter at scale: fusing row-at-a-time operations into a single streaming pass, pushing filters and projections toward the data source, reordering joins, and skipping intermediate results that nothing consumes. Lazy evaluation is not a convenience feature; it is the precondition for the global plan that lets Spark avoid moving and materializing data it does not need.
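To see what "rewriting the plan" means in miniature, the sketch below applies one rule in the spirit of filter pushdown to a hypothetical plan format: a list of ("with_column", name, fn) and ("filter", pred, columns_read) tuples. The rule swaps a filter ahead of a derived-column step whenever the filter does not read the column that step creates. Catalyst's real rules operate on logical-plan trees and are far more general; this is only an illustration of why seeing the whole plan matters.

```python
def push_filters_down(plan):
    """Swap a filter ahead of the preceding with_column step when the filter does
    not read the column that step creates. Repeat until nothing moves."""
    plan = list(plan)
    changed = True
    while changed:
        changed = False
        for i in range(len(plan) - 1):
            a, b = plan[i], plan[i + 1]
            if a[0] == "with_column" and b[0] == "filter" and a[1] not in b[2]:
                plan[i], plan[i + 1] = b, a
                changed = True
    return plan

def run(plan, rows):
    """Execute the plan; report how many rows the with_column steps processed."""
    work = 0
    for op in plan:
        if op[0] == "with_column":
            _, name, fn = op
            work += len(rows)
            rows = [{**r, name: fn(r)} for r in rows]
        else:
            _, pred, _reads = op
            rows = [r for r in rows if pred(r)]
    return rows, work

rows = [{"user": u, "clicks": u % 10} for u in range(100)]   # hypothetical data
plan = [
    ("with_column", "score", lambda r: r["clicks"] * 1.5),
    ("filter", lambda r: r["clicks"] >= 8, {"clicks"}),
]
naive_rows, naive_work = run(plan, rows)
opt_plan = push_filters_down(plan)
opt_rows, opt_work = run(opt_plan, rows)
print([op[0] for op in opt_plan], naive_work, opt_work)   # ['filter', 'with_column'] 100 20
```

The rewritten plan returns identical rows but computes the derived column for 20 rows instead of 100, a saving only a planner that sees both steps together can find.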

2. The DAG and Its Two Kinds of Edges Intermediate

The plan Spark accumulates is a DAG of partition-level dependencies, and the single most important property of any edge in that graph is whether it is narrow or wide, the distinction introduced for RDDs in Section 7.2. A dependency is narrow when each input partition feeds at most one output partition. For map, filter, and select the relationship is one-to-one: a row's fate depends only on that row, so the worker holding input partition $k$ can produce output partition $k$ with no help from any other worker. A dependency is wide when an output partition draws from many input partitions, which is the case for groupBy, reduceByKey, repartition, and join (unless both sides already share the same partitioning): to gather every record with a given key into one place, records must be redistributed across the cluster by their key. That redistribution is the shuffle, the same all-to-all data movement that powered the reduce phase of MapReduce in Chapter 6.
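The difference shows up directly if we record, for each output partition, which input partitions contributed to it. In this sketch the data is hypothetical, and crc32 stands in for Spark's partitioning hash because Python's built-in hash of strings is randomized per process.

```python
from zlib import crc32

# Three input partitions of (key, value) records; the data is hypothetical.
inputs = [
    [("a", 1), ("b", 2), ("c", 3)],
    [("a", 4), ("c", 5)],
    [("b", 6), ("a", 7), ("d", 8)],
]
NUM_OUT = 3

def narrow_deps(parts):
    """map/filter/select: output partition k is computed from input partition k alone."""
    return {k: {k} for k in range(len(parts))}

def wide_deps(parts, num_out):
    """groupBy-style shuffle: each record moves to partition hash(key) % num_out,
    so an output partition collects records from every input holding its keys."""
    deps = {j: set() for j in range(num_out)}
    for src, part in enumerate(parts):
        for key, _ in part:
            deps[crc32(key.encode()) % num_out].add(src)   # deterministic hash
    return deps

narrow = narrow_deps(inputs)
wide = wide_deps(inputs, NUM_OUT)
print("narrow:", narrow)
print("wide:  ", wide)
```

Every narrow output depends on exactly one input, while the output partition that receives key "a" must read from all three inputs, since "a" appears in each of them.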

This narrow-versus-wide split is what gives the DAG its execution structure. A run of narrow dependencies can be executed entirely locally: worker $k$ takes its input partition and applies every narrow transformation in sequence, one row flowing through the whole chain, never waiting on another machine. A wide dependency cannot, because the data each output partition needs is scattered across every input partition and must be moved over the network first. So Spark draws its execution boundaries exactly at the wide dependencies. The segments of purely narrow work between shuffles are called stages, and the rule is simple and exact: a new stage begins at every shuffle.
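The stage rule fits in a few lines over a linear plan of (name, kind) pairs. This sketch follows real Spark's convention: a wide operation closes the running stage (whose tasks end by writing shuffle output) and its post-shuffle work begins the next stage, so a chain with $s$ shuffles yields $s + 1$ stages. The toy engine in Section 4 instead prints each shuffle as a stage of its own for visibility. Real jobs branch (a join has two parents), so their stage graphs are DAGs rather than chains; the function name and plan format here are hypothetical.

```python
def cut_stages(ops):
    """Cut a linear plan of (name, kind) pairs into stages at every wide op."""
    stages, current = [], []
    for name, kind in ops:
        if kind == "wide" and current:
            stages.append(current)       # the shuffle ends the running stage
            current = []
        current.append(name)             # narrow ops (and post-shuffle work) pipeline
    if current:
        stages.append(current)
    return stages

ops = [("read", "narrow"), ("map", "narrow"), ("filter", "narrow"),
       ("groupBy", "wide"), ("agg", "narrow"),
       ("join", "wide"), ("select", "narrow")]
stages = cut_stages(ops)
for i, stage in enumerate(stages):
    print(f"Stage {i}: {' -> '.join(stage)}")
# Stage 0: read -> map -> filter
# Stage 1: groupBy -> agg
# Stage 2: join -> select
```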

[Figure 7.4.1 diagram: Stage 0 (narrow, fused): read -> map -> filter, one pass per partition | SHUFFLE: repartition by key (stage boundary, wide dependency) | Stage 1: groupBy | Stage 2 (narrow): count -> filter. Stages run left to right; each stage waits for the shuffle that feeds it to finish writing.]
Figure 7.4.1: A transformation DAG cut into stages at its shuffle boundaries. The narrow operations read, map, and filter fuse into Stage 0 and run as one pass per partition with no network traffic. The wide dependency (the orange shuffle that repartitions records by key) forces a stage boundary, after which Stage 1's groupBy can see all records for each key. A second shuffle precedes Stage 2's final narrow work. Every stage boundary in the graph is a shuffle, and every shuffle is a stage boundary.

3. The DAG Scheduler: Stages, Pipelining, and Tasks Intermediate

When an action fires, Spark's DAG scheduler turns the logical plan into a physical one in three moves. First it walks the graph and cuts it into stages at every wide dependency, exactly as in Figure 7.4.1. Second, within each stage, it pipelines the narrow transformations: rather than building the output of the map, then reading it back to run the filter, it composes them into a single function that streams each row through the whole chain in one pass. This fusion is the concrete form of the optimization that laziness enabled in Section 1; the intermediate dataset between map and filter is never materialized because the scheduler can see that nothing else needs it. Third, it schedules the stage as a set of tasks, one task per partition, and ships those tasks to the workers holding the relevant data.
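The second and third moves can be sketched together: compose a stage's narrow operations into one function over a partition, then create one task per partition that runs it. The operations are written as hypothetical iterator-to-iterator functions so each row streams through the whole chain; the (stage_id, partition_id, function) task tuple is an illustrative stand-in, not Spark's internal task representation.

```python
from functools import reduce

def fuse(narrow_ops):
    """Compose iterator -> iterator transformers into ONE function per partition."""
    return lambda rows: reduce(lambda it, op: op(it), narrow_ops, iter(rows))

# Hypothetical narrow ops for one stage.
narrow_ops = [
    lambda it: (line.strip().lower() for line in it),   # map: normalize
    lambda it: (w for w in it if w),                    # filter: drop empties
    lambda it: ((w, 1) for w in it),                    # map: to (word, 1) pairs
]
stage_fn = fuse(narrow_ops)

partitions = [["Spark", " dag"], ["", "spark"], ["lazy"]]

# One task per partition: (stage_id, partition_id, fused function to run).
tasks = [(0, pid, stage_fn) for pid in range(len(partitions))]
outputs = {pid: list(fn(partitions[pid])) for _, pid, fn in tasks}
print(len(tasks), outputs)
```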

A task, then, is the unit of execution: one stage applied to one partition, run by one core on one worker. A stage with 200 partitions becomes 200 tasks that run in parallel across the cluster, gated only by the number of available cores. Stages themselves run in dependency order, and a downstream stage cannot start until the shuffle feeding it has finished writing its output, because that stage's tasks read shuffled data that does not exist until the upstream tasks complete. This is why a Spark job's wall-clock time is dominated by its shuffles: they are the synchronization points where the whole cluster must rendezvous before the next wave of local work can begin. We measure and tune that cost in Section 7.7, where data skew turns a single overloaded shuffle partition into a straggler that holds up an entire stage.
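A small scheduling model makes the two claims in that paragraph measurable: tasks run in waves when there are more tasks than cores, and a stage ends only when its slowest task ends. The sketch assumes greedy list scheduling (each task starts on the first free core) and hypothetical task durations; Spark's real scheduler also weighs data locality and other factors.

```python
import heapq

def stage_makespan(task_times, cores):
    """Greedy list scheduling: each task starts on the earliest-free core.
    The stage finishes when its LAST task finishes."""
    free_at = [0.0] * cores              # time at which each core becomes free
    heapq.heapify(free_at)
    finish = 0.0
    for t in task_times:
        start = heapq.heappop(free_at)
        end = start + t
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish

uniform = [1.0] * 200                    # 200 equal tasks
skewed = [1.0] * 199 + [20.0]            # one straggler partition (hypothetical)
print(stage_makespan(uniform, 64))       # 4.0: ceil(200 / 64) = 4 waves
print(stage_makespan(skewed, 64))        # 23.0: the single slow task dominates
```

One task that is twenty times slower stretches the whole stage from 4 to 23 time units, and every downstream stage waits on it.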

Fun Note: The Job That Did Nothing for an Hour, Then Everything at Once

A common rite of passage for new Spark users is the script that "hangs" on the final line. Forty transformations scroll past in under a second, the console looks frozen on df.write.parquet(...), and panic sets in. Nothing is wrong: those forty fast lines built the DAG, and the last line is the action that finally runs all of it. The deferral that felt like a freeze is the same deferral that let Spark fuse those forty operations into a handful of pipelined stages.

4. Building and Scheduling a DAG from Scratch Advanced

To make the deferral and the staging concrete, the program below implements the whole idea in pure Python with no Spark installed. A LazyFrame records transformations instead of running them, tagging each as narrow or wide. Calling map or filter just appends a node and returns a new frame; nothing executes. Only the action collect triggers planning: it cuts the recorded plan into stages at every wide (shuffle) operation, prints the resulting stage plan, and then executes, fusing the narrow operations within each stage into a single pass. This is a faithful miniature of the DAG scheduler described in Section 3.

class LazyFrame:
    def __init__(self, source, plan=None):
        self.source = source
        self.plan = plan or []      # list of (op_name, kind, fn)

    def _add(self, name, kind, fn):
        return LazyFrame(self.source, self.plan + [(name, kind, fn)])

    # Narrow transformations: each output partition depends on ONE input partition.
    # They return generators, so chained narrow ops pull each row through the whole
    # chain in one pass instead of building an intermediate list per operation.
    def map(self, fn, label):
        return self._add(label, "narrow", lambda rows: (fn(r) for r in rows))

    def filter(self, pred, label):
        return self._add(label, "narrow", lambda rows: (r for r in rows if pred(r)))

    # Wide transformation: output partitions depend on ALL input partitions (shuffle).
    # Grouping must see every input row first, so it materializes the stage boundary.
    def group_by_key(self, label):
        def shuffle(rows):
            out = {}
            for k, v in rows:
                out.setdefault(k, []).append(v)
            return list(out.items())
        return self._add(label, "wide", shuffle)

    # DAG SCHEDULER: cut the plan into stages at wide (shuffle) boundaries.
    def plan_stages(self):
        stages, current = [], []
        for op in self.plan:
            name, kind, _ = op
            if kind == "wide":
                if current:
                    stages.append(current)   # close the narrow pipeline before the shuffle
                stages.append([op])          # the shuffle is its own stage boundary
                current = []
            else:
                current.append(op)           # pipeline narrow ops into the running stage
        if current:
            stages.append(current)
        return stages

    # ACTION: planning AND execution happen ONLY here.
    def collect(self, data):
        stages = self.plan_stages()
        print("ACTION triggered: collect()  ->  planning the DAG\n")
        for i, stage in enumerate(stages):
            kind = "SHUFFLE" if stage[0][1] == "wide" else "fused narrow ops"
            ops = " -> ".join(n for n, _, _ in stage)
            print(f"  Stage {i}: [{kind}]  {ops}")
        print()
        rows = iter(data)
        for stage in stages:
            for _, _, fn in stage:
                # A narrow fn only wraps the iterator in another generator, so a stage's
                # narrow ops fuse into ONE pass; a shuffle fn drains its input right away.
                rows = fn(rows)
        return list(rows)                    # pull the final rows through the last stage


# Build the pipeline lazily. NOTHING runs during these calls.
print("Building pipeline (lazy): no computation yet ...")
words = LazyFrame("lines")
pipe = (words
        .map(lambda line: line.strip().lower(), "normalize")
        .filter(lambda line: len(line) > 0, "drop_empty")
        .map(lambda line: (line, 1), "to_pairs")
        .group_by_key("group_by_word")
        .map(lambda kv: (kv[0], sum(kv[1])), "count")
        .filter(lambda kv: kv[1] >= 2, "keep_frequent"))
print(f"Recorded {len(pipe.plan)} transformations, 0 executed.\n")

data = ["Spark", "spark ", "DAG", "spark", "dag", "", "lazy", "DAG"]
result = pipe.collect(data)
print("Result:", sorted(result))
Code 7.4.1: A from-scratch lazy engine. Transformations append nodes to plan and return new frames without running; the action collect calls plan_stages to cut the DAG at the one wide operation, prints the stage plan, then executes by fusing each stage's narrow operations into a single pass.
Building pipeline (lazy): no computation yet ...
Recorded 6 transformations, 0 executed.

ACTION triggered: collect()  ->  planning the DAG

  Stage 0: [fused narrow ops]  normalize -> drop_empty -> to_pairs
  Stage 1: [SHUFFLE]  group_by_word
  Stage 2: [fused narrow ops]  count -> keep_frequent

Result: [('dag', 3), ('spark', 3)]
Output 7.4.1: Six transformations were recorded with zero execution, then a single action drove the whole job. The scheduler cut the plan into three stages exactly at the one shuffle: the three normalizing operations fused into Stage 0, the group_by_word shuffle stood alone as Stage 1, and the final count-and-threshold pair fused into Stage 2.

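The structure of Output 7.4.1 mirrors the structure of a real Spark job. The first three narrow operations collapsed into one stage that needs no network; the single wide operation became the lone boundary; the trailing narrow operations formed a final stage. Had you appended a second group_by_key to the end of the pipeline, a fourth stage would have appeared, because every shuffle adds a boundary. One simplification is worth naming: the toy prints the shuffle as a stage of its own so the boundary is visible. In real Spark the shuffle is the boundary itself: the upstream stage's tasks end by writing shuffle output, and the grouping begins the downstream stage, so this job would run as two stages, with group_by_word, count, and keep_frequent fused into the second. Run on a cluster, that plan would dispatch one task per partition within each stage and would block the start of the post-shuffle stage until the pre-shuffle stage's shuffle output was fully written.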
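Code 7.4.1 runs everything on one in-memory list. The hedged extension below, still pure Python, spreads the same word count over three partitions to show the per-partition tasks and the barrier described above. The helper names map_task and reduce_task, the three-way split, and the two reducers are illustrative assumptions, and the stages follow real Spark's convention: the map tasks end by writing hash-partitioned shuffle buckets, and the grouping runs in the reduce tasks of the next stage.

```python
from zlib import crc32

NUM_REDUCERS = 2

def map_task(partition):
    """Stage 0 task for one partition: normalize, drop empties, emit (word, 1),
    and bucket each pair by hash(word) % NUM_REDUCERS (the shuffle write)."""
    buckets = [[] for _ in range(NUM_REDUCERS)]
    for line in partition:
        word = line.strip().lower()
        if word:
            buckets[crc32(word.encode()) % NUM_REDUCERS].append((word, 1))
    return buckets

def reduce_task(rid, shuffle_files):
    """Stage 1 task: read bucket rid from EVERY map task's output (wide dependency),
    group and sum by word, then keep words seen at least twice."""
    counts = {}
    for buckets in shuffle_files:
        for word, one in buckets[rid]:
            counts[word] = counts.get(word, 0) + one
    return [(w, c) for w, c in counts.items() if c >= 2]

data = ["Spark", "spark ", "DAG", "spark", "dag", "", "lazy", "DAG"]
partitions = [data[0:3], data[3:6], data[6:8]]

# Stage 0: one task per partition; ALL must finish before Stage 1 starts (barrier).
shuffle_files = [map_task(p) for p in partitions]

# Stage 1: one task per reducer, each reading its bucket from every map output.
result = [pair for rid in range(NUM_REDUCERS) for pair in reduce_task(rid, shuffle_files)]
print(sorted(result))   # [('dag', 3), ('spark', 3)]
```

Because each word hashes to exactly one reducer, every reducer sees all occurrences of its words, and the partitioned run reproduces Output 7.4.1's result in two stages.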

5. Reading the Real Plan with .explain() Intermediate

Spark exposes the very plan our toy engine printed. Calling .explain() on a DataFrame shows the physical plan Catalyst chose, and an Exchange node in that output (for example Exchange hashpartitioning(key)) marks a shuffle, which is to say a stage boundary. A BroadcastExchange is different: it ships a small table to every executor instead of repartitioning the data, and it is not a shuffle. Reading these plans is the practical skill that this section's theory feeds into: the number of shuffle Exchange nodes is the number of shuffles, and for a linear plan the number of shuffles plus one is roughly the number of stages your job will run.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("dag-demo").getOrCreate()
df = spark.read.parquet("events.parquet")

plan = (df
        .filter(F.col("value") > 0)          # narrow: pipelined, no shuffle
        .groupBy("key")                       # wide: forces an Exchange (shuffle)
        .agg(F.sum("value").alias("total"))   # finishes in the post-shuffle stage
        .filter(F.col("total") >= 2))         # narrow: fused after the aggregation

plan.explain()        # prints the physical plan; look for 'Exchange' = shuffle = stage cut
# An abbreviated physical plan looks like:
#   *(2) Filter (total >= 2)
#   +- *(2) HashAggregate(keys=[key], functions=[sum(value)])   <- post-shuffle stage
#      +- Exchange hashpartitioning(key)                        <- the ONE stage boundary
#         +- *(1) HashAggregate(keys=[key], functions=[partial_sum(value)])
#            +- *(1) Filter (value > 0)                          <- pre-shuffle stage
#               +- FileScan parquet [key, value]
Code 7.4.2: The same shape as Code 7.4.1, now in PySpark. The single Exchange hashpartitioning(key) node in the plan is the one shuffle; the *(1) and *(2) prefixes are whole-stage-codegen IDs marking the operators fused into generated code on either side of it, which here coincide with the two stages. Note that Catalyst also split the sum into a partial aggregation before the shuffle and a final aggregation after, shrinking what crosses the network.
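Counting shuffles in a printed plan is mechanical enough to sketch. The function below is a hypothetical helper that does a rough prefix match on plan text; the plan strings are hand-written abbreviations, not captured Spark output. Real plans carry extra details such as expression IDs and, under Adaptive Query Execution, an AdaptiveSparkPlan wrapper, which this simple matcher ignores.

```python
def count_shuffles(plan_text):
    """Count shuffle Exchange nodes in .explain()-style text.
    A BroadcastExchange is a broadcast, not a shuffle, so it is not counted."""
    shuffles = 0
    for line in plan_text.splitlines():
        node = line.strip().lstrip("+-: ").strip()   # drop tree-drawing characters
        if node.startswith("Exchange"):
            shuffles += 1
    return shuffles

aggregation = """
*(2) Filter (total >= 2)
+- *(2) HashAggregate(keys=[key], functions=[sum(value)])
   +- Exchange hashpartitioning(key)
      +- *(1) HashAggregate(keys=[key], functions=[partial_sum(value)])
         +- *(1) Filter (value > 0)
            +- FileScan parquet [key, value]
"""

broadcast_join = """
*(2) BroadcastHashJoin [id], [id], Inner, BuildRight
:- *(2) Filter isnotnull(id)
:  +- FileScan parquet [id, x]
+- BroadcastExchange HashedRelationBroadcastMode
   +- *(1) Filter isnotnull(id)
      +- FileScan parquet [id, y]
"""

for name, text in [("aggregation", aggregation), ("broadcast_join", broadcast_join)]:
    n = count_shuffles(text)
    print(f"{name}: {n} shuffle(s), roughly {n + 1} stage(s) for a linear plan")
```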
Library Shortcut: Spark Builds, Optimizes, and Schedules the DAG for You

Code 7.4.1 hand-rolled a recorder, a stage cutter, and a fused executor, roughly sixty lines that still ignore partitioning, network transport, partial aggregation, and fault recovery. In PySpark you write only the transformations and one action; the framework records the DAG, hands it to Catalyst for whole-plan optimization (predicate pushdown, projection pruning, join reordering when cost-based optimization is enabled, the partial-then-final aggregation split visible in Code 7.4.2), cuts it into stages at the shuffles, generates fused whole-stage code, and schedules one task per partition with re-execution on failure:

from pyspark.sql import functions as F

result = (df
          .filter(F.col("value") > 0)
          .groupBy("key")
          .agg(F.sum("value").alias("total"))
          .filter(F.col("total") >= 2)
          .collect())            # the ONLY action: everything above runs now, optimized
Code 7.4.3: The complete job in six lines. Every transformation is lazy and the lone collect() triggers the planning, optimization, staging, and fault-tolerant scheduling that Code 7.4.1 only sketched.
Practical Example: The Feature Pipeline That Sped Up by Doing Less

Who: A data engineer building a training-feature pipeline for a recommendation model on a Spark cluster.

Situation: A nightly job read 4 TB of clickstream Parquet, derived features, joined a user table, and wrote training shards, taking just over three hours.

Problem: Profiling in the Spark UI showed five stages and four shuffles, with one Exchange repartitioning the full 4 TB before a filter that discarded 90 percent of rows.

Dilemma: Buy more executors to push the same plan through faster, paying linearly for cluster time, or restructure the query so the plan itself moved less data through its shuffles.

Decision: They restructured, because .explain() revealed the binding cost was a shuffle on data that a later filter would throw away, not a shortage of compute.

How: They moved the selective filter ahead of the wide join so the shuffle carried a tenth of the rows, and let Catalyst's column pruning read only the needed columns at the Parquet scan; the action and outputs were unchanged.

Result: The job fell from just over three hours to thirty-eight minutes on the same cluster, with one fewer shuffle and far less data crossing the network, at no extra cost.

Lesson: The shape of the DAG, especially where the shuffles fall and how much data they carry, sets a job's cost more than raw executor count. Read the plan before you scale the cluster.
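The core of the fix is simple arithmetic on bytes shuffled. The sketch below uses the example's own figures (4 TB of input, a filter that discards 90 percent of rows) under a deliberately crude model in which the shuffle carries whole rows and nothing else changes; the function name is hypothetical.

```python
def shuffled_tb(input_tb, keep_fraction, filter_before_shuffle):
    """Terabytes crossing the network in one shuffle, under a crude model:
    the shuffle carries whole rows, and the filter keeps keep_fraction of them."""
    return input_tb * keep_fraction if filter_before_shuffle else input_tb

before = shuffled_tb(4.0, 0.10, filter_before_shuffle=False)   # shuffle, then filter
after = shuffled_tb(4.0, 0.10, filter_before_shuffle=True)     # filter, then shuffle
print(f"{before:.1f} TB -> {after:.1f} TB shuffled")
```

Column pruning shrinks each remaining row as well, so in practice the reduction can be larger than this row-count model suggests.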

6. Why the Whole Plan, Not Each Step Intermediate

It is worth stating plainly why deferral wins, because it is the reasoning that recurs whenever a system chooses to plan before it runs. If $T_i$ is the work of transformation $i$ and an eager engine ran each of $n$ transformations in isolation, its cost would be $\sum_{i=1}^{n} T_i$ plus, for each of the $n-1$ intermediate datasets between steps, the cost of writing it out and reading it back. The intermediate-materialization term is the expensive one at scale: writing a multi-terabyte result to storage only to read it straight back into the next operation can dwarf the operations themselves. A lazy engine pays almost none of it, because fused narrow operations stream a row through the entire chain in memory, and intermediate datasets that no action observes are never built. The saving is not a constant factor on the operations; it is the elimination of a whole class of I/O.
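The comparison can be written as a small cost model. Writing $W_i$ and $R_i$ (symbols introduced here, not earlier in the chapter) for the cost of writing intermediate $i$ and reading it back, the eager cost is $\sum_{i=1}^{n} T_i + \sum_{i=1}^{n-1} (W_i + R_i)$, while the lazy cost keeps the same $T_i$ but pays $W_i + R_i$ only at shuffle boundaries. The numbers below are hypothetical, in arbitrary units, and the model ignores the CPU savings of fusion itself.

```python
def eager_cost(step_costs, write_costs, read_costs):
    """Every step runs alone; each of the n-1 intermediates is written, then read."""
    return sum(step_costs) + sum(w + r for w, r in zip(write_costs, read_costs))

def lazy_cost(step_costs, write_costs, read_costs, shuffle_after):
    """Narrow steps stream in memory; only intermediates feeding a shuffle
    (indices listed in shuffle_after) are written and read back."""
    io = sum(write_costs[i] + read_costs[i] for i in shuffle_after)
    return sum(step_costs) + io

# Hypothetical 5-step job; intermediate i sits between step i and step i + 1.
T = [10, 4, 6, 3, 2]
W = [40, 35, 20, 5]      # cost to write each intermediate
R = [30, 25, 15, 4]      # cost to read it back
print(eager_cost(T, W, R), lazy_cost(T, W, R, shuffle_after=[2]))   # 199 60
```

Even with identical per-step work, the eager engine spends most of its budget on intermediate I/O that the lazy engine never performs.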

The same logic explains why Spark plans stages around shuffles specifically. A shuffle is the one unavoidable materialization: its output must be written to disk so that downstream tasks (and re-executed tasks after a failure) can read the right partitions, a property that ties directly to the fault-tolerant re-execution model of Chapter 6. Everything between two shuffles is fair game for fusion and can avoid the disk entirely. So the engine spends its optimization budget exactly where it pays off: it collapses the narrow work into streaming passes and treats each shuffle as a deliberate, costed boundary rather than an accident of how the code was written. The same instinct (see the whole computation, then place the unavoidable communication deliberately) returns when we plan all-reduce schedules for distributed training in Chapter 15.
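Why persisted shuffle output helps recovery can be shown with a toy retry loop. The sketch assumes the stored map outputs survive the failure; in real Spark, losing the executor that holds them forces the affected map tasks to be recomputed from lineage. The retry budget loosely mirrors the idea behind Spark's spark.task.maxFailures setting, and all names and data here are hypothetical.

```python
map_runs = 0   # how many times any map task has executed

def map_task(partition):
    """Stage 0 task: bucket integer keys into 2 shuffle partitions by key % 2."""
    global map_runs
    map_runs += 1
    out = {0: [], 1: []}
    for k, v in partition:
        out[k % 2].append((k, v))
    return out

partitions = [[(1, "a"), (2, "b")], [(3, "c"), (4, "d")]]

# Stage 0: run each map task once and keep its output, the stand-in for shuffle files.
shuffle_store = {pid: map_task(p) for pid, p in enumerate(partitions)}

attempts = {0: 0, 1: 0}

def reduce_task(rid):
    """Stage 1 task: read bucket rid from every stored map output."""
    attempts[rid] += 1
    if rid == 1 and attempts[rid] == 1:
        raise RuntimeError("simulated task failure")   # first attempt of reducer 1 fails
    return sorted(kv for out in shuffle_store.values() for kv in out[rid])

MAX_ATTEMPTS = 4
results = {}
for rid in (0, 1):
    for _ in range(MAX_ATTEMPTS):
        try:
            results[rid] = reduce_task(rid)
            break
        except RuntimeError:
            continue   # retry only this task; it rereads the stored map output

print("map runs:", map_runs, "reduce attempts:", attempts, "results:", results)
```

The failed reducer runs twice, but the map tasks run only once each, because the retry reads the shuffle output that Stage 0 already wrote.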

Research Frontier: Adaptive and Learned Query Planning (2024 to 2026)

A static DAG is planned from estimates that can be wrong, and the live research line is to make the plan react to what actually happens at runtime. Spark's Adaptive Query Execution, now standard and refined through the Spark 3.5 and 4.0 releases, re-optimizes the DAG at each shuffle boundary using the real sizes just measured: it coalesces tiny post-shuffle partitions, switches a sort-merge join to a broadcast join when a side turns out small, and splits skewed partitions so one heavy key no longer strands a stage, the skew problem we quantify in Section 7.7. Beyond Spark, learned cost models and reinforcement-learning query optimizers (the Bao and Balsa lineage, with 2024 to 2025 work extending them to distributed and cloud engines like Photon and Velox-based runtimes) replace hand-tuned cardinality estimates with models trained on past executions. The common thread is that lazy evaluation gives all of these methods the same thing: a whole plan, captured before execution, that they are free to rewrite. We return to data-skew handling, the most consequential of these adaptations for AI feature pipelines, in Section 7.7.
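One of these runtime adaptations, coalescing small post-shuffle partitions, is easy to sketch. In Spark it is governed by settings such as spark.sql.adaptive.coalescePartitions.enabled and spark.sql.adaptive.advisoryPartitionSizeInBytes. The function below is a simplified greedy merge of adjacent partitions up to a target size, written to illustrate the idea; it is not Spark's actual implementation, it does not model skew splitting, and the partition sizes are hypothetical.

```python
def coalesce_partitions(sizes, target):
    """Greedily merge ADJACENT post-shuffle partitions while the merged size stays
    within target. Returns groups of original partition indices; a partition that
    alone exceeds target stays in a group of its own."""
    groups, current, current_size = [], [], 0
    for i, size in enumerate(sizes):
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(i)
        current_size += size
    if current:
        groups.append(current)
    return groups

MB = 1 << 20
sizes = [5 * MB, 3 * MB, 60 * MB, 2 * MB, 1 * MB, 70 * MB, 4 * MB]
groups = coalesce_partitions(sizes, target=64 * MB)
print(groups)   # [[0, 1], [2, 3, 4], [5], [6]]
```

Seven reduce tasks become four, using sizes that are only known after the upstream stage has written its shuffle output, which is exactly why the decision has to wait for the stage boundary.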

We now have the execution model that the rest of the chapter rests on: transformations are recorded lazily into a DAG, an action triggers planning, the scheduler cuts the DAG into stages at every shuffle, fuses the narrow work within each stage, and dispatches one task per partition. What we have deliberately left informal is the exact catalog of which operations are transformations and which are actions, and what each costs. That catalog is the subject of Section 7.5, which names every transformation and action, marks each narrow or wide, and shows how the choice between them decides where your shuffles, and therefore your stages, fall.

Exercise 7.4.1: Count the Stages Conceptual

For each pipeline, state how many stages Spark will run and where the boundaries fall, justifying each boundary by naming the wide dependency that causes it. (a) read -> filter -> select -> write. (b) read -> map -> groupBy -> agg -> filter -> write. (c) read -> filter -> join(other) -> groupBy -> agg -> write, where other is large enough that the join shuffles. Then explain why pipeline (a) needs no shuffle at all and what that implies for its wall-clock behavior relative to (b) and (c).

Exercise 7.4.2: Extend the Lazy Engine Coding

Starting from Code 7.4.1, add a join_by_key wide transformation that takes a second dataset and combines records sharing a key, and a narrow select that keeps only chosen fields. Change plan_stages so that each wide operation begins the stage that follows it instead of standing alone (the convention real Spark uses), build a pipeline with two wide operations, and confirm from the printed stage plan that it produces exactly three stages with the two shuffles as the boundaries. Then add a counter that increments every time a row-level function actually runs, and use it to demonstrate empirically that no work happens until collect is called and that fused narrow operations make a single pass over each row rather than one pass per operation.

Exercise 7.4.3: Predict, Then Read the Plan Analysis

Take any Spark query with at least one groupBy and one join. Before running it, predict the number of Exchange nodes and stages from the operations alone. Then call .explain() and compare your prediction to the physical plan, accounting for any difference (a broadcast join avoids a shuffle; a partial-then-final aggregation appears as two HashAggregate nodes around one Exchange). Finally, move a selective filter from after the join to before it, re-run .explain(), and explain in terms of bytes-shuffled why this reordering, which Catalyst often performs for you, lowers the job's cost.