"I closed the 9:00 window at 9:00:05 and called it final. At 9:00:11 three more events from 8:59 wandered in, looking for a window that no longer existed. We have not spoken since."
A Watermark That Advanced Too Soon
A streaming window can only be emitted once the system believes no more data belongs in it. On a distributed stream, though, some data may always still be in flight, so "complete" can never be proven, only asserted. A watermark is exactly that assertion: a moving timestamp $T$ that declares "no further events with event-time below $T$ are expected." When the watermark crosses a window's end, the system closes the window and emits its result. Setting the watermark is a wager. Advance it aggressively and you emit quickly but drop the stragglers that arrive after you committed; hold it back and you capture nearly everything but every result is late. That wager, latency against completeness, is the same goodput-versus-SLO tension you measured in Chapter 5, now playing out one window at a time. This section gives you the mechanism, the trade-off as a measured curve, and the three ways a system handles the events that arrive too late.
In Section 9.3 we separated event time (when something happened) from processing time (when the system saw it) and committed to event-time windows so that a count for the 9:00 minute means the events that occurred in that minute, regardless of network delay or replay. That commitment buys correctness but raises a hard operational question that processing-time windows never face: if events can arrive out of order and arbitrarily late, when is the 9:00 window actually finished? A processing-time window closes when the clock on the wall says so; it never waits for anyone. An event-time window has no such luxury, because the events that belong to it may still be crossing the network. The watermark is the device that turns "we can never be sure" into a concrete, tunable decision about when to stop waiting.
1. The Watermark as a Moving Promise Beginner
A watermark is a single timestamp $T$ that the stream system threads through the data alongside the events themselves. Its meaning is a promise: the system asserts that every event with event-time below $T$ has already been seen, so any window whose end lies at or before $T$ can be closed and emitted as final. The watermark only ever moves forward. As events flow past, the operator that generates the watermark watches the event-times it has observed and pushes $T$ upward according to some policy, then forwards the new $T$ downstream so that every window operator can act on the same promise.
The most common policy is bounded out-of-orderness. The generator tracks the largest event-time it has seen so far, call it $t_{\max}$, and sets
$$T = t_{\max} - L,$$
where $L$ is a fixed lateness bound chosen by the engineer. The subtraction is the safety margin. By holding the watermark $L$ units behind the newest event, the system tolerates events that arrive up to $L$ event-time units out of order before declaring their window closed. A window covering the event-time interval $[a, a+W)$ is emitted at the instant the watermark satisfies $T \ge a + W$, that is, when $t_{\max} \ge a + W + L$. The window does not wait for a wall-clock timer; it waits for enough newer data to arrive that the promise about older data becomes safe to make. This is why a stalled stream stalls its windows too: if no new events arrive, $t_{\max}$ stops growing, $T$ freezes, and pending windows never close. Production systems paper over this with an idle-source timeout that advances the watermark on processing time when a partition goes quiet, a detail we return to in Section 9.6.
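A minimal sketch of the bounded out-of-orderness rule in Python follows. It assumes a single stream and a caller that asks about one tumbling window at a time. `BoundedWatermark` and its methods are illustrative names, not a framework API.

```python
class BoundedWatermark:
    """Hypothetical helper: bounded out-of-orderness watermark, T = t_max - L."""

    def __init__(self, lateness_bound: float):
        self.L = lateness_bound
        self.t_max = float("-inf")  # largest event-time observed so far

    def observe(self, event_time: float) -> float:
        # t_max never decreases, so the watermark only moves forward.
        self.t_max = max(self.t_max, event_time)
        return self.watermark()

    def watermark(self) -> float:
        return self.t_max - self.L

    def can_close(self, window_start: float, width: float) -> bool:
        # Window [a, a+W) is final once T >= a + W, i.e. t_max >= a + W + L.
        return self.watermark() >= window_start + width


wm = BoundedWatermark(lateness_bound=5)
for et in [3, 9, 7, 12, 14, 15]:  # event-times in arrival order (7 is out of order)
    wm.observe(et)
    print(et, wm.watermark(), wm.can_close(0, 10))
```

The out-of-order event at 7 leaves $T$ unchanged. Only the event at 15 pushes $T$ to $10 = a + W$ (with $a = 0$, $W = 10$), which is the first moment the window $[0, 10)$ may close.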
Nothing in the stream tells the system that the 9:00 window is truly complete; on a distributed source it can never know. The watermark $T = t_{\max} - L$ is a deliberate guess that becomes a commitment the moment a window crosses it. Choosing $L$ is choosing how wrong you are willing to be: small $L$ means you commit early and occasionally close a window before its stragglers land, large $L$ means you almost never commit too early but every result carries the full delay. The watermark does not remove the uncertainty of a distributed stream; it converts that uncertainty into one tunable number.
2. The Latency versus Completeness Trade-Off Intermediate
Because the watermark trails the newest event by $L$, every window emits roughly $L$ event-time units after its true end. That is the latency cost, and it is unavoidable: holding back the watermark is precisely how the system buys time for late data. The benefit bought with that latency is completeness, the fraction of the events truly belonging to a window that were admitted before the window closed. A larger $L$ closes windows later but catches more stragglers; a smaller $L$ closes windows sooner but drops the ones still in flight. There is no setting that gives both low latency and full completeness on an out-of-order stream, only a curve trading one for the other, and the engineer's job is to pick the point on that curve that matches the application's tolerance.
The demonstration below makes the curve concrete with no streaming framework at all, only the watermark rule in pure Python. It synthesizes a stream of twenty thousand events whose event-times advance steadily. Their arrival order is scrambled by a long-tailed delay (exponential with a mean of six units, capped at 25), so most events are nearly on time and a few are very late. It runs event-time tumbling windows of width ten and sweeps the lateness bound $L$ across a range. For each $L$ it reports the completeness (fraction of events admitted), the number of dropped late events, and the average emit latency measured in event-time units.
```python
# Code 9.4.1: sweeping the lateness bound L with a hand-rolled watermark
import random

W = 10          # window width in event-time units
N = 20_000      # number of events
MAX_DELAY = 25  # an event can arrive up to this many units late
random.seed(7)

# Build a stream: each event has a true event-time, then a random non-negative
# delay defines when the system actually sees it (its arrival order).
events = []
for i in range(N):
    et = i * 0.5                                       # event-time grows steadily
    delay = min(random.expovariate(1 / 6), MAX_DELAY)  # mostly small, a few huge
    events.append((et, et + delay))                    # (event_time, arrival_key)
arrival = sorted(events, key=lambda e: e[1])           # order in which the system sees them
true_total = len(events)


def run(lateness_bound):
    max_seen = float("-inf")
    closed_end, counted = set(), {}
    dropped, emit_lag_sum, windows_emitted = 0, 0.0, 0
    for et, _ in arrival:
        max_seen = max(max_seen, et)
        watermark = max_seen - lateness_bound          # T = t_max - L
        win_start = int(et // W) * W
        if win_start + W <= watermark:                 # window already closed: too late
            dropped += 1
            continue
        counted[win_start] = counted.get(win_start, 0) + 1
        for ws in list(counted):                       # close every window T has passed
            we = ws + W
            if we <= watermark and we not in closed_end:
                closed_end.add(we)
                emit_lag_sum += max_seen - we          # latency past the window's true end
                windows_emitted += 1
    admitted = sum(counted.values())
    return admitted / true_total, dropped, emit_lag_sum / max(windows_emitted, 1)


print(f"{'lateness L':>11} | {'completeness':>12} | {'dropped':>8} | {'avg emit latency':>16}")
print("-" * 56)
for L in [0, 2, 5, 10, 20, 40]:
    comp, drop, lat = run(L)
    print(f"{L:>11} | {comp:>11.2%} | {drop:>8} | {lat:>16.2f}")
```
Output 9.4.1: lateness_bound is the only parameter that changes between rows.

```text
 lateness L | completeness |  dropped | avg emit latency
--------------------------------------------------------
          0 |      65.84% |     6832 |             0.87
          2 |      75.39% |     4923 |             2.90
          5 |      85.00% |     2999 |             5.79
         10 |      93.47% |     1307 |            10.87
         20 |      99.48% |      105 |            20.87
         40 |     100.00% |        0 |            40.87
```
Read the rows as a menu. At $L = 0$ the system commits the instant a window's end is reached, so it emits with almost no delay (0.87 units) but discards a third of the events because they had not yet arrived. At $L = 40$ it waits long enough that every straggler lands, achieving perfect completeness, but every window is now reported forty units after the fact. The middle rows are the interesting ones. $L = 10$ recovers more than 93% of events at a latency of about eleven units, and $L = 20$ reaches 99.5% completeness, capturing all but the rarest stragglers, at twice that latency. Notice the diminishing return: the jump from $L = 0$ to $L = 10$ buys nearly twenty-eight points of completeness, while the jump from $L = 20$ to $L = 40$ buys only the last half-percent. The long tail of the delay distribution means a handful of very late events would force you to wait far longer than the bulk of the data ever needs. That is the central reason no production system simply sets $L$ to "wait for everything."
Set $L$ to the largest delay any event could conceivably have and your completeness hits 100% on paper. Then one event from a phone that was in airplane mode for six hours finally syncs, and your "complete" 9:00 window has been holding its breath since this morning. Perfect completeness on an open-ended stream is a horizon, not a destination: there is always one more event that could, in principle, still be coming. Watermarks exist precisely so the system can stop waiting for the event that may never arrive.
3. What Happens to the Late Events Intermediate
Output 9.4.1 counts dropped events as a flat loss, but a real system has three distinct policies for an event that arrives after its window has already closed, and the choice among them is as consequential as the choice of $L$ itself. The first policy is to drop the event silently, which is what Code 9.4.1 does. Dropping is cheap and keeps every emitted result immutable, and it is acceptable when the metric tolerates a small sampling error and a stale correction would be worse than a missing one. The danger is silence: a dropped event leaves no trace, so a slow upstream partition can quietly erode every window's counts without anyone noticing until a downstream model starts behaving strangely.
The second policy is to route late events to a side output, a separate stream the system emits for anything that misses its window. The main results stay clean and on time, while the side output preserves the late data for a reconciliation job, an audit, or a delayed batch correction. This is the pattern that lets a team honor a tight latency SLO on the primary path while still accounting for every event somewhere, and it is the policy most production pipelines reach for when correctness must be auditable.
The third policy is allowed lateness with retraction or update. Here the window stays in a soft-closed state for an extra grace period after the watermark passes its end; an event arriving inside that grace period reopens the window, recomputes the aggregate, and emits a revised result that supersedes the earlier one. The downstream consumer must be able to absorb an update to a value it already received, which is a real burden, but the payoff is a result that is both timely and eventually accurate: the window emits early on the watermark and corrects itself if a straggler lands within the grace period. Allowed lateness is, in effect, a second smaller watermark layered behind the first, and it lets a system have a fast provisional answer and a slower correct one without choosing between them.
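The three policies can be compared in a small sketch. It assumes a single tumbling event-time counter whose watermark is advanced by hand. `LateAwareCounter`, its `grace` parameter (the allowed lateness) and its `on_late` switch are hypothetical names. Pairing a grace period with a fallback for anything later mirrors how frameworks such as Apache Flink combine allowed lateness with a side output; it does not reproduce any one system's internals.

```python
from collections import defaultdict


class LateAwareCounter:
    """Hypothetical tumbling event-time counter (not a framework API).

    grace   : allowed lateness G; 0 means a window is final once T >= end.
    on_late : "drop" or "side_output" for events later than end + G.
    """

    def __init__(self, width, grace=0.0, on_late="drop"):
        self.W, self.G, self.on_late = width, grace, on_late
        self.counts = defaultdict(int)  # window start -> current count
        self.fired = set()              # windows emitted at least once
        self.results = []               # (window start, count, "on_time" | "update")
        self.side_output = []           # late events kept for reconciliation
        self.dropped = 0
        self.watermark = float("-inf")

    def advance_watermark(self, t):
        self.watermark = max(self.watermark, t)  # watermarks never move back
        for start in sorted(self.counts):
            if start + self.W <= self.watermark and start not in self.fired:
                self.fired.add(start)
                self.results.append((start, self.counts[start], "on_time"))

    def on_event(self, event_time):
        start = (event_time // self.W) * self.W
        end = start + self.W
        if end + self.G <= self.watermark:       # past end + grace: too late
            if self.on_late == "side_output":
                self.side_output.append(event_time)
            else:
                self.dropped += 1                # silent loss
            return
        self.counts[start] += 1
        if end <= self.watermark:                # late but inside the grace period
            self.fired.add(start)
            self.results.append((start, self.counts[start], "update"))


for policy in [dict(grace=0, on_late="drop"),
               dict(grace=0, on_late="side_output"),
               dict(grace=5, on_late="side_output")]:
    op = LateAwareCounter(width=10, **policy)
    for t in [1, 4, 8]:
        op.on_event(t)
    op.advance_watermark(10)  # window [0, 10) fires on time
    op.on_event(9)            # a straggler just behind the watermark
    op.advance_watermark(20)
    op.on_event(2)            # far behind: past any grace period
    print(policy, op.results, op.side_output, op.dropped)
```

With `grace=0`, the straggler at event-time 9 is either dropped or diverted to the side output. With `grace=5`, it revises the window to a count of 4 and emits an update. Only the event at 2, which arrives after the grace period has expired, reaches the side output.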
Who: A machine learning platform engineer maintaining the online feature pipeline for a click-through-rate model at an ad exchange.
Situation: A streaming job computed a per-user "clicks in the last minute" feature from a clickstream and served it to a real-time ranking model on every ad request.
Problem: Mobile clients buffered events while offline and flushed them in bursts, so a sizable fraction of clicks arrived tens of seconds after the minute they belonged to, long after the watermark at $L = 2$ seconds had already closed their windows.
Dilemma: Raise $L$ to capture the buffered bursts, adding seconds of latency to a feature on the request hot path where every millisecond costs revenue, or keep $L$ tiny and let the dropped clicks silently understate active users, biasing the feature low for exactly the mobile segment the team cared about most.
Decision: They kept $L$ small for the serving path but added a side output for late clicks feeding a slow reconciliation job, and switched the training feature to allowed-lateness updates so the labels the model learned from were complete even though the served feature was provisional.
How: The serving window emitted at $L = 2$ seconds for latency; late clicks went to a side-output topic; a nightly job recomputed the offline feature table with a generous grace period so train-time and serve-time features were reconciled rather than silently divergent.
Result: The served feature stayed fast, the dropped-click bias vanished from the training set, and a dashboard on the side-output volume turned a previously invisible data-loss problem into a monitored quantity.
Lesson: A late event that is silently dropped does not just lower completeness; it corrupts a feature in a direction correlated with the cause of the lateness. Choose the late-event policy per consumer: tight $L$ for the latency-bound serving path, side output or allowed lateness for the paths where the model learns.
This is the deeper reason watermarks matter for AI specifically, and not only for dashboards. A late event that vanishes is not a random sample loss; lateness is usually caused by something, a slow region, a flaky device class, a partition under load, so the events it removes are correlated with that cause. A feature computed from a window that silently dropped them is biased exactly along the dimension that produced the delay, and a model trained or served on that feature inherits the bias without any error ever being raised. The watermark and its late-event policy are therefore part of the correctness contract of every online feature, not a streaming implementation detail. We build features on this foundation in Section 9.7, and the monitoring that catches a watermark silently starving a feature is the subject of Chapter 26.
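A toy simulation shows the bias directly. Its assumptions are made up: two equally active segments pass through the same drop policy with $L = 2$. The hypothetical mobile segment is delayed by 15 units, standing in for offline buffering. The numbers are illustrative and are not drawn from any real pipeline.

```python
from collections import Counter

W, L = 10, 2  # window width and lateness bound, in event-time units

# Hypothetical clickstream: one click per event-time unit, alternating segments.
# Desktop clicks arrive 0.5 units after they happen; mobile clicks are buffered
# offline and flushed 15 units late, so delay is correlated with the segment.
clicks = [(t, "desktop" if t % 2 == 0 else "mobile") for t in range(60)]
DELAY = {"desktop": 0.5, "mobile": 15.0}
arrival = sorted(clicks, key=lambda c: c[0] + DELAY[c[1]])

true_counts = Counter(seg for _, seg in clicks)
admitted = Counter()
t_max = float("-inf")
for et, seg in arrival:
    t_max = max(t_max, et)
    watermark = t_max - L              # T = t_max - L
    window_end = (et // W) * W + W
    if window_end <= watermark:        # drop policy: the click vanishes silently
        continue
    admitted[seg] += 1

for seg in ("desktop", "mobile"):
    print(f"{seg:>8}: true={true_counts[seg]}, admitted={admitted[seg]}")
```

Both segments produce 30 clicks, but most mobile clicks arrive after their windows have closed. A per-segment feature built on the admitted counts therefore understates mobile activity without raising any error. The few mobile clicks that survive belong to the final window, which the watermark never passes before this finite stream ends.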
Code 9.4.1 hand-rolls the watermark generator, the window-close logic, and the drop rule across roughly thirty lines. Apache Flink exposes all three as declarative configuration: you attach a bounded-out-of-orderness watermark strategy to the source, declare the window, and state an allowed-lateness grace period and a side output for whatever still misses it. The framework handles per-partition watermark tracking, the merge of watermarks across parallel subtasks (the minimum across inputs becomes the operator's watermark), and the timer firing.
```java
// Apache Flink (Java DataStream API). Event, Result, CountClicks, and source
// are application-defined types and objects assumed to exist.
DataStream<Event> events = source.assignTimestampsAndWatermarks(
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // L = 2s
        .withTimestampAssigner((e, ts) -> e.eventTimeMillis));

// Anonymous subclass ({}) so Flink can capture the side output's element type.
OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Result> out = events
    .keyBy(e -> e.userId)
    .window(TumblingEventTimeWindows.of(Time.seconds(10))) // event-time window
    .allowedLateness(Time.seconds(30))                     // grace: update on stragglers
    .sideOutputLateData(lateTag)                           // anything past the 30s grace lands here
    .aggregate(new CountClicks());

DataStream<Event> reallyLate = out.getSideOutput(lateTag); // reconciliation path
```
4. Watermarks Across Parallel Partitions Advanced
Everything so far assumed one stream with one watermark, but a distributed stream is partitioned across many parallel sources, and each partition advances its own event-time independently. A window operator downstream of several partitions cannot close a window until every input partition has promised that no older event is coming, so the operator's effective watermark is the minimum of its input watermarks, not the maximum or the average. This minimum rule is what makes a single slow or idle partition hold back the entire operator: if one source's $t_{\max}$ lags, its watermark lags, and the minimum drags the whole window with it. The behavior is correct, since closing a window while a lagging partition might still send a qualifying event would violate the promise, but it means watermark progress is governed by the slowest partition, a straggler effect with the same shape as the ones we met in Chapter 2.
The interaction between the lateness bound and partition skew is where real tuning lives. Suppose a single watermark generator reads several partitions at once. Its $t_{\max}$ is then set by the fastest of them, so the bound $L$ must cover both each partition's internal out-of-orderness and the skew between partitions. Otherwise the laggard's events will be counted as late. This is why systems expose per-source (or per-partition) watermark strategies, which generate a watermark for each partition and let the minimum rule absorb the skew. They pair these with idle-source detection, in which a quiet partition stops vetoing watermark progress after a timeout. The same partition-and-recombine structure that organized Chapter 6 and Chapter 7 reappears here, except the thing being recombined is a promise about time, and the combine operator is a minimum.
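The minimum rule and idle-source detection fit in a few lines. This sketch assumes that each input reports its current watermark together with the processing time of its last event. `merge_watermarks` is a hypothetical helper, not any engine's internals. The `previous` argument keeps the operator's watermark from moving backward when an idle input wakes up behind the others. That input's next events may then be late, which is the price of not letting it veto progress.

```python
def merge_watermarks(inputs, now, idle_timeout, previous=float("-inf")):
    """Hypothetical operator watermark: minimum over inputs that are not idle.

    inputs: dict of input name -> (input watermark, processing time of last event)
    now: current processing time; idle_timeout: silence after which an input is idle
    previous: the operator's last emitted watermark (it must never decrease)
    """
    active = [wm for wm, last_seen in inputs.values() if now - last_seen <= idle_timeout]
    if not active:  # every input idle: make no new promise
        return previous
    return max(previous, min(active))


inputs = {"A": (120.0, 99.0), "B": (95.0, 99.5), "C": (40.0, 10.0)}
print(merge_watermarks(inputs, now=100.0, idle_timeout=30.0))    # C is idle: 95.0
print(merge_watermarks(inputs, now=100.0, idle_timeout=1000.0))  # C vetoes: 40.0
```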
A fixed lateness bound $L$ wastes latency when the stream is well behaved and drops data when a burst of skew exceeds it, so a current research line makes the watermark adaptive. The idea is to model the distribution of event delays online and set $L$ to a target quantile, for example "wait long enough to capture 99% of events," letting the bound widen during a late-arrival burst and tighten when the stream settles. Work in this direction frames watermark generation as an online estimation problem and learns the delay distribution per key or per partition, including approaches that treat the latency-completeness frontier explicitly and tune toward a service-level target rather than a hand-set constant. Parallel effort targets the partition-skew problem, where adaptive idle-source handling and skew-aware watermark merging keep one slow shard from stalling an entire job. The throughline is the same as the goodput framing of Chapter 5: the lateness bound stops being a constant an engineer guesses and becomes a quantity a controller drives toward a measured completeness target.
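A quantile-driven bound can be sketched as below. It is one simple way to realize the idea, not a method from any specific paper. It assumes that lateness can be measured on arrival as $t_{\max} - t$ for an event with event-time $t$. It keeps a sliding window of those values and sets $L$ to their nearest-rank quantile. `QuantileLatenessBound` and its parameters are hypothetical.

```python
import math
from collections import deque


class QuantileLatenessBound:
    """Hypothetical adaptive watermark: L follows a quantile of recent lateness.

    Lateness of an event is measured as t_max - event_time at arrival
    (0 for an in-order event). This is a proxy: a delay longer than any
    seen in the sample window cannot be anticipated.
    """

    def __init__(self, target=0.99, window=1000, initial=5.0):
        self.target = target                 # e.g. 0.99 = "capture 99% of events"
        self.samples = deque(maxlen=window)  # sliding window of recent lateness values
        self.initial = initial               # bound used before any data arrives
        self.t_max = float("-inf")
        self.last_wm = float("-inf")

    def observe(self, event_time):
        self.t_max = max(self.t_max, event_time)
        self.samples.append(self.t_max - event_time)

    def bound(self):
        if not self.samples:
            return self.initial
        ordered = sorted(self.samples)
        # Nearest-rank quantile: smallest sample covering the target fraction.
        idx = min(len(ordered) - 1, max(0, math.ceil(self.target * len(ordered)) - 1))
        return ordered[idx]

    def watermark(self):
        # Widening L must never pull the watermark backward: closed windows stay closed.
        self.last_wm = max(self.last_wm, self.t_max - self.bound())
        return self.last_wm


est = QuantileLatenessBound(target=0.95, window=10)
for et in [1, 2, 3, 10, 4, 11, 12, 5, 13, 14]:
    est.observe(et)
print(est.bound(), est.watermark())  # 7 7
```

The guard in `watermark()` matters. When a late burst widens $L$, the raw value $t_{\max} - L$ can fall, but windows already closed must stay closed. The sketch therefore holds the watermark at its previous high until $t_{\max}$ catches up.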
With watermarks in hand, event-time stream processing from Section 9.3 is finally practical: we can close windows on out-of-order data with a known, tunable trade-off, and we know what becomes of the events that miss the cut. The next question is where the stream itself lives, the durable, replayable, partitioned log that lets a watermark be recomputed after a failure and lets a late event be reprocessed at all. That log is the subject of Section 9.5.
Using Output 9.4.1, suppose an online feature must be served within a latency budget of 12 event-time units, and the downstream model degrades noticeably once completeness falls below 90%. Which value of $L$ from the table would you choose, and why do both neighboring rows fail at least one constraint? Then explain, in terms of the long-tailed delay distribution, why pushing from 99.48% to 100% completeness costs as much added latency as the entire climb from 65.84% to 99.48%.
Extend Code 9.4.1 so that a window is not deleted the instant the watermark passes its end but stays in a soft-closed state for an extra grace period $G$ in event-time units. An event arriving while its window is soft-closed should still be admitted (an allowed-lateness update); only events arriving after $G$ are dropped, and those should be appended to a separate side_output list instead of merely counted. For a fixed $L = 5$, report completeness and the side-output size as $G$ ranges over 0, 5, 15, and 30, and confirm that $L$ and $G$ together recover the completeness that a single large $L$ would, at lower emit latency for the on-time majority.
Modify the stream generator to produce two partitions whose event-times advance at the same rate, but where partition B's $t_{\max}$ consistently trails partition A's by a fixed skew $s$. Implement the merge correctly: the operator watermark is the minimum of the two per-partition watermarks. Show analytically and then empirically that under this rule, B's events need $L$ to cover only B's own out-of-orderness. Contrast this with a single watermark driven by the faster partition (the maximum of the two), which would require $L \ge s$ plus B's own out-of-orderness to keep B's events from being dropped as late. Then explain why using the maximum or the average of the two watermarks instead of the minimum violates the watermark promise and silently drops valid data.