Part II: Distributed Data Processing for AI
Chapter 9: Stream Processing and Online AI

Events, Streams, and Windows

"They told me the stream had no end. I asked for a window so I could at least count something before the next event arrived."

An Operator Waiting on a Watermark

Big Picture

A stream is an unbounded, partitioned sequence of immutable timestamped events, and almost everything useful you compute over it is an aggregate over a finite window carved out of that infinite sequence. A batch job knows where its input ends, so it can read the whole thing and then answer. A streaming job never gets that luxury: the data keeps arriving, the job runs forever, and any total, average, or count must be scoped to a slice of time or it would never finish. This section names the three building blocks every streaming system is made of, the event, the stream, and the window, and shows how a windowed aggregate maintained as partitioned, checkpointed state becomes a real-time feature for an AI model. Get these three primitives right and the rest of the chapter, event time versus processing time, watermarks, distributed logs, is detail layered on top.

The previous section drew the line between batch and stream processing and argued that online AI lives on the stream side: features that must reflect what happened seconds ago, models that score events as they land, monitors that catch drift while it is still cheap to fix. This section supplies the vocabulary. Before you can reason about late data or exactly-once delivery, you need a precise account of what an event is, what a stream is, and how a window turns an infinite sequence into something you can actually sum. These are simple ideas with sharp edges, and the sharp edges are where the distributed-systems content of Chapter 2 reappears.

We will build each primitive in turn, then write the same windowed aggregations from scratch in pure Python so the mechanics are concrete, and finally connect the result back to the feature that an online model consumes. The diagram in Figure 9.2.1 is the picture to hold in your head throughout: one timeline of events, three different ways to slice it.

1. The Event: An Immutable Record With a Timestamp Beginner

An event is the atom of stream processing. It is a single immutable record that something happened at a particular moment: a user clicked, a sensor read 21.4 degrees, a payment cleared, a model emitted a prediction. Three properties make an event what it is. It carries a timestamp, the moment the thing happened, which is part of the data and not merely metadata about when we processed it. It carries a payload, the fields describing what happened. And it is immutable: once recorded, an event is never edited in place. If the world changes, you append a new event; you do not rewrite the old one.

That immutability is not a stylistic preference; it is what makes streams safe to distribute and replay. Because an event never changes after it is written, a failed worker can re-read the same events and recompute the same result, and two workers reading the same event can never disagree about its contents. This is the same append-only discipline that makes a distributed log durable, which is why Section 9.5 can treat a Kafka-style log as the canonical transport for streams. For now, hold onto the mental model: an event is a tuple $(t, \text{key}, \text{payload})$ where $t$ is fixed forever.
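A minimal sketch of that tuple in Python, assuming a frozen dataclass is an acceptable stand-in for an immutable record. The class name Event, its field names, and the sample payloads are illustrative and do not come from any streaming library.

```python
from dataclasses import dataclass, FrozenInstanceError

@dataclass(frozen=True)
class Event:
    """Illustrative immutable event: (t, key, payload)."""
    t: float          # event time in seconds, fixed at creation
    key: str          # partition/aggregation key, e.g. a user id
    payload: tuple    # what happened; a tuple so the contents are immutable too

click = Event(t=3.0, key="u1", payload=(("action", "click"),))

def try_to_edit(event):
    """Return True if an in-place edit of the timestamp was rejected."""
    try:
        event.t = 47.0
    except FrozenInstanceError:
        return True
    return False

edit_rejected = try_to_edit(click)

# The world changed: append a new event rather than rewriting the old one.
log = [click]
log.append(Event(t=47.0, key="u1", payload=(("action", "undo_click"),)))
```

The frozen dataclass rejects assignment to `t`, so the only way to record a change is to append another event to `log`, which mirrors the append-only discipline described above.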

Key Insight: The Timestamp Is Data, Not Metadata

The single most consequential decision in stream processing is to treat the time an event occurred as a first-class field of the event itself, distinct from the time your system happened to see it. A click that happened at 12:00:03 is a 12:00:03 event even if a flaky mobile network delivers it to your cluster at 12:00:47. Every window, every aggregate, and every feature in this chapter is computed against that embedded event time. The gap between when things happen and when we observe them is the entire subject of Section 9.3, and it only becomes tractable because the timestamp travels inside the immutable event.
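The 12:00:03 versus 12:00:47 example can be made concrete with a short sketch. It assumes 30-second windows aligned to midnight and an arbitrary calendar date; the helper name window_start is hypothetical.

```python
from datetime import datetime, timedelta

def window_start(ts, width):
    """Start of the fixed-width window (aligned to midnight) that contains ts."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    whole_windows = (ts - midnight) // width   # timedelta // timedelta gives an int
    return midnight + whole_windows * width

width = timedelta(seconds=30)                    # assumed window width
event_time = datetime(2025, 1, 1, 12, 0, 3)      # when the click happened
arrival_time = datetime(2025, 1, 1, 12, 0, 47)   # when the cluster saw it

by_event_time = window_start(event_time, width)      # the 12:00:00 window
by_arrival_time = window_start(arrival_time, width)  # the 12:00:30 window
delay = arrival_time - event_time                    # 44 seconds of network lag
```

Bucketing by arrival time would credit the click to the wrong window; bucketing by the embedded event time puts it where it belongs regardless of the delay.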

2. The Stream: An Unbounded, Partitioned Sequence Beginner

A stream is a sequence of events with no end. There is no last element to wait for, no total count to read off the front; the producer keeps appending, possibly forever. This unboundedness is the defining contrast with a batch dataset and the source of every difficulty in the chapter. You cannot sort an infinite sequence, you cannot hold all of it in memory, and you cannot wait for it to finish before you answer, because it never finishes.

A stream is also partitioned, and for the same reason any web-scale dataset is partitioned: no single machine can ingest or hold the whole thing. The producer chooses a partition key, typically a field of the event such as the user id or device id, and routes each event to one of $P$ partitions, usually by hashing the key. Each partition is an ordered, append-only sub-sequence that lives on one machine and can be consumed by one worker. This is exactly the hash partitioning introduced in Chapter 2, now applied to a sequence that grows without bound rather than a table that sits still. Choosing the key well is the same balancing act as before: hash on a field with many distinct values so load spreads evenly, and put events that must be aggregated together (all of one user's clicks) on the same partition so a single worker can see them all.
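A minimal sketch of key-based routing, assuming CRC32 as the hash. Real systems choose their own hash function, and the function name partition_for and the sample events are illustrative. Python's built-in hash() is avoided here because string hashes are randomized per process, which would break the requirement that a key always maps to the same partition.

```python
import zlib
from collections import Counter

def partition_for(key, num_partitions):
    """Map a key to a partition with a stable hash (CRC32, as an assumption)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

P = 4
events = [(1, "u1", 4), (2, "u2", 9), (3, "u1", 1), (5, "u3", 7), (6, "u2", 2)]

# Route each event by its user id; arrival order is preserved within a partition.
partitions = {p: [] for p in range(P)}
for event in events:
    _t, user, _v = event
    partitions[partition_for(user, P)].append(event)

# With many distinct keys the load per partition should come out roughly even.
load = Counter(partition_for(f"user-{i}", P) for i in range(10_000))
```

Every event for a given user lands in the same list, which is what lets a single worker see all of that user's clicks.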

The payoff of partitioning is the same as everywhere else in this book: $P$ partitions can be processed by $P$ workers in parallel, so stream throughput scales out horizontally. The cost is the same too. Order is guaranteed only within a partition, never across partitions, so two events on different partitions have no defined relative order even if their timestamps say otherwise. Any computation that needs a global view, a count across all users, must shuffle or reduce across partitions, the same boundary-crossing tax that Chapter 7 charges for a Spark shuffle.
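The global-view cost can be sketched in a few lines: each partition computes a partial count on its own, and a separate combining step crosses the partition boundary. The two-partition layout and the function name local_count are assumptions made for illustration.

```python
from collections import Counter

# Two partitions, each internally ordered (an assumed layout for illustration).
partition_0 = [(1, "u1", 4), (3, "u1", 1), (5, "u1", 7)]
partition_1 = [(2, "u2", 9), (6, "u2", 2)]

def local_count(partition):
    """Per-partition work: can run in parallel, sees only its own events."""
    return Counter(user for _t, user, _v in partition)

partials = [local_count(partition_0), local_count(partition_1)]

# The cross-partition step: combine the partial results into a global view.
global_counts = sum(partials, Counter())
total_events = sum(global_counts.values())
```

The per-partition counts need no coordination; only the final combination requires data from every partition, and that is the step a real engine pays for with a shuffle or reduce.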

3. The Window: Slicing the Infinite Into the Finite Beginner

If a stream never ends, how do you ever compute a sum? You do not sum the stream; you sum a window of it. A window is a finite slice of the unbounded sequence, defined so that it eventually closes and an aggregate over it can be emitted. Windowing is the bridge from the infinite to the finite, and there are three shapes worth knowing. Figure 9.2.1 lays all three over a single event timeline so the differences are visible at a glance.

[Figure 9.2.1: one event timeline shown under three windowing schemes. Tumbling (W1 to W4): fixed width, no overlap, each event in exactly one window. Sliding (S1 to S4): width greater than step, windows overlap, one event in several windows. Session (A, B, C): gaps of inactivity close one session and open the next.]
Figure 9.2.1: Three ways to window one event stream. Tumbling windows (top) tile the timeline into fixed, non-overlapping slices, so every event belongs to exactly one window. Sliding windows (middle) are wider than their step, so they overlap and a single event contributes to several windows. Session windows (bottom) have no fixed boundaries at all; they grow with activity and close after a gap of inactivity, grouping each burst of events into its own window.

A tumbling window partitions time into fixed-size, non-overlapping intervals, say every five seconds. Each event falls into exactly one window, determined by integer-dividing its timestamp by the window size. Tumbling windows answer questions of the form "how many events per five-second bucket?" and are the natural choice for periodic reporting where every event should be counted once and only once.

A sliding window also has a fixed width but advances by a smaller step, so consecutive windows overlap and a single event can land in several of them. A window of width five seconds that slides every two seconds produces a new aggregate every two seconds, each summarizing the last five. Sliding windows answer "what is the rate over the trailing five seconds, refreshed every two?" and are the workhorse of real-time monitoring and the trailing-average features that AI models love.
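To see which windows a single event lands in, the following sketch enumerates them directly. It assumes integer timestamps and windows that start at non-negative multiples of the step; the function name windows_containing is hypothetical.

```python
def windows_containing(t, size, step):
    """Return [lo, hi) for every sliding window that contains event time t,
    where windows start at non-negative multiples of `step`."""
    start = (t // step) * step                 # latest window start that is <= t
    out = []
    while start >= 0 and start + size > t:     # walk back while the window covers t
        out.append((start, start + size))
        start -= step
    return sorted(out)

mid_stream = windows_containing(8, size=5, step=2)   # [(4, 9), (6, 11), (8, 13)]
near_start = windows_containing(1, size=5, step=2)   # [(0, 5)]
tumbling = windows_containing(8, size=5, step=5)     # [(5, 10)]
```

An event in the middle of the stream falls into about size / step windows, while an event near the start falls into fewer because no window begins before time zero. Setting the step equal to the size recovers the tumbling case, in which each event belongs to exactly one window.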

A session window has no fixed size at all. It groups events that arrive close together and closes when a configurable gap of inactivity passes with no new event for that key. Sessions naturally model bursty, user-driven activity: a single browsing session, a conversation, a sequence of API calls, each bounded not by the clock but by the user going quiet. Because a session's length depends on the data, session windows are the most expressive and the most stateful of the three.

Fun Note: The Window That Never Closes

A classic streaming bug is the session window with a gap that is longer than your patience. Set the inactivity gap to, say, thirty minutes, then onboard a chatty IoT sensor that pings every twenty-nine minutes forever. The session never sees a gap, never closes, and its state grows without bound until the operator runs out of memory and the job falls over at 3 a.m. The fix is a maximum session duration, a reminder that "unbounded" is a property of the stream you must never let leak into the state of a single window.
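The fix can be sketched as a close decision with two triggers. This is a simplified illustration, not the API of any engine: the function name should_close, the four-hour cap, and the choice to close when the idle time reaches the gap (rather than strictly exceeds it) are all assumptions.

```python
def should_close(session_start, last_event_time, now, gap, max_duration):
    """Close a session if it has gone quiet OR has run too long.
    All arguments share one time unit (seconds here, as an assumption)."""
    went_quiet = (now - last_event_time) >= gap
    ran_too_long = (now - session_start) >= max_duration
    return went_quiet or ran_too_long

GAP = 30 * 60                 # thirty-minute inactivity gap
MAX_DURATION = 4 * 60 * 60    # hypothetical four-hour cap

def first_close(ping_interval, max_duration, pings=100):
    """Time at which a session fed by regular pings closes, or None if it never does."""
    session_start = 0
    for n in range(1, pings + 1):
        now = n * ping_interval
        last_event_time = now - ping_interval      # the previous ping
        if should_close(session_start, last_event_time, now, GAP, max_duration):
            return now
    return None

uncapped = first_close(29 * 60, max_duration=float("inf"))  # never closes
capped = first_close(29 * 60, max_duration=MAX_DURATION)    # closes at the cap
```

With pings every twenty-nine minutes the inactivity test never fires, so only the duration cap bounds the session's state.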

4. Windowed Aggregation From Scratch Intermediate

The cleanest way to see what a window is is to compute one. The code below takes a small synthetic stream of click events, each a tuple of (event time, user, value), and implements tumbling and sliding aggregation directly, with no framework. The tumbling function maps each event to one window by integer division; the sliding function generates overlapping windows of a fixed width at a fixed step and collects the events that fall inside each. It also maintains a per-key running sum, the simplest possible stateful operator, to foreshadow Section 5.

from collections import defaultdict

# A synthetic stream of click events: (event_time_seconds, user_id, value).
# Events arrive in event-time order here for clarity; timestamps are immutable.
events = [
    (1, "u1", 4), (2, "u2", 9), (3, "u1", 1), (5, "u1", 7),
    (6, "u2", 2), (8, "u1", 5), (9, "u2", 6), (11, "u1", 3),
    (12, "u2", 8), (14, "u1", 2), (15, "u2", 1), (17, "u1", 9),
]

def tumbling(events, size):
    """Fixed, non-overlapping windows of `size` seconds, keyed by window index."""
    buckets = defaultdict(list)
    for t, _user, v in events:
        w = t // size                       # each event lands in exactly one window
        buckets[w].append(v)
    out = []
    for w in sorted(buckets):
        lo, hi = w * size, (w + 1) * size   # half-open interval [lo, hi)
        vals = buckets[w]
        out.append((lo, hi, len(vals), sum(vals)))
    return out

def sliding(events, size, step):
    """Overlapping windows of width `size`, emitted every `step` seconds."""
    t_max = max(t for t, _u, _v in events)
    out = []
    start = 0
    while start <= t_max:
        lo, hi = start, start + size        # one event can fall in several windows
        vals = [v for t, _u, v in events if lo <= t < hi]
        if vals:
            out.append((lo, hi, len(vals), sum(vals)))
        start += step
    return out

print("TUMBLING windows (size=5s): [lo,hi)  count  sum")
for lo, hi, c, s in tumbling(events, 5):
    print(f"  [{lo:2d},{hi:2d})   count={c}  sum={s}")

print("\nSLIDING windows (size=5s, step=2s): [lo,hi)  count  sum")
for lo, hi, c, s in sliding(events, 5, 2):
    print(f"  [{lo:2d},{hi:2d})   count={c}  sum={s}")

# A per-key stateful aggregate: running sum per user, the seed of an online feature.
state = defaultdict(int)
print("\nPER-KEY running sum (partitioned state, updated per event):")
for t, user, v in events:
    state[user] += v
    print(f"  t={t:2d}  {user}  event_value={v}  running_sum[{user}]={state[user]}")
Code 9.2.1: Tumbling and sliding window aggregation, plus a per-key running sum, over a synthetic event stream in pure Python. The tumbling map sends each event to one window; the sliding loop re-scans for events inside each overlapping window; the running sum is a stateful operator keyed by user.
TUMBLING windows (size=5s): [lo,hi)  count  sum
  [ 0, 5)   count=3  sum=14
  [ 5,10)   count=4  sum=20
  [10,15)   count=3  sum=13
  [15,20)   count=2  sum=10

SLIDING windows (size=5s, step=2s): [lo,hi)  count  sum
  [ 0, 5)   count=3  sum=14
  [ 2, 7)   count=4  sum=19
  [ 4, 9)   count=3  sum=14
  [ 6,11)   count=3  sum=13
  [ 8,13)   count=4  sum=22
  [10,15)   count=3  sum=13
  [12,17)   count=3  sum=11
  [14,19)   count=3  sum=12
  [16,21)   count=1  sum=9

PER-KEY running sum (partitioned state, updated per event):
  t= 1  u1  event_value=4  running_sum[u1]=4
  t= 2  u2  event_value=9  running_sum[u2]=9
  t= 3  u1  event_value=1  running_sum[u1]=5
  t= 5  u1  event_value=7  running_sum[u1]=12
  t= 6  u2  event_value=2  running_sum[u2]=11
  t= 8  u1  event_value=5  running_sum[u1]=17
  t= 9  u2  event_value=6  running_sum[u2]=17
  t=11  u1  event_value=3  running_sum[u1]=20
  t=12  u2  event_value=8  running_sum[u2]=25
  t=14  u1  event_value=2  running_sum[u1]=22
  t=15  u2  event_value=1  running_sum[u2]=26
  t=17  u1  event_value=9  running_sum[u1]=31
Output 9.2.1: The four disjoint tumbling windows partition the twelve events with no double counting (their counts sum to twelve); the sliding windows overlap, so the same event is counted in several of them and their counts sum to 27. The per-key running sum shows state evolving one event at a time, separately for each user.

Two things in Output 9.2.1 are worth pausing on. First, the tumbling counts add up to exactly twelve, the number of events, because every event is in one window; the sliding counts add up to 27, because overlap means an event away from the very start of the timeline is summed into two or three windows (the width divided by the step is 2.5). That double counting is the point of a sliding window, not a bug: it is what gives you a smoothly refreshed trailing aggregate. Second, the per-key running sum is computed independently for u1 and u2, which is the seed of why streaming state must be partitioned by key, the subject of Section 5.

Library Shortcut: One Line of Windowing in Spark Structured Streaming

Code 9.2.1 spelled out window assignment by hand. A production streaming engine exposes the same operation declaratively, and it handles partitioning, state, late data, and incremental emission for you. The roughly forty lines of explicit bucketing collapse to a single grouped aggregation:

# Spark Structured Streaming: a 5-second sliding window stepping every 2 seconds,
# aggregated per user, over a stream read from Kafka.
# Assumes events_df is a streaming DataFrame with an event_time timestamp column
# and user_id and value columns (its construction is not shown here).
from pyspark.sql.functions import window, sum as ssum, col

agg = (events_df
    .groupBy(window(col("event_time"), "5 seconds", "2 seconds"), col("user_id"))
    .agg(ssum("value").alias("sum_value")))   # engine maintains the per-key state
Code 9.2.2: The sliding-window aggregate of Code 9.2.1, now grouped per user and expressed as one groupBy in Spark Structured Streaming. The engine partitions by the grouping key, maintains windowed state, checkpoints it, and emits updates incrementally, the production machinery that Section 9.6 unpacks.

5. Stateful Operators: Why State Is Partitioned and Checkpointed Intermediate

An operator is stateless if its output for an event depends only on that event: a filter, a per-event scoring call, a field projection. A windowed aggregate is stateful: to emit the result for a window it must remember something about every event seen in that window so far, a single running total for a sum or count, the full set of values for an aggregate such as a median. That running memory is the operator's state, and managing it correctly across a distributed, failure-prone cluster is among the hardest engineering problems in streaming. Two requirements follow directly, and both are familiar.

The first is that state must be partitioned by key. The running sum for user u1 must live on whatever worker processes u1's events, and the sum for u2 on whoever processes u2. If the stream is partitioned by the same key the aggregate groups on, this is automatic: every event for a key lands on one partition, so one worker owns that key's entire state and never has to ask another worker for it. This is the deep reason the partition key and the aggregation key should agree. When they do not, the engine must repartition (shuffle) the stream so that all events for a key meet on one worker, which is the streaming form of the shuffle from Chapter 6. Partitioned state is what lets stateful streaming scale out at all: $P$ workers hold $P$ disjoint slices of the state and never contend.
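A small simulation of this ownership rule, assuming three workers and CRC32 routing. Both are illustrative choices, and worker_for is a hypothetical helper. Each worker keeps a private dictionary of running sums and never reads another worker's dictionary.

```python
import zlib
from collections import defaultdict

def worker_for(key, num_workers):
    """Stable key-to-worker routing (CRC32 is an illustrative choice)."""
    return zlib.crc32(key.encode("utf-8")) % num_workers

NUM_WORKERS = 3
events = [
    (1, "u1", 4), (2, "u2", 9), (3, "u1", 1), (5, "u1", 7),
    (6, "u2", 2), (8, "u1", 5), (9, "u2", 6), (11, "u1", 3),
    (12, "u2", 8), (14, "u1", 2), (15, "u2", 1), (17, "u1", 9),
]

# One private state dict per worker; nothing is shared between them.
worker_state = [defaultdict(int) for _ in range(NUM_WORKERS)]
for _t, user, value in events:
    worker_state[worker_for(user, NUM_WORKERS)][user] += value

# Record which workers hold state for each key: it should be exactly one.
owners = defaultdict(set)
for w, state in enumerate(worker_state):
    for user in state:
        owners[user].add(w)

# Reading every key's sum requires no merging, because the slices are disjoint.
totals = {user: worker_state[next(iter(ws))][user] for user, ws in owners.items()}
```

Because routing and aggregation use the same key, each running sum is complete on its owning worker and the per-worker dictionaries never overlap.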

The second is that state must be checkpointed. A streaming job runs forever, so over a long enough horizon a worker will crash, and when it does, the in-memory running sums it held vanish. Recomputing them by replaying the entire stream from the beginning is impractical, because an unbounded stream has no beginning you can afford to re-read. The remedy is to periodically snapshot each operator's state to durable storage together with the stream position (offset) it reflects, so that after a failure a replacement worker reloads the last snapshot and replays only the events after that offset. This is exactly the checkpoint-and-recover model of Chapter 2, now applied continuously to a running aggregate rather than once to a batch job. The combination of an immutable, replayable stream and a periodically checkpointed state is the foundation of the exactly-once state guarantees that streaming engines offer (delivering results exactly once to an external system additionally requires an idempotent or transactional sink), a thread we pick up in Section 9.5.
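The snapshot-and-replay cycle can be simulated in a few lines. This is a toy, single-process sketch under stated assumptions: the "durable storage" is just a Python tuple, the crash is simulated by discarding the in-memory state, and the names run and apply_event are hypothetical.

```python
import copy
from collections import defaultdict

events = [
    (1, "u1", 4), (2, "u2", 9), (3, "u1", 1), (5, "u1", 7),
    (6, "u2", 2), (8, "u1", 5), (9, "u2", 6), (11, "u1", 3),
    (12, "u2", 8), (14, "u1", 2), (15, "u2", 1), (17, "u1", 9),
]

def apply_event(state, event):
    """The stateful operator: a per-user running sum."""
    _t, user, value = event
    state[user] += value

def run(events, checkpoint_every, crash_at=None):
    """Process events, snapshotting (state, offset) every few events.
    If crash_at is set, drop in-memory state at that offset and recover once."""
    state = defaultdict(int)
    checkpoint = (copy.deepcopy(state), 0)     # stand-in for durable storage
    offset, processed, crashed = 0, 0, False
    while offset < len(events):
        if crash_at is not None and offset == crash_at and not crashed:
            crashed = True
            # Recovery: reload the snapshot and rewind to the offset it reflects.
            state, offset = copy.deepcopy(checkpoint[0]), checkpoint[1]
            continue
        apply_event(state, events[offset])
        offset += 1
        processed += 1
        if offset % checkpoint_every == 0:
            checkpoint = (copy.deepcopy(state), offset)
    return dict(state), processed

clean_state, clean_work = run(events, checkpoint_every=3)
recovered_state, recovered_work = run(events, checkpoint_every=3, crash_at=7)
```

The recovered run reaches the same final sums as the clean run while replaying only the one event between the last snapshot (offset 6) and the crash (offset 7), so it performs 13 event applications instead of 12 rather than starting over.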

Key Insight: Partition by the Key You Aggregate By

The single most important design choice for a stateful streaming job is to partition the stream by the same key the windowed aggregate groups on. When they match, each key's events and each key's state co-locate on one worker, so the aggregate is computed with zero cross-worker communication and the state shards cleanly for fault-tolerant checkpointing. When they mismatch, every window forces a shuffle and the job inherits all the skew and network cost that shuffle implies. Align the two keys and stateful streaming scales out almost for free; misalign them and you pay the shuffle tax on every event.

6. From Windowed Aggregate to Real-Time Feature Intermediate

Everything so far has been plumbing; here is the payoff for AI. A real-time feature is, almost always, a windowed aggregate over a stream of events keyed by some entity. "Number of transactions this card made in the last sixty seconds" is a sliding-window count keyed by card. "Average dwell time in the current browsing session" is a session-window mean keyed by user. "Clicks per article in the last five minutes" is a sliding-window count keyed by article. Each is a tiny number, refreshed continuously, that captures how an entity is behaving right now, and each is precisely the windowed-state computation of Sections 4 and 5.

This matters because the freshness of these features is often what separates a model that works from one that does not. A fraud model scoring a card transaction needs the count of recent transactions as of this transaction, not as of last night's batch job; the fraud pattern it is built to catch unfolds in seconds. A streaming engine maintains that count as partitioned, checkpointed window state and serves it to the model at scoring time, turning the abstract machinery of events and windows into a feature vector. The same windowed state, evaluated continuously, is also what drift monitors watch, which is why Section 9.9 can detect concept drift from the very aggregates this section builds. We have now assembled the streaming feature pipeline in miniature: events flow in, partition by key, accumulate into windowed state, and emit a fresh feature on demand.
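As a minimal sketch of such a feature, the class below maintains "events for this key in the trailing sixty seconds" incrementally. It assumes that events for a key arrive in non-decreasing event-time order and that all state fits in memory; the class name RecentCountFeature and the sample transactions are hypothetical.

```python
from collections import defaultdict, deque

class RecentCountFeature:
    """Count of events per key within the trailing `width` seconds of event time.
    Assumes each key's events arrive in non-decreasing event-time order."""

    def __init__(self, width):
        self.width = width
        self.times = defaultdict(deque)        # per-key state: recent timestamps

    def update(self, key, t):
        """Record an event at time t and return the feature value as of t."""
        q = self.times[key]
        q.append(t)
        while q and q[0] <= t - self.width:    # keep only events in (t - width, t]
            q.popleft()
        return len(q)

feature = RecentCountFeature(width=60)
txns = [(0, "card_A"), (5, "card_A"), (10, "card_B"),
        (20, "card_A"), (61, "card_A"), (200, "card_A")]
values = [feature.update(card, t) for t, card in txns]
# values == [1, 2, 1, 3, 3, 1]
```

The value returned by `update` is the count as of the transaction being scored, which is the freshness property the fraud example depends on. Each key's deque is independent, so the state partitions by card in the way Section 5 requires.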

Practical Example: The Fraud Feature That Was Always One Batch Behind

Who: A data engineer on the risk team at a digital payments company.

Situation: The fraud model used a feature, transactions-per-card-in-the-last-minute, computed by an hourly batch job that scanned the transaction warehouse.

Problem: Card-testing attacks fire dozens of small transactions in under a minute, so by the time the hourly job refreshed the feature, the attack was over and the money was gone.

Dilemma: Run the batch job far more often, which hammered the warehouse and still lagged by minutes, or rebuild the feature as a true streaming aggregate, which meant standing up a windowing job and managing its state.

Decision: They moved the feature to a sliding-window count, sixty-second width stepping every second, keyed by card id, partitioning the transaction stream by that same key so each card's state lived on one worker.

How: A Spark Structured Streaming job (the shape of Code 9.2.2) consumed the transaction log, maintained the per-card window count as checkpointed state, and published it to a low-latency feature store the model read at scoring time.

Result: The feature went from up to an hour stale to under two seconds, and the model began catching card-testing bursts mid-attack instead of in the next morning's report.

Lesson: A real-time feature is a windowed aggregate over a correctly partitioned stream. Align the window to the timescale of the behavior you want to catch, partition by the entity you key on, and the feature becomes as fresh as the events themselves.

Research Frontier: Unifying Streaming Features and Training (2024 to 2026)

The split between how a feature is computed for training (a batch scan over history) and how it is computed for serving (a streaming window over live events) is a notorious source of training-serving skew, and a vigorous engineering line is collapsing it. Feature platforms such as the open-source Feast and the commercial Tecton, and the "feature/training-inference" architecture pattern popularized around 2024, push the same windowed-aggregate definition through both a batch backfill and a streaming engine so the two cannot drift apart. In parallel, streaming-native systems are pushing windowed state into vector and embedding features for retrieval, so that the recent-behavior aggregates of this section feed real-time personalization and retrieval-augmented systems directly. The research questions are sharpening: how to guarantee that a window defined once yields bit-identical features online and offline, and how to checkpoint and reshard windowed state elastically as load shifts, the elastic-state problem that connects this chapter to Chapter 2's fault-tolerance machinery. Expect the boundary between a feature store and a streaming engine to keep eroding.

We now have the three primitives, event, stream, window, and the partitioned, checkpointed state that turns a window into a live feature. Every one of them leaned on a question we have so far waved away: which clock defines the window boundaries, the time an event happened or the time we processed it? That distinction, and the trouble it causes when events arrive out of order, is the subject of Section 9.3.

Exercise 9.2.1: Three Windows, One Stream Conceptual

For each task, state which window type (tumbling, sliding, or session) fits best and justify the choice in one sentence: (a) bill each tenant for API calls in clean, non-overlapping one-hour blocks; (b) raise an alert whenever error rate over the trailing five minutes, refreshed every ten seconds, exceeds a threshold; (c) compute the total time a user spent in a single uninterrupted shopping visit. Then explain why using a tumbling window for task (b) would miss a burst of errors that straddles two adjacent windows.

Exercise 9.2.2: Session Windows From Scratch Coding

Extend Code 9.2.1 with a sessionize(events, gap) function that groups each user's events into session windows: sort one user's events by time, start a new session whenever the time since the previous event exceeds gap, and emit each session's start, end, event count, and value sum. Run it per user on the synthetic stream with a gap of 3 seconds and print the sessions. Confirm that a single very active user produces one long session while sparse activity produces several short ones, and explain why a session's state cannot be released until the gap has elapsed with no new event.

Exercise 9.2.3: The Cost of Misaligned Keys Analysis

Suppose a stream of 100,000 events per second is partitioned by region (5 distinct values) but a windowed aggregate groups by user_id (millions of distinct values). Argue from the partitioning rules of Sections 2 and 5 why every window now forces a shuffle, estimate qualitatively how the cross-worker traffic compares to the partition-by-user_id case, and describe the load-skew problem the 5-way region partition creates. Then state the single change that removes both the shuffle and the skew, and connect your answer to the hash-partitioning discussion in Chapter 2.