Part II: Distributed Data Processing for AI
Chapter 6: The MapReduce Model and Distributed Algorithms

Aggregation, Filtering, and Secondary Sorting

"They asked me to count things. Billions of things. I learned to add a little before I spoke, so the network would not hate me."

A Combiner That Pre-Aggregated Out of Politeness
Big Picture

Most of the data wrangling that feeds an AI system is one of three patterns: aggregate many records into a summary, filter records down to the ones that matter, or arrange a key's records into a meaningful order. MapReduce expresses all three, and the cost of each is decided almost entirely by how much data crosses the network in the shuffle. Aggregation is cheap when the combine function is associative and commutative, because then a worker can pre-aggregate its own slice before sending anything, collapsing millions of rows into a handful of partial sums. Filtering is cheaper still: it needs no shuffle at all and runs as a map-only job. Secondary sorting is the subtle one, the trick that lets a reducer see a key's values already in order (for example, every user's events in time order), and it is built entirely from how we shape the key and how we partition, sort, and group on it. This section makes all three concrete, with a runnable group-by-plus-secondary-sort job and the diagram that shows why the composite key works.

The previous section built the canonical word-count job and established the map, shuffle, reduce rhythm of Section 6.2. Word count is a special case of a much larger family: take a stream of key-value records, group them by key, and compute something per group. Counting is the simplest per-group computation, but the same skeleton computes sums, means, minimums, maximums, label distributions, and feature statistics over datasets too large for one machine. This section names the patterns that family contains and, just as importantly, the optimizations that decide whether a job finishes in minutes or in hours. The governing fact, established for word count and reinforced here, is that the shuffle (moving every map output to its reducer) is the expensive step, so every technique below is ultimately about shrinking or skipping the shuffle.

1. Distributed Aggregation and the Combiner (Beginner)

Aggregation means reducing many records that share a key into one summary per key: the count of events per user, the sum of clicks per advertisement, the mean dwell time per video. In the naive MapReduce form, each mapper emits one key-value pair per input record, the shuffle ships all of those pairs to reducers grouped by key, and each reducer folds its group into a single number. For word count over a billion-word corpus this means a billion pairs crossing the network, even though the final answer has only as many rows as there are distinct words. That is wasteful, and the waste is fixable whenever the reduce function has the right algebraic shape.

The fix is the combiner: a mini-reducer that runs on the map side, inside each mapper, before the shuffle. It folds the pairs a single mapper produced into per-key partial aggregates, so that the mapper sends one partial per key instead of one pair per record. The reducer then folds the partials from all mappers into the final answer. The combiner is allowed to run zero, one, or many times on any subset of the data, which is why it is safe only when the aggregation function is both associative and commutative. Writing the per-key reduce as a binary operator $\oplus$ over partial results, those two properties are

$$(a \oplus b) \oplus c = a \oplus (b \oplus c), \qquad a \oplus b = b \oplus a,$$

so the engine may regroup and reorder the partials freely and still reach the same result. Sum, count, min, max, and any combination of them as a fixed-size tuple satisfy this. A mean does not qualify directly, because an average of per-mapper averages is wrong whenever the mappers saw different numbers of records, but a mean becomes combinable the moment you carry the pair $(\text{count}, \text{sum})$ through the pipeline and divide only at the very end. This pattern (push a $(\text{count}, \text{sum})$ aggregate through the combiner and finish with one division) is the workhorse of distributed feature engineering, and you will reuse it whenever you compute per-group statistics at scale.
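A minimal sketch of the difference, assuming made-up dwell times for one user split unevenly across two hypothetical mappers. The names `to_partial` and `merge` are illustrative and do not come from any library.

```python
from functools import reduce

# Hypothetical dwell times for one user, split unevenly across two mappers.
mapper_a = [10, 20, 30, 40]      # 4 records
mapper_b = [100]                 # 1 record

def to_partial(values):
    """Fold one mapper's values into a (count, sum) partial."""
    return (len(values), sum(values))

def merge(p, q):
    """Associative, commutative merge of two (count, sum) partials."""
    return (p[0] + q[0], p[1] + q[1])

partials = [to_partial(mapper_a), to_partial(mapper_b)]

# Wrong: average the per-mapper averages (ignores that mapper_a saw 4x the data).
mean_of_means = sum(s / c for c, s in partials) / len(partials)

# Right: merge (count, sum) partials in any order, divide once at the end.
count, total = reduce(merge, partials)
true_mean = total / count

print(mean_of_means)  # 62.5
print(true_mean)      # 40.0
```

The two answers differ because the second mapper's single record gets half the weight in the mean of means. Carrying the count alongside the sum restores the correct weighting, whatever order the partials are merged in.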

Key Insight: Associativity and Commutativity Buy You the Map-Side Combiner

A combiner pre-aggregates a mapper's output before the shuffle, turning "one pair per record" into "one partial per key per mapper". It is correct precisely when the reduce function is associative and commutative, because the engine may apply it any number of times, on any grouping, in any order. Sum, count, min, and max qualify directly; mean qualifies once you carry $(\text{count}, \text{sum})$ and divide last. When the function does not qualify (median, exact distinct count), you cannot combine, and you must either shuffle everything or switch to an approximate sketch, the subject of Section 6.8.

2. Filtering as a Map-Only Job (Beginner)

Filtering keeps the records that satisfy a predicate and drops the rest: valid rows, rows in a date range, examples whose label passed quality control, documents in a target language. Filtering needs no grouping, so it needs no shuffle and no reduce phase at all. Each mapper reads its input split, applies the predicate, and writes the survivors straight to output. This is a map-only job, and it is the cheapest thing MapReduce can do, because the dominant cost of the model, moving data between map and reduce, is simply absent. The same map-only shape covers projection (keep only some fields), reformatting (parse a log line into a record), and tokenization (turn a document into a sequence of tokens), which is why the first stages of almost every data pipeline are map-only.

Two practical points make filtering even cheaper in modern systems. First, push the predicate down to the storage layer so that rows failing the filter are never read off disk; columnar formats and predicate pushdown make this routine, a theme developed in Chapter 8. Second, combine filtering with the map step of a later job rather than running it as a separate pass, so the data is read once. For AI pipelines the practical upshot is large: a training corpus is typically filtered (deduplicated, language-screened, quality-scored) far more times than it is aggregated, and because each filter is map-only, those passes parallelize perfectly with no communication tax. We make the communication-versus-computation trade explicit with the cost models of Chapter 3.
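As a rough illustration of a map-only pass that fuses a filter with a projection, the sketch below runs two hypothetical mappers over their own splits and concatenates their outputs. The record layout, the `keep` predicate, and the 0.5 quality threshold are all assumptions made for the example.

```python
# Hypothetical records: (doc_id, language, quality_score, text).
records = [
    ("d1", "en", 0.91, "a clean english page"),
    ("d2", "fr", 0.88, "une page en francais"),
    ("d3", "en", 0.12, "spam spam spam"),
    ("d4", "en", 0.75, "another useful page"),
]

def keep(record, min_quality=0.5):
    """Predicate: English documents at or above an assumed quality threshold."""
    _doc_id, lang, quality, _text = record
    return lang == "en" and quality >= min_quality

def map_only(split):
    """One mapper: filter and project in a single pass, with no shuffle."""
    for record in split:
        if keep(record):
            doc_id, _lang, _quality, text = record
            yield (doc_id, text)          # projection: keep two fields

# Two mappers, each working on its own split; outputs are just concatenated.
splits = [records[:2], records[2:]]
output = [row for split in splits for row in map_only(split)]
print(output)  # [('d1', 'a clean english page'), ('d4', 'another useful page')]
```

No mapper ever needs to see another mapper's records, which is why this shape scales with the number of workers.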

3. Secondary Sorting: Controlling the Order a Reducer Sees (Intermediate)

Plain MapReduce guarantees that a reducer receives all values for a key, but it says nothing about the order of those values. For many AI tasks the order is the whole point: build a per-user session by replaying events in time order, compute a running feature over a time series, or pair each event with the one before it. Sorting the values inside the reducer works only until a key has more values than fit in memory, at which point it fails exactly on the large keys you most care about. The scalable answer is secondary sorting: make the shuffle itself deliver the values already ordered, so the reducer streams through them without holding them all at once.

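The mechanism is a composite key together with three separately controllable functions. Suppose we want, for each user, that user's events in timestamp order. We build the key as the pair $(\text{user}, \text{timestamp})$ and then set:

Partitioner: reads the user only, so all of a user's records go to one reducer.
Sort comparator: reads the full key, user first and then timestamp, so records arrive in ascending time order.
Grouping comparator: reads the user only, so the reducer gets one call per user.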

The three functions look at different slices of the same composite key, and that asymmetry is the entire trick. Partition-by-user keeps a user whole; sort-by-(user, time) orders the events; group-by-user hands the reducer one ordered stream per user. Figure 6.4.1 shows the three slices acting on the same keys and the sorted stream that arrives at the reducer.

[Figure 6.4.1 diagram: unsorted composite keys (user, timestamp), namely (bob, 53), (alice, 84), (bob, 10), (alice, 45), (alice, 49), (bob, 37), pass through PARTITION (reads user only), SORT (reads full key), and GROUP (reads user only). The reducer is then called as reduce(alice, [45, 49, 84]) and reduce(bob, [10, 37, 53]), each with events in time order.]
Figure 6.4.1: Secondary sorting with a composite key $(\text{user}, \text{timestamp})$. The three shuffle functions read different slices of the same key: partition reads the user (so a user's records stay together on one reducer), the sort comparator reads the full key (so records arrive in ascending time order), and the grouping comparator reads the user again (so the reducer gets one call per user with the values already sorted). No in-reducer sort is needed, so the pattern scales to keys with more values than fit in memory.
Fun Note: Three Functions, One Key, Zero In-Memory Sorts

The first time most engineers meet secondary sorting, they ask why not just sort the list inside the reducer. The answer arrives the day a single celebrity user has two hundred million events and the reducer runs out of memory mid-sort. Secondary sorting never materializes the list; it lets the shuffle's external merge sort, which already spills to disk gracefully, do the ordering for free. The trick feels like sleight of hand precisely because the work moved somewhere you were not looking.
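To make the "never materializes the list" claim concrete, here is a small sketch that stands in for the reduce-side merge. It assumes two hypothetical sorted runs (one per mapper) built from the keys in Figure 6.4.1, merges them lazily with `heapq.merge`, and groups on the user alone with `itertools.groupby`. A real shuffle merges spilled files on disk rather than Python lists.

```python
import heapq
from itertools import groupby
from operator import itemgetter

# Hypothetical sorted runs, one per mapper, each already ordered by the
# full composite key (user, timestamp). Values are dwell seconds.
run_1 = [(("alice", 45), 3), (("bob", 10), 7), (("bob", 53), 2)]
run_2 = [(("alice", 49), 5), (("alice", 84), 1), (("bob", 37), 9)]

# heapq.merge is lazy: it holds one head element per run, not the whole data.
merged = heapq.merge(run_1, run_2, key=itemgetter(0))

# Grouping on the user alone: consecutive records with the same user form one
# reducer call, and the values inside it arrive in timestamp order.
result = {}
for user, records in groupby(merged, key=lambda kv: kv[0][0]):
    # A real reducer would stream over `records`; we collect only to print.
    result[user] = [ts for (_user, ts), _dwell in records]

print(result)  # {'alice': [45, 49, 84], 'bob': [10, 37, 53]}
```

Memory during the merge is proportional to the number of runs, not to the number of events for any one user. That is the property the in-reducer sort lacks.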

4. A Runnable Group-By with a Combiner and a Secondary Sort (Intermediate)

The program below simulates one MapReduce job over a shuffled log of per-user click events, each a triple $(\text{user}, \text{timestamp}, \text{dwell})$. Goal A computes a per-user count and mean dwell time using a map-side combiner, and reports how many key-value pairs the combiner kept off the network. Goal B rebuilds each user's events in time order using the composite-key recipe from Section 3: partition on the user, sort on the full key, group on the user. The mean is computed by carrying $(\text{count}, \text{sum})$ through the combiner and dividing only in the reducer, exactly the combinable-mean pattern from Section 1.

import random
from collections import defaultdict

random.seed(7)

# ---- synthetic event log: (user_id, timestamp, dwell_seconds), shuffled order ----
USERS = [f"u_{i:03d}" for i in range(50)]         # 50 users
events = []
for u in USERS:
    n = random.randint(40, 120)                   # tens to hundreds of events each
    t = random.randint(1000, 1100)
    for _ in range(n):
        t += random.randint(1, 40)                # strictly increasing per user
        events.append((u, t, random.randint(2, 30)))
random.shuffle(events)                            # arrives out of order, mixed users

# split across 4 mappers
SHARDS = 4
mapper_inputs = [events[i::SHARDS] for i in range(SHARDS)]

# ---------------- GOAL A: aggregation with a map-side combiner ----------------
# Mapper emits (user, (count=1, sum_dwell)). Combiner sums partials WITHIN a shard
# before they cross the network. Correct because + is associative and commutative.
def combiner(shard):
    partial = defaultdict(lambda: [0, 0])         # user -> [count, sum_dwell]
    for u, _t, dwell in shard:
        partial[u][0] += 1
        partial[u][1] += dwell
    return partial                                # one record per user per shard

combined = [combiner(s) for s in mapper_inputs]
raw_pairs = sum(len(s) for s in mapper_inputs)
post_combine_pairs = sum(len(c) for c in combined)

# Reducer merges the small per-shard partials (the shuffle delivered these).
final = defaultdict(lambda: [0, 0])
for c in combined:
    for u, (cnt, ssum) in c.items():
        final[u][0] += cnt
        final[u][1] += ssum

print("=== Goal A: group-by aggregation with combiner ===")
print(f"raw map-output pairs (no combiner) : {raw_pairs}")
print(f"pairs after map-side combiner      : {post_combine_pairs}")
print(f"shuffle volume reduction           : {raw_pairs}/{post_combine_pairs} = "
      f"{raw_pairs / post_combine_pairs:.1f}x fewer pairs crossing the network")
print(f"users aggregated                   : {len(final)}")
for u in sorted(final)[:3]:                        # show first 3 of 50 users
    cnt, ssum = final[u]
    print(f"  {u}: count={cnt:3d}  mean_dwell={ssum / cnt:5.2f}s")

# ---------------- GOAL B: secondary sort via composite key ----------------
# Composite key = (user, timestamp). PARTITION on user only (all of a user's
# events land on one reducer); SORT on the full key; GROUP on user only.
def partition(key, num_reducers):
    user, _ts = key
    return hash(user) % num_reducers

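mapped = [((u, t), dwell) for (u, t, dwell) in events]   # composite key
# This single-process simulation sorts all pairs together by the FULL composite
# key. A real shuffle would first route each pair with partition() and then
# sort within each reducer, which yields the same per-user order.
shuffled = sorted(mapped, key=lambda kv: kv[0])

# grouping comparator: group on user only. Because the input is already sorted
# by the full key, each user's list is built in ascending timestamp order.
groups = defaultdict(list)
for (u, t), dwell in shuffled:
    groups[u].append((t, dwell))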

all_ordered = True
for u, seq in groups.items():
    times = [t for t, _ in seq]
    if any(times[i] > times[i + 1] for i in range(len(times) - 1)):
        all_ordered = False

print("\n=== Goal B: secondary sort, each user's events in time order ===")
print(f"every one of {len(groups)} user groups in time order : {all_ordered}")
for u in sorted(groups)[:3]:                       # first 3 users' leading timestamps
    times = [t for t, _ in groups[u]]
    print(f"  {u}: first 6 timestamps={times[:6]}  (n={len(times)})")
Code 6.4.1: One job, two patterns. Goal A pre-aggregates each shard with a combiner and carries $(\text{count}, \text{sum})$ so the mean is combinable; Goal B builds the composite key, sorts on the full key, and groups on the user to deliver each user's events in time order. The partition function reads the user alone, which is what keeps a user's records on a single reducer.
=== Goal A: group-by aggregation with combiner ===
raw map-output pairs (no combiner) : 4184
pairs after map-side combiner      : 200
shuffle volume reduction           : 4184/200 = 20.9x fewer pairs crossing the network
users aggregated                   : 50
  u_000: count= 81  mean_dwell=14.54s
  u_001: count=102  mean_dwell=15.66s
  u_002: count=104  mean_dwell=16.13s

=== Goal B: secondary sort, each user's events in time order ===
every one of 50 user groups in time order : True
  u_000: first 6 timestamps=[1045, 1049, 1084, 1108, 1112, 1126]  (n=81)
  u_001: first 6 timestamps=[1087, 1106, 1116, 1151, 1191, 1212]  (n=102)
  u_002: first 6 timestamps=[1051, 1085, 1087, 1116, 1128, 1129]  (n=104)
Output 6.4.1: The combiner collapsed 4,184 raw map-output pairs into 200 partials, a 20.9x reduction in shuffle volume, while the per-user counts and means stayed exact. Every one of the 50 user groups arrived in ascending time order without any reducer ever sorting a list in memory.

Two numbers in the output carry the section's message. The 20.9x reduction is the combiner doing its job: with fifty users spread across four shards, each shard holds at most fifty distinct users, so the post-combine count is bounded by $50 \times 4 = 200$ no matter how many raw events there are. As the event count per user grows, the reduction grows with it, which is why real combiners on web-scale logs cut shuffle traffic by orders of magnitude. The "in time order: True" line is the composite key doing its job: the reducer received each user's events already sorted, with no in-memory sort and therefore no per-key memory ceiling.

Library Shortcut: Spark Folds Combiner and Secondary Sort Into a Few Lines

In Code 6.4.1 we wrote the combiner, the partitioner, and the grouping by hand. A modern dataflow engine exposes both patterns as one-liners. The combinable mean is aggregateByKey, whose seed-and-merge functions are exactly the map-side combiner; the engine runs the merge on the map side automatically. The secondary sort is repartitionAndSortWithinPartitions with a partitioner on the natural key and an ordering on the full key:

# PySpark: combinable per-user mean, then per-user events in time order
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
events = sc.parallelize([("u_000", 1045, 14), ("u_000", 1049, 9)])  # (user, ts, dwell)

# (count, sum) carried through; Spark runs the merge map-side as a combiner
stats = (events.map(lambda e: (e[0], (1, e[2])))
               .aggregateByKey((0, 0),
                   lambda a, x: (a[0] + x[0], a[1] + x[1]),   # within partition
                   lambda a, b: (a[0] + b[0], a[1] + b[1]))   # across partitions
               .mapValues(lambda cs: cs[1] / cs[0]))          # divide last

# secondary sort: partition on user, sort on (user, timestamp)
ordered = (events.map(lambda e: ((e[0], e[1]), e[2]))
                 .repartitionAndSortWithinPartitions(
                     numPartitions=8, partitionFunc=lambda k: hash(k[0])))
Code 6.4.2: The same combiner and secondary sort as Code 6.4.1, now as two Spark calls. Roughly forty lines of hand-written shuffle logic collapse to a handful, and Spark handles the map-side merge, the external sort that spills to disk, and the partition routing. Chapter 7 (Spark and Distributed DataFrames) makes these operators the default vocabulary.

5. These Patterns Are the Feature Pipeline (Intermediate)

The three patterns of this section are not academic exercises; they are the literal shape of the data work that precedes most AI training. A label-aggregation job turns raw interaction logs into supervised targets: group impressions by item, count clicks and views per item, and emit the click-through rate as a label, a pure combinable aggregation. A feature pipeline computes per-entity statistics (a user's average session length, a product's seven-day sales) with the same $(\text{count}, \text{sum})$ combiner. A sessionization job, the backbone of sequence models and recommender features, is exactly the secondary sort of Code 6.4.1: gather each user's events, order them in time, and emit windows or transitions. And the quality filters that clean a pretraining corpus (deduplication, language identification, toxicity screening) are the map-only filters of Section 2.
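As one hedged illustration of the sessionization step, the sketch below splits a time-ordered stream of timestamps into sessions with a gap rule. The `sessionize` helper and the 30-unit gap are assumptions for the example. The input is the first six timestamps for user u_000 from Output 6.4.1.

```python
def sessionize(timestamps, gap=30):
    """Split an ascending stream of timestamps into sessions.

    A new session starts whenever the gap to the previous event exceeds
    `gap` (an assumed threshold; real pipelines tune it per product).
    Only the current session is held in memory.
    """
    session = []
    for ts in timestamps:
        if session and ts - session[-1] > gap:
            yield session
            session = []
        session.append(ts)
    if session:
        yield session

# Time-ordered events for one user, as a secondary sort would deliver them.
user_events = [1045, 1049, 1084, 1108, 1112, 1126]
print(list(sessionize(user_events)))
# [[1045, 1049], [1084, 1108, 1112, 1126]]
```

The function is correct only because its input is already in time order. That guarantee is what the composite key provides, and it lets the reducer hold one session at a time rather than a user's full history.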

Practical Example: The Sessionization Job That Stopped Falling Over on Power Users

Who: A data engineer on the recommendations team at a video-streaming service.

Situation: A nightly job built per-user watch sessions from a day of event logs to train a next-video model, sorting each user's events by timestamp inside the reducer.

Problem: A handful of power users and a few automated accounts had tens of millions of events each; the reducer loaded a whole user's events into a list to sort them, and those keys blew past the memory limit and failed the job.

Dilemma: Raise the reducer memory for everyone (expensive, and it only postpones the next outlier) or keep the in-reducer sort but cap each user's events (cheap, but it silently corrupts the sessions of the most active users, the ones the model most needs).

Decision: They removed the in-reducer sort entirely and switched to a secondary sort with the composite key $(\text{user}, \text{timestamp})$, partitioning on the user and sorting on the full key.

How: Events became composite-key records; the partitioner read the user, the sort comparator read user-then-time, and the grouping comparator read the user, so the reducer streamed each user's events in order without ever holding them all at once.

Result: The job stopped failing on the heavy users because no reducer materialized a per-user list; memory use became flat in the largest key, and total runtime dropped because the shuffle's existing external sort did the ordering instead of a fragile in-memory sort.

Lesson: When per-key order matters and some keys are huge, push the ordering into the shuffle with a composite key. Sorting inside the reducer works in the demo and fails in production on exactly the keys you cannot afford to lose.

These same aggregation and ordering primitives reappear one layer down, in the training data pipeline itself. The aggregates and sessions produced here are written as sharded, columnar datasets that the training loop streams, the subject of Chapter 8 on distributed storage and data loading, where the cost of laying out and reading these features dominates. The grouping-by-key idea also resurfaces transformed: the MapReduce shuffle that routes a key's values to one reducer is, at heart, the same all-to-all routing that returns as the gradient all-reduce of data-parallel training in Chapter 15, and as the parameter-sharding exchange in Chapter 11.

Thesis Thread: The Combiner Is Local Pre-Aggregation, and It Never Leaves

The combiner you wrote in Code 6.4.1 embodies a principle that runs through the entire book: do as much work locally as the algebra allows before paying to communicate. The combiner pre-aggregates within a mapper so the shuffle carries partials, not raw rows. The identical idea returns as gradient accumulation and local reduction in data-parallel training (Chapter 15), where each worker reduces its micro-batch gradients locally before the all-reduce, and again as the local-update schemes that take several optimizer steps between communications. Whenever a method "communicates less", look for a combiner-shaped step that pre-aggregates under an associative, commutative operation; it is almost always there.

6. When the Combiner Does Not Apply (Advanced)

It is worth stating the limits plainly, because they explain the algorithms in later sections. A combiner requires an associative, commutative merge over a small partial state, and several common aggregations have no such state. An exact median needs the full multiset of a key's values and cannot be folded incrementally. An exact count of distinct items must remember every item it has seen, so its partial can grow as large as the data itself, which defeats the point of a small partial. A top-$k$ by some score is combinable (each partial keeps its own top-$k$), but only because the partial carries a bounded summary, foreshadowing Section 6.6. For the genuinely non-combinable cases, the field trades a little accuracy for a combinable sketch: HyperLogLog estimates distinct counts from a tiny, mergeable summary, and count-min sketches estimate frequencies the same way. Those sketches are precisely combiners for problems that have no exact combiner, and Section 6.8 builds them.
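The top-$k$ case can be sketched in a few lines, assuming $k = 3$ and made-up scores for one key split across three hypothetical mappers. The helper names `top_k` and `merge` are illustrative only.

```python
import heapq
from functools import reduce

K = 3  # assumed k for illustration

def top_k(scores, k=K):
    """Partial state: the k largest scores seen so far, in descending order."""
    return heapq.nlargest(k, scores)

def merge(p, q, k=K):
    """Merge two top-k partials; the result is again a top-k partial."""
    return heapq.nlargest(k, p + q)

# Hypothetical scores for one key, split across three mappers.
shards = [[5, 1, 9, 3], [8, 2], [7, 10, 4, 6]]
partials = [top_k(s) for s in shards]

combined = reduce(merge, partials)
exact = top_k([x for s in shards for x in s])
print(combined)  # [10, 9, 8]
```

Each partial holds at most `K` numbers regardless of how many scores its mapper saw, and the merged result matches the exact top-$k$ over all the data. A median has no partial with that bounded-size property.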

Research Frontier: Aggregation and Sessionization for AI Data Pipelines (2024 to 2026)

The combiner-and-sort patterns of this section are being pushed hard by the demands of foundation-model data curation. Large-scale corpus pipelines such as those behind FineWeb (Penedo et al., 2024) and DCLM (Li et al., 2024) run map-only quality filters and combinable deduplication signals over tens of trillions of tokens, where shaving the shuffle is the difference between a feasible and an infeasible run; their published pipelines are essentially industrial-scale instances of Sections 1 and 2. On the engine side, Apache DataFusion and the Velox/Photon vectorized backends (2024 to 2025) push aggregation and sorting into SIMD-accelerated, columnar operators, so the combiner runs on batches of rows rather than one at a time, often an order of magnitude faster per core. A third thread treats sessionization and per-entity ordering as a streaming problem with watermarks and stateful operators (developed in Chapter 9), letting the same secondary-sort logic run incrementally as events arrive rather than in a nightly batch. The throughline is unchanged from this section: associativity buys pre-aggregation, and the composite key buys order, whether the engine is a batch job or a stream.

With aggregation, filtering, and secondary sorting in hand, we can move from per-key summaries to operations that combine two datasets and impose a global order. The next section, Section 6.5, builds distributed sorting (a total order across all reducers, not just within a key) and distributed joins (matching records across two datasets on a shared key), both of which lean on the partition-and-shuffle machinery this section made concrete.

Exercise 6.4.1: Which Aggregations Are Combinable? (Conceptual)

For each per-key aggregation, state whether a map-side combiner can compute it exactly, and if so, what partial state the combiner must carry: (a) the maximum dwell time per user; (b) the variance of dwell time per user; (c) the median dwell time per user; (d) the number of distinct sessions per user; (e) the most recent event per user. For the combinable ones, write the associative, commutative merge of two partials. For (b), recall that variance can be computed from $(\text{count}, \sum x, \sum x^2)$; explain why that triple is combinable even though variance itself is not a simple sum. For the non-combinable ones, name the approximate sketch from Section 6.8 you would reach for instead.

Exercise 6.4.2: Break and Fix the Partitioner (Coding)

Take Code 6.4.1 and deliberately misconfigure the secondary sort: change the partition function to read the full composite key $(\text{user}, \text{timestamp})$ instead of the user alone, and route each composite key to a partition with hash((user, ts)) % num_reducers. Simulate the per-reducer grouping under this broken partitioner and show that a single user's events now scatter across multiple reducers, so no reducer can produce that user's full ordered sequence. Then restore the user-only partitioner and confirm the order is recovered. In two or three sentences, explain why partition-by-natural-key is non-negotiable for secondary sorting while the sort comparator must use the full key.

Exercise 6.4.3: Estimate the Combiner's Payoff (Analysis)

Suppose a click log has $R$ records spread over $U$ distinct users, processed by $M$ mappers, and each map output pair is $b$ bytes. Write the bytes shuffled with no combiner and with a perfect combiner (assume every user appears in every mapper). Show that the reduction factor is approximately $R / (U \cdot M)$, and use it to explain the 20.9x in Output 6.4.1 from $R = 4184$, $U = 50$, $M = 4$. Then argue when a combiner buys you almost nothing: give a realistic scenario (in terms of how $R$, $U$, and $M$ relate) where the post-combine volume is no smaller than the raw volume, and connect it to the cost models of Chapter 3.