"They asked me how many distinct users we saw last month. I said about a million, give or take three thousand, and I said it from sixteen kilobytes. Nobody asked for the exact number once they saw the bill for storing it."
A Register Array That Counts to a Billion
A sketch is a small, fixed-size summary of a giant dataset that answers one question approximately, and the good ones are mergeable: the summary of two shards combined equals a cheap combination of their two summaries. That single property is what marries them to MapReduce. Each mapper builds a tiny local sketch over its shard, the reducer merges the sketches with a max or an add, and the shuffle moves kilobytes instead of terabytes. You give up a controlled sliver of accuracy, often well under one percent, and in return you turn questions that would need petabytes of exact state into questions answered from a structure that fits in cache. This section covers the four sketches you will reach for most: HyperLogLog for distinct counts, Count-Min for frequencies, Bloom filters for set membership, and reservoir sampling for a uniform sample in one pass.
The previous sections of this chapter computed exact answers: an exact word count, an exact sort, an exact join, an exact PageRank vector. Exactness has a price, and at scale the price is state. To count the distinct users in a stream exactly, you must remember every user you have ever seen, because the next record might be a repeat or might be new and you cannot tell without the full set. To find the most frequent items exactly, you must keep a counter for every item. When the universe of items is billions of strings spread across a thousand machines, that exact state is itself a distributed dataset, and maintaining it costs more than the question is worth. Approximate algorithms break this bind by refusing to remember everything. They keep a summary whose size is fixed in advance, independent of how much data flows through, and they answer with a provable error bound instead of a certainty.
This is not a compromise you make reluctantly. For the questions these structures answer, the approximate answer is the right engineering answer, and the exact one is a luxury nobody needs. Knowing the distinct-user count to seven significant figures is pointless when the figure changes every second; knowing it to within a fraction of a percent, from a sketch that fits in a CPU cache and merges in microseconds, is exactly what is useful. The art is choosing a structure whose error you can bound and whose merge step survives the shuffle.
1. Mergeability Is the Whole Trick Beginner
Recall from Section 6.2 that MapReduce splits work into map tasks that run independently on shards and reduce tasks that combine partial results carried across the shuffle. A data structure fits this model exactly when it supports a merge: an operation $\oplus$ such that the sketch of two shards joined is the merge of their separate sketches, $\mathrm{sk}(A \cup B) = \mathrm{sk}(A) \oplus \mathrm{sk}(B)$, and that operation is associative and commutative. Associativity and commutativity matter because a real shuffle gives no guarantee about the order in which partial results arrive or how they are grouped; a correct merge must give the same answer regardless. The exact-gradient identity that opened this book in Section 1.1 was the same idea for sums; here we extend it from sums to set-summaries.
This is precisely the algebraic structure of a commutative monoid, and it is no accident that the most useful sketches are designed to be one. A plain counter merges by addition. A maximum merges by taking the larger value. HyperLogLog merges by taking a per-register maximum. Count-Min merges by adding two tables cell by cell. Bloom filters merge by a bitwise OR. Reservoir samples merge by a weighted random combination. In every case the merge is a few microseconds of work on a kilobyte-scale object, which is why the shuffle stays cheap no matter how large the input grows.
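The order-independence claim can be checked mechanically. The following is a minimal sketch, not part of the chapter's numbered listings: the helper names (`merge_max`, `merge_add`, `merge_or`, `order_independent`) are illustrative, and the three merges are reduced to plain integer lists standing in for registers, counters, and bits. It shuffles and regroups partial results the way an unordered shuffle might, then confirms the merged result never changes.

```python
from functools import reduce
import random

# Cell-level merges that the sketches in this section reduce to.
def merge_max(a, b):            # HyperLogLog-style: per-register maximum
    return [max(x, y) for x, y in zip(a, b)]

def merge_add(a, b):            # Count-Min-style: cell-wise addition
    return [x + y for x, y in zip(a, b)]

def merge_or(a, b):             # Bloom-style: bitwise OR
    return [x | y for x, y in zip(a, b)]

def order_independent(merge, parts, trials=50, seed=0):
    """True if every shuffled and regrouped merge equals the in-order merge."""
    rng = random.Random(seed)
    baseline = reduce(merge, parts)
    for _ in range(trials):
        shuffled = parts[:]
        rng.shuffle(shuffled)                     # arbitrary arrival order
        cut = rng.randrange(1, len(shuffled))     # arbitrary grouping
        left = reduce(merge, shuffled[:cut])
        right = reduce(merge, shuffled[cut:])
        if merge(left, right) != baseline:
            return False
    return True

rng = random.Random(1)
parts = [[rng.randrange(0, 8) for _ in range(16)] for _ in range(6)]
for name, op in [("max", merge_max), ("add", merge_add), ("or", merge_or)]:
    print(f"{name:>3} merge order-independent:", order_independent(op, parts))

# Counterexample: pairwise averaging is commutative but not associative,
# so the grouping chosen by the shuffle would change the answer.
def merge_mean(a, b):
    return [(x + y) / 2 for x, y in zip(a, b)]

a, b, c = [0], [0], [4]
print(merge_mean(merge_mean(a, b), c), "vs", merge_mean(a, merge_mean(b, c)))
```

The averaging counterexample shows why associativity is a requirement and not a formality: a structure whose merge depends on grouping gives a different answer every time the cluster schedules the reduce differently.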
The reason these structures combine perfectly with MapReduce is not their cleverness in any single mapper; it is that two local sketches merge into the sketch you would have built from the union of their data, exactly and order-independently. That turns a global question over terabytes into a local question over kilobytes followed by a trivial combine. When you evaluate any summary structure for distributed use, the first question is not "how accurate is it?" but "does it merge associatively, and how big is the thing that crosses the shuffle?"
2. HyperLogLog: A Billion Distinct Items in Kilobytes Intermediate
The cleanest example is counting distinct elements, the cardinality of a multiset. The exact method keeps a hash set of everything seen, which costs memory linear in the number of distinct items: counting a billion distinct users needs gigabytes. HyperLogLog answers the same question from a fixed array of $m$ small registers, typically a few kilobytes, with a relative error that depends only on $m$ and not at all on the cardinality.
The intuition is a probabilistic trick. Hash each item to a uniform bit string. In a stream of $n$ distinct hashes, the longest run of leading zeros you observe is about $\log_2 n$, because a run of $k$ leading zeros happens with probability $2^{-k}$ and you need roughly $2^k$ distinct items before you expect to see one. So the maximum leading-zero count is a noisy estimate of $\log_2$ of the cardinality. HyperLogLog tames the noise by splitting the hash: the first $p$ bits choose one of $m = 2^p$ registers, and each register stores the maximum leading-zero count (plus one) seen among the items routed to it. Averaging across registers with a harmonic mean and a bias-correction constant $\alpha_m$ gives the estimator
$$\hat{n} = \alpha_m \, m^2 \left( \sum_{j=1}^{m} 2^{-M_j} \right)^{-1}, \qquad \text{with standard error} \quad \frac{\sigma}{n} \approx \frac{1.04}{\sqrt{m}},$$

where $M_j$ is the value in register $j$. With $m = 2^{14} = 16384$ registers of one byte each, the structure occupies 16 KiB and the relative error is about $1.04/\sqrt{16384} \approx 0.81\%$, independent of whether you are counting a thousand items or a billion. The merge is a per-register maximum, because the maximum leading-zero count over a union is the larger of the two per-shard maxima. That is the operation drawn in Figure 6.8.1.
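Before the full structure, the single-register intuition can be checked in isolation. The snippet below is an illustrative sketch, assuming the same 64-bit BLAKE2b hash the chapter's listing uses; the function names and the `item_{i}` keys are invented for the demonstration. It hashes $n$ distinct keys and reports the longest run of leading zeros next to $\log_2 n$.

```python
import hashlib
import math

def h64(b):
    # 64-bit hash of the item (same construction as the chapter's listing)
    return int.from_bytes(hashlib.blake2b(b, digest_size=8).digest(), "big")

def leading_zeros64(h):
    # number of zero bits before the first one bit in a 64-bit word
    return 64 - h.bit_length()

def max_leading_zeros(n):
    # longest leading-zero run among n distinct hashed items
    return max(leading_zeros64(h64(f"item_{i}".encode())) for i in range(n))

for n in (1 << 8, 1 << 12, 1 << 16):
    print(f"n = {n:>6}   log2(n) = {math.log2(n):4.1f}   "
          f"max leading zeros = {max_leading_zeros(n)}")
```

A single maximum tracks $\log_2 n$ only roughly, and one unlucky hash can throw it off by several bits, which is a factor of several in the cardinality. That noise is what the split into $m$ registers and the harmonic mean are there to average away.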
The code below builds the structure from first principles on a stream of seven million records containing exactly one million distinct ids, splits the work across four simulated mappers that each maintain a local register array, merges them with a per-register max, and reports the estimate against the exact answer it also computes for comparison.
```python
import hashlib, math

def make_stream(n_distinct, repeats):
    for i in range(n_distinct):               # 1,000,000 distinct ids ...
        token = f"user_{i:08d}".encode()
        for _ in range(repeats):              # ... each repeated 7 times
            yield token

N_DISTINCT, REPEATS = 1_000_000, 7
P = 14                                        # 2^14 = 16384 one-byte registers
M = 1 << P

def h64(b):                                   # a fast 64-bit hash of the item
    return int.from_bytes(hashlib.blake2b(b, digest_size=8).digest(), "big")

def add(regs, x):                             # update one register array
    h = h64(x)
    idx = h >> (64 - P)                       # top P bits pick the register
    tail = h & ((1 << (64 - P)) - 1)          # remaining bits hold the run
    rho = (64 - P) - tail.bit_length() + 1 if tail else (64 - P + 1)
    if rho > regs[idx]:
        regs[idx] = rho                       # keep the longest run seen

def merge(a, b):                              # THE shuffle-crossing operation
    return bytearray(max(x, y) for x, y in zip(a, b))   # per-register maximum

def estimate(regs):
    m = len(regs)
    alpha = 0.7213 / (1 + 1.079 / m)          # bias-correction constant
    E = alpha * m * m / sum(2.0 ** (-r) for r in regs)
    if E <= 2.5 * m and regs.count(0):        # small-range correction
        E = m * math.log(m / regs.count(0))
    return E

NUM_MAPPERS = 4
local = [bytearray(M) for _ in range(NUM_MAPPERS)]
exact_set = set()
for rec in make_stream(N_DISTINCT, REPEATS):
    exact_set.add(rec)                        # exact answer (gigabyte-scale)
    add(local[h64(rec) % NUM_MAPPERS], rec)   # build that mapper's local sketch

merged = local[0]                             # reducer merges the 4 sketches
for s in local[1:]:
    merged = merge(merged, s)

exact, est = len(exact_set), estimate(merged)
print(f"exact distinct : {exact:,}")
print(f"HLL estimate (4 merged) : {est:,.0f}")
print(f"relative error : {abs(est-exact)/exact*100:.3f} %")
print(f"sketch size : {M:,} bytes ({M//1024} KiB, fixed)")
print(f"exact-set size (approx) : {exact*48/1024/1024:.1f} MiB")
print(f"space saving factor : {exact*48//M:,}x")
```
merge is the per-register maximum that makes the structure a commutative monoid. The exact hash set is built only to measure the error and would never exist in a real pipeline.

```
exact distinct : 1,000,000
HLL estimate (4 merged) : 996,799
relative error : 0.320 %
sketch size : 16,384 bytes (16 KiB, fixed)
exact-set size (approx) : 45.8 MiB
space saving factor : 2,929x
```
The estimate lands within a third of a percent of the truth, the structure never exceeds 16 KiB no matter how many records flow through, and the merge that produced the global answer was a single pass of byte-wise maxima over four small arrays. Output 6.8.1 is the entire value proposition of this section in one run: a controlled error, a fixed footprint, and a shuffle that moved kilobytes.
Code 6.8.1 is for understanding, not production. Apache DataSketches (the datasketches Python package, with matching Java and C++ implementations used inside Druid, Spark, and Hive) ships sketches with carefully tuned bias correction, serialization, and union operators, so the roughly forty lines above collapse to a handful and the merge works across languages and engines:
```python
from datasketches import hll_sketch, hll_union

def mapper_sketch(shard, lg_k=14):                 # one call per shard
    sk = hll_sketch(lg_k)
    for rec in shard:
        sk.update(rec)
    return sk.serialize_compact()                  # bytes that cross the shuffle

u = hll_union(14)                                  # reducer side
for blob in serialized_sketches_from_mappers:      # arrive over the network
    u.update(hll_sketch.deserialize(blob))
print("distinct estimate:", u.get_estimate())      # global cardinality
```
hll_union is the production form of the per-register merge in Code 6.8.1. In Spark this is a single approx_count_distinct aggregate, which we revisit in Chapter 7.

3. Count-Min for Frequencies, Bloom for Membership Intermediate
Distinct counting is one question; two others come up constantly in data pipelines and have equally elegant sketches. The Count-Min Sketch estimates the frequency of any item, and by extension finds the heavy hitters, the handful of items responsible for most of the volume. It is a table of $d$ rows and $w$ columns of counters. Each of $d$ independent hash functions maps an item to one column per row; to record an occurrence you increment those $d$ cells, and to query an item you return the minimum of its $d$ cells. Hash collisions can only push a count up, never down, so the minimum across rows strips away most of the collision noise. With $w = \lceil e/\varepsilon \rceil$ and $d = \lceil \ln(1/\delta) \rceil$, the estimate overshoots the true count $f_x$ by at most $\varepsilon N$ with probability at least $1 - \delta$, where $N$ is the total stream length:
$$\hat{f}_x = \min_{1 \le i \le d} C_i[\,h_i(x)\,], \qquad f_x \le \hat{f}_x \le f_x + \varepsilon N \ \text{ with probability } \ge 1 - \delta.$$

The table is the sketch, its size $d \times w$ is fixed in advance, and two Count-Min tables built on different shards with the same dimensions and the same hash functions merge by adding them cell by cell, because an item's count over the combined stream is the sum of its per-shard counts. That additive merge is what lets each mapper build a local frequency table and the reducer combine them into one global table that answers any item's frequency.
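A compact illustration of the table, the query, and the additive merge follows. It is a sketch for understanding under stated assumptions, not a production implementation: the `CountMin` class is hypothetical, and salted BLAKE2b digests stand in for the $d$ independent hash functions.

```python
import hashlib
import math

class CountMin:
    """Illustrative Count-Min table: d rows of w counters."""

    def __init__(self, eps, delta):
        self.w = math.ceil(math.e / eps)          # width  w = ceil(e / eps)
        self.d = math.ceil(math.log(1 / delta))   # depth  d = ceil(ln(1 / delta))
        self.table = [[0] * self.w for _ in range(self.d)]

    def _cells(self, item):
        # Assumption: one salted hash per row approximates independent hashes.
        for row in range(self.d):
            digest = hashlib.blake2b(item, digest_size=8,
                                     salt=bytes([row])).digest()
            yield row, int.from_bytes(digest, "big") % self.w

    def add(self, item, count=1):
        for row, col in self._cells(item):
            self.table[row][col] += count

    def query(self, item):
        # Collisions only inflate cells, so the minimum is the best estimate.
        return min(self.table[row][col] for row, col in self._cells(item))

    def merge(self, other):
        # Valid only for tables with identical shape and hash functions.
        assert (self.w, self.d) == (other.w, other.d)
        for mine, theirs in zip(self.table, other.table):
            for j, v in enumerate(theirs):
                mine[j] += v
        return self

# Two shards, each with a heavy key plus a long tail of rare keys.
shard_a = [b"cat"] * 500 + [f"a{i}".encode() for i in range(2000)]
shard_b = [b"cat"] * 300 + [f"b{i}".encode() for i in range(2000)]

sk_a, sk_b, direct = (CountMin(0.01, 0.01) for _ in range(3))
for x in shard_a:
    sk_a.add(x)
    direct.add(x)
for x in shard_b:
    sk_b.add(x)
    direct.add(x)

merged = sk_a.merge(sk_b)
n_total = len(shard_a) + len(shard_b)
print("table shape (d x w)      :", merged.d, "x", merged.w)
print("true count of b'cat'     : 800")
print("merged estimate          :", merged.query(b"cat"))
print("allowed overshoot eps*N  :", 0.01 * n_total)
print("merge == single-pass table:", merged.table == direct.table)
```

The last line is the mergeability property in concrete form: the table obtained by adding the two shard tables is cell-for-cell identical to the table a single pass over both shards would have built.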
The Bloom filter answers a different question: is this item in the set? It is a bit array of $m$ bits with $k$ hash functions; inserting an item sets the $k$ bits it hashes to, and a membership test returns "possibly present" if all $k$ bits are set and "definitely absent" otherwise. It never produces a false negative, only a false positive, with rate approximately $(1 - e^{-kn/m})^k$ for $n$ inserted items, minimized at $k = (m/n)\ln 2$. Two Bloom filters of the same size and hash family merge by a bitwise OR, the union of their bit patterns being exactly the filter for the union of their sets. Table 6.8.1 lines up the four structures, the question each answers, its merge operation, and its error knob, so the mergeability that ties them to MapReduce is visible at a glance.
| Structure | Question it answers | Merge operation | Error knob |
|---|---|---|---|
| HyperLogLog | How many distinct items? | per-register maximum | registers $m$; error $\approx 1.04/\sqrt{m}$ |
| Count-Min Sketch | How frequent is item $x$? Heavy hitters? | cell-wise addition | width $w$, depth $d$; overshoot $\le \varepsilon N$ |
| Bloom filter | Is item $x$ in the set? | bitwise OR | bits $m$, hashes $k$; false-positive rate |
| Reservoir sample | Give me a uniform sample of size $s$ | weighted random combine | sample size $s$; sampling variance |
Bloom filters earn their keep in distributed AI as work-skippers. Before an expensive distributed join or a deduplication pass over a web crawl, a mapper can carry a Bloom filter of the keys present on one side and use it to discard, locally and without any network traffic, the records on the other side that cannot possibly match. The filter never drops a record that should match (no false negatives), so the join stays correct while the shuffle shrinks. This is the pruning role that Chapter 8 builds on when it skips entire data blocks during a scan.
A Bloom filter has a charming asymmetry: it will sometimes claim an item is present when it is not, but it will never claim an item is absent when it is. So "no" is always the truth and "yes" means "probably". Engineers exploit this shamelessly: use the filter for the fast "definitely not here, skip it" path and fall back to the expensive exact check only on the rare "maybe" that survives. The structure is wrong in exactly the direction that is safe to be wrong in.
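The join-pruning pattern and the one-sided error can be seen in a few dozen lines. This is a minimal sketch under assumptions: the `Bloom` class is hypothetical, the sizing formula $m = -n \ln p / (\ln 2)^2$ is the standard consequence of the false-positive expression above, and the $k$ bit positions come from double hashing on one BLAKE2b digest, not from $k$ separate hash functions.

```python
import hashlib
import math

class Bloom:
    """Illustrative Bloom filter backed by a bytearray."""

    def __init__(self, m, k):
        self.m, self.k = m, k
        self.bits = bytearray((m + 7) // 8)

    @classmethod
    def for_capacity(cls, n_expected, fp_rate):
        # Standard sizing: m = -n ln(p) / (ln 2)^2 and k = (m / n) ln 2.
        m = math.ceil(-n_expected * math.log(fp_rate) / math.log(2) ** 2)
        k = max(1, round(m / n_expected * math.log(2)))
        return cls(m, k)

    def _positions(self, item):
        # Assumption: double hashing (h1 + i*h2) approximates k hash functions.
        d = hashlib.blake2b(item, digest_size=16).digest()
        h1 = int.from_bytes(d[:8], "big")
        h2 = int.from_bytes(d[8:], "big") | 1
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos >> 3] |= 1 << (pos & 7)

    def __contains__(self, item):
        return all((self.bits[pos >> 3] >> (pos & 7)) & 1
                   for pos in self._positions(item))

    def union(self, other):
        # Bitwise OR; valid only for filters with the same m, k, and hashing.
        assert (self.m, self.k) == (other.m, other.k)
        out = Bloom(self.m, self.k)
        out.bits = bytearray(x | y for x, y in zip(self.bits, other.bits))
        return out

# The user table lives on two shards; each builds a local filter of its keys.
users_a = [f"user_{i}".encode() for i in range(0, 5000)]
users_b = [f"user_{i}".encode() for i in range(5000, 10000)]
fa = Bloom.for_capacity(10_000, 0.01)
fb = Bloom.for_capacity(10_000, 0.01)
for u in users_a:
    fa.add(u)
for u in users_b:
    fb.add(u)
user_filter = fa.union(fb)

# Click rows are pruned locally before the join; only "maybe" rows are shuffled.
clicks = [f"user_{i}".encode() for i in range(8000, 30000)]
kept = [c for c in clicks if c in user_filter]
true_matches = 2000                      # ids 8000..9999 exist in the user table
print("click rows            :", len(clicks))
print("rows kept for the join:", len(kept))
print("false positives       :", len(kept) - true_matches)
print("missed true matches   :", sum(u not in user_filter for u in users_a + users_b))
```

Every row that truly matches survives the filter, so the join result is unchanged; the only cost of a false positive is a row that travels across the shuffle and is then rejected by the exact join.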
4. Reservoir Sampling: A Uniform Sample in One Pass Intermediate
The fourth structure answers a question that sounds harder than it is: from a stream whose length you do not know in advance and cannot store, hand me a uniform random sample of exactly $s$ items, in a single pass. Reservoir sampling does it by keeping a reservoir of $s$ items. The first $s$ items fill it. For the $i$-th item with $i > s$, you keep it with probability $s/i$, and if you keep it you evict a uniformly chosen current occupant. A short induction shows every item seen so far sits in the reservoir with probability exactly $s/i$, so the final sample is uniform over the whole stream regardless of its length.
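The procedure just described is short enough to write out in full. The sketch below follows it directly; the function name `reservoir_sample` and the empirical check are illustrative additions, not part of the chapter's numbered listings.

```python
import random
from collections import Counter

def reservoir_sample(stream, s, rng):
    """One-pass uniform sample of s items; also returns the stream length."""
    reservoir, seen = [], 0
    for item in stream:
        seen += 1
        if seen <= s:
            reservoir.append(item)        # the first s items fill the reservoir
        else:
            j = rng.randrange(seen)       # uniform integer in [0, seen)
            if j < s:                     # true with probability s / seen
                reservoir[j] = item       # j is then uniform over the s slots
    return reservoir, seen

# Empirical check: over many runs, each of 20 items should appear in a
# size-5 sample about 5/20 = 25% of the time.
rng = random.Random(42)
trials = 20_000
hits = Counter()
for _ in range(trials):
    sample, _ = reservoir_sample(range(20), 5, rng)
    hits.update(sample)
freqs = [hits[i] / trials for i in range(20)]
print(f"inclusion frequency: min {min(freqs):.3f}, max {max(freqs):.3f} (target 0.250)")
```

Drawing a single integer `j` covers both random decisions at once: the test `j < s` is the keep-with-probability-$s/i$ step, and when it passes, `j` is already a uniformly chosen slot to evict.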
For MapReduce, each mapper runs reservoir sampling over its shard and also reports how many items it saw. The reducer merges two reservoirs by drawing the final $s$ items one at a time without replacement, choosing at each draw between the two reservoirs with probability proportional to the number of stream items each still represents, which reconstructs a uniform sample of the union. Sampling is the most general approximation in this section: a uniform sample lets you estimate any statistic of the corpus, not just the one a specialized sketch was built for, at the cost of a sampling error that shrinks like $1/\sqrt{s}$. In AI pipelines it is the standard way to build a small inspectable subset of a petabyte corpus for exploratory analysis, label-quality audits, or a quick distributional check before committing a full training run.
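The reducer-side merge is the least obvious step, so here is one way it might look. This is a sketch under assumptions: `merge_reservoirs` is a hypothetical helper that implements the count-weighted draw as sequential sampling without replacement, and `reservoir_sample` is a compact version of the single-pass algorithm included so the block runs on its own.

```python
import random
from collections import Counter

def reservoir_sample(stream, s, rng):
    # Single-pass uniform sample of s items, plus the number of items seen.
    reservoir, seen = [], 0
    for item in stream:
        seen += 1
        if seen <= s:
            reservoir.append(item)
        else:
            j = rng.randrange(seen)
            if j < s:
                reservoir[j] = item
    return reservoir, seen

def merge_reservoirs(res_a, n_a, res_b, n_b, s, rng):
    """Combine two (reservoir, count) pairs into a uniform sample of the union."""
    a, b = list(res_a), list(res_b)
    rem_a, rem_b = n_a, n_b               # stream items each side still represents
    out = []
    for _ in range(min(s, n_a + n_b)):
        # Pick a side in proportion to its remaining stream count ...
        if rng.randrange(rem_a + rem_b) < rem_a:
            out.append(a.pop(rng.randrange(len(a))))   # ... then a random survivor
            rem_a -= 1
        else:
            out.append(b.pop(rng.randrange(len(b))))
            rem_b -= 1
    return out, n_a + n_b

# Unequal shards: 30 items on one mapper, 10 on the other, sample size 5.
# Every item should end up in the merged sample with probability 5/40 = 12.5%.
rng = random.Random(7)
trials = 20_000
hits = Counter()
for _ in range(trials):
    ra, na = reservoir_sample(range(0, 30), 5, rng)
    rb, nb = reservoir_sample(range(30, 40), 5, rng)
    merged, total = merge_reservoirs(ra, na, rb, nb, 5, rng)
    hits.update(merged)
freqs = [hits[i] / trials for i in range(40)]
print(f"inclusion frequency: min {min(freqs):.3f}, max {max(freqs):.3f} (target 0.125)")
```

Weighting by stream count, not by reservoir size, is what keeps the result uniform: both reservoirs hold five items here, yet the larger shard must contribute three times as often because it stands for three times as many records.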
Who: A data engineer on the analytics platform of a large streaming-media service.
Situation: Product managers wanted daily and rolling-28-day distinct-viewer counts per title, across billions of play events spread over a thousand-node cluster.
Problem: The exact distinct count needed a giant shuffle of user ids into per-title sets, and the rolling window meant re-counting overlapping days; the nightly job took hours and the shuffle dominated the cluster.
Dilemma: Keep exact counts and pay the shuffle and storage every night, or switch to HyperLogLog and accept roughly a one-percent error on a number that product managers read to two significant figures anyway.
Decision: They stored one 16 KiB HyperLogLog sketch per title per day instead of the user-id sets, because distinct-count error under one percent was irrelevant to every decision the number drove.
How: A mapper built a per-title daily sketch; the rolling-28-day count became a per-register max over 28 stored sketches, computed in milliseconds, with no re-scan of the raw events. The whole title-day catalog of sketches fit in memory.
Result: The nightly job's shuffle shrank by more than three orders of magnitude, the rolling windows became a cheap merge of pre-stored sketches, and the reported counts stayed within a fraction of a percent of the retired exact pipeline.
Lesson: When a number is read approximately and merged often, store the mergeable sketch, not the exact state. Mergeability turned an expensive rolling re-count into a maximum over kilobytes.
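The rolling-window step in this case study can be sketched with the same register logic as the earlier HyperLogLog listing. Everything specific below is an assumption made for illustration: the precision $p = 12$, the synthetic `viewers_on` generator with overlapping daily audiences, and the dictionary standing in for the stored sketch catalog.

```python
import hashlib
import math
from functools import reduce

P = 12                       # assumed precision: 4096 registers, ~1.6% std error
M = 1 << P

def h64(b):
    return int.from_bytes(hashlib.blake2b(b, digest_size=8).digest(), "big")

def daily_sketch(user_ids):
    # Build one day's register array for a single title.
    regs = bytearray(M)
    for uid in user_ids:
        h = h64(uid)
        idx = h >> (64 - P)                        # top P bits pick the register
        tail = h & ((1 << (64 - P)) - 1)
        rho = (64 - P) - tail.bit_length() + 1     # leading-zero run plus one
        if rho > regs[idx]:
            regs[idx] = rho
    return regs

def merge(a, b):
    return bytearray(map(max, a, b))               # per-register maximum

def estimate(regs):
    m = len(regs)
    alpha = 0.7213 / (1 + 1.079 / m)
    e = alpha * m * m / sum(2.0 ** (-r) for r in regs)
    zeros = regs.count(0)
    if e <= 2.5 * m and zeros:                     # small-range correction
        e = m * math.log(m / zeros)
    return e

def viewers_on(day):
    # Synthetic audience: 5,000 viewers per day, heavily overlapping across days.
    return [f"user_{i}".encode() for i in range(day * 1000, day * 1000 + 5000)]

# Stored once per day; the raw events are never scanned again.
store = {day: daily_sketch(viewers_on(day)) for day in range(28)}

# Rolling 28-day distinct viewers = per-register max over the stored sketches.
rolling = reduce(merge, (store[d] for d in range(28)))
exact = len({u for d in range(28) for u in viewers_on(d)})
est = estimate(rolling)
print(f"exact 28-day distinct viewers : {exact:,}")
print(f"merged-sketch estimate        : {est:,.0f}")
print(f"relative error                : {abs(est - exact) / exact * 100:.2f} %")
```

Because viewers repeat across days, summing the 28 daily counts would overcount badly; the per-register maximum deduplicates across days for free, which is why the window needs no access to the raw user ids.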
5. Why This Matters for AI at Scale Beginner
These structures are not a data-engineering sideline; they sit on the critical path of building and operating AI systems. Corpus statistics for a training set, how many distinct documents, distinct URLs, distinct n-grams, are distinct-count questions that HyperLogLog answers from sketches while the corpus is being assembled. Deduplication of a web-scale training corpus, a step that materially affects model quality, uses Bloom filters and the MinHash and locality-sensitive-hashing machinery of Section 6.7 as a cheap pre-filter that discards obvious duplicates before the expensive exact comparison. Feature cardinality, how many distinct values a categorical feature takes, decides whether to one-hot encode or hash a feature and how large an embedding table to allocate, a sizing question that returns when we shard embedding tables in Chapter 11. Heavy-hitter detection with Count-Min surfaces the tokens, items, or users that dominate a workload and therefore drive cost.
The fifth use is monitoring. A deployed model produces an unbounded stream of predictions, inputs, and outcomes, and you want to track distinct active users, request-frequency spikes, and shifts in the input distribution continuously, with bounded memory, across a serving fleet. Sketches are the standard tool: each serving node maintains local sketches and a central monitor merges them, exactly the per-mapper-then-merge pattern of Figure 6.8.1 run continuously instead of as a batch. This streaming-monitoring use is where these structures meet online systems, and we develop it fully in the stream-processing chapter, Chapter 9, where the input never ends and a fixed-memory summary is not a nicety but a requirement.
Two active lines are reshaping sketching. The first is learned sketches: replacing or augmenting random hashing with a small learned model that predicts which items are heavy, so the sketch spends its budget on the items that matter. The learned Count-Min and learned Bloom filter lineage (Kraska et al.'s learned-index program and its successors) has been extended through 2024 to 2026 with robustness guarantees and to streaming-frequency estimation, narrowing error for skewed, real-world distributions where a handful of keys dominate, exactly the regime of corpus token counts. The second line pushes theoretical optimality and practicality together: tighter space lower bounds for distinct counting and frequency estimation, and the continued hardening of the Apache DataSketches family into a cross-engine standard so a sketch built in a Spark job merges with one built in a streaming monitor. For training-data work specifically, sketch-based and locality-sensitive near-duplicate detection has become a documented stage in the pipelines behind frontier-scale text corpora, where deduplication quality measurably affects the trained model. The throughline is that the cost of the merge box in Figure 6.8.1 is now something the field engineers down with learning, not just with better hash functions.
The structures of this section share the chapter's deepest lesson: a question over a distributed dataset becomes cheap when its answer is mergeable. HyperLogLog merges by max, Count-Min by add, Bloom by OR, reservoir sampling by weighted draw, and each merge is the reduce step of a MapReduce job whose shuffle moves kilobytes. The next section turns from what MapReduce computes well to where the model breaks down: how it tolerates the inevitable machine failures of a large cluster, and the classes of computation it handles awkwardly or not at all.
For each task, name the structure from Table 6.8.1 you would use and its merge operation, and state in one sentence why an exact computation would be impractical at scale: (a) report the number of distinct IP addresses that contacted a service this hour; (b) before a distributed join of a 5-terabyte click log against a 200-gigabyte user table, prune click rows whose user id is not in the user table; (c) find the 100 most-requested API endpoints across a fleet; (d) pull a uniform 10,000-row sample of a 4-petabyte log for a labeling study. Explain why choosing the wrong sketch (for example, using HyperLogLog to find heavy hitters) would not answer the question.
Extend Code 6.8.1 so that, in addition to the four-mapper merge, you split the same stream a second way into eight mappers with a different routing hash, build eight local sketches, and merge those. Verify that the eight-mapper merged estimate matches the four-mapper estimate to within rounding, confirming the per-register maximum is order- and partition-independent. Then deliberately corrupt mergeability: have one mapper use $p = 13$ registers instead of $14$ and attempt the per-register max against the others. Explain precisely why the merge is now invalid and what real-world failure (sketches built with mismatched parameters across a cluster) this models.
(a) For HyperLogLog, use the standard error $1.04/\sqrt{m}$ to find the smallest number of registers $m$ (a power of two) that keeps the relative error below $0.5\%$, and state the resulting sketch size in bytes assuming one byte per register. (b) For a Count-Min Sketch with $\varepsilon = 10^{-3}$ and $\delta = 10^{-2}$, compute $w = \lceil e/\varepsilon \rceil$ and $d = \lceil \ln(1/\delta) \rceil$ and the table's total cell count. (c) A pipeline has 2,000 mappers each emitting one sketch of each kind to a single reducer. Compare the total bytes crossing the shuffle for the two sketch types against the bytes that would cross if each mapper instead emitted its raw per-key partial counts for a universe of $10^8$ keys. Argue from these numbers which approximation buys the larger shuffle reduction and why.