"They asked why the job took six hours. I had eight cores and four partitions; four of us worked and four of us watched. Nobody asked the four of us who watched."
An Executor Slot That Was Never Scheduled
Tuning a Spark job is not a bag of tricks; it is a single discipline: measure where the time goes in the Spark UI, find the stage that dominates, and apply the one lever that attacks that stage's cost. Almost every slow Spark job is slow for one of a small number of reasons: too few partitions leaving cores idle, too many partitions drowning in overhead, a shuffle that moves more data than it needs to, a skewed key that pins one task while the rest finish, a small table read once per row instead of broadcast once, or a dataset recomputed from scratch every time it is touched. This closing section of Chapter 7 turns the chapter's individual ideas (the DAG, RDD lineage, DataFrames and Catalyst, partitioning and caching, joins and skew) into an ordered playbook you can run against any job, and ties each lever back to the section that explains why it works.
Everything earlier in this chapter built one machine: a lazy DAG of partitioned transformations that Catalyst plans and the cluster executes stage by stage. This section is about making that machine fast. The good news is that performance tuning in Spark is unusually diagnosable, because the engine reports exactly what it did. The Spark UI shows every stage, every task, how many bytes each task read and wrote across the shuffle, and how long each task ran. A slow job is rarely a mystery; it is a number on a page that is larger than it should be. The skill is reading that page, naming the bottleneck, and reaching for the matching lever rather than guessing.
We proceed in the order a real investigation takes. First, diagnose with the UI. Then walk the levers from the cheapest and highest-impact (partition sizing, killing the shuffle, broadcasting, caching, Adaptive Query Execution) down to the memory and serialization knobs you turn only when the structural fixes are exhausted. Throughout, the rule is the same one this book repeats for every distributed system: the expensive thing is moving data between machines, so the best optimization is the data movement you avoid.
1. Diagnose First: Read the Spark UI Beginner
You cannot tune what you have not measured, and Spark measures everything. The Spark UI breaks an application into jobs, then stages, then tasks. It is served on port 4040 of the driver while the application runs and, if event logging is enabled, can be replayed in the history server afterward. A stage is a unit of work bounded by shuffles, exactly the stage boundaries that lazy evaluation and the DAG produced in Section 7.4. The single most useful screen is the stage detail page, which shows the run time and the shuffle read and write bytes for each task. Three patterns on that screen account for most slow jobs, and Figure 7.9.1 shows what each looks like as a timeline of tasks across the cluster's cores.
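The history server can replay only applications that wrote an event log. The sketch below shows the two settings involved. It assumes a local run, and `/tmp/spark-events` is an illustrative directory that must already exist. The history server's `spark.history.fs.logDirectory` should point at the same path.

```python
from pyspark.sql import SparkSession

# Event logging must be switched on before the session starts; the directory
# below is an illustrative local path (assumption) that must already exist.
spark = (
    SparkSession.builder
    .appName("tuning-demo")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "file:///tmp/spark-events")
    .getOrCreate()
)

# Address of this application's live UI (4040 unless that port is taken).
print(spark.sparkContext.uiWebUrl)
```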
The middle and bottom panels of Figure 7.9.1 are the two failures you will meet most. Idle cores mean too few partitions: there are fewer tasks than slots, so some slots never get work, the exact situation our demo in Section 4 measures. A single long bar while the rest finish early means skew: one key carries far more rows than the others, a problem we diagnosed and fixed in Section 7.7. The metric to watch is the spread between the median task time and the maximum task time; when the max is many times the median, you have a straggler, and no amount of extra hardware will help until you rebalance the work.
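The median-versus-max check can be written down directly. The sketch below takes task durations as you might read them off a stage's summary metrics, reports the median and the maximum, and flags a likely straggler. The 3x threshold is a hypothetical rule of thumb, not a Spark setting, and the durations are made-up examples.

```python
import statistics

def straggler_report(task_seconds, ratio_threshold=3.0):
    """Summarize one stage's task durations (median and max, as in the stage
    page's summary metrics) and flag a likely straggler.

    ratio_threshold is a hypothetical rule of thumb, not a Spark setting.
    """
    median = statistics.median(task_seconds)
    longest = max(task_seconds)
    ratio = longest / median
    return {
        "median_s": median,
        "max_s": longest,
        "max_over_median": round(ratio, 2),
        "straggler": ratio >= ratio_threshold,
    }

# Hypothetical task durations (seconds) for two 8-task stages.
balanced = [41, 39, 40, 42, 38, 40, 41, 39]
skewed = [41, 39, 40, 42, 38, 40, 41, 390]  # one task holds a hot key
print(straggler_report(balanced))
print(straggler_report(skewed))
```

The balanced stage has a max-over-median ratio of about 1.05. The skewed stage is near 10, so adding cores would not help it: the stage still waits for the 390-second task.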
A Spark stage does not finish when the average task finishes; it finishes when the last task finishes, because the next stage cannot start until every partition of this one is done. This single fact explains why both idle cores and skew are so costly: an idle core contributes nothing to progress, and a straggler task makes every other core wait at the stage barrier. Tuning is therefore almost always about shrinking the maximum task time, by spreading work evenly across enough partitions and breaking up the keys that are too big. Optimize the tail, not the mean.
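"Breaking up the keys that are too big" usually means salting, the technique Section 7.7 develops. Below is a minimal, hypothetical model of its load-balancing half. CRC32 stands in for Spark's hash partitioner, because Python's built-in `hash()` of strings is randomized per process. One key holds 80 percent of the rows. Appending a salt to that key spreads its rows over several partitions, which shrinks the largest partition and therefore the stage's wall-clock. In a real salted join, the other side must also be replicated once per salt value so every `(key, salt)` pair still finds its match. This sketch omits that step.

```python
import zlib
from collections import Counter

def partition_of(key, num_partitions):
    # Deterministic stand-in for Spark's hash partitioner.
    return zlib.crc32(repr(key).encode("utf-8")) % num_partitions

def partition_loads(keys, num_partitions):
    """Rows per partition after hash-partitioning by key."""
    counts = Counter(partition_of(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]

def salt_keys(keys, hot_keys, num_salts):
    """Turn each hot key into num_salts sub-keys (key, 0..num_salts-1) so its rows
    hash to several partitions; cold keys keep a single sub-key (key, 0)."""
    return [(k, i % num_salts) if k in hot_keys else (k, 0)
            for i, k in enumerate(keys)]

PARTITIONS = 8
keys = ["hot"] * 800 + [f"user{i}" for i in range(200)]  # one key holds 80% of rows

before = partition_loads(keys, PARTITIONS)
after = partition_loads(salt_keys(keys, {"hot"}, PARTITIONS), PARTITIONS)
print("largest partition before salting:", max(before))
print("largest partition after salting: ", max(after))
```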
2. The Levers, In Order of Impact Intermediate
Once the UI has named the bottleneck, you reach for a lever. They are not equal: the structural levers (partition count, shuffle avoidance, broadcast, cache) routinely change runtime by multiples, while the memory and serialization knobs trim percentages. Table 7.9.1 is the playbook itself. Each row names a symptom you can see in the UI, the lever that attacks it, the Spark configuration or API involved, and the chapter section that explains the underlying mechanism. Work the table top to bottom; the cheapest and highest-impact fixes come first.
| Symptom in the UI | Lever | Config / API | Why it works |
|---|---|---|---|
| Idle cores; fewer tasks than slots | Raise partition count | spark.sql.shuffle.partitions, repartition | 7.6 partitioning |
| Thousands of tiny tasks; scheduler overhead | Lower partition count | coalesce, AQE coalescing | 7.6 partitioning |
| Huge shuffle read/write bytes | Aggregate before shuffling | reduceByKey over groupByKey | 7.5 transformations |
| Large shuffle on a join with one small side | Broadcast the small side | broadcast(), auto-broadcast threshold | 7.7 joins |
| One straggler task; skewed key | Split the hot key (salt / AQE skew join) | spark.sql.adaptive.skewJoin.enabled | 7.7 skew |
| Same dataset recomputed every action | Cache the reused dataset | cache(), persist() | 7.6 caching |
| Plan chosen before data sizes are known | Enable Adaptive Query Execution | spark.sql.adaptive.enabled | 7.3 Catalyst |
| OutOfMemory; heavy disk spill | Raise executor memory / partition finer | spark.executor.memory, spark.sql.shuffle.partitions | Section 5 below |
| Driver OOM on collect | Never collect large data to the driver | write, limit, aggregate first | Section 3 below |
The two partition rows at the top of Table 7.9.1 pull in opposite directions. Finding the balance between them is the most common tuning decision in Spark, so Section 4 gives it a runnable demonstration. The shuffle, broadcast, and skew rows all attack data movement, the dominant cost in any distributed job; we treat them next. The caching and AQE rows put to work mechanisms the chapter has already taught. The last two rows are the memory and driver-safety levers of Sections 5 and 3.
Minimize the shuffle, the most expensive thing Spark does
A shuffle is an all-to-all exchange: every task may send rows to every other task, across the network, writing them to and reading them from disk on the way. It is the Spark relative of the MapReduce shuffle from Chapter 6, and the same pattern returns as the gradient all-reduce of Chapter 15. Because it touches both the network and the disk, the shuffle dominates the cost of most non-trivial jobs, and the highest-leverage thing you can do is move less data through it. The classic example is the choice between groupByKey and reduceByKey from Section 7.5. groupByKey ships every value across the network and then reduces. reduceByKey reduces locally on each partition first and ships only the partial results.
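Before the Spark version, here is a pure-Python model that makes the difference countable. It counts shuffled records rather than bytes, on the simplifying assumption that every record is about the same size. Under groupByKey every record crosses the shuffle. Under reduceByKey each partition ships one partial count per distinct key it holds. The synthetic event data is hypothetical.

```python
from collections import Counter

def shuffle_records_group_by_key(partitions):
    """groupByKey: every (key, value) record crosses the shuffle."""
    return sum(len(part) for part in partitions)

def shuffle_records_reduce_by_key(partitions):
    """reduceByKey: each partition combines locally first, then ships one
    partial count per distinct key it holds (the map-side combine)."""
    return sum(len(Counter(part)) for part in partitions)

# Hypothetical event log: 4 input partitions of 1,000 events over 10 users each.
partitions = [[f"user{(p * 7 + i) % 10}" for i in range(1000)] for p in range(4)]

print("groupByKey records shuffled: ", shuffle_records_group_by_key(partitions))
print("reduceByKey records shuffled:", shuffle_records_reduce_by_key(partitions))

# Both strategies give identical final counts; only the shuffle volume differs.
totals = Counter()
for part in partitions:
    totals.update(Counter(part))  # merge the per-partition partial counts
```

Here the shuffle shrinks from 4,000 records to 40. The ratio grows with the number of events per key per partition.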
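```python
# Counting events per user. Both produce the same answer; one moves far less data.
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
# Toy stand-in for the real event log: an RDD of records with a .user field.
events = spark.sparkContext.parallelize(
    [Row(user="alice", item=1), Row(user="bob", item=2), Row(user="alice", item=3)])
df = spark.createDataFrame(events)
# BAD: groupByKey ships every single event across the network, then counts.
counts = events.map(lambda e: (e.user, e)).groupByKey().mapValues(len)  # shuffle = all the values
# GOOD: reduceByKey adds up counts WITHIN each partition first (a map-side
# combine), so the shuffle carries one partial count per (user, partition).
counts = events.map(lambda e: (e.user, 1)).reduceByKey(lambda a, b: a + b)
# DataFrame API: a groupBy + agg compiles to the same map-side partial aggregation,
# and Catalyst (Section 7.3) inserts the local combine for you.
counts = df.groupBy("user").count()  # preferred in practice
```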
reduceByKey performs a map-side combine before the exchange, so the bytes crossing the network shrink from one-per-row to one-per-key-per-partition. On the DataFrame API, Catalyst inserts this partial aggregation automatically, one more reason to prefer DataFrames over raw RDDs.

The same principle drives broadcast joins. When one side of a join is small enough to fit in each executor's memory, Spark can send a full copy to every executor and join locally, with no shuffle of the large side at all. This is the broadcast-hash join of Section 7.7, and it converts a network-bound shuffle join into a local hash lookup. You can request it explicitly, and Spark also applies it automatically when a side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default).
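```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()
# Toy stand-ins: in practice 'dim' is a small lookup table (a few MB) and 'facts' is billions of rows.
facts = spark.range(1_000_000).selectExpr("id % 100 AS key", "id AS value")
dim = spark.range(100).selectExpr("id AS key", "concat('name_', CAST(id AS STRING)) AS name")
# Broadcasting 'dim' avoids shuffling 'facts' entirely: each executor joins
# its local facts partitions against an in-memory copy of dim.
result = facts.join(broadcast(dim), on="key", how="inner")
# Equivalent automatic behavior, controlled by a size threshold:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 32 * 1024 * 1024)  # 32 MB
```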
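Underneath the API call, the mechanism is small enough to model in plain Python. The sketch below is a simplified illustration, not Spark's implementation. It assumes the small side has unique keys and performs an inner join. The small side becomes a single hash table, the copy each executor would receive. Each fact partition is then probed in place, so no fact row moves. The size check mirrors the auto-broadcast rule in simplified form: at or below the threshold broadcasts, and a negative threshold (Spark uses -1) disables it.

```python
DEFAULT_AUTO_BROADCAST_BYTES = 10 * 1024 * 1024  # default spark.sql.autoBroadcastJoinThreshold

def should_broadcast(estimated_bytes, threshold=DEFAULT_AUTO_BROADCAST_BYTES):
    """Simplified auto-broadcast rule: broadcast a side whose estimated size is
    at most the threshold; a negative threshold disables broadcasting."""
    return threshold >= 0 and estimated_bytes <= threshold

def broadcast_hash_join(fact_partitions, dim_rows):
    """Inner join against a unique-keyed small side. The small side becomes one
    in-memory hash table; each fact partition is probed locally, so fact rows
    never move between partitions."""
    dim_table = dict(dim_rows)  # key -> dimension attribute
    return [
        [(key, value, dim_table[key]) for key, value in part if key in dim_table]
        for part in fact_partitions
    ]

facts = [[(1, "play"), (2, "pause")], [(1, "seek"), (3, "play"), (9, "stop")]]
dim = [(1, "US"), (2, "DE"), (3, "FR")]
print(should_broadcast(6 * 1024 * 1024))  # a 6 MB lookup table
print(broadcast_hash_join(facts, dim))
```

The output keeps the partition structure of `facts` intact. Key 9 has no match and is dropped, as an inner join requires.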
Who: A data engineer at a streaming-media company owning the nightly user-feature pipeline.
Situation: A Spark job enriched a billion-row watch-events table by joining it against a country-and-plan lookup table, then aggregated features per user for the next day's recommendation training.
Problem: The job had crept from forty minutes to over three hours as the events table grew, and was now missing the training window.
Dilemma: Request a bigger cluster, the easy ask that would have doubled the monthly bill, or open the Spark UI and find out where the three hours actually went before spending anything.
Decision: They read the UI first, and the dominant stage was a sort-merge join shuffling the full billion-row events table by key, even though the lookup table was only six megabytes.
How: They wrapped the lookup side in broadcast() exactly as in Code 7.9.2, turned on Adaptive Query Execution, and set spark.sql.shuffle.partitions to a few times the core count from the curve in Output 7.9.5.
Result: The events table was no longer shuffled at all; the join became a local hash lookup, and the job fell from three hours back to thirty-five minutes on the same cluster, at no extra cost.
Lesson: The bigger cluster would have hidden the real problem and kept billing for it. Reading the UI named the shuffle, and one broadcast removed it.
Cache reuse, enable Adaptive Query Execution, and let Catalyst plan
Spark's laziness, the property that makes the DAG possible in Section 7.4, has a sharp edge: a dataset with no cache is recomputed in full every time an action touches it, all the way back along its RDD lineage from Section 7.2. If you train three models on the same prepared feature table, or run several aggregations over one cleaned dataset, an uncached dataset pays the entire preparation cost three times. The fix is one call, cache() or persist() from Section 7.6, placed on the dataset that is read more than once. Cache deliberately: caching a dataset used only once wastes memory that another stage needs.
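A toy model makes the recomputation cost visible. The class below is hypothetical and models Spark's behavior without using its API. Every action re-runs the preparation function unless `cache()` was called. As in Spark, `cache()` only marks the dataset, and the first action materializes it. Eviction under memory pressure is not modeled.

```python
class ToyDataset:
    """Toy model of Spark laziness: every action re-runs the lineage (here, a
    preparation function) unless cache() was called and a result is stored."""

    def __init__(self, prepare):
        self._prepare = prepare
        self._cache_requested = False
        self._cached_rows = None
        self.prepare_runs = 0  # how many times the full lineage was recomputed

    def cache(self):
        # Like Spark's cache(), this only marks the dataset; nothing runs yet.
        self._cache_requested = True
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows
        self.prepare_runs += 1
        rows = self._prepare()
        if self._cache_requested:
            self._cached_rows = rows  # the first action materializes the cache
        return rows

    def count(self):  # an "action"
        return len(self._rows())

def prepare_features():
    # Hypothetical stand-in for an expensive clean-and-join pipeline.
    return [x * x for x in range(1000)]

uncached = ToyDataset(prepare_features)
cached = ToyDataset(prepare_features).cache()
for _ in range(3):  # e.g. three models trained on the same feature table
    uncached.count()
    cached.count()
print("preparation runs without cache:", uncached.prepare_runs)
print("preparation runs with cache:   ", cached.prepare_runs)
```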
Adaptive Query Execution (AQE), on by default since Spark 3.2, is the highest-value single switch in the whole playbook because it fixes several rows of Table 7.9.1 at once, using real runtime statistics that the static Catalyst planner of Section 7.3 could only estimate. After each shuffle, AQE sees the actual partition sizes and can coalesce hundreds of tiny post-shuffle partitions into a sensible number, switch a planned shuffle join to a broadcast join when a side turned out small, and split a skewed partition into several so one hot key no longer pins a straggler task.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Adaptive Query Execution: let the engine re-plan using real shuffle statistics.
spark.conf.set("spark.sql.adaptive.enabled", True)  # master switch
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)  # auto right-size
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)  # split hot keys
# A sane static starting point for the post-shuffle partition count when you
# tune by hand: aim for a few times the total executor cores (see the demo).
spark.conf.set("spark.sql.shuffle.partitions", 256)  # default is 200
```
The shuffle.partitions value still sets the starting point, which the next demo helps you choose.

Before AQE, getting these levers right meant manual work. You had to profile a representative run, compute a good spark.sql.shuffle.partitions from the shuffle volume, hand-mark small join sides with broadcast(), and salt skewed keys yourself. Then you had to redo all of it whenever the data sizes shifted. That is dozens of lines of tuning code and a recurring babysitting chore. Setting spark.sql.adaptive.enabled = true folds partition coalescing, broadcast promotion, and skew-join splitting into the engine, driven by statistics it measures after each shuffle. You still set a sensible starting shuffle.partitions and still broadcast obvious cases by hand. But the framework now handles the cases that used to require a tuning spreadsheet, and it re-tunes itself when the data changes.
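Two of AQE's runtime rewrites can be modeled in a few lines. The sketch below simplifies Spark's actual rules, and its default parameters are assumptions loosely based on Spark's AQE settings:

- **Target partition size:** 64 MB, after spark.sql.adaptive.advisoryPartitionSizeInBytes.
- **Skew cutoff:** a 5x-the-median factor and a 256 MB minimum, after spark.sql.adaptive.skewJoin.skewedPartitionFactor and skewedPartitionThresholdInBytes.

Check the documentation for your Spark version before relying on these numbers. Coalescing merges adjacent small partitions up to the target size. Skew handling flags a partition that is large both relative to the median and in absolute terms, then plans to split it into roughly target-sized pieces.

```python
import statistics

MB = 1024 * 1024

def coalesce_partitions(sizes, target_bytes=64 * MB):
    """Merge ADJACENT post-shuffle partitions greedily until the next one would
    push the group past target_bytes. Returns lists of original indices."""
    groups, current, current_bytes = [], [], 0
    for i, size in enumerate(sizes):
        if current and current_bytes + size > target_bytes:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(i)
        current_bytes += size
    if current:
        groups.append(current)
    return groups

def plan_skew_splits(sizes, factor=5.0, min_bytes=256 * MB, target_bytes=64 * MB):
    """Flag partitions larger than BOTH factor x median and min_bytes, and say
    how many roughly target-sized pieces each would be split into."""
    median = statistics.median(sizes)
    return {
        i: -(-size // target_bytes)  # ceiling division
        for i, size in enumerate(sizes)
        if size > factor * median and size > min_bytes
    }

# Hypothetical post-shuffle sizes: 23 small partitions and one hot one.
sizes = [10 * MB] * 20 + [900 * MB] + [10 * MB] * 3
print(len(sizes), "partitions ->", len(coalesce_partitions(sizes)), "after coalescing")
print("skew split plan (index: pieces):", plan_skew_splits(sizes))
```

The 24 partitions collapse to 6 tasks, and the 900 MB partition is planned as 15 pieces. Those are the two effects that let a fixed starting shuffle.partitions value work across changing data.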
3. Keep Big Data Off the Driver Beginner
One tuning rule is really a correctness rule, because violating it does not merely slow the job, it crashes it. The driver is a single machine, the one process that holds the DAG and coordinates the executors. Any operation that pulls the distributed result back to the driver (collect(), toPandas(), a large take()) tries to fit data that lived across the whole cluster into one machine's memory, and on real data it triggers an OutOfMemory error on the driver. The fix is to keep large results distributed: write them to storage with write, or reduce them to something small first.
```python
# big_df stands for an existing large DataFrame with a "label" column.
# DANGER: pulls every row of a billion-row DataFrame into the driver's RAM.
rows = big_df.collect()  # driver OutOfMemory on real data
pdf = big_df.toPandas()  # same trap, same crash
# SAFE: keep the result distributed, or shrink it before collecting.
big_df.write.parquet("s3://bucket/output/")  # stays across the cluster
summary = big_df.groupBy("label").count().collect()  # tiny: one row per label
sample = big_df.limit(1000).toPandas()  # bounded, fits in the driver
```
Keep large results distributed with write, or shrink them with an aggregation or limit; never materialize cluster-scale data in the single driver process.

collect() That Took Down the Cluster

A reliable rite of passage for new Spark users is the celebratory df.collect() at the end of a pipeline that worked beautifully on a ten-row sample. On the full dataset, the driver inhales four hundred gigabytes it does not have, the job dies, and the logs say java.lang.OutOfMemoryError with a cheerful stack trace. The cluster is fine; the executors were barely warmed up. It was always the driver. The cure is muscle memory: if the result is big, it gets write, never collect.
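A back-of-envelope check before any collect() or toPandas() catches most of these crashes. The helper below is hypothetical. Its `max_result_bytes` mirrors Spark's own guard, spark.driver.maxResultSize, which is 1 GB by default and means unlimited when set to 0. The `headroom` fraction is an assumed safety margin, because deserialized Python objects usually occupy more memory than their serialized bytes.

```python
GB = 1024 ** 3

def safe_to_collect(num_rows, bytes_per_row, driver_heap_bytes,
                    max_result_bytes=1 * GB, headroom=0.5):
    """Back-of-envelope check before collect() or toPandas().

    max_result_bytes mirrors spark.driver.maxResultSize (0 means unlimited).
    headroom is a hypothetical safety margin: only that fraction of the driver
    heap is assumed free to hold the collected result."""
    estimated = num_rows * bytes_per_row
    if max_result_bytes > 0 and estimated > max_result_bytes:
        return False
    return estimated <= headroom * driver_heap_bytes

print(safe_to_collect(10**9, 200, 8 * GB))  # billion-row table: refuse
print(safe_to_collect(1_000, 200, 8 * GB))  # limit(1000) sample: fine
```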
4. Demonstration: Finding the Partition Sweet Spot Intermediate
The first and most common tuning decision, how many partitions, has a sweet spot that the top two rows of Table 7.9.1 only describe in words. We can see it as a curve. The model below is pure Python, no Spark required: a fixed cluster of eight worker slots runs a job of fixed total work, split into $P$ partitions. Each partition (task) carries a share of the work plus a fixed per-task overhead (launch, serialization, shuffle bookkeeping). The tasks are deliberately uneven, because real data never splits perfectly. A greedy scheduler hands each task to the slot that is free earliest. The stage finishes when the last slot finishes, the wall-clock rule from Section 1. Sweeping $P$ traces the trade-off.
```python
import random

WORKERS = 8          # executor slots (cores) in the cluster
TOTAL_WORK = 1000.0  # total useful compute, arbitrary time units
OVERHEAD = 0.25      # fixed cost per task (launch, serialize, shuffle bookkeeping)
SKEW = 0.6           # tasks are uneven: real data partitions never split evenly

def makespan(num_partitions):
    """Greedy list-scheduling on UNEVEN tasks. Each task costs a (skewed) share
    of the work plus a fixed OVERHEAD; assign every task to the worker that
    finishes earliest. Return when the LAST worker finishes (the stage wall-clock).
    More tasks balance the skew better but add overhead."""
    rng = random.Random(0)
    raw = [1.0 + SKEW * rng.random() for _ in range(num_partitions)]
    scale = TOTAL_WORK / sum(raw)  # rescale so tasks sum to TOTAL_WORK
    finish = [0.0] * WORKERS
    for r in raw:
        cost = r * scale + OVERHEAD
        i = min(range(WORKERS), key=lambda w: finish[w])  # earliest-free worker
        finish[i] += cost
    return max(finish)

ideal = TOTAL_WORK / WORKERS  # perfect split, zero overhead
print(f"workers={WORKERS} total_work={TOTAL_WORK:.0f} overhead/task={OVERHEAD}")
print(f"ideal makespan (perfect split, no overhead) = {ideal:.1f}\n")
print(f"{'partitions':>10} {'makespan':>9} {'efficiency':>11} note")

sweep = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]
spans = {p: makespan(p) for p in sweep}  # simulate each setting once
for p in sweep:
    span = spans[p]
    eff = ideal / span
    note = "workers idle (too few tasks)" if p < WORKERS else \
        ("overhead piling up" if p > 4 * WORKERS else "")
    print(f"{p:>10} {span:>9.1f} {eff:>10.1%} {note}")

# Summarize the curve: the single best point and the >=90%-efficiency plateau.
best = min(sweep, key=lambda p: spans[p])
plateau = [p for p in sweep if ideal / spans[p] >= 0.90]
print(f"best single point : {best} partitions, efficiency {ideal / spans[best]:.1%}")
print(f"good plateau (>=90%): partitions {plateau[0]}..{plateau[-1]} "
      f"({plateau[0] // WORKERS}x..{plateau[-1] // WORKERS}x the {WORKERS} cores)")
print("takeaway: aim a few-times the core count; the curve is flat there, and falls off hard at both ends.")
```
```text
workers=8 total_work=1000 overhead/task=0.25
ideal makespan (perfect split, no overhead) = 125.0

partitions  makespan  efficiency note
         1    1000.3      12.5% workers idle (too few tasks)
         2     509.0      24.6% workers idle (too few tasks)
         4     280.9      44.5% workers idle (too few tasks)
         8     142.8      87.6%
        16     137.7      90.8%
        32     135.6      92.2%
        64     133.2      93.8% overhead piling up
       128     132.9      94.0% overhead piling up
       256     134.7      92.8% overhead piling up
       512     142.0      88.0% overhead piling up
      1024     157.7      79.2% overhead piling up
best single point : 128 partitions, efficiency 94.0%
good plateau (>=90%): partitions 16..256 (2x..32x the 8 cores)
takeaway: aim a few-times the core count; the curve is flat there, and falls off hard at both ends.
```
The plateau in Output 7.9.5 is the whole point. There is no single magic number, but there is a wide safe zone starting at a few times the core count, with cliffs on both sides. On the left, cores sit idle (the middle panel of Figure 7.9.1). On the right, per-task overhead piles up. This is why a moderate fixed default (spark.sql.shuffle.partitions is 200) is a reasonable first guess for many clusters. It is also why AQE's partition coalescing is so valuable: the goal is to land anywhere on the plateau, and the engine, watching real shuffle sizes, lands there more reliably than a fixed guess.
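When tuning by hand, one way to pick a starting point on that plateau is to combine the two constraints the curve exposes: keep partitions small enough, and keep cores busy. The heuristic below is hypothetical, not Spark's rule:

1. Compute enough partitions for about 128 MB of shuffle data each. The 128 MB target is an assumption.
2. Require at least two waves of tasks across all cores.
3. Take the larger of the two counts and round it up to a whole number of waves.

```python
import math

MB = 1024 * 1024
GB = 1024 * MB

def suggest_shuffle_partitions(total_cores, shuffle_bytes,
                               target_partition_bytes=128 * MB, min_waves=2):
    """Hypothetical starting value for spark.sql.shuffle.partitions.

    Take whichever is larger: enough partitions for ~target_partition_bytes each,
    or min_waves x the total cores. Round up to a multiple of total_cores so the
    task count is a whole number of waves."""
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    by_cores = min_waves * total_cores
    waves = math.ceil(max(by_size, by_cores) / total_cores)
    return waves * total_cores

print(suggest_shuffle_partitions(8, 1 * GB))     # small shuffle: core-bound -> 16
print(suggest_shuffle_partitions(64, 500 * GB))  # big shuffle: size-bound -> 4032
```

On the eight-core model above, the small-shuffle answer of 16 sits at the left edge of the 90 percent plateau. Larger shuffles push the count up in proportion to their volume.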
Hand-tuning the knobs behind Table 7.9.1 across changing workloads is exactly the kind of repetitive optimization that automation, and increasingly machine learning, now attacks. Adaptive Query Execution was the first step, replacing static plans with re-planning driven by runtime statistics, and it is now the assumed baseline. Beyond it, a research and product line aims at fully automatic configuration. On the research side are cost-based and learned auto-tuners that profile a workload and search the configuration space. On the product side are managed cloud offerings, such as serverless Spark platforms that size clusters and set execution parameters automatically, and Databricks' Predictive Optimization, which schedules table maintenance without a human in the loop. The 2024 to 2026 direction couples these with learned cardinality and cost estimators, so the planner's guesses about partition sizes and join strategies improve from history rather than from static heuristics. The through-line is the same one this section teaches by hand: measure the real data, then choose the lever, with the engine increasingly doing both. The vector-search and serving chapters (Chapter 25) show the same shift from hand-tuned to learned configuration in a different distributed system.
5. Memory and Serialization: The Last Knobs Advanced
When the structural levers are exhausted (partitions sized, shuffle minimized, small sides broadcast, reuse cached, AQE on) and the job is still slow or unstable, the remaining knobs are memory and serialization. They matter, but they are last because no amount of memory tuning rescues a job that shuffles ten times more data than it needs to. Spark divides each executor's heap between execution memory (for shuffles, joins, and aggregations) and storage memory (for cached data), and the two borrow from each other under a unified manager. When a shuffle or aggregation needs more execution memory than is available, Spark spills the overflow to local disk: correct, but slow, and visible in the UI as spill bytes. Heavy spill is the signal to either give executors more memory or, often better, use more partitions so each task's working set is smaller.
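The link between spill and partition count can be estimated on the back of an envelope. Spark's tuning guide describes the unified region as the heap minus a 300 MB reserve, times spark.memory.fraction. The sketch below takes that formula and makes one simplifying assumption: with every core busy and nothing cached, each task gets an even share of the region. It then asks how many partitions keep an average partition within one task's share. Skew and the in-memory expansion of deserialized rows are ignored, so treat the result as a floor.

```python
import math

MB = 1024 * 1024
GB = 1024 * MB
RESERVED_BYTES = 300 * MB  # heap set aside before the unified pool is computed

def unified_pool_bytes(executor_heap_bytes, memory_fraction=0.6):
    """Execution + storage pool: (heap - 300 MB reserved) * spark.memory.fraction."""
    return (executor_heap_bytes - RESERVED_BYTES) * memory_fraction

def per_task_execution_bytes(executor_heap_bytes, cores_per_executor, memory_fraction=0.6):
    """Rough share for one task when every core is busy and nothing is cached:
    the pool split evenly across concurrent tasks (a simplifying assumption)."""
    return unified_pool_bytes(executor_heap_bytes, memory_fraction) / cores_per_executor

def partitions_to_avoid_spill(shuffle_bytes, executor_heap_bytes, cores_per_executor):
    """Smallest partition count whose average partition fits one task's share
    (ignores skew and the expansion of deserialized rows)."""
    share = per_task_execution_bytes(executor_heap_bytes, cores_per_executor)
    return math.ceil(shuffle_bytes / share)

print(f"pool per 8g executor: {unified_pool_bytes(8 * GB) / MB:.1f} MB")
print("partitions for a 100 GB shuffle, 8g / 4-core executors:",
      partitions_to_avoid_spill(100 * GB, 8 * GB, 4))
```

Doubling executor memory roughly halves the partition count this formula demands. Doubling the partition count reaches the same per-task working set without buying memory, which is why finer partitioning is often the better fix.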
```python
from pyspark.sql import SparkSession
# Memory and serialization knobs, turned only after the structural fixes are in.
# They are read at application start: set them here or via spark-submit --conf,
# not with spark.conf.set on a running session.
spark = (SparkSession.builder
         .config("spark.executor.memory", "8g")  # heap per executor
         .config("spark.executor.memoryOverhead", "2g")  # non-heap overhead (native memory, Python workers)
         .config("spark.memory.fraction", "0.6")  # execution+storage share of heap
         # Kryo serializes RDD shuffle and cached data far more compactly than Java's default.
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .getOrCreate())
```
Serialization deserves the same framing. Every byte that crosses the shuffle, and every cached object, is serialized first, so a more compact serializer directly reduces both network traffic and cache memory. For RDD code, Kryo is typically far more compact than the Java default. DataFrame operations already store rows in Spark's own compact binary format, so the serializer setting matters mostly where RDDs or custom objects are involved. It is a genuine win, but a bounded one: it makes data movement cheaper per byte, whereas the structural levers reduce the number of bytes moved in the first place. That ordering, structure before knobs, is the discipline this whole section has been arguing for.
Spark is an in-memory, lazily evaluated DAG engine. It records every transformation as RDD lineage (7.2), which both defers work into a DAG the cluster runs stage by stage (7.4) and lets it recompute any lost partition for fault tolerance. The DataFrame API and the Catalyst optimizer (7.3) let you write declarative queries that compile to efficient plans, inserting map-side combines and choosing join strategies for you. Performance lives in the data layout: partitioning and caching (7.6) decide how work is split across cores and what is recomputed, while joins, shuffles, and skew (7.7) decide how much data crosses the network. PySpark (7.8) makes all of this the workhorse for distributed feature engineering and data preparation feeding AI training. This final section ties it together as a playbook: read the Spark UI, find the dominant stage, then size partitions to the core count, kill the shuffle, broadcast small sides, cache reuse, and let Adaptive Query Execution re-plan, reaching for memory and serialization knobs only last. The shuffle you just tamed here is the same primitive that returns as the gradient all-reduce when we start training models across machines in Chapter 15.
For each Spark UI observation, name the lever from Table 7.9.1 you would apply first and the section that explains why: (a) a stage has 8,000 tasks, each running for 40 milliseconds, on a 64-core cluster; (b) one task in a join stage reads 50 GB of shuffle data while the median task reads 200 MB; (c) the same prepared DataFrame is used by four separate aggregations and each one takes the full preparation time; (d) a stage on a 200-core cluster shows only 12 tasks. For each, explain in one sentence what would happen if you instead reached for a bigger cluster without applying the lever.
Take the model in Code 7.9.5 and sweep OVERHEAD over the values 0.05, 0.25, and 1.0 while keeping everything else fixed. For each, find the partition count that minimizes the makespan and the width of the 90-percent-efficiency plateau. Explain how a higher per-task overhead (a cluster with slow task launch or expensive serialization) moves the sweet spot and narrows the safe zone, and connect this to why coalescing tiny partitions, the second row of Table 7.9.1, matters more on some clusters than others.
A join has a large side of $2 \times 10^9$ rows at 200 bytes each and a small side of $5 \times 10^5$ rows at 200 bytes each. Estimate the bytes a shuffle-hash join must move (both sides shuffled by key) versus a broadcast join (the small side sent to each of 50 executors, the large side not moved at all). At a network rate of 10 GB/s, estimate the data-movement time for each strategy and the speedup from broadcasting. State the assumption that has to hold for the broadcast join to be legal, and tie your answer back to the broadcast threshold in Code 7.9.2.
Carry Chapter 7 from reading into building with one of these. Each is sized to run on a laptop in Spark local mode or a small cluster, and each exercises the full diagnose-then-tune loop.
1. Tune a skewed join end to end. Generate a two-table dataset where one join key holds 80 percent of the rows (a power-law key distribution). Run the join naively, capture the straggler in the Spark UI (the bottom panel of Figure 7.9.1), then fix it three ways: enable AQE skew-join handling, salt the hot key by hand, and broadcast the small side if it qualifies. Report the stage wall-clock and the maximum task time for each, and write up which lever helped most and why, grounding each choice in Section 7.7.
2. Build a Spark feature pipeline that feeds a training job. Take a tabular dataset (for example, a public clickstream or transactions log), and write a PySpark pipeline that cleans, joins dimension tables, and computes aggregate features per entity, caching the reused intermediate. Write the result to Parquet, then load it into a single-machine training job (scikit-learn or PyTorch). Measure how partition count and caching change the preparation wall-clock, and confirm the handoff to training matches the PySpark-for-AI pattern of Section 7.8. This pipeline is the direct on-ramp to the storage and data-loading concerns of Chapter 8.
3. Reproduce the partition-sweet-spot curve in real Spark. Take Code 7.9.5's idealized curve and reproduce it on an actual cluster: run a fixed shuffle-heavy job (a large groupBy) while sweeping spark.sql.shuffle.partitions across the same range, with AQE off, and record the stage wall-clock from the Spark UI at each setting. Plot measured wall-clock against partition count, compare the shape to Output 7.9.5, then turn AQE on and show where its coalescing lands on your curve.