Part III: Distributed Machine Learning
Chapter 13: Distributed Graph Machine Learning

Distributed Graph Analytics

"I told every vertex to whisper its rank to its neighbors. Thirty rounds later they all agreed, and not one of them ever saw the whole graph."

A Superstep That Finally Reached a Fixed Point
Big Picture

The classic graph algorithms (PageRank, connected components, shortest paths, triangle counting) all turn out to be the same shape: a vertex looks only at messages from its neighbors, updates a single number, and sends new messages, repeating until nothing changes. That shape is exactly the vertex-centric, superstep model from Section 13.3, which means every one of these algorithms distributes the same way: partition the vertices across machines, let each machine run the local update, and exchange messages along cut edges between supersteps. This section takes the four workhorse analytics, writes each as a vertex program, and shows why an in-memory superstep engine runs these iterations far faster than the iterative-MapReduce version of Section 6.6, which spends most of its time reloading the graph from disk. The ranks, components, and counts these jobs produce are not the end goal; they become input features for the graph models of Section 13.5.

By the end of Section 13.3 we had an engine: a set of vertices, each holding a small piece of state, each able to receive messages, run a local update, and send messages, all advancing in lockstep supersteps separated by a global barrier. That engine is deliberately spartan, and the payoff is that a great many graph computations fit inside it without modification. This section cashes in that generality. We take the four analytics that appear in almost every production graph pipeline, express each as a vertex program, and watch them converge. Along the way we account for two things that decide whether such a job is fast or slow at scale: how much the vertices have to say to each other each round (communication), and how many rounds it takes them to fall silent (convergence). The first, we will see, is governed by the partition quality established in Section 13.2; the second is a property of the algorithm and the graph, and the partition does not change it.

1. One Engine, Four Algorithms Beginner

The vertex-centric contract is narrow on purpose. A vertex never queries the global graph, never reads another vertex's state directly, and never sees more than the messages addressed to it in the previous superstep. Everything it needs to act, it must receive. The surprising fact, and the reason Pregel-style systems caught on, is that the four most common graph analytics each compress into a handful of lines under this contract. Table 13.4.1 lays them side by side: what each vertex stores, what it sends, how it combines what arrives, and when the whole computation is allowed to stop.

Table 13.4.1: Four classic analytics as vertex programs. Each row is the same superstep loop (receive, combine, update, send) with a different choice of state, message, and combiner. The narrowness of the contract is what makes them all distribute identically.
Algorithm | Vertex state | Message sent | Combine on receive | Stops when
PageRank | current rank | rank / out-degree to each out-neighbor | sum incoming contributions | ranks change by less than a tolerance
Connected components | smallest label seen | own label to all neighbors | take the minimum | no label changes in a superstep
Single-source shortest paths | tentative distance | distance + edge weight to neighbors | take the minimum | no distance improves
Triangle counting | neighbor set, then a count | neighbor list to higher-id neighbors | intersect lists, count matches | after a fixed two-superstep pass

Two of the four share an even tighter family resemblance: connected components and shortest paths are both "take the best value you have heard and pass it on", differing only in whether "best" means smallest label or smallest distance. This is the monotone-propagation pattern, and it has a useful property we return to in Section 5: because the combined value only ever moves in one direction (labels only shrink, distances only shorten), the computation cannot oscillate and, provided shortest paths face no negative-weight cycle to descend forever, is guaranteed to reach a fixed point. PageRank is the odd one out: a rank is a weighted sum of incoming contributions rather than a minimum, so it approaches its answer gradually rather than locking in pieces of it. We treat it first precisely because it is the most demanding.

Key Insight: The Combiner Is the Algorithm

In the vertex-centric model, the entire personality of a graph algorithm lives in one function: how a vertex combines the messages it receives. Sum gives you PageRank; minimum gives you connected components and shortest paths; set-intersection gives you triangle counting. Everything else (partitioning the vertices, shipping messages across cut edges, the global barrier between rounds) is shared infrastructure that the engine provides once and every algorithm reuses. To distribute a new graph computation, you do not design a new distributed system; you choose a combiner.
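The claim can be made concrete with a minimal sketch of a message-passing superstep loop in which the combiner is the only algorithm-specific argument. The function name `pregel`, its parameters, and the rule that a vertex speaks only when its value changed (which mirrors the vote-to-halt idea of Pregel-style systems) are illustrative choices, not any library's API. Run with `min` as the combiner on the undirected view of the two-cluster graph from Code 13.4.1, it computes connected components.

```python
from collections import defaultdict
from functools import reduce


def pregel(neighbors, initial, combine, max_supersteps=50):
    """Synchronous vertex-centric loop (illustrative sketch, not a real system's API).

    neighbors: dict vertex -> iterable of vertices it sends messages to
    initial:   dict vertex -> starting value
    combine:   binary, associative, commutative function merging two values
    Every vertex announces its value once; afterwards a vertex sends only when
    its value changed, and the job halts once a superstep produces no messages.
    """
    value = dict(initial)
    inbox = defaultdict(list)
    for v, nbrs in neighbors.items():            # superstep 0: everyone announces
        for w in nbrs:
            inbox[w].append(value[v])
    delivered = []                               # messages delivered per superstep
    for _ in range(max_supersteps):
        if not inbox:                            # nothing in flight: fixed point
            break
        delivered.append(sum(len(msgs) for msgs in inbox.values()))
        outbox = defaultdict(list)
        for v, msgs in inbox.items():
            merged = reduce(combine, msgs, value[v])   # the algorithm lives here
            if merged != value[v]:
                value[v] = merged
                for w in neighbors[v]:
                    outbox[w].append(merged)
        inbox = outbox                           # the barrier: next round sees only these
    return value, delivered


# Undirected view of the two-cluster graph (components {0,1,2,3} and {4,5,6,7}).
undirected = {0: [1, 2], 1: [0, 2], 2: [0, 1, 3], 3: [2],
              4: [5, 6], 5: [4, 6], 6: [4, 5, 7], 7: [6]}
labels, delivered = pregel(undirected, {v: v for v in undirected}, min)
print(labels)     # {0: 0, 1: 0, 2: 0, 3: 0, 4: 4, 5: 4, 6: 4, 7: 4}
print(delivered)  # [16, 12, 2]
```

Because a vertex sends only when its value changes, the messages delivered per superstep fall from 16 to 12 to 2 before the job goes quiet. This halting rule suits monotone combiners such as `min`; a sum-based algorithm like PageRank, whose values keep changing by ever-smaller amounts, needs a tolerance test instead.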

2. Distributed PageRank, Superstep by Superstep Intermediate

PageRank assigns each vertex a score equal to the probability that a random surfer, following out-edges at random and occasionally teleporting to a uniformly random vertex, is found at that vertex in the long run. Written as a fixed-point equation over the rank vector, with $N$ vertices, damping factor $d$, and $\text{out}(u)$ the out-degree of $u$,

$$\text{PR}(v) = \frac{1 - d}{N} + d \sum_{u \,\to\, v} \frac{\text{PR}(u)}{\text{out}(u)}.$$

The sum runs over the in-neighbors $u$ of $v$, and the term $\text{PR}(u)/\text{out}(u)$ is exactly the message $u$ sends along each of its out-edges: it splits its current rank evenly among its out-neighbors. This is the whole vertex program. In each superstep, every vertex (i) sums the contributions that arrived from its in-neighbors, (ii) applies the teleport-and-damping update above, and (iii) sends its new rank divided by its out-degree along each out-edge. The barrier between supersteps guarantees that a vertex reads only ranks from the previous round, so the computation is deterministic regardless of how vertices are spread across machines. Figure 13.4.1 shows one such superstep on a fragment of a graph.

[Figure 13.4.1 diagram: One PageRank superstep: messages flow along edges, then the target sums and updates. In-neighbors u₁ (out=2), u₂ (out=1), and u₃ (out=3) send PR(u₁)/2, PR(u₂)/1, and PR(u₃)/3 to the target v; v computes incoming = sum of messages, sets PR(v) ← (1−d)/N + d · incoming, then sends PR(v)/out(v) onward.]
Figure 13.4.1: A single PageRank superstep at the target vertex $v$. Each in-neighbor $u_i$ sends its current rank split by its out-degree; $v$ sums the arriving contributions, applies the damping-and-teleport update, and in the same superstep emits $\text{PR}(v)/\text{out}(v)$ along its own out-edges. No vertex ever inspects the global graph; it acts only on the messages addressed to it.

Two edge cases need care and both are visible once you think in messages. A dangling vertex, one with no out-edges, has nobody to send its rank to, so its mass would silently leak out of the system and the ranks would no longer sum to one. The fix is to collect the total dangling mass each superstep and redistribute it uniformly, which keeps the rank vector a proper probability distribution. The second case is initialization: starting every vertex at $1/N$ is the natural choice, and because PageRank's update is a contraction for $d < 1$, the starting point affects only how many supersteps convergence takes, not the answer it converges to.
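The leak is easy to see on a graph that, unlike the two-cluster example of Code 13.4.1, actually has a dangling vertex. The sketch below uses an illustrative helper, `pagerank_step`, and a made-up four-vertex graph in which vertex 3 has no out-edges; it runs the update with and without uniform redistribution of the dangling mass and reports the total rank.

```python
def pagerank_step(rank, out_edges, damping=0.85, redistribute_dangling=True):
    """One synchronous PageRank superstep over a dict-of-lists graph (illustrative helper)."""
    n = len(rank)
    # Rank held by vertices with no out-edges has no edge to travel along.
    dangling = sum(r for v, r in rank.items() if not out_edges[v])
    share = dangling / n if redistribute_dangling else 0.0
    new = {v: (1 - damping) / n + damping * share for v in rank}
    for u, targets in out_edges.items():
        for v in targets:
            new[v] += damping * rank[u] / len(targets)   # u splits its rank evenly
    return new


graph = {0: [1], 1: [2], 2: [0, 3], 3: []}      # vertex 3 is dangling
totals = {}
for fix in (False, True):
    rank = {v: 1 / len(graph) for v in graph}
    for _ in range(50):
        rank = pagerank_step(rank, graph, redistribute_dangling=fix)
    totals[fix] = sum(rank.values())
    print(f"redistribute_dangling={fix}: total mass after 50 supersteps = {totals[fix]:.4f}")
```

Without the fix, the total is already 0.7875 after the first superstep (vertex 3's quarter of the mass, damped by 0.85, simply vanishes) and it never recovers, so the result is no longer a probability distribution; with the fix, the total stays at one.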

3. Why In-Memory Supersteps Beat Iterative MapReduce Intermediate

PageRank is iterative by nature: the rank vector at superstep $t+1$ depends on the rank vector at superstep $t$, and you need dozens of rounds to converge. In Section 6.6 we expressed exactly this computation as a chain of MapReduce jobs, one job per iteration. That version is correct, and for a single pass it is a fine illustration, but as an iterative algorithm it is wasteful in a way that is easy to overlook. Each MapReduce iteration reads the entire graph from distributed storage, shuffles the rank contributions across the network, writes the new rank vector back to storage, and only then can the next iteration start by reading that graph again. The graph structure, which never changes, is loaded from disk once per iteration. For thirty iterations on a graph too large to cache, that is thirty full reads of a structure that was identical every time.

The vertex-centric, in-memory engine removes that waste by inverting what is persistent and what is transient. The graph is loaded once, partitioned across the workers, and kept resident in memory for the entire job; only the messages (the rank contributions) move between supersteps, and they move worker-to-worker without a detour through storage. The structural cost is paid a single time, and each superstep pays only for the messages it actually sends. Table 13.4.2 contrasts the two execution models on the parts that dominate the wall-clock for an iterative graph algorithm.

Table 13.4.2: Iterative MapReduce versus an in-memory superstep engine for a graph algorithm that runs for $T$ supersteps. The decisive difference is that the unchanging graph structure is reloaded every iteration in one model and exactly once in the other.
What it costs | Iterative MapReduce (Ch 6) | In-memory superstep engine
Graph structure reads from storage | once per superstep ($T$ times) | once for the whole job
Where state lives between rounds | written to and re-read from disk | resident in worker memory
What crosses the network each round | all rank contributions, via shuffle to storage | only messages on cut edges, worker to worker
Per-round fixed overhead | job launch, scheduling, output commit | a single barrier synchronization
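A back-of-the-envelope model makes the first and last rows of Table 13.4.2 concrete. Every input below (graph size, bandwidths, per-job launch overhead, barrier cost) is a hypothetical placeholder chosen for illustration, not a measurement, and the model deliberately ignores the rank-vector write-back and shuffle spill that would make the MapReduce side slower still.

```python
def iterative_job_seconds(supersteps, graph_gb, message_gb_per_step,
                          storage_gb_per_s, network_gb_per_s,
                          per_round_overhead_s, reload_structure_each_round):
    """Crude wall-clock model of an iterative graph job; every input is hypothetical."""
    loads = supersteps if reload_structure_each_round else 1
    structure = loads * graph_gb / storage_gb_per_s                 # reading the graph itself
    messages = supersteps * message_gb_per_step / network_gb_per_s  # rank contributions
    overhead = supersteps * per_round_overhead_s                    # job launch vs. barrier
    return structure + messages + overhead


# Placeholder inputs for illustration only, not measurements.
common = dict(supersteps=30, graph_gb=100, message_gb_per_step=8,
              storage_gb_per_s=2, network_gb_per_s=10)
mapreduce = iterative_job_seconds(**common, per_round_overhead_s=20,
                                  reload_structure_each_round=True)
in_memory = iterative_job_seconds(**common, per_round_overhead_s=0.1,
                                  reload_structure_each_round=False)
print(f"iterative MapReduce ~{mapreduce:.0f} s, in-memory supersteps ~{in_memory:.0f} s")
```

With these placeholder inputs the model gives roughly 2,124 s versus 77 s. Of that gap, 1,450 s comes from rereading the unchanged structure 29 extra times, and most of the rest is per-job launch overhead; the message traffic is identical in both columns.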

The lesson is not that MapReduce is the wrong tool in general; for a single-pass, embarrassingly parallel transformation it is excellent, as Chapter 6 shows. The lesson is that iteration with unchanging structure is precisely the workload its design penalizes, and recognizing that pattern is what motivated the whole vertex-centric line of systems. The same instinct (keep the invariant part resident, move only the changing part) reappears when we keep model parameters resident on workers and move only gradients in Chapter 14 and beyond.

Thesis Thread: Move the Small Thing, Not the Big Thing

The recurring move of this book is to pin the large, slowly-changing object in place and ship only the small, fast-changing one across the network. In data-parallel training we keep the data shard resident and all-reduce only the gradient (Chapter 3 gives the cost model that makes this pay). Here the large object is the graph structure and the small object is the message stream, and the superstep engine wins over iterative MapReduce for exactly the same reason: it stops re-shipping the part that never changed. Whenever a distributed design feels slow, ask which big thing is being moved that could instead be held still.

4. Components, Shortest Paths, and Triangles Intermediate

The three remaining analytics reuse the same engine with a different combiner. Connected components labels each vertex with an integer such that two vertices share a label exactly when a path connects them, ignoring edge direction. The vertex program is label propagation: every vertex starts with its own id as its label, and in each superstep adopts the minimum label among itself and its neighbors, then broadcasts that label onward. Because labels only ever decrease, the process is monotone and terminates. The number of supersteps it needs is set by the diameter of the largest component, plus one final superstep in which nothing changes, since a small label has to travel hop by hop to reach the far side of its component.

Single-source shortest paths is the same monotone propagation with arithmetic in place of comparison. The source starts at distance zero and every other vertex at infinity; each superstep, a vertex that has a tentative distance $\delta$ sends $\delta + w(e)$ along each out-edge $e$, and a receiver keeps the minimum of its current distance and the smallest arriving offer. This is Bellman-Ford recast as message passing, and like connected components it converges in a number of rounds set by the largest hop count of any shortest path from the source (assuming no negative-weight cycles). Triangle counting is the one genuine departure: each vertex sends its list of higher-id neighbors to each of those neighbors, and each receiver counts how many entries of every list it received are also among its own higher-id neighbors. Each match closes one triangle, and the id ordering ensures every triangle is counted exactly once. It needs only a couple of supersteps, but its messages are neighbor lists rather than single numbers, so on a graph with high-degree vertices it is the communication-heaviest of the four, a point we return to in Section 5.
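The two-superstep triangle pass can be sketched in the same style. This version is illustrative (the function name and the exact ordering convention are our choices): it restricts both the sent list and the receiver's comparison set to higher-id neighbors, one standard way to count each triangle exactly once, at its middle-id vertex. It also tallies how many vertex ids travel in messages, the quantity that makes this analytic communication-heavy.

```python
from collections import defaultdict


def count_triangles(adj):
    """Two-superstep triangle count on an undirected graph given as dict -> set."""
    higher = {v: {w for w in nbrs if w > v} for v, nbrs in adj.items()}
    # Superstep 1: each vertex sends its higher-id neighbor set to each of those neighbors.
    inbox = defaultdict(list)
    ids_shipped = 0
    for v, hv in higher.items():
        for w in hv:
            inbox[w].append(hv)
            ids_shipped += len(hv)
    # Superstep 2: each receiver intersects every list with its own higher-id set;
    # a match x closes the triangle (sender, receiver, x).
    per_vertex = {v: sum(len(lst & higher[v]) for lst in inbox[v]) for v in adj}
    return sum(per_vertex.values()), per_vertex, ids_shipped


# Undirected view of the two-cluster graph: triangles {0,1,2} and {4,5,6}.
adj = {0: {1, 2}, 1: {0, 2}, 2: {0, 1, 3}, 3: {2},
       4: {5, 6}, 5: {4, 6}, 6: {4, 5, 7}, 7: {6}}
total, per_vertex, ids_shipped = count_triangles(adj)
print(total, ids_shipped)   # 2 12
```

Here the pass finds both triangles while shipping 12 vertex ids. On a graph with hubs, `ids_shipped` is the number to watch: a vertex with k higher-id neighbors ships k lists of length k.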

The code below runs distributed-style PageRank and connected components on a small two-cluster graph using a from-scratch superstep engine, printing the convergence behavior of each round. It is pure standard-library Python so the superstep logic is fully exposed; the same loop, with the vertex set partitioned across machines and messages shipped over the network, is what a production engine runs.

out_edges = {
    0: [1, 2], 1: [2], 2: [0, 3], 3: [2],      # component A: 0,1,2,3
    4: [5], 5: [4, 6], 6: [4, 7], 7: [6],       # component B: 4,5,6,7
}
vertices = list(out_edges)
N = len(vertices)
out_deg = {v: len(out_edges[v]) for v in vertices}
in_edges = {v: [] for v in vertices}            # who points at me (for the gather)
for u in vertices:
    for w in out_edges[u]:
        in_edges[w].append(u)

def pagerank(damping=0.85, max_supersteps=30, tol=1e-6):
    rank = {v: 1.0 / N for v in vertices}       # every vertex starts at 1/N
    for step in range(1, max_supersteps + 1):
        dangling = sum(rank[v] for v in vertices if out_deg[v] == 0)   # leaked mass
        new = {v: (1 - damping) / N
                  + damping * (sum(rank[u] / out_deg[u] for u in in_edges[v])
                               + dangling / N)
               for v in vertices}
        delta = sum(abs(new[v] - rank[v]) for v in vertices)          # L1 change
        rank = new
        if step <= 6 or delta < tol:
            print(f"  superstep {step:2d}  L1-change={delta:.2e}  total={sum(rank.values()):.4f}")
        if delta < tol:
            print(f"  converged after {step} supersteps")
            break
    return rank

def connected_components(max_supersteps=30):
    neigh = {v: set(out_edges[v]) for v in vertices}                  # undirected view
    for v in vertices:
        for u in in_edges[v]:
            neigh[v].add(u)
    label = {v: v for v in vertices}            # own id as the starting label
    for step in range(1, max_supersteps + 1):
        new = {v: min([label[v]] + [label[u] for u in neigh[v]]) for v in vertices}
        changed = sum(new[v] != label[v] for v in vertices)
        label = new
        print(f"  superstep {step:2d}  relabeled={changed}  distinct-labels={len(set(label.values()))}")
        if changed == 0:
            print(f"  converged after {step} supersteps")
            break
    return label

print("PageRank:");             pagerank()
print("Connected components:"); connected_components()
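Code 13.4.1: A from-scratch superstep engine running PageRank (sum combiner) and connected components (minimum combiner) on a two-cluster graph. The heart of each function is its one combine line, a damped sum of in-neighbor contributions for PageRank and a minimum over neighbor labels for connected components; the receive-update-send loop and the convergence test around that line have the same shape in both, exactly as Table 13.4.1 promises.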
PageRank:
  superstep  1  L1-change=4.25e-01  total=1.0000
  superstep  2  L1-change=3.61e-01  total=1.0000
  superstep  3  L1-change=2.69e-01  total=1.0000
  superstep  4  L1-change=1.63e-01  total=1.0000
  superstep  5  L1-change=9.01e-02  total=1.0000
  superstep  6  L1-change=5.01e-02  total=1.0000
  superstep 30  L1-change=7.66e-07  total=1.0000
  converged after 30 supersteps
Connected components:
  superstep  1  relabeled=6  distinct-labels=4
  superstep  2  relabeled=2  distinct-labels=2
  superstep  3  relabeled=0  distinct-labels=2
  converged after 3 supersteps
Output 13.4.1: PageRank's L1 change shrinks smoothly toward the tolerance and the ranks always sum to one (this graph has no dangling vertices, so the redistribution term is zero here and the update itself preserves the total); connected components locks in the two clusters in three supersteps: two to carry each minimum label across a component of diameter two, plus one in which nothing changes. Both ran on one machine, but the per-vertex updates never consulted the global graph, so partitioning the vertices across workers would change only where each update executes, not its result.

Notice the two convergence signatures, because they are characteristic. PageRank's L1 change decreases geometrically: each superstep multiplies the remaining error by a factor no larger than the damping factor (on this small graph, noticeably smaller after the first few rounds), so the number of supersteps needed to reach a fixed tolerance grows only logarithmically as you tighten it. Connected components instead finishes in a small number of rounds set by the graph's diameter, and it has no tolerance to tighten: once labels stop changing they stop for good, so asking for more precision would not change its superstep count. Knowing which signature an algorithm has tells you in advance whether throwing more supersteps at it helps.
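The logarithmic growth is easy to check empirically. The sketch below rebuilds the two-cluster graph of Code 13.4.1 and counts how many PageRank supersteps it takes for the L1 change to drop below each of several tolerances; the helper name and the tolerance values are illustrative choices. Each hundredfold tightening should add a roughly constant number of supersteps rather than multiplying the count.

```python
out_edges = {0: [1, 2], 1: [2], 2: [0, 3], 3: [2],
             4: [5], 5: [4, 6], 6: [4, 7], 7: [6]}
N = len(out_edges)
in_edges = {v: [u for u in out_edges if v in out_edges[u]] for v in out_edges}


def supersteps_to_tolerance(tol, damping=0.85, max_supersteps=1000):
    """Supersteps until the L1 rank change falls below tol (this graph has no dangling vertices)."""
    rank = {v: 1.0 / N for v in out_edges}
    for step in range(1, max_supersteps + 1):
        new = {v: (1 - damping) / N
                  + damping * sum(rank[u] / len(out_edges[u]) for u in in_edges[v])
               for v in out_edges}
        delta = sum(abs(new[v] - rank[v]) for v in out_edges)
        rank = new
        if delta < tol:
            return step
    return max_supersteps


for tol in (1e-2, 1e-4, 1e-6, 1e-8, 1e-10):
    print(f"tol={tol:.0e}: {supersteps_to_tolerance(tol)} supersteps")
```

Running the connected-components loop of Code 13.4.1 against the same sequence of tolerances would be meaningless: it has no tolerance parameter, and its three supersteps are fixed by the graph.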

Fun Note: The Vertex That Never Got the Memo

Connected components has a charming failure mode on a badly partitioned graph. If a single low-id vertex sits across a slow network link from the rest of its component, its winning label still advances only one hop per superstep, and every one of those supersteps ends at a barrier that waits on the slow link. The computation is correct, but it dawdles, waiting round after round for messages to make their way across the partition boundary. The vertices on the far side are, in effect, the last to hear who won the election. Good partitioning (Section 13.2) cannot shorten the label's route in hops, but it keeps each of those rounds cheap.

5. Communication, Convergence, and the Price of a Bad Cut Advanced

The cost of an iterative graph algorithm factors cleanly into two quantities, and both are now in view. The first is the number of supersteps to convergence, which is a property of the algorithm and the graph: logarithmic in the tolerance for PageRank, bounded by component diameter for label propagation and shortest paths, constant for triangle counting. The second is the communication per superstep, and this is where the partition from Section 13.2 sets the price. A message is cheap when it stays inside a worker and expensive when it crosses the network to another worker, and a message crosses the network exactly when its edge is cut by the partition. So the per-superstep network traffic is governed by the number of cut edges, which is the very quantity a graph partitioner tries to minimize.

This is the through-line that ties the chapter together: partition quality, an abstract objective in Section 13.2, becomes concrete wall-clock here. Two partitions of the same graph that differ in their edge cut produce identical answers but very different running times, because one ships far more messages across the barrier every single superstep, and an iterative job pays that difference $T$ times over. The combination is multiplicative: total network cost scales as (supersteps) times (cut edges touched per superstep) times (bytes per message). An algorithm with many supersteps, like PageRank, is the most sensitive to a poor cut, because it pays the inflated per-round communication on every one of its thirty-odd rounds. Triangle counting, with its two supersteps but list-valued messages, instead stresses the per-message size, so for it the partitioner should worry most about where the high-degree vertices land, since their long neighbor lists dominate the bytes that cross each cut edge.
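The multiplicative model is simple enough to evaluate on the toy graph. The sketch below uses hypothetical helper names and assumes PageRank's one message per directed edge per superstep, 8-byte messages, and no sender-side combining of messages bound for the same worker. It compares two balanced two-worker partitions of the graph in Code 13.4.1: a hash-style split by vertex parity and a split that keeps each cluster on one worker.

```python
EDGES = [(0, 1), (0, 2), (1, 2), (2, 0), (2, 3), (3, 2),
         (4, 5), (5, 4), (5, 6), (6, 4), (6, 7), (7, 6)]


def cut_edges(part):
    """Directed edges whose endpoints live on different workers."""
    return sum(part[u] != part[v] for u, v in EDGES)


def cross_worker_bytes(part, supersteps, bytes_per_message=8):
    # One message per edge per superstep; only cut edges leave their worker.
    return cut_edges(part) * supersteps * bytes_per_message


by_parity = {v: v % 2 for v in range(8)}               # hash-like: even vs. odd ids
by_cluster = {v: 0 if v < 4 else 1 for v in range(8)}  # one cluster per worker
for name, part in (("parity", by_parity), ("cluster", by_cluster)):
    print(name, cut_edges(part), cross_worker_bytes(part, supersteps=30))
# parity 9 2160
# cluster 0 0
```

Both partitions put four vertices on each worker and produce identical ranks; they differ only in traffic, 9 of 12 edges cut versus none, and that difference is paid again on every superstep.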

Practical Example: The Recommendation Team Whose PageRank Job Would Not Shrink

Who: A data engineer on the graph-features team at a large e-commerce company.

Situation: A nightly personalized-PageRank job over a billion-edge user-item graph fed ranking features the next morning, and it had crept past its window.

Problem: The job ran fifty supersteps to converge, and each superstep was dominated by network time, not by the per-vertex arithmetic.

Dilemma: Add more workers to spread the computation, which is the reflex for a slow distributed job, or accept that more workers might cut more edges and so ship more messages per superstep, making each of the fifty rounds slower.

Decision: They left the worker count alone and instead repartitioned the graph with a cut-minimizing partitioner, treating communication per superstep as the real bottleneck rather than raw parallelism.

How: They replaced the default hash partitioning with a multilevel edge-cut partitioner of the kind in Section 13.2, co-locating each user's neighborhood on one worker so most rank messages stayed local.

Result: The superstep count was unchanged (the algorithm and graph were the same), but cross-worker messages fell sharply, and the wall-clock dropped enough to clear the window with room to spare.

Lesson: For an iterative graph job, the lever is usually communication per superstep, not the number of workers. A better cut multiplies its savings by every round the algorithm runs.

Library Shortcut: GraphX, NetworkX, and cuGraph Ship These Analytics

Code 13.4.1 spells out the superstep loop so the mechanics are visible, but you would not hand-write PageRank in production. Spark's GraphX exposes the Pregel API and a built-in pageRank, NetworkX gives you single-machine reference implementations for correctness checks, and NVIDIA's cuGraph runs the same analytics on the GPU at very high throughput. The roughly forty lines of engine in Code 13.4.1 collapse to a single call each; the libraries take care of dangling-mass handling and the convergence test, and the distributed ones also handle partitioning and message passing for you:

# Single-machine reference with NetworkX (great for validating a distributed run)
import networkx as nx
G = nx.DiGraph([(0,1),(0,2),(1,2),(2,0),(2,3),(3,2),(4,5),(5,4),(5,6),(6,4),(6,7),(7,6)])
pr = nx.pagerank(G, alpha=0.85)               # the whole superstep loop, one call
comps = list(nx.weakly_connected_components(G))   # connected components, one call

# Distributed on a cluster: Spark GraphX (Scala) or GraphFrames (Python/Scala)
#   from graphframes import GraphFrame
#   g = GraphFrame(vertices_df, edges_df)        # your DataFrames: "id" column; "src"/"dst" columns
#   g.pageRank(resetProbability=0.15, tol=1e-6)   # runs the Pregel supersteps for you
#   g.connectedComponents()                       # partitioned components; needs a Spark checkpoint dir set

# GPU-accelerated with RAPIDS cuGraph
#   import cugraph
#   cugraph.pagerank(cu_graph, alpha=0.85)        # same math, on the GPU
Code 13.4.2: The same two analytics as one library call each. NetworkX is the correctness oracle for a single machine, GraphX/GraphFrames runs the partitioned computation across a cluster, and cuGraph runs it on the GPU; all three compute the same quantities that Code 13.4.1 computes by hand, though each uses its own optimized internals.
Research Frontier: Graph Analytics as Features for Models (2024 to 2026)

The ranks, components, and counts these jobs produce are increasingly consumed not by humans but by machine-learning models, and recent work sharpens that pipeline. Scalable approximate personalized PageRank, in the lineage of methods that push the local computation only where it matters, now underpins the propagation step in scalable graph neural networks such as the PPRGo and GBP families, letting a model precompute graph-structural propagation on billion-edge graphs instead of running multi-hop message passing at every training step. A parallel thread studies streaming and incremental analytics, where the graph changes continuously and PageRank or component labels must be maintained without recomputing from scratch, a setting that connects directly to the stream processing of Chapter 9. The framing to carry forward is that distributed analytics is no longer a terminal report; it is a feature-generation stage feeding the distributed graph neural networks of Section 13.5, and the two are converging into a single pipeline.
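The "push only where it matters" idea fits in a few lines. Below is a sketch of the classic forward-push approximation of personalized PageRank, adapted to a directed graph in which every vertex has at least one out-edge. The function name, the threshold `eps`, and `alpha` (the restart probability, here 1 − d with d = 0.85) are illustrative choices, and this is not the PPRGo or GBP code. A vertex is touched only while its residual is large relative to its out-degree, so work stays near the source.

```python
from collections import deque


def approx_ppr(out_edges, source, alpha=0.15, eps=1e-6):
    """Forward-push approximate personalized PageRank (illustrative sketch).

    Keeps an estimate p and a residual r whose totals sum to 1. A vertex is
    pushed only while r[u] >= eps * outdeg(u). Assumes no dangling vertices.
    """
    p = {v: 0.0 for v in out_edges}
    r = {v: 0.0 for v in out_edges}
    r[source] = 1.0
    queue = deque([source])
    while queue:
        u = queue.popleft()
        deg = len(out_edges[u])
        if r[u] < eps * deg:              # residual already small: nothing to push
            continue
        mass, r[u] = r[u], 0.0
        p[u] += alpha * mass              # keep the restart share at u
        for v in out_edges[u]:            # spread the rest along out-edges
            before = r[v]
            r[v] += (1 - alpha) * mass / deg
            if before < eps * len(out_edges[v]) <= r[v]:
                queue.append(v)           # v just crossed its threshold
    return p, r


graph = {0: [1, 2], 1: [2], 2: [0, 3], 3: [2],
         4: [5], 5: [4, 6], 6: [4, 7], 7: [6]}
p, r = approx_ppr(graph, source=0)
print({v: round(x, 4) for v, x in p.items()})
```

Pushes never reach vertices 4 through 7, so their estimates stay exactly zero; the work is confined to the source's neighborhood, which is the property that makes push-based methods attractive at billion-edge scale.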

We now have the full analytics toolkit running on the engine from Section 13.3: PageRank by summing, connected components and shortest paths by taking minima, triangle counting by intersecting neighbor lists, each one a vertex program whose cost is supersteps times cut edges. These produce structural features that describe where a vertex sits in the graph. The natural next question is how to learn from those positions directly, training a model whose computation is itself shaped like the graph. That is the subject of the distributed graph neural networks in Section 13.5, where the message a vertex sends is no longer a single rank but a learned feature vector.

Exercise 13.4.1: Pick the Combiner Conceptual

For each of the following graph computations, state the vertex state, the message a vertex sends, and the combiner it uses on receipt, in the style of Table 13.4.1, and say whether the algorithm has PageRank's geometric-convergence signature or label propagation's diameter-bounded one: (a) compute, for every vertex, the maximum value held by any vertex in its connected component; (b) compute the number of hops from a single source to every reachable vertex (unweighted shortest path); (c) propagate an "infected" flag outward from an initial set of infected vertices until no new vertex becomes infected. Explain which of the three could be made to terminate one superstep sooner with a smarter stopping test.

Exercise 13.4.2: Add Shortest Paths to the Engine Coding

Extend Code 13.4.1 with a shortest_paths(source) function on the same graph that runs single-source shortest paths as a superstep loop, taking all edges to have weight one. Initialize the source at distance zero and every other vertex at infinity, have each vertex send (its distance + 1) to its out-neighbors, and combine by taking the minimum. Print, per superstep, how many vertices improved their distance and how many still hold infinity, and confirm the job converges in a number of supersteps equal to the eccentricity of the source within its component, plus the one quiet superstep in which nothing improves. Then, since the graph of Code 13.4.1 already has two components, confirm that the vertices in the component not containing the source remain at infinity, and explain what that means operationally.

Exercise 13.4.3: Cost of the Cut Analysis

Consider a graph with $E = 10^9$ edges run for $T = 40$ PageRank supersteps. Under partition X, $5\%$ of edges are cut; under partition Y, $30\%$ are cut. Each cut edge carries one 8-byte message per superstep and the cross-worker network moves data at 10 gigabytes per second. Estimate the total time spent on cross-worker communication under each partition, and the ratio between them. Now suppose partition Y uses twice as many workers as X; argue from your numbers whether the extra parallelism could ever pay back the extra communication for this iterative job, and contrast your answer with how it would change for triangle counting, which runs only two supersteps. Tie your reasoning back to the communication-cost model of Chapter 3.