Part V: Distributed Inference and Serving
Chapter 25: Distributed Retrieval and Vector Search

Distributed Embedding Pipelines

"I embedded a billion passages last night and only one of them came back to thank me. The other nine hundred ninety-nine million were encoded with a model the query side never heard of."

An Encoder With Version Skew
Big Picture

Before any nearest-neighbor search can happen, the corpus must become vectors, and turning a web-scale corpus into vectors is a distributed batch-inference job measured in GPU-days, not a preprocessing afterthought. Retrieval has two embedding moments that look similar and behave nothing alike. At index time you run an embedding model over billions of document chunks once, offline, and write the vectors to the index; the only thing that matters is throughput, and the work is parallelized across a fleet of accelerators. At query time you embed one short query on the serving path under a strict latency budget. The single non-negotiable rule that ties the two together is that they must use the identical model and preprocessing, or the vectors are not comparable and retrieval silently returns garbage. This section treats the index-time job as the distributed batch inference of Chapter 23 running over the sharded data of Chapter 7, and shows why the two embedding paths pull in opposite directions.

A vector index is only as good as the vectors poured into it, and those vectors do not appear by themselves. Section 25.1 framed retrieval-augmented generation as a distributed system and assumed an index already existed; this section builds that index. The task is deceptively plain to state: take a corpus, cut each document into chunks, run an embedding model over every chunk, and store the resulting vector next to its chunk. The difficulty is entirely in the scale. A serious corpus is hundreds of millions of documents, which becomes billions of chunks, and an embedding model is a transformer that must run on every one of them. That is a large GPU-heavy batch job, the same kind of distributed batch inference developed in Chapter 23, fed by data sharded across storage exactly as Chapter 8 describes. Figure 25.2.1 lays out the two paths the corpus and a query each travel, which the rest of this section pulls apart in turn.

[Figure 25.2.1 diagram. Index-time path (offline, throughput): sharded corpus, shards 1 to M, billions of document chunks (Ch 7, Ch 8) -> GPU workers 1 to K running the embedding model in large batches -> vector index of (chunk, vector) pairs (Section 25.3). Query-time path (online, latency): one query at batch size 1 -> the SAME embedding model, served at low latency (Ch 24) -> query vector -> search. Annotation: if this model differs from the index model, recall collapses.]
Figure 25.2.1: The two embedding paths of a retrieval system. The index-time path (top) is an offline distributed batch job: a sharded corpus of billions of chunks fans out to $K$ GPU workers that run the embedding model in large batches and write vectors to the index. The query-time path (bottom) embeds a single query online, under a latency budget, on a served copy of the model (Chapter 24). Both paths converge on the same index, which is why both must use the identical model.

1. Indexing-Time Embedding Is Distributed Batch Inference Beginner

The index-time job has a fixed shape that should feel familiar from Part IV and the inference chapters before this one. The corpus already lives sharded across object storage or a distributed file system, so step one is simply to read it in parallel, one reader per shard, exactly the data-loading pattern of Chapter 8. Step two is chunking: each document is split into passages of a few hundred tokens, because an embedding model has a bounded context window and because a single vector for a 50-page document would be too coarse to retrieve anything useful. Step three is the embedding model itself, a transformer encoder run forward over each chunk to produce one dense vector, typically a few hundred to a couple thousand dimensions. Step four writes the (chunk identifier, vector) pairs into the index.

Steps one, two, and four are data-parallel in the boring, exact sense of Chapter 1: every chunk is independent, so the work partitions across machines with no communication between them. Step three is the expensive one. Embedding is a pure forward pass with no gradients, no all-reduce, and no cross-chunk dependency, which makes it embarrassingly parallel and therefore an ideal distributed batch-inference workload. Each GPU worker pulls a stream of chunks, batches them to fill the accelerator, runs the encoder, and emits vectors. There is no coordination on the critical path; the only shared resource is the index being written, and that is an append.
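The four steps can be sketched as a single worker's loop. This is a minimal sketch under stated assumptions:

- the shard is an in-memory list of (doc_id, text) pairs;
- chunking is a fixed word window;
- `toy_encode` is a hypothetical, hash-seeded stand-in for a real transformer encoder, so its vectors carry no meaning.

What the sketch does show is the shape of the job. Each worker touches only its own shard, fills fixed-size batches, and appends results. No call talks to another worker.

```python
import hashlib

import numpy as np

DIM = 8  # toy dimension; real encoders emit hundreds to thousands

def chunk_document(doc_id, text, words_per_chunk=5):
    """Step 2: split one document into fixed-size word windows."""
    words = text.split()
    for start in range(0, len(words), words_per_chunk):
        chunk_id = f"{doc_id}#{start // words_per_chunk}"
        yield chunk_id, " ".join(words[start:start + words_per_chunk])

def toy_encode(texts):
    """Step 3 stand-in (NOT a real model): a hash-seeded unit vector per text."""
    rows = []
    for t in texts:
        seed = int.from_bytes(hashlib.sha256(t.encode()).digest()[:8], "little")
        rows.append(np.random.default_rng(seed).standard_normal(DIM))
    v = np.stack(rows)
    return v / np.linalg.norm(v, axis=1, keepdims=True)

def embed_shard(shard, batch_size=4):
    """One worker's whole job: read its shard, chunk, embed in batches, append."""
    written = []                  # stands in for an append-only index writer (step 4)
    pending = []                  # chunks waiting to fill a batch

    def flush(batch):
        ids, texts = zip(*batch)
        written.extend(zip(ids, toy_encode(list(texts))))

    for doc_id, text in shard:    # step 1: this worker reads only its own shard
        pending.extend(chunk_document(doc_id, text))
        while len(pending) >= batch_size:
            flush(pending[:batch_size])
            pending = pending[batch_size:]
    if pending:                   # the last, partially filled batch
        flush(pending)
    return written

# Two shards processed by two independent "workers": no communication between them.
shards = [
    [("d1", "one two three four five six seven"), ("d2", "alpha beta gamma")],
    [("d3", "red green blue cyan magenta yellow black white")],
]
index = [pair for shard in shards for pair in embed_shard(shard)]
print([chunk_id for chunk_id, _ in index])
```

Each chunk's vector depends only on its own text. Running the shards in any order, on any number of workers, or at any batch size therefore yields the same (chunk, vector) pairs. That independence is what makes the fan-out embarrassingly parallel.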

Key Insight: Index-Time Embedding Is a GPU-Days Batch Job, Not a Preprocessing Step

Embedding a web-scale corpus is one of the largest single pieces of compute in a retrieval system. Billions of chunks times a transformer forward pass each adds up to GPU-days of work, which means it must be parallelized across a fleet, scheduled like any other distributed job, and budgeted as a real cost. Treating "compute the embeddings" as a one-line preprocessing step is the mistake that turns a three-hour cluster run into a week-long single-machine stall. The remedy is the same embarrassingly-parallel batch inference of Chapter 23: shard the corpus, fan out across $K$ workers, and the wall-clock falls by nearly a factor of $K$ because there is nothing to synchronize.

2. The Throughput Problem in Numbers Intermediate

It helps to see the cost as arithmetic rather than intuition. Let the corpus produce $N$ chunks, let a single accelerator embed $r$ chunks per second when batched, and let the fleet hold $K$ such accelerators. The total compute, expressed as GPU-hours independent of how many machines you use, is

$$\text{GPU-hours} = \frac{N}{r \cdot 3600}, \qquad \text{wall-clock hours} = \frac{N}{K \cdot r \cdot 3600} = \frac{\text{GPU-hours}}{K}, \qquad \text{cost} = \text{GPU-hours} \times p,$$

where $p$ is the price per GPU-hour. The GPU-hours are fixed by the corpus and the model; $K$ buys a shorter wall-clock by dividing that fixed work across more machines, and it does so almost perfectly because the job has no communication tax. This is the rare distributed workload where adding machines gives nearly linear speedup, the happy opposite of the communication-bound training of Chapter 1. The code below makes the estimate concrete for a corpus of 200 million documents. It then turns to two further lessons: index-time and query-time embedding optimize opposite objectives, and a query encoded by a different model than the index cannot find its own passage.

import numpy as np

# --- Part 1: GPU-hours and dollar cost to embed a corpus of N chunks ---
N_docs = 200_000_000          # documents in the corpus
chunks_per_doc = 6            # ~6 passages per document
N = N_docs * chunks_per_doc   # total chunks to embed
per_gpu_throughput = 1800     # chunks/second on one accelerator (batched)
K = 64                        # GPUs in the embedding fleet
price_per_gpu_hour = 2.50     # USD, on-demand accelerator

gpu_hours = (N / per_gpu_throughput) / 3600.0
wall_hours = (N / (K * per_gpu_throughput)) / 3600.0
cost = gpu_hours * price_per_gpu_hour

print("=== Index-time embedding: one big distributed batch job ===")
print(f"chunks to embed       : {N:,}")
print(f"single-GPU days       : {(N / per_gpu_throughput) / 86400.0:,.1f}")
print(f"GPUs (K)              : {K}")
print(f"wall-clock (hours)    : {wall_hours:,.1f}")
print(f"total GPU-hours       : {gpu_hours:,.0f}")
print(f"total cost (USD)      : ${cost:,.0f}")

# --- Part 2: index-time vs query-time embedding optimize OPPOSITE goals ---
fixed_ms = 9.0      # per-call fixed cost: launch, copy, tokenize (ms)
per_item_ms = 0.4   # marginal cost per chunk in a batch (ms)
print("\n=== Same model, opposite goals ===")
print(f"{'batch':>6} {'latency/req(ms)':>16} {'throughput(chunks/s)':>22}")
for B in (1, 8, 64, 512):
    batch_ms = fixed_ms + B * per_item_ms   # time to process the whole batch
    latency = batch_ms                       # a single request waits the full batch
    throughput = B / (batch_ms / 1000.0)     # chunks per second
    print(f"{B:>6} {latency:>16.1f} {throughput:>22,.0f}")
print("index-time picks B=512 (max throughput); query-time picks B=1 (min latency)")

# --- Part 3: mismatched index/query embedding models break retrieval ---
rng = np.random.default_rng(7)
d, n_passages = 384, 5000
topics = rng.standard_normal((n_passages, 32))   # latent document content
proj_A = rng.standard_normal((32, d))            # embedding model A (index time)
proj_B = rng.standard_normal((32, d))            # a DIFFERENT model version B

def encode(topic_vecs, proj):
    v = topic_vecs @ proj
    return v / np.linalg.norm(v, axis=-1, keepdims=True)

index_A = encode(topics, proj_A)                 # index built with model A
q_ids = rng.choice(n_passages, size=200, replace=False)

def recall_at1(query_vecs):
    sims = query_vecs @ index_A.T                # cosine vs the model-A index
    return np.mean(np.argmax(sims, axis=1) == q_ids)

print("\n=== Embedding-version skew breaks retrieval ===")
print(f"recall@1, query model == index model (A vs A): {recall_at1(encode(topics[q_ids], proj_A)):.3f}")
print(f"recall@1, query model != index model (B vs A): {recall_at1(encode(topics[q_ids], proj_B)):.3f}")
Code 25.2.1: A Python (NumPy) model of the embedding pipeline in three parts: the GPU-hours and dollar cost of the index-time batch job, the opposite batch-size preferences of the index-time and query-time paths, and a demonstration that a query encoded by a different model than the index cannot retrieve its own passage.
=== Index-time embedding: one big distributed batch job ===
chunks to embed       : 1,200,000,000
single-GPU days       : 7.7
GPUs (K)              : 64
wall-clock (hours)    : 2.9
total GPU-hours       : 185
total cost (USD)      : $463

=== Same model, opposite goals ===
 batch  latency/req(ms)   throughput(chunks/s)
     1              9.4                    106
     8             12.2                    656
    64             34.6                  1,850
   512            213.8                  2,395
index-time picks B=512 (max throughput); query-time picks B=1 (min latency)

=== Embedding-version skew breaks retrieval ===
recall@1, query model == index model (A vs A): 1.000
recall@1, query model != index model (B vs A): 0.000
Output 25.2.1: Embedding 1.2 billion chunks is 7.7 GPU-days of work, which 64 accelerators finish in under three hours for a few hundred dollars. The batch-size table shows the conflict: large batches maximize throughput but inflate per-request latency, so index time wants $B=512$ while query time wants $B=1$. The skew block shows recall collapsing from 1.000 to 0.000 the moment the query model differs from the index model.

Three lessons sit in that output. First, the cost is real but tractable when distributed: 7.7 single-GPU-days collapse to under three wall-clock hours across 64 GPUs. The arithmetic assumes perfect scaling, and a real fleet comes close because nothing is synchronized. Second, the batch-size table shows why the two paths cannot share a configuration. Third, and most important, the recall numbers expose the consistency rule we now make explicit.

3. Query-Time Embedding and the Opposite-Goals Problem Intermediate

Query-time embedding looks like a smaller version of the same task, and that resemblance is a trap. At query time the input is one short query that must become a vector before any search can begin, and that embedding now sits on the serving path with a user waiting. The embedding model is itself a served model, hosted and scaled with the techniques of Chapter 24, and the metric that matters has flipped from throughput to tail latency. The index-time job happily waits to accumulate a batch of 512 chunks because a few hundred milliseconds of batching cost nothing when the whole job runs for hours. The query path cannot wait: batching one user's query behind 511 others would add the very latency the system is trying to avoid.

The batch-size table in Output 25.2.1 captures this exactly. At $B=512$ one accelerator reaches 2,395 chunks per second, more than twenty times its throughput at $B=1$, which is precisely what the offline job wants. But that same batch takes 214 milliseconds to clear, an eternity for a single query that could have been answered in about 9. The two paths run the identical model with identical weights, yet they configure it as near-opposites. Index time maximizes throughput with the largest batch the accelerator can hold. Query time minimizes latency with a batch of one, or a tiny dynamic batch with a millisecond-scale timeout. Recognizing that one model serves two regimes, and that you tune each regime against a different objective, is the practical heart of running an embedding pipeline.
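The "tiny dynamic batch with a millisecond-scale timeout" can be explored with a small simulation.

This sketch reuses the linear cost model from Part 2 of Code 25.2.1: 9 ms fixed plus 0.4 ms per item. It assumes a single encoder replica and evenly spaced arrivals. The batching rule is a common dynamic-batching policy, not any particular server's implementation: close a batch when it is full or when its timeout expires.

The sketch also omits one thing real batchers do, which is letting a batch keep growing while it waits for a busy encoder.

```python
def simulate_dynamic_batching(arrivals_ms, max_batch, timeout_ms,
                              fixed_ms=9.0, per_item_ms=0.4):
    """Per-query latency (ms) for one encoder replica under a simple dynamic batcher.

    A batch opens at its first query and closes when it holds max_batch queries
    or when timeout_ms has passed since it opened, whichever comes first. It then
    runs as soon as the encoder is free, costing fixed_ms + B * per_item_ms.
    """
    latencies = []
    free_at = 0.0                              # when the encoder finishes its last batch
    i, n = 0, len(arrivals_ms)
    while i < n:
        deadline = arrivals_ms[i] + timeout_ms
        j = i
        while j < n and j - i < max_batch and arrivals_ms[j] <= deadline:
            j += 1                             # admit queries until full or past deadline
        batch = arrivals_ms[i:j]
        closes_at = batch[-1] if len(batch) == max_batch else deadline
        done = max(closes_at, free_at) + fixed_ms + len(batch) * per_item_ms
        latencies.extend(done - t for t in batch)
        free_at, i = done, j
    return latencies

arrivals = [10.0 * k for k in range(100)]      # one query every 10 ms (100 QPS)
for max_batch, timeout in [(1, 0.0), (8, 5.0), (512, 200.0)]:
    lat = simulate_dynamic_batching(arrivals, max_batch, timeout)
    print(f"max_batch={max_batch:>3} timeout={timeout:>5.0f} ms -> "
          f"worst latency {max(lat):6.1f} ms")
```

The three configurations behave very differently at this light load:

- A batch of one serves every query in 9.4 ms.
- The 5 ms timeout costs 5 ms of waiting per query, in exchange for the ability to coalesce bursts into one call.
- The 200 ms accumulation window drives the worst case above 200 ms. This is the regime an offline configuration lands in when it is reused unchanged on the query path.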

Practical Example: The Embedding That Was Fast Offline and Slow Online

Who: A retrieval engineer at a documentation-search company rolling out semantic search.

Situation: The team had a beautiful offline pipeline embedding 80 million help-center passages overnight on a 32-GPU spot fleet.

Problem: They reused the exact same serving configuration for the query encoder, including a batch size of 256 with a 200 ms accumulation window.

Dilemma: Keep the high-throughput configuration that made the offline job cheap, or build a second deployment of the same model tuned for latency, doubling the serving surface to maintain.

Decision: They ran the same model as two deployments: the offline batch job kept the large batch, and the online query encoder switched to dynamic batching with a 5 ms timeout and a max batch of 8.

How: Same weights, same preprocessing, two serving profiles; the query encoder ran behind the same autoscaler as the rest of the serving fleet (Chapter 24).

Result: Query embedding p99 latency dropped from 210 ms to 11 ms with no change to the vectors or to recall, because the weights were untouched.

Lesson: Index-time and query-time embedding are the same model tuned for opposite objectives. Share the weights, never the serving configuration.
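The lesson "share the weights, never the serving configuration" can be made mechanical. Split the two concerns into separate records and check the shared one at deploy time.

This is a hypothetical sketch. The field names, the `help-embed-v3` identifiers, and the 512-token, mean-pooling values are invented for illustration. The batch sizes and timeouts are the ones from the example.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class EncoderSpec:
    """What determines the vectors. Must be identical for both deployments."""
    weights: str        # hypothetical model identifier
    tokenizer: str      # hypothetical tokenizer identifier
    max_tokens: int
    pooling: str
    normalize: bool

@dataclass(frozen=True)
class ServingProfile:
    """What determines speed. Tuned separately per deployment."""
    name: str
    spec: EncoderSpec
    max_batch: int
    batch_timeout_ms: float

def check_same_vectors(*profiles):
    """Refuse to deploy profiles that would produce incomparable vectors."""
    specs = {p.spec for p in profiles}
    if len(specs) != 1:
        raise ValueError(f"deployments disagree on the encoder spec: {specs}")

SPEC = EncoderSpec("help-embed-v3", "help-embed-v3-tok", 512, "mean", True)
bulk = ServingProfile("bulk-index", SPEC, max_batch=256, batch_timeout_ms=200.0)
query = ServingProfile("query-encoder", SPEC, max_batch=8, batch_timeout_ms=5.0)
check_same_vectors(bulk, query)   # passes: same spec, different batching
```

Frozen dataclasses make the spec hashable and immutable. The check can therefore compare specs as values, and no deployment can mutate its copy after the check.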

Library Shortcut: sentence-transformers Plus Ray Data for the Fan-Out

Code 25.2.1 modeled the pipeline arithmetically; in practice you do not hand-roll the batching, the GPU placement, or the shard fan-out. The sentence-transformers library wraps an embedding model behind a single encode call that handles tokenization, batching, and pooling, and Ray Data distributes that call across the fleet by mapping it over a sharded dataset, one model replica per GPU:

# Distributed index-time embedding: a few lines replace the whole fan-out.
import ray
from sentence_transformers import SentenceTransformer

class Embedder:                                    # one instance per GPU worker
    def __init__(self):
        self.model = SentenceTransformer("BAAI/bge-base-en-v1.5", device="cuda")
    def __call__(self, batch):
        batch["vector"] = self.model.encode(       # large batch, max throughput
            list(batch["chunk"]), batch_size=512, normalize_embeddings=True)
        return batch

(ray.data.read_parquet("s3://corpus/chunks/")      # sharded corpus (Ch 8)
    .map_batches(Embedder, concurrency=64,         # 64 GPU workers, no all-reduce
                 num_gpus=1, batch_size=512)
    .write_parquet("s3://index/vectors/"))         # append (chunk, vector) pairs
Code 25.2.2: The same fan-out modeled in Code 25.2.1, now as production code. Ray Data owns the shard reading, the 64-way parallelism, and the GPU placement, while sentence-transformers owns tokenization, batching, and pooling. The shard readers, batching loop, and worker scheduling you would otherwise write by hand collapse to a single map_batches over a sharded dataset.

4. The Consistency Requirement: Embedding-Version Skew Advanced

The recall numbers in Output 25.2.1 are the most important three lines in this section. A query vector is comparable to an index vector only if both came from the same embedding model with the same preprocessing. Encode the index with model A and the query with model B, even a slightly different version of the same architecture, and the two vectors live in incompatible geometries; cosine similarity between them is meaningless. The synthetic experiment makes this stark: with matched models, every query retrieves its own passage at rank one (recall 1.000); with a different model on the query side, not a single one is found (recall 0.000). Real encoders degrade more gracefully than random projections, but the direction is the same and the failure is quiet, which is what makes it dangerous.

This is training/serving skew wearing a retrieval costume. The pathology is exactly the feature skew of Chapter 9: the index-time path and the query-time path are two code paths computing what is supposed to be the same function. Any drift between them (a model upgrade applied to one side, a different tokenizer version, a changed normalization, a mismatched pooling layer) corrupts the result while every component reports healthy. The index does not know its vectors are stale; the query encoder does not know it is speaking a different dialect; the search returns plausible-looking neighbors that are simply wrong.

Key Insight: Embedding Version Is Part of the Index Contract

The embedding model and its full preprocessing (tokenizer, truncation, pooling, normalization) are a versioned contract between the index and the query path, not an implementation detail. Upgrading the embedding model is not a code deploy; it is a full re-embedding of the corpus, a fresh index, and an atomic swap of both sides together. Pin the model version into the index metadata, refuse queries from a mismatched encoder, and treat any divergence as a production incident, because the symptom is not a crash but a slow, silent collapse of retrieval quality that no health check will catch.
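"Pin the model version into the index metadata, refuse queries from a mismatched encoder" can be sketched as a fingerprint over every field that shapes the vectors, checked on each search. This is a toy in-memory illustration, not a vector-database API; `EmbeddingContract`, `VersionedIndex`, and all identifiers are hypothetical.

The design choice that matters is hashing the full contract rather than storing just a model name. A changed pooling or normalization setting is then caught even when the model name is unchanged.

```python
import hashlib
import json
from dataclasses import asdict, dataclass, replace

import numpy as np

@dataclass(frozen=True)
class EmbeddingContract:
    model: str          # hypothetical identifiers throughout
    tokenizer: str
    max_tokens: int
    pooling: str
    normalize: bool

    def fingerprint(self) -> str:
        """Stable digest of every field that changes the vectors."""
        blob = json.dumps(asdict(self), sort_keys=True).encode()
        return hashlib.sha256(blob).hexdigest()[:16]

class VersionedIndex:
    """A toy in-memory index that pins its embedding contract in its metadata."""

    def __init__(self, contract: EmbeddingContract):
        self.metadata = {"embedding_fingerprint": contract.fingerprint()}
        self.ids, self.rows = [], []

    def add(self, chunk_id, vector):
        self.ids.append(chunk_id)
        self.rows.append(np.asarray(vector, dtype=float))

    def search(self, query_vector, query_contract, k=2):
        if query_contract.fingerprint() != self.metadata["embedding_fingerprint"]:
            raise RuntimeError("embedding contract mismatch: refusing query")
        scores = np.stack(self.rows) @ np.asarray(query_vector, dtype=float)
        return [self.ids[t] for t in np.argsort(-scores)[:k]]

v1 = EmbeddingContract("embed-v1", "tok-v1", 512, "mean", True)
vindex = VersionedIndex(v1)
vindex.add("c1", [1.0, 0.0])
vindex.add("c2", [0.0, 1.0])
vindex.add("c3", [0.7, 0.7])
print(vindex.search([1.0, 0.1], v1))            # same contract: the query is served
v1_cls = replace(v1, pooling="cls")              # one-field drift, same model name
print(v1_cls.fingerprint() != v1.fingerprint())  # the guard sees the difference
```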

The operational consequence is that an embedding-model upgrade is one of the heaviest operations in a retrieval system. You cannot upgrade the query encoder on Tuesday and re-embed the corpus the following month; for that gap the query vectors do not match the index and recall craters. The correct procedure is to re-embed the entire corpus with the new model into a fresh index (another full GPU-days batch job), then cut both the index and the query encoder over to the new version in one atomic switch, keeping the old pair live until the new one is verified. The cost of the index-time batch job in Section 2 is therefore not a one-time bill; it recurs on every embedding-model upgrade.
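The atomic switch can be sketched as an alias that names the index and the query encoder as one pair. No reader can then observe the new index with the old encoder, or the reverse. The class and its method names are hypothetical; many search systems offer index aliases for a similar purpose.

The invariants are what matter:

- swap both halves together;
- swap only after verification;
- keep the old pair for rollback.

```python
import threading

class RetrievalAlias:
    """Hypothetical serving alias naming ONE matched (index, query encoder) pair."""

    def __init__(self, index_name, encoder_version):
        self._lock = threading.Lock()
        self._live = (index_name, encoder_version)
        self._previous = None

    def live(self):
        """Readers always get both halves from the same pair."""
        with self._lock:
            return self._live

    def cut_over(self, index_name, encoder_version, verified):
        """Swap index and encoder together, only after the new pair is verified."""
        if not verified:
            raise RuntimeError("new pair not verified; old pair stays live")
        with self._lock:
            self._previous, self._live = self._live, (index_name, encoder_version)

    def roll_back(self):
        """Return to the previous pair, which was kept alive for exactly this."""
        with self._lock:
            if self._previous is None:
                raise RuntimeError("no previous pair to roll back to")
            self._live, self._previous = self._previous, self._live

alias = RetrievalAlias("chunks-v1", "encoder-v1")
alias.cut_over("chunks-v2", "encoder-v2", verified=True)
print(alias.live())      # ('chunks-v2', 'encoder-v2')
alias.roll_back()
print(alias.live())      # ('chunks-v1', 'encoder-v1')
```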

5. Incremental Embedding and Chunking Strategy Intermediate

Two refinements separate a demo pipeline from a production one. The first is freshness. A corpus is rarely static: new documents arrive, old ones change, some are deleted. Re-running the full GPU-days batch job for every new document is absurd, so production systems run an incremental, streaming embedder alongside the bulk job. New and changed documents flow through the same chunk-then-embed path the moment they land, using the streaming machinery of Chapter 9, and their vectors are upserted into the live index. The bulk job builds the index once; the streaming job keeps it fresh. The hard constraint is that the streaming embedder must use the identical model version as the bulk job, or it reintroduces exactly the skew of Section 4 through a side door.
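The streaming side can be sketched as an upsert loop. It re-embeds a document only when its content hash changes, deletes on removal, and refuses to start if its model version differs from the bulk index.

The simplifying assumptions are all hypothetical: the index is a plain dict, each document is a single chunk, and `encode` is a toy stand-in for the real encoder.

```python
import hashlib

class IncrementalEmbedder:
    """Keeps a live index fresh by re-embedding only new or changed documents.

    Sketch assumptions: the index is a plain dict, and each document is a single
    chunk (a real pipeline would chunk first, with the bulk job's exact rules).
    """

    def __init__(self, index, encode, model_version):
        if index["model_version"] != model_version:
            raise ValueError("streaming encoder version differs from the bulk index")
        self.index, self.encode = index, encode

    def apply(self, doc_id, text=None):
        """Upsert a new or changed document; delete it when text is None."""
        docs = self.index["docs"]
        if text is None:
            return "deleted" if docs.pop(doc_id, None) is not None else "absent"
        digest = hashlib.sha256(text.encode()).hexdigest()
        if doc_id in docs and docs[doc_id][0] == digest:
            return "unchanged"                    # skip the GPU entirely
        docs[doc_id] = (digest, self.encode(text))
        return "upserted"

live_index = {"model_version": "embed-v1", "docs": {}}   # as left by the bulk job
streamer = IncrementalEmbedder(live_index, encode=lambda t: [float(len(t))],
                               model_version="embed-v1")  # toy encoder
events = [("a", "first draft"), ("a", "first draft"), ("a", "second draft"),
          ("b", "new page"), ("a", None)]
print([streamer.apply(doc_id, text) for doc_id, text in events])
# ['upserted', 'unchanged', 'upserted', 'upserted', 'deleted']
```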

The second refinement is chunking strategy, which quietly determines retrieval quality. Chunk too large and a single vector must summarize many distinct ideas, blurring the signal so that no query matches sharply; chunk too small and you shatter the context a passage needs to be meaningful, and you multiply $N$, the chunk count, which directly multiplies the GPU-hours in Section 2. Overlapping windows preserve context across boundaries at the cost of more chunks; sentence-aware or structure-aware splitting respects natural units at the cost of variable chunk sizes that complicate batching. There is no universal answer; the chunking decision trades retrieval quality against embedding cost, and it must be made identically at index time and at incremental time, because the chunk boundaries are themselves part of the preprocessing contract.
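The overlap cost is easy to quantify. Below is a minimal sliding-window chunker over a token list, together with a closed-form count of the windows it produces. With the count, the multiplier on $N$ can be estimated before running anything.

Token lists are plain Python lists here. Tokenization itself is assumed to happen upstream, with the same tokenizer at index time and at incremental time.

```python
import math

def sliding_chunks(tokens, size, overlap):
    """Fixed-size windows; each window repeats the last `overlap` tokens of the previous one."""
    if not 0 <= overlap < size:
        raise ValueError("need 0 <= overlap < size")
    if not tokens:
        return []
    stride, start, chunks = size - overlap, 0, []
    while True:
        chunks.append(tokens[start:start + size])
        if start + size >= len(tokens):       # this window reached the end
            return chunks
        start += stride

def chunk_count(n_tokens, size, overlap):
    """Closed form for len(sliding_chunks(...)), so N can be estimated without chunking."""
    if n_tokens == 0:
        return 0
    if n_tokens <= size:
        return 1
    return 1 + math.ceil((n_tokens - size) / (size - overlap))

doc = list(range(1000))                       # a 1,000-token document
for overlap in (0, 64, 128):
    n = chunk_count(len(doc), 256, overlap)
    assert n == len(sliding_chunks(doc, 256, overlap))
    print(f"size=256 overlap={overlap:>3}: {n} chunks")
```

For this 1,000-token document, a 64-token overlap turns 4 chunks into 5. That grows $N$, and with it the GPU-hours of Section 2, by about 25%. A 128-token overlap (7 chunks) grows them by 75%.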

Fun Note: The Chunk That Was Cut in Half

A classic chunking bug splits a document so that the sentence "the patient should not be given penicillin" lands its negation in chunk 7 and its drug name in chunk 8. Each chunk embeds to something reasonable; neither carries the meaning. The retriever cheerfully returns chunk 8 for a penicillin query, having lost the one word that mattered. Chunk boundaries are not whitespace; they are semantic cuts, and a careless cut can invert meaning.
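The bug in the note is easy to reproduce. One sentence-aware alternative is to pack whole sentences into chunks under a word budget, so that no sentence is ever cut.

This is a minimal sketch. The regex sentence splitter is deliberately naive. The 5-word fixed window and the 8-word budget are arbitrary small numbers chosen to make the split visible.

```python
import re

def sentence_chunks(text, max_words):
    """Pack whole sentences into chunks of at most max_words words.

    A sentence longer than max_words becomes its own oversized chunk instead of
    being cut. The regex splitter is deliberately naive (it breaks on "Dr." etc.).
    """
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    chunks, current, count = [], [], 0
    for sentence in sentences:
        n = len(sentence.split())
        if current and count + n > max_words:     # next sentence would overflow
            chunks.append(" ".join(current))
            current, count = [], 0
        current.append(sentence)
        count += n
    if current:
        chunks.append(" ".join(current))
    return chunks

text = ("Dosage is weight based. The patient should not be given penicillin. "
        "Use an alternative antibiotic.")
words = text.split()
naive = [" ".join(words[i:i + 5]) for i in range(0, len(words), 5)]
print(naive)                        # "not" and "penicillin." land in different chunks
print(sentence_chunks(text, max_words=8))
```

The fixed window puts "not" in one chunk and "penicillin." in the next, while the sentence packer keeps the warning intact. The price is the one named earlier: variable chunk sizes, which make batches less uniform.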

Research Frontier: Embeddings Get Longer, Shorter, and Plural (2024 to 2026)

Three lines are reshaping what the index-time job produces. Long-context embedders such as the E5-Mistral and NV-Embed families and the Jina and Nomic long-document encoders push the context window from a few hundred tokens to thousands, easing the chunking tradeoff by letting one vector cover a whole section without losing detail. Matryoshka representation learning (Kusupati et al., 2022, now widely shipped in 2024 to 2026 models including OpenAI's text-embedding-3 and the Nomic and bge-m3 releases) trains a single embedding whose leading dimensions are themselves a usable shorter embedding, so one index-time pass yields vectors you can truncate to 64, 256, or 768 dimensions and trade recall for storage and search cost without re-embedding. Multi-vector models in the ColBERT lineage (ColBERTv2 and the 2024 ColBERT-style late-interaction retrievers) abandon the one-vector-per-chunk premise entirely, storing a vector per token for finer-grained matching, which multiplies index size and reshapes the distributed search of Sections 25.4 and 25.5. Each shifts the cost arithmetic of Section 2, and each still obeys the consistency rule of Section 4: whatever the model produces, index and query must produce it the same way.
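The storage side of these shifts can be sketched in a few lines. The code truncates a Matryoshka-style embedding, scores with ColBERT-style late interaction (MaxSim), and estimates raw index size.

The assumptions are labeled in the code:

- fp16 storage at 2 bytes per dimension, with no compression;
- for the multi-vector case, roughly 100 token vectors of 128 dimensions per chunk.

Real multi-vector systems such as ColBERTv2 compress token vectors heavily, so the last figure is an uncompressed upper bound.

```python
import numpy as np

def truncate_matryoshka(vectors, k):
    """Keep the leading k dimensions and re-normalize each row.

    Only meaningful for Matryoshka-trained models, and index and query must be
    truncated to the same k (the consistency rule again).
    """
    head = vectors[:, :k]
    return head / np.linalg.norm(head, axis=1, keepdims=True)

def maxsim(query_tokens, doc_tokens):
    """Late-interaction score: each query token takes its best-matching doc token."""
    return float((query_tokens @ doc_tokens.T).max(axis=1).sum())

def index_gib(n_chunks, dims, vectors_per_chunk=1, bytes_per_dim=2):
    """Raw vector storage in GiB, before any compression (fp16 by default)."""
    return n_chunks * vectors_per_chunk * dims * bytes_per_dim / 2**30

N = 1_200_000_000                          # the chunk count from Code 25.2.1
print(f"single vector, 768 dims : {index_gib(N, 768):>9,.0f} GiB")
print(f"Matryoshka cut, 256 dims: {index_gib(N, 256):>9,.0f} GiB")
# assumed: ~100 token vectors of 128 dims per chunk for a ColBERT-style model
print(f"multi-vector, 100 x 128 : {index_gib(N, 128, vectors_per_chunk=100):>9,.0f} GiB")
```

Under these assumptions, cutting 768 dimensions to 256 divides raw storage by 3. By contrast, 100 token vectors of 128 dimensions per chunk multiply it by roughly 17 relative to the 768-dimension single vector.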

With the corpus turned into vectors and the consistency contract understood, the open question is where those billions of vectors live and how they are searched. That is the job of a vector database: a system that stores vectors, indexes them for approximate nearest-neighbor search, and shards across machines when the index outgrows one of them. Section 25.3 takes up that system, and the chunking and dimensionality choices made here set the scale it must handle.

Exercise 25.2.1: Size the Re-Embedding Bill Conceptual

Using the formulas in Section 2, your team has a corpus of 500 million documents at 8 chunks each, an accelerator that embeds 1,500 chunks per second, and a price of \$3.00 per GPU-hour. (a) Compute the total GPU-hours and dollar cost of one full index-time embedding pass. (b) You upgrade the embedding model and must re-embed from scratch into a fresh index. Explain, referencing Section 4, why you cannot simply upgrade the query encoder and leave the old index in place. (c) If the new model has a 2x larger context window and you halve the chunk count to take advantage of it, how do the GPU-hours and the cost change, and what retrieval-quality risk does the coarser chunking introduce?

Exercise 25.2.2: Measure the Opposite-Goals Curve Coding

Extend the Part 2 batch-size loop in Code 25.2.1 to sweep batch sizes from 1 to 2,048 in powers of two, recording per-request latency and per-accelerator throughput for each. Plot throughput against latency (a Pareto frontier). Mark the point an index-time job should pick (maximize throughput subject to fitting GPU memory) and the point a query-time path should pick (minimize latency subject to a target of at least, say, 100 chunks per second). Then add a second curve for a smaller fixed_ms (a cheaper per-call cost) and explain how reducing the fixed cost changes where the query-time path should operate.

Exercise 25.2.3: Quantify Graceful Skew Analysis

The Part 3 demo uses two independent random projections, so a mismatched model gives recall 0.000, a worst case. Real model versions are correlated. Modify the experiment so model B is a perturbation of model A, proj_B = proj_A + eps * noise, and sweep eps from 0 (identical) upward. Plot recall@1 against eps. Argue from the curve why embedding-version skew is more dangerous than a hard crash: identify the regime where recall is degraded but still high enough that the system looks healthy, and connect this to the training/serving skew framing of Chapter 9.