"They asked me to find the groups. I found them eight times, once per shard, then added the answers up and discovered I had found them exactly once."
An All-Reduce That Has Seen Some Centroids
Clustering is unsupervised learning's answer to "what structure is in this data?", and for the most widely used method, k-means, scaling it out is exact: the assignment step is embarrassingly parallel and the update step is a mergeable aggregation, so one all-reduce per iteration reproduces the single-machine centroids bit for bit. The previous sections of this chapter distributed supervised learners whose loss is an average over examples. Clustering has no labels, but k-means still optimizes an average objective, and that is enough to make the same decomposition work. This section shows why the distributed iteration is exact, how to parallelize the one part that is not (initialization), how to keep clustering alive on an unbounded stream, and why density-based and hierarchical clustering refuse to fall to the same trick because their core operation is a neighborhood query, not an average.
Up to this point Chapter 12 has distributed learners that consume labels: linear models, random forests, gradient-boosted trees. Their training objectives are sums over labeled examples, and we have seen repeatedly that a sum over examples splits cleanly across workers. Unsupervised learning removes the labels but not the structure of the objective. The dominant clustering algorithm, k-means, partitions $N$ points into $K$ groups by minimizing the total squared distance from each point to its assigned cluster center, and that total is again a sum over points. The same arithmetic that made data-parallel gradient descent exact in Section 1.1 makes distributed k-means exact, with one twist: the quantity we all-reduce is not a gradient but a pair of per-cluster accumulators, the sum of points and the count of points in each cluster.
1. The k-means Objective and Lloyd's Two Steps Beginner
Given $N$ points $x_1, \dots, x_N$ in $\mathbb{R}^d$ and a target of $K$ clusters, k-means seeks $K$ centroids $\mu_1, \dots, \mu_K$ and an assignment of each point to one centroid that minimizes the within-cluster sum of squares,
$$J(\mu, a) = \sum_{i=1}^{N} \lVert x_i - \mu_{a_i} \rVert^2, \qquad a_i = \arg\min_{k} \lVert x_i - \mu_k \rVert^2.$$Lloyd's algorithm minimizes this objective by alternating two steps until the assignments stop changing. The assignment step fixes the centroids and sends each point to its nearest one, computing $a_i$ for every $i$. The update step fixes the assignments and moves each centroid to the mean of the points now assigned to it,
$$\mu_k \leftarrow \frac{1}{|C_k|} \sum_{i \in C_k} x_i, \qquad C_k = \{\, i : a_i = k \,\}.$$Each step can only lower $J$ (or leave it unchanged), so the algorithm converges to a local optimum. The two steps have very different distributed characters, and recognizing that difference is the whole game. The assignment step touches one point at a time and needs only the current centroids, a small object every worker can hold. The update step is an average, and an average over a partitioned set is a mergeable aggregation, exactly the property Section 6.8 identified as the precondition for a single combine.
The update step looks like it needs all the points of a cluster in one place, but it does not. A mean is a sum of vectors divided by a count of points, and both the sum and the count are mergeable: a worker can compute its own partial sum and partial count over only its shard, and adding the partials across workers gives the global sum and global count. Dividing one by the other after the merge yields the exact same centroid as if every point had been in one machine. The new centroid never required co-locating the points, only co-locating two small accumulators per cluster.
2. Distributed Lloyd: One All-Reduce per Iteration Intermediate
Partition the $N$ points into $S$ disjoint shards, one per worker, and broadcast the current $K$ centroids to every worker. Each worker runs the assignment step locally on its own shard, which is embarrassingly parallel because a point's nearest centroid depends only on that point and the shared centroids, never on points held by other workers. Then each worker, instead of shipping its assigned points anywhere, accumulates two small quantities per cluster: the vector sum $s_k = \sum_{i \in C_k \cap \text{shard}} x_i$ and the integer count $n_k = |C_k \cap \text{shard}|$. The combine step adds these accumulators across all workers,
$$s_k = \sum_{w=1}^{S} s_k^{(w)}, \qquad n_k = \sum_{w=1}^{S} n_k^{(w)}, \qquad \mu_k \leftarrow \frac{s_k}{n_k}.$$Summing $s_k^{(w)}$ and $n_k^{(w)}$ across workers and sharing the result is precisely an all-reduce, the collective introduced in Section 4.3. One all-reduce per iteration, of a payload that is only $K \times (d+1)$ numbers regardless of how many points each worker holds, produces the new centroids. Because addition is associative and commutative, the grouping of points into shards does not affect the sum, so the distributed centroids equal the single-machine centroids to within floating-point rounding. Figure 12.6.1 traces one iteration.
The code below makes the exactness concrete. It builds a clustered point cloud, runs plain single-machine Lloyd's algorithm, then runs the distributed version that only ever combines per-cluster sums and counts across eight shards, and compares the two sets of centroids. Both start from the identical initialization so that any difference can come only from the distribution, not from a different starting point.
import numpy as np
rng = np.random.default_rng(7)
N, d, K, iters, S = 60_000, 4, 5, 12, 8 # points, dims, clusters, iterations, shards
# Three well-separated blobs plus noise so the clustering is non-trivial.
centers = rng.standard_normal((K, d)) * 6.0
labels = rng.integers(0, K, size=N)
X = centers[labels] + rng.standard_normal((N, d))
def lloyd(X, init, iters): # plain single-machine Lloyd's algorithm
C = init.copy()
for _ in range(iters):
d2 = ((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2)
a = d2.argmin(axis=1) # assignment step
for k in range(K):
m = a == k
if m.any():
C[k] = X[m].mean(axis=0) # update step (full-data mean)
return C
def lloyd_distributed(X, init, iters, S): # update is an all-reduce of sums and counts
C = init.copy()
shards = np.array_split(np.arange(N), S) # S disjoint worker shards
for _ in range(iters):
sums = np.zeros((K, d)); counts = np.zeros(K) # all-reduce accumulators
for s in shards: # each worker, blind to the others
Xs = X[s]
a = ((Xs[:, None, :] - C[None, :, :]) ** 2).sum(axis=2).argmin(axis=1)
for k in range(K): # local partial sums and counts
m = a == k
sums[k] += Xs[m].sum(axis=0); counts[k] += m.sum()
for k in range(K): # combine: divide summed vector by summed count
if counts[k] > 0:
C[k] = sums[k] / counts[k]
return C
init = X[:K].copy() # identical start for a fair comparison
C_single = lloyd(X, init.copy(), iters)
C_dist = lloyd_distributed(X, init.copy(), iters, S)
print("shards S :", S)
print("clusters K :", K)
print("max abs difference :", f"{np.max(np.abs(C_single - C_dist)):.2e}")
print("relative error :", f"{np.linalg.norm(C_single - C_dist) / np.linalg.norm(C_single):.2e}")
print("centroids identical :", np.allclose(C_single, C_dist))
C_single and the all-reduced centroids C_dist are compared directly; each shard sees only its own points, and the workers exchange nothing but per-cluster sums and counts.shards S : 8
clusters K : 5
max abs difference : 3.20e-14
relative error : 2.07e-15
centroids identical : True
np.allclose reports them identical. Eight shards, each blind to seven eighths of the points, jointly produced the exact single-machine clustering.The difference is zero up to rounding, just as it was for the gradient in Section 1.1. The cost of the combine does not grow with the number of points; it grows only with $K$ and $d$, which is why a cluster can hold billions of points and still synchronize centroids in a payload of a few kilobytes per iteration. The only quantity that scales with $N$ stays entirely local: the per-shard assignment work.
Distributed k-means is the gradient all-reduce of Section 1.1 wearing different clothes. There the per-worker payload was a partial gradient; here it is a pair of per-cluster accumulators, sums and counts. The collective is the same, the exactness argument is the same (addition does not care how terms are grouped), and the communication is again independent of the data volume. Once you see that any algorithm whose update is an average can be cast as an all-reduce of sums and counts, the catalogue of "embarrassingly distributable" classical methods writes itself: k-means, Gaussian-mixture EM, naive Bayes counts, and feature-mean standardization all share this shape.
3. Initialization at Scale: k-means|| Intermediate
Lloyd's algorithm only finds a local optimum, and the quality of that optimum depends heavily on where the centroids start. The standard fix on one machine is k-means++ (Arthur and Vassilvitskii, 2007), which seeds the centroids one at a time, each chosen with probability proportional to its squared distance from the nearest already-chosen center, so the seeds spread out and the final clustering is much better. The trouble is that k-means++ is inherently sequential: picking seed $k+1$ requires the full set of seeds $1$ through $k$, which forces $K$ passes over the data with a synchronization between each. For large $K$ on a large cluster, those $K$ rounds of communication dominate the run.
k-means|| (Bahmani et al., 2012) parallelizes the seeding. Instead of adding one center per pass, it runs a logarithmic number of passes, and in each pass oversamples: every point is independently selected as a candidate center with probability proportional to its squared distance from the current set, drawing roughly $\ell$ candidates per pass for an oversampling factor $\ell \approx K$. After about $\log(\text{cost})$ passes the algorithm holds a modest set of candidates (a few times $K$ of them), weights each candidate by how many points it captures, and runs ordinary k-means++ on that small weighted set to pick the final $K$ seeds. The crucial property is that each pass is embarrassingly parallel (the selection probability of a point depends only on that point and the shared candidate set), so seeding now costs a handful of all-reduce rounds rather than $K$ of them, while matching k-means++ quality. This is the same move that recurs throughout the book: replace a long sequential dependency with a few oversampled parallel rounds.
Code 12.6.1 spelled out the assignment, the per-cluster accumulators, and the combine by hand. Production libraries hide all of it. On one machine, scikit-learn's KMeans defaults to k-means++ seeding and a tuned Lloyd's loop:
from sklearn.cluster import KMeans
km = KMeans(n_clusters=5, init="k-means++", n_init=10).fit(X) # X: (N, d) array
print(km.cluster_centers_, km.inertia_) # centroids and final J
To scale past one machine, Spark MLlib's KMeans runs the exact distributed iteration of Figure 12.6.1 over a partitioned dataset, seeds with k-means|| by default, and does the per-cluster sum-and-count all-reduce internally:
from pyspark.ml.clustering import KMeans as SparkKMeans
model = SparkKMeans(k=5, initMode="k-means||", maxIter=20).fit(df) # df: distributed rows
centers = model.clusterCenters() # one combine per iter, hidden
4. Mini-Batch and Streaming k-means Intermediate
Full Lloyd's algorithm reads every point on every iteration. When the data is enormous or arriving as an unbounded stream, that is too expensive or simply impossible. Mini-batch k-means (Sculley, 2010) replaces the full assignment-and-update with a stochastic version: on each step it draws a small batch, assigns those points to the current centroids, and nudges each touched centroid toward the batch mean with a per-cluster learning rate that decays as the cluster accumulates more points. The update for a centroid that gains a point $x$ is
$$\mu_k \leftarrow \mu_k + \eta_k (x - \mu_k), \qquad \eta_k = \frac{1}{n_k},$$where $n_k$ is the running count of points ever assigned to cluster $k$. This is a running-mean update, and like a running mean it is mergeable: distributed mini-batch k-means has each worker process its own stream of batches and periodically all-reduces the per-cluster sums and counts, the identical accumulators from Section 2. The connection to online learning is direct. A clustering that must track drifting structure on a never-ending feed is exactly the setting of Chapter 9; mini-batch k-means is the clustering member of that family of online algorithms, trading a little accuracy per step for the ability to run forever on bounded memory.
Run mini-batch k-means on a live stream and watch the centroids twitch. Each batch tugs them a little, and because the learning rate decays as $1/n_k$, the early batches throw them around the space while the late batches barely move them. It looks like the clusters are nervous at first and grow calm with age, which is a fair description of the algorithm and an unfair but tempting description of the engineers monitoring it.
5. When the Average Trick Fails: Density-Based and Hierarchical Clustering Advanced
Everything above worked because k-means summarizes each cluster by a single point, the mean, and a mean is a mergeable aggregation. Not all clustering has that shape. DBSCAN groups points by density: a point is a core point if at least minPts neighbors lie within radius $\varepsilon$, and clusters grow by chaining core points whose $\varepsilon$-neighborhoods overlap. The core operation is a neighborhood query, "who is within $\varepsilon$ of me?", and that question cannot be answered from a worker's own shard alone, because a point's $\varepsilon$-neighbors may sit on a different machine. There is no small per-cluster accumulator to all-reduce; correctness requires that points near a partition boundary be compared across partitions, which means exchanging actual data, not summaries.
Distributed DBSCAN therefore takes a different route: spatially partition the points into cells, replicate a thin boundary halo of width $\varepsilon$ into neighboring cells so each cell can answer its own neighborhood queries, cluster within each cell, and then run a separate merge phase that unions clusters whose boundary points coincide. That merge is the hard part, and it is why distributed density-based clustering is meaningfully more complex than distributed k-means. Hierarchical (agglomerative) clustering is harder still: it repeatedly merges the two closest clusters, a global minimum that depends on all pairwise distances at once, so a naive distribution would all-reduce a quantity that grows with the data rather than staying small. The lesson generalizes well beyond clustering. An algorithm distributes cheaply when its per-iteration state is a small summary that merges by addition; it distributes expensively when its core operation is a query against the raw points themselves. The next section meets that second world head-on.
Because the merge phase is the bottleneck, recent work attacks distributed density-based clustering from both ends. On the systems side, GPU and multi-node implementations in the lineage of RAPIDS cuML accelerate the neighborhood queries and the boundary-halo merge, and approximate variants replace exact $\varepsilon$-range search with the approximate-nearest-neighbor indices of Section 12.7 to bound the cross-partition exchange. On the modeling side, deep clustering pushes the work into a learned embedding: a representation network maps points into a space where simple k-means separates them well, so the cheap, exactly-distributable clustering of Section 2 runs on top of an expensive but data-parallel encoder. Federated clustering (clustering data that never leaves the devices that hold it) is a third active thread, computing centroids by secure aggregation of the same per-cluster sums and counts, which ties this section directly to Chapter 14. The common thread is to keep the mergeable-summary structure wherever possible and to pay for neighborhood queries only where it is unavoidable.
Who: A data scientist at a streaming-media company building nightly user segments.
Situation: Each night, roughly one billion interaction events had to be reduced to a few hundred behavioral segments that fed the next morning's recommendation refresh.
Problem: A single-machine clustering job could not hold the feature matrix in memory, and the nightly window was only a few hours wide.
Dilemma: Reach for DBSCAN, which would find arbitrarily shaped segments but whose cross-partition neighborhood merge made it slow and fragile at this scale, or use distributed k-means, which scales out exactly and cheaply but assumes roughly convex clusters and a chosen $K$.
Decision: They chose distributed k-means on Spark MLlib with k-means|| seeding, accepting the convexity assumption because the downstream recommender only needed coarse, stable segments, not exact density contours.
How: They partitioned the feature table across the cluster, ran twenty iterations with the per-cluster sum-and-count all-reduce of Figure 12.6.1, and seeded with k-means|| so the seeding itself did not cost $K$ communication rounds.
Result: The job finished in under forty minutes, the segments matched a single-machine sanity run on a 1% sample to within rounding, and the per-iteration network traffic was a few kilobytes because only centroids crossed the wire.
Lesson: Pick the clustering whose per-iteration state is a small mergeable summary when the data is large and the window is tight; save density-based methods for when cluster shape genuinely matters and the scale permits the boundary merge.
We have a distributed clustering that is exact (k-means via per-cluster all-reduce), a parallel way to start it well (k-means||), a streaming variant for unbounded feeds (mini-batch k-means), and a clear marker for where the average trick stops working (neighborhood-query clustering). The boundary we just hit, that some methods must query the raw points rather than a summary, is the entire subject of the next section, which builds distributed indices that answer "what is near this point?" without comparing it to everything. That story begins in Section 12.7.
In Code 12.6.1 each worker emits both a per-cluster sum and a per-cluster count, and the combine divides the summed vector by the summed count. Suppose an engineer "optimizes" the combine by instead averaging the per-shard centroids directly, that is, computing each shard's own cluster means and then taking the unweighted mean of those means across shards. Construct a small example with two shards of very different sizes where this gives a different (wrong) answer, and explain in one sentence why carrying the count restores exactness. Relate your answer to the size-weighted combination of Exercise 1.1.2.
Extend Code 12.6.1 to count the number of floating-point values that cross the "wire" each iteration, that is, the size of the all-reduced (sums, counts) payload, and confirm it equals $K \times (d+1)$ and does not change when you increase $N$ from 60,000 to 600,000 or change the number of shards $S$. Then add a timer around the per-shard assignment work and around the combine, and report how the two scale as you vary $N$ and $S$. Use the numbers to argue which part of distributed k-means is the real bottleneck at scale.
For each algorithm, state whether its per-iteration update can be expressed as an all-reduce of a small fixed-size summary (and if so, what the summary is) or whether it fundamentally requires exchanging raw points across partitions: (a) Gaussian-mixture EM with full covariance matrices; (b) DBSCAN; (c) k-medoids, where each cluster is represented by an actual data point rather than a mean; (d) single-linkage agglomerative clustering. Order them from cheapest to most expensive to distribute, and explain what property of the cheap ones the expensive ones lack.