Part II: Distributed Data Processing for AI
Chapter 6: The MapReduce Model and Distributed Algorithms

Top-K, Matrix Multiplication, and PageRank

"They asked me for the top ten. I shipped them all ten million rows and let the reducer sort it out. We do not speak of that shuffle anymore."

A Mapper That Forgot to Filter First

Big Picture

Three algorithms that look unrelated, finding the largest items, multiplying matrices, and ranking a graph, are all the same MapReduce move applied to different data: emit keyed partial results, then combine them. Each one also teaches a distinct lesson about cost. Top-K shows that doing local work before the shuffle can collapse network traffic from millions of records to a handful. Block matrix multiplication shows how linear algebra, the engine under every linear model and neural network layer, decomposes into independent tile products. PageRank shows what happens when one MapReduce pass is not enough and you must iterate, re-reading the graph from disk every round, a cost so painful it is the reason the next chapter exists. Master these three and you hold the template for most distributed-data algorithms an AI pipeline will ever need.

The previous section built distributed sorting and joins, the two operations that move records between machines to bring related keys together. This section spends that machinery on three named algorithms that recur constantly in AI data pipelines: selecting the largest scoring items, multiplying large matrices, and computing the stationary importance of nodes in a graph. They span the space of MapReduce computation well. Top-K is a single pass dominated by what you can avoid sending. Matrix multiplication is a single pass dominated by how you partition the operands. PageRank is many passes, dominated by the cost of repeating the same shuffle until the answer stops changing. The thread connecting all three is the one this chapter keeps pulling: a distributed algorithm is won or lost at the shuffle, and the cleverness lives in arranging the map phase so the shuffle carries as little as possible.

1. Top-K: Filter Before You Shuffle (Beginner)

A great many AI tasks reduce to "return the $k$ largest items by some score": the ten nearest neighbors of a query embedding, the hundred most frequent tokens in a corpus, the highest-loss training examples to inspect, the most central users in a social graph. The naive distributed recipe is to have every mapper emit every record under one key and let a single reducer sort the whole dataset and slice off the top $k$. That works and it is catastrophically wasteful: if the dataset has $N$ records spread over $M$ mappers, the shuffle moves all $N$ records to one machine, and that machine sorts all of them only to discard all but $k$.

The fix is the defining pattern of efficient MapReduce, a combiner: do partial work locally before anything crosses the network. Each mapper keeps only the top $k$ of its own records, which it can do with a bounded heap in a single streaming pass without ever sorting its full partition, and emits those $k$ survivors in sorted order. With $M$ mappers, the shuffle now carries at most $M \times k$ records instead of $N$. A single reducer merges those $M$ already-sorted runs and takes the global top $k$. When $N$ is a billion, $k$ is ten, and $M$ is a thousand, the shuffle shrinks from a billion records to ten thousand, a factor of one hundred thousand. The reducer's merge is correct because the global top $k$ must be contained in the union of the per-mapper top $k$ lists: any item that failed to make its own mapper's cut is already outranked by $k$ items in that same partition, so it cannot be among the global top $k$.
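The map-side selection and reducer-side merge described above can be sketched in a few lines of plain Python. This is a minimal single-process illustration, not a cluster implementation: the function names `mapper_top_k` and `reducer_top_k`, the random scores, and the sizes `N`, `M`, and `k` are all assumptions chosen for the example.

```python
import heapq
import random
from itertools import islice

def mapper_top_k(partition, k):
    """Stream one partition, keeping its k largest scores in a size-k min-heap."""
    heap = []
    for score in partition:
        if len(heap) < k:
            heapq.heappush(heap, score)
        elif score > heap[0]:
            heapq.heapreplace(heap, score)   # evict the smallest current survivor
    return sorted(heap, reverse=True)        # emit survivors as a descending run

def reducer_top_k(runs, k):
    """Merge the per-mapper descending runs and keep the first k items."""
    return list(islice(heapq.merge(*runs, reverse=True), k))

# Illustrative sizes only (hypothetical data).
random.seed(0)
N, M, k = 100_000, 8, 10
data = [random.random() for _ in range(N)]
partitions = [data[m::M] for m in range(M)]   # round-robin split across M mappers

runs = [mapper_top_k(p, k) for p in partitions]
shuffled = sum(len(r) for r in runs)          # records that would cross the network
top_k = reducer_top_k(runs, k)

assert top_k == sorted(data, reverse=True)[:k]   # same answer as the naive full sort
print(f"shuffled {shuffled} records instead of {N}")
```

The heap never grows past $k$ entries, so each mapper's memory is independent of its partition size; only the $M \times k$ survivors are handed to the reducer.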

Key Insight: The Shuffle Is the Budget, the Combiner Is How You Spend Less

Network movement, not computation, is the scarce resource in distributed data processing. Any time a reduce operation is associative and commutative (taking a maximum, summing, counting, keeping a top-$k$), you can apply it partially on each mapper before the shuffle and again on the reducer afterward. The map-side combiner is not an optimization you add later; it is the difference between an algorithm that scales and one that funnels the entire dataset through a single machine. Top-K is the cleanest illustration: the same answer, with the shuffle volume cut by orders of magnitude, purely by moving the selection earlier.
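To make the combiner idea concrete for a plain sum, here is a small sketch that counts tokens with and without map-side pre-aggregation and compares how many records each version would shuffle. The toy corpus, the two-partition split, and the function names are assumptions for illustration only.

```python
from collections import Counter

def map_without_combiner(lines):
    """Emit one (token, 1) record per token occurrence."""
    return [(tok, 1) for line in lines for tok in line.split()]

def map_with_combiner(lines):
    """Pre-sum counts locally so each distinct token is emitted once per mapper."""
    local = Counter(tok for line in lines for tok in line.split())
    return list(local.items())

def reduce_counts(records):
    """Sum the (token, count) records that arrive for each key."""
    totals = Counter()
    for tok, n in records:
        totals[tok] += n
    return totals

# Two hypothetical mapper partitions of a toy corpus.
partitions = [
    ["the cat sat", "the cat ran"],
    ["the dog sat", "the dog ran", "the end"],
]

naive = [rec for p in partitions for rec in map_without_combiner(p)]
combined = [rec for p in partitions for rec in map_with_combiner(p)]

# Summation is associative and commutative, so both routes agree.
assert reduce_counts(naive) == reduce_counts(combined)
print(len(naive), "records shuffled without a combiner")
print(len(combined), "records shuffled with one")
```

The reducer is identical in both cases; only the number of records it receives changes, which is the sense in which the combiner spends less of the shuffle budget.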

The selection per mapper costs $O(n_m \log k)$ for a partition of $n_m$ records using a size-$k$ min-heap, and the final merge costs $O(M k \log M)$, both negligible against the input scan. The win is entirely in bytes moved. This same combiner logic powers the aggregation patterns of Section 6.5 and returns transformed in Chapter 7, where Spark's treeReduce arranges the per-mapper merges into a balanced tree so even the final combine is distributed rather than funneled to one reducer.

2. Distributed Matrix Multiplication: Tiling the Linear Algebra (Intermediate)

Underneath nearly every machine learning computation is a matrix product. A linear model scoring a batch of examples computes $X W$. A neural network layer computes the same. A recommendation system factorizing a user-item matrix multiplies its factor matrices. When the matrices outgrow one machine's memory, the product itself must be distributed, and the question becomes how to partition the operands so that each machine computes a piece of the result without needing the whole of either input.

The block (or tiled) decomposition answers this directly. To compute $C = A B$ where $A$ is $m \times p$ and $B$ is $p \times n$, cut $A$ into a grid of blocks $A_{i\ell}$ and $B$ into a grid of blocks $B_{\ell j}$, splitting the shared dimension $p$ the same way in both so that each pair $A_{i\ell}$, $B_{\ell j}$ is conformable. The result block $C_{ij}$, the tile at block-row $i$ and block-column $j$, is

$$C_{ij} = \sum_{\ell} A_{i\ell}\, B_{\ell j},$$

a sum of products of corresponding sub-blocks. Each term $A_{i\ell} B_{\ell j}$ is an independent small matrix multiply, computable on whatever machine holds those two blocks, and the result block is the sum over the shared index $\ell$. This is exactly a MapReduce shape: the map phase emits, for each pair of blocks that must meet, the keyed partial product $\big((i,j),\, A_{i\ell}B_{\ell j}\big)$; the shuffle groups all partial products sharing the output key $(i,j)$; the reduce phase sums them into the final block $C_{ij}$. The sum over $\ell$ is associative, so each mapper can pre-accumulate the terms it can form locally before the shuffle, the same combiner trick that saved Top-K.
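The map, shuffle, and reduce just described can be sketched on small in-memory matrices. This is a single-process illustration in pure Python under stated assumptions: the helper names (`split_blocks`, `map_phase`, `reduce_phase`, `assemble`) are hypothetical, the block side `b` is assumed to divide every dimension evenly, and the integer test matrices are arbitrary.

```python
from collections import defaultdict

def matmul(X, Y):
    """Dense product of two small list-of-lists matrices."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*Y)] for row in X]

def matadd(X, Y):
    """Elementwise sum of two equally shaped matrices."""
    return [[x + y for x, y in zip(rx, ry)] for rx, ry in zip(X, Y)]

def split_blocks(X, b):
    """Cut X into {(block_row, block_col): tile}; assumes b divides both dimensions."""
    return {
        (i // b, j // b): [row[j:j + b] for row in X[i:i + b]]
        for i in range(0, len(X), b)
        for j in range(0, len(X[0]), b)
    }

def map_phase(A_blocks, B_blocks):
    """Emit ((i, j), A_il * B_lj) for every block pair sharing the inner index l."""
    for (i, l), a_tile in A_blocks.items():
        for (l2, j), b_tile in B_blocks.items():
            if l == l2:
                yield (i, j), matmul(a_tile, b_tile)

def reduce_phase(emits):
    """Group partial products by output key (the shuffle), then sum each group."""
    groups = defaultdict(list)
    for key, partial in emits:
        groups[key].append(partial)
    result = {}
    for key, partials in groups.items():
        total = partials[0]
        for p in partials[1:]:
            total = matadd(total, p)
        result[key] = total
    return result

def assemble(C_blocks, b, rows, cols):
    """Paste the output tiles back into one full matrix."""
    C = [[0] * cols for _ in range(rows)]
    for (bi, bj), tile in C_blocks.items():
        for r, tile_row in enumerate(tile):
            C[bi * b + r][bj * b:bj * b + b] = tile_row
    return C

# Arbitrary small integer matrices: A is 4 x 4, B is 4 x 6, block side 2.
A = [[r * 4 + c for c in range(4)] for r in range(4)]
B = [[(r + 1) * (c + 2) for c in range(6)] for r in range(4)]
b = 2

C_blocks = reduce_phase(map_phase(split_blocks(A, b), split_blocks(B, b)))
C = assemble(C_blocks, b, len(A), len(B[0]))
assert C == matmul(A, B)   # the tiled result matches the direct product
```

Each call to `matmul` inside `map_phase` touches only two tiles, which is what lets a real cluster place those calls on whichever machines hold the corresponding blocks.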

The cost that dominates is again communication: every input block must reach every output block that needs it, so block size trades local memory against network volume. Larger blocks mean fewer, bigger messages and more memory per task; smaller blocks mean more parallelism and more shuffle overhead. This data-parallel decomposition of linear algebra is the foundation under distributed linear and logistic models, developed in full in Chapter 12, where the matrix being multiplied is the design matrix against a weight vector and the same block partition lets thousands of features and millions of examples spread across a cluster. The communication-cost models of Chapter 3 tell you which block shape minimizes total bytes for a given cluster.

Library Shortcut: Dask and Spark Hide the Tiling

Writing the block map and reduce by hand teaches the pattern, but production code rarely does it. Dask exposes a distributed array whose @ operator builds a tiled multiply of the kind described above, using the block shapes you pass as chunks and scheduling the partial products across the cluster for you:

import dask.array as da

# 100k x 50k and 50k x 80k matrices, never materialized on one machine.
A = da.random.random((100_000, 50_000), chunks=(5_000, 5_000))
B = da.random.random((50_000, 80_000), chunks=(5_000, 5_000))

C = A @ B            # builds the tiled map/reduce graph; nothing runs yet
result_block = C[:1000, :1000].compute()   # only the blocks needed are produced
Code 6.6.1: The entire block matrix multiplication of Section 2, expressed as one @ operator. Dask infers the per-tile partial products, the shuffle that groups them by output block, and the summation reduce; the chunks argument is the block size you would otherwise hand-tune. Roughly fifty lines of explicit map/reduce collapse to a single line, and the library schedules the partial products to respect data locality.

3. PageRank: When One Pass Is Not Enough (Intermediate)

Top-K and matrix multiplication each finish in a single MapReduce pass. PageRank does not, and that is precisely why it is worth studying: it is the canonical iterative graph computation, the prototype for an entire class of algorithms (connected components, shortest paths, label propagation, graph-neural-network message passing) that repeat a map-and-reduce step until the answer stabilizes. PageRank assigns each node in a directed graph an importance score equal to the probability that a random surfer, following links and occasionally jumping to a random page, is found at that node. The update rule for node $v$ with damping factor $d$ over a graph of $N$ nodes is

$$r_{t+1}(v) = \frac{1 - d}{N} + d \sum_{u \to v} \frac{r_t(u)}{\deg^{+}(u)},$$

where the sum runs over all nodes $u$ that link to $v$, and $\deg^{+}(u)$ is the out-degree of $u$. Each node hands an equal share of its current rank to every node it points at; the new rank of $v$ is the damped sum of everything it receives, plus a small uniform teleport term that keeps the process well behaved on disconnected graphs and on nodes that nothing links to. (Dangling nodes, which have no out-edges, still leak rank under this rule; Exercise 6.6.3 addresses them.)

That update is one MapReduce iteration, and it is structurally a join followed by a reduce. The map phase joins the current rank vector with the link graph: for each node $u$, it reads $u$'s rank and out-edges and emits one message $\big(v,\, r_t(u)/\deg^{+}(u)\big)$ along each edge $u \to v$. The shuffle groups all messages by their destination $v$. The reduce phase sums the incoming contributions for each $v$ and applies the damping formula to produce $r_{t+1}(v)$. Then the whole thing runs again, with the new rank vector replacing the old, until the ranks stop moving. Figure 6.6.1 shows one such iteration on a four-node graph: rank flowing out along edges in the map, then summing at each destination in the reduce.

[Figure 6.6.1 diagram. Left panel, "Map: split rank along out-edges": nodes A, B, C, D with edges A→B, A→C, B→C, C→A, D→C and edge labels r(A)/2, r(A)/2, r(B)/1, r(D)/1. Center: shuffle by destination. Right panel, "Reduce: sum incoming, apply damping": A ← (1-d)/N + d·[r(C)/1]; B ← (1-d)/N + d·[r(A)/2]; C ← (1-d)/N + d·[r(A)/2 + r(B) + r(D)]; D ← (1-d)/N + d·[0]. The new rank vector feeds the next iteration.]
Figure 6.6.1: One PageRank iteration as MapReduce on the four-node graph $A\!\to\!B$, $A\!\to\!C$, $B\!\to\!C$, $C\!\to\!A$, $D\!\to\!C$. In the map phase (left) each node splits its current rank equally across its out-edges and emits a keyed contribution per edge. The shuffle groups contributions by destination. In the reduce phase (right) each node sums its incoming contributions and applies the damping formula. The resulting rank vector becomes the input to the next iteration, the loop that single-pass MapReduce cannot express.
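As a worked instance of the update rule on this graph, assume the uniform start $r_0(v) = 1/4$ and $d = 0.85$, the values that Code 6.6.2 uses. Node $C$ receives half of $A$'s rank and all of $B$'s and $D$'s, so its first update is

$$\begin{aligned}
r_1(C) &= \frac{1 - 0.85}{4} + 0.85\left(\frac{r_0(A)}{2} + \frac{r_0(B)}{1} + \frac{r_0(D)}{1}\right) \\
       &= 0.0375 + 0.85\,(0.125 + 0.25 + 0.25) \\
       &= 0.0375 + 0.53125 = 0.56875,
\end{aligned}$$

which agrees with the iteration-1 entry for $C$ in Output 6.6.2 ($0.5687$ at four decimals). Node $D$ receives nothing, so the same rule gives $r_1(D) = 0.0375$.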

The code below implements exactly this map and reduce and runs it to convergence on the graph of Figure 6.6.1. It prints the full rank vector after every iteration so you can watch the values settle, and it stops when the total change between iterations falls below a small threshold.

graph = {"A": ["B", "C"], "B": ["C"], "C": ["A"], "D": ["C"]}
nodes = list(graph.keys())
N = len(nodes)
d = 0.85                                   # damping factor
rank = {v: 1.0 / N for v in nodes}         # start uniform

def map_phase(rank):
    """Each node distributes its rank equally along its out-edges,
    emitting (destination, contribution) messages: the join of the
    rank vector with the link graph."""
    emits = []
    for v in nodes:
        outs = graph.get(v, [])
        if outs:
            share = rank[v] / len(outs)
            for w in outs:
                emits.append((w, share))
    return emits

def reduce_phase(emits):
    """Group contributions by destination and sum, then apply damping:
    r(w) = (1 - d)/N + d * sum(incoming)."""
    incoming = {v: 0.0 for v in nodes}
    for target, contrib in emits:
        incoming[target] += contrib
    return {v: (1.0 - d) / N + d * incoming[v] for v in nodes}

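# Print the header and the starting (iteration 0) rank vector.
print("iter |" + "".join(f"{v:>8}" for v in nodes))
print("-" * 44)
print(f"{0:4d} |" + "".join(f"{rank[v]:8.4f}" for v in nodes))

for it in range(1, 30):                                 # at most 29 passes
    new_rank = reduce_phase(map_phase(rank))            # one MapReduce pass
    delta = sum(abs(new_rank[v] - rank[v]) for v in nodes)   # L1 change
    rank = new_rank
    print(f"{it:4d} |" + "".join(f"{rank[v]:8.4f}" for v in nodes))
    if delta < 1e-6:
        print(f"\nconverged after {it} iterations (L1 delta = {delta:.2e})")
        break                                           # converged

print(f"final ranks sum to: {sum(rank.values()):.4f}")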
Code 6.6.2: PageRank as a loop over a single MapReduce pass. map_phase joins ranks with edges and emits one message per edge; reduce_phase sums incoming messages by destination and applies damping. The outer for loop is the part MapReduce itself cannot express: it re-runs the entire shuffle each round until the ranks stop moving.
iter |       A       B       C       D
--------------------------------------------
   0 |  0.2500  0.2500  0.2500  0.2500
   1 |  0.2500  0.1437  0.5687  0.0375
   2 |  0.5209  0.1437  0.2978  0.0375
   3 |  0.2906  0.2589  0.4130  0.0375
   4 |  0.3885  0.1610  0.4130  0.0375
   5 |  0.3885  0.2026  0.3714  0.0375
  ...
  26 |  0.3725  0.1958  0.3941  0.0375
  27 |  0.3725  0.1958  0.3941  0.0375
  28 |  0.3725  0.1958  0.3941  0.0375

converged after 28 iterations (L1 delta = 9.67e-07)
final ranks sum to: 1.0000
Output 6.6.2: Twenty-eight MapReduce passes for a four-node graph. The ranks oscillate early, then damp toward the stationary distribution; C is most important (every other node links to it), D least (nothing links to it, so it keeps only the teleport floor of $0.0375 = 0.15/4$). The ranks sum to 1.0, as a probability distribution must.

Two facts in that output matter for systems design. First, convergence took twenty-eight iterations even on four nodes; a web graph needs dozens of passes, and each pass is a full shuffle of the entire rank-times-edges join. Second, classic MapReduce stores the rank vector and the graph on disk between iterations, so every one of those twenty-eight passes re-reads the entire graph from disk, recomputes the same join structure, and writes the result back. The graph topology never changes, yet it is loaded from disk dozens of times. This is the central inefficiency of iterative MapReduce, and it is the specific pain that motivates the next chapter.

Thesis Thread: The Same Shuffle, Done Smarter Each Layer Up

PageRank's map phase is the distributed join of Section 6.5 applied to a rank vector and an edge list, and its reduce is the keyed aggregation of Section 6.5. Nothing here is new machinery; it is the chapter's shuffle, run in a loop. The book's larger arc picks this up twice. Chapter 7 keeps the graph in memory across iterations so the loop stops paying disk cost every round, the reason Spark exists. Chapter 13 recognizes that PageRank's "send a value along every edge, then sum at each node" is exactly the message-passing primitive of graph neural networks, and builds distributed graph-ML systems on that recognition. Iterative map-then-reduce is not a one-off; it is the skeleton of distributed graph computation.

Research Frontier: Graph Computation Beyond the Disk-Bound Loop (2024 to 2026)

The disk re-read that makes iterative MapReduce slow has driven a steady research line toward keeping graph state resident and moving only what changed. Modern distributed graph engines push incremental and asynchronous schemes in which a vertex recomputes only when an incoming message actually shifts its value, cutting the work per iteration far below a full re-shuffle. The 2024 to 2026 literature on billion-edge graph learning (large-scale GNN training frameworks in the lineage of DistDGL, GNNAutoScale, and the sampling-based systems that followed) fuses the PageRank-style propagation here with neighbor sampling and feature aggregation, so the same "join ranks with edges, reduce at destinations" loop now carries learned embeddings rather than scalar ranks. The open problems are the ones this section foreshadows: how to avoid re-reading static topology, how to overlap the next iteration's communication with the current reduce, and how to partition a power-law graph so the shuffle is balanced. We take these up with the right tools in Chapter 13.

Practical Example: The Influence Ranking That Reran All Night

Who: A data engineer at a social-media company computing weekly influence scores over a follower graph.

Situation: A PageRank-style ranking over a two-billion-edge graph ran as a Hadoop MapReduce job, one MapReduce stage per iteration, scheduled every Sunday.

Problem: The job needed roughly forty iterations to converge, and each iteration re-read the entire edge list from HDFS, so the run took most of the night and frequently spilled past the Monday-morning deadline.

Dilemma: Throw more machines at the same MapReduce job, which would speed up each pass but do nothing about the forty redundant disk reads of an unchanging graph, or change the execution model so the graph stays in memory across iterations.

Decision: They kept the algorithm identical, the map-join-reduce of Code 6.6.2, but moved it onto Spark so the edge list was cached in cluster memory after the first read.

How: The graph was loaded once into a cached RDD partitioned by source node; each iteration joined the cached edges with the small rank vector and reduced, never touching disk for topology again.

Result: Per-iteration time dropped by roughly an order of magnitude and the full forty-iteration run finished in under an hour, comfortably inside the window. The arithmetic was unchanged; only the repeated disk reads were eliminated.

Lesson: For iterative graph algorithms, the bottleneck is rarely the math of one pass; it is re-materializing static state every pass. Keep the unchanging graph resident and you keep the algorithm but lose the tax, exactly the transition Chapter 7 generalizes.

Fun Note: Node D, Permanently Unpopular

Watch node D in Output 6.6.2. It links to C, dutifully handing away its rank every iteration, but nothing ever links back to D. Its score collapses immediately to $0.0375$ and never recovers, because the only rank it can keep is the teleport floor, $(1-d)/N$, that the update rule hands to every node unconditionally. D is the graph-algorithm equivalent of a worker that broadcasts to everyone and is subscribed to by no one: it does work, it just gets no credit for it.

4. Why These Three Belong Together (Intermediate)

The three algorithms form a deliberate progression in cost structure. Top-K is bound by what you can avoid shuffling, and its lesson is the combiner: push associative work before the network. Matrix multiplication is bound by how you partition operands, and its lesson is tiling: choose block shapes so each output piece needs only the inputs that reach it. PageRank is bound by iteration count times per-pass shuffle, and its lesson is the limit of the model itself: MapReduce expresses one pass cleanly but expresses a loop only by re-launching the whole machine, re-reading static data every time. Table 6.6.1 lays the three side by side along the dimension that distinguishes them.

Table 6.6.1: The three algorithms of this section, organized by the cost that dominates each and the MapReduce lesson it teaches.
| Algorithm | Passes | Dominant cost | Lesson |
|---|---|---|---|
| Top-K | One | Shuffle volume | Combine locally before the shuffle ($M k$ not $N$) |
| Block matrix multiply | One | Operand movement | Tile so each output block needs only its inputs |
| PageRank | Many (iterative) | Iterations × per-pass shuffle | One pass is clean; the loop re-reads static data |

That last row is the hinge of the whole chapter. Single-pass MapReduce is a beautiful fit for Top-K and matrix multiplication, and a poor fit for anything that iterates, because it has no memory between passes and must spill every intermediate result to disk. Every iterative AI computation, gradient descent above all, hits this wall. The response, keeping working state in memory across iterations and only shuffling what genuinely changes, is the founding idea of Spark and the subject of Chapter 7. We close this chapter with the algorithms MapReduce handles best; Chapter 7 shows what to reach for on the workloads it handles worst.

Exercise 6.6.1: Why the Local Top-K Is Enough (Conceptual)

Prove that the global top $k$ of a dataset partitioned across $M$ mappers is always contained in the union of the per-mapper top $k$ lists, so the reducer never needs any record a mapper discarded. Then explain why the same argument fails for the median: give a small two-mapper example where each mapper's local median tells you nothing about the global median, and state what property of "top $k$" the median lacks.

Exercise 6.6.2: Count the Shuffle Bytes (Analysis)

A block matrix multiply computes $C = AB$ with $A$ of shape $m \times p$ and $B$ of shape $p \times n$, all cut into square blocks of side $b$. Write the number of partial-product messages the map phase emits as a function of $m$, $n$, $p$, and $b$, and the total number of scalar values shuffled. Holding $m$, $n$, $p$ fixed, does increasing the block side $b$ raise or lower the shuffle volume, and what does it cost you in return? Connect your answer to the communication-cost model of Chapter 3.

Exercise 6.6.3: Dangling Nodes and a Disk-Free Loop (Coding)

Extend Code 6.6.2 in two steps. First add a node $E$ with no out-edges (a dangling node) and observe that the ranks no longer sum to 1, because $E$'s rank vanishes into nothing each pass; fix it by redistributing each dangling node's rank uniformly across all nodes inside the reduce phase, and confirm the sum returns to 1. Second, count how many times your loop reads the graph structure, then refactor so the edge list is built once before the loop and only the rank dictionary is rebuilt per iteration. Explain, in one paragraph, how this mirrors the in-memory caching that Chapter 7 applies to the same loop on a billion-edge graph.