"I counted the event the moment I saw it. Nobody told me it had been wandering the network for twelve seconds, knocking on the wrong window."
A Stream Operator With a Slow Watch
Every event in a distributed stream carries two timestamps that almost never agree: the moment it happened (event time) and the moment your system observed it (processing time), and confusing the two silently corrupts every windowed aggregate you compute. In the previous section we cut an unbounded stream into windows and aggregated within each one. That construction quietly assumed we knew which window an event belonged to. In a real distributed pipeline, events arrive late, out of order, and in bursts, because networks delay them, partitions hold them, and mobile devices go offline and flush later. So the window an event lands in by arrival is not the window it belongs to by occurrence. This section makes the two clocks explicit, shows with running code that a count by processing time disagrees with the correct count by event time, and explains why online AI features must be anchored to event time. The price of that correctness, waiting for or repairing late data, is the watermark machinery of Section 9.4.
A stream processor sits between a fleet of producers and a set of windowed aggregations. Each event it handles is stamped twice over its lifetime. When the event is born (a card swipe, a click, a sensor reading), the device that produced it records the wall-clock instant of occurrence. That is the event time, and it travels inside the event payload. Later, sometimes much later, after the event has crossed a network and perhaps sat in a broker queue or waited out a partition, it reaches the operator that will count it. The instant of that arrival, read from the operator's own clock, is the processing time. On a single machine fed a tidy file, the two clocks march in lockstep and the distinction looks pedantic. The moment the data source is distributed, they come apart, and which one you aggregate on decides whether your numbers are right.
Figure 9.3.1: r3, born at event time 9 (window $[0,10)$), arrives twelve seconds late at processing time 21, so a processing-time count files it under window $[20,30)$, the wrong window, and leaves it out of $[0,10)$, where it belongs.
1. Two Clocks, One Event Beginner
Fix the vocabulary precisely, because the rest of the chapter rests on it. The event time of a record is the timestamp of the real-world occurrence it represents, assigned at the source and carried in the payload; it never changes no matter how many machines the record passes through. The processing time of a record is the reading of the processing operator's local clock at the instant the operator handles the record; it depends entirely on when the record happens to arrive. There is a third clock worth naming, ingestion time, the instant the record entered the streaming system at its edge, which sits between the two and is sometimes used as a cheap approximation of event time. For correctness arguments we only need the first two.
The gap between them, $\text{skew} = t_{\text{proc}} - t_{\text{event}}$, is never negative in principle (you cannot observe an event before it happens, although a device clock that runs fast can make the measured value dip below zero) and is almost never constant. It is the sum of every delay the record met on its way: serialization at the device, the network hop, time queued in a broker, time blocked behind a partition or a retry, and time spent in a buffer on a phone that was in airplane mode. Each of those varies per record, so skew is a random, often heavy-tailed quantity. We can write the event-time window assignment of a record as a function of its own timestamp, $$\text{win}(r) = \left\lfloor \frac{t_{\text{event}}(r)}{W} \right\rfloor, \qquad \text{skew}(r) = t_{\text{proc}}(r) - t_{\text{event}}(r) \ge 0,$$ for a fixed window length $W$. A processing-time pipeline instead computes $\lfloor t_{\text{proc}}(r)/W \rfloor$, which equals $\text{win}(r)$ only when the skew is too small to carry the record past the end of its own event-time window. Whenever skew pushes a record across a boundary, the two assignments disagree, and the disagreement is exactly the bug.
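The window formula and its boundary condition can be checked directly. The sketch below assumes integer-second timestamps and uses two hypothetical helper names, `win` and `crosses_boundary`. It assigns r3 from the running example to its window under each clock, then confirms by brute force that the two assignments disagree exactly when $(t_{\text{event}} \bmod W) + \text{skew} \ge W$, that is, when the skew carries the record past the end of its own window.

```python
W = 10  # tumbling window length (seconds)

def win(t, w=W):
    """Index of the tumbling window that timestamp t falls in: floor(t / w)."""
    return t // w

def crosses_boundary(t_event, skew, w=W):
    """True when the skew pushes a record past the end of its event-time window."""
    return (t_event % w) + skew >= w

# r3 from the running example: born at event time 9, processed at time 21.
t_event, t_proc = 9, 21
skew = t_proc - t_event
print("skew:", skew)                        # 12
print("event-time window:", win(t_event))   # 0, i.e. [0,10)
print("proc-time window: ", win(t_proc))    # 2, i.e. [20,30)

# Brute-force check over a grid of non-negative integer timestamps and skews:
# the two window assignments disagree exactly when the record crosses a boundary.
for te in range(0, 50):
    for s in range(0, 30):
        misfiled = win(te + s) != win(te)
        assert misfiled == crosses_boundary(te, s)
print("boundary rule holds on the grid")
```

The rule also shows that the size of the skew is not what matters. What matters is how close the record sits to a boundary. A record born at event time 9 with only one second of skew is misfiled, while one born at event time 0 with nine seconds of skew is not.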
A small constant skew would be nearly harmless. Every record would shift by the same amount, so each processing-time window would cover a fixed event-time interval offset by that amount, and aggregates would stay consistent from window to window. The damage comes from skew that is variable and unbounded. A record delayed past a window boundary is not counted "a little late"; it is counted in the wrong window and missing from the right one, so two aggregates are wrong at once, one inflated and one deflated. No amount of downstream smoothing fixes a record that was added to the wrong bucket. The only repair is to route by event time, which forces the system to wait for, or later reconcile, records that have not arrived yet.
2. Why Distributed Streams Arrive Out of Order Beginner
Out-of-order and delayed arrival is not an exotic failure; it is the default behavior of any stream whose producers are spread across machines, and the reasons are exactly the distributed-systems properties built up in Chapter 2. There is no global clock, so two producers stamp their events against slightly different local clocks and send them over ingestion paths of different lengths. There is no global order, so records from a fast producer overtake records from a slow one even when the slow one's events happened first. And there are partial failures: a network partition, a broker retry, or a consumer rebalance holds a batch of records and then releases them in a burst, long after their event time. The same ordering and coordination concerns that made distributed agreement hard in Chapter 2 reappear here as timestamps that refuse to arrive in order.
Mobile and edge producers make the tail of the skew distribution enormous. A phone that loses connectivity in a tunnel buffers its events locally and flushes them minutes later when signal returns; an IoT sensor on an intermittent uplink can be hours behind. These are not corner cases to be waved away; they are the steady state of consumer and sensor telemetry. So a stream operator must assume that at any instant the records it has seen are an incomplete and unordered sample of the records whose event time has already passed. The window it would like to close may still be missing data that is, physically, already on its way.
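A small simulation makes this steady state concrete. The skew model is an assumption chosen only for illustration. Events are produced every half second, and most arrive 0.1 to 2 seconds later. Every 50th comes from a device that was offline and flushes it 5 to 60 minutes late. The sketch replays the records in arrival order and counts two things: how many arrive behind a record with a later event time, and how many land in a different 10-second window than the one they belong to.

```python
import random

W = 10  # tumbling window length (seconds)

def simulate(n=1000, seed=0):
    """Hypothetical skew model: mostly small delays plus a heavy tail of offline flushes."""
    rng = random.Random(seed)
    records = []
    for i in range(n):
        t_event = i * 0.5                     # one event every half second
        if i % 50 == 0:
            skew = rng.uniform(300, 3600)     # device was offline: 5 to 60 minutes late
        else:
            skew = rng.uniform(0.1, 2.0)      # ordinary network and broker delay
        records.append((t_event, t_event + skew))
    return records

records = simulate()
arrival_order = sorted(records, key=lambda r: r[1])   # what the operator actually sees

# An arrival is "out of order" if a record with a later event time was already seen.
max_seen = float("-inf")
out_of_order = 0
for t_event, _ in arrival_order:
    if t_event < max_seen:
        out_of_order += 1
    max_seen = max(max_seen, t_event)

# A record is misfiled if its processing time puts it in a different window.
misfiled = sum(1 for te, tp in records if te // W != tp // W)

print(f"{out_of_order} of {len(records)} records arrived out of event-time order")
print(f"{misfiled} of {len(records)} records land in the wrong processing-time window")
```

Under this model, every offline record is both out of order and misfiled, because its skew exceeds $W$. The ordinary one-to-two-second jitter adds further misfilings near each boundary.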
Operators of large mobile analytics pipelines have a running joke about the "event from yesterday": a record whose event time is a full day before its arrival because a phone sat in a drawer with a dead battery, then powered on, reconnected, and dutifully flushed its buffer. By processing time it is brand-new. By event time it belongs to a window you closed and reported on twenty-four hours ago. Every event-time system has to decide, in advance, what it will do when yesterday knocks.
3. The Same Stream, Counted Two Ways Intermediate
The cleanest way to feel the difference is to count the same five records under each clock and watch the totals diverge. The program below defines five readings, each with an event time and a processing time. Four of them keep a one-second skew; one, r3, is a mobile straggler that happened at event time 9 but did not arrive until processing time 21. We bucket the records into ten-second windows twice, once keyed on processing time and once on event time, and print both tallies. This is the concrete version of Figure 9.3.1.
```python
from collections import defaultdict

# Five sensor readings. Each is (name, event_time, processing_time).
#   event_time : the second the reading actually happened (device clock)
#   proc_time  : the second our server received and counted it (system clock)
# A 10-second tumbling window groups readings into [0,10), [10,20), [20,30).
events = [
    ("r1", 4, 5),    # in order: born t=4, seen t=5
    ("r2", 7, 8),    # in order: born t=7, seen t=8
    ("r3", 9, 21),   # LATE: born t=9, seen t=21 (mobile was offline)
    ("r4", 12, 13),  # in order: born t=12, seen t=13
    ("r5", 18, 19),  # in order: born t=18, seen t=19
]

WIN = 10

def window_of(t):
    """Return the label of the WIN-second tumbling window containing timestamp t."""
    base = (t // WIN) * WIN
    return f"[{base},{base+WIN})"

by_proc = defaultdict(int)
by_event = defaultdict(int)
for name, et, pt in events:
    by_proc[window_of(pt)] += 1    # count by when the SYSTEM saw it
    by_event[window_of(et)] += 1   # count by when it actually HAPPENED

print("event event_time proc_time  skew proc_window event_window")
for name, et, pt in events:
    print(f"{name:6} {et:9} {pt:9} {pt-et:5} {window_of(pt):11} {window_of(et)}")

print()
print("Window counts by PROCESSING time:", dict(sorted(by_proc.items())))
print("Window counts by EVENT time     :", dict(sorted(by_event.items())))
print()
print("Window [0,10)  true count :", by_event["[0,10)"],
      "| processing-time says:", by_proc["[0,10)"])
print("Window [20,30) true count :", by_event["[20,30)"],
      "| processing-time says:", by_proc["[20,30)"])
```
Code 9.3.1: The program relies only on the standard library's collections module, so the only thing that can make the totals differ is which timestamp keys the window.

```
event event_time proc_time  skew proc_window event_window
r1             4         5     1 [0,10)      [0,10)
r2             7         8     1 [0,10)      [0,10)
r3             9        21    12 [20,30)     [0,10)
r4            12        13     1 [10,20)     [10,20)
r5            18        19     1 [10,20)     [10,20)

Window counts by PROCESSING time: {'[0,10)': 2, '[10,20)': 2, '[20,30)': 1}
Window counts by EVENT time     : {'[0,10)': 3, '[10,20)': 2}

Window [0,10)  true count : 3 | processing-time says: 2
Window [20,30) true count : 0 | processing-time says: 1
```

Output 9.3.1: r3 double-charges the system, missing where it belongs and present where it does not.

Read the two dictionaries side by side. The event-time count is the ground truth: three things happened in the first ten seconds and two in the second, because that is when they happened. The processing-time count moves one of those three into a later window simply because the network was slow, leaving the first window short and inventing activity in a window where, by event time, nothing occurred. Notice that no record was lost or duplicated in transit; the corruption is purely a consequence of keying the window on the wrong clock. Only one of the five records had large skew, and it was enough to make two of three windows wrong.
4. Why Online AI Features Must Use Event Time Intermediate
The five-record toy becomes a production incident the instant the windowed count is a model feature. Consider a fraud detector whose input includes "number of transactions on this card in the last sixty seconds," a textbook velocity feature. Computed on event time, that count answers the question the model was trained to use: how fast is this card really being spent. Computed on processing time, the count answers a different and unstable question: how fast did transactions for this card happen to reach our server. The two diverge precisely during the moments that matter most.
Picture an ingestion backlog: a broker hiccup queues thirty seconds of traffic and then releases it in a two-second burst. By processing time, every card looks like it is being spent at a furious rate, because a half-minute of genuinely spread-out events all land in the same processing-time window. The velocity feature spikes across the board, the model fires false fraud alerts on legitimate cards, and an on-call engineer is paged for an outage that exists only in the clock. The same backlog computed on event time produces the correct, calm velocities, because each transaction is filed under the second it actually occurred regardless of when the burst delivered it. This is the mechanism behind the "fraud feature that double-counts during a backlog": processing-time windowing turns a transport delay into a fake behavioral signal. The general rule follows directly: any feature that is a windowed aggregate (count, sum, rate, distinct-cardinality, time-since-last) must be computed on event time, or it measures your infrastructure's mood instead of the user's behavior.
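The mechanism fits in a few lines. Below is a minimal sketch of a per-card trailing-window count. The class name `VelocityFeature`, the 10-second horizon, and the six-transaction backlog are hypothetical choices for illustration. The same helper is fed once with each transaction's event time and once with the processing time at which a backlog burst delivered it.

```python
import bisect
from collections import defaultdict

class VelocityFeature:
    """Hypothetical helper: per-card count of transactions in the trailing
    `horizon` seconds, measured on whichever clock the caller passes in.
    Keeps every timestamp (no eviction), which is acceptable for a sketch."""

    def __init__(self, horizon):
        self.horizon = horizon
        self.times = defaultdict(list)        # card -> sorted timestamps seen so far

    def update(self, card, t):
        ts = self.times[card]
        bisect.insort(ts, t)                  # late arrivals are inserted in place
        lo = bisect.bisect_right(ts, t - self.horizon)
        hi = bisect.bisect_right(ts, t)
        return hi - lo                        # number of timestamps in (t - horizon, t]

# Six transactions on one card, 10 s apart in event time, held by a broker
# hiccup and released together at processing time 55.
txns = [(event_t, 55) for event_t in range(0, 60, 10)]

by_event = VelocityFeature(horizon=10)
by_proc = VelocityFeature(horizon=10)
event_feats = [by_event.update("card-1", et) for et, _ in txns]
proc_feats = [by_proc.update("card-1", pt) for _, pt in txns]

print("event-time velocity:     ", event_feats)   # [1, 1, 1, 1, 1, 1]
print("processing-time velocity:", proc_feats)    # [1, 2, 3, 4, 5, 6]
```

On event time, every transaction sees a calm velocity of 1. On processing time, the same six transactions ramp to 6, purely because they were delivered together. That ramp is the fake behavioral signal described above.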
Section 1.1 showed that data-parallel training is exact because summation does not care how the terms are grouped across workers. Event-time windowing is the streaming analogue: the aggregate over a window is well defined only when records are grouped by the clock that is invariant to the distributed path they took, their event time, not the path-dependent processing time. In both cases scaling out across machines preserves the right answer only if the partitioning key is chosen so that distribution cannot change it. Pick the path-dependent key and the same machinery that made training exact instead makes streaming wrong.
Who: A streaming data engineer on the real-time risk team at a payments company.
Situation: A fraud model consumed a "transactions per card in the last 60 seconds" feature computed in a stream job over tumbling windows.
Problem: Every few weeks the model produced a flood of false-positive fraud blocks for a minute or two, always coinciding with a brief broker or consumer lag, never with real fraud.
Dilemma: Loosen the model threshold and miss real fraud during calm periods, or keep it strict and keep paging on-call for these phantom spikes; neither addressed the cause.
Decision: They traced it to processing-time windowing: a lag flushed a backlog as a burst, so spread-out transactions collapsed into one processing-time window and every card's velocity spiked together.
How: They re-keyed the windows on the transaction's own event-time timestamp from the payload instead of the consumer's arrival time, exactly the change between the two dictionaries in Output 9.3.1, and configured the late-data handling of Section 9.4.
Result: The phantom spikes vanished because a delayed transaction was now filed under the second it occurred, so a backlog no longer manufactured velocity; the genuine fraud-detection rate was unchanged.
Lesson: A windowed feature on processing time measures your pipeline's lag, not your users' behavior. Anchor every streaming aggregate to event time, then deal with lateness deliberately.
5. The Price of Correctness: Waiting for Late Data Intermediate
Event time is correct, but it is not free, and naming its cost sets up the next section. A processing-time window is trivial to close: when the operator's own clock passes the window's end, no more records can possibly fall into that window, so the operator emits the result and moves on. An event-time window enjoys no such luxury. At the instant event time 10 passes, the operator cannot know whether every record with event time below 10 has arrived, because r3, born at event time 9, might still be in flight and will not show up until processing time 21. To emit the exact count for window $[0,10)$, the operator would have to wait until it is certain no earlier record remains, and in a system with unbounded skew that certainty never comes.
So event-time processing forces a decision the processing-time path never faces: how long to wait before declaring a window complete, and what to do with records that arrive after that. Wait forever and you get exact answers but never emit them; wait zero and you are back to processing time. The streaming system's answer is the watermark, an explicit, advancing assertion that "event time has progressed to $T$; expect no more records older than $T$." Watermarks let an operator trade a bounded amount of waiting for a bounded amount of lateness, and they decide the fate of stragglers like r3 that miss the cutoff. That mechanism is the entire subject of Section 9.4: how watermarks are generated; how late records are dropped, routed to a side output, or used to retract and recompute earlier results; and how it all stays consistent across a distributed operator graph. The handling of late and missing data also echoes the fault-tolerance and recovery patterns of the batch world in Chapter 6, where re-execution rather than waiting was the repair.
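To see the trade a watermark makes, here is a minimal sketch of one common heuristic: a bounded-out-of-orderness watermark, equal to the largest event time seen so far minus an allowed delay. The helper `run_with_watermark` is hypothetical and is not any engine's API. It replays the five records of Code 9.3.1 in arrival order and emits an event-time window once the watermark passes the window's end. It drops records whose window has already closed, and at the end of input it flushes whatever is still open.

```python
from collections import defaultdict

W = 10  # tumbling window length (seconds)
# (name, event_time, proc_time) from the running example; r3 arrives 12 s late.
events = [("r1", 4, 5), ("r2", 7, 8), ("r3", 9, 21), ("r4", 12, 13), ("r5", 18, 19)]

def run_with_watermark(events, delay, w=W):
    """Hypothetical sketch: watermark = max event time seen - delay.
    Returns ({window_start: (count, proc_time_when_emitted)}, dropped_names)."""
    counts = defaultdict(int)
    emitted, dropped = {}, []
    watermark = float("-inf")
    for name, et, pt in sorted(events, key=lambda e: e[2]):  # replay in arrival order
        start = (et // w) * w
        if start + w <= watermark:        # its window already closed: too late, drop it
            dropped.append(name)
            continue
        counts[start] += 1
        watermark = max(watermark, et - delay)
        for s in sorted(counts):          # close windows whose end the watermark passed
            if s not in emitted and s + w <= watermark:
                emitted[s] = (counts[s], pt)
    for s in sorted(counts):              # end of input: treat the watermark as +infinity
        emitted.setdefault(s, (counts[s], "end"))
    return emitted, dropped

for delay in (0, 12):
    emitted, dropped = run_with_watermark(events, delay)
    print(f"delay={delay:2}: emitted={emitted} dropped={dropped}")
```

With a delay of 0, window $[0,10)$ is emitted at processing time 13 with a count of 2, and r3 is dropped when it shows up at 21. With a delay of 12, the window stays open long enough to count r3 and reports 3. On an input this short, that means holding the window until the end-of-input flush. The extra waiting is the price of the correct count.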
Code 9.3.1 keyed windows on a timestamp by hand to expose the mechanism. Production stream processors make event-time windowing a first-class mode (in some engines, the default) and ask only that you point at the event-time column and declare how much lateness to tolerate. In Spark Structured Streaming, which extends Chapter 7's DataFrame model to unbounded data, the entire choice of clock plus the late-data bound takes two method calls:
```python
# df has a column `event_time` (a timestamp parsed from the payload)
from pyspark.sql.functions import window, count

result = (df
    .withWatermark("event_time", "30 seconds")    # tolerate up to 30 s of lateness
    .groupBy(window("event_time", "10 seconds"))  # EVENT-time tumbling windows
    .agg(count("*").alias("n")))
```
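The clock is chosen by the column passed to groupBy(window("event_time", ...)). withWatermark hands the late-data policy of Section 9.4 to the engine. The engine sets the watermark to the maximum event time it has seen minus the 30-second delay and guarantees to keep any record that arrives within that bound. Only records later than the bound become eligible to be dropped.
6. A Working Rule for Choosing the Clock Beginner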
The decision is not always "event time, always." Processing time is the right and cheaper choice when the quantity you want is genuinely about the system rather than the world: dashboard freshness, current ingest throughput, queue depth, or an alert that should fire when data stops arriving. For those, the operator's own clock is the source of truth and waiting for late data would defeat the purpose. The dividing line is a single question: does the correct answer depend on when things happened, or on when you saw them? Velocity features, billing aggregates, sessionization, A/B exposure counts, and almost every model feature depend on when things happened, so they take event time. Liveness and throughput monitoring depend on when you saw them, so they take processing time. Stating that question explicitly for every windowed quantity in a pipeline, rather than inheriting whatever clock the framework defaulted to, is the discipline this section asks for, and the federated and on-device settings of Part III only sharpen it, because there the skew between a device's event and the server's observation can stretch to hours.
The event-time versus processing-time tension is an active research and systems frontier as streaming feature stores become standard infrastructure for online machine learning. Point-in-time correctness, the guarantee that a training example sees exactly the feature values that were knowable at its event time and no later ones, is the offline-online consistency property that systems like Feast, Tecton, and Chronon (Airbnb's open-sourced feature platform, 2024) build their APIs around, and it is fundamentally an event-time argument: a leaked future value is a processing-time contamination of the label window. In parallel, the dataflow and watermarking model pioneered by Akidau et al. continues to be refined, with work on watermark accuracy under heavy-tailed skew, on retraction-and-recompute semantics so that late records correct already-emitted aggregates rather than being dropped, and on bounding the training-serving skew that arises when an online feature is computed on event time but a backfill recomputes it on a different boundary. We make the watermark machinery these systems rely on concrete in Section 9.4, and connect event-time features to drift monitoring later in this chapter.
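Point-in-time correctness reduces to an event-time lookup: for each training example, take the most recent feature value whose timestamp is at or before the example's event time. The sketch below implements that backward "as-of" lookup with the standard-library bisect module. The function name `point_in_time_lookup` and the sample values are hypothetical. Real feature stores layer entity keys, TTLs, and storage concerns on top. For DataFrames, pandas' `merge_asof` with `direction="backward"` performs the same kind of lookup.

```python
import bisect

def point_in_time_lookup(feature_log, label_time):
    """Return the latest feature value with timestamp <= label_time, else None.
    feature_log: list of (timestamp, value) pairs sorted by timestamp."""
    times = [t for t, _ in feature_log]
    i = bisect.bisect_right(times, label_time)   # first index strictly after label_time
    return feature_log[i - 1][1] if i > 0 else None

# Hypothetical velocity values for one card, keyed by the event time they describe.
feature_log = [(0, 1), (60, 3), (120, 7), (180, 2)]

# Training examples (labels) stamped with their own event times.
for label_time in (30, 120, 150, -5):
    print(label_time, "->", point_in_time_lookup(feature_log, label_time))
```

This lookup is what prevents a leaked future value. The example at event time 150 sees 7, the value knowable at that moment, never the 2 recorded at 180. An example older than every feature value gets `None` rather than a value from its future.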
For each windowed quantity, state whether it must be computed on event time or processing time, and give the one-sentence consequence of choosing the other clock: (a) "ad impressions attributed to each campaign per hour" used for billing; (b) "events ingested per second" shown on an operations dashboard; (c) "distinct pages a user viewed in their current session"; (d) "seconds since the last heartbeat from this sensor," used to alert when a device goes silent. Use the dividing question from Section 6 to justify each answer.
Extend Code 9.3.1 into a velocity feature. Generate 200 transactions for a single card whose event times are spread evenly across $[0, 120)$ seconds, then simulate a broker backlog: give every transaction with event time in $[40, 70)$ a processing time of exactly 70 (released in one burst), and every other transaction a skew of one second. Compute "transactions in the trailing 10-second window" keyed first on processing time, then on event time, and print the maximum window count under each clock. Show that the processing-time maximum spikes far above the true event-time maximum, then explain in two sentences why that spike would trigger a fraud model trained on event-time velocities.
Suppose a stream uses tumbling windows of length $W$ and every record's skew is bounded by some maximum $S$. Argue that a record can be misfiled into the wrong window by a processing-time pipeline only if it lies within $S$ of a window boundary, and use this to estimate the fraction of records that a processing-time count gets wrong as a function of $S$ and $W$ for uniformly distributed event times. What does your formula predict as $S \to 0$, and as $S \to W$, and how does it explain why the single late record in Output 9.3.1 (with $S \approx 12$ and $W = 10$) was enough to corrupt two of three windows?