Part II: Distributed Data Processing for AI
Chapter 7: Spark and Distributed DataFrames

Joins, Shuffles, and Data Skew

"Seven of my tasks finished in nine seconds. The eighth is still running, and it has started leaving the cluster passive-aggressive notes about how it carries this team."

A Stage Waiting on Its Last Task
Big Picture

Almost every Spark job that runs slower than you expected is paying for a shuffle, and almost every shuffle that runs slower than its peers is paying for skew. A join asks a deceptively simple question, "for each key on the left, find the matching rows on the right," but answering it across a cluster forces a choice: either copy the small table to every machine and avoid moving the big one, or move both tables across the network so that matching keys land together. The first choice, a broadcast hash join, moves only the small table; the second, a sort-merge join, pays the full price of a shuffle. When the keys are evenly spread, that shuffle is merely expensive. When one key is far more common than the rest, the shuffle becomes lopsided: most of the data lands on one task, that task becomes a straggler, and the whole stage waits for it. This section explains how Spark's optimizer chooses a join strategy, why the shuffle is the dominant cost in distributed data processing, and how three fixes (salting, broadcasting, and Adaptive Query Execution) turn a stalled stage back into a balanced one.

In Section 7.6 we controlled how a DataFrame is laid out across the cluster: partitioning by a column, caching a reused result, and choosing a sensible number of partitions. Those controls matter most when two DataFrames meet in a join, because a join is where Spark must reconcile two independent layouts into one. This section takes the join apart. We first separate the two physical strategies Spark can use, then isolate the shuffle as the cost that dominates everything else, and finally confront the failure mode that turns a healthy shuffle into a stalled one: data skew. The join material here deepens the reduce-side and broadcast joins introduced for raw MapReduce in Section 6.5; Spark adds a cost-based optimizer that chooses between them for you, and a runtime engine that can repair a bad choice mid-flight.

1. Two Ways to Join Across Machines Beginner

Suppose you are joining a large table of user events (billions of rows) against a small dimension table of user profiles (a few million rows), matching on a user key. There are two fundamentally different ways to place this work on a cluster, and they differ in exactly one quantity: how much data crosses the network.

The first strategy is the broadcast hash join. Because the profile table is small, Spark ships a full copy of it to every executor, where each executor builds an in-memory hash table keyed by the user key. The large events table never moves; each executor probes the broadcast hash table with the events rows it already holds. No shuffle of the big table is required. This is the cheapest join Spark can perform, and it is available whenever one side is small enough to fit in each executor's memory.
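The mechanics above fit in a few lines of plain Python. This is a minimal sketch under simplifying assumptions, not Spark's implementation:

- Each executor's shard of the big table is a list of (key, value) rows.
- The hash table is built once and shared by every shard, standing in for the broadcast copy.
- broadcast_hash_join is a hypothetical name.

```python
from collections import defaultdict


def broadcast_hash_join(big_shards, small_table):
    """Inner-join every shard of a big table against one small table.

    big_shards: one list of (key, value) rows per executor; these rows never move.
    small_table: list of (key, value) rows, assumed small enough to copy everywhere.
    Returns one list of (key, big_value, small_value) rows per executor.
    """
    # Build the hash table from the small side. In this sketch it is built once
    # and shared, standing in for the copy every executor receives.
    hash_table = defaultdict(list)
    for key, small_value in small_table:
        hash_table[key].append(small_value)

    results = []
    for shard in big_shards:
        # Each executor probes with the rows it already holds: no shuffle.
        out = []
        for key, big_value in shard:
            for small_value in hash_table.get(key, ()):
                out.append((key, big_value, small_value))
        results.append(out)
    return results


events_shards = [[(1, "click"), (2, "view")], [(1, "buy"), (3, "view")]]
profiles = [(1, "alice"), (2, "bob")]
print(broadcast_hash_join(events_shards, profiles))
# [[(1, 'click', 'alice'), (2, 'view', 'bob')], [(1, 'buy', 'alice')]]
```

Key 3 has no profile, so the inner join drops that event. The big shards are only read, never redistributed.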

The second strategy is the sort-merge join (and its cousin the shuffle hash join). When neither side is small enough to broadcast, Spark must instead bring matching keys together by shuffling both sides: every row is hashed by its join key and sent to the task that owns that key's partition, so that all rows for a given key, from both tables, end up on the same task. Each task then sorts its partition by key and walks the two sorted streams in lockstep, emitting matches. This works for arbitrarily large inputs, but it moves both tables across the network, which is the expensive part.
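Here is a matching sketch of the shuffle-then-merge strategy, again in plain Python under stated assumptions:

- Rows are in-memory (key, value) pairs with mutually comparable keys.
- Python's built-in hash stands in for the partitioning hash Spark actually uses.
- hash_partition, merge_sorted, and sort_merge_join are hypothetical names.

```python
def hash_partition(rows, num_tasks):
    """Shuffle step: route each (key, value) row to task hash(key) % num_tasks."""
    parts = [[] for _ in range(num_tasks)]
    for key, value in rows:
        parts[hash(key) % num_tasks].append((key, value))
    return parts


def merge_sorted(left, right):
    """Per-task step: sort both sides by key, then walk them in lockstep."""
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on each side and emit their cross product.
            i_end, j_end = i, j
            while i_end < len(left) and left[i_end][0] == lk:
                i_end += 1
            while j_end < len(right) and right[j_end][0] == rk:
                j_end += 1
            for _, lv in left[i:i_end]:
                for _, rv in right[j:j_end]:
                    out.append((lk, lv, rv))
            i, j = i_end, j_end
    return out


def sort_merge_join(left_rows, right_rows, num_tasks):
    """Shuffle both sides by key, then merge each task's pair of partitions."""
    left_parts = hash_partition(left_rows, num_tasks)
    right_parts = hash_partition(right_rows, num_tasks)
    # Equal keys hash to the same task index, so every task can join independently.
    return [merge_sorted(lp, rp) for lp, rp in zip(left_parts, right_parts)]


left = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]
right = [(2, "x"), (2, "y"), (3, "z"), (4, "w")]
tasks = sort_merge_join(left, right, num_tasks=3)
print(sorted(row for task in tasks for row in task))
# [(2, 'b', 'x'), (2, 'b', 'y'), (2, 'c', 'x'), (2, 'c', 'y'), (4, 'd', 'w')]
```

Unlike the broadcast sketch, both inputs are redistributed here. On a cluster, that redistribution is the shuffle, and it crosses the network.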

[Figure 7.7.1. Left panel, "Broadcast hash join (no shuffle)": the small table is copied to Exec 1, 2, and 3, each holding a big-table shard; the big table never moves. Right panel, "Sort-merge join (full shuffle)": big tables A and B are hashed by key and shuffled to tasks k1 to k3 (key % T = 0, 1, 2), so matching keys meet on the same task; network cost grows with both table sizes.]
Figure 7.7.1: The two physical join strategies. On the left, a broadcast hash join copies the small table to every executor (orange arrows) while the large table stays put, so no big-table shuffle occurs. On the right, a sort-merge join hashes both tables by the join key and shuffles them (blue arrows from table A, orange from table B) so that matching keys arrive on the same task. The broadcast join ships a few million rows to each executor; the sort-merge join moves billions of rows across the network.

The decision between them rests on one threshold. If the smaller side fits comfortably in executor memory, broadcast it and skip the shuffle entirely; otherwise, shuffle both sides. Spark formalizes this threshold, as we see next, and Figure 7.7.1 makes the contrast concrete: the same logical join, two radically different physical plans.

2. How the Optimizer Picks, and the Broadcast Threshold Intermediate

Spark's Catalyst optimizer chooses the physical join from an estimate of each side's size in bytes. The governing knob is spark.sql.autoBroadcastJoinThreshold, which defaults to 10 megabytes. If Catalyst estimates that one side of the join, after all filters and projections, is smaller than this threshold, it plans a broadcast hash join; otherwise it falls back to a sort-merge join. The logic is a direct cost comparison. Broadcasting a table of $b$ bytes to $E$ executors moves $b \cdot E$ bytes over the network, which is cheap when $b$ is small. Shuffling two tables of sizes $S_L$ and $S_R$ moves roughly $S_L + S_R$ bytes, which is enormous when both are large. Broadcasting wins precisely when

$$b \cdot E \;\ll\; S_L + S_R,$$

and since $E$ is at most a few hundred while $S_L + S_R$ can be terabytes, a small $b$ makes the left side of that inequality tiny. The threshold encodes a conservative value of $b$ below which the broadcast is reliably worth it.
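Plugging illustrative numbers into the inequality shows how lopsided it usually is. The sizes below (a 10 MB table, 100 executors, a 1 TB fact table) are hypothetical. This back-of-envelope model counts only bytes moved; it ignores compression, serialization overhead, and the cost of collecting the small table on the driver before broadcasting it.

```python
MB = 1024 ** 2
GB = 1024 ** 3
TB = 1024 ** 4


def broadcast_cost(b, executors):
    """Bytes moved by copying a b-byte table to every executor: b * E."""
    return b * executors


def shuffle_cost(s_left, s_right):
    """Bytes moved by shuffling both sides of a sort-merge join: roughly S_L + S_R."""
    return s_left + s_right


# Hypothetical job: a 10 MB dimension table, 100 executors, a 1 TB fact table.
b, E, S_L = 10 * MB, 100, 1 * TB
bc = broadcast_cost(b, E)    # 1,000 MB in total, just under 1 GB
sh = shuffle_cost(S_L, b)    # just over 1 TB
print(f"broadcast {bc / GB:.2f} GB, shuffle {sh / GB:.1f} GB, ratio {sh / bc:.0f}x")
# broadcast 0.98 GB, shuffle 1024.0 GB, ratio 1049x
```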

Two practical wrinkles matter. First, the estimate can be wrong. If Spark lacks column statistics, it may overestimate a table's size and skip a broadcast that would have been ideal, or underestimate it and attempt a broadcast that runs the driver out of memory. Second, you can override the optimizer with an explicit hint when you know better than the statistics do, which we show in the library shortcut below. The threshold and the hint together give you both an automatic default and a manual escape hatch.

Key Insight: The Join Strategy Is a Bet on One Number

Every Spark join is, underneath, a single decision: is one side small enough to broadcast? If yes, you pay $b \cdot E$ bytes of network and no shuffle. If no, you pay $S_L + S_R$ bytes of shuffle, sorting, and the risk of skew. The entire art of fast joins is making the small side genuinely small (filter and project before the join so the broadcast estimate shrinks) and giving the optimizer accurate statistics so its bet is informed. A broadcast join that should have fired but did not is one of the most common causes of a Spark job that is ten times slower than it needs to be.

Library Shortcut: broadcast() and the AQE Settings

You rarely implement a join strategy by hand in Spark; you hint and configure. The broadcast() function forces a broadcast hash join regardless of the size estimate, and a handful of spark.sql.adaptive settings let the runtime fix skew on its own. The illustrative snippet below shows both:

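# Assumes an active SparkSession `spark` and DataFrames `events` (large) and `profiles` (small) sharing a user_id column.
from pyspark.sql.functions import broadcast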

# Force a broadcast hash join: ship `profiles` to every executor, no shuffle of events.
joined = events.join(broadcast(profiles), on="user_id", how="inner")

# Let the runtime detect and split skewed partitions during the shuffle.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Raise the auto-broadcast threshold to 64 MB when executors have the memory.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 64 * 1024 * 1024)
Code 7.7.1: The production join controls. One broadcast() wrapper pins the strategy. Two config lines hand the skew problem to Adaptive Query Execution, and a third raises the automatic broadcast threshold. What would be hundreds of lines of manual partitioning, replication, and straggler detection collapses into a hint and three settings, with Spark handling the network transport, hash-table construction, and runtime partition splitting internally.

3. The Shuffle Is the Dominant Cost Intermediate

Why does the sort-merge join cost so much more than the broadcast join? Because of the shuffle, the same all-to-all data exchange that powered the reduce phase of MapReduce in Section 6.5. A shuffle writes every row to local disk, partitioned by destination, then transfers those partitions across the network to the tasks that will consume them. For a cluster with $T$ tasks, a shuffle is a quadratic communication pattern: in principle every task can send data to every other task, so the number of transfer streams scales as $T^2$. Each row pays for serialization, a disk write on the map side, a network hop, and a disk read and deserialization on the reduce side. Computation, by contrast, is often just a comparison or a hash lookup per row. The ratio is lopsided enough that for most join-heavy and aggregation-heavy jobs, the shuffle dominates the wall-clock time.
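A toy model shows where the $T^2$ comes from. In this sketch, each map task splits its rows into one bucket per reducer. Each non-empty (map task, reducer) pair becomes a separate block, written on the map side and fetched over the network by the reducer. shuffle_blocks is a hypothetical helper, and Python's hash stands in for Spark's partitioner.

```python
from collections import Counter


def shuffle_blocks(map_outputs, num_reducers):
    """Count rows per (map task, reduce task) block in a toy hash shuffle.

    map_outputs: one list of keys per map task. Each map task buckets its rows by
    destination reducer (hash(key) % num_reducers) and writes one block per
    non-empty bucket; each reducer later fetches its block from every map task.
    """
    blocks = Counter()
    for map_task, keys in enumerate(map_outputs):
        for key in keys:
            blocks[(map_task, hash(key) % num_reducers)] += 1
    return blocks


# Hypothetical input: 4 map tasks, each holding the integer keys 0..999.
maps = [list(range(1000)) for _ in range(4)]
blocks = shuffle_blocks(maps, num_reducers=4)
print(len(blocks))  # 16: every map task writes a block for every reducer

# The number of potential blocks (and fetches) grows as T^2 when M = R = T.
for t in (10, 100, 1000):
    print(f"T={t}: up to {t * t:,} map-to-reduce blocks")
```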

This is the same lesson that runs through the whole book, seen from the data-processing side: communication is the tax on distribution. In Chapter 3 we model that tax with an alpha-beta cost equation; here it appears as the concrete bytes a shuffle moves. The MapReduce shuffle of Section 6.5 returns later still as the gradient all-reduce of data-parallel training in Chapter 15, where the "shuffle" of a training step is the all-reduce that synchronizes gradients. Same primitive, same dominant cost, different payload.

Fun Note: The 200-Partition Mystery

For years, Spark users wondered why their shuffles always produced exactly 200 partitions regardless of data size. The answer is a single default, spark.sql.shuffle.partitions = 200, chosen long ago and never tuned to your data. On a tiny dataset it spawns 200 nearly empty tasks; on a huge one it makes each task swallow far too much. Adaptive Query Execution finally retired the mystery by coalescing those partitions to a sensible count at runtime, which is why turning AQE on often speeds up a job you never explicitly tuned.
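The coalescing step can be sketched as a greedy pass over the partition sizes AQE observes once a shuffle has written its output. This is a simplified model, not Spark's code:

- coalesce_partitions is a hypothetical helper.
- Only adjacent partitions are merged.
- The 64 MB target mirrors the default of spark.sql.adaptive.advisoryPartitionSizeInBytes.

```python
def coalesce_partitions(sizes, target):
    """Greedily merge adjacent shuffle partitions until a group would exceed target bytes.

    Returns a list of groups, each a list of original partition indices.
    """
    groups, current, current_size = [], [], 0
    for i, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot the target.
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(i)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical small job: the default 200 shuffle partitions, about 1 MB each.
MB = 1024 * 1024
sizes = [1 * MB] * 200
groups = coalesce_partitions(sizes, target=64 * MB)
print(len(groups))  # 4 tasks instead of 200 (64 + 64 + 64 + 8 partitions)
```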

4. Data Skew: When One Key Stalls the Stage Intermediate

A shuffle assumes the keys are roughly evenly distributed, so that each of the $T$ tasks receives about $1/T$ of the rows. Real data rarely cooperates:

- A "country" key is dominated by a few populous countries.
- A "user_id" join against an events table is dominated by a handful of power users and bots.
- A null key collects every row whose value is missing.

When one key carries most of the rows, the shuffle sends most of the data to the single task that owns that key, and that task becomes a straggler: it runs long after its peers have finished, and the stage cannot complete until it does. This is exactly the straggler phenomenon introduced in Section 2.7, now arising from data rather than from a slow machine. A stage is only as fast as its slowest task, so a single hot key can make a 64-machine cluster run at the speed of one core.
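The first diagnostic is to check the key distribution before the join. This minimal sketch makes three assumptions:

- The join keys fit in a Python list; on a cluster you would aggregate counts per key instead.
- skew_report and stage_slowdown are hypothetical helpers.
- Task time is proportional to rows processed, which the slowdown estimate relies on.

```python
from collections import Counter


def skew_report(keys, top=3):
    """Return the `top` most common keys with their share of all rows."""
    counts = Counter(keys)
    total = sum(counts.values())
    return [(key, n / total) for key, n in counts.most_common(top)]


def stage_slowdown(task_loads):
    """Ratio of the busiest task's load to the mean load. Assuming task time is
    proportional to rows processed, this is how much longer the stage runs than
    it would with perfect balance."""
    mean = sum(task_loads) / len(task_loads)
    return max(task_loads) / mean


# Hypothetical join-key column in which one bot account dominates.
keys = ["bot_42"] * 90 + [f"user_{i}" for i in range(10)]
print(skew_report(keys, top=2))  # [('bot_42', 0.9), ('user_0', 0.01)]

# Hypothetical per-task row counts after a shuffle of 1,000 rows over 8 tasks.
print(stage_slowdown([930, 10, 10, 10, 10, 10, 10, 10]))  # 7.44
```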

The demonstration below makes the straggler visible without a real cluster. It builds a left table whose join key is heavily skewed (one "hot" key carries 92 percent of the rows) and models the shuffle that a sort-merge join performs: each row goes to task $\text{hash(key)} \bmod T$. It counts the rows landing on each task, then applies salting, the standard fix. Salting splits the hot key into many sub-keys by appending a random salt, so its rows fan out across all tasks instead of piling onto one. In a real salted join, the small right side is replicated once per salt value so the matches still find each other. The demo models only the left side's load, which is what creates the straggler.

from collections import Counter
import random, hashlib

NUM_TASKS = 8
HOT_KEY = "country=US"

# Left side (events) with a heavily skewed join key: one hot key dominates.
left_rows = []
for i in range(1_000_000):
    if i % 100 < 92:                 # 92% of rows carry the hot key
        left_rows.append(HOT_KEY)
    else:
        left_rows.append(f"country=C{i % 200}")   # cold keys: only 16 distinct values occur, since i % 100 >= 92 here

def task_of(key):
    """A shuffle sends every row with a given key to one task: hash(key) % T.
    A seed-independent hash (md5) is used so this demo reproduces exactly."""
    h = int.from_bytes(hashlib.md5(key.encode()).digest()[:4], "big")
    return h % NUM_TASKS

# Plain sort-merge join: partition by the raw key.
plain = Counter(task_of(k) for k in left_rows)

# Salted join: the hot key splits into SALT sub-keys that hash independently,
# so its rows fan out across tasks; the small right side is replicated per salt.
SALT = 4 * NUM_TASKS
random.seed(0)

def task_of_salted(key):
    if key == HOT_KEY:
        return task_of(f"{key}#{random.randrange(SALT)}")
    return task_of(key)

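salted = Counter(task_of_salted(k) for k in left_rows)

def report(title, counts):
    """Print per-task row counts, with bars scaled so the busiest task gets 40 '#'."""
    total, busiest = sum(counts.values()), max(counts.values())
    print(f"\n{title}")
    for t in range(NUM_TASKS):
        bar = "#" * round(40 * counts[t] / busiest)   # Counter returns 0 for empty tasks
        print(f"  task {t}: {counts[t]:>7,}  {bar}".rstrip())
    print(f"  busiest task holds {100 * busiest / total:5.1f}% of all rows "
          f"(perfect balance = {100 / NUM_TASKS:.1f}%)")

print(f"rows={len(left_rows):,}  tasks={NUM_TASKS}  distinct keys={len(set(left_rows))}")
report("BEFORE salting (sort-merge join, hot key on one task):", plain)
report("AFTER salting the hot key across all tasks:", salted)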
Code 7.7.2: A skewed join and its salting fix in pure Python. plain models the per-task row counts of an unsalted sort-merge join; salted splits the hot key into SALT independent sub-keys so the same rows spread across every task. No Spark is needed to see the load imbalance and its repair.
rows=1,000,000  tasks=8  distinct keys=17

BEFORE salting (sort-merge join, hot key on one task):
  task 0: 940,000  ########################################
  task 1:       0
  task 2:   5,000
  task 3:   5,000
  task 4:  10,000
  task 5:  20,000  #
  task 6:   5,000
  task 7:  15,000  #
  busiest task holds  94.0% of all rows (perfect balance = 12.5%)

AFTER salting the hot key across all tasks:
  task 0: 221,181  ########################################
  task 1: 143,832  ##########################
  task 2:  91,141  ################
  task 3:  62,429  ###########
  task 4: 153,820  ############################
  task 5: 106,385  ###################
  task 6: 148,576  ###########################
  task 7:  72,636  #############
  busiest task holds  22.1% of all rows (perfect balance = 12.5%)
Output 7.7.2: Before salting, task 0 holds 94 percent of all rows and becomes the straggler that stalls the stage; the other seven tasks finish almost immediately and wait. After salting, the busiest task holds 22.1 percent, with the hot key now spread across the cluster instead of crushing one task. The balance is not yet perfect because the 32 salted sub-keys do not hash evenly onto 8 tasks, but the straggler is gone.

The numbers tell the whole story. With the raw key, one task carries 94 percent of the work while task 1 sits completely idle; the stage runs at the speed of that one overloaded task. After salting, no task exceeds 23 percent, and the work is spread across all eight. If task time is roughly proportional to rows processed, the same logical join now finishes in about a quarter of the time, because the busiest task handles 221,181 rows instead of 940,000. Figure 7.7.2 visualizes this before-and-after rebalance.

[Figure 7.7.2. Left panel, "Before salting: one straggler": task 0 holds 94% of rows, nearly all of them, while the other seven wait for it. Right panel, "After salting: balanced": the busiest task holds 22% as the hot key fans out across tasks, and no single straggler remains.]
Figure 7.7.2: The salting rebalance from Output 7.7.2, drawn as per-task load. Before salting (left), task 0 towers over the rest at 94 percent of all rows while seven tasks sit nearly empty, a textbook straggler. After salting (right), the hot key's rows fan out so the busiest task holds only 22 percent and the cluster works in parallel. The bars need not be exactly equal; eliminating the single tall bar is what recovers the speed.

Salting is the manual fix, and it is worth understanding because it shows exactly what the runtime does for you. The three mitigations in practice are: broadcast the small side when it fits, which sidesteps the shuffle and therefore the skew entirely; salt the hot key when both sides are large, as above; and let Adaptive Query Execution split the skewed partition at runtime, which automates the salting without a code change. We turn to that automated path next.

Practical Example: The Feature Join That Ran All Night

Who: A data engineer building the training-feature table for a fraud-detection model at a payments company.

Situation: A nightly Spark job joined two billion transaction events against an account-features table on account_id, producing the labeled feature set the model trained on each morning.

Problem: The job's final stage routinely ran for six hours, with the Spark UI showing 199 tasks finished in under two minutes and one task still running for the remaining six hours.

Dilemma: Throw more executors at the job (which would not help, since one task was the bottleneck and idle executors cannot share its load), or diagnose why a single task carried almost all the work.

Decision: They inspected the join key and found a sentinel account_id = 0 assigned to every guest checkout, which accounted for 80 percent of all events; the join was funneling all of them to one task, a straggler exactly as in Section 2.7.

How: They enabled spark.sql.adaptive.skewJoin.enabled and, because the features table was under 50 megabytes after projection, also raised the broadcast threshold so the join became a broadcast hash join with no shuffle at all.

Result: The stage fell from six hours to eleven minutes, and the morning model retraining stopped missing its deployment window.

Lesson: A slow Spark stage with one long-running task is almost never a shortage of machines; it is skew. Look at the key distribution first, then broadcast or split before you scale the cluster.

5. Adaptive Query Execution Repairs Skew at Runtime Advanced

Salting works, but it requires you to know the hot key in advance and to rewrite the join. Adaptive Query Execution (AQE), enabled by default since Spark 3.2, removes both requirements. AQE re-optimizes a query plan using statistics gathered at runtime, after each shuffle has written its data and the true partition sizes are known rather than estimated. For skew specifically, AQE compares each shuffle partition against the median partition size; when a partition is both far larger than the median and above an absolute byte threshold, AQE splits that one oversized partition into several smaller sub-partitions and runs them as separate tasks. This is salting performed automatically and only where it is needed, with the matching side replicated to preserve correctness. AQE also coalesces the many tiny partitions left by the default 200-partition setting and can switch a planned sort-merge join to a broadcast join once it sees a side is small enough, all without a code change.

The relevant settings are spark.sql.adaptive.enabled and spark.sql.adaptive.skewJoin.enabled from Code 7.7.1, plus spark.sql.adaptive.skewJoin.skewedPartitionFactor (how many times the median a partition must exceed to count as skewed, default 5) and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (the absolute floor, default 256 megabytes). A partition is treated as skewed only when it crosses both bars, which prevents AQE from over-splitting on a dataset that merely has a few moderately uneven partitions.
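The two-bar rule is easy to state as code. This minimal sketch is not Spark's implementation:

- find_skewed and split_plan are hypothetical helpers.
- The defaults mirror the two settings above.
- The fixed 64 MB split target is a simplifying assumption. Spark derives the actual split size from its own settings and the map-output layout.

```python
import math
import statistics

MB = 1024 * 1024


def find_skewed(sizes, factor=5, threshold=256 * MB):
    """Indices of partitions treated as skewed: larger than factor x median
    AND larger than the absolute byte threshold."""
    median = statistics.median(sizes)
    return [i for i, s in enumerate(sizes) if s > factor * median and s > threshold]


def split_plan(sizes, target=64 * MB, **kwargs):
    """Map each skewed partition to the number of sub-partitions it would be cut
    into, assuming a fixed target size per sub-partition."""
    return {i: math.ceil(sizes[i] / target) for i in find_skewed(sizes, **kwargs)}


# Hypothetical shuffle: seven 40 MB partitions and one 2 GB hot partition.
sizes = [40 * MB] * 7 + [2048 * MB]
print(find_skewed(sizes))  # [7]
print(split_plan(sizes))   # {7: 32}

# Each bar alone is not enough: 200 MB is over 5x the median but under 256 MB,
# and 300 MB is over 256 MB but under 5x a 100 MB median.
print(find_skewed([10 * MB] * 7 + [200 * MB]))   # []
print(find_skewed([100 * MB] * 7 + [300 * MB]))  # []
```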

Research Frontier: Vectorized Engines and Smarter Skew Handling (2024 to 2026)

Two active lines are reshaping the shuffle. The first is the move to vectorized, native execution engines: Databricks Photon (Behm et al., 2022) rewrites Spark's row-at-a-time SQL operators as columnar, SIMD-accelerated C++ kernels, and the open-source efforts Apache Gluten and Apache DataFusion Comet (2024 to 2025) push the same vectorized execution into community Spark, cutting the per-row serialization cost that makes shuffles expensive. The second is smarter, data-aware skew handling: recent work pushes AQE toward sampling key distributions earlier and choosing per-join salt factors automatically, and toward push-based shuffle (Magnet, merged into Spark 3.2) that pre-merges shuffle blocks on the reduce side to cut the $T^2$ small-fetch overhead. The throughline is that the shuffle remains the cost center of distributed data processing, so the frontier is making each shuffled byte cheaper and making skewed bytes rarer. We meet the same pressure again as gradient compression in distributed training in Chapter 15.

Thesis Thread: The Shuffle Is the Scale-Out Tax, Again

The shuffle behind a sort-merge join is the same all-to-all exchange that this book keeps returning to. It is the reduce-side data movement of MapReduce in Section 6.5, the all-reduce that synchronizes gradients in data-parallel training in Chapter 15, and the all-to-all that routes tokens between experts in mixture-of-experts models later in Part IV. Each time, the lesson is identical: distribution buys you parallel compute but charges you for moving data between machines, and the engineering that matters is minimizing and balancing that movement. Skew is what happens when the movement is not balanced, and salting, broadcasting, and AQE are three ways to rebalance it.

With joins, shuffles, and skew understood, you have the core of Spark performance tuning: most slow jobs are a missed broadcast or an unhandled hot key, and both now have names and fixes. Section 7.8 turns from tuning Spark for general data work to using it specifically for AI: assembling training features, preprocessing at scale, and feeding distributed model training.

Exercise 7.7.1: Pick the Join Strategy Conceptual

For each join, state whether Spark should use a broadcast hash join or a sort-merge join, and why: (a) a 5-terabyte clickstream table joined against a 2-megabyte table of campaign metadata; (b) two 800-gigabyte tables of user events from different sources joined on user_id; (c) a 1-terabyte orders table joined against a 40-megabyte products table when spark.sql.autoBroadcastJoinThreshold is left at its 10-megabyte default. For (c), name the one config change that would convert a slow sort-merge join into a fast broadcast join, and the risk it introduces.

Exercise 7.7.2: Measure and Mitigate Skew Coding

Extend Code 7.7.2 so that the right (dimension) side is modeled too: give the hot key one matching row and replicate it once per salt value, then verify that the salted join still produces the same total number of matched pairs as the unsalted join (correctness must survive the rebalance). Next, sweep the salt factor over the values 1, 2, 8, 32, and 128 and plot or print the busiest task's share at each. Explain the diminishing returns you observe and why an unboundedly large salt factor is not free.

Exercise 7.7.3: When Does Broadcast Beat Shuffle? Analysis

Using the cost comparison $b \cdot E \ll S_L + S_R$ from part 2 of this section, suppose you have $E = 200$ executors, a large left table of $S_L = 2{,}000$ gigabytes, and a right table you can shrink by filtering and projecting. At what right-table size $b$ does the broadcast cost $b \cdot E$ equal the shuffle cost $S_L + b$? Below that crossover, broadcasting moves less total data; above it, shuffling does. Compute the crossover $b$, compare it to the 10-megabyte default threshold, and explain why Spark's default is far more conservative than the raw break-even point. We make this kind of network-cost estimate rigorous in Chapter 3.