Part II: Distributed Data Processing for AI
Chapter 9: Stream Processing and Online AI

Kafka-Style Distributed Logs

"I forget nothing and I decide nothing. I just keep what you appended, in the order you appended it, until you finally come back and read it."

A Partition That Has Outlived Three Consumer Groups
Big Picture

A distributed log is the simplest possible distributed data structure (an append-only, partitioned, ordered sequence of records) and exactly that simplicity is what lets it sit underneath nearly every streaming AI system as the durable backbone connecting data producers to feature pipelines and online inference. The previous sections gave us events, windows, event time, and watermarks; all of those assume a stream arriving from somewhere. This section is about that somewhere. Producers append records to the end of a partition, consumers read forward at an offset they control, and the broker in the middle stores bytes and remembers almost nothing about who is reading. From those few rules come the four properties that make the log powerful: ordering within a partition, replay from any past position, decoupling of producers from consumers, and durability through replication. We build a working partitioned log in pure Python so the offset and consumer-group mechanics are concrete before we ever name a product.

Everything earlier in this chapter assumed a stream of events was flowing past a processor. Section 9.4 worried about what to do when an event in that stream arrives late, but nothing so far asked where the stream physically lives between the moment a producer emits an event and the moment a consumer handles it. In a single-process toy, the answer is an in-memory queue. In a real system spanning many machines, with producers and consumers that start, stop, crash, and scale independently, the stream needs a home that survives all of that. The distributed log is that home. It is the storage layer that a stream-processing engine reads from and that the rest of this chapter, including Section 9.6 on Spark Structured Streaming and Flink, quietly depends on.

The design is almost aggressively minimal, and the minimalism is the point. A log is an ordered, append-only file: you can add a record to the end, and you can read records starting from a position, and that is essentially all. To scale this across machines, a topic (a named stream, such as clicks or sensor-readings) is split into several partitions, each an independent append-only sequence living on some broker. Producers append; consumers read at their own pace from an offset, a monotonically increasing integer index into a partition. The broker does not push, does not track per-message acknowledgements, and does not delete a record the moment someone reads it. It keeps the bytes, in order, and lets readers decide where to look. Figure 9.5.1 shows the whole abstraction on one canvas.

[Diagram: topic "clicks" with partitions 0, 1, and 2, each growing at its tail; producers P1 and P2 append; consumer group "feature-builder" has consumer C0 owning partitions 0 and 2 and consumer C1 owning partition 1, each reading from its own offset.]
Figure 9.5.1: The distributed-log abstraction. The topic clicks is split into three partitions, each a row of numbered offset cells that only ever grows at the tail (the shaded cells are the most recent appends). Producers append to partition tails (orange). A single consumer group divides the partitions among its members: consumer C0 owns partitions 0 and 2, consumer C1 owns partition 1, and each reads forward from an offset it controls (green). The broker stores the cells; it does not decide who reads what.

1. The Append-Only Partitioned Log Beginner

Let us pin down the abstraction precisely, because its power comes from how little it does. A partition is a sequence of records $r_0, r_1, r_2, \ldots$ where record $r_j$ sits at offset $j$. The only write operation is append: a producer hands the broker a record, the broker places it at the current tail offset, and that offset never changes again. The only read operation is a forward scan: a consumer says "give me records from offset $j$ onward," and the broker returns $r_j, r_{j+1}, \ldots$ up to whatever has been written. There is no update in place and no random insertion. A record at offset $j$ means the same thing forever, which is what lets two consumers reading the same partition at different times agree on what happened and in what order.

Partitioning is how the log scales out across machines, and it is the same partitioning idea introduced for distributed systems generally in Section 2.3. A single append-only sequence on one broker can absorb only so many writes per second and store only so many bytes. Splitting a topic into $K$ partitions spreads both the write load and the storage across as many as $K$ brokers, so the topic's total throughput grows roughly linearly with $K$ until the cluster runs short of brokers, disk, or network. The cost of that split is the cost we have been paying throughout this book: ordering is now guaranteed only within a partition, not across the whole topic. Records in partition 0 and records in partition 1 have no defined relative order, because they are independent sequences, usually on different machines, and nothing coordinates a global order across them.

The producer therefore faces a choice with real consequences: which partition does a given record go to? The standard rule is to hash a record's key and take the result modulo the partition count,

$$\text{partition}(r) = \operatorname{hash}(\text{key}(r)) \bmod K,$$

so that all records sharing a key land in the same partition and thus stay in order relative to each other. If the key is a user id, every event for one user is totally ordered, even though events for different users may be interleaved arbitrarily across partitions. This is exactly the right tradeoff for AI workloads: a feature pipeline almost always needs per-entity order (a user's clicks in sequence, a sensor's readings in sequence) and rarely needs a single global order over millions of unrelated entities. We choose the key to match the unit of order the model actually consumes.
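The placement rule is small enough to sketch directly. This is a minimal illustration under stated assumptions: it uses zlib.crc32 as the stable hash only because it ships with Python (Kafka's default partitioner uses murmur2), and the helpers partition_for and place are hypothetical names, not a client API. Which partition each user lands on depends on the hash, so the sketch prints placements rather than promising specific ones.

```python
import zlib

def partition_for(key, num_partitions):
    # partition(r) = hash(key(r)) mod K, with CRC32 as a stable stand-in hash
    # (Python's built-in hash() is salted per process, so it would not be stable).
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def place(events, num_partitions):
    # Append each (key, value) to the tail of the partition its key hashes to.
    partitions = [[] for _ in range(num_partitions)]
    for key, value in events:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return partitions

events = [("user-1", "view"), ("user-2", "view"), ("user-1", "cart"),
          ("user-3", "view"), ("user-1", "buy"), ("user-2", "buy")]
partitions = place(events, num_partitions=3)

for key in ("user-1", "user-2", "user-3"):
    p = partition_for(key, 3)
    # Within its partition, a key's records appear in production order.
    print(key, "-> partition", p, [v for k, v in partitions[p] if k == key])
```

Different keys may still share a partition; the rule only promises that one key never spans two.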

Key Insight: The Log Stores Order, Not State

A queue forgets a message once it is delivered; a database remembers only the latest value. A log does neither. It remembers the full ordered history of appends and lets each reader choose its own position in that history. That single decision (store the sequence, externalize the read position as an offset the consumer owns) is what gives the log all four of its superpowers at once: per-partition ordering, replay, producer-consumer decoupling, and a clean unit of replication. Everything else in this section is a consequence of storing order rather than state.

2. Why the Log Is Powerful: Order, Replay, Decoupling, Durability Beginner

The first property, ordering within a partition, we have already met: because appends are placed at the tail and offsets never change, any two readers of the same partition see the same records in the same sequence. For an online model that updates on a per-user event stream, this is the difference between a coherent history and noise. The second property is replay. Because the broker keeps records after they are read and the read position lives in the consumer, a consumer can set its offset backward and re-read anything still retained. This is not an exotic feature; it is the everyday mechanism behind reprocessing a feature pipeline after fixing a bug, bootstrapping a brand-new model on historical events, and recovering a crashed consumer by resuming from its last committed offset rather than losing data.
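Replay is bounded by retention, and a short sketch makes the bound visible. It assumes time-based retention applied record by record (Kafka actually deletes whole log segments, so this is a simplification), and the class RetainedPartition with its log_start_offset field is a hypothetical name. The design choice to notice is that offsets are absolute: expiring old records advances the smallest readable offset but never renumbers the survivors, so a committed offset keeps its meaning.

```python
class RetainedPartition:
    """One partition whose oldest records expire under time-based retention (sketch)."""

    def __init__(self):
        self.records = []           # (offset, timestamp, value) for records still retained
        self.next_offset = 0        # offset the next append receives
        self.log_start_offset = 0   # smallest offset that can still be read

    def append(self, timestamp, value):
        offset = self.next_offset
        self.records.append((offset, timestamp, value))
        self.next_offset += 1
        return offset

    def enforce_retention(self, now, retention):
        # Keep records no older than `retention`; survivors keep their original offsets.
        self.records = [r for r in self.records if now - r[1] <= retention]
        self.log_start_offset = self.records[0][0] if self.records else self.next_offset

    def read(self, offset):
        if offset < self.log_start_offset:
            # A real broker reports "offset out of range"; the client then applies its reset policy.
            raise ValueError(f"offset {offset} is below log start offset {self.log_start_offset}")
        return [value for off, _, value in self.records if off >= offset]


part = RetainedPartition()
for t, value in enumerate(["a", "b", "c", "d", "e"]):
    part.append(timestamp=t, value=value)

print(part.read(2))                          # rewind to offset 2 -> ['c', 'd', 'e']
part.enforce_retention(now=10, retention=7)  # timestamps 0, 1, 2 are now too old
print(part.log_start_offset, part.read(3))   # -> 3 ['d', 'e']
```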

The third property is decoupling. Producers and consumers never speak to each other; they speak only to the log. A producer appends at whatever rate it can and never blocks on a slow consumer, because the broker is not trying to hand the record to anyone. Consumers read at whatever rate they can sustain and never slow the producer down. New consumers can be added later to read the same data for an entirely different purpose (a fraud model, an analytics dashboard, a training-data exporter) without the producer knowing or caring. This is why the log is so often called a backbone: it is a single durable spine that many independent producers write to and many independent consumers read from, each at its own tempo.
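Decoupling falls out of keeping read positions outside the log. In this sketch (the SharedLog and Reader classes are hypothetical illustrations, not a library API), one fast reader, one lagging reader, and one reader added after the fact all consume the same sequence, and the producer's append never consults any of them.

```python
class SharedLog:
    """One partition read by any number of independent readers (sketch)."""

    def __init__(self):
        self.records = []

    def append(self, value):
        # The producer only touches the tail; it never checks who is reading.
        self.records.append(value)
        return len(self.records) - 1

    def read(self, offset, max_records):
        return self.records[offset:offset + max_records]


class Reader:
    """A reader owns its position; the log stores nothing about it."""

    def __init__(self, name, log):
        self.name, self.log, self.offset = name, log, 0

    def poll(self, max_records):
        batch = self.log.read(self.offset, max_records)
        self.offset += len(batch)
        return batch


log = SharedLog()
fast = Reader("fraud-model", log)
slow = Reader("analytics", log)

for i in range(6):
    log.append(f"event-{i}")
    fast.poll(max_records=10)     # keeps pace with every append
slow.poll(max_records=2)          # lags behind; the producer never noticed

late = Reader("training-export", log)    # added afterwards, starts at offset 0
print(fast.offset, slow.offset, late.poll(max_records=100))
```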

The fourth property is durability, and it ties directly back to Section 2.3. Each partition is replicated across several brokers: one leader handles appends (and, by default, reads), and a configurable number of followers copy every append. If a record is acknowledged only after it has reached the leader and its in-sync followers, then losing any single broker does not lose the record as long as at least one other in-sync replica received it. With a replication factor $f$, and all $f$ replicas in sync when the record was acknowledged, the partition tolerates the failure of up to $f-1$ brokers without losing that record. Replication is the price the log pays to be a place you can safely build a business on, and it is the same partition-plus-replication recipe that underlies the distributed storage of Chapter 8.
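The $f-1$ bound can be checked with a toy model. This sketch makes simplifying assumptions that a real broker does not: every replica is in sync, an acknowledgement means every live replica has stored the record (loosely like acks=all with a full in-sync set), and failover simply promotes any survivor. The class ReplicatedPartition is hypothetical.

```python
class ReplicatedPartition:
    """Leader plus followers, all assumed in sync (a simplified sketch)."""

    def __init__(self, replication_factor):
        self.replicas = {f"broker-{i}": [] for i in range(replication_factor)}
        self.alive = set(self.replicas)

    def append(self, value):
        if not self.alive:
            raise RuntimeError("no live replica to accept the write")
        for broker in self.alive:
            self.replicas[broker].append(value)
        return True   # acknowledged only once every live replica holds the record

    def fail(self, broker):
        self.alive.discard(broker)

    def read_all(self):
        # Any surviving replica (the new leader after failover) can serve the data.
        if not self.alive:
            raise RuntimeError("all replicas lost: data unavailable")
        survivor = sorted(self.alive)[0]
        return list(self.replicas[survivor])


f = 3
part = ReplicatedPartition(replication_factor=f)
part.append("userA:view")
part.append("userA:cart")

for i in range(f - 1):            # lose f - 1 = 2 brokers ...
    part.fail(f"broker-{i}")
print(part.read_all())            # ... and the acknowledged records survive
```

Failing the last replica as well makes read_all raise, which is the $f$-th failure the bound does not cover.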

Fun Note: The Log That Refused to Be a Queue

Early message-queue systems treated a delivered message as a used-up thing: read it once, it is gone. The log's quiet rebellion was to say "deletion is a retention policy, not a delivery event." A record sticks around for seven days, or until the partition hits a size cap, regardless of how many consumers have read it. The result is a system where adding a second reader costs nothing and re-reading yesterday is just arithmetic on an integer. An entire generation of "replay the stream" architectures exists because someone decided not to throw the message away.

3. Consumer Groups: Scaling Out Consumption Intermediate

A single consumer reading a high-volume topic eventually hits the same throughput ceiling that motivated partitioning the producer side. The log's answer is the consumer group: a set of consumers that cooperate to read one topic, with the partitions divided among the members so that each partition is owned by exactly one consumer in the group at a time. Because partitions are independent ordered sequences, the members can read in parallel without any coordination on the data path; the only coordination is the assignment of partitions to members, which happens rarely (when a consumer joins or leaves) rather than per record.

This gives a clean scaling law. A topic with $K$ partitions supports up to $K$ usefully busy consumers in one group; the $(K+1)$-th consumer gets no partition and sits idle, because a partition cannot be split between two readers without breaking its ordering guarantee. So the partition count, chosen when the topic is created, sets the maximum read parallelism for every group that consumes it. (Kafka lets you add partitions later but not remove them, and adding them changes which partition a given key hashes to, which can break per-key ordering across the change.) Pick it too low and you cap how fast a downstream model can ingest; pick it absurdly high and you pay coordination and small-file overhead. Multiple groups consuming the same topic are fully independent: the fraud model's group and the analytics group each get the full stream and track their own offsets, which is the decoupling property from Section 2 expressed as an org chart.
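The scaling law follows directly from the assignment rule. This sketch reuses the round-robin division from Code 9.5.1 under a hypothetical name and counts how many members of a group actually receive a partition as the group grows past $K = 3$.

```python
def round_robin_assign(num_partitions, consumers):
    # Same rule as assign() in Code 9.5.1: partition p goes to consumer p mod n.
    table = {c: [] for c in consumers}
    for p in range(num_partitions):
        table[consumers[p % len(consumers)]].append(p)
    return table

K = 3
for n in range(1, 6):
    consumers = [f"c{i}" for i in range(n)]
    table = round_robin_assign(K, consumers)
    busy = [c for c, parts in table.items() if parts]
    idle = [c for c, parts in table.items() if not parts]
    # Useful parallelism is min(K, n): extra members receive no partition.
    print(f"{n} consumers: {len(busy)} busy, idle {idle}")
```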

When a consumer in a group crashes, its partitions are reassigned to the survivors (a rebalance), and the survivors resume from the last committed offset of those partitions. No data is lost as long as offsets were committed after processing, and the cost of the failure is bounded by how much was processed since the last commit. This is the stream-processing version of the fault-tolerance theme that runs from MapReduce re-execution in Chapter 6 onward: the work is partitioned, each partition has a recoverable checkpoint (here, the committed offset), and recovery means replaying from the checkpoint rather than from the beginning.
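A rebalance with commit-after-processing can be simulated in a few lines. The sketch is illustrative only: the log contents, the round_robin_assign and process helpers, and the single in-memory committed table standing in for the group's committed offsets are all assumptions. Consumer c1 processes one record past its last commit and then crashes; after c0 takes over c1's partition, exactly that record is processed twice and nothing is lost.

```python
from collections import Counter

def round_robin_assign(num_partitions, consumers):
    table = {c: [] for c in consumers}
    for p in range(num_partitions):
        table[consumers[p % len(consumers)]].append(p)
    return table

log = {p: [f"p{p}-r{j}" for j in range(4)] for p in range(3)}   # 3 partitions x 4 records
committed = {p: 0 for p in log}   # the group's committed offsets; they survive a crash
seen = []                         # (consumer, record) pairs handed to application code

def process(consumer, p, count, commit):
    # Process up to `count` records of partition p starting at its committed offset,
    # then commit the new position only if `commit` is True (commit after processing).
    start = committed[p]
    batch = log[p][start:start + count]
    seen.extend((consumer, record) for record in batch)
    if commit:
        committed[p] = start + len(batch)

group = round_robin_assign(len(log), ["c0", "c1"])   # c0 owns [0, 2], c1 owns [1]
for p in group["c0"]:
    process("c0", p, count=2, commit=True)
process("c1", 1, count=2, commit=True)     # c1 commits offset 2 on partition 1
process("c1", 1, count=1, commit=False)    # c1 handles p1-r2, then crashes before committing

group = round_robin_assign(len(log), ["c0"])          # rebalance: c0 now owns 0, 1, 2
for p in group["c0"]:
    process("c0", p, count=len(log[p]), commit=True)  # resume from committed offsets

counts = Counter(record for _, record in seen)
print(sorted(r for r, c in counts.items() if c > 1))  # -> ['p1-r2']
```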

4. A Partitioned Log from Scratch Intermediate

The mechanics above are easier to trust once you have watched them run. The code below implements a tiny partitioned log in pure Python: a topic of several partitions, each a Python list that only grows at the tail; a producer that places each record by hashing its key; a round-robin assignment that divides partitions among a consumer group; and a forward read from an offset, which doubles as the replay primitive. There is no network and no broker process, but every rule from Sections 1 through 3 is visible: per-key ordering inside a partition, parallel ownership of partitions by group members, and re-reading from an earlier offset.

import hashlib

class PartitionedLog:
    def __init__(self, topic, num_partitions):
        self.topic = topic
        self.partitions = [[] for _ in range(num_partitions)]   # each is an ordered list

    def append(self, key, value):
        # Same key -> same partition, so per-key order is preserved (Kafka's default
        # partitioner applies the same rule, using a murmur2 hash).
        # A stable hash (md5) is used instead of Python's salted hash() so the
        # placement is identical on every run and every machine.
        digest = int(hashlib.md5(key.encode()).hexdigest(), 16)
        p = digest % len(self.partitions)
        offset = len(self.partitions[p])                        # next slot = current length
        self.partitions[p].append((key, value))
        return p, offset

    def read(self, partition, offset):
        # A consumer reads from an offset it controls; the broker stores nothing per reader.
        return self.partitions[partition][offset:]


def assign(num_partitions, consumers):
    # Round-robin assignment: partition i goes to consumer i mod n in the group.
    # (Kafka's RangeAssignor would instead give each consumer a contiguous block.)
    table = {c: [] for c in consumers}
    for part in range(num_partitions):
        table[consumers[part % len(consumers)]].append(part)
    return table


log = PartitionedLog("clicks", num_partitions=3)
events = [("userA", "view"), ("userB", "view"), ("userA", "cart"),
          ("userJ", "view"), ("userA", "buy"), ("userB", "cart"),
          ("userJ", "cart"), ("userB", "buy")]
placement = {}
for key, value in events:
    p, off = log.append(key, value)
    placement.setdefault(key, []).append((p, off))

print("=== where each key landed (key -> [(partition, offset), ...]) ===")
for key in ("userA", "userB", "userJ"):
    print(f"  {key}: {placement[key]}")

print("\n=== per-partition contents (ordered append-only) ===")
for i, part in enumerate(log.partitions):
    print(f"  partition {i}: {[f'{k}:{v}' for k, v in part]}")

print("\n=== consumer-group assignment (group of 2 consumers, 3 partitions) ===")
group = assign(num_partitions=3, consumers=["c0", "c1"])
for c, parts in group.items():
    print(f"  {c} owns partitions {parts}")

print("\n=== c0 consumes its partitions in order, tracking its own offset ===")
committed = {}
for part in group["c0"]:
    for key, value in log.read(part, offset=0):
        print(f"  c0  part={part}  {key}:{value}")
    committed[part] = len(log.partitions[part])      # commit = remember next offset

replay_part = group["c0"][-1]
print(f"\n=== replay: c0 re-reads partition {replay_part} from offset 1"
      f" (its committed offset was {committed.get(replay_part)}) ===")
for key, value in log.read(partition=replay_part, offset=1):
    print(f"  replayed  part={replay_part}  {key}:{value}")
Code 9.5.1: A partitioned append-only log in roughly forty lines. append hashes the key to a partition and returns the assigned offset; read is a forward slice from an offset, which is also the replay operation; assign deals the partitions round-robin across the members of a consumer group. The broker (here, the lists) keeps no per-reader state at all.
=== where each key landed (key -> [(partition, offset), ...]) ===
  userA: [(1, 0), (1, 1), (1, 2)]
  userB: [(0, 0), (0, 1), (0, 2)]
  userJ: [(2, 0), (2, 1)]

=== per-partition contents (ordered append-only) ===
  partition 0: ['userB:view', 'userB:cart', 'userB:buy']
  partition 1: ['userA:view', 'userA:cart', 'userA:buy']
  partition 2: ['userJ:view', 'userJ:cart']

=== consumer-group assignment (group of 2 consumers, 3 partitions) ===
  c0 owns partitions [0, 2]
  c1 owns partitions [1]

=== c0 consumes its partitions in order, tracking its own offset ===
  c0  part=0  userB:view
  c0  part=0  userB:cart
  c0  part=0  userB:buy
  c0  part=2  userJ:view
  c0  part=2  userJ:cart

=== replay: c0 re-reads partition 2 from offset 1 (its committed offset was 2) ===
  replayed  part=2  userJ:cart
Output 9.5.1: Each user's events land in a single partition and stay in append order (userA's view, cart, buy are offsets 0, 1, 2 of partition 1). The two-member group splits the three partitions, c0 reads partitions 0 and 2 entirely in order, and the final block re-reads partition 2 from offset 1 even though c0 had already committed past it: replay is just a smaller offset.

Notice what the output makes concrete. The ordering guarantee is per partition, not global: the log defines no relative order between userA's and userB's events, because they live on different partitions, yet within partition 1 the three userA events are perfectly sequenced. The consumer group divided the partitions with no per-record coordination. And the replay block resumed from offset 1 after the consumer had already committed offset 2, recovering a record it had nominally "finished" with, which is exactly how a crashed consumer or a reprocessing job rewinds. None of this needed a real broker; the rules are in the data structure.

Library Shortcut: kafka-python Replaces the Whole Broker

Code 9.5.1 simulates the log in one process. Against a real Apache Kafka cluster, the same produce-and-consume loop is a handful of lines, and the client library handles partition assignment, leader discovery, replication acknowledgements, offset commits, and group rebalancing for you:

# pip install kafka-python   (or confluent-kafka for the librdkafka-backed client)
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers="broker:9092")
producer.send("clicks", key=b"userA", value=b"view")   # hashed to a partition for you
producer.flush()

consumer = KafkaConsumer("clicks",
                         group_id="feature-builder",    # join a consumer group
                         bootstrap_servers="broker:9092",
                         auto_offset_reset="earliest")   # replay from the start if no commit
for record in consumer:                                  # partitions auto-assigned to this member
    print(record.partition, record.offset, record.key, record.value)
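Code 9.5.2: The same producer and consumer-group behavior as Output 9.5.1, now against a real cluster. The roughly forty lines of partitioning, assignment, and offset tracking in Code 9.5.1 collapse to the send call and the consumer loop. Passing group_id makes this consumer a member of a group whose partitions are assigned automatically. auto_offset_reset="earliest" only decides where a group with no committed offset begins (the oldest retained record); an explicit rewind like the one we did by hand is consumer.seek(partition, offset) with a TopicPartition. kafka-python also commits offsets periodically in the background by default (enable_auto_commit=True), which matters when choosing delivery semantics in Section 5. The confluent-kafka package exposes the same model over the librdkafka C client for higher throughput.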

5. Delivery Semantics and What Each One Costs Advanced

The log decouples producers from consumers, but a network sits between them, and networks drop and duplicate. The guarantee a pipeline gives about how many times each record is processed is its delivery semantics, and there are three levels, each with a concrete price. Under at-most-once, a consumer commits its offset before processing the record; if it crashes after committing but before finishing, the record is skipped on restart. Nothing is processed twice, but records can be lost. This is the cheapest option and is acceptable only when an occasional dropped event is harmless, such as a coarse metrics counter.

Under at-least-once, the consumer processes the record first and commits its offset only after; if it crashes between processing and committing, the record is reprocessed on restart. Nothing is ever lost, but a record can be handled more than once. This is the common default, and for many AI features it is fine, because the downstream effect is idempotent: writing "userA's last action = buy" twice yields the same state. Under exactly-once, the system guarantees each record affects the output as if processed exactly one time, achieved through idempotent producers (each append is deduplicated by a producer sequence number) and transactions that atomically tie the consumer's offset commit to its output writes. The output and the offset advance together or not at all, so no record is lost and none is double-counted.
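The transactional idea can be approximated outside Kafka by storing the consumer's offset in the same database as its output, so one transaction covers both. This is an analogy, not Kafka's transaction protocol: it assumes Python's built-in sqlite3 module with an in-memory database, the tables and the process_one helper are hypothetical, and a "crash" is modeled as rolling back the open transaction. Because the count and the offset commit or roll back together, the crash neither loses the record nor counts it twice.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE features (user_id TEXT PRIMARY KEY, clicks INTEGER NOT NULL)")
db.execute("CREATE TABLE offsets (part INTEGER PRIMARY KEY, next_offset INTEGER NOT NULL)")
db.execute("INSERT INTO offsets VALUES (0, 0)")
db.commit()

partition0 = [("userA", "view"), ("userA", "cart"), ("userB", "view")]

def process_one(crash_before_commit=False):
    """Apply the next record of partition 0; return False when caught up."""
    (start,) = db.execute("SELECT next_offset FROM offsets WHERE part = 0").fetchone()
    if start >= len(partition0):
        return False
    user_id, _action = partition0[start]
    # The output write and the offset advance run inside one transaction
    # (sqlite3 opens it implicitly before the first INSERT/UPDATE).
    db.execute("INSERT OR IGNORE INTO features VALUES (?, 0)", (user_id,))
    db.execute("UPDATE features SET clicks = clicks + 1 WHERE user_id = ?", (user_id,))
    db.execute("UPDATE offsets SET next_offset = ? WHERE part = 0", (start + 1,))
    if crash_before_commit:
        db.rollback()   # simulated crash: both writes are discarded together
    else:
        db.commit()     # both writes become durable together
    return True

process_one()                           # userA: 1 click, next offset 1
process_one(crash_before_commit=True)   # crash: neither the count nor the offset moves
while process_one():                    # restart from the committed offset
    pass
print(db.execute("SELECT user_id, clicks FROM features ORDER BY user_id").fetchall())
# -> [('userA', 2), ('userB', 1)]
```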

Table 9.5.1: The three delivery semantics, the failure each one risks, and what it costs to provide. The right choice depends on whether the downstream computation can tolerate loss, duplication, or neither.
| Semantics | On a crash mid-record | What it costs | When to use it |
|---|---|---|---|
| At-most-once | Record may be lost (commit before processing) | Cheapest; no extra coordination | Loss-tolerant counters, coarse telemetry |
| At-least-once | Record may be reprocessed (commit after processing) | Requires idempotent downstream writes | Most feature pipelines; common default |
| Exactly-once | Neither loss nor duplication | Transactions, idempotent producer, higher latency | Counts, money, dedup-sensitive features |

Exactly-once is not free magic; it buys its guarantee with transactional coordination that adds latency and throughput overhead, and it holds only within the boundary of the streaming system (the moment a side effect escapes to an external service that is not part of the transaction, you are back to designing for idempotence). The practical engineering rule is to reach for at-least-once plus idempotent downstream writes whenever you can, and pay for true exactly-once only where double-counting is genuinely unacceptable, such as billing or deduplicated training-data exports.
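The at-least-once-plus-idempotence rule is easy to see on a redelivered record. In this sketch the delivery sequence is assumed (offset 1 is redelivered after a crash between processing and commit) and the variable names are illustrative. Overwriting a last-action value is naturally idempotent, a naive counter double-counts, and remembering applied (partition, offset) pairs restores idempotence. In a real pipeline that applied set must be stored atomically with the counts it protects, or it inherits the same crash window.

```python
PARTITION = 0
records = {0: ("userA", "view"), 1: ("userA", "cart"), 2: ("userA", "buy")}
deliveries = [0, 1, 1, 2]   # assumed: offset 1 redelivered after a crash before its commit

last_action = {}   # overwrite: applying the same record twice leaves the same state
naive_count = {}   # increment: a duplicate delivery is double-counted
dedup_count = {}   # increment guarded by the set of already-applied positions
applied = set()    # (partition, offset) pairs already reflected in dedup_count

for offset in deliveries:
    user, action = records[offset]
    last_action[user] = action
    naive_count[user] = naive_count.get(user, 0) + 1
    if (PARTITION, offset) not in applied:
        applied.add((PARTITION, offset))
        dedup_count[user] = dedup_count.get(user, 0) + 1

print(last_action, naive_count, dedup_count)
# -> {'userA': 'buy'} {'userA': 4} {'userA': 3}
```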

Thesis Thread: The Log Is the Spine That Distributes the Stream

Scale-out is the act of splitting one essential activity across machines and recombining the pieces correctly. The distributed log does this for the stream itself: it partitions the event sequence across brokers (scaling write and storage throughput), divides consumption across a group (scaling read throughput), and replicates each partition (surviving failure), all while presenting downstream code with a single named topic. Every later piece of streaming AI in this book (the online feature computation of Section 9.7, the real-time inference pipelines of Section 9.8) reads from a log shaped exactly like Code 9.5.1, just with a real broker behind it. When you meet those systems, ask which topic feeds them and how its partitions are keyed; the answer determines their order, their parallelism, and their failure behavior.

6. The Log as the Backbone of Online AI Intermediate

Pulling the threads together: the reason the distributed log sits underneath streaming AI rather than beside it is that it solves, in one abstraction, the four problems every online-learning and real-time-inference system has. It gives per-entity order, so a model sees a user's history coherently. It gives replay, so a new model can be bootstrapped on the same events an old one saw, and a buggy feature pipeline can be rerun from a chosen offset. It gives decoupling, so the team that emits events and the teams that build features, train models, and serve predictions evolve independently against one stable contract. And it gives durability, so the events a model's correctness depends on are not lost when a machine dies.

Concretely, the same clicks topic can feed a consumer group that materializes online features into a feature store, a second group that exports a training set by replaying a time range, and a third that drives a real-time inference service, all reading the identical ordered event history at their own offsets. The log is the join point between the data-processing world of this Part and the model-serving world of later Parts, which is why it recurs: the offset-based replay you built here is the mechanism behind reprocessing in Chapter 7, and the partition-and-replicate durability is the same recipe as the storage layer of Chapter 8. With the log understood, Section 9.6 can finally show the engines that consume it.
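Exporting a training set by replaying a time range means turning timestamps into offsets. This sketch assumes record timestamps are non-decreasing within the partition (true for broker-assigned append times, not guaranteed for producer-set times) and uses hypothetical helpers offset_for_time and export_range. Kafka clients expose a comparable lookup (offsetsForTimes in the Java client, offsets_for_times in kafka-python) that returns the earliest offset whose timestamp is at or after a target.

```python
from bisect import bisect_left

# One partition as (offset, timestamp_ms, value), timestamps assumed non-decreasing.
partition = [(0, 1_000, "view"), (1, 2_500, "cart"), (2, 4_000, "buy"),
             (3, 6_000, "view"), (4, 9_000, "cart")]

def offset_for_time(part, ts_ms):
    # Earliest offset whose timestamp is >= ts_ms, or the end offset if none is.
    i = bisect_left([ts for _, ts, _ in part], ts_ms)
    if i < len(part):
        return part[i][0]
    return part[-1][0] + 1 if part else 0

def export_range(part, start_ms, end_ms):
    # Replay the half-open time range [start_ms, end_ms) as an offset range [lo, hi).
    lo, hi = offset_for_time(part, start_ms), offset_for_time(part, end_ms)
    return [value for offset, _, value in part if lo <= offset < hi]

print(offset_for_time(partition, 3_000))       # -> 2
print(export_range(partition, 2_000, 6_000))   # -> ['cart', 'buy']
```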

Practical Example: One Topic, Three Teams, Zero Coordination Meetings

Who: A streaming-platform engineer at a consumer app with separate feature, fraud, and analytics teams.

Situation: User interaction events were being delivered by three bespoke point-to-point pipelines, one per consuming team, each re-implementing retries and ordering.

Problem: Every new consumer meant a new pipeline and a cross-team meeting, and a bug in the fraud pipeline once back-pressured the producer and dropped events the feature team needed.

Dilemma: Keep extending point-to-point pipelines, simple per link but multiplying with every new producer-consumer pair and coupled at the producer, or introduce a single partitioned log as a shared backbone, one more system to operate but decoupling everyone.

Decision: They published all interaction events to one Kafka topic keyed by user id, and let each team run its own consumer group.

How: The producer kept appending; the feature team chose at-least-once with idempotent feature writes, fraud chose the same, analytics chose at-most-once for its coarse dashboards. A new recommendation team later added a fourth group with no change to anyone else.

Result: Per-user ordering was preserved for every consumer, a slow fraud consumer no longer back-pressured the producer, and onboarding a new consumer dropped from a multi-week pipeline project to creating a group id. A retraining job replayed three weeks of events from an offset to bootstrap a model.

Lesson: When several consumers need the same ordered event history, a shared partitioned log replaces $O(\text{producers} \times \text{consumers})$ point-to-point pipelines with $O(\text{producers} + \text{consumers})$ connections to one durable spine, and the keying choice (user id) is what buys every consumer its ordering for free.

Research Frontier: The Log Sheds Its Coordinator and Its Disks (2024 to 2026)

Two shifts are reshaping the distributed log. The first is removing the external coordinator: Apache Kafka's KRaft mode (production-ready since Kafka 3.3, and the only option since ZooKeeper support was removed in Kafka 4.0, released in 2025) folds metadata consensus into a built-in Raft quorum, simplifying operations and pushing partition counts and failover speed higher. The second is decoupling compute from storage. Kafka tiered storage (KIP-405, production-ready in the 3.9 line) offloads older log segments to object storage so a partition's retention is no longer capped by broker disk, making the multi-week replay of the example above routine. Kafka-API-compatible systems push further: Redpanda reimplements the broker as a single C++ binary with no JVM and no external coordinator, and WarpStream goes furthest, writing directly to S3 with zero local disk and zero inter-zone replication traffic, trading higher end-to-end latency for sharply lower operational and cross-zone cost. For AI platforms that retain long event histories to replay into new models, storage-decoupled logs change the economics of keeping the whole stream around.

Exercise 9.5.1: Why Not One Global Order? Conceptual

A colleague proposes a topic with a single partition so that every record in the entire topic is totally ordered, arguing it removes the "annoying per-partition only" caveat. State the two ceilings (from Section 1) this immediately hits, and explain using the consumer-group scaling law of Section 3 what the maximum read parallelism of a one-partition topic is. Then describe a workload where one partition is genuinely the right choice, and a workload where it would be a serious mistake.

Exercise 9.5.2: Implement Consumer Offset Commit and Recovery Coding

Extend Code 9.5.1 with a per-consumer committed-offset table and two methods: poll(consumer), which returns the next unread record for each partition the consumer owns and advances an in-memory position, and commit(consumer), which persists those positions. Then simulate a crash: process three records under at-least-once (commit after processing) and under at-most-once (commit before processing), drop the consumer after the second record but before its commit, restart, and show which records are reprocessed or lost in each case. Confirm your output matches the failure column of Table 9.5.1.

Exercise 9.5.3: Size the Partition Count Analysis

A topic must ingest 600,000 records per second, each record about 1 kilobyte, and a single consumer can process at most 50,000 records per second. Using the scaling law of Section 3, compute the minimum number of partitions so that one consumer group can keep up, and the number of consumers that group needs. Now suppose three independent groups consume the same topic; does the partition count change, and why? Finally, argue qualitatively what goes wrong if you instead provision 100,000 partitions for this load, referencing the coordination and small-file overheads mentioned in Section 3.