Part VIII: Case Studies and Capstone Projects
Chapter 36: Web-Scale Text Processing and Distributed RAG

Distributed Indexing

"I am an inverted index, split across shards that have never met. Each of us swears it holds the whole corpus, and each of us is exactly one fifth right."

A Posting List Convinced It Is the Whole Vocabulary
Big Picture

Before a retrieval-augmented generator can answer a question, the cleaned corpus must be turned into an index that maps query terms to the documents that contain them, and at web scale that index does not fit on one machine, so it is built and stored across a cluster. This section owns the lexical half of retrieval: the inverted index, the classic term-to-document data structure that has powered keyword search for half a century. We build it the way a web-scale system actually does, as a MapReduce job that emits one record per occurrence and groups records by term in the shuffle, exactly the pattern from Chapter 6. We then face the one decision that defines the system's query behavior, whether to partition the index by document or by term, and we compress the result so that billions of postings fit in memory. The dense vector index of Section 36.5 is the other half; this section ends by showing how the two scores combine into the hybrid signal that feeds the generator.

The previous sections of this chapter took a raw web crawl and turned it into a clean, deduplicated, chunked corpus. That corpus is now a pile of text passages, each with an identifier, and a retrieval system that cannot find anything in it is useless. Retrieval has two complementary engines. The lexical engine matches the exact words of a query against the exact words of the documents and is unbeatable when the query contains a rare identifier, a product code, or a proper noun that no embedding model has memorized. The dense engine, built in Section 36.5 and sharded for search in Section 36.6, matches meaning rather than surface form and recovers documents that share no words with the query. A production RAG system runs both and fuses their scores. This section builds the lexical engine and the distributed index underneath it; the vector index of Chapter 25 is its structural twin.

[Figure: Documents (doc 0 to doc 3) -> Map: emit (term, docID) pairs such as (bm25, 2) and (rag, 3) -> Shuffle: group by term, routing all pairs with the same term to one reducer (hash of term) -> Reduce: posting lists such as bm25 -> [2] and rag -> [3] -> Inverted index: term -> posting list, sharded across the cluster]
Figure 36.4.1: The inverted index built as a MapReduce job. Each mapper reads its shard of documents and emits one $(\text{term}, \text{docID})$ pair per token occurrence. The shuffle routes every pair with the same term to a single reducer by hashing the term, and each reducer concatenates the document identifiers it receives into a posting list. The grouping is the same shuffle that Chapter 6 introduced; only the emit and reduce functions are specific to indexing.

1. The Inverted Index: Term to Posting List Beginner

A corpus stored the obvious way, as a list of documents, answers the wrong question. It can tell you what words document seven contains, but a search engine needs the reverse: given a word, which documents contain it. The inverted index is the data structure that answers that reverse question directly. For every distinct term in the vocabulary it stores a posting list, the sorted set of identifiers of the documents in which the term appears, usually annotated with the per-document term frequency and sometimes the token positions. The name "inverted" is literal: it inverts the document-to-terms map that the corpus gives you for free into the term-to-documents map that search requires.

The structure is small relative to the text it indexes because it stores integers, not words, and because the integers compress extraordinarily well, a property we exploit in Section 5. A query is answered by intersecting or unioning the posting lists of its terms and scoring the resulting candidates. Because each posting list is a sorted run of integers, the intersection of two terms is a linear merge, and a conjunctive query over a handful of terms reads only those terms' posting lists, never the rest of the corpus. That selectivity is why a query naming three rare words can be answered over a billion-document collection in single-digit milliseconds: the work scales with the lengths of three short posting lists, not with the size of the collection.
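A minimal sketch of the linear merge, assuming posting lists are plain Python lists of sorted docIDs; the toy lists mirror a few terms from the five-document corpus of Code 36.4.1. Intersecting the shortest lists first is a common heuristic, not something this section prescribes.

```python
def intersect(a, b):
    """Intersect two sorted posting lists of docIDs with a linear merge."""
    i, j, out = 0, 0, []
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1                      # advance the list holding the smaller docID
        else:
            j += 1
    return out

def conjunctive(postings, terms):
    """AND-query: intersect the terms' posting lists, shortest list first."""
    lists = sorted((postings.get(t, []) for t in terms), key=len)
    if not lists:
        return []
    result = lists[0]
    for plist in lists[1:]:
        result = intersect(result, plist)
        if not result:
            break                       # empty already: later lists cannot add docs
    return result

postings = {"inverted": [0, 1, 4], "index": [0, 1, 4], "shards": [0], "term": [1, 2, 4]}
print(conjunctive(postings, ["inverted", "index", "term"]))   # [1, 4]
```

Each step costs time proportional to the two list lengths, which is why a query over rare terms stays cheap regardless of corpus size.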

Key Insight: The Index Stores Integers, Not Text, and That Is Why It Scales

An inverted index does not store the corpus a second time; it stores, for each term, a sorted list of the document numbers that contain it. Sorted integer runs are tiny after delta compression, intersect in linear time, and skip everything that does not match. The whole edifice of web-scale lexical search rests on this one reframing: turn "which words are in this document" into "which documents hold this word", and store the answer as compressible integers. Every distributed-indexing decision in this section is about where those integer lists live and how a query reaches them.

2. Building the Index with MapReduce Intermediate

Constructing an inverted index over a few thousand documents is a tight loop on one machine. Constructing it over a few billion is the canonical MapReduce job, and it was in fact one of the motivating examples for which MapReduce was designed. The shape, shown in Figure 36.4.1, maps the indexing problem onto the same map, shuffle, reduce skeleton from Chapter 6. The map function reads one document, tokenizes it, and emits a key-value record for every token occurrence: the key is the term, the value is the document identifier (carrying the term frequency or position when needed). The framework's shuffle groups every record by key, so all occurrences of a given term, no matter which mapper saw them, arrive together at one reducer. The reduce function receives a term and the stream of document identifiers that contain it, sorts and deduplicates them, and writes the finished posting list.
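The three phases can be simulated in a single process. This is a sketch, not a real framework: the shuffle is a list of per-reducer dictionaries, the partitioner hashes terms with `zlib.crc32` (an assumption; Python's built-in `hash()` is randomized per process, so any deterministic hash would do), and the reducer keeps only docIDs, dropping the term frequencies a production index would carry.

```python
import re
import zlib
from collections import defaultdict

def map_doc(doc_id, text):
    """Map: emit one (term, docID) record per token occurrence."""
    for term in re.findall(r"[a-z0-9]+", text.lower()):
        yield term, doc_id

def partition(term, num_reducers):
    """Route a key to its reducer with a deterministic hash."""
    return zlib.crc32(term.encode("utf-8")) % num_reducers

def shuffle(records, num_reducers):
    """Shuffle: group every record by key inside the reducer that owns the key."""
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for term, doc_id in records:
        buckets[partition(term, num_reducers)][term].append(doc_id)
    return buckets

def reduce_term(term, doc_ids):
    """Reduce: sort and deduplicate docIDs into a finished posting list."""
    return term, sorted(set(doc_ids))

def build_index_mapreduce(docs, num_reducers=3):
    records = (rec for did, text in docs.items() for rec in map_doc(did, text))
    index = {}
    for bucket in shuffle(records, num_reducers):    # one bucket per reducer
        for term, doc_ids in bucket.items():
            t, plist = reduce_term(term, doc_ids)
            index[t] = plist
    return index

docs = {0: "the inverted index", 1: "an inverted index of the corpus", 2: "bm25 scores the index"}
index = build_index_mapreduce(docs)
print(index["inverted"], index["the"])   # [0, 1] [0, 1, 2]
```

Because every record for a term lands in the same bucket, the finished index is the same whatever number of reducers you choose; only the per-reducer load changes.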

This is exactly the structure that the chapter's narrative spine keeps returning to. The shuffle that groups $(\text{term}, \text{docID})$ pairs by term is the same shuffle that becomes the all-reduce of distributed training in Chapter 15; here it groups postings rather than gradients, but the cost model and the skew problem are identical. A term like "the" produces a posting list the size of the corpus and lands on a single reducer, the textbook hot-key skew that Chapter 7 teaches you to mitigate by partitioning hot keys or pruning stopwords before the emit. The finished posting lists are written to the distributed storage layer of Chapter 8, where they live as immutable segment files that the query path memory-maps.
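One standard way to split a hot key is salting, sketched below as an illustration; it is not assumed to be exactly the scheme Chapter 7 presents. The helpers `reducer_for`, `run_shuffle`, and `merge_partials` are hypothetical: a hot term's records are split into `salts` sub-keys by docID, each sub-key is sent to a different reducer, and a small follow-up step merges the sorted partial posting lists.

```python
import heapq
import zlib
from collections import defaultdict

def reducer_for(term, salt, num_reducers):
    """Hypothetical partitioner: deterministic hash of the term, offset by the
    salt so the salted sub-keys of one hot term land on distinct reducers."""
    return (zlib.crc32(term.encode("utf-8")) + salt) % num_reducers

def run_shuffle(records, num_reducers, hot_terms, salts):
    """Group (term, docID) records by key; each hot term is split into `salts` sub-keys."""
    reducers = [defaultdict(list) for _ in range(num_reducers)]
    for term, doc_id in records:
        salt = doc_id % salts if term in hot_terms else 0
        reducers[reducer_for(term, salt, num_reducers)][(term, salt)].append(doc_id)
    return reducers

def merge_partials(reducers, term):
    """Follow-up step: merge the sorted partial posting lists of one term."""
    partials = [sorted(ids) for r in reducers for (t, _), ids in r.items() if t == term]
    return list(heapq.merge(*partials))

def max_load(reducers):
    """Number of records received by the busiest reducer."""
    return max(sum(len(ids) for ids in r.values()) for r in reducers)

# Synthetic corpus: each of 1000 documents contains "the" plus one unique term.
records = [(t, d) for d in range(1000) for t in ("the", f"w{d}")]
plain = run_shuffle(records, num_reducers=4, hot_terms=set(), salts=1)
salted = run_shuffle(records, num_reducers=4, hot_terms={"the"}, salts=4)
print("busiest reducer, plain:", max_load(plain), " salted:", max_load(salted))
print(merge_partials(salted, "the")[:5])   # [0, 1, 2, 3, 4]
```

Without salting, one reducer receives all 1000 "the" records; with four salts each reducer receives about 250, at the price of the extra merge pass.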

Thesis Thread: The Shuffle Builds the Index, Then Trains the Model

The MapReduce shuffle is the workhorse of this book, and the inverted-index build is its first concrete payoff in this case study. Grouping records by a key, here a term, is the identical primitive that groups gradient contributions by parameter in data-parallel training (Chapter 15) and groups embedding updates by row in a parameter server (Chapter 11). When you watch a term-skewed reducer stall on "the", you are watching the same straggler that a hot parameter creates in a sharded optimizer. One shuffle, many payloads.

3. Two Ways to Shard the Index Intermediate

A single inverted index over a web-scale corpus is far too large for one machine, so it must be partitioned across many. There are exactly two natural ways to cut it, and the choice is the single most consequential decision in the whole index design because it determines what every query does at run time. Figure 36.4.2 contrasts them.

[Figure: Document-partitioned (left): each shard holds a slice of the documents (shard 0: docs 0-3, shard 1: docs 4-7, shard 2: docs 8-11); a query fans out to ALL shards and their top-k lists are merged, so latency = max over shards. Term-partitioned (right): each shard holds a slice of the vocabulary (shard A: terms a-h, shard B: i-q, shard C: r-z); a query routes ONLY to the shards owning its terms, which then join posting lists.]
Figure 36.4.2: The two sharding strategies. In document-partitioned indexing (left) each shard holds a complete inverted index over a disjoint slice of the documents, so a query must fan out to every shard and merge their partial top-$k$ results. In term-partitioned indexing (right) each shard owns the posting lists for a slice of the vocabulary, so a query touches only the shards that own its terms but must ship long posting lists to a single node to join them. The dashed green arrows mark the routed (selective) path; the solid arrows mark the fan-out path.

In document-partitioned indexing, the more common choice and the one nearly every production engine defaults to, each shard holds a complete, self-contained inverted index over a disjoint slice of the documents. Adding documents means adding shards; the vocabulary is replicated, but each shard's posting lists cover only its own documents. The cost lands at query time: because any query term might match documents in any shard, every query must be broadcast to all shards, each shard computes its local top-$k$, and a coordinator merges the partial results. The end-to-end latency is therefore governed by the slowest shard, $$ T_{\text{query}} \;=\; T_{\text{coordinate}} \;+\; \max_{1 \le s \le S} T_s \;+\; T_{\text{merge}}, $$ where $S$ is the number of shards and $T_s$ is shard $s$'s local search time. The fan-out makes a single straggler set the tail latency for the whole query, the same straggler problem that Chapter 3 models, which is why production systems hedge by querying a replica when a shard runs slow.
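The coordinator's merge is cheap because each shard needs to return only its local top-$k$: a document in the global top-$k$ must also be in the top-$k$ of its own shard, provided all shards score on the same scale. A minimal sketch with `heapq`, assuming hypothetical per-shard score tables keyed by docID:

```python
import heapq

def local_top_k(shard_scores, k):
    """Shard side: return the k best (score, docID) pairs, best first."""
    return heapq.nlargest(k, ((score, doc_id) for doc_id, score in shard_scores.items()))

def merge_top_k(partials, k):
    """Coordinator side: merge per-shard top-k lists into the global top-k.
    Under document partitioning each docID lives on exactly one shard,
    so no cross-shard deduplication is needed."""
    return heapq.nlargest(k, (pair for part in partials for pair in part))

# Hypothetical per-shard scores (docID -> score); docIDs are disjoint across shards.
shards = [{0: 1.1, 1: 0.9, 2: 0.2}, {3: 2.9, 4: 1.0}, {5: 1.5, 6: 0.4}]
partials = [local_top_k(s, k=3) for s in shards]   # scatter: one call per shard
print(merge_top_k(partials, k=3))                  # [(2.9, 3), (1.5, 5), (1.1, 0)]
```

The coordinator handles at most $S \cdot k$ pairs per query, independent of how many documents each shard matched.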

In term-partitioned indexing, each shard owns the complete posting lists for a slice of the vocabulary. A query routes only to the shards that own its terms, so a three-term query touches at most three shards rather than all $S$, which is attractive when queries are short and shards are many. The catch is that joining the posting lists, the intersection that actually answers the query, requires shipping entire posting lists across the network to one node, and a common term's posting list can be gigabytes. Term partitioning also concentrates load on whichever shards own popular terms and makes per-document updates touch many shards at once. For these reasons web search engines overwhelmingly choose document partitioning and pay the fan-out, accepting broad-but-shallow work per shard over the network-heavy joins of the term-partitioned design. This is the same index-sharding trade-off that Chapter 25 revisits for vector indexes, where document partitioning reappears as partitioning the vectors across shards.
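A back-of-the-envelope cost model makes the contrast concrete. This sketch assumes a naive term-partitioned join that ships every query term's full posting list to one node, hypothetical document frequencies for a billion-document corpus, and a crc32-based term-to-shard map; real systems work to reduce the join cost, so treat the numbers as illustrative only.

```python
import zlib

def term_shard(term, num_shards):
    """Term partitioning: one shard owns a term's entire posting list."""
    return zlib.crc32(term.encode("utf-8")) % num_shards

def cost_term_partitioned(query_terms, posting_len, num_shards):
    """(shards contacted, postings shipped) for a naive join that gathers
    every query term's full posting list on one node."""
    touched = {term_shard(t, num_shards) for t in query_terms}
    shipped = sum(posting_len[t] for t in query_terms)
    return len(touched), shipped

def cost_doc_partitioned(num_shards, k):
    """(shards contacted, results shipped): every shard answers with its local top-k."""
    return num_shards, num_shards * k

# Hypothetical document frequencies in a 1-billion-document corpus.
posting_len = {"the": 900_000_000, "index": 40_000_000, "bm25": 20_000}
query = ["the", "index", "bm25"]
print("term-partitioned:", cost_term_partitioned(query, posting_len, num_shards=100))
print("doc-partitioned: ", cost_doc_partitioned(num_shards=100, k=10))
```

Term routing contacts at most three shards but moves hundreds of millions of postings; document partitioning contacts all 100 shards but moves only 1,000 small results.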

Practical Example: The Search Team That Stopped Sharding by Term

Who: A search infrastructure engineer at a company indexing a 400-million-document enterprise corpus for a RAG assistant.

Situation: The first index was term-partitioned, on the theory that routing each query to only the shards holding its terms would minimize the machines touched per query.

Problem: Common-term queries dragged multi-gigabyte posting lists across the network to a single join node, and that node became a bottleneck that no amount of added shards relieved.

Dilemma: Keep term partitioning and engineer around the join hotspot with caching and posting-list pruning, or rebuild as document-partitioned and accept that every query now fans out to all shards.

Decision: They rebuilt as document-partitioned, because the fan-out cost is bounded and parallel, $\max_s T_s$ rather than a serial gather, while the term-partitioned join cost grew with corpus size.

How: Each of 40 shards built a self-contained index over 10 million documents; a coordinator scattered each query, gathered per-shard top-$k$, and merged, hedging slow shards against a replica.

Result: Median latency fell and, more importantly, the tail latency became predictable, set by the slowest of 40 parallel local searches rather than by an unbounded network gather.

Lesson: Document partitioning trades a guaranteed fan-out for bounded, parallel per-shard work, and at web scale that bound is worth more than the selectivity of term routing.

4. Scoring with BM25 Intermediate

Finding the documents that contain the query terms is only half of retrieval; ranking them is the other half. The lexical engine ranks with BM25, the probabilistic scoring function that has anchored keyword search for decades and remains a strikingly strong baseline. BM25 scores a document $D$ against a query $Q$ by summing, over the query terms, a saturating term-frequency contribution weighted by how rare the term is across the corpus, $$ \text{BM25}(Q, D) \;=\; \sum_{t \in Q} \text{IDF}(t)\,\cdot\,\frac{f(t, D)\,(k_1 + 1)}{f(t, D) + k_1\,\bigl(1 - b + b\,\frac{|D|}{\text{avgdl}}\bigr)}, $$ where $f(t, D)$ is the frequency of term $t$ in document $D$, $|D|$ is the document length, $\text{avgdl}$ is the average document length, and $k_1$ and $b$ are tuning constants (typically $k_1$ between 1.2 and 2.0 and $b \approx 0.75$). The inverse document frequency rewards rare terms, $$ \text{IDF}(t) \;=\; \ln\!\left( \frac{N - n(t) + 0.5}{n(t) + 0.5} + 1 \right), $$ with $N$ the number of documents and $n(t)$ the number containing $t$, which is exactly the length of $t$'s posting list. Beyond rewarding rare terms, BM25 has two properties that make it a robust ranker. The term-frequency factor saturates, approaching $k_1 + 1$ as $f(t, D)$ grows, so a document that repeats a word fifty times does not outscore one that uses it five times by a factor of ten, and the length normalization through $b$ keeps long documents from winning by sheer size.
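A quick numeric check of the saturation and length-normalization claims, using the $k_1 = 1.5$, $b = 0.75$ values from Code 36.4.1 and a hypothetical average document length of 100 tokens. The function computes only the term-frequency factor; the IDF multiplier is the same for every document and is left out.

```python
def tf_component(f, doc_len, avgdl, k1=1.5, b=0.75):
    """The saturating term-frequency factor of BM25 (everything except IDF)."""
    return f * (k1 + 1) / (f + k1 * (1 - b + b * doc_len / avgdl))

avgdl = 100.0
five = tf_component(5, doc_len=100, avgdl=avgdl)
fifty = tf_component(50, doc_len=100, avgdl=avgdl)
long_doc = tf_component(5, doc_len=400, avgdl=avgdl)   # same count, 4x longer document
print(f"f=5: {five:.3f}  f=50: {fifty:.3f}  ratio: {fifty / five:.2f}")
# f=5: 1.923  f=50: 2.427  ratio: 1.26
print(f"f=5 in a 4x-longer document: {long_doc:.3f}")
# f=5 in a 4x-longer document: 1.266
```

Ten times the repetitions buys only about 26 percent more score, and the factor can never exceed $k_1 + 1 = 2.5$ times the term's IDF.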

The distributed subtlety hides in $\text{IDF}$. Under document partitioning, each shard knows $n(t)$ only over its own documents, not over the whole corpus, so a naively local IDF differs from the global one. Production systems either accept the approximation, since per-shard ranking errors mostly wash out in the merge, or maintain a small replicated table of global document frequencies that every shard reads. The runnable demo below makes this concrete: it builds an inverted index over a handful of documents, scores a BM25 query against the global index, then splits the index into two document-partitioned shards and shows that the per-shard scores, computed with local statistics, differ from the global ones yet still surface the right document after the merge.

import math, re
from collections import defaultdict

DOCS = {
    0: "distributed indexing splits the inverted index across many shards",
    1: "an inverted index maps each term to a posting list of documents",
    2: "bm25 ranks documents by term frequency and inverse document frequency",
    3: "vector search complements lexical retrieval for hybrid rag pipelines",
    4: "the mapreduce shuffle groups term postings to build the inverted index",
}

def tokenize(text):
    return re.findall(r"[a-z0-9]+", text.lower())

def build_index(docs):
    """Return postings: term -> {docID: term_freq}, plus per-doc lengths."""
    postings, doc_len = defaultdict(dict), {}
    for did, text in docs.items():
        toks = tokenize(text)
        doc_len[did] = len(toks)
        tf = defaultdict(int)
        for t in toks:
            tf[t] += 1
        for t, f in tf.items():
            postings[t][did] = f                 # term -> (docID, frequency)
    return postings, doc_len

def bm25_scores(query, postings, doc_len, k1=1.5, b=0.75):
    N = len(doc_len)
    avgdl = sum(doc_len.values()) / N
    scores = defaultdict(float)
    for term in tokenize(query):
        plist = postings.get(term)
        if not plist:
            continue
        df = len(plist)                          # n(t) = posting-list length
        idf = math.log((N - df + 0.5) / (df + 0.5) + 1.0)
        for did, f in plist.items():
            denom = f + k1 * (1 - b + b * doc_len[did] / avgdl)
            scores[did] += idf * (f * (k1 + 1)) / denom
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

postings, doc_len = build_index(DOCS)
print("=== Inverted index (term -> posting list) ===")
for term in ["inverted", "index", "bm25", "rag"]:
    print(f"  {term:9s} -> {dict(postings.get(term, {}))}")

query = "inverted index for rag"
print(f"\n=== BM25 ranking for query: '{query}' ===")
for rank, (did, score) in enumerate(bm25_scores(query, postings, doc_len), 1):
    print(f"  #{rank}  doc {did}  score={score:.4f}  | {DOCS[did]}")

# Document-partitioned sharding: each shard owns a disjoint slice of docs
# and builds its OWN local index. A query must fan out to every shard.
shard_docs = [{d: DOCS[d] for d in (0, 1, 2)}, {d: DOCS[d] for d in (3, 4)}]
shard_index = [build_index(sd) for sd in shard_docs]
print("\n=== Document-partitioned sharding (2 shards) ===")
for s, (p, dl) in enumerate(shard_index):
    print(f"  shard {s}: docs {sorted(dl)}  | {len(p)} distinct terms")

print(f"\n=== Fan-out query '{query}' across shards, then merge ===")
merged = []
for s, (p, dl) in enumerate(shard_index):
    local = bm25_scores(query, p, dl)
    if local:
        print(f"  shard {s} local top: doc {local[0][0]} score={local[0][1]:.4f}")
    merged.extend(local)
merged.sort(key=lambda kv: kv[1], reverse=True)
print("  merged top doc:", merged[0][0] if merged else None)
Code 36.4.1: A complete lexical retriever in one screen: build_index inverts the corpus into posting lists, bm25_scores ranks documents by the formula above, and the second half splits the index into two document-partitioned shards, fans the query out to both, and merges their partial results.
=== Inverted index (term -> posting list) ===
  inverted  -> {0: 1, 1: 1, 4: 1}
  index     -> {0: 1, 1: 1, 4: 1}
  bm25      -> {2: 1}
  rag       -> {3: 1}

=== BM25 ranking for query: 'inverted index for rag' ===
  #1  doc 3  score=2.9276  | vector search complements lexical retrieval for hybrid rag pipelines
  #2  doc 0  score=1.1383  | distributed indexing splits the inverted index across many shards
  #3  doc 4  score=1.0412  | the mapreduce shuffle groups term postings to build the inverted index
  #4  doc 1  score=0.9987  | an inverted index maps each term to a posting list of documents

=== Document-partitioned sharding (2 shards) ===
  shard 0: docs [0, 1, 2]  | 26 distinct terms
  shard 1: docs [3, 4]  | 19 distinct terms

=== Fan-out query 'inverted index for rag' across shards, then merge ===
  shard 0 local top: doc 0 score=0.9980
  shard 1 local top: doc 3 score=1.4516
  merged top doc: 3
Output 36.4.1: The rare term "rag" carries the most IDF weight, so doc 3 wins both globally and after the fan-out merge. Note that the per-shard scores differ from the global ones (doc 3 scores 1.4516 locally versus 2.9276 globally) because each shard computes IDF from its own document frequencies; the merge still recovers the correct top document, the practical reason document-partitioned systems tolerate local IDF.
Library Shortcut: OpenSearch Builds and Shards the Index for You

Code 36.4.1 spells out the inverted index, BM25 scoring, and document-partitioned fan-out by hand, roughly sixty lines. A production search engine such as OpenSearch or Elasticsearch (both built on Apache Lucene) gives you all three, plus replication, replica selection, and optional global-IDF scoring (the dfs_query_then_fetch search type), behind three calls:

from opensearchpy import OpenSearch, helpers

client = OpenSearch("https://localhost:9200")
client.indices.create("corpus", body={               # document-partitioned by default
    "settings": {"number_of_shards": 40, "number_of_replicas": 1},
    "mappings": {"properties": {"text": {"type": "text",
        "similarity": "BM25"}}}})                     # BM25 is the built-in default

helpers.bulk(client, ({"_index": "corpus", "_id": i, "text": t}
                      for i, t in DOCS.items()))      # each doc routed to a shard by hashing its _id

hits = client.search(index="corpus", body={           # scatter, score, merge top-k
    "query": {"match": {"text": "inverted index for rag"}}})["hits"]["hits"]
Code 36.4.2: The same lexical retriever in OpenSearch. The engine performs the index build (per-shard Lucene segments rather than a literal MapReduce job), document-partitioned sharding across 40 shards, per-shard BM25, and the scatter-gather merge internally; about sixty lines of hand-written index and scoring code collapse to one create, one bulk, and one search.

5. Compressing the Posting Lists Advanced

A web-scale inverted index holds hundreds of billions of postings, and stored as raw 32-bit integers it would not fit in cluster memory, where it must live for queries to be fast. Compression is therefore not an optimization but a requirement, and posting lists compress beautifully because they are sorted. The standard move is delta encoding: instead of storing the document identifiers $[17, 23, 24, 90, \dots]$, store the gaps between consecutive identifiers $[17, 6, 1, 66, \dots]$. Gaps are smaller than absolute identifiers, especially for common terms whose postings are dense, and small integers encode in few bits. A frequent term that appears in every tenth document has gaps clustered around ten, which a variable-length code stores in a handful of bits each rather than a full word.
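A minimal sketch of delta encoding followed by variable-byte packing, one of the classic codes discussed next. The convention assumed here stores 7 payload bits per byte and sets the high bit on the last byte of each integer; other variants flag continuation bytes instead.

```python
def delta_encode(doc_ids):
    """Replace sorted docIDs by the gaps between consecutive IDs."""
    prev, gaps = 0, []
    for d in doc_ids:
        gaps.append(d - prev)
        prev = d
    return gaps

def delta_decode(gaps):
    """Prefix-sum the gaps back into absolute docIDs."""
    total, out = 0, []
    for g in gaps:
        total += g
        out.append(total)
    return out

def vbyte_encode(numbers):
    """Variable-byte code: 7 payload bits per byte, high bit marks the last byte."""
    out = bytearray()
    for n in numbers:
        chunks = []
        while True:
            chunks.append(n & 0x7F)
            n >>= 7
            if n == 0:
                break
        chunks.reverse()                 # most significant 7-bit group first
        chunks[-1] |= 0x80               # flag the final byte of this integer
        out.extend(chunks)
    return bytes(out)

def vbyte_decode(data):
    """Rebuild integers 7 bits at a time until a flagged final byte."""
    numbers, n = [], 0
    for byte in data:
        n = (n << 7) | (byte & 0x7F)
        if byte & 0x80:
            numbers.append(n)
            n = 0
    return numbers

postings = [17, 23, 24, 90]
gaps = delta_encode(postings)            # [17, 6, 1, 66]
blob = vbyte_encode(gaps)
print(gaps, len(blob), "bytes vs", 4 * len(postings), "bytes as raw 32-bit ints")
print(delta_decode(vbyte_decode(blob)) == postings)   # True
```

Every gap here is below 128, so each posting costs one byte instead of four; absolute docIDs in a billion-document corpus would need up to five bytes each under this code.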

The gaps are then packed with a variable-length integer code. Classic schemes include variable-byte encoding and the Elias gamma and delta codes; modern engines favor SIMD-friendly block codes such as PForDelta and the Roaring bitmap layout, which decode billions of integers per second by processing whole blocks at once. To bound the worst case, note that a posting list for a term appearing in $n(t)$ of $N$ documents, delta-encoded, needs on the order of $$ \text{bits}(t) \;\approx\; n(t)\,\Bigl\lceil \log_2\!\frac{N}{n(t)} \Bigr\rceil $$ bits, a quantity that grows with how common the term is yet shrinks per posting as the gaps tighten. Real indexes reach well under one byte per posting. Compression also interacts with the query path: block codes let the engine skip whole blocks of a posting list during an intersection without decoding them, turning a long merge into a series of jumps, the skip-list optimization that keeps conjunctive queries fast. The compressed segments are the immutable files that the storage layer of Chapter 8 memory-maps into each shard's address space.
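Plugging numbers into the estimate shows both trends at once. The sketch assumes a hypothetical corpus of $N = 10^9$ documents and compares the estimate with raw 32-bit storage; it ignores code overhead and term frequencies, so it is an order-of-magnitude guide only.

```python
import math

def estimated_bits(n_t, N):
    """Order-of-magnitude size of a delta-coded posting list: n(t) * ceil(log2(N / n(t)))."""
    return n_t * math.ceil(math.log2(N / n_t))

N = 1_000_000_000                        # hypothetical 1-billion-document corpus
for n_t in (10_000, 10_000_000, 100_000_000):
    bits = estimated_bits(n_t, N)
    print(f"n(t)={n_t:>11,}  ~{bits // n_t} bits/posting  "
          f"~{bits / 8 / 1e6:,.3f} MB vs {4 * n_t / 1e6:,.3f} MB raw")
```

The rare term needs about 17 bits per posting, while the term in one document of every ten needs only about 4 (half a byte), even though its list is by far the largest in total.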

6. Hybrid Retrieval: Lexical Meets Dense Intermediate

The lexical index built in this section and the dense vector index built in Section 36.5 have complementary failure modes. BM25 is precise on exact terms, rare identifiers, and proper nouns but is blind to synonymy: a query for "automobile" misses a document that only ever says "car". Dense retrieval matches meaning and bridges that gap but can drift on rare tokens and exact strings that no embedding has learned. A hybrid retriever runs both and fuses their results, and the fusion is what feeds the RAG generator its context. The simplest robust fusion is reciprocal rank fusion, which combines the two ranked lists by summing a rank-based score, $$ \text{RRF}(d) \;=\; \sum_{r \in \{\text{lex}, \text{dense}\}} \frac{1}{k + \text{rank}_r(d)}, $$ where $\text{rank}_r(d)$ is document $d$'s position in retriever $r$'s ranking and $k$ is a constant (commonly 60) that flattens the gap between adjacent top positions, so agreement across lists matters more than a single first place. Because RRF uses ranks rather than raw scores, it sidesteps the awkward problem that BM25 scores and cosine similarities live on incomparable scales, a cousin of the cross-shard score mismatch that local IDF produces in Output 36.4.1.
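A minimal sketch of RRF with the conventional $k = 60$. The lexical ranking reuses the BM25 order from Output 36.4.1 (documents 3, 0, 4, 1); the dense ranking is hypothetical. Documents absent from one list simply receive no contribution from it.

```python
from collections import defaultdict

def reciprocal_rank_fusion(rankings, k=60):
    """Fuse ranked lists of docIDs: score(d) = sum over lists of 1 / (k + rank)."""
    fused = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):   # ranks are 1-based
            fused[doc_id] += 1.0 / (k + rank)
    return sorted(fused.items(), key=lambda kv: kv[1], reverse=True)

lexical = ["d3", "d0", "d4", "d1"]   # BM25 order from Output 36.4.1
dense = ["d7", "d3", "d1"]           # hypothetical dense-retriever order
for doc_id, score in reciprocal_rank_fusion([lexical, dense]):
    print(doc_id, round(score, 5))
```

Document d1, only fourth lexically and third in the dense list, overtakes d7, which tops the dense list but is missing from the lexical one: appearing in both lists is rewarded.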

Architecturally the two indexes mirror each other: both are document-partitioned across shards, both fan a query out to all shards, and both merge per-shard top-$k$ lists at a coordinator, which is why a single retrieval service can host both and fuse them at the merge step. The lexical half owns recall on exact matches; the dense half owns recall on meaning; the generator downstream sees one fused, deduplicated context window. With this section the chapter has built the lexical retriever; Section 36.5 builds its dense twin and Section 36.6 shards it for billion-vector search, after which the two streams join.

Research Frontier: Learned Sparse Retrieval and Unified Indexes (2024 to 2026)

The boundary between lexical and dense retrieval is dissolving. Learned sparse retrievers such as SPLADE and its successors keep the inverted-index machinery of this section (posting lists, additive per-term scoring, and all the distributed sharding) but replace hand-counted term frequencies with weights predicted by a transformer, expanding each document with semantically related terms so that an inverted index can match "car" to "automobile" without leaving the sparse world. In parallel, models such as the BGE-M3 multi-functionality embeddings emit sparse, dense, and multi-vector representations from a single network, and engines such as OpenSearch and Vespa offer hybrid fusion as a built-in feature, so that one model and one index build can serve both retrieval modes and the fusion of Section 6 moves closer to the model. The open question the field is actively chasing is whether a single distributed index can replace the two-engine hybrid entirely while keeping the exact-match precision that keeps BM25 in every production stack.

Fun Note: BM25 Refuses to Retire

Every few years a new neural retriever is announced as the end of keyword search, and every few years BM25 shows up in the same paper's baseline table, stubbornly within a few points of the state of the art and occasionally beating it on out-of-domain queries. A scoring function from 1994, built from term counts and a logarithm, remains the line that billion-parameter models must clear. The lesson the field keeps relearning: the cheapest component in your retrieval stack is often the one you cannot remove.

Exercise 36.4.1: Fan-Out Latency Under Document Partitioning Analysis

A document-partitioned index has $S = 50$ shards. Each shard's local search time is independent and, on a typical query, takes 8 milliseconds on average, but 2 percent of the time a shard hits a slow path and takes 40 milliseconds. Using $T_{\text{query}} \approx T_{\text{coordinate}} + \max_s T_s + T_{\text{merge}}$ with negligible coordinate and merge times, estimate the probability that at least one of the 50 shards is on its slow path for a given query, and explain why this makes the expected tail latency close to 40 milliseconds even though the average shard is fast. Then describe how querying a replica for any shard that has not responded within 12 milliseconds changes the calculation, and connect your answer to the straggler models of Chapter 3.

Exercise 36.4.2: Global Versus Local IDF Coding

Extend Code 36.4.1 so that the two document-partitioned shards share a single replicated table of global document frequencies $n(t)$ computed over all five documents, and use it in place of each shard's local $n(t)$ when computing IDF. Re-run the fan-out query and compare the per-shard scores to the local-IDF scores in Output 36.4.1 and to the global ranking. Quantify how much the ranking changes for this tiny corpus, then argue what happens to the size of the discrepancy as the number of shards grows while the corpus stays fixed, and explain why production systems often tolerate local IDF anyway.

Exercise 36.4.3: Why Not Term Partitioning for Search Conceptual

Construct a concrete two-term query against a term-partitioned index where the join cost (shipping posting lists to one node) dominates everything else, and a matching query against a document-partitioned index where the fan-out cost dominates. For each, identify the single quantity that drives the cost (posting-list length versus shard count) and explain how it scales as the corpus grows tenfold. Conclude with one realistic workload, by query length, corpus size, and update rate, in which term partitioning would actually be the better choice, and relate it to the index-sharding discussion for vector search in Chapter 25.