Part VIII: Case Studies and Capstone Projects
Chapter 40: Distributed LLM and Agentic Applications

Distributed Document Processing

"A PDF, waiting in a queue of ten million to become searchable. The scanner that made me was honest work; the OCR that reads me is somebody else's problem now."

A Scanned Invoice Between Parse and Chunk
Big Picture

Before an agent can answer a question over a company's documents, every document must be parsed, cleaned, split into retrievable pieces, tagged with the metadata that controls who may see it, and written to a chunk store, all at a scale that no single machine can chew through in a reasonable window. This is the enterprise sibling of the web crawl from Section 36.2: the volumes are smaller and the sources are trusted, but the formats are messier (scanned PDFs, spreadsheets, slide decks) and the access rules are strict (every chunk inherits the permissions of the file it came from). We build the ingestion stage as a distributed batch job, a map over documents across many workers in the MapReduce style of Chapter 6 and the Spark style of Chapter 7, and we keep it fresh with the incremental updates of Chapter 9. The decisions made here, above all how documents are chunked and what metadata travels with each chunk, fix the ceiling on everything the retriever and the agent can later find.

The agentic application of this chapter is only as good as the corpus it can search. In Chapter 36 the corpus was the open web, crawled, deduplicated, and cleaned at petabyte scale. Here the corpus is an enterprise's own documents: contracts in PDF, policies in Word, financial models in Excel, decks in PowerPoint, and a long tail of scanned paper that arrived as images. The total volume is modest by web standards, perhaps a few million documents and a few terabytes, but the heterogeneity is brutal, and two enterprise constraints absent from a public crawl now dominate the design. First, access control: a chunk extracted from a document the requesting user may not read must never reach that user's agent, so permissions are not an afterthought but a field that flows through the entire pipeline. Second, structure: enterprise documents carry tables, forms, and headings whose meaning is lost if the layout is flattened to a wall of text. This section turns that pile into a clean, permissioned, retrievable chunk store, and it does so as a distributed batch job because the per-document work is independent and embarrassingly parallel.

Document store PDF, DOCX, XLSX, PPTX, scans (Ch 8) Doc queue Worker 1 parse → clean → chunk → metadata Worker 2 parse → clean → chunk → metadata Worker W parse → clean → chunk → metadata Chunk store text + metadata, permissions, IDs Embed (40.3) each document is processed independently: a map over documents, no cross-worker communication
Figure 40.2.1: The distributed document-processing pipeline. Documents drawn from the storage layer of Chapter 8 are queued and fanned out to $W$ workers, each of which independently parses, cleans, chunks, and tags one document at a time, then writes the resulting chunks (with their permissions) to a shared store. Because documents do not depend on one another, this is a pure map with no shuffle, and the embedding stage of Section 40.3 consumes the store downstream.

1. Parsing a Corpus of Mixed Formats Beginner

The first stage takes a file and returns text plus structure. A born-digital PDF carries a text layer that can be read directly, but its reading order is rarely linear: multi-column layouts, headers, footers, and footnotes interleave, and a naive extraction yields scrambled prose. Layout-aware extraction recovers the intended order by reasoning about the geometry of text blocks on the page, keeping a table's cells together and separating a sidebar from the body. Office formats are friendlier because the structure is explicit in the file: a DOCX exposes headings and paragraphs, an XLSX exposes sheets and cells, a PPTX exposes slides and their text frames. The hard case is the scan, a PDF or image with no text layer at all, where optical character recognition (OCR) must convert pixels to characters before anything else can proceed. OCR is the most compute-heavy step in the pipeline and the one with the most variable cost per page, which matters for the sizing argument of Section 4.

Tables deserve their own treatment because they are where enterprise meaning concentrates and where flattening does the most damage. A pricing table read row-major into a single paragraph loses the column that says whether a number is a unit price or a total. Layout-aware parsers emit tables as structured objects (rows and cells, or serialized to a small markup such as HTML or Markdown) so that the retriever later sees "Region: EMEA, Q3 revenue: 4.2M" rather than an undifferentiated string of digits. This is a point of contact with the cleaning discipline of Section 36.3, where boilerplate stripping and normalization were developed for web pages; here the same cleaning runs, but the input is a parser's structured output rather than raw HTML, and the boilerplate to strip is headers, footers, watermarks, and confidentiality stamps rather than navigation bars and ad slots.

Key Insight: Parsing Decides What Exists; Everything Downstream Only Sees Its Output

The retriever and the agent never touch the original document. They see only the text and metadata the parser chose to emit. A column dropped during table extraction, a heading lost when layout was flattened, a paragraph scrambled by bad reading order: none of these can be recovered by a smarter embedding model or a cleverer prompt downstream, because the information simply is not in the chunk store. Parsing quality is therefore the true ceiling on retrieval quality, and it is worth more engineering attention than any later stage.

2. Chunking Strategies and Why They Shape Retrieval Intermediate

A parsed document is usually too long to embed or retrieve as a single unit, so it is split into chunks, the atomic pieces the retriever indexes and the agent reads. The simplest strategy is fixed-size chunking: cut the text into windows of a fixed length with a fixed overlap between neighbors. The overlap exists so that a sentence straddling a boundary survives intact in at least one chunk, at the cost of storing some text twice. If each document yields on average $\bar{c}$ chunks, the corpus produces a total chunk count of

$$ C = D \cdot \bar{c}, $$

for $D$ documents, and this number, often in the tens of millions, sets the size of the vector index that Section 40.3 must build and Chapter 25 must serve. With a chunk window of $s$ characters and an overlap of $o$, a document of length $L$ produces about $\lceil L / (s - o) \rceil$ chunks, and the fraction of stored text that is duplicated rises with the overlap ratio $o / s$. Larger chunks pack more context per retrieval but blur the granularity, so a query about one clause drags in a whole page of unrelated text; smaller chunks sharpen granularity but risk severing the context a clause needs to be understood. This is the central chunk-size trade-off, and there is no universally right answer, only a right answer for a given retriever and query distribution.

Semantic chunking respects natural boundaries instead of a character count: it packs whole sentences (or paragraphs, or sections) until a size budget is reached, so no chunk ever ends mid-sentence. Hierarchical, or parent-child, chunking goes further by storing two granularities at once: small child chunks are indexed for precise retrieval, but when one is matched the retriever returns its larger parent (the surrounding section) so the agent reads with full context. The slogan is that you retrieve on the small piece and read on the big one. Which strategy wins is an empirical question tied to the corpus and the agent, but the structural point is universal: chunk boundaries are the unit of retrieval, so a boundary in the wrong place is a fact the agent can never assemble. The demonstration below makes the fixed-versus-semantic contrast concrete on a short policy document.

import re

# A small enterprise document body (a policy excerpt) used to compare chunkers.
text = (
    "Employees may request remote-work arrangements through the HR portal. "
    "Requests must be approved by a direct manager and recorded in the system of record. "
    "Reimbursement for home-office equipment is capped at eight hundred dollars per fiscal year. "
    "Receipts are required for any single item above one hundred dollars. "
    "Access to confidential customer data from outside the corporate network requires a hardware token. "
    "Violations of the data-handling policy are escalated to the security team within one business day. "
    "All policy exceptions must be documented and reviewed quarterly by the compliance office. "
    "This document supersedes the prior remote-work memorandum dated the previous fiscal year."
)

def fixed_chunks(s, size, overlap):
    """Fixed-size character windows with overlap; the standard RAG default."""
    step = size - overlap
    out, i = [], 0
    while i < len(s):
        out.append(s[i:i + size])
        i += step
    return out

def sentence_chunks(s, max_chars):
    """Semantic-ish split: pack whole sentences until the budget is hit."""
    sents = re.findall(r"[^.]*\.", s)
    out, cur = [], ""
    for sent in sents:
        sent = sent.strip()
        if cur and len(cur) + 1 + len(sent) > max_chars:
            out.append(cur); cur = sent
        else:
            cur = (cur + " " + sent).strip()
    if cur:
        out.append(cur)
    return out

SIZE, OVERLAP = 200, 40
fx = fixed_chunks(text, SIZE, OVERLAP)
sx = sentence_chunks(text, SIZE)
print(f"document length (chars) : {len(text)}")
print(f"fixed  size={SIZE} overlap={OVERLAP}")
print(f"  chunks                : {len(fx)}")
print(f"  sizes                 : {[len(c) for c in fx]}")
dup = sum(min(OVERLAP, len(fx[i])) for i in range(1, len(fx)))   # repeated chars
print(f"  duplicated chars      : {dup}  ({dup / len(text):.0%} redundancy)")
print(f"sentence-packed max={SIZE}")
print(f"  chunks                : {len(sx)}")
print(f"  sizes                 : {[len(c) for c in sx]}")
print(f"  mid-sentence cuts     : 0 (boundaries respected)")

# Corpus sizing: chunk count C = D * avg_chunks_per_doc.
DOCS, avg_chunks = 2_000_000, len(fx)
total = DOCS * avg_chunks
WORKERS, DOCS_PER_SEC = 64, 12.0
throughput = WORKERS * DOCS_PER_SEC          # docs/s = workers * per-worker rate
print(f"corpus: {DOCS:,} docs x {avg_chunks} chunks = {total:,} chunks")
print(f"throughput: {WORKERS} workers x {DOCS_PER_SEC} docs/s = {throughput:,.0f} docs/s")
print(f"wall-clock to ingest    : {DOCS / throughput / 3600:.2f} h")
Code 40.2.1: A fixed-size chunker (windows with overlap) and a sentence-packing chunker run on one policy excerpt, followed by a corpus-sizing calculation that turns the per-document chunk count into a total $C = D \cdot \bar{c}$ and a wall-clock estimate from worker throughput.
document length (chars) : 692
fixed  size=200 overlap=40
  chunks                : 5
  sizes                 : [200, 200, 200, 200, 52]
  duplicated chars      : 160  (23% redundancy)
sentence-packed max=200
  chunks                : 4
  sizes                 : [153, 160, 197, 179]
  mid-sentence cuts     : 0 (boundaries respected)
corpus: 2,000,000 docs x 5 chunks = 10,000,000 chunks
throughput: 64 workers x 12.0 docs/s = 768 docs/s
wall-clock to ingest    : 0.72 h
Output 40.2.1: The fixed chunker emits five windows and pays twenty-three percent redundancy for its overlap; the sentence-packed chunker emits four cleaner pieces with no mid-sentence cuts. Scaled to two million documents the corpus holds ten million chunks, and sixty-four workers ingest it in well under an hour.

The numbers in Output 40.2.1 make the trade-off legible. The fixed chunker's overlap buys boundary robustness at a twenty-three percent storage tax, money the system pays on every one of the ten million chunks and again in the vector index. The sentence-packed variant spends no overlap and respects boundaries, but its chunks vary in size and it depends on reliable sentence segmentation, which OCR noise can break. Choosing between them is the kind of empirical decision that retrieval evaluation (the metrics of Chapter 5) is meant to settle, not a matter of taste.

Library Shortcut: Unstructured and LangChain Splitters in a Few Lines

The hand-written chunkers in Code 40.2.1 exist to expose the mechanism; in production you would not write them. The unstructured library parses PDFs, DOCX, PPTX, and scans (calling OCR automatically) into typed elements (titles, narrative text, tables), and LangChain's text splitters implement fixed, recursive, and parent-child strategies behind one interface:

from unstructured.partition.auto import partition
from langchain_text_splitters import RecursiveCharacterTextSplitter

elements = partition(filename="policy.pdf")                 # parse + OCR if needed
text = "\n\n".join(e.text for e in elements if e.text)

splitter = RecursiveCharacterTextSplitter(
    chunk_size=800, chunk_overlap=120,                       # tries paragraph, then
    separators=["\n\n", "\n", ". ", " "])                    # sentence, then word breaks
chunks = splitter.split_text(text)                           # boundary-aware, no manual loop
Code 40.2.2: The roughly thirty lines of parsing and chunking from Code 40.2.1 collapse to about six. The library handles format detection, OCR invocation, table extraction, and the recursive boundary search (paragraph before sentence before word) that the fixed chunker lacked.

3. Metadata, Permissions, and Structured Tables Intermediate

A chunk is not just text. Every chunk carries a metadata record that the retriever filters on and the agent reasons with: the source document identifier and version, the page or section it came from, the document type, timestamps, and, in the enterprise, the access-control list inherited from the source file. This last field is what separates an enterprise pipeline from the public crawl of Section 36.2, where everything was world-readable. Here a chunk drawn from a board-only memorandum must carry the memorandum's permissions, so that retrieval can be filtered by the requesting user's identity before any text is returned. Access control enforced at query time on a chunk that lacks its permission tag is no access control at all; the tag must be attached at ingestion, when the source file's permissions are still in hand, and it must survive every later stage. The retriever of Chapter 25 then treats the permission field as a hard pre-filter on the candidate set.

Structured tables also live in metadata, or alongside the text in a form the agent can parse. A chunk that contains a financial table is most useful when the table's structure is preserved and a short natural-language summary sits beside it, so that the retriever can match on the summary's words while the agent reads the exact figures from the structure. Metadata is likewise what makes incremental ingestion possible: the document version and a content hash stored on each chunk let the next run detect what changed. The thread to keep in view is that metadata and chunk boundaries jointly determine the reachable set: the retriever can only surface a fact that lives inside some chunk whose metadata passes the query's filters, so a missing permission tag hides a chunk the user was entitled to, and a wrong document type filters it out of every relevant query.

Thesis Thread: The Map From Chapter 6 Carries the Whole Pipeline

The ingestion job is a map over documents, the same primitive introduced in Chapter 6, now applied to an AI retrieval corpus rather than a word count. Each document is an independent key, each worker applies parse-clean-chunk-tag with no need to talk to any other worker, and the only "reduce" is the trivial concatenation of emitted chunks into the store. That independence is exactly why the work scales out linearly: there is no shuffle, no all-reduce, no cross-worker barrier. The distributed-data machinery of Part II is not a separate topic from the agentic application of this chapter; it is the engine that fills the application's memory.

4. The Distributed Batch Pipeline and Its Sizing Intermediate

Because each document is processed independently, the pipeline is a textbook map job. Expressed in the Spark idiom of Chapter 7, the corpus is a distributed collection of document paths, a single transformation maps each path through parse, clean, chunk, and tag, the result is flattened (one document yields many chunks, so the map is a flatMap), and the chunks are written to the store. No stage requires data from another document, so there is no shuffle and the job's throughput is, to first order, linear in the worker count. With $W$ workers each processing $r$ documents per second, aggregate throughput is

$$ T = W \cdot r \quad \text{documents per second}, \qquad t_{\text{ingest}} = \frac{D}{T} = \frac{D}{W \cdot r}, $$

so the wall-clock to ingest the whole corpus falls inversely with the number of workers until some shared resource (the document store's read bandwidth from Chapter 8, or the chunk store's write bandwidth) becomes the bottleneck. Output 40.2.1 instantiates these formulas: at $W = 64$ workers and $r = 12$ documents per second the pipeline clears two million documents in about forty-three minutes. The per-document rate $r$ is dominated by the slowest stage, which for a scanned-heavy corpus is OCR, so the cluster is provisioned around the OCR cost and the cheaper born-digital documents ride along for free. This is the same batch-sizing logic that Chapter 3 made rigorous, applied to a document corpus.

Practical Example: The Legal Team That Could Not Find Its Own Contracts

Who: A platform engineer standing up document search for the legal department of an insurer.

Situation: Roughly 1.8 million contracts and amendments, half of them decade-old scans, sat in a permissioned document store, and lawyers wanted an agent that could answer clause-level questions.

Problem: A first prototype flattened every PDF to plain text with a single-machine extractor, dropped all table structure, and ignored file permissions; the agent confidently surfaced a clause from a sealed settlement to a user who should never have seen it.

Dilemma: Patch the prototype's extraction and bolt on access checks at query time, or rebuild ingestion as a distributed batch job that attaches permissions and preserves tables at parse time.

Decision: They rebuilt ingestion, because a permission tag added at query time cannot recover a chunk that was never tagged, and a table flattened at parse time cannot be un-flattened later.

How: A Spark job mapped each document through layout-aware parsing (with OCR for the scans), parent-child chunking, and a tagging step that copied each file's access-control list onto every chunk; the retriever then hard-filtered on the requesting user's identity.

Result: The full corpus ingested in under two hours on a modest cluster, clause-level questions returned the right paragraph with its surrounding section, and the access-control leak closed because no chunk could be retrieved without a matching permission tag.

Lesson: In the enterprise, permissions and structure are properties of a chunk, fixed at ingestion. Retrofitting either one downstream is either impossible or insecure.

5. Incremental Ingestion for a Living Corpus Advanced

An enterprise corpus is never frozen. Contracts are amended, policies are revised, new decks land every day, and a few documents are deleted or have their permissions changed. Re-ingesting the entire corpus on every change would be wasteful and would churn the vector index needlessly, so ingestion becomes incremental in the spirit of Chapter 9: a periodic or event-driven job processes only the documents whose content hash or version changed since the last run, mapping each through the same parse-clean-chunk-tag path and then upserting its chunks, replacing the old chunks for that document and removing any the new version no longer produces. The content hash carried in each chunk's metadata (Section 3) is what makes the change set cheap to compute. Deletions and permission changes are the subtle cases: a deleted document must have all its chunks purged from the store and the index, and a permission downgrade must propagate to every chunk so the retriever's pre-filter immediately reflects the new access rules. Incremental ingestion is therefore not a smaller version of the batch job but the batch map restricted to a change set, with an upsert and a tombstone path bolted on, and it shares all the parsing and chunking logic verbatim.

Research Frontier: Layout-Aware and Late Chunking (2024 to 2026)

Two active lines are reshaping the stages of this section. On parsing, document-understanding models that read layout and text jointly (the LayoutLM lineage, and newer vision-language document parsers such as the open-source nougat and donut families, plus commercial offerings) extract tables and reading order far better than geometry heuristics, at the cost of a heavier per-page compute that raises the OCR-dominated $r$ in Section 4. On chunking, "late chunking" (Günther et al., 2024) inverts the usual order: it embeds the whole document with a long-context encoder first and pools chunk vectors from that context-aware representation, so each chunk's embedding remembers the surrounding document instead of being computed in isolation. Retrieval-evaluation work continues to report that chunking strategy moves end-to-end answer quality as much as the embedding model does, which keeps the unglamorous decisions of this section at the center of system performance rather than the periphery.

With a clean, permissioned, incrementally maintained chunk store in place, the corpus is ready to be made searchable. The next step is to turn each chunk into a vector the retriever can match against, the distributed embedding stage that Section 40.3 builds, where the ten million chunks counted in Output 40.2.1 become ten million vectors in the index that Chapter 25 serves to the agent.

Exercise 40.2.1: Where Does the Fact Hide? Conceptual

An agent is asked, "What is the EMEA Q3 reimbursement cap?" The answer lives in a single cell of a table inside a scanned policy PDF. Trace the answer backward through the pipeline of Figure 40.2.1 and name, at each of the four worker stages (parse, clean, chunk, tag), one specific failure that would make this fact unretrievable even though it is present in the original document. Explain why none of these failures can be fixed by the retriever or the agent downstream.

Exercise 40.2.2: Chunk-Size Sweep Coding

Extend Code 40.2.1 to sweep the fixed chunker over chunk sizes $s \in \{100, 200, 400, 800\}$ at a constant overlap ratio $o / s = 0.2$, and for each size report the chunk count, the average chunk size, and the duplicated-character fraction. Then compute the projected total chunk count $C = D \cdot \bar{c}$ for $D = 2 \times 10^6$ documents at each size. Plot or tabulate how $C$ shrinks as $s$ grows, and explain in two sentences why a larger $s$ lowers index size but can hurt retrieval granularity.

Exercise 40.2.3: Provisioning Around OCR Analysis

A corpus has $D = 3 \times 10^6$ documents, of which forty percent are scans. A born-digital document takes 50 milliseconds per worker to parse, clean, chunk, and tag; a scanned document takes 1.2 seconds because of OCR. Using $T = W \cdot r$ with a per-document rate $r$ that you derive from the format mix, estimate the number of workers $W$ needed to ingest the corpus within a four-hour window. Then argue how the answer changes if a faster OCR model halves the scan cost, and identify which shared resource from Chapter 8 might cap the speedup before $W$ does.