Part II: Distributed Data Processing for AI
Chapter 6: The MapReduce Model and Distributed Algorithms

Approximate Algorithms at Scale

"They asked me how many distinct users we saw last month. I said about a million, give or take three thousand, and I said it from sixteen kilobytes. Nobody asked for the exact number once they saw the bill for storing it."

A Register Array That Counts to a Billion
Big Picture

A sketch is a small, fixed-size summary of a giant dataset that answers one question approximately, and the good ones are mergeable: the summary of two shards combined equals a cheap combination of their two summaries. That single property is what marries them to MapReduce. Each mapper builds a tiny local sketch over its shard, the reducer merges the sketches with a max or an add, and the shuffle moves kilobytes instead of terabytes. You give up a controlled sliver of accuracy, often well under one percent, and in return you turn questions that would need petabytes of exact state into questions answered from a structure that fits in cache. This section covers the four sketches you will reach for most: HyperLogLog for distinct counts, Count-Min for frequencies, Bloom filters for set membership, and reservoir sampling for a uniform sample in one pass.

The previous sections of this chapter computed exact answers: an exact word count, an exact sort, an exact join, an exact PageRank vector. Exactness has a price, and at scale the price is state. To count the distinct users in a stream exactly, you must remember every user you have ever seen, because the next record might be a repeat or might be new and you cannot tell without the full set. To find the most frequent items exactly, you must keep a counter for every item. When the universe of items is billions of strings spread across a thousand machines, that exact state is itself a distributed dataset, and maintaining it costs more than the question is worth. Approximate algorithms break this bind by refusing to remember everything. They keep a summary whose size is fixed in advance, independent of how much data flows through, and they answer with a provable error bound instead of a certainty.

This is not a compromise you make reluctantly. For the questions these structures answer, the approximate answer is the right engineering answer, and the exact one is a luxury nobody needs. Knowing the distinct-user count to seven significant figures is pointless when the figure changes every second. Knowing it to within a fraction of a percent, from a sketch small enough to sit in a CPU's L1 cache and merge in microseconds, is exactly what you need. The art is choosing a structure whose error you can bound and whose merge step survives the shuffle.

[Figure 6.8.1 diagram: each mapper builds a local sketch, and only the sketches cross the shuffle. Three terabyte-scale shards each produce a 16 KiB sketch; a per-register-max merge yields the global distinct-count estimate (error < 1%). The merge is tiny, and its result is independent of how the shards were split.]
Figure 6.8.1: Why sketches and MapReduce fit. Three mappers each scan a terabyte-scale shard but emit only a fixed-size sketch (here 16 KiB). The reducer combines them with a per-register maximum, the merge operation for HyperLogLog, into one merged sketch that yields the global distinct-count estimate. The shuffle, drawn as the three short arrows into the merge box, carries kilobytes; the data never moves. Because the merge is associative and commutative, the merged answer does not depend on how the records were partitioned across mappers.

1. Mergeability Is the Whole Trick Beginner

Recall from Section 6.2 that MapReduce splits work into map tasks that run independently on shards and reduce tasks that combine partial results carried across the shuffle. A data structure fits this model exactly when it supports a merge: an operation $\oplus$ such that the sketch of two shards joined is the merge of their separate sketches, $\mathrm{sk}(A \cup B) = \mathrm{sk}(A) \oplus \mathrm{sk}(B)$, and that operation is associative and commutative. Associativity and commutativity matter because a real shuffle gives no guarantee about the order in which partial results arrive or how they are grouped; a correct merge must give the same answer regardless. The exact-gradient identity that opened this book in Section 1.1 was the same idea for sums; here we extend it from sums to set-summaries.

This is precisely the algebraic structure of a commutative monoid, with the empty sketch serving as the identity element, and it is no accident that the most useful sketches are designed to be one. A plain counter merges by addition. A maximum merges by taking the larger value. HyperLogLog merges by taking a per-register maximum. Count-Min merges by adding two tables cell by cell. Bloom filters merge by a bitwise OR. Reservoir samples merge by a weighted random combination. Because that merge draws its own random numbers, it is associative and commutative in distribution rather than bit for bit. In every case the merge is a few microseconds of work on a kilobyte-scale object, which is why the shuffle stays cheap no matter how large the input grows.
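The commutative-monoid view can be checked in a few lines. The following is a minimal sketch under stated assumptions. Three of the merges listed above are applied to six hypothetical shards of random integers: addition for a counter, maximum, and bitwise OR over a 64-bit integer that stands in for a tiny Bloom-style bitmap. Each merge runs twice, once folding the partials left to right and once as a pairwise tree over a shuffled arrival order. The helper names (`summarize`, `reduce_linear`, `reduce_tree`) are illustrative. The only point is that both reducers agree with the summary of the unsplit data.

```python
import random
from functools import reduce

# Toy monoids: name -> (identity element, merge operation, per-record "lift" into a summary)
MONOIDS = {
    "count": (0, lambda a, b: a + b, lambda x: 1),
    "max": (0, max, lambda x: x),                                   # records are non-negative
    "bitmap_or": (0, lambda a, b: a | b, lambda x: 1 << (x % 64)),  # tiny Bloom-like bitmap
}

def summarize(records, name):
    """What one mapper emits: its records folded into a single summary."""
    identity, merge, lift = MONOIDS[name]
    return reduce(merge, (lift(x) for x in records), identity)

def reduce_linear(parts, name):
    """Reducer that folds partial summaries left to right."""
    identity, merge, _ = MONOIDS[name]
    return reduce(merge, parts, identity)

def reduce_tree(parts, name):
    """Reducer that merges partials pairwise, level by level (a different grouping)."""
    identity, merge, _ = MONOIDS[name]
    level = list(parts) or [identity]
    while len(level) > 1:
        level = [merge(level[i], level[i + 1]) if i + 1 < len(level) else level[i]
                 for i in range(0, len(level), 2)]
    return level[0]

rng = random.Random(7)
data = [rng.randrange(10_000) for _ in range(5_000)]
shards = [data[i::6] for i in range(6)]          # six hypothetical mapper shards

results = {}
for name in MONOIDS:
    partials = [summarize(s, name) for s in shards]
    shuffled = partials[:]
    rng.shuffle(shuffled)                        # partials arrive in arbitrary order
    results[name] = (summarize(data, name),      # summary of the unsplit data
                     reduce_linear(partials, name),
                     reduce_tree(shuffled, name))
    print(name, len(set(results[name])) == 1)    # True: grouping and order do not matter
```

The identity element matters in practice. It is what a reducer starts from, and it is what an empty shard contributes.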

Key Insight: A Sketch Earns Its Place in MapReduce by Being Mergeable

The reason these structures combine perfectly with MapReduce is not their cleverness in any single mapper; it is that two local sketches merge into the sketch you would have built from the union of their data, exactly and order-independently. That turns a global question over terabytes into a local question over kilobytes followed by a trivial combine. When you evaluate any summary structure for distributed use, the first question is not "how accurate is it?" but "does it merge associatively, and how big is the thing that crosses the shuffle?"

2. HyperLogLog: A Billion Distinct Items in Kilobytes Intermediate

The cleanest example is counting distinct elements, the cardinality of a multiset. The exact method keeps a hash set of everything seen, which costs memory linear in the number of distinct items: counting a billion distinct users needs gigabytes. HyperLogLog answers the same question from a fixed array of $m$ small registers, typically a few kilobytes, with a relative error that depends only on $m$ and not at all on the cardinality.

The intuition is a probabilistic trick. Hash each item to a uniform bit string. In a stream of $n$ distinct hashes, the longest run of leading zeros you observe is about $\log_2 n$. A hash begins with at least $k$ leading zeros with probability $2^{-k}$, so you need roughly $2^k$ distinct items before you expect to see one. The maximum leading-zero count is therefore a noisy estimate of $\log_2$ of the cardinality. HyperLogLog tames the noise by splitting the hash: the first $p$ bits choose one of $m = 2^p$ registers, and each register stores the maximum leading-zero count (plus one) seen among the items routed to it. Averaging across registers with a harmonic mean and a bias-correction constant $\alpha_m$ gives the estimator

$$\hat{n} = \alpha_m \, m^2 \left( \sum_{j=1}^{m} 2^{-M_j} \right)^{-1}, \qquad \text{with standard error} \quad \frac{\sigma}{n} \approx \frac{1.04}{\sqrt{m}},$$

where $M_j$ is the value in register $j$. With $m = 2^{14} = 16384$ registers of one byte each, the structure occupies 16 KiB and the relative error is about $1.04/\sqrt{16384} \approx 0.81\%$, independent of whether you are counting a thousand items or a billion. The merge is a per-register maximum, because the maximum leading-zero count over a union is the larger of the two per-shard maxima. That is the operation drawn in Figure 6.8.1.
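The leading-zero intuition behind the estimator can be checked directly. The sketch below assumes the same 64-bit BLAKE2b hash as Code 6.8.1 and uses hypothetical ids of the form `id_<i>`. For $n$ distinct ids it computes $\rho$ (leading zeros plus one) and keeps only the maximum, which is what a single register would store. That maximum should grow roughly like $\log_2 n$. A single register's guess $2^{\rho}$ is noisy, though, and that noise is exactly what the $m$ registers and the harmonic mean average away.

```python
import hashlib, math

def h64(b: bytes) -> int:
    """64-bit hash from BLAKE2b, treated as a uniform bit string."""
    return int.from_bytes(hashlib.blake2b(b, digest_size=8).digest(), "big")

def rho(h: int, bits: int = 64) -> int:
    """Position of the leftmost 1-bit (leading zeros + 1); bits + 1 if h == 0."""
    return bits - h.bit_length() + 1

def max_rho(n: int) -> int:
    """Longest leading-zero run (+1) over n distinct ids: one 'register' for everything."""
    return max(rho(h64(f"id_{i}".encode())) for i in range(n))

for exp in (8, 12, 16):
    n = 1 << exp
    r = max_rho(n)
    print(f"n = 2^{exp:<2}  max rho = {r:2d}  log2 n = {math.log2(n):.0f}  "
          f"single-register guess 2^rho = {2 ** r:,}")
```

Run it for a few values of $n$ and the single-register guesses wander well away from the true $n$. That spread is the reason for the $1.04/\sqrt{m}$ averaging rather than one global maximum.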

The code below builds the structure from first principles on a stream of seven million records containing exactly one million distinct ids, splits the work across four simulated mappers that each maintain a local register array, merges them with a per-register max, and reports the estimate against the exact answer it also computes for comparison.

import hashlib, math

def make_stream(n_distinct, repeats):
    for i in range(n_distinct):                 # 1,000,000 distinct ids ...
        token = f"user_{i:08d}".encode()
        for _ in range(repeats):                # ... each repeated 7 times
            yield token

N_DISTINCT, REPEATS = 1_000_000, 7
P = 14                                          # 2^14 = 16384 one-byte registers
M = 1 << P

def h64(b):                                     # a fast 64-bit hash of the item
    return int.from_bytes(hashlib.blake2b(b, digest_size=8).digest(), "big")

def add(regs, x):                               # update one register array
    h = h64(x)
    idx = h >> (64 - P)                          # top P bits pick the register
    tail = h & ((1 << (64 - P)) - 1)             # remaining bits hold the run
    rho = (64 - P) - tail.bit_length() + 1 if tail else (64 - P + 1)
    if rho > regs[idx]:
        regs[idx] = rho                         # keep the longest run seen

def merge(a, b):                                # THE shuffle-crossing operation
    return bytearray(max(x, y) for x, y in zip(a, b))   # per-register maximum

def estimate(regs):
    m = len(regs)
    alpha = 0.7213 / (1 + 1.079 / m)            # bias-correction constant
    E = alpha * m * m / sum(2.0 ** (-r) for r in regs)
    if E <= 2.5 * m and regs.count(0):          # small-range correction
        E = m * math.log(m / regs.count(0))
    return E

NUM_MAPPERS = 4
local = [bytearray(M) for _ in range(NUM_MAPPERS)]
exact_set = set()
for rec in make_stream(N_DISTINCT, REPEATS):
    exact_set.add(rec)                          # exact answer, for comparison only
    add(local[h64(rec) % NUM_MAPPERS], rec)     # build that mapper's local sketch

merged = local[0]                               # reducer merges the 4 sketches
for s in local[1:]:
    merged = merge(merged, s)

exact, est = len(exact_set), estimate(merged)
print(f"exact distinct          : {exact:,}")
print(f"HLL estimate (4 merged) : {est:,.0f}")
print(f"relative error          : {abs(est-exact)/exact*100:.3f} %")
print(f"sketch size             : {M:,} bytes ({M//1024} KiB, fixed)")
print(f"exact-set size (approx) : {exact*48/1024/1024:.1f} MiB")   # assumes ~48 bytes per entry
print(f"space saving factor     : {round(exact*48/M):,}x")
Code 6.8.1: HyperLogLog from scratch with a four-mapper merge. The only object that crosses the shuffle is the 16 KiB register array; merge is the per-register maximum that makes the structure a commutative monoid. The exact hash set is built only to measure the error and would never exist in a real pipeline.
exact distinct          : 1,000,000
HLL estimate (4 merged) : 996,799
relative error          : 0.320 %
sketch size             : 16,384 bytes (16 KiB, fixed)
exact-set size (approx) : 45.8 MiB
space saving factor     : 2,930x
Output 6.8.1: Four mappers, each blind to three quarters of the stream, jointly estimated one million distinct ids to within 0.32 percent, comfortably inside the $0.81\%$ standard error the formula predicts for $m=16384$. The sketch is 16 KiB and fixed; the exact set is roughly 2,900 times larger and would only grow with the data.

The estimate lands within a third of a percent of the truth, the structure never exceeds 16 KiB no matter how many records flow through, and the merge that produced the global answer was a single pass of byte-wise maxima over four small arrays. Output 6.8.1 is the entire value proposition of this section in one run: a controlled error, a fixed footprint, and a shuffle that moved kilobytes.

Library Shortcut: datasketches Gives You a Tuned, Mergeable HLL

Code 6.8.1 is for understanding, not production. Apache DataSketches (the datasketches Python package, with matching Java and C++ implementations used inside Druid, Spark, and Hive) ships sketches with carefully tuned bias correction, serialization, and union operators, so the roughly forty lines above collapse to a handful and the merge works across languages and engines:

from datasketches import hll_sketch, hll_union

def mapper_sketch(shard, lg_k=14):              # one call per shard
    sk = hll_sketch(lg_k)
    for rec in shard:
        sk.update(rec)
    return sk.serialize_compact()               # bytes that cross the shuffle

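# Illustrative input: four shards of string ids standing in for real mapper inputs
shards = [[f"user_{i:08d}" for i in range(j, 1_000_000, 4)] for j in range(4)]
serialized_sketches_from_mappers = [mapper_sketch(s) for s in shards]

u = hll_union(14)                               # reducer side
for blob in serialized_sketches_from_mappers:   # would arrive over the network
    u.update(hll_sketch.deserialize(blob))
print("distinct estimate:", u.get_estimate())   # global cardinality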
Code 6.8.2: The same distinct-count pipeline with DataSketches. The library owns the register layout, the bias correction, and the cross-language serialization; hll_union is the production form of the per-register merge in Code 6.8.1. In Spark this is a single approx_count_distinct aggregate, which we revisit in Chapter 7.

3. Count-Min for Frequencies, Bloom for Membership Intermediate

Distinct counting is one question; two others come up constantly in data pipelines and have equally elegant sketches. The Count-Min Sketch estimates the frequency of any item, and by extension finds the heavy hitters, the handful of items responsible for most of the volume. It is a table of $d$ rows and $w$ columns of counters. Each of $d$ independent hash functions maps an item to one column per row; to record an occurrence you increment those $d$ cells, and to query an item you return the minimum of its $d$ cells. Hash collisions can only push a count up, never down, so the minimum across rows strips away most of the collision noise. With $w = \lceil e/\varepsilon \rceil$ and $d = \lceil \ln(1/\delta) \rceil$, the estimate overshoots the true count $f_x$ by at most $\varepsilon N$ with probability at least $1 - \delta$, where $N$ is the total stream length:

$$\hat{f}_x = \min_{1 \le i \le d} C_i[\,h_i(x)\,], \qquad f_x \le \hat{f}_x \le f_x + \varepsilon N \ \text{ with probability } \ge 1 - \delta.$$

The table is the sketch, its size $d \times w$ is fixed in advance, and two Count-Min tables built on different shards merge by adding them cell by cell, because a sum of counts over a union is the sum of the per-shard sums. That additive merge is what lets each mapper build a local frequency table and the reducer combine them into one global table that answers any item's frequency.
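Here is a minimal Count-Min sketch that follows the sizing rule and the cell-wise merge above. It is an illustrative sketch, not a tuned library. The $d$ row hashes are assumed to be BLAKE2b keyed by the row index, a convenient stand-in for $d$ independent hash functions, and the shard contents are made up. The usage check confirms two properties from the text. First, the merged table equals the table built on the unsplit stream. Second, no estimate falls below the true count.

```python
import hashlib, math
from collections import Counter

class CountMin:
    """Minimal Count-Min sketch; rows use keyed BLAKE2b as stand-in independent hashes."""

    def __init__(self, eps: float, delta: float):
        self.w = math.ceil(math.e / eps)           # width  w = ceil(e / eps)
        self.d = math.ceil(math.log(1 / delta))    # depth  d = ceil(ln(1 / delta))
        self.table = [[0] * self.w for _ in range(self.d)]
        self.n = 0                                 # total stream length N

    def _cols(self, item: str):
        for i in range(self.d):
            dig = hashlib.blake2b(item.encode(), digest_size=8, key=bytes([i + 1])).digest()
            yield i, int.from_bytes(dig, "big") % self.w

    def add(self, item: str, count: int = 1) -> None:
        for i, j in self._cols(item):              # increment one cell in every row
            self.table[i][j] += count
        self.n += count

    def query(self, item: str) -> int:
        return min(self.table[i][j] for i, j in self._cols(item))   # min over the d rows

    def merge(self, other: "CountMin") -> "CountMin":
        """Cell-wise addition; valid only for sketches with identical w, d and hashes."""
        assert (self.w, self.d) == (other.w, other.d), "mismatched sketch parameters"
        out = CountMin.__new__(CountMin)
        out.w, out.d, out.n = self.w, self.d, self.n + other.n
        out.table = [[a + b for a, b in zip(ra, rb)] for ra, rb in zip(self.table, other.table)]
        return out

# Two hypothetical shards of a token stream, sketched by separate "mappers".
shard_a = ["the"] * 500 + ["model"] * 120 + [f"tok{i}" for i in range(2000)]
shard_b = ["the"] * 300 + ["data"] * 80 + [f"tok{i}" for i in range(1000, 3000)]

sk_a, sk_b = CountMin(0.01, 0.01), CountMin(0.01, 0.01)
for t in shard_a:
    sk_a.add(t)
for t in shard_b:
    sk_b.add(t)
merged = sk_a.merge(sk_b)                          # what the reducer would compute

whole = CountMin(0.01, 0.01)                       # reference: one sketch over everything
for t in shard_a + shard_b:
    whole.add(t)

exact = Counter(shard_a + shard_b)
for t in ("the", "model", "data", "tok1500"):
    print(t, exact[t], merged.query(t))            # estimate >= exact; overshoot <= eps*N w.h.p.
```

The `assert` in `merge` guards against the failure mode that matters most in a cluster: tables built with different widths, depths, or hash keys cannot be added meaningfully.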

The Bloom filter answers a different question: is this item in the set? It is a bit array of $m$ bits with $k$ hash functions; inserting an item sets the $k$ bits it hashes to, and a membership test returns "possibly present" if all $k$ bits are set and "definitely absent" otherwise. It never produces a false negative, only a false positive, with rate approximately $(1 - e^{-kn/m})^k$ for $n$ inserted items, minimized at $k = (m/n)\ln 2$. Two Bloom filters of the same size and hash family merge by a bitwise OR, the union of their bit patterns being exactly the filter for the union of their sets. Table 6.8.1 lines up the four structures, the question each answers, its merge operation, and its error knob, so the mergeability that ties them to MapReduce is visible at a glance.

Table 6.8.1: The four mergeable sketches of this section. Every merge operation is associative and commutative (the randomized reservoir merge in distribution), which is exactly what a MapReduce reducer needs to combine per-shard summaries order-independently.
| Structure | Question it answers | Merge operation | Error knob |
|---|---|---|---|
| HyperLogLog | How many distinct items? | per-register maximum | registers $m$; error $\approx 1.04/\sqrt{m}$ |
| Count-Min Sketch | How frequent is item $x$? Heavy hitters? | cell-wise addition | width $w$, depth $d$; overshoot $\le \varepsilon N$ |
| Bloom filter | Is item $x$ in the set? | bitwise OR | bits $m$, hashes $k$; false-positive rate |
| Reservoir sample | Give me a uniform sample of size $s$ | weighted random combination | sample size $s$; sampling variance |

Bloom filters earn their keep in distributed AI as work-skippers. Before an expensive distributed join or a deduplication pass over a web crawl, a mapper can carry a Bloom filter of the keys present on one side and use it to discard, locally and without any network traffic, the records on the other side that cannot possibly match. The filter never drops a record that should match (no false negatives), so the join stays correct while the shuffle shrinks. This is the pruning role that Chapter 8 builds on when it skips entire data blocks during a scan.
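The next block is a minimal Bloom filter with the OR merge, together with the join-pruning use just described. Everything in it is an illustrative assumption rather than a production design. The $k$ bit positions come from double hashing of one BLAKE2b digest. The helper `for_capacity` is a hypothetical name. It sizes the filter with the standard rule $m = -n \ln p / (\ln 2)^2$ bits for a target false-positive rate $p$, which follows from substituting the optimal $k = (m/n)\ln 2$ into the false-positive formula. The user and click data are made up.

```python
import hashlib, math

class Bloom:
    """Minimal Bloom filter over a bytearray of m bits (illustrative, not a tuned library)."""

    def __init__(self, m: int, k: int):
        self.m, self.k = m, k
        self.bits = bytearray((m + 7) // 8)

    @classmethod
    def for_capacity(cls, n: int, p: float) -> "Bloom":
        m = math.ceil(-n * math.log(p) / math.log(2) ** 2)   # bits for false-positive rate p
        k = max(1, round(m / n * math.log(2)))               # optimal k = (m/n) ln 2
        return cls(m, k)

    def _positions(self, key: str):
        # Double hashing (an assumption): g_i = h1 + i*h2 mod m, from one 128-bit digest.
        d = hashlib.blake2b(key.encode(), digest_size=16).digest()
        h1 = int.from_bytes(d[:8], "big")
        h2 = int.from_bytes(d[8:], "big") | 1                # odd step
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, key: str) -> None:
        for p in self._positions(key):
            self.bits[p >> 3] |= 1 << (p & 7)

    def __contains__(self, key: str) -> bool:
        # True means "possibly present"; False means "definitely absent".
        return all(self.bits[p >> 3] >> (p & 7) & 1 for p in self._positions(key))

    def merge(self, other: "Bloom") -> "Bloom":
        assert (self.m, self.k) == (other.m, other.k), "filters must share m, k and hashes"
        out = Bloom(self.m, self.k)
        out.bits = bytearray(a | b for a, b in zip(self.bits, other.bits))  # bitwise OR
        return out

# Hypothetical semijoin pruning: user keys live on two shards, clicks reference users.
users_1 = [f"u{i}" for i in range(0, 5_000)]
users_2 = [f"u{i}" for i in range(5_000, 10_000)]
clicks = [(f"u{i}", "click") for i in range(0, 40_000, 2)]   # 3/4 reference unknown users

f1, f2 = Bloom.for_capacity(10_000, 0.01), Bloom.for_capacity(10_000, 0.01)
for u in users_1:
    f1.add(u)
for u in users_2:
    f2.add(u)
user_filter = f1.merge(f2)                      # OR of shard filters = filter of all users

kept = [c for c in clicks if c[0] in user_filter]             # local pruning, no network traffic
true_matches = [c for c in clicks if int(c[0][1:]) < 10_000]
print(len(clicks), len(true_matches), len(kept))              # kept = matches + a few false positives
```

Every true match survives the filter. Only about one percent of the non-matching clicks leak through to the expensive join, and those are discarded there by the exact key comparison.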

Fun Note: The Filter That Cannot Lie About Absence

A Bloom filter has a charming asymmetry: it will sometimes claim an item is present when it is not, but it will never claim an item is absent when it is. So "no" is always the truth and "yes" means "probably". Engineers exploit this shamelessly: use the filter for the fast "definitely not here, skip it" path and fall back to the expensive exact check only on the rare "maybe" that survives. The structure is wrong in exactly the direction that is safe to be wrong in.

4. Reservoir Sampling: A Uniform Sample in One Pass Intermediate

The fourth structure answers a question that sounds harder than it is: from a stream whose length you do not know in advance and cannot store, hand me a uniform random sample of exactly $s$ items, in a single pass. Reservoir sampling does it by keeping a reservoir of $s$ items. The first $s$ items fill it. For the $i$-th item with $i > s$, you keep it with probability $s/i$, and if you keep it you evict a uniformly chosen current occupant. A short induction shows every item seen so far sits in the reservoir with probability exactly $s/i$, so the final sample is uniform over the whole stream regardless of its length.
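Reservoir sampling as just described fits in a dozen lines. The version below is a minimal sketch of the classic one-pass algorithm (often called Algorithm R), assuming Python's `random.Random` as the source of randomness. It returns the item count alongside the reservoir because the merge step needs that count. The usage check estimates each item's inclusion frequency over many runs, and each frequency should sit near $s/n$.

```python
import random

def reservoir_sample(stream, s, rng):
    """One pass, O(s) memory; returns (uniform sample of min(s, n) items, n items seen)."""
    reservoir, seen = [], 0
    for item in stream:
        seen += 1
        if seen <= s:
            reservoir.append(item)              # the first s items fill the reservoir
        else:
            j = rng.randrange(seen)             # uniform over 0 .. seen-1
            if j < s:                           # keep the new item with probability s/seen
                reservoir[j] = item             # ... evicting a uniformly chosen occupant
    return reservoir, seen

rng = random.Random(42)
n_items, s, trials = 20, 5, 20_000
counts = [0] * n_items
for _ in range(trials):
    sample, seen = reservoir_sample(range(n_items), s, rng)
    for x in sample:
        counts[x] += 1
print([round(c / trials, 3) for c in counts])   # each close to s/n = 5/20 = 0.25
```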

For MapReduce, each mapper runs reservoir sampling over its shard and also reports how many items it saw. The reducer merges two reservoirs by drawing the final $s$ items at random without replacement. Each draw comes from one side with probability proportional to the number of that side's stream items not yet drawn. The split between the two reservoirs then follows the same hypergeometric distribution as a uniform draw from the union, so the merged reservoir is a uniform sample of the union. Sampling is the most general approximation in this section: a uniform sample lets you estimate any statistic of the corpus, not just the one a specialized sketch was built for, at the cost of a sampling error that shrinks like $1/\sqrt{s}$. In AI pipelines it is the standard way to build a small inspectable subset of a petabyte corpus for exploratory analysis, label-quality audits, or a quick distributional check before committing a full training run.
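One way to realize this reducer-side merge is sketched below, under stated assumptions. Each side's reservoir is a uniform sample of $\min(s, n_i)$ items from a stream of $n_i$ items. The merge draws items one at a time and picks side 1 with probability equal to its share of the not-yet-drawn stream items. That sequential rule makes the per-side split hypergeometric, which is what uniformity over the union requires. In the usage check, `random.sample` stands in for each mapper's reservoir because it produces the same distribution. The shard contents and the function name are hypothetical.

```python
import random

def merge_reservoirs(r1, n1, r2, n2, s, rng):
    """Merge two uniform reservoirs into a uniform sample of min(s, n1 + n2) items.

    r1 must hold a uniform sample of min(s, n1) items from a stream of n1 items (same for r2).
    """
    a, b = list(r1), list(r2)
    rng.shuffle(a)                               # random order, so pop() takes a uniform item
    rng.shuffle(b)
    left1, left2 = n1, n2                        # stream items of each side not yet drawn
    out = []
    for _ in range(min(s, n1 + n2)):
        if rng.random() < left1 / (left1 + left2):
            out.append(a.pop())
            left1 -= 1
        else:
            out.append(b.pop())
            left2 -= 1
    return out, n1 + n2                          # merged sample and merged stream count

rng = random.Random(1)
shard1, shard2 = list(range(0, 30)), list(range(30, 40))   # unequal hypothetical shards
s, trials = 5, 20_000
counts = [0] * 40
for _ in range(trials):
    r1 = rng.sample(shard1, min(s, len(shard1)))   # stand-in for each mapper's reservoir
    r2 = rng.sample(shard2, min(s, len(shard2)))
    merged, n = merge_reservoirs(r1, len(shard1), r2, len(shard2), s, rng)
    for x in merged:
        counts[x] += 1
print(min(counts) / trials, max(counts) / trials)   # both close to 5/40 = 0.125
```

A naive merge that always takes half the items from each side would oversample the smaller shard badly here, because it holds only a quarter of the data. The count-weighted draw avoids that.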

Practical Example: Counting Distinct Users Without a Distinct-Count Bill

Who: A data engineer on the analytics platform of a large streaming-media service.

Situation: Product managers wanted daily and rolling-28-day distinct-viewer counts per title, across billions of play events spread over a thousand-node cluster.

Problem: The exact distinct count needed a giant shuffle of user ids into per-title sets, and the rolling window meant re-counting overlapping days; the nightly job took hours and the shuffle dominated the cluster.

Dilemma: Keep exact counts and pay the shuffle and storage every night, or switch to HyperLogLog and accept roughly a one-percent error on a number that product managers read to two significant figures anyway.

Decision: They stored one 16 KiB HyperLogLog sketch per title per day instead of the user-id sets, because distinct-count error under one percent was irrelevant to every decision the number drove.

How: A mapper built a per-title daily sketch; the rolling-28-day count became a per-register max over 28 stored sketches, computed in milliseconds, with no re-scan of the raw events. The whole title-day catalog of sketches fit in memory.

Result: The nightly job's shuffle shrank by more than three orders of magnitude, the rolling windows became a cheap merge of pre-stored sketches, and the reported counts stayed within a fraction of a percent of the retired exact pipeline.

Lesson: When a number is read approximately and merged often, store the mergeable sketch, not the exact state. Mergeability turned an expensive rolling re-count into a maximum over kilobytes.

5. Why This Matters for AI at Scale Beginner

These structures are not a data-engineering sideline; they sit on the critical path of building and operating AI systems. Corpus statistics for a training set (how many distinct documents, distinct URLs, distinct n-grams) are distinct-count questions that HyperLogLog answers from sketches while the corpus is being assembled. Deduplication of a web-scale training corpus, a step that materially affects model quality, uses Bloom filters and the MinHash and locality-sensitive-hashing machinery of Section 6.7 as a cheap pre-filter that discards obvious duplicates before the expensive exact comparison. Feature cardinality (how many distinct values a categorical feature takes) decides whether to one-hot encode or hash a feature and how large an embedding table to allocate, a sizing question that returns when we shard embedding tables in Chapter 11. Heavy-hitter detection with Count-Min surfaces the tokens, items, or users that dominate a workload and therefore drive cost.
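A Count-Min sketch answers "how often is $x$?" but cannot list which $x$ to ask about, so heavy-hitter detection usually pairs it with a small candidate set. The block below shows one illustrative way to do that, under stated assumptions. It combines a compact Count-Min (keyed-BLAKE2b rows as a stand-in for independent hashes) with a dictionary holding the $k$ items whose estimates are largest so far. When a new item's estimate beats the weakest candidate, the new item displaces it. The class name and the endpoint stream are hypothetical.

```python
import hashlib, math, random
from collections import Counter

class TopKCountMin:
    """Count-Min sketch plus a k-item candidate set for approximate heavy hitters."""

    def __init__(self, k: int, eps: float = 0.005, delta: float = 0.01):
        self.k = k
        self.w = math.ceil(math.e / eps)            # w = ceil(e / eps)
        self.d = math.ceil(math.log(1 / delta))     # d = ceil(ln(1 / delta))
        self.table = [[0] * self.w for _ in range(self.d)]
        self.cands = {}                             # item -> estimate at its latest arrival

    def _cols(self, item: str):
        for i in range(self.d):
            dig = hashlib.blake2b(item.encode(), digest_size=8, key=bytes([i + 1])).digest()
            yield i, int.from_bytes(dig, "big") % self.w

    def add(self, item: str) -> None:
        est = None
        for i, j in self._cols(item):               # increment one cell per row ...
            self.table[i][j] += 1
            est = self.table[i][j] if est is None else min(est, self.table[i][j])
        # ... and the minimum over rows is the (never-too-low) frequency estimate.
        if item in self.cands or len(self.cands) < self.k:
            self.cands[item] = est
        else:
            weakest = min(self.cands, key=self.cands.get)
            if est > self.cands[weakest]:           # displace the weakest candidate
                del self.cands[weakest]
                self.cands[item] = est

    def top(self):
        return sorted(self.cands.items(), key=lambda kv: kv[1], reverse=True)

rng = random.Random(3)
stream = (["GET /predict"] * 3000 + ["GET /health"] * 1500 + ["POST /feedback"] * 600
          + [f"GET /item/{i}" for i in range(5000)])     # long tail of rare endpoints
rng.shuffle(stream)

hh = TopKCountMin(k=3)
for endpoint in stream:
    hh.add(endpoint)
print(hh.top())                          # estimates are upper bounds on the true counts
print(Counter(stream).most_common(3))    # exact answer, for comparison only
```

Per mapper, the state is one fixed table plus $k$ candidates. At the reducer, the tables merge by cell-wise addition, and the union of the per-mapper candidate lists can then be re-ranked against the merged table.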

A fifth use is monitoring. A deployed model produces an unbounded stream of predictions, inputs, and outcomes, and you want to track distinct active users, request-frequency spikes, and shifts in the input distribution continuously, with bounded memory, across a serving fleet. Sketches are the standard tool. Each serving node maintains local sketches and a central monitor merges them, which is exactly the per-mapper-then-merge pattern of Figure 6.8.1, run continuously instead of as a batch. This streaming-monitoring use is where these structures meet online systems. We develop it fully in the stream-processing chapter, Chapter 9, where the input never ends and a fixed-memory summary is not a nicety but a requirement.

Research Frontier: Learned and Optimal Sketches (2024 to 2026)

Two active lines of work are reshaping sketching. The first is learned sketches: replacing or augmenting random hashing with a small learned model that predicts which items are heavy (or, for a filter, which keys are present), so the sketch spends its budget on the items that matter. The learned Count-Min and learned Bloom filter lineage (Kraska et al.'s learned-index program and its successors) has been extended through 2024 to 2026 with robustness guarantees and to streaming-frequency estimation. These extensions narrow the error for skewed, real-world distributions where a handful of keys dominate, exactly the regime of corpus token counts. The second line pushes theoretical optimality and practicality together. On one side are tighter space lower bounds for distinct counting and frequency estimation. On the other is the continued hardening of the Apache DataSketches family into a cross-engine standard, so that a sketch built in a Spark job merges with one built in a streaming monitor. For training-data work specifically, sketch-based and locality-sensitive near-duplicate detection has become a documented stage in the pipelines behind frontier-scale text corpora, where deduplication quality measurably affects the trained model. The throughline is that the accuracy a sketch delivers per byte, and therefore how much state must cross the shuffle into the merge box of Figure 6.8.1 for a given error, is now something the field improves with learning, not just with better hash functions.

The structures of this section share the chapter's deepest lesson: a question over a distributed dataset becomes cheap when its answer is mergeable. HyperLogLog merges by max, Count-Min by add, Bloom by OR, reservoir sampling by weighted draw, and each merge is the reduce step of a MapReduce job whose shuffle moves kilobytes. The next section turns from what MapReduce computes well to where the model breaks down: how it tolerates the inevitable machine failures of a large cluster, and the classes of computation it handles awkwardly or not at all.

Exercise 6.8.1: Pick the Sketch Conceptual

For each task, name the structure from Table 6.8.1 you would use and its merge operation, and state in one sentence why an exact computation would be impractical at scale: (a) report the number of distinct IP addresses that contacted a service this hour; (b) before a distributed join of a 5-terabyte click log against a 200-gigabyte user table, prune click rows whose user id is not in the user table; (c) find the 100 most-requested API endpoints across a fleet; (d) pull a uniform 10,000-row sample of a 4-petabyte log for a labeling study. Explain why choosing the wrong sketch (for example, using HyperLogLog to find heavy hitters) would not answer the question.

Exercise 6.8.2: Merge Two Sketches by Hand Coding

Extend Code 6.8.1 so that, in addition to the four-mapper merge, you split the same stream a second way into eight mappers with a different routing hash, build eight local sketches, and merge those. Verify that the eight-mapper merged estimate matches the four-mapper estimate to within rounding, confirming the per-register maximum is order- and partition-independent. Then deliberately corrupt mergeability: have one mapper use $p = 13$ (that is, $2^{13}$ registers) instead of $p = 14$, and attempt the per-register max against the others. Explain precisely why the merge is now invalid and what real-world failure (sketches built with mismatched parameters across a cluster) this models.

Exercise 6.8.3: Size the Error and the Shuffle Analysis

(a) For HyperLogLog, use the standard error $1.04/\sqrt{m}$ to find the smallest number of registers $m$ (a power of two) that keeps the relative error below $0.5\%$, and state the resulting sketch size in bytes assuming one byte per register. (b) For a Count-Min Sketch with $\varepsilon = 10^{-3}$ and $\delta = 10^{-2}$, compute $w = \lceil e/\varepsilon \rceil$ and $d = \lceil \ln(1/\delta) \rceil$ and the table's total cell count. (c) A pipeline has 2,000 mappers each emitting one sketch of each kind to a single reducer. Compare the total bytes crossing the shuffle for the two sketch types against the bytes that would cross if each mapper instead emitted its raw per-key partial counts for a universe of $10^8$ keys. Argue from these numbers which approximation buys the larger shuffle reduction and why.