Part II: Distributed Data Processing for AI
Chapter 9: Stream Processing and Online AI

Spark Structured Streaming and Flink

"They asked whether I process one event at a time or a hundred at once. I said yes, and that the difference is the only thing anyone ever fights about."

A Stream Operator Between Two Philosophies
Big Picture

An unbounded stream can be processed in two fundamentally different ways, and the entire distributed-streaming field is organized around that one choice: chop the stream into tiny batches and reuse a batch engine, or process each event the instant it arrives. Spark Structured Streaming takes the first path, running the stream as a sequence of micro-batches on the same DataFrame and Catalyst engine that powers batch jobs in Chapter 7; it inherits a unified batch-and-stream programming model and exactly-once guarantees from checkpointing, at the cost of latency measured in the hundreds of milliseconds. Apache Flink takes the second path, processing events one at a time with first-class event time, the watermarks of Section 9.4, and managed keyed state; it pays in operational complexity and earns latency measured in milliseconds. This section pins down the trade-off, shows the stateful exactly-once machinery both engines share, and gives you a rule for choosing one when an AI feature pipeline is on the line.

By this point in the chapter you have a stream of events with timestamps, an ordering problem solved by the watermarks of Section 9.4, and a durable transport layer in the distributed log of Section 9.5. What you do not yet have is the compute engine that consumes that log, maintains running state across millions of keys, and survives the failure of any worker without double-counting or dropping events. Two open-source systems dominate that role in production AI pipelines, and they embody opposite answers to a single design question. Understanding why they differ, and where each one wins, is the goal of this section.

1. One Stream, Two Execution Models Beginner

A bounded dataset has a beginning and an end, so a batch engine can read all of it, compute, and stop. A stream never ends, which leaves a designer with a choice about how to make progress against an infinite input. The micro-batch model says: wait a short interval, treat everything that arrived in that interval as a small bounded dataset, run an ordinary batch computation over it, commit the result, then repeat. The continuous model says: never wait, never bound; push each event through a standing graph of operators the moment it lands, and let each operator update its state in place. Both produce correct answers over the same logical query; they differ in when work happens and therefore in how long any single event waits.

This is not a minor implementation detail. It determines the latency floor, the failure-recovery strategy, the way state is stored, and even how the programming model feels. Spark Structured Streaming built its execution on micro-batches precisely so that a streaming query is, internally, the same Catalyst-optimized DataFrame plan that runs a batch job in Chapter 7, just executed repeatedly over incremental slices of input. Flink built its execution on a long-lived dataflow graph of operators through which records flow continuously, so that an event can traverse the whole pipeline in milliseconds without ever waiting for a batch boundary. Figure 9.6.1 contrasts the two.

[Figure 9.6.1 illustration. Left panel, "Micro-batch (Spark Structured Streaming)": events on a timeline are grouped into buckets t0, t1, t2, and a Catalyst batch job runs per bucket; latency = wait for boundary + batch time. Right panel, "Continuous (Apache Flink)": each event flows at once through a map operator into keyed state + window; latency = operator processing time only.]
Figure 9.6.1: The two execution models. On the left, Spark Structured Streaming groups events into fixed-interval buckets and runs a full Catalyst batch job per bucket, so an event waits up to one interval before it is processed. On the right, Flink keeps a long-lived operator graph standing and pushes each event through it on arrival, so the latency an event sees is just the operator processing time. Section 2 quantifies the gap this opens.

2. The Latency-Throughput Trade-Off, Measured Intermediate

The intuition from Figure 9.6.1 is that micro-batch trades latency for simplicity and throughput, while continuous processing trades operational complexity for latency. We can make the latency half of that claim concrete without installing either engine, by simulating the two execution models against one synthetic event stream and measuring the end-to-end latency each event experiences. The simulation below replays roughly two thousand events per second through both a micro-batch scheduler, which holds events until a 100-millisecond boundary then processes the whole bucket with a fixed per-batch overhead, and a continuous processor, which handles each event on arrival with a small per-event cost. Latency is the gap between when an event arrived and when its result was produced.

import random
random.seed(7)

# Synthetic stream: ~2000 events/s over a 10-second window (Poisson arrivals).
N = 20_000
arrivals, t = [], 0.0
for _ in range(N):
    t += random.expovariate(1.0 / 0.5)        # mean inter-arrival 0.5 ms
    arrivals.append(t)
T_end = arrivals[-1]

BATCH_INTERVAL = 100.0   # micro-batch trigger interval (ms)
BATCH_OVERHEAD = 8.0     # fixed scheduling cost paid once per batch
PER_EVENT_BATCH = 0.02   # marginal cost per event inside a vectorized batch
PER_EVENT_CONT  = 0.20   # per-event cost in continuous mode (no amortization)

def micro_batch():
    lat, clock, i, boundary = [], 0.0, 0, BATCH_INTERVAL
    while i < N:
        bucket = []
        while i < N and arrivals[i] < boundary:   # collect this interval's events
            bucket.append(arrivals[i]); i += 1
        if bucket:
            start = max(boundary, clock)             # fires at the boundary, or when the previous batch ends
            clock = start + BATCH_OVERHEAD + PER_EVENT_BATCH * len(bucket)
            lat += [clock - a for a in bucket]       # every event waited for it
        boundary += BATCH_INTERVAL
    return lat

def continuous():
    lat, clock = [], 0.0
    for a in arrivals:
        start = max(a, clock)                        # serial operator may queue
        clock = start + PER_EVENT_CONT
        lat.append(clock - a)
    return lat

def report(name, lat):
    s = sorted(lat); n = len(s)
    # throughput = offered arrival rate (events / stream duration); both models keep up with it
    print(f"{name:14s} mean={sum(s)/n:7.2f} ms  p50={s[n//2]:7.2f} ms  "
          f"p99={s[int(n*0.99)]:7.2f} ms  throughput={N/(T_end/1000):8.0f} ev/s")

mb, ct = micro_batch(), continuous()
print(f"stream: {N} events over {T_end/1000:.1f} s, batch interval = {BATCH_INTERVAL:.0f} ms")
print("-" * 78); report("micro-batch", mb); report("continuous", ct); print("-" * 78)
print(f"latency ratio (micro-batch mean / continuous mean): "
      f"{(sum(mb)/len(mb)) / (sum(ct)/len(ct)):.1f}x")
Code 9.6.1: A pure-Python simulation of micro-batch versus continuous stream processing. The same arrival sequence is replayed through both models; only the timing rule differs, so the latency gap reported is the structural cost of waiting for a batch boundary, not an artifact of different inputs.
stream: 20000 events over 10.0 s, batch interval = 100 ms
------------------------------------------------------------------------------
micro-batch    mean=  62.06 ms  p50=  62.33 ms  p99= 111.14 ms  throughput=    2009 ev/s
continuous     mean=   0.27 ms  p50=   0.20 ms  p99=   0.70 ms  throughput=    2009 ev/s
------------------------------------------------------------------------------
latency ratio (micro-batch mean / continuous mean): 232.3x
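Output 9.6.1: Both models report the same throughput, which is the offered arrival rate; neither falls behind, since each 100 ms interval needs only about 12 ms of batch work and the continuous operator is busy about 40 percent of the time. The micro-batch mean latency of about 62 ms is roughly 230 times the continuous mean of about 0.27 ms, and it decomposes exactly as the every-event-waits-for-the-boundary reasoning predicts: an average wait of half the batch interval (50 ms) plus about 12 ms to run the batch (8 ms of overhead plus 0.02 ms for each of roughly 200 events).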

Only the latencies move. Throughput is identical because both models keep up with the offered load, and the simulated batch path is actually the more compute-efficient of the two: it spends about 0.06 ms of work per event (0.02 ms of marginal cost plus 8 ms of overhead spread over roughly 200 events), against 0.2 ms per event in continuous mode. That asymmetry is the whole bargain. Micro-batch buys you vectorized, amortized, batch-engine efficiency and a simpler recovery story, and it charges you a latency on the order of half the batch interval. Continuous processing gives you per-event latency near the operator cost itself, and it charges you a more intricate runtime that must keep state consistent without batch boundaries to lean on. Neither is universally better; the right choice depends on whether your AI pipeline's value comes from freshness or from throughput and simplicity.

Key Insight: The Batch Boundary Is the Latency Floor

In a micro-batch engine, an event cannot be acted on until its bucket closes, so the best-case average latency is about half the trigger interval and the worst case is a full interval plus the batch's own processing time. Shrinking the interval lowers latency but raises the fraction of time spent on per-batch overhead (scheduling, planning, committing), so there is a floor below which micro-batch stops being efficient. A continuous engine has no such boundary; its latency floor is the time to push one event through the operator graph. When a requirement reads "react within a few milliseconds", that single sentence has already chosen continuous processing for you.
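The latency floor can be written down directly. The sketch below is a back-of-envelope model, not a measurement of Spark: it reuses the cost parameters assumed in Code 9.6.1 (8 ms of per-batch overhead, 0.02 ms per event, about 2 events per millisecond) and assumes arrivals are spread evenly across each interval, so an event waits half the interval on average and then waits for its batch to run. The engine keeps up only while the work per interval fits inside the interval, which yields a minimum usable interval of overhead / (1 - per-event cost x rate).

```python
# Back-of-envelope micro-batch latency model using the costs assumed in Code 9.6.1.
BATCH_OVERHEAD = 8.0     # ms of fixed cost per batch (scheduling, planning, committing)
PER_EVENT_BATCH = 0.02   # ms of marginal cost per event inside a batch
RATE = 2.0               # arrivals per ms (about 2000 events/s)

def batch_work(interval):
    """Milliseconds of work to process one interval's worth of events."""
    return BATCH_OVERHEAD + PER_EVENT_BATCH * RATE * interval

def utilization(interval):
    """Fraction of wall-clock time spent processing; must stay below 1 to keep up."""
    return batch_work(interval) / interval

def mean_latency(interval):
    """Average wait for the boundary (half the interval) plus the batch's own run time."""
    return interval / 2 + batch_work(interval)

# Smallest interval at which the work per interval still fits inside the interval.
min_interval = BATCH_OVERHEAD / (1 - PER_EVENT_BATCH * RATE)

for T in (100.0, 25.0, 10.0):
    print(f"interval={T:6.1f} ms  mean latency ~{mean_latency(T):6.2f} ms  "
          f"busy={utilization(T):6.1%}")
print(f"below ~{min_interval:.2f} ms per batch, the engine falls behind the stream")
```

At a 100 ms interval the model reproduces the roughly 62 ms mean of Output 9.6.1. At very small intervals, random bursts in the arrival stream push real latency above this estimate, because a batch that overruns its interval delays the next one.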

3. Spark Structured Streaming: The Batch Engine, Repeated Intermediate

Spark Structured Streaming presents a stream as an unbounded table to which rows are continually appended, and a streaming query as the same DataFrame transformation you would write over a static table. Under the hood the engine wakes on each trigger, identifies the new input since the last trigger, plans an incremental Catalyst job over just that slice, executes it across the cluster exactly as a batch job from Chapter 7 would run, and commits the output. The programmer-facing payoff is that one body of DataFrame code can serve both the historical backfill (run once over all data) and the live pipeline (run repeatedly over increments), which removes an entire class of train-serve skew that haunts feature pipelines.
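The reason one body of code can serve both runs is that an incremental execution computes the same answer as a full one. The minimal pure-Python sketch below illustrates the principle with a hypothetical click log, and with a Counter standing in for Spark's aggregation state: the feature function is written once, applied to all history for the backfill, and applied again to each small slice with the partial results merged into running state. It works here because a count is decomposable; it is an illustration of the idea, not of how Catalyst plans incremental queries.

```python
from collections import Counter

def clicks_per_user(rows):
    """The feature logic, written once: click counts per user over some rows."""
    return Counter(user for user, _event_time in rows)

# Hypothetical click log of (user, event_time_s) rows.
log = [("ana", 1), ("bo", 2), ("ana", 3), ("cy", 4), ("ana", 5), ("bo", 6), ("cy", 7)]

# Backfill: run the function once over all historical rows.
backfill = clicks_per_user(log)

# Streaming: run the same function over each new slice and merge into running state.
running = Counter()
for start in range(0, len(log), 3):          # 3-row slices stand in for micro-batches
    running.update(clicks_per_user(log[start:start + 3]))

print(backfill == running)   # True
print(dict(running))         # {'ana': 3, 'bo': 2, 'cy': 2}
```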

# Illustrative: a Structured Streaming windowed aggregation. Not run here.
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, count

spark = SparkSession.builder.appName("clicks").getOrCreate()

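events = (spark.readStream
          .format("kafka")                                  # read from the log of Section 9.5
          .option("kafka.bootstrap.servers", "broker:9092") # hypothetical broker address
          .option("subscribe", "clicks")
          .load()
          .selectExpr("CAST(value AS STRING) AS user", "timestamp"))   # Kafka record timestamp as event time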

# Event-time windowing with a watermark (Section 9.4) bounds late-event state.
counts = (events
          .withWatermark("timestamp", "2 minutes")          # finalize and drop windows ending before max event time - 2 min
          .groupBy(window(col("timestamp"), "1 minute"), col("user"))
          .agg(count("*").alias("clicks")))

query = (counts.writeStream
         .outputMode("update")
         .option("checkpointLocation", "/chk/clicks")       # WAL + state for exactly-once
         .format("console")
         .trigger(processingTime="10 seconds")              # the micro-batch interval
         .start())
query.awaitTermination()                                    # keep the driver alive while the query runs
Code 9.6.2: A Spark Structured Streaming windowed click-count, shown to illustrate the API, not executed in this section. The withWatermark call ties directly to Section 9.4, and the checkpointLocation is what makes the query restartable with exactly-once semantics, explained in Section 5.

The trigger(processingTime="10 seconds") line is the micro-batch interval made explicit, and it is the dial that sets the latency floor from Section 2. Spark has also offered an experimental continuous-processing trigger since version 2.3, but it supports only map-like queries (projections and selections, not aggregations), and newer releases keep adding lower-latency options. The engine's center of gravity nonetheless remains micro-batch, and the unified batch-and-stream DataFrame model is the reason teams already invested in Chapter 7 reach for it first.

4. Apache Flink: Event Time as a First-Class Citizen Advanced

Flink was designed from the start as a continuous, event-at-a-time engine, and its data model reflects that. A job is a directed graph of operators that runs indefinitely; records flow edge to edge; and each operator may carry keyed state, a per-key value (or list, or map) that the runtime partitions across the cluster, persists, and restores on failure. Crucially, Flink treats event time as primary: every record carries an event timestamp, watermarks from Section 9.4 flow through the graph as special markers that advance each operator's notion of "now", and windows fire on event-time progress rather than wall-clock arrival. This is what lets a Flink job produce the same result whether it processes a live stream or replays a day-old log at full speed, a property Spark achieves through its watermark API but that sits at Flink's very core.
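Why event-time windows give the same answer on a live stream and on a replay can be seen in a small model. The sketch below is a simplified pure-Python stand-in, not Flink's implementation: it assumes a bounded-out-of-orderness watermark equal to the largest event time seen minus 2 minutes, fires a one-minute tumbling window once the watermark passes the window's end, and drops events whose window has already fired. Flink's actual watermark generators and window triggers differ in details such as allowed lateness. Two arrival orders of the same hypothetical clicks produce identical window counts; only an event older than the watermark changes the outcome.

```python
from collections import defaultdict

WINDOW = 60          # tumbling window size in seconds (one minute)
OUT_OF_ORDER = 120   # assumed watermark lag behind the max event time (two minutes)

def windowed_counts(events):
    """Count clicks per (user, window_start) by event time.

    events: (user, event_time_s) pairs in *arrival* order.
    Returns (fired, late): fired maps (user, window_start) -> count, and
    late counts events whose window had already fired when they arrived.
    """
    open_windows = defaultdict(int)   # keyed state: (user, window_start) -> running count
    fired, late = {}, 0
    max_ts = float("-inf")

    def advance(watermark):
        # Fire every open window whose end is at or before the watermark.
        for key in [k for k in open_windows if k[1] + WINDOW <= watermark]:
            fired[key] = open_windows.pop(key)

    for user, ts in events:
        start = ts - ts % WINDOW
        if start + WINDOW <= max_ts - OUT_OF_ORDER:   # window already closed: drop
            late += 1
            continue
        open_windows[(user, start)] += 1
        max_ts = max(max_ts, ts)
        advance(max_ts - OUT_OF_ORDER)                # watermark follows event time
    advance(float("inf"))                             # end of input flushes all windows
    return fired, late

# Hypothetical clicks for two users, sorted by event time.
clicks = sorted([("ana", t) for t in range(0, 300, 10)] +
                [("bo", t) for t in range(5, 300, 15)], key=lambda e: e[1])
# Same clicks with bounded disorder: swap each adjacent pair of arrivals.
shuffled = [clicks[i ^ 1] for i in range(len(clicks))]

fired_a, late_a = windowed_counts(clicks)
fired_b, late_b = windowed_counts(shuffled)
print(fired_a == fired_b, late_a, late_b)              # True 0 0
_, late_c = windowed_counts(clicks + [("ana", 10)])    # arrives after its window fired
print(late_c)                                          # 1
```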

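# Illustrative: the same windowed count in PyFlink. Not run here.
from pyflink.common.time import Duration, Time
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows

class EventTimeAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        return value.event_time                       # event time in epoch milliseconds

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(10_000)                      # checkpoint every 10 s (Section 5)

watermarks = (WatermarkStrategy
              .for_bounded_out_of_orderness(Duration.of_minutes(2))   # the watermark policy
              .with_timestamp_assigner(EventTimeAssigner()))

# kafka_source: a KafkaSource configured elsewhere (assumed, not shown) whose
# records expose user and event_time fields.
stream = (env.from_source(kafka_source, watermarks, "clicks-source")
          .key_by(lambda ev: ev.user)                 # partition keyed state by user
          .window(TumblingEventTimeWindows.of(Time.minutes(1)))
          .reduce(lambda a, b: a.add_click(b)))       # add_click: hypothetical merge on the event type

stream.print()
env.execute("clicks")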
Code 9.6.3: The same one-minute click-count expressed in PyFlink, shown illustratively. The key_by establishes the keyed-state partitioning that Flink manages and checkpoints, and enable_checkpointing arms the exactly-once mechanism of Section 5. Each event traverses this graph on arrival rather than waiting for a batch boundary.

The price of this design is operational. A standing operator graph with large keyed state is a stateful distributed service that must be sized, monitored, rescaled, and recovered, which is more demanding to run than periodically launching batch jobs. Flink answers with state backends (an embedded RocksDB store for state too large for memory) and incremental checkpoints, but the team operating a Flink cluster carries a heavier load than one running Structured Streaming on an existing Spark deployment. That operational weight is the real cost behind the abstract phrase "continuous processing is more complex".
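Incremental checkpointing is what keeps large keyed state affordable to snapshot. The toy store below illustrates the idea under simplifying assumptions: it tracks which keys changed since the last checkpoint, persists only those, and restores by replaying the chain of deltas oldest to newest. Flink's RocksDB backend applies the same principle at the granularity of immutable on-disk SST files rather than individual keys, and a real store must also record deletions, which this sketch omits.

```python
class IncrementalStateStore:
    """Toy keyed state that checkpoints only the keys written since the last checkpoint.

    Assumption for brevity: keys are never deleted (a real store must record removals).
    """

    def __init__(self):
        self.state = {}          # live keyed state
        self.dirty = set()       # keys changed since the last checkpoint
        self.deltas = []         # durable checkpoint chain: first all keys, then deltas

    def put(self, key, value):
        self.state[key] = value
        self.dirty.add(key)

    def checkpoint(self):
        """Persist only the dirty keys; return how many entries were written."""
        delta = {key: self.state[key] for key in self.dirty}
        self.deltas.append(delta)
        self.dirty.clear()
        return len(delta)

    def restore(self):
        """Rebuild state by replaying the checkpoint chain oldest to newest."""
        rebuilt = {}
        for delta in self.deltas:
            rebuilt.update(delta)
        return rebuilt

store = IncrementalStateStore()
for user in range(1000):
    store.put(user, 0)
first = store.checkpoint()     # 1000 entries: everything is new
for user in range(50):
    store.put(user, 1)
second = store.checkpoint()    # 50 entries: only the keys that changed
print(first, second, store.restore() == store.state)   # 1000 50 True
```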

Fun Note: The Squirrel and the Spark

Flink's logo is a squirrel, chosen in Berlin where the project was born, because a squirrel is fast, agile, and squirrels things away (its state) for later. Spark's name, by contrast, promised speed over the disk-bound Hadoop MapReduce of Chapter 6. Two mascots, two philosophies: the squirrel never stops moving, and the spark fires in bright, regular bursts.

5. Stateful Operators and Exactly-Once Across Both Advanced

Whatever the execution model, a useful streaming AI pipeline is stateful: it counts events per user over a window, maintains a running feature value, joins a stream against a slowly changing table, or accumulates the context an online model needs. State plus failure is where streaming gets genuinely hard, because a worker can crash mid-computation and the engine must resume without losing updates (which would undercount) or applying them twice (which would double-count). The guarantee both engines target is exactly-once: every input event affects the output state once and only once, even across arbitrary failures. The word is a slight abuse, since events may be physically reprocessed; what is exactly-once is the effect on committed state.

Both engines reach exactly-once through the same two ingredients, adapted to their model. The first is a replayable source: the distributed log of Section 9.5 lets either engine rewind to a known offset and re-read events after a crash. The second is a consistent snapshot of operator state paired with the input offset that produced it. Spark records, per micro-batch, the input offsets in a write-ahead log before processing and the resulting state in a checkpoint directory, so on restart it knows exactly which batch to redo and from where. Flink runs asynchronous barrier snapshotting, a variant of the Chandy-Lamport algorithm: the sources inject checkpoint barriers that flow through the operator graph alongside data, and once an operator has received a barrier on every one of its inputs, it snapshots its state and forwards the barrier downstream, yielding a globally consistent cut without stopping the stream. On recovery, Flink restores the last complete snapshot and rewinds the source to the matching offset. Different mechanisms, identical contract: state and offset advance together or not at all.
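The shared contract can be checked with a toy model. In the sketch below, a Python list stands in for a replayable log partition, and a worker crashes once partway through. When the checkpoint stores the source offset and the operator state together, recovery rewinds and restores both, and the final counts match the true counts wherever the crash lands. When updates reach an external store immediately but only the offset is checkpointed, the replay re-applies events and double-counts. This illustrates the contract only; neither engine is implemented this way.

```python
from collections import Counter

LOG = ["a", "b", "a", "c", "a", "b", "c", "a"]   # replayable source (stand-in for a log partition)
CHECKPOINT_EVERY = 3                              # checkpoint after every 3rd offset

def count_with_crash(crash_after, atomic):
    """Count keys in LOG, crashing once after `crash_after` events have been processed.

    atomic=True : state and source offset are checkpointed together, so recovery
                  restores both (the exactly-once contract).
    atomic=False: every update is written straight to an external store, but only
                  the offset is checkpointed, so replay re-applies events (at-least-once).
    """
    saved_offset, saved_state = 0, Counter()   # the last durable checkpoint
    external = Counter()                       # non-atomic mode's immediately-written output
    offset, state = 0, Counter()
    processed, crashed = 0, False
    while offset < len(LOG):
        key = LOG[offset]
        if atomic:
            state[key] += 1
        else:
            external[key] += 1
        offset += 1
        processed += 1
        if offset % CHECKPOINT_EVERY == 0:
            saved_offset, saved_state = offset, Counter(state)   # offset and state commit together
        if not crashed and processed == crash_after:
            crashed = True
            offset, state = saved_offset, Counter(saved_state)   # restart: rewind source, restore state
    return dict(state) if atomic else dict(external)

truth = dict(Counter(LOG))
print(truth)                                   # {'a': 4, 'b': 2, 'c': 2}
print(count_with_crash(5, atomic=True))        # {'a': 4, 'b': 2, 'c': 2}
print(count_with_crash(5, atomic=False))       # {'a': 5, 'b': 2, 'c': 3}
```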

Practical Example: Choosing an Engine for a Real-Time Fraud Feature

Who: A streaming-platform engineer at a payments company supporting the fraud-scoring model.

Situation: Each card swipe must be scored in under 50 ms, and the score depends on features like "transactions by this card in the last 60 seconds" computed from a live event stream.

Problem: The existing nightly Spark batch job already produced these features for training, but its 100-millisecond micro-batch streaming counterpart added tens of milliseconds of latency that, stacked on model inference and network time, blew the 50 ms budget.

Dilemma: Keep one Spark codebase for both training and serving, accepting the micro-batch latency floor, or stand up a separate Flink pipeline for the online path, gaining millisecond latency but maintaining two systems and risking train-serve feature skew.

Decision: They moved the online feature computation to Flink for its continuous, event-time keyed-state model, and kept Spark for the historical backfill, then wrote a shared specification of the feature logic to keep the two implementations aligned.

How: Flink consumed the same Kafka topics as Spark, maintained per-card keyed state with a one-minute event-time window, checkpointed every 10 seconds for exactly-once, and emitted features to the online store that the scorer read.

Result: The online feature latency dropped from roughly 60 ms to under 5 ms, comfortably inside the budget, while the nightly Spark job continued to serve training unchanged.

Lesson: Latency requirements, not engine fashion, pick the model. When a hard millisecond budget binds, continuous processing earns its operational cost; when it does not, the unified batch-and-stream engine is the simpler bet. The feature pipeline this story turns on is the subject of Section 9.7.
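The per-card feature from the example, transactions by this card in the last 60 seconds, is a natural fit for keyed state. The sketch below is a hypothetical, single-process stand-in for what each Flink task might hold for its share of cards: a queue of recent event times per card, with older entries evicted as new swipes arrive. It uses a sliding 60-second window rather than the tumbling one-minute window of the How step, and it assumes each card's events arrive in event-time order; a production job would rely on the engine's watermarks and managed state instead.

```python
from collections import defaultdict, deque

WINDOW_MS = 60_000   # feature horizon: transactions in the last 60 seconds

class CardTxnCounter:
    """Hypothetical per-card keyed state: event times (ms) of each card's recent swipes."""

    def __init__(self, window_ms=WINDOW_MS):
        self.window_ms = window_ms
        self.recent = defaultdict(deque)   # card id -> event times inside the window

    def update(self, card, event_time_ms):
        """Record one swipe and return the card's count over (t - window, t]."""
        times = self.recent[card]
        times.append(event_time_ms)        # assumes per-card event-time order
        while times[0] <= event_time_ms - self.window_ms:
            times.popleft()                # evict swipes that fell out of the window
        return len(times)

feature = CardTxnCounter()
print(feature.update("card-1", 0))        # 1
print(feature.update("card-1", 30_000))   # 2
print(feature.update("card-2", 45_000))   # 1  (each card has independent state)
print(feature.update("card-1", 61_000))   # 2  (the swipe at t=0 has expired)
```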

Library Shortcut: Exactly-Once Is a Configuration Flag, Not a Project

Code 9.6.1 simulated only the timing of the two models, not their fault tolerance, because rebuilding consistent snapshots and write-ahead logs from scratch is a multi-month systems effort. Both production engines collapse that effort to a single option. In Spark you set checkpointLocation on the query (Code 9.6.2) and the engine maintains the offset write-ahead log and state checkpoints for you. In Flink you call enable_checkpointing(interval) (Code 9.6.3) and the barrier-snapshot algorithm runs underneath. The hundreds of lines of careful offset bookkeeping, snapshot coordination, and recovery logic that exactly-once requires are exactly what these frameworks own internally; your job is to point them at a durable checkpoint store, pick an interval, and, if the guarantee must extend to external systems, write to a sink that is idempotent or transactional.

6. When to Choose Each for AI Pipelines Intermediate

The decision rarely turns on a feature checklist, because the two engines have converged on capability: both do event-time windowing, both do exactly-once, both maintain large distributed state. It turns on three practical questions. First, what is the latency budget? A sub-10-millisecond reaction (real-time fraud, ad bidding, anomaly tripwires) points to Flink; a one-to-few-second freshness target (dashboards, near-real-time features, incremental ETL) is comfortably inside micro-batch territory. Second, what does the team already run? An organization with a large Spark investment from Chapter 7 gets unified batch-and-stream code and one operational skill set by staying on Structured Streaming, which can outweigh a latency gap that does not bind. Third, how heavy is the state and how event-time-sensitive is the logic? Pipelines with very large keyed state, complex event-time windows, or fine-grained per-event control tend to fit Flink's model more naturally.
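The three questions can be encoded as a rough heuristic. The sketch below is one possible ordering of them, not a rule from either project: MICRO_BATCH_FLOOR_MS is an assumed threshold for the latency a tuned micro-batch query can reliably deliver (the overview at the start of this section puts micro-batch latency in the hundreds of milliseconds), and the two boolean inputs are judgments the team must make. A benchmark on your own workload should override it.

```python
# Assumed threshold: latency a tuned micro-batch query can reliably deliver (ms).
MICRO_BATCH_FLOOR_MS = 100

def suggest_engine(latency_budget_ms, team_runs_spark, heavy_event_time_state):
    """Rough heuristic encoding the three questions; not a substitute for a benchmark."""
    if latency_budget_ms < MICRO_BATCH_FLOOR_MS:   # Q1: a batch boundary would blow the budget
        return "Flink"
    if team_runs_spark:                             # Q2: reuse the existing Spark investment
        return "Spark Structured Streaming"
    if heavy_event_time_state:                      # Q3: large keyed state, intricate event time
        return "Flink"
    return "Spark Structured Streaming"             # simpler to operate when nothing binds

print(suggest_engine(50, team_runs_spark=True, heavy_event_time_state=False))    # Flink
print(suggest_engine(2000, team_runs_spark=True, heavy_event_time_state=True))   # Spark Structured Streaming
print(suggest_engine(2000, team_runs_spark=False, heavy_event_time_state=True))  # Flink
```

The first call mirrors the fraud example: a 50 ms budget rules out micro-batch before team investment is even considered.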

For AI specifically, the most common pattern is exactly the one in the fraud example: Spark or another batch engine computes features for training over historical data, and an online engine computes the same features over the live stream for serving, with both feeding the feature store of Section 9.7. Whether that online engine is Spark Structured Streaming or Flink comes down to the latency budget of the model being served. The two systems are less rivals than complements positioned at different points on the latency-cost curve, and a mature platform often runs both.

Research Frontier: Closing the Latency Gap and Streaming Lakehouses (2024 to 2026)

The micro-batch-versus-continuous line is actively narrowing from both ends. Spark's Project Lightspeed (announced in 2022) and the asynchronous progress tracking and state-store improvements shipped across Spark 3.4, 3.5, and 4.0 cut Structured Streaming's tail latencies and made stateful queries far cheaper to run, eroding Flink's latency advantage for many workloads. From the other side, Flink 2.0 (2025) introduced disaggregated state, separating compute from a remote state store so that very large keyed state no longer pins jobs to fat local disks, which matters for the billion-key feature stores AI pipelines build. A parallel thread fuses streaming with the table formats of Chapter 8: Apache Paimon and the streaming write paths of Apache Iceberg and Delta Lake let a single table serve both a streaming sink and a batch query, pushing toward a genuine "streaming lakehouse" where the micro-batch-versus-continuous choice is hidden beneath one storage abstraction. The trend is not that one model wins, but that the engines are racing to offer both behind a unified API.

With the engines chosen and their exactly-once guarantees understood, the remaining question is what these pipelines actually compute for an AI system: the freshly updated features that an online model reads at inference time. That is where the chapter goes next, in Section 9.7, turning the streaming machinery of this section into the online feature computation that closes the loop between the live stream and the deployed model.

Exercise 9.6.1: Read the Latency Floor Conceptual

Using only the reasoning behind Output 9.6.1, explain why the micro-batch mean latency came out near 62 ms when the batch interval was 100 ms, and predict what the mean would become if the interval were halved to 50 ms (assume the per-batch overhead stays negligible relative to the wait). Then state one reason you cannot shrink the interval indefinitely toward zero to chase continuous-style latency, connecting your answer to the per-batch overhead term in Code 9.6.1.

Exercise 9.6.2: Make the Simulation Bursty Coding

Modify Code 9.6.1 so that arrivals come in bursts: instead of a steady ~2000 events per second, inject a 200-millisecond window where 10,000 events arrive almost simultaneously, surrounded by quiet periods. Re-measure the p99 latency of both models. Explain why the continuous model's serial clock = max(a, clock) + PER_EVENT_CONT rule causes its tail latency to spike under the burst (a queue forms) while the micro-batch model absorbs the burst into a few larger but still bounded batches, and what this implies about provisioning a continuous pipeline for peak load.

Exercise 9.6.3: Cost the Exactly-Once Checkpoint Analysis

A Flink job maintains $S = 40$ GB of keyed state and checkpoints every $I = 10$ seconds to a remote store reachable at $10$ GB/s, using full (non-incremental) checkpoints. Estimate the time to write one checkpoint and the fraction of each interval it consumes. Now suppose incremental checkpoints reduce the written bytes to 5 percent of $S$ on a typical interval; recompute. Argue from these two numbers why incremental checkpointing and the disaggregated state of the research frontier matter specifically for the billion-key feature stores that AI pipelines build, and how the checkpoint interval trades recovery time against steady-state overhead.