Part II: Distributed Data Processing for AI
Chapter 9: Stream Processing and Online AI

Online Feature Computation

"In training I knew everything about the user. In production I had a hundred milliseconds and a key. Nobody warned me they were supposed to agree."

A Feature That Drifted Between Offline and Online
Big Picture

A model is only as good as the numbers fed to it at the instant it decides, and those numbers are themselves computed by a distributed system that must answer in milliseconds. An online feature is a quantity derived from recent events (a user's spending in the last five minutes, the click rate on an item in the last hour) that a model reads at inference time. Computing it well means doing two hard things at once: maintaining windowed aggregates over an unbounded event stream cheaply enough to keep up, and guaranteeing that the value served in production equals the value the model was trained on. When those two diverge you get training-serving skew, one of the most common and least visible causes of production model failure. This section builds an online feature from scratch, serves it, and proves the offline and online paths agree.

The previous section put a streaming engine to work on the event stream: windows, watermarks, and continuous aggregation in Section 9.6 turned a never-ending log into a table of running results. This section asks what those results are for. The answer, for most production AI, is features: the inputs a model consumes to make a decision. When a fraud model scores a card swipe, it does not see the raw event alone; it sees engineered signals such as "how much has this card spent in the last five minutes," and that signal must be fresh, correct, and identical to the one the model learned from. Producing such signals continuously, over a distributed stream, and serving them under a latency budget is the discipline of online feature computation.

1. From Events to Features in Real Time Beginner

A feature is a function of data that a model takes as input. In batch machine learning that function runs over a static table; the distributed-preprocessing machinery of Section 8.7 computes it once per training run and stores the result. An online feature is the same idea with time added: the function runs over an event stream and its value changes as new events arrive. The most common form is a windowed aggregate. Given a stream of events $e_1, e_2, \dots$ each carrying a timestamp $t_i$, a sliding-window feature for a key (say a user) at query time $\tau$ is

$$f(\text{user}, \tau) = \mathrm{agg}\big(\{\, v_i : \text{user}(e_i) = \text{user},\ \tau - W < t_i \le \tau \,\}\big),$$

where $W$ is the window length and $\mathrm{agg}$ is a sum, count, mean, max, or similar. The defining property is recency: as $\tau$ advances, old events fall out of the window and new ones enter, so the feature is never computed once and frozen; it is maintained. The engineering goal is to maintain it incrementally, in $O(1)$ amortized work per event, rather than rescanning the window on every query, because at production volumes a rescan per event is hopeless.
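The $O(1)$ amortized claim is easy to see for a sum or a count: a running total is updated once when an event enters and once when it leaves. A max is harder, because a departing value cannot be "subtracted." The sketch below is one standard way to cover every aggregate listed above, assuming that events for a single key arrive in timestamp order; the class and method names are illustrative, not taken from any library. It keeps a second, monotonic deque for the max, and because each event is appended once and removed at most once from each deque, every update stays $O(1)$ amortized.

```python
from collections import deque


class SlidingWindowAgg:
    """Sum, count, mean, and max over the window (ts - W, ts] for ONE key.

    Assumes events arrive in non-decreasing timestamp order.
    """

    def __init__(self, window):
        self.window = window
        self.events = deque()  # (ts, value) still inside the window, oldest first
        self.maxq = deque()    # max candidates: values strictly decreasing front to back
        self.total = 0.0       # running sum of the values in self.events

    def add(self, ts, value):
        self.events.append((ts, value))
        self.total += value
        # An older entry that is not larger than the new value can never be the
        # max again (it leaves the window first), so drop it from the back.
        while self.maxq and self.maxq[-1][1] <= value:
            self.maxq.pop()
        self.maxq.append((ts, value))
        self._evict(ts)

    def _evict(self, now):
        cutoff = now - self.window  # ts <= cutoff means the event has left the window
        while self.events and self.events[0][0] <= cutoff:
            _, old = self.events.popleft()
            self.total -= old
        while self.maxq and self.maxq[0][0] <= cutoff:
            self.maxq.popleft()

    def window_sum(self):
        return self.total

    def window_count(self):
        return len(self.events)

    def window_mean(self):
        return self.total / len(self.events) if self.events else 0.0

    def window_max(self):
        return self.maxq[0][1] if self.maxq else None
```

Fed Alice's seven swipes from Code 9.7.1 below with $W = 300$, this ends with sum 113.0, count 4, mean 28.25, and max 50.0.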

Two distinct moments matter, and conflating them is where skew is born. There is the write moment, when an event arrives and the streaming engine updates the aggregate, and the read moment, when a model asks for the current feature value to make a prediction. The write side runs continuously across the cluster; the read side must be a fast key lookup, because the model is on a latency budget. Separating these two responsibilities is exactly what a feature store does, and it is the subject of the next subsection.

2. The Feature Store and Its Two Halves Beginner

A feature store is the system that computes feature values, stores them, and serves them to both training and inference. Its defining design choice is a split into two physical stores fed, ideally, by one logical definition. The offline store holds the full history of feature values, partitioned and columnar, optimized for the large scans that training needs; it lives on the distributed storage and batch machinery of Chapter 8. The online store holds only the latest value per key in a low-latency key-value system (Redis, DynamoDB, Cassandra) so that an inference request can fetch every feature it needs with a handful of point lookups inside its millisecond budget. Figure 9.7.1 shows how the same event stream feeds both halves.

[Figure 9.7.1: An event stream (swipes, clicks; events e1, e2, e3, ...) feeds a streaming feature job (windowed aggregate, one feature definition f(·)). The job writes the latest value per key to an online store (key-value, low latency) read by model inference, and the full history to an offline store (columnar) read by training.]
Figure 9.7.1: The online/offline split of a feature store. One streaming job maintains a windowed aggregate from the event stream and writes the latest value per key to a low-latency online store (read by the model at inference time) while appending the full history to an offline store (scanned for training). The dashed arrow marks the single feature definition $f(\cdot)$ that must drive both halves; when it does not, the offline and online values diverge and the model trains on one distribution and serves on another.

The split is what makes both workloads fast: training gets cheap sequential scans over history, serving gets cheap random lookups of the present. The danger is also born here. Two physical pipelines computing "the same" feature, one in a batch engine over the offline store and one in a streaming engine over the live stream, will drift apart unless they are forced to share a definition. That forced sharing is the central requirement of the whole section.

3. Train/Serve Consistency, or You Are Training a Different Model Intermediate

Training-serving skew is the situation where the feature value a model sees in production differs from the value it would have seen for the same input during training. It is insidious because nothing crashes: the model returns predictions, the dashboards stay green, and accuracy quietly collapses because the model is being asked about points it never learned. The skew usually comes from one of a few sources: the offline feature is computed by different code than the online feature; the online feature includes events the offline one excluded (or vice versa) because of a window-boundary or timezone mismatch; or the offline feature accidentally peeks at the future. The defense is a single rule, stated as a key insight.

Key Insight: One Definition, Two Stores, Identical Values

Train/serve consistency holds when the same feature definition, evaluated for the same key as of the same instant, produces the same value whether it is computed online over the live stream or offline over the historical log. Achieve it by writing the feature logic once and running it in both paths (or by deriving both paths from one declarative specification), never by maintaining two hand-written implementations and hoping they match. If the offline and online numbers for a known key as of a known time are not equal (bit-for-bit for counts and other exact aggregates, within floating-point rounding for sums accumulated in a different order), you have skew, and the model is being trained and served on two different distributions.
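What "write the feature logic once" can look like in miniature: a sketch, assuming hypothetical helper names (in_window, aggregate, OnlineFeature, offline_value) and that each user's events arrive in timestamp order. Both paths call the same window predicate and the same aggregation, so a change to either (say, making the window boundary inclusive) reaches training and serving together. The price of this simple version is that the online path re-aggregates its buffer on every event, which costs time proportional to the window contents rather than the $O(1)$ of an incremental running sum.

```python
from collections import defaultdict, deque

WINDOW = 300  # seconds


# The ONE feature definition: which events count, and how they are combined.
def in_window(event_ts, as_of_ts, window=WINDOW):
    return as_of_ts - window < event_ts <= as_of_ts


def aggregate(amounts):
    return sum(amounts)


class OnlineFeature:
    """Streaming path: per-user buffer, evaluated with the shared definition."""

    def __init__(self):
        self.buffers = defaultdict(deque)  # user -> deque of (ts, amount)
        self.store = {}                    # user -> latest value (stand-in for a KV store)

    def on_event(self, user, ts, amount):
        buf = self.buffers[user]
        buf.append((ts, amount))
        # The shared predicate, not a hand-copied condition, decides eviction.
        while buf and not in_window(buf[0][0], ts):
            buf.popleft()
        self.store[user] = aggregate(a for _, a in buf)


def offline_value(log, user, as_of_ts):
    """Batch path: the same predicate and aggregation over the historical log."""
    return aggregate(a for u, t, a in log if u == user and in_window(t, as_of_ts))
```

Replaying the swipes of Code 9.7.1 through on_event leaves 113.0 for u_alice and 10.0 for u_bob, and offline_value returns the same numbers as of each user's last event.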

The second subtle requirement is temporal honesty in the offline path. When you build a training set, each training row pairs a label observed at time $\tau$ with the feature values as they were at $\tau$, not as they are now. Joining a label to a feature value computed from events that occurred after $\tau$ leaks the future into the past, and the model learns a relationship that cannot exist at serving time. This is the streaming face of the leakage and correctness discipline introduced for batch pipelines in Section 8.8; here it goes by the name point-in-time correctness.

Fun Note: The Model That Predicted Cancellations Perfectly

A team once shipped a churn model that scored a flawless AUC offline and was useless in production. The culprit feature was "number of support tickets in the last 30 days," joined to each account as the value it held today, long after the churn label was set. Customers about to cancel file a lot of tickets, so the feature was essentially reading the answer key. The model had not learned churn; it had learned to look at the future. A point-in-time correct join would have refused it the peek.

4. Point-in-Time Correct Joins Intermediate

The mechanism that enforces temporal honesty is the point-in-time correct join (also called an as-of join). Given a set of label events, each a pair $(\text{key}, \tau)$, and an offline store of timestamped feature values, the join attaches to each label the feature value whose timestamp is the latest one not exceeding $\tau$:

$$\widehat{f}(\text{key}, \tau) = \mathrm{agg}\big(\{\, v_i : \text{key}(e_i) = \text{key},\ \tau - W < t_i \le \tau \,\}\big),$$

which is the very same window definition from Section 1 evaluated at the label time $\tau$ rather than at "now." The inequality $t_i \le \tau$ is the whole game: it admits only events the system could actually have observed by $\tau$, and it excludes anything later. A naive join on key alone, ignoring time, would attach the current feature value to every historical label and reintroduce exactly the leakage the as-of join exists to prevent. Production feature stores implement this join as a first-class operation over the offline store so that a correct training set is the default rather than a thing you must remember to do by hand.
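A minimal sketch of the as-of join, assuming the offline store is simply a list of (key, ts, value) snapshots; build_index, as_of_join, and naive_join are hypothetical helpers written for this illustration. Each key's snapshots are sorted by timestamp, and a binary search (bisect_right) finds the last snapshot with ts ≤ τ, which is exactly the $t_i \le \tau$ bound above. The naive version attaches each key's latest value regardless of τ. For DataFrames, pandas offers the same operation as merge_asof with direction="backward".

```python
import bisect
from collections import defaultdict


def build_index(feature_rows):
    """feature_rows: iterable of (key, ts, value) snapshots from the offline store."""
    index = defaultdict(lambda: ([], []))  # key -> (sorted timestamps, values)
    for key, ts, value in sorted(feature_rows, key=lambda r: (r[0], r[1])):
        tss, vals = index[key]
        tss.append(ts)
        vals.append(value)
    return index


def as_of_join(labels, index, default=None):
    """labels: list of (key, tau, label). Attach the latest value with ts <= tau."""
    out = []
    for key, tau, label in labels:
        tss, vals = index.get(key, ([], []))
        i = bisect.bisect_right(tss, tau)  # number of snapshots with ts <= tau
        out.append((key, tau, vals[i - 1] if i else default, label))
    return out


def naive_join(labels, index, default=None):
    """The leaky version: ignores tau and attaches the key's current value."""
    return [(k, tau, index[k][1][-1] if k in index else default, lbl)
            for k, tau, lbl in labels]
```

Using Alice's streaming values from Code 9.7.1 as snapshots (89.0 after her swipe at 1250, 70.0 at 1400, 113.0 at 1520), a label at τ = 1300 receives 89.0 from the as-of join but 113.0 from the naive join, a value built partly from swipes that happened after 1300.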

5. Building It From Scratch Intermediate

The code below makes all of this concrete with no framework at all. It maintains a five-minute sliding-window sum of spend per user incrementally as a stream of card swipes arrives (the streaming path), writes the latest value per user into a dictionary that stands in for the online key-value store, and serves a toy flag-if-over-100 model by a single key lookup. It then recomputes the same feature the offline way, directly from the full event log, evaluated point-in-time as of each user's last event, and checks that the two agree. The match is the property that the rest of this section has been arguing for.

from collections import deque, defaultdict

# --- 1. A stream of card-swipe events: (user, unix_ts_seconds, amount) ---
events = [
    ("u_alice", 1000, 12.0), ("u_bob", 1001, 5.0),  ("u_alice", 1100, 40.0),
    ("u_alice", 1180, 7.0),  ("u_bob", 1200, 9.0),  ("u_alice", 1250, 30.0),
    ("u_bob", 1260, 4.0),    ("u_alice", 1330, 22.0), ("u_alice", 1400, 11.0),
    ("u_bob", 1500, 6.0),    ("u_alice", 1520, 50.0),
]
WINDOW = 300  # a 5-minute (300-second) sliding window

# --- 2. STREAMING path: maintain a per-user sliding window incrementally ---
# This is what runs in production: one pass, O(1) amortized per event, and after
# each event the freshest feature value is written to the online store.
online_store = {}                       # user -> latest feature value (key-value)
windows = defaultdict(deque)            # user -> deque of (ts, amount) still in window
running_sum = defaultdict(float)        # user -> sum of amounts in the window

def streaming_feature(user, ts, amount):
    dq = windows[user]
    dq.append((ts, amount))
    running_sum[user] += amount
    while dq and dq[0][0] <= ts - WINDOW:        # evict events that fell out
        _, old_amt = dq.popleft()
        running_sum[user] -= old_amt
    return running_sum[user]

for user, ts, amount in events:
    feat = streaming_feature(user, ts, amount)
    online_store[user] = feat            # serve-time read will hit this dict

# --- 3. SERVE: an inference call reads the feature by key, no recomputation ---
def serve(user):
    x = online_store.get(user, 0.0)      # single low-latency key lookup
    return 1 if x > 100.0 else 0         # toy "flag if >$100 spent in 5 min" model

print("online store (amount spent, last 5 min, as of latest event):")
for u in sorted(online_store):
    print(f"  {u:8s} feature={online_store[u]:6.1f}  ->  model says {serve(u)}")

# --- 4. OFFLINE path: same feature DEFINITION, recomputed from the full log ---
# This is the training-time computation. If it disagrees with the streaming value
# for the same (user, as-of time), the model trains and serves on skewed inputs.
def offline_feature(user, as_of_ts):
    return sum(a for (u, t, a) in events
               if u == user and as_of_ts - WINDOW < t <= as_of_ts)

# point-in-time correct check: recompute each user's feature as of THEIR last event
last_ts = {u: max(t for (uu, t, a) in events if uu == u) for u in online_store}
print("\ntrain / serve consistency (online vs offline recomputation):")
ok = True
for u in sorted(online_store):
    online_v = online_store[u]
    offline_v = offline_feature(u, last_ts[u])
    match = abs(online_v - offline_v) < 1e-9
    ok = ok and match
    print(f"  {u:8s} online={online_v:6.1f}  offline={offline_v:6.1f}  match={match}")
print("\nno training-serving skew:", ok)
Code 9.7.1: An online feature, end to end and framework-free. The streaming path maintains the window incrementally and writes to online_store; serve reads one key; offline_feature recomputes the identical definition from the full log with the point-in-time bound as_of_ts - WINDOW < t <= as_of_ts; the final loop confirms the two paths agree.
online store (amount spent, last 5 min, as of latest event):
  u_alice  feature= 113.0  ->  model says 1
  u_bob    feature=  10.0  ->  model says 0

train / serve consistency (online vs offline recomputation):
  u_alice  online= 113.0  offline= 113.0  match=True
  u_bob    online=  10.0  offline=  10.0  match=True

no training-serving skew: True
Output 9.7.1: The streaming and offline computations return the same feature for every user, so the consistency flag is True. Alice's five-minute spend of 113.0 (the 30, 22, 11, and 50 swipes still inside the 300-second window as of her last event at 1520; the earlier 12, 40, and 7 have aged out) crosses the model's threshold; Bob's 10.0 does not.

The win is that the offline number was produced by the same window definition as the online number, evaluated at the same instant per user, so the model trains and serves on one distribution. The deque in the streaming path is what keeps the maintenance cheap: each event is pushed once and popped at most once, giving the $O(1)$ amortized cost that Section 1 demanded. The offline recomputation, by contrast, rescans the whole log for each (user, as-of time) pair, which is acceptable at training time because the full history is available and no request is waiting on the answer. Two implementations, one definition, identical values.
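One caveat is worth making concrete. The online store in Code 9.7.1 holds each user's value as of that user's last event, so a request for Alice at time 1900 would still read 113.0, although none of those swipes falls in the five minutes before 1900; offline_feature evaluated at τ = 1900 gives zero. A stale read is therefore skew of its own. The sketch below evicts on the read path as well, assuming that each user's event and request times never move backwards and that the online store may keep the per-user buffer; WindowedSpend is a hypothetical name, not part of Code 9.7.1.

```python
from collections import defaultdict, deque

WINDOW = 300  # five-minute window, in seconds


class WindowedSpend:
    """Per-user sliding-window sum that evicts on writes AND on reads.

    Assumes each user's event times and request times never move backwards.
    """

    def __init__(self, window=WINDOW):
        self.window = window
        self.buf = defaultdict(deque)    # user -> deque of (ts, amount) in the window
        self.total = defaultdict(float)  # user -> running sum of that deque

    def _evict(self, user, now):
        dq = self.buf[user]
        while dq and dq[0][0] <= now - self.window:
            _, amount = dq.popleft()
            self.total[user] -= amount
        if not dq:
            self.total[user] = 0.0       # reset so float error cannot accumulate

    def write(self, user, ts, amount):
        self.buf[user].append((ts, amount))
        self.total[user] += amount
        self._evict(user, ts)

    def read(self, user, request_ts):
        # Evict against the request time, not the user's last event time, so the
        # value matches the offline definition evaluated at tau = request_ts.
        self._evict(user, request_ts)
        return self.total[user]
```

After replaying Alice's swipes, a read at 1650 returns 61.0 (the 11 and 50 swipes) and a read at 1900 returns 0.0, the same values offline_feature gives at those times. Keeping a raw buffer per key is only the simplest choice for a sketch; it trades memory in the online store for read-time correctness.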

Library Shortcut: Feast Defines the Feature Once for Both Stores

Code 9.7.1 hand-built the window, the online store, and the offline recomputation, and it is the matching-by-hand of those two paths that production teams get wrong. An open-source feature store such as Feast inverts the burden: you declare the feature once, and the framework materializes it into the online store and serves the point-in-time correct version for training, so the offline and online values cannot diverge by construction.

# pip install feast
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# SERVING: one call fetches the latest values from the online key-value store
features = store.get_online_features(
    features=["user_spend:amount_5min"],
    entity_rows=[{"user": "u_alice"}],
).to_dict()                              # -> {'amount_5min': [113.0], ...}

# TRAINING: the SAME definition, joined point-in-time to labels (no future leak)
training_df = store.get_historical_features(
    entity_df=labels_df,                 # labels DataFrame, assumed built elsewhere: user, event_timestamp, label
    features=["user_spend:amount_5min"],
).to_df()
Code 9.7.2: The same online feature in Feast. get_online_features is the millisecond key lookup; get_historical_features runs the point-in-time correct join for training. One declared definition drives both, collapsing the two hand-written paths of Code 9.7.1 into a single source of truth and eliminating the class of skew bugs by construction.
Practical Example: The Fraud Feature That Was Right Offline and Wrong Online

Who: A machine learning engineer on the payments-risk team at a digital bank.

Situation: A fraud model used "transaction amount in the last five minutes" as a feature, computed in a nightly Spark job for training and in a separate streaming job for serving.

Problem: Offline evaluation showed strong precision, but live blocked-fraud rates were far below the offline estimate, and analysts could not reproduce the model's online scores.

Dilemma: Trust the offline metrics and push for a more aggressive threshold, or suspect the features themselves and audit two pipelines that both looked correct in isolation.

Decision: They audited the features, replaying a day of events and comparing the streaming feature value against an offline recomputation for the same card as of the same second, the exact check at the bottom of Code 9.7.1.

How: The comparison exposed a window-boundary mismatch: the Spark job used a calendar-aligned five-minute bucket while the streaming job used a true sliding window, so the two disagreed near bucket edges. They replaced both with one declared sliding-window definition served from a single feature store.

Result: Online and offline values matched to the cent, the live fraud-catch rate rose to meet the offline estimate, and scores became reproducible across the two environments.

Lesson: Two correct-looking feature pipelines are still a skew bug waiting to happen. Consistency is a property you enforce with one shared definition and a value-level audit, not one you assume.
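The window-boundary mismatch from this example fits in a few lines. The case study does not say exactly how the Spark job attached bucket values, so this sketch assumes the simplest reading: the calendar bucket counts events from the start of the five-minute bucket containing the query time up to that time. The card events are hypothetical, chosen to straddle the bucket boundary at t = 1200.

```python
WINDOW = 300  # five minutes, in seconds


def sliding_sum(events, as_of):
    """True sliding window: events with as_of - WINDOW < t <= as_of."""
    return sum(amount for t, amount in events if as_of - WINDOW < t <= as_of)


def bucket_sum(events, as_of):
    """Calendar-aligned bucket: from the start of as_of's 5-minute bucket to as_of."""
    bucket_start = (as_of // WINDOW) * WINDOW  # boundaries at multiples of 300 s
    return sum(amount for t, amount in events if bucket_start <= t <= as_of)


# Hypothetical swipes on one card, on either side of the boundary at t = 1200.
card_events = [(1190, 40.0), (1210, 25.0)]
for as_of in (1195, 1215):
    print(as_of, sliding_sum(card_events, as_of), bucket_sum(card_events, as_of))
```

At 1195 both definitions return 40.0, but just past the boundary, at 1215, the sliding window sees 65.0 while the bucket has reset and sees only 25.0: both pipelines are "correct," and they still disagree near every bucket edge.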

6. From One Feature to a Real-Time Personalization Engine Advanced

A single windowed sum is the atom; a recommendation system is the molecule. Real-time personalization reads dozens of online features per request (recent clicks, dwell time, category affinities computed over sliding windows) and combines them with embeddings to rank items in milliseconds. The same online/offline split governs it: the ranking model trains on point-in-time correct histories from the offline store and serves on the latest values from the online store, and any skew between them degrades recommendations exactly as it degrades fraud scores. The embeddings such systems consume are themselves a distributed feature, sharded across machines as built in Chapter 11; we develop the full real-time personalization stack, online features and sharded embeddings together, as a case study in Chapter 38.

Research Frontier: Real-Time Feature Platforms (2024 to 2026)

The feature store has matured from a research idea into a competitive platform category, and the active work is on closing the consistency gap automatically. Open-source Feast has grown a richer streaming-materialization path and on-demand feature views that compute request-time transformations consistently across training and serving, while managed platforms such as Tecton push declarative pipelines that compile a single feature definition into both a streaming job and a backfill so the two cannot drift. A parallel thread targets freshness and cost: incremental materialization that updates only changed windows, and tiered online stores that keep hot keys in memory and spill cold keys to cheaper storage. The shared research question is how to guarantee point-in-time correctness and online/offline equality by construction, treating training-serving skew as a property the platform certifies rather than one each team rediscovers in production. The streaming-feature/online-store pattern of Code 9.7.1 is the kernel every one of these systems scales out.
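The tiered-store idea can be illustrated in a toy form. The sketch below assumes a least-recently-used policy with a fixed hot-tier capacity and uses a plain dict to stand in for the cheaper cold tier; TieredOnlineStore is a hypothetical name, and real platforms may choose very different promotion and spill policies.

```python
from collections import OrderedDict


class TieredOnlineStore:
    """Hot keys in an in-memory LRU tier; cold keys spill to a slower tier."""

    def __init__(self, hot_capacity):
        self.hot = OrderedDict()  # most recently used key at the end
        self.cold = {}            # stand-in for cheaper, slower storage
        self.capacity = hot_capacity

    def put(self, key, value):
        self.cold.pop(key, None)  # a key lives in exactly one tier
        self.hot[key] = value
        self.hot.move_to_end(key)
        self._spill()

    def get(self, key, default=None):
        if key in self.hot:
            self.hot.move_to_end(key)  # refresh recency on a hot hit
            return self.hot[key]
        if key in self.cold:
            value = self.cold.pop(key)  # promote a cold key on access
            self.hot[key] = value
            self._spill()
            return value
        return default

    def _spill(self):
        while len(self.hot) > self.capacity:
            key, value = self.hot.popitem(last=False)  # evict least recently used
            self.cold[key] = value
```

With a hot capacity of two, writing keys a, b, and c spills a to the cold tier; a later read of a promotes it back and spills b instead.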

With features computed and served consistently, the remaining question is how the model that consumes them is itself deployed across machines under the same latency budget. That is where this chapter turns next: a feature lookup is one stage of a real-time inference pipeline, and the pipeline as a whole, feature fetch, model call, and post-processing, must be distributed to meet its deadline. Section 9.8 builds that distributed real-time inference pipeline.

Exercise 9.7.1: Spot the Skew Conceptual

For each pairing of an offline (training) feature definition and an online (serving) feature definition, state whether training-serving skew is present and name its source: (a) offline uses a calendar-aligned hourly bucket, online uses a true sliding one-hour window; (b) both use a sliding 24-hour window, but the offline timestamps are in UTC and the online ones in the server's local time; (c) offline joins each label to the feature value computed from the entire history of the user, online uses only events up to the request time. Explain why each mismatch would let the model look better offline than it performs online.

Exercise 9.7.2: Add a Second Aggregate and Re-Check Consistency Coding

Extend Code 9.7.1 with a second online feature, the count of transactions in the same five-minute window, maintained incrementally in the streaming path alongside the sum. Write it to the online store and recompute it offline from the full log point-in-time as of each user's last event, then assert that both new values match across the two paths. Confirm that your incremental count uses $O(1)$ amortized work per event (no rescan of the window) and explain why the eviction loop already gives you the count for free.

Exercise 9.7.3: The Cost of Late Events on the As-Of Join Analysis

Suppose events can arrive up to $L$ seconds late (out of timestamp order), as in the watermark setting of Section 9.4. Argue why a point-in-time correct offline join evaluated immediately at label time $\tau$ may disagree with the online value that the serving system eventually settled on once the late events arrived. Propose a rule for how long the offline materialization should wait past $\tau$ before it computes the feature so that the offline and online values converge, and state the trade-off this waiting period imposes between feature freshness and consistency.