Part II: Distributed Data Processing for AI
Chapter 9: Stream Processing and Online AI

Watermarks and Late Events

"I closed the 9:00 window at 9:00:05 and called it final. At 9:00:11 three more events from 8:59 wandered in, looking for a window that no longer existed. We have not spoken since."

A Watermark That Advanced Too Soon
Big Picture

A streaming window can only be emitted once the system believes no more data belongs in it, but on a distributed stream, data is always still in flight, so "complete" can never be proven, only asserted. A watermark is exactly that assertion: a moving timestamp $T$ that declares "no further events with event-time below $T$ are expected." When the watermark crosses a window's end, the system closes the window and emits its result. Setting the watermark is a wager. Advance it aggressively and you emit quickly but drop the stragglers that arrive after you committed; hold it back and you capture nearly everything but every result is late. That wager, latency against completeness, is the same goodput-versus-SLO tension you measured in Chapter 5, now playing out one window at a time. This section gives you the mechanism, the trade-off as a measured curve, and the three ways a system handles the events that arrive too late.

In Section 9.3 we separated event time (when something happened) from processing time (when the system saw it) and committed to event-time windows so that a count for the 9:00 minute means the events that occurred in that minute, regardless of network delay or replay. That commitment buys correctness but raises a hard operational question that processing-time windows never face: if events can arrive out of order and arbitrarily late, when is the 9:00 window actually finished? A processing-time window closes when the clock on the wall says so; it never waits for anyone. An event-time window has no such luxury, because the events that belong to it may still be crossing the network. The watermark is the device that turns "we can never be sure" into a concrete, tunable decision about when to stop waiting.

[Diagram: tumbling windows W1 [0,10), W2 [10,20), W3 [20,30) on the event-time axis; watermark $T$ ("no event with time < $T$ expected") trailed by an allowed-lateness band $L$; a late event inside the band updates W3, a late event past the band is dropped.]
Figure 9.4.1: The watermark mechanism on the event-time axis. Three tumbling windows sit above the axis. Green dots are events placed at their event-time. The red watermark line $T$ sweeps rightward as the stream advances; a dashed allowed-lateness band of width $L$ trails behind it. The orange event is late but lands inside the band, so it still updates window W3. The red event arrives long after window W2 closed and falls past the band, so it is dropped. Windows W1 and W2, fully behind the watermark, are closed and final.

1. The Watermark as a Moving Promise Beginner

A watermark is a single timestamp $T$ that the stream system threads through the data alongside the events themselves. Its meaning is a promise: the system asserts that every event with event-time below $T$ has already been seen, so any window whose end lies at or before $T$ can be closed and emitted as final. The watermark only ever moves forward. As events flow past, the operator that generates the watermark watches the event-times it has observed and pushes $T$ upward according to some policy, then forwards the new $T$ downstream so that every window operator can act on the same promise.

The most common policy is bounded out-of-orderness. The generator tracks the largest event-time it has seen so far, call it $t_{\max}$, and sets

$$T = t_{\max} - L,$$

where $L$ is a fixed lateness bound chosen by the engineer. The subtraction is the safety margin: by holding the watermark $L$ units behind the newest event, the system tolerates events that arrive up to $L$ out of order before declaring their window closed. A window covering the event-time interval $[a, a+W)$ is emitted at the instant the watermark satisfies $T \ge a + W$, that is, when $t_{\max} \ge a + W + L$. The window does not wait for a wall-clock timer; it waits for enough newer data to arrive that the promise about older data becomes safe to make. This is why a stalled stream stalls its windows too: if no new events arrive, $t_{\max}$ stops growing, $T$ freezes, and pending windows never close. Production systems paper over this with an idle-source timeout that advances the watermark on processing time when a partition goes quiet, a detail we return to in Section 9.6.
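The generator and the close condition fit in a few lines. The sketch below is a minimal pure-Python illustration assuming a single stream with numeric event-times; the class name `BoundedOutOfOrdernessWatermark` and the helper `window_is_closed` are hypothetical, not any framework's API.

```python
import math


class BoundedOutOfOrdernessWatermark:
    """Sketch of the T = t_max - L policy for a single stream (names are illustrative)."""

    def __init__(self, lateness_bound: float) -> None:
        self.lateness_bound = lateness_bound
        self.t_max = -math.inf            # largest event-time observed so far

    def on_event(self, event_time: float) -> float:
        # t_max only grows, so the watermark it produces never moves backward.
        self.t_max = max(self.t_max, event_time)
        return self.current()

    def current(self) -> float:
        return self.t_max - self.lateness_bound   # T = t_max - L


def window_is_closed(window_start: float, width: float, watermark: float) -> bool:
    # A window [a, a + W) may be emitted as final once T >= a + W.
    return watermark >= window_start + width


gen = BoundedOutOfOrdernessWatermark(lateness_bound=5)
trace = []
for et in [3, 9, 12, 7, 16]:              # 7 arrives out of order
    T = gen.on_event(et)
    trace.append((et, T, window_is_closed(0, 10, T)))
for row in trace:
    print(row)

# Stall: with no further events, t_max freezes, so T freezes and [10, 20) stays open.
print(gen.current(), window_is_closed(10, 10, gen.current()))
```

Window [0, 10) stays open through the out-of-order 7, which does not move $t_{\max}$, and closes only when the event at 16 lifts $t_{\max}$ past $a + W + L = 15$. After that, with no new events, window [10, 20) waits indefinitely: the stall described above.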

Key Insight: A Watermark Is an Assertion, Not a Measurement

Nothing in the stream tells the system that the 9:00 window is truly complete; on a distributed source it can never know. The watermark $T = t_{\max} - L$ is a deliberate guess that becomes a commitment the moment a window crosses it. Choosing $L$ is choosing how wrong you are willing to be: small $L$ means you commit early and occasionally close a window before its stragglers land, large $L$ means you almost never commit too early but every result carries the full delay. The watermark does not remove the uncertainty of a distributed stream; it converts that uncertainty into one tunable number.

2. The Latency versus Completeness Trade-Off Intermediate

Because the watermark trails the newest event by $L$, every window emits roughly $L$ event-time units after its true end. That is the latency cost, and it is unavoidable: holding back the watermark is precisely how the system buys time for late data. The benefit bought with that latency is completeness, the fraction of the events truly belonging to a window that were admitted before the window closed. A larger $L$ closes windows later but catches more stragglers; a smaller $L$ closes windows sooner but drops the ones still in flight. There is no setting that gives both low latency and full completeness on an out-of-order stream, only a curve trading one for the other, and the engineer's job is to pick the point on that curve that matches the application's tolerance.

The demonstration below makes the curve concrete with no streaming framework at all, only the watermark rule in pure Python. It synthesizes a stream of twenty thousand events whose event-times advance steadily but whose arrival order is scrambled by a skewed random delay (exponential with mean 6, capped at 25 units), so most events are nearly on time and a few are very late. It runs event-time tumbling windows of width ten and sweeps the lateness bound $L$ across a range, reporting for each $L$ the completeness (fraction of events admitted), the number of dropped late events, and the average emit latency measured in event-time units.

import random

W = 10          # window width in event-time units
N = 20_000      # number of events
MAX_DELAY = 25  # an event can arrive up to this many units late
random.seed(7)

# Build a stream: each event has a true event-time, then a random non-negative
# delay defines when the system actually sees it (its arrival order).
events = []
for i in range(N):
    et = i * 0.5                       # event-time grows steadily
    delay = min(random.expovariate(1 / 6), MAX_DELAY)  # mostly small, a few huge
    events.append((et, et + delay))    # (event_time, arrival_key)

arrival = sorted(events, key=lambda e: e[1])   # order in which the system sees them
true_total = len(events)

def run(lateness_bound):
    max_seen = float("-inf")
    closed_end, counted = set(), {}
    dropped, emit_lag_sum, windows_emitted = 0, 0.0, 0
    for et, _ in arrival:
        max_seen = max(max_seen, et)
        watermark = max_seen - lateness_bound        # T = t_max - L
        win_start = (int(et // W)) * W
        if win_start + W <= watermark:               # window already closed: too late
            dropped += 1
            continue
        counted[win_start] = counted.get(win_start, 0) + 1
        for ws in list(counted):                     # close every window T has passed
            we = ws + W
            if we <= watermark and we not in closed_end:
                closed_end.add(we)
                emit_lag_sum += (max_seen - we)      # latency past the window's true end
                windows_emitted += 1
    admitted = sum(counted.values())
    return admitted / true_total, dropped, emit_lag_sum / max(windows_emitted, 1)

print(f"{'lateness L':>11} | {'completeness':>12} | {'dropped':>8} | {'avg emit latency':>16}")
print("-" * 56)
for L in [0, 2, 5, 10, 20, 40]:
    comp, drop, lat = run(L)
    print(f"{L:>11} | {comp:>11.2%} | {drop:>8} | {lat:>16.2f}")
Code 9.4.1: A complete event-time windowing engine with a bounded-out-of-orderness watermark, written in pure Python so the latency-completeness curve depends on nothing but the watermark rule. Sweeping lateness_bound is the only change between rows of the output.
 lateness L | completeness |  dropped | avg emit latency
--------------------------------------------------------
          0 |      65.84% |     6832 |             0.87
          2 |      75.39% |     4923 |             2.90
          5 |      85.00% |     2999 |             5.79
         10 |      93.47% |     1307 |            10.87
         20 |      99.48% |      105 |            20.87
         40 |     100.00% |        0 |            40.87
Output 9.4.1: The trade-off as measured numbers. Raising $L$ from 0 to 40 lifts completeness from 65.84% to 100% and shrinks dropped events from 6832 to zero, while average emit latency climbs in near-lockstep, about $L + 0.87$ units. Each row is one point on the curve the engineer must choose from.

Read the rows as a menu. At $L = 0$ the system commits the instant a window's end is reached, so it emits with almost no delay (0.87 units) but discards about a third of the events because they had not yet arrived. At $L = 40$ it waits long enough that every straggler lands, achieving perfect completeness, but every window is now reported about forty units after the fact. The middle rows are the interesting ones: $L = 10$ recovers more than 93% of events at a latency of about eleven units, and $L = 20$ reaches 99.5% completeness, capturing all but the rarest stragglers, at roughly twice that latency. Notice the diminishing return: the jump from $L = 0$ to $L = 10$ buys nearly twenty-eight points of completeness, while the jump from $L = 20$ to $L = 40$ buys only the last half-percent. The long right tail of the delay distribution means a handful of very late events would force you to wait far longer than the bulk of the data ever needs, which is the central reason no production system simply sets $L$ to "wait for everything."
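Reading the menu can itself be mechanized. The sketch below hard-codes the six rows of Output 9.4.1 and, assuming the application states its tolerance as a latency budget plus a completeness floor in percent, picks the smallest feasible $L$. It also prints completeness gained per unit of added latency between adjacent rows, which makes the diminishing return explicit. The helper `pick_lateness` is hypothetical.

```python
# The six rows of Output 9.4.1: (L, completeness %, dropped, avg emit latency).
MENU = [
    (0, 65.84, 6832, 0.87),
    (2, 75.39, 4923, 2.90),
    (5, 85.00, 2999, 5.79),
    (10, 93.47, 1307, 10.87),
    (20, 99.48, 105, 20.87),
    (40, 100.00, 0, 40.87),
]


def pick_lateness(menu, latency_budget, min_completeness_pct):
    """Smallest L meeting both constraints, or None if no row does."""
    feasible = [row for row in menu
                if row[3] <= latency_budget and row[1] >= min_completeness_pct]
    return min(row[0] for row in feasible) if feasible else None


# Completeness points gained per extra unit of emit latency, row to row.
marginals = [(c1 - c0) / (t1 - t0)
             for (_, c0, _, t0), (_, c1, _, t1) in zip(MENU, MENU[1:])]
for (l0, *_), (l1, *_), m in zip(MENU, MENU[1:], marginals):
    print(f"L {l0:>2} -> {l1:>2}: {m:.3f} points per unit of latency")

print(pick_lateness(MENU, latency_budget=25, min_completeness_pct=99.0))  # 20
```

The first step buys roughly 4.7 points of completeness per unit of latency; the last step, from $L = 20$ to $L = 40$, buys about 0.03.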

Fun Note: The Stream That Waited Forever

Set $L$ to the largest delay any event could conceivably have and your completeness hits 100% on paper. Then one event from a phone that was in airplane mode for six hours finally syncs, and your "complete" 9:00 window has been holding its breath since this morning. Perfect completeness on an open-ended stream is a horizon, not a destination: there is always one more event that could, in principle, still be coming. Watermarks exist precisely so the system can stop waiting for the event that may never arrive.

3. What Happens to the Late Events Intermediate

Output 9.4.1 counts dropped events as a flat loss, but a real system has three distinct policies for an event that arrives after its window has already closed, and the choice among them is as consequential as the choice of $L$ itself. The first policy is to drop the event silently, which is what Code 9.4.1 does. Dropping is cheap and keeps every emitted result immutable, and it is acceptable when the metric tolerates a small sampling error and a stale correction would be worse than a missing one. The danger is silence: a dropped event leaves no trace, so a slow upstream partition can quietly erode every window's counts without anyone noticing until a downstream model starts behaving strangely.

The second policy is to route late events to a side output, a separate stream the system emits for anything that misses its window. The main results stay clean and on time, while the side output preserves the late data for a reconciliation job, an audit, or a delayed batch correction. This is the pattern that lets a team honor a tight latency SLO on the primary path while still accounting for every event somewhere, and it is the policy most production pipelines reach for when correctness must be auditable.

The third policy is allowed lateness with retraction or update. Here the window stays in a soft-closed state for an extra grace period after the watermark passes its end; an event arriving inside that grace period reopens the window, recomputes the aggregate, and emits a revised result that supersedes the earlier one. The downstream consumer must be able to absorb an update to a value it already received, which is a real burden, but the payoff is a result that is both timely and eventually accurate: the window emits early on the watermark and corrects itself if a straggler lands within the grace period. Allowed lateness is, in effect, a second smaller watermark layered behind the first, and it lets a system have a fast provisional answer and a slower correct one without choosing between them.
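The three policies differ only in the branch taken once an event's window has already fired. The toy operator below is a hypothetical sketch, not a faithful model of any engine: it emits a window's first ("on_time") result when the watermark $T$ passes the window's end and, under allowed lateness, keeps accepting revisions until $T$ passes the end plus a grace period `grace`. The function name, policy strings, and result labels are illustrative assumptions.

```python
from collections import defaultdict


def process(stream, width, lateness, policy, grace=0):
    """Toy event-time tumbling-window counter with a pluggable late-event policy.

    policy: "drop", "side_output", or "allowed_lateness" (which uses `grace`).
    Returns (emitted results, side output, dropped count). Illustrative only.
    """
    t_max = float("-inf")
    counts = defaultdict(int)          # window_start -> running count
    fired = set()                      # windows whose first result was emitted
    emitted, side, dropped = [], [], 0
    for et in stream:
        t_max = max(t_max, et)
        T = t_max - lateness           # bounded out-of-orderness watermark
        start = int(et // width) * width
        end = start + width
        if end <= T:                   # the event's window has already fired
            if policy == "allowed_lateness" and end + grace > T:
                counts[start] += 1     # soft-closed: revise and re-emit
                emitted.append((start, counts[start], "late_update"))
            elif policy == "side_output":
                side.append(et)        # keep it for reconciliation
            else:
                dropped += 1           # dropped (a real pipeline might not even count it)
            continue
        counts[start] += 1
        for s in sorted(counts):       # fire every window the watermark has passed
            if s + width <= T and s not in fired:
                fired.add(s)
                emitted.append((s, counts[s], "on_time"))
    return emitted, side, dropped


stream = [1, 4, 12, 3, 15, 25, 9]      # 3 and 9 arrive after window [0, 10) fired
for policy in ("drop", "side_output", "allowed_lateness"):
    print(policy, process(stream, width=10, lateness=2, policy=policy, grace=5))
```

Event 3 arrives just after window [0, 10) fired with a count of 2. Dropping loses it, the side output keeps it, and allowed lateness with a grace of 5 re-emits the window with a count of 3. Event 9 arrives after that grace period has expired, so even the allowed-lateness policy drops it. A consumer of the allowed-lateness stream must treat the second result for window 0 as superseding the first.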

Practical Example: The Late Click That Inflated a Feature

Who: A machine learning platform engineer maintaining the online feature pipeline for a click-through-rate model at an ad exchange.

Situation: A streaming job computed a per-user "clicks in the last minute" feature from a clickstream and served it to a real-time ranking model on every ad request.

Problem: Mobile clients buffered events when offline and flushed them in bursts, so a sizable fraction of clicks arrived tens of seconds after the minute they belonged to, well past the windows already closed at $L = 2$ seconds.

Dilemma: Raise $L$ to capture the buffered bursts, adding seconds of latency to a feature on the request hot path where every millisecond costs revenue, or keep $L$ tiny and let the dropped clicks silently undercount user activity, biasing the feature low for exactly the mobile segment the team cared about most.

Decision: They kept $L$ small for the serving path but added a side output for late clicks feeding a slow reconciliation job, and switched the training feature to allowed-lateness updates so the labels the model learned from were complete even though the served feature was provisional.

How: The serving window emitted at $L = 2$ seconds for latency; late clicks went to a side-output topic; a nightly job recomputed the offline feature table with a generous grace period so train-time and serve-time features were reconciled rather than silently divergent.

Result: The served feature stayed fast, the dropped-click bias vanished from the training set, and a dashboard on the side-output volume turned a previously invisible data-loss problem into a monitored quantity.

Lesson: A late event that is silently dropped does not just lower completeness; it corrupts a feature in a direction correlated with the cause of the lateness. Choose the late-event policy per consumer: tight $L$ for the latency-bound serving path, side output or allowed lateness for the paths where the model learns.

This is the deeper reason watermarks matter for AI specifically, and not only for dashboards. A late event that vanishes is not a random sample loss; lateness is usually caused by something, a slow region, a flaky device class, a partition under load, so the events it removes are correlated with that cause. A feature computed from a window that silently dropped them is biased exactly along the dimension that produced the delay, and a model trained or served on that feature inherits the bias without any error ever being raised. The watermark and its late-event policy are therefore part of the correctness contract of every online feature, not a streaming implementation detail. We build features on this foundation in Section 9.7, and the monitoring that catches a watermark silently starving a feature is the subject of Chapter 26.
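The correlation is easy to reproduce. The sketch below assumes a hypothetical stream in which desktop and mobile clients emit the same number of events but mobile events carry a longer mean delay (1 versus 8 event-time units, values chosen only for illustration), and applies the same $T = t_{\max} - L$ admission rule as Code 9.4.1 per device class.

```python
import random


def per_class_completeness(lateness, n=10_000, width=10, seed=1):
    """Completeness per device class under the T = t_max - L rule.

    Hypothetical stream: both classes emit the same number of events, but
    mobile events carry a longer mean delay (values assumed for illustration).
    """
    rng = random.Random(seed)
    mean_delay = {"desktop": 1.0, "mobile": 8.0}
    events = []
    for i in range(n):
        cls = "mobile" if i % 2 else "desktop"   # equal true volume per class
        et = i * 0.5
        arrival = et + rng.expovariate(1 / mean_delay[cls])
        events.append((arrival, et, cls))
    events.sort()                                # the order the system sees them
    t_max = float("-inf")
    admitted = {"desktop": 0, "mobile": 0}
    for _, et, cls in events:
        t_max = max(t_max, et)
        window_end = int(et // width) * width + width
        if window_end > t_max - lateness:        # window still open: admit
            admitted[cls] += 1
    return {cls: count / (n // 2) for cls, count in admitted.items()}


for L in (2, 10):
    print(L, per_class_completeness(L))
```

A single overall completeness figure blends the two classes, so it hides that the loss is concentrated in the mobile class: a count feature built from these windows is biased low for mobile users specifically.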

Library Shortcut: Flink Expresses Watermarks and Allowed Lateness in a Few Lines

Code 9.4.1 hand-rolls the watermark generator, the window-close logic, and the drop rule across roughly thirty lines. Apache Flink exposes all three as declarative configuration: you attach a bounded-out-of-orderness watermark strategy to the source, declare the window, and state an allowed-lateness grace period and a side output for whatever still misses it. The framework handles per-partition watermark tracking, the merge of watermarks across parallel subtasks (the minimum across inputs becomes the operator's watermark), and the timer firing.

// Apache Flink (Java DataStream API)
DataStream<Event> events = source.assignTimestampsAndWatermarks(
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))  // L = 2s
        .withTimestampAssigner((e, ts) -> e.eventTimeMillis));

OutputTag<Event> lateTag = new OutputTag<Event>("late-events"){};   // side output; anonymous subclass keeps the type

SingleOutputStreamOperator<Result> out = events
    .keyBy(e -> e.userId)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))     // event-time window
    .allowedLateness(Time.seconds(30))                         // grace: update on stragglers
    .sideOutputLateData(lateTag)                               // anything past 30s lands here
    .aggregate(new CountClicks());

DataStream<Event> reallyLate = out.getSideOutput(lateTag);     // reconciliation path
Code 9.4.2: The watermark strategy, event-time window, allowed-lateness updates, and late-event side output from Section 3, each one line of Flink configuration. The thirty-odd lines of manual bookkeeping in Code 9.4.1 collapse into a declarative pipeline, and Flink additionally handles cross-partition watermark merging that the single-stream demo never had to.

4. Watermarks Across Parallel Partitions Advanced

Everything so far assumed one stream with one watermark, but a distributed stream is partitioned across many parallel sources, and each partition advances its own event-time independently. A window operator downstream of several partitions cannot close a window until every input partition has promised that no older event is coming, so the operator's effective watermark is the minimum of its input watermarks, not the maximum or the average. This minimum rule is what makes a single slow or idle partition hold back the entire operator: if one source's $t_{\max}$ lags, its watermark lags, and the minimum drags the whole window with it. The behavior is correct, since closing a window while a lagging partition might still send a qualifying event would violate the promise, but it means watermark progress is governed by the slowest partition, a straggler effect with the same shape as the ones we met in Chapter 2.
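The minimum rule itself is a one-liner; what matters is what it refuses to do. The sketch below assumes three hypothetical partitions, each with its own bounded-out-of-orderness watermark, and shows that a single slow partition pins the operator watermark, while a max rule would close a window the slow partition could still contribute to.

```python
def operator_watermark(partition_watermarks):
    """Minimum rule: an operator can only promise what every input has promised."""
    return min(partition_watermarks.values())


L = 2
t_max = {"p0": 120, "p1": 118, "p2": 95}           # p2 is a slow partition (illustrative)
per_partition = {p: t - L for p, t in t_max.items()}
T = operator_watermark(per_partition)
print(per_partition, T)                            # p2 holds T at 93

# p2 has only promised "nothing below 93", so it may still send events in [100, 110).
print("close [100, 110) under min rule:", T >= 110)
print("close [100, 110) under max rule:", max(per_partition.values()) >= 110)  # unsafe
```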

The interaction between the lateness bound and partition skew is where real tuning lives. If one watermark generator reads several partitions whose natural delays differ and tracks a single shared $t_{\max}$, the bound $L$ must cover not only each partition's internal out-of-orderness but also the skew between partitions, or the laggard's events will be counted as late. Generating a watermark per partition and merging with the minimum removes that skew term from $L$, at the price of holding every window back to the laggard's pace. This is why systems expose per-source watermark strategies and idle-source detection, so that a quiet partition stops vetoing watermark progress after a timeout. The same partition-and-recombine structure that organized Chapter 6 and Chapter 7 reappears here, except the thing being recombined is a promise about time, and the combine operator is a minimum.
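Idle-source detection modifies the minimum by dropping inputs that have gone silent. The sketch below is a simplified, hypothetical helper: each partition carries its watermark and the processing time of its last event, and any partition silent longer than `idle_timeout` is excluded from the minimum. Flink offers a comparable knob as `withIdleness(Duration)` on a `WatermarkStrategy`, though its internals differ from this toy.

```python
def merged_watermark(partitions, now, idle_timeout):
    """Minimum over partitions that are not idle.

    `partitions` maps name -> (watermark, processing time of last event).
    A partition silent for longer than `idle_timeout` is excluded, so it stops
    vetoing progress. Hypothetical helper; returns None if every input is idle.
    """
    active = [wm for wm, last_seen in partitions.values()
              if now - last_seen <= idle_timeout]
    return min(active) if active else None


parts = {
    "p0": (118, 1000.0),   # (watermark, processing time of last event, seconds)
    "p1": (116, 999.5),
    "p2": (40, 700.0),     # silent for 300 s
}
print(merged_watermark(parts, now=1000.0, idle_timeout=60))    # 116
print(merged_watermark(parts, now=1000.0, idle_timeout=600))   # 40
```

The timeout is itself a wager: when the idle partition wakes up, its events may fall behind a watermark that advanced without it and be handled as late.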

Research Frontier: Adaptive and Predictive Watermarks (2024 to 2026)

A fixed lateness bound $L$ wastes latency when the stream is well behaved and drops data when a burst of skew exceeds it, so a current research line makes the watermark adaptive. The idea is to model the distribution of event delays online and set $L$ to a target quantile, for example "wait long enough to capture 99% of events," letting the bound widen during a late-arrival burst and tighten when the stream settles. Work in this direction frames watermark generation as an online estimation problem and learns the delay distribution per key or per partition, including approaches that treat the latency-completeness frontier explicitly and tune toward a service-level target rather than a hand-set constant. Parallel effort targets the partition-skew problem, where adaptive idle-source handling and skew-aware watermark merging keep one slow shard from stalling an entire job. The throughline is the same as the goodput framing of Chapter 5: the lateness bound stops being a constant an engineer guesses and becomes a quantity a controller drives toward a measured completeness target.
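A deliberately simple version of the quantile idea can be sketched as follows. This is not any specific published method; it assumes lateness is measured as $t_{\max}$ minus the arriving event's event-time, keeps the last `window` observations in a sorted-on-demand buffer (a production system would use a streaming quantile sketch instead), and sets the bound to their `q`-quantile, floored at `min_bound`. The class name and parameters are hypothetical.

```python
import math
from collections import deque


class QuantileWatermark:
    """Hypothetical adaptive generator: the bound tracks a quantile of recent lateness.

    Lateness of an arriving event is measured as t_max - event_time (0 when it
    arrives in order). The bound is the q-quantile of the last `window`
    observations, floored at `min_bound`.
    """

    def __init__(self, q=0.99, window=1000, min_bound=0.0):
        self.q, self.min_bound = q, min_bound
        self.recent = deque(maxlen=window)
        self.t_max = -math.inf
        self.watermark = -math.inf

    def bound(self):
        if not self.recent:
            return self.min_bound
        ordered = sorted(self.recent)          # a real system would use a quantile sketch
        idx = max(0, min(len(ordered) - 1, math.ceil(self.q * len(ordered)) - 1))
        return max(self.min_bound, ordered[idx])

    def on_event(self, event_time):
        self.t_max = max(self.t_max, event_time)
        self.recent.append(self.t_max - event_time)
        # A widening bound must not pull the watermark backward: keep it monotone.
        self.watermark = max(self.watermark, self.t_max - self.bound())
        return self.watermark


gen = QuantileWatermark(q=0.9, window=50, min_bound=1.0)
for et in range(100):                          # calm, in-order phase
    gen.on_event(float(et))
calm_bound = gen.bound()
print("calm bound:", calm_bound)               # 1.0 (the floor)

marks = []
for i in range(100, 150):                      # burst: every other event 8 units late
    marks.append(gen.on_event(float(i)))
    marks.append(gen.on_event(float(i) - 8.0))
print("burst bound:", gen.bound())             # 8.0
```

Two design points show up even in this toy. The bound reacts with a lag: the first few late events of the burst are judged against the calm floor until enough of them accumulate in the buffer to move the quantile. And because the watermark must never move backward, a widening bound pauses the watermark rather than retracting it.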

With watermarks in hand, event-time stream processing from Section 9.3 is finally practical: we can close windows on out-of-order data with a known, tunable trade-off, and we know what becomes of the events that miss the cut. The next question is where the stream itself lives, the durable, replayable, partitioned log that lets a watermark be recomputed after a failure and lets a late event be reprocessed at all. That log is the subject of Section 9.5.

Exercise 9.4.1: Reading the Trade-Off Curve Conceptual

Using Output 9.4.1, suppose an online feature must be served within a latency budget of 12 event-time units and the downstream model degrades noticeably once completeness falls below 90%. Which value of $L$ from the table would you choose, and why do both neighboring rows fail at least one constraint? Then explain, in terms of the shape of the delay distribution (most delays small, a few near the 25-unit cap), why pushing from 99.48% to 100% completeness costs as much added latency as the entire climb from 65.84% to 99.48%.

Exercise 9.4.2: Add Allowed Lateness and a Side Output Coding

Extend Code 9.4.1 so that a window does not reject events the instant the watermark passes its end but stays in a soft-closed state for an extra grace period $G$ in event-time units. An event arriving while its window is soft-closed should still be admitted (an allowed-lateness update); only events arriving once the watermark has reached the window's end plus $G$ are dropped, and those should be appended to a separate side_output list instead of merely counted. For a fixed $L = 5$, report completeness and the side-output size as $G$ ranges over 0, 5, 15, and 30, and confirm that $L$ and $G$ together recover the completeness that a single large $L$ would, at lower emit latency for the on-time majority.

Exercise 9.4.3: The Minimum Rule Across Two Partitions Analysis

Modify the stream generator to produce two partitions whose event-times advance at the same rate but where partition B's $t_{\max}$ consistently trails partition A's by a fixed skew $s$. First generate a single watermark from the interleaved stream's shared $t_{\max}$, and show analytically and then empirically that to keep partition B's events from being dropped as late, the bound $L$ must satisfy $L \ge s$ plus B's own out-of-orderness. Then implement the merge correctly, with one watermark per partition and the operator watermark taken as the minimum of the two, and show that $L$ now needs to cover only B's own out-of-orderness while every window closes roughly $s$ units later. Finally, explain why using the maximum or the average of the two per-partition watermarks instead of the minimum would violate the watermark promise and silently drop valid data.