Part II: Distributed Data Processing for AI
Chapter 6: The MapReduce Model and Distributed Algorithms

Distributed Sorting and Joins

"I had nine reducers and a perfectly balanced plan. Then one key showed up four million times and decided that reducer three was its permanent home."

A Shuffle That Met a Hot Key
Big Picture

Sorting and joining are the two operations that turn a pile of partitioned records into something an AI pipeline can train on, and both are won or lost in the shuffle. A distributed sort produces one global order across machines by first sampling the data to choose range boundaries, then routing each record to the partition that owns its range. A distributed join brings matching rows from two datasets onto the same machine, either by shuffling both inputs on the join key (reduce-side) or by broadcasting the smaller dataset to every worker so the larger one never moves (map-side). Both depend on the same partition-and-shuffle machinery from the rest of this chapter, and both are wrecked by the same enemy: a single key so frequent that one reducer drowns while the others idle. This section builds both operations from primitives, then shows the skew failure and the salting fix that every production pipeline carries.

The previous section built aggregation, filtering, and secondary sorting on top of the MapReduce shuffle. This section takes up the two heaviest data-preparation operations an AI pipeline runs before any model sees a batch: putting records in a global order, and combining two datasets on a shared key. A feature pipeline almost never trains on raw events. It joins a user table against a clickstream, a product catalog against an impression log, a sensor registry against a telemetry feed, and then sorts the result so that downstream sampling, deduplication, and windowing are cheap. When the datasets are too large for one machine, these become distributed-sort and distributed-join problems, and the cost of doing them badly is measured in cluster-hours of idle workers waiting on one overloaded straggler.

1. Total-Order Distributed Sort (Intermediate)

Sorting within one reducer is trivial: the framework already delivers each reducer its keys in sorted order. The hard part is a total order across reducers, so that concatenating reducer 0, then reducer 1, then reducer 2 yields one globally sorted stream. A hash partitioner cannot do this; hashing scatters adjacent keys to arbitrary reducers. What we need is a range partitioner: reducer 0 takes the smallest slice of the key space, reducer 1 the next, and so on. The output files, concatenated in reducer order, are then globally sorted with no final merge step.

The catch is choosing the range boundaries. If we split the key space into equal-width intervals, a skewed distribution sends most records to one reducer. The fix, used by the canonical TeraSort benchmark that sorts a terabyte across a cluster, is to sample the data first. Draw a small random sample, sort it, and read off the boundaries at evenly spaced quantiles of the sample. With $R$ reducers we want $R - 1$ split points placed so that each of the $R$ resulting ranges holds roughly $N / R$ records. If the sample has size $m$, the split point for reducer boundary $j$ is the order statistic at rank

$$ \text{rank}_j = \left\lfloor \frac{j \cdot m}{R} \right\rfloor, \qquad j = 1, 2, \dots, R - 1. $$

Because the sample is drawn from the same distribution as the data, the quantiles of the sample approximate the quantiles of the full dataset, so each reducer receives a near-equal share whatever the underlying distribution looks like. This is the entire idea: a cheap sampling pass buys a balanced range partition, and a balanced range partition turns a global sort into independent local sorts with no merge. We quantify why balance matters so much in the cost models of Chapter 3; the short version is that the slowest reducer sets the wall-clock, so an unbalanced sort runs at the speed of its busiest machine.
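
The sampling recipe can be sketched in a few lines of single-process Python. This is a minimal illustration, not TeraSort itself: the exponential key distribution, the sizes `N`, `R`, and `m`, and the fixed seed are assumptions chosen for the demo. The split points follow the rank formula above, and an equal-width partition is computed alongside for comparison.

```python
import bisect
import random

random.seed(7)                      # fixed seed so the sketch is repeatable
N, R, m = 50_000, 4, 1_000          # records, reducers, sample size (illustrative)

# Skewed keys: an exponential distribution crowds most values near zero.
data = [random.expovariate(1.0) for _ in range(N)]

# 1. Sampling pass: sort a small random sample and read off R - 1 split
#    points at rank_j = floor(j * m / R) for j = 1 .. R - 1.
sample = sorted(random.sample(data, m))
splits = [sample[(j * m) // R] for j in range(1, R)]

# 2. Range partition: bisect finds the range that owns each key.
partitions = [[] for _ in range(R)]
for key in data:
    partitions[bisect.bisect_right(splits, key)].append(key)

# 3. Local sorts only; concatenating in range order needs no merge.
for part in partitions:
    part.sort()
total_order = [key for part in partitions for key in part]

# Comparison: R equal-width ranges over the full key span.
lo, hi = min(data), max(data)
width = (hi - lo) / R
equal_width = [0] * R
for key in data:
    equal_width[min(int((key - lo) / width), R - 1)] += 1

print("sampled-range loads:", [len(p) for p in partitions])
print("equal-width loads  :", equal_width)
print("globally sorted    :", total_order == sorted(data))
```

Under these assumptions the sampled boundaries should give each partition close to a quarter of the records, while the equal-width split puts the large majority in the first range. A larger `m` tightens the boundaries at the price of a longer sampling pass.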

Key Insight: Sampling Converts a Global Sort into Independent Local Sorts

A distributed total-order sort never performs a distributed merge. It performs one cheap sampling pass to learn the data distribution, builds a range partitioner whose boundaries are the sample quantiles, shuffles each record to the partition owning its range, and sorts each partition locally. Concatenating the partitions in range order is already globally sorted. The sample is what makes the ranges balanced; without it, a skewed key distribution would pile most records onto one reducer and the sort would run at the speed of that single machine.

2. Two Ways to Join Across Machines (Intermediate)

A join combines rows from two datasets that share a key: a left dataset $A$ and a right dataset $B$, emitting one output row for every pair $(a, b)$ where $a$ and $b$ carry the same join key. On one machine this is a hash-table lookup. Across machines the difficulty is physical: the matching rows of $A$ and $B$ start on different nodes, and a join can only happen where both rows are present. There are two ways to make them present, and choosing between them is the single most important join decision you make.

The reduce-side join (also called the repartition join) is the general method. The map phase reads both inputs and tags each row with which dataset it came from, emitting the join key as the shuffle key. The shuffle co-locates every row of $A$ and every row of $B$ that share a key onto the same reducer, by the same partitioning that drives every other MapReduce operation. The reducer then separates the two tagged groups and emits their cross product. It works for any two datasets of any size, and it pays for that generality by shuffling both inputs across the network in full.

The map-side join (also called the broadcast or replicated join) is the fast path available when one dataset is small. Instead of shuffling both inputs, you ship the entire small dataset to every mapper, where it becomes an in-memory lookup table. Each mapper then streams its shard of the large dataset and probes the lookup table directly. The large dataset never moves; only the small one is replicated. Sending one dataset to every worker is exactly the broadcast collective introduced in Chapter 4, reused here as a data-processing primitive: the same one-to-all movement that distributes model weights to workers also distributes a lookup table to mappers. Figure 6.5.1 contrasts the two dataflows.

[Figure 6.5.1 diagram. Left panel, "Reduce-side join: shuffle both inputs": Table A (big) and Table B (big) are shuffled by join key to Reducer 0 (key in [a, m)) and Reducer 1 (key in [m, z]); both inputs cross the network. Right panel, "Broadcast join: replicate the small input": Table B (small) is broadcast to Mapper 0 and Mapper 1, each holding an A-shard plus a copy of B and emitting joined rows; Table A never moves and there is no shuffle.]
Figure 6.5.1: The two join strategies. Left: a reduce-side join tags and shuffles both tables so that matching keys land on the same reducer, paying full network cost on both inputs. Right: a broadcast join replicates the small table $B$ to every mapper (the broadcast collective of Chapter 4), so the large table $A$ stays where it is and no shuffle happens. When $B$ fits in a mapper's memory, the right-hand path is dramatically cheaper.

The decision rule is mechanical. If one dataset is small enough to fit in a worker's memory (a few hundred megabytes is a typical threshold), broadcast it and do a map-side join; the large dataset never touches the network. If both datasets are large, you have no choice but the reduce-side join and its full double shuffle. Query engines like Spark, which Chapter 7 takes up, make this choice automatically from table-size statistics, but the underlying mechanics are exactly the two paths in Figure 6.5.1.
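
The rule can be written down as a small planning function. This is a hedged sketch, not how any particular engine decides: `plan_join`, `JoinPlan`, and the 256 MB `broadcast_budget` are hypothetical names and values, and the byte counts are a crude estimate that charges a broadcast one copy of the small table per worker and a reduce-side join the full size of both inputs.

```python
from dataclasses import dataclass

MB, GB = 10**6, 10**9

@dataclass
class JoinPlan:
    strategy: str       # "broadcast" or "reduce-side"
    bytes_moved: int    # rough estimate of network traffic

def plan_join(size_a, size_b, workers, broadcast_budget=256 * MB):
    """Pick a join strategy from input sizes (in bytes), following the rule above."""
    small = min(size_a, size_b)
    if small <= broadcast_budget:
        # Replicate the small input to every worker; the large input stays put.
        return JoinPlan("broadcast", small * workers)
    # Neither input fits the budget: both are shuffled in full.
    return JoinPlan("reduce-side", size_a + size_b)

# Illustrative sizes: a 500 GB fact table against a 50 MB entity table,
# then against a 300 GB table, on a 64-worker cluster.
print(plan_join(500 * GB, 50 * MB, workers=64))
print(plan_join(500 * GB, 300 * GB, workers=64))
```

With these numbers the broadcast plan moves about 3.2 GB in total, against roughly 800 GB for the large-large reduce-side plan, which is the gap the decision rule exploits.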

3. Both Joins, Side by Side in Code (Intermediate)

The code below implements both joins over the same two small tables: a tiny users entity table and a larger events fact table, the canonical shape of an AI feature join. The reduce-side path tags both inputs, partitions by a hash of the key, and emits the cross product inside each reducer. The broadcast path turns users into a dictionary shipped to every mapper and probes it while streaming events. We then check that the two strategies produce identical output, which they must: the join result is defined by the data, not by the route the rows took to meet.

import collections, random

users = [("u1", "Ada"), ("u2", "Lin"), ("u3", "Mat")]          # entity table (small)
events = [("u1", "click"), ("u2", "view"), ("u1", "purchase"),
          ("u3", "click"), ("u2", "click"), ("u1", "view")]    # fact table (large)

# ---- Reduce-side join: tag both inputs, shuffle by key, emit cross product ----
def map_tag(rows, side):
    for key, payload in rows:
        yield key, (side, payload)               # tag which dataset the row is from

mapped = list(map_tag(users, "U")) + list(map_tag(events, "E"))

R = 3
buckets = collections.defaultdict(list)
for key, val in mapped:
    buckets[hash(key) % R].append((key, val))    # shuffle: hash partition to R reducers

def reduce_join(pairs):                           # runs inside one reducer
    by_key = collections.defaultdict(lambda: {"U": [], "E": []})
    for key, (side, payload) in pairs:
        by_key[key][side].append(payload)
    for key, sides in by_key.items():
        for u in sides["U"]:
            for e in sides["E"]:
                yield (key, u, e)                 # cross product of the two groups

reduce_side = sorted(row for r in range(R) for row in reduce_join(buckets[r]))

# ---- Broadcast (map-side) join: ship the small table to every mapper ----
broadcast = dict(users)                           # the broadcast collective, Ch 4
map_side = sorted((key, broadcast[key], payload)
                  for key, payload in events if key in broadcast)

print("reduce-side join rows :", len(reduce_side))
for row in reduce_side:
    print("   ", row)
print("\nbroadcast join rows   :", len(map_side))
print("matches reduce-side   :", map_side == reduce_side)
Code 6.5.1: The reduce-side and broadcast joins over the same two tables. The reduce-side path shuffles both inputs through hash buckets; the broadcast path replicates users and never shuffles events. The final comparison confirms both routes compute the identical join.
reduce-side join rows : 6
    ('u1', 'Ada', 'click')
    ('u1', 'Ada', 'purchase')
    ('u1', 'Ada', 'view')
    ('u2', 'Lin', 'click')
    ('u2', 'Lin', 'view')
    ('u3', 'Mat', 'click')

broadcast join rows   : 6
matches reduce-side   : True
Output 6.5.1: Both strategies emit the same six joined rows. The broadcast join reached this result without shuffling the fact table at all, which is why it wins whenever the entity table is small enough to replicate.

4. Data Skew: the Dominant Failure Mode (Advanced)

Sort and join both rest on the same assumption: that the partitioner spreads keys evenly across reducers. Real data violates this assumption constantly. A handful of keys (a viral product, a bot account, a null placeholder, a default user id) appear orders of magnitude more often than the rest. When such a hot key hashes to one reducer, that reducer receives a disproportionate share of all rows while its peers sit nearly empty. Because the job finishes only when its slowest reducer finishes, one hot key turns a balanced plan into a single-machine bottleneck. This is precisely the straggler phenomenon analyzed in Chapter 2, here induced not by a slow machine but by an uneven data distribution: the machine is fine, its workload is not.
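
Before reaching for a remedy, it helps to confirm which key is hot. The sketch below is one simple way to do that, under assumptions of my own: `find_hot_keys` is a hypothetical helper, it samples the key column rather than scanning it, and it flags any key whose share of the sample exceeds one reducer's fair share of $1/R$, since such a key overloads whichever reducer receives it. The data, with a missing join key on 30 percent of rows, is invented for illustration.

```python
import collections
import random

def find_hot_keys(keys, num_reducers, sample_size=10_000, seed=0):
    """Return {key: estimated share} for keys whose share alone exceeds 1 / num_reducers."""
    rng = random.Random(seed)
    sample = keys if len(keys) <= sample_size else rng.sample(keys, sample_size)
    fair_share = 1 / num_reducers
    counts = collections.Counter(sample)
    return {key: n / len(sample)
            for key, n in counts.items()
            if n / len(sample) > fair_share}

rng = random.Random(42)
# Illustrative data: 30% of rows carry a missing join key (None); the rest
# are spread thinly over about 5,000 user ids.
keys = [None] * 30_000 + [f"u{rng.randint(1, 5000)}" for _ in range(70_000)]

hot = find_hot_keys(keys, num_reducers=8)
for key, share in hot.items():
    print(f"hot key {key!r}: about {share:.0%} of sampled rows "
          f"(fair share per reducer is {1 / 8:.1%})")
```

The threshold is a design choice: $1/R$ catches keys that cannot be balanced by any hash partitioner, and a stricter cutoff would also flag keys that merely make one reducer noticeably heavier than its peers.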

The standard remedy is salting. Append a small random suffix to the hot key so that what was one key becomes $S$ synthetic sub-keys, $\texttt{hot\#0}$ through $\texttt{hot\#}(S-1)$, which the partitioner scatters across up to $S$ reducers. The hot key's rows are then split into $S$ roughly equal streams instead of piling onto one machine. For an aggregation you finish with a second pass that re-combines the salted sub-keys; for a join you replicate the matching rows of the other input across all $S$ salt values so every sub-stream can still find its partners. The cost is modest extra bookkeeping; the benefit is converting a single overloaded reducer into up to $S$ balanced ones. The code below builds a dataset in which one key accounts for 80 percent of the rows and measures the imbalance before and after salting.

import collections, random

random.seed(0)
N = 100_000
keys = (["hot"] * (N * 80 // 100)                                   # 80% one hot key
        + [f"k{random.randint(1, 5000)}" for _ in range(N * 20 // 100)])

R = 3
load = collections.Counter(hash(k) % R for k in keys)              # rows per reducer
print("rows per reducer (hash):", dict(sorted(load.items())))
print("busiest reducer share :", f"{max(load.values()) / N:.0%}")

S = 8                                                              # salt buckets
def salt(k):
    return f"{k}#{random.randint(0, S - 1)}" if k == "hot" else k  # spread the hot key

load_salted = collections.Counter(hash(salt(k)) % R for k in keys)
print("after salting hot key :", dict(sorted(load_salted.items())))
print("busiest reducer share :", f"{max(load_salted.values()) / N:.0%}")
Code 6.5.2: A skew demonstration. One key holds 80 percent of the rows; plain hashing sends all of them to a single reducer. Salting the hot key into eight sub-keys redistributes its rows across the reducers, flattening the load.
rows per reducer (hash): {0: 86560, 1: 6878, 2: 6562}
busiest reducer share : 87%
after salting hot key : {0: 26680, 1: 26905, 2: 46415}
busiest reducer share : 46%
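Output 6.5.2: Before salting, reducer 0 carries 87 percent of all rows while the other two sit nearly idle, so the job runs at the speed of one machine. After salting the hot key into eight sub-keys, the busiest reducer drops to 46 percent, roughly halving the wall-clock bottleneck; pushing $S$ higher and balancing across more reducers flattens it further. Exact counts, and which reducer receives the hot key, vary between runs because Python salts the built-in hash() of strings per process unless PYTHONHASHSEED is fixed; the shape of the imbalance does not.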
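
The second pass that an aggregation needs after salting can be sketched as follows. This is a minimal single-process illustration under stated assumptions: the row counts, `R`, and `S` are invented; `reducer_of` is a hypothetical stand-in for the partitioner that uses `zlib.crc32` so results do not depend on Python's per-process string hashing; and the `#` separator is assumed never to appear in a real key.

```python
import collections
import random
import zlib

random.seed(1)
R, S = 4, 8                                   # reducers and salt buckets (illustrative)
rows = ([("hot", 1)] * 8_000
        + [(f"k{random.randint(1, 500)}", 1) for _ in range(2_000)])

def reducer_of(key):
    # Stable hash partitioner (assumption: stands in for the framework's own).
    return zlib.crc32(key.encode()) % R

# Pass 1: salt only the hot key, then count per (possibly salted) key
# inside whichever reducer the salted key lands on.
partial = [collections.Counter() for _ in range(R)]
for key, value in rows:
    salted = f"{key}#{random.randrange(S)}" if key == "hot" else key
    partial[reducer_of(salted)][salted] += value

# Pass 2: strip the salt and re-combine the partial counts per original key.
final = collections.Counter()
for counts in partial:
    for salted, value in counts.items():
        final[salted.split("#", 1)[0]] += value

# Reference answer computed without any salting.
expected = collections.Counter()
for key, value in rows:
    expected[key] += value

print("pass-1 rows per reducer :", [sum(c.values()) for c in partial])
print("matches unsalted counts :", final == expected)
```

The second pass is cheap because it sees at most $S$ partial counts for the hot key rather than its raw rows, which is why the two-pass form costs far less than the straggler it removes.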

Fun Note: The Null That Ate the Cluster

A staggering fraction of real-world skew incidents trace to a single value: the join key was missing, the pipeline filled it with a default (a literal null, an empty string, or the dreaded user id 0), and suddenly every orphaned row in a billion-row table shared one key. The first thing a seasoned data engineer checks when one reducer runs ten times longer than the rest is not the cluster; it is whether the hot key is simply the absence of a key wearing a costume.

Practical Example: The Feature Join That Stalled on One Celebrity

Who: A data engineer building the training-feature pipeline for a social recommendation model.

Situation: A nightly job joined a 4-billion-row interaction log against a user table to attach profile features before training.

Problem: The reduce-side join finished 199 of 200 reducer tasks in twelve minutes, then sat for over an hour on the last one.

Dilemma: Throw more reducers at the job, which does nothing because the hot key still lands on one of them, or restructure the join itself to break up the offending key.

Decision: They inspected the straggling reducer's key distribution and found one celebrity account holding 6 percent of all interactions, a textbook hot key.

How: They salted that account's id into sixteen sub-keys on the interaction side, replicated its single profile row across all sixteen salt values on the user side, and unioned the results, exactly the pattern in Code 6.5.2 applied to a join.

Result: The straggling task split into sixteen balanced tasks and the job's wall-clock fell from seventy-five minutes to fourteen, with byte-identical output.

Lesson: Adding machines cannot fix skew, because the bottleneck is one key, not one slow node. The fix is to break the key, not to grow the cluster.

Research Frontier: Adaptive Skew Handling (2024 to 2026)

Hand-salting hot keys is effective but manual, so modern engines automate it. Spark's Adaptive Query Execution detects skewed shuffle partitions at runtime and splits them on the fly, and the related dynamic-partition-pruning and skew-join optimizations have continued to mature through the Spark 3.5 and 4.0 line (2024 to 2025). Recent work on learned and sketch-based partitioning estimates the key distribution with compact data structures during the map phase (Count-Min sketches for per-key frequencies, HyperLogLog sketches for distinct-key counts), then builds a partitioner that pre-splits predicted hot keys before the shuffle rather than reacting after a straggler appears. The same skew-aware partitioning ideas are crossing into distributed deep learning, where uneven token-to-expert routing in mixture-of-experts models is the direct analog of a hot join key, a connection this book returns to with the expert-parallelism load-balancing losses of Part IV. The throughline is that the field now treats skew as a first-class quantity to be measured and partitioned around, not an exception to be patched after the job stalls.
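
To make the sketch-based idea concrete, here is a minimal Count-Min sketch used to predict hot keys before a shuffle. It is an illustration under my own assumptions, not the method of any cited system: the `CountMinSketch` class, its `width` and `depth`, the salted BLAKE2b row hashes, and the $N/R$ hot-key threshold are all choices made for the demo.

```python
import hashlib
import random

class CountMinSketch:
    """Fixed-size frequency estimator; estimates never fall below the true count."""

    def __init__(self, width=256, depth=4):
        self.width, self.depth = width, depth
        self.table = [[0] * width for _ in range(depth)]

    def _columns(self, key):
        # One independent hash per row, derived by salting BLAKE2b with the row index.
        for row in range(self.depth):
            digest = hashlib.blake2b(key.encode(), digest_size=8,
                                     salt=row.to_bytes(2, "little")).digest()
            yield row, int.from_bytes(digest, "little") % self.width

    def add(self, key, count=1):
        for row, col in self._columns(key):
            self.table[row][col] += count

    def estimate(self, key):
        # Collisions can only inflate a cell, so the minimum is the tightest bound.
        return min(self.table[row][col] for row, col in self._columns(key))

rng = random.Random(3)
R = 4                                         # reducers (illustrative)
stream = ["hot"] * 8_000 + [f"k{rng.randint(1, 500)}" for _ in range(2_000)]
rng.shuffle(stream)

sketch = CountMinSketch()
for key in stream:                            # what a mapper would do as rows pass by
    sketch.add(key)

threshold = len(stream) / R                   # one reducer's fair share of rows
predicted_hot = sorted(k for k in set(stream) if sketch.estimate(k) > threshold)
print("estimate for 'hot':", sketch.estimate("hot"), "| true count:", stream.count("hot"))
print("keys predicted hot:", predicted_hot)
```

The sketch uses `width * depth` counters regardless of how many distinct keys pass through, which is what makes it cheap enough to maintain during the map phase. Because it can overestimate but never underestimate, a truly hot key is always flagged, at the risk of occasionally pre-splitting a key that did not need it.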

Library Shortcut: Spark Picks the Join and Fights the Skew for You

Code 6.5.1 and Code 6.5.2 spell out the tagging, partitioning, and salting by hand to show the mechanics. In a production query engine you write the join as a single expression and the optimizer chooses the strategy and handles skew from table statistics:

from pyspark.sql import functions as F

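# Assumes `events` and `users` are existing Spark DataFrames sharing a `user_id` column.
# Spark auto-selects a broadcast (map-side) join when its size estimate for the small
# table is under spark.sql.autoBroadcastJoinThreshold; the hint just makes the choice
# explicit. The large table is never shuffled.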
joined = events.join(F.broadcast(users), on="user_id", how="inner")

# For a large-large join, Adaptive Query Execution detects and splits skewed
# partitions at runtime, no manual salting needed:
#   spark.sql.adaptive.enabled = true
#   spark.sql.adaptive.skewJoin.enabled = true
Code 6.5.3: The same broadcast join and skew mitigation as Code 6.5.1 and 6.5.2, now one method call plus two config flags. Roughly forty lines of manual tagging, partitioning, and salting collapse into the engine's optimizer, which Chapter 7 unpacks; you specify the join, the framework picks map-side versus reduce-side and splits hot partitions itself.

Sorting and joining complete the core distributed-algorithm toolkit of this chapter: you can now group, aggregate, order, and combine datasets too large for one machine, and you know the one failure mode (skew) that threatens all of them and the one fix (salting, automated or manual) that tames it. The next section turns to a different family of distributed computations, where the answer is not a reorganized dataset but a small extract or a numerical product: top-K selection, distributed matrix multiplication, and the PageRank iteration that ties them together. That tour begins in Section 6.6.

Exercise 6.5.1: Choose the Join (Conceptual)

For each pair of datasets, state whether you would use a reduce-side join or a broadcast join, and justify the choice from the size of each input: (a) a 2-terabyte clickstream joined against a 40-megabyte product catalog; (b) a 900-gigabyte user table joined against a 1.2-terabyte interaction log; (c) a 5-gigabyte device registry joined against a 300-gigabyte telemetry stream, on a cluster whose workers have 16 gigabytes of memory each. For any case where the answer is not obvious, name the specific quantity you would measure to decide.

Exercise 6.5.2: Salt a Join, Not Just an Aggregation (Coding)

Code 6.5.2 salts a hot key for a load-balancing demonstration but never completes a join with the salted keys. Extend the reduce-side join of Code 6.5.1 so that one user id holds most of the events, then salt that id on the events side into $S$ sub-keys. For the join to stay correct, you must replicate the matching users row across all $S$ salt values; implement that replication and verify the salted join produces exactly the same set of output rows as the unsalted join. Report the busiest reducer's share before and after, as in Output 6.5.2.

Exercise 6.5.3: Why Sampling Beats Equal-Width Ranges (Analysis)

Consider sorting $N$ keys drawn from a heavily skewed distribution (for example, integers where 90 percent of values fall below 100 but the range extends to 1,000,000) across $R$ reducers. Estimate the load on the busiest reducer if you partition into $R$ equal-width ranges over the full key span, versus if you sample $m$ keys and set the boundaries at the sample quantiles using the rank formula from Section 1. Argue from your two estimates why TeraSort samples, and explain how the sample size $m$ trades accuracy of the boundaries against the cost of the sampling pass. Relate the busiest-reducer load to the wall-clock model of Chapter 3.