"I am a query vector, asking a billion items politely to narrow themselves down. They could not all answer at once, so they formed committees, each shard nominating its favorites, and a coordinator counted the votes."
A User Embedding Awaiting Its Shortlist
Candidate generation is the retrieval stage that turns a catalog of billions of items into a shortlist of a few thousand, and at catalog scale it is nothing other than distributed approximate nearest-neighbor search over a sharded item index. A recommender cannot afford to run its expensive ranking model on every item in the store; it must first cut the field by orders of magnitude, cheaply and in milliseconds. The dominant tool is the two-tower model: a user tower and an item tower each emit an embedding, and recommendation becomes "find the item vectors nearest the user vector." The catalog is too large to score on one machine, so the item index is partitioned across shards, each shard returns its local top candidates, and a gather step merges them into the global shortlist. This section builds that funnel, shows it is the same scatter-gather reduction the book has used since Chapter 4, and ties it to the vector-search machinery of Chapter 25.
The previous section built the embeddings that this section consumes. In Section 38.2 we learned how a recommender represents users and items as dense vectors and how the enormous embedding tables are sharded across a parameter-server fleet so that no single machine must hold them all. Those embeddings are not the product; they are the raw material for a decision. A recommendation request arrives, the user tower produces one query vector, and the system must answer a question with a strict latency budget: out of every item in the catalog, which few thousand are even worth a closer look? Answering that question over a billion-item catalog, fast enough to serve a homepage, is the candidate-generation problem, and it is unavoidably a read path over the sharded item embeddings of the prior section.
Recommenders solve this with a funnel. The full catalog enters the top; a cheap retrieval stage cuts it to thousands of candidates; an expensive ranking model (the subject of Section 38.4) then scores only those survivors and orders them. The reduction is dramatic and it has to be, because the ranking model is hundreds of times more costly per item than retrieval. Figure 38.3.1 shows the shape of the whole funnel and the two-tower retrieval that drives its widest stage; the rest of the section unpacks each part.
1. Two-Tower Retrieval as Nearest-Neighbor Search Beginner
The two-tower model is the workhorse of modern candidate generation, and its appeal is entirely structural. One neural network, the user tower, maps a request (the user, their recent history, the time of day, the surface they are on) to a vector $q \in \mathbb{R}^d$. A second network, the item tower, maps each item to a vector $v_i \in \mathbb{R}^d$ in the same space. The predicted affinity between the user and item $i$ is simply their dot product,
$$\text{score}(q, v_i) = q^\top v_i = \sum_{j=1}^{d} q_j \, v_{ij},$$and the recommendation for the user is the set of items with the highest scores. The structural payoff is that the item tower does not depend on the user at all. Every item vector can therefore be computed offline, in a batch job, and stored once in an index; at request time only the user tower runs, producing a single $q$. Retrieval then reduces to a geometric question: which item vectors lie nearest to $q$ under the inner product? That is exactly the (maximum inner-product) nearest-neighbor search that Chapter 25 developed in full, now pointed at a recommendation catalog instead of a document corpus.
Formally, candidate generation asks for the top-$k$ set
$$\mathrm{TopK}_k(q) = \operatorname*{arg\,max}_{\,I \subseteq \{1,\dots,N\},\ |I|=k} \ \sum_{i \in I} q^\top v_i,$$the $k$ items maximizing the score, out of a catalog of $N$ items. With $N$ in the billions, computing all $N$ dot products on one machine for every request is infeasible at serving latency, which is what forces both approximation and distribution. The approximation, replacing the exact top-$k$ with an approximate nearest-neighbor (ANN) search that inspects only a fraction of the index, is the subject of Chapter 25 and we reuse it here. The distribution, splitting the index across shards, is the subject of the next section.
The single design choice that makes billion-item retrieval tractable is that the item tower never sees the user. Because item embeddings are query-independent, they are computed once, offline, and frozen into an index, so the per-request cost collapses from "run a model on $N$ items" to "run one user tower, then do a geometric lookup." Any architecture that lets the item representation depend on the user (early-interaction or cross-attention models) forfeits this and cannot serve as a first-stage retriever at catalog scale. Such richer models are exactly what the ranking stage of Section 38.4 spends its budget on, precisely because retrieval has already shrunk the field.
2. Sharding the Item Index and Gathering Top-k Intermediate
Even with offline item embeddings, one machine cannot hold a billion $d$-dimensional vectors in memory, nor scan them within a serving deadline. The remedy is the partitioning-and-sharding arc the book has followed from Chapter 25: split the item index across $S$ shards, place each shard on its own machine, and let every shard own a disjoint slice of the catalog. A retrieval request then becomes a scatter-gather. The query vector $q$ is scattered to all $S$ shards; each shard runs an ANN search over only its own items and returns its local top-$k$; and a coordinator gathers the $S$ partial lists and merges them.
The merge is correct because top-$k$ decomposes the same way a sum does. If each shard returns the $k$ highest-scoring items it holds, then the global top-$k$ is guaranteed to lie within the union of those $S$ local lists, so
$$\mathrm{TopK}_k(q) = \mathrm{TopK}_k\!\Big( \bigcup_{s=1}^{S} \mathrm{TopK}_k^{(s)}(q) \Big),$$where $\mathrm{TopK}_k^{(s)}$ is the local top-$k$ from shard $s$. No global item can outrank every member of its own shard's local top-$k$ without already being in it, so merging $S$ lists of length $k$ and keeping the best $k$ reproduces the exact global answer. This is the same algebraic fact that made the all-reduce of Chapter 4 exact: the gather here is a reduce whose operator is "merge and keep top-$k$" instead of "add." Selecting the global winners from the per-shard winners is the maximum-selection tie that Section 36.6 used for distributed retrieval in the RAG case study, applied now to a recommendation index.
One subtlety distinguishes the recommendation setting from the textbook reduction. Because each shard runs an approximate nearest-neighbor search, not an exact scan, a shard may miss a true top-$k$ item that it actually holds. The merge is then exact with respect to the candidates the shards surfaced, but the end-to-end retrieval is only approximate, and its quality is measured by recall at $k$,
$$\text{recall@}k = \frac{\big|\, \widehat{\mathrm{TopK}}_k(q) \cap \mathrm{TopK}_k(q) \,\big|}{k},$$the fraction of the true top-$k$ that the sharded ANN actually recovered. Production retrieval tunes each shard's search effort and the per-shard $k$ to push recall close to one while staying inside the latency budget. The runnable demo below isolates the merge logic from the approximation: it runs an exact per-shard scan so that the sharded merge provably equals the single-machine answer, letting us verify the decomposition before approximation enters.
import numpy as np
rng = np.random.default_rng(7)
N, d, S, k = 2_000_000, 64, 16, 10 # items, embedding dim, shards, candidates per query
# Two-tower setup: a user tower and an item tower each emit a d-dim embedding.
# Retrieval score = dot product; the candidate set is the top-k items by score.
user_vec = rng.standard_normal(d).astype(np.float32)
item_index = rng.standard_normal((N, d)).astype(np.float32) # the catalog embeddings
# Exact (single-machine) top-k: score every item, take the k highest.
scores_full = item_index @ user_vec
exact_topk = np.argsort(-scores_full)[:k]
# Sharded ANN: partition the item index across S shards. Each shard scores ONLY
# its own items and returns its local top-k. A gather (reduce) step merges the
# S local top-k lists and keeps the global top-k. This is scatter-gather.
shard_bounds = np.array_split(np.arange(N), S)
gathered_ids, gathered_scores = [], []
for ids in shard_bounds: # one "worker" per shard
s = item_index[ids] @ user_vec # local scores
local = ids[np.argsort(-s)[:k]] # local top-k item ids
gathered_ids.append(local)
gathered_scores.append(scores_full[local])
cand_ids = np.concatenate(gathered_ids) # S * k merged candidates
cand_scores = np.concatenate(gathered_scores)
merged_topk = cand_ids[np.argsort(-cand_scores)[:k]]
# Verify the sharded merge equals the exact top-k (as a set).
match = set(merged_topk.tolist()) == set(exact_topk.tolist())
recall = len(set(merged_topk.tolist()) & set(exact_topk.tolist())) / k
print("items N :", f"{N:,}")
print("shards S :", S)
print("candidates per shard :", k, "-> gathered :", len(cand_ids))
print("merged == exact set :", match)
print("recall@%d :" % k, f"{recall:.3f}")
print("funnel :", f"{N:,} items -> {len(cand_ids)} gathered -> {k} candidates")
print("reduction factor :", f"{N // k:,}x")
items N : 2,000,000
shards S : 16
candidates per shard : 10 -> gathered : 160
merged == exact set : True
recall@10 : 1.000
funnel : 2,000,000 items -> 160 gathered -> 10 candidates
reduction factor : 200,000x
The hand-written per-shard scan and merge in Code 38.3.1 is what production ANN libraries implement, far faster, in a few lines. FAISS exposes an IndexShards wrapper that fans a query out to sub-indices and merges their results automatically, and its IndexIVFPQ turns the exhaustive scan into a quantized, cluster-pruned search that inspects a small fraction of each shard:
import faiss, numpy as np
# build one IVF-PQ sub-index per shard, then wrap them as a sharded index
shards = [faiss.index_factory(d, "IVF4096,PQ32") for _ in range(S)]
for sub, vecs in zip(shards, item_shards):
sub.train(vecs); sub.add(vecs)
index = faiss.IndexShards(d, threaded=True) # fan-out + gather, built in
for sub in shards: index.add_shard(sub)
scores, ids = index.search(user_vec[None, :], k) # one call: scatter, search, merge
IndexShards wrapper plus one search call.3. Blending Multiple Candidate Sources Intermediate
A single two-tower retriever, however well sharded, is rarely enough on its own. The learned embedding space is good at "items semantically similar to what this user likes," but it systematically underweights a few things a good homepage needs: brand-new items the towers have barely seen, items surging in global popularity, and items reachable only through the relationship graph rather than the content space. Production candidate generation therefore runs several retrievers in parallel and unions their outputs. A typical blend combines the ANN two-tower source with a popularity source (the globally trending items, cheap to maintain), a recency source (items added or updated in the last hours), and a graph-based source.
The graph-based source is where Chapter 13 returns. Co-engagement defines a bipartite user-item graph; candidates can be generated by random walks from the user's recently engaged items, or by reading a node's precomputed graph-neighbor list, both of which surface items that share no obvious content similarity yet are strongly co-consumed. Because the graph itself is partitioned across machines (the distributed graph-ML problem of Chapter 13), this source is itself a distributed read path, gathered alongside the ANN shards. The blend is a union followed by deduplication: if $C_r$ is the candidate set from source $r$, the combined pool is $\bigcup_r C_r$, distinct, and the downstream ranker of Section 38.4 is responsible for scoring this heterogeneous pool on a common scale. Retrieval's job is recall (do not miss anything good); precision is the ranker's job.
Who: A retrieval engineer on the discovery team of a large online marketplace.
Situation: The homepage ran a single two-tower ANN retriever over an item index of about 800 million listings, sharded 64 ways.
Problem: New listings drew almost no impressions for their first day, because the item tower had no engagement signal to place a fresh listing near any user, so its embedding sat in a sparse region of the space and never entered any shard's top-$k$.
Dilemma: Retrain the item tower more often (expensive, and still cold for truly new items) or add a separate retrieval path for fresh inventory and accept a more complex, multi-source candidate generator.
Decision: They kept the ANN source as the recall backbone and added two cheap sources: a recency source serving listings under twenty-four hours old, and a graph source walking from the seller's and category's co-engaged items.
How: The three sources ran in parallel; their outputs were unioned and deduplicated into a single candidate pool of a few thousand, handed unchanged to the existing ranker.
Result: New-listing impressions in the first day rose several-fold, and overall engagement held steady, because the ranker still decided final ordering; only retrieval's recall had widened.
Lesson: Retrieval is plural. One learned source optimizes for similarity; cold-start, freshness, and graph reach need their own cheap sources blended in, because a candidate the retriever never surfaces is one the ranker can never recover.
4. Freshness: Getting New Items Into the Index Fast Advanced
The recency source in the example is a patch over a deeper structural problem: the item index is, by construction, a snapshot. Item embeddings are computed in an offline batch job and frozen into the ANN shards, so an item that did not exist when the last batch ran is invisible to two-tower retrieval no matter how relevant it would be. For a static document corpus this is tolerable; for a marketplace, a news feed, or a short-video platform where the most valuable items are minutes old, a stale index is a broken product. Freshness, the latency between an item appearing and an item becoming retrievable, is therefore a first-class scaling requirement of candidate generation, not an afterthought.
The standard architecture splits the index into two tiers. A large base index holds the bulk of the catalog and is rebuilt on a slow cadence (hours to a day) by the offline item tower. A small, mutable real-time index sits beside it, receives new and updated item embeddings within seconds of the item appearing, and is searched in parallel with the base shards; its results are gathered into the same merge. Periodically the real-time index is folded into the next base rebuild and reset. This mirrors the log-structured merge pattern of Chapter 9: a fast-write tier absorbs the stream while a slow-rebuild tier holds the steady state, and queries read both. The cost of freshness is that the real-time tier must embed items on the fly, often with a lighter item tower, and that the gather now spans heterogeneous index types, but the alternative, a catalog where new items cannot be recommended for hours, is usually unacceptable.
The dot-product two-tower retriever has dominated for a decade, but a fast-moving line is replacing the ANN index itself. Generative retrieval methods such as the TIGER line (Rajput et al., 2023) and its 2024 to 2025 descendants assign each item a short semantic ID and train a sequence model to generate the IDs of the items to retrieve, turning candidate generation into autoregressive decoding rather than nearest-neighbor lookup, which sidesteps the freshness and index-maintenance costs of a frozen ANN structure. In parallel, work on scaling laws for recommendation retrieval (Meta's generative-recommender and HSTU-style architectures, 2024) shows two-tower-style retrieval benefiting from transformer-scale item encoders, pushing the embedding-computation cost back onto the kind of distributed training Part IV covers. Both threads keep the distributed-read structure of this section, scatter the query, gather candidates, while changing what each shard computes; they are an active reframing of the funnel's widest stage.
Candidate generation is the scatter-gather of Chapter 4 with the reduction operator swapped from "sum" to "merge and keep top-$k$." The query is scattered to every shard, each shard reduces its slice of the catalog to a short local list, and the coordinator reduces those lists to one global shortlist. The same primitive that averaged gradients in data-parallel training (Section 1.1) and gathered retrieved passages in the RAG case study (Section 36.6) here narrows a billion items to a few thousand. Recognizing candidate generation as a reduce is what lets a recommendation system borrow, unchanged, the topology-aware gather and fault handling the book built for collectives.
The decomposition in Section 2 claims the global top-$k$ always lies within the union of the per-shard top-$k$ lists, provided each shard returns its exact local top-$k$. Prove this by contradiction: assume a true global top-$k$ item $i$ on shard $s$ is missing from shard $s$'s local top-$k$, and derive a contradiction. Then explain precisely where the argument fails when each shard runs an approximate (not exact) nearest-neighbor search, and why that failure is measured by recall@$k$ rather than by a correctness bug.
Modify Code 38.3.1 so each shard returns only its top $k' < k$ items (for example $k' = 3$ while the global $k = 10$), then merge and keep the global top-10. Run it for several values of $k'$ and several shard counts $S$, and plot or tabulate the measured recall@10 against $k'$. Explain why a per-shard $k'$ smaller than the global $k$ can drop true winners even with an exact per-shard scan, and state the rule of thumb a system should use to choose per-shard $k'$ given $S$ and the global $k$ when items are spread roughly uniformly across shards.
A retrieval request scatters the query to $S = 64$ shards and gathers a local top-$k$ from each, with $k = 200$ and item ids of 8 bytes plus a 4-byte score. Estimate the bytes the coordinator must receive per request, and the bytes returned if the catalog is instead searched by one giant unsharded index returning the same global top-$k$. Then argue, using the latency intuition of Chapter 3, why sharding still wins despite the gather overhead, and at what shard count $S$ the fixed per-shard scatter cost would start to dominate the per-shard search savings.