Part II: Distributed Data Processing for AI
Chapter 7: Spark and Distributed DataFrames

PySpark for AI Workloads

"They told me to apply a Python function to a billion rows. I applied it one row at a time, called Python a billion times, and serialized my way to retirement."

A JVM Executor That Discovered Arrow Too Late
Big Picture

In an AI pipeline, Spark rarely trains the final deep model; it does the distributed work that feeds the model, turning raw rows into clean, featurized, sharded tensors that a GPU cluster can consume. PySpark is the Python gateway to that work, but it sits on a JVM engine, and the boundary between Python and the JVM is where naive AI code goes to die: a per-row Python function forces the engine to serialize one value, cross the language boundary, call Python, and serialize back, billions of times. The fix is to move whole columns across that boundary at once using Apache Arrow, so a single Python call sees a batch and NumPy does the work at native speed. This section shows that boundary, measures the cost of crossing it the wrong way, and places Spark in the larger pipeline where it prepares data and then hands sharded files to deep-learning training that lives on different machines entirely.

The previous sections of this chapter treated Spark as a distributed data engine: DataFrames, lazy DAGs, partitioning, and the shuffle that Section 7.7 dissected. That machinery is general; it processes web logs and financial records as happily as machine-learning features. This section narrows the lens to AI specifically. The question is no longer "how does Spark execute a join?" but "how do I run my Python and NumPy logic, the code that actually computes a feature or applies a model, across a cluster without paying a crippling per-row tax?" The answer turns on one architectural fact about PySpark, and on a serialization format named Arrow that makes the fact survivable.

1. PySpark Sits on a JVM, and That Boundary Has a Price (Beginner)

Spark is written in Scala and runs on the Java Virtual Machine. When you write PySpark, your Python driver program builds a query plan and ships it to JVM executors that do the heavy lifting in their own process. As long as your transformation is expressible in Spark's built-in DataFrame operations (a filter, a join, a group-by, an arithmetic column expression), the executor runs everything inside the JVM and Python is never invoked on the data at all; your Python code only described the plan. This is the fast path, and most data preparation should stay on it.

The trouble starts when the logic you need lives in Python: a feature that calls a NumPy routine, a tokenizer, a small model applied per record, anything Spark's expression language cannot represent. To run that code, the executor must hand each value out of the JVM, into a Python worker process, and back. A plain Python user-defined function (UDF) does this one row at a time: serialize a value, send it across the process boundary, call your function, serialize the result, send it back. The compute inside your function might be trivial; the cost is the billions of boundary crossings around it. Figure 7.8.1 shows where this boundary sits and why the row-at-a-time path is so much narrower than the batched one.

[Figure 7.8.1 diagram: one Spark executor and its JVM-to-Python boundary. Left: JVM executor (columnar DataFrame shard; runs built-in expressions natively). Middle: process boundary (serialize here). Right: Python worker (your NumPy / model code; pandas / np.ndarray; native batch compute). Thin arrow: row-at-a-time UDF, one value per crossing. Wide arrow: Arrow batch, a whole column per crossing.]
Figure 7.8.1: The boundary that governs PySpark AI performance. Built-in DataFrame expressions stay inside the JVM (left). When logic must run in Python, data crosses the process boundary in the middle. A row-at-a-time UDF (thin arrow) crosses once per value and pays serialization and a Python call billions of times; an Arrow-backed vectorized UDF (wide arrow) moves an entire column in one crossing, and Python sees it as a NumPy array.

2. Vectorized UDFs: Move Columns, Not Rows (Intermediate)

The remedy is to stop crossing the boundary one value at a time. A vectorized UDF (in PySpark, a pandas_udf) receives not a single value but a whole batch of them, delivered as a pandas Series through Apache Arrow's columnar memory layout. Arrow gives the JVM and the Python worker the same in-memory representation of a column, so the transfer is a bulk copy of contiguous bytes rather than a per-row serialize-and-deserialize. Your function is then called once per batch, not once per row, and inside it you run NumPy or pandas at native, vectorized speed. The number of boundary crossings drops from the number of rows to the number of batches; with Spark's default Arrow batch of 10,000 records, that is about four orders of magnitude fewer.
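The drop in crossings is simple arithmetic, and a small sketch makes it concrete. The function below only counts Python invocations; it is not Spark code. The partition layout is hypothetical, and the batch size of 10,000 assumes Spark's default value for `spark.sql.execution.arrow.maxRecordsPerBatch`.

```python
import math

def boundary_crossings(partition_rows, batch_size=None):
    """Count Python invocations needed to process one column.

    batch_size=None models a row-at-a-time UDF (one call per row).
    Otherwise each partition is cut into batches of at most batch_size rows,
    and batches never span partitions.
    """
    if batch_size is None:
        return sum(partition_rows)
    return sum(math.ceil(n / batch_size) for n in partition_rows)

# Hypothetical layout: one billion rows in 200 equal partitions.
partitions = [5_000_000] * 200

row_calls = boundary_crossings(partitions)
batch_calls = boundary_crossings(partitions, batch_size=10_000)

print(f"row-at-a-time calls : {row_calls:,}")
print(f"batched calls       : {batch_calls:,}")
print(f"reduction factor    : {row_calls // batch_calls:,}x")
```

With these assumed numbers, a billion per-row calls become one hundred thousand per-batch calls. Raising the batch size reduces the count further, at the price of more memory per batch in the Python worker.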

To see the size of that effect without needing a cluster, we can isolate the part that matters: the difference between calling a Python function once per element and running the identical math once over a whole batch. The code below applies a standardize-then-clip feature transform, the kind of column preparation a Spark feature pipeline performs before training, two ways on four million values: a pure-Python row-at-a-time loop, then a single vectorized NumPy expression. Both compute exactly the same numbers; only the boundary-crossing pattern differs.

import numpy as np
import time

# A standardize-then-clip feature transform, the kind Spark applies to a
# column of model inputs before training. We run it two ways on the same data.
rng = np.random.default_rng(0)
N = 4_000_000
col = rng.standard_normal(N).astype(np.float64)
mean, std = col.mean(), col.std()

def transform_scalar(x):
    # Pure-Python per-row logic: this is what a row-at-a-time UDF runs.
    z = (x - mean) / std
    if z > 3.0:  return 3.0
    if z < -3.0: return -3.0
    return z

# Row-at-a-time path: one Python function call per element.
t0 = time.perf_counter()
out_row = [transform_scalar(x) for x in col]
t_row = time.perf_counter() - t0

# Vectorized path: the identical math expressed once over the whole batch,
# the way an Arrow-backed pandas UDF receives and returns a column.
t0 = time.perf_counter()
z = (col - mean) / std
out_vec = np.clip(z, -3.0, 3.0)
t_vec = time.perf_counter() - t0

print(f"rows transformed     : {N:,}")
print(f"row-at-a-time UDF    : {t_row:.3f} s")
print(f"vectorized UDF       : {t_vec:.3f} s")
print(f"speedup              : {t_row / t_vec:.1f}x")
print(f"max abs difference   : {np.max(np.abs(np.asarray(out_row) - out_vec)):.2e}")
Code 7.8.1: The cost of crossing the language boundary one row at a time, isolated in pure Python so it runs anywhere. The scalar loop stands in for a row-at-a-time UDF; the NumPy block stands in for the batched compute an Arrow-backed pandas_udf performs. The final line confirms the two paths produce identical values.
rows transformed     : 4,000,000
row-at-a-time UDF    : 1.079 s
vectorized UDF       : 0.022 s
speedup              : 49.3x
max abs difference   : 0.00e+00
Output 7.8.1: Batching the same transform is about fifty times faster on this machine while changing not a single output value (the difference is exactly zero). On a real cluster the gap is usually larger still, because the vectorized path also avoids the per-row serialization across the JVM-to-Python boundary that this single-process measurement does not even include.

The forty-nine-fold gap in Output 7.8.1 comes entirely from amortizing per-element Python overhead. It leaves out a second cost that Spark's row-at-a-time path also pays: serialization across the process boundary, which Code 7.8.1 never touched. The exact ratio on a cluster depends on the function and the data types involved, but the direction does not. The lesson generalizes past this one transform: whenever your AI logic must run in Python on a Spark column, express it so a single call sees a whole batch.
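The serialization cost that Code 7.8.1 leaves out can be felt with a second single-process experiment. The sketch below is a deliberately simplified stand-in, not a model of Spark's actual wire protocol: `pickle` plays the role of a per-value encoding, and a single `tobytes` / `frombuffer` round trip plays the role of shipping one contiguous column. The function names are illustrative.

```python
import pickle
import time

import numpy as np

def roundtrip_per_value(values):
    """Encode and decode every value separately, one small message each."""
    out = []
    for v in values:
        payload = pickle.dumps(float(v))   # one tiny message per value
        out.append(pickle.loads(payload))
    return np.asarray(out)

def roundtrip_batch(values):
    """Encode the whole column once, as a single contiguous block of bytes."""
    payload = values.tobytes()             # one bulk copy
    return np.frombuffer(payload, dtype=values.dtype)

rng = np.random.default_rng(0)
col = rng.standard_normal(200_000)

t0 = time.perf_counter()
a = roundtrip_per_value(col)
t_row = time.perf_counter() - t0

t0 = time.perf_counter()
b = roundtrip_batch(col)
t_batch = time.perf_counter() - t0

print(f"per-value round trip : {t_row:.4f} s")
print(f"batch round trip     : {t_batch:.6f} s")
print(f"values preserved     : {np.array_equal(a, col) and np.array_equal(b, col)}")
```

Both paths return the same numbers; only the number of encode and decode steps differs. Timings vary by machine, so none are quoted here.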

Key Insight: The Boundary, Not the Math, Is the Bottleneck

In a PySpark AI pipeline, the expensive thing is rarely the arithmetic inside your feature function; it is the act of moving data across the JVM-to-Python boundary and invoking Python around each value. A row-at-a-time UDF pays that cost once per row; an Arrow-backed vectorized UDF pays it once per batch and runs the math at NumPy speed. Before reaching for any UDF at all, check whether a built-in DataFrame expression can do the job inside the JVM, where the boundary is never crossed; only when the logic is genuinely Python-only should you write a UDF, and then it should always be vectorized.

Library Shortcut: pandas_udf Turns Code 7.8.1 Into a Cluster Operation

Code 7.8.1 measured the batched-versus-per-row gap in one process. In PySpark you express the batched path with the pandas_udf decorator, and Spark handles the Arrow transfer, the batching, and the distribution across executors for you. The same transform, now running on a distributed DataFrame:

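import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf

# df is assumed to be an existing DataFrame with a double column "raw_value".
# Compute the global statistics once, inside the JVM, before defining the UDF;
# stddev_pop matches the population std that col.std() returned in Code 7.8.1.
stats = df.select(F.mean("raw_value"), F.stddev_pop("raw_value")).first()
MEAN, STD = stats[0], stats[1]

@pandas_udf("double")                     # Arrow-backed; receives a batch, not a row
def standardize_clip(col: pd.Series) -> pd.Series:
    z = (col - MEAN) / STD                # vectorized over the whole batch
    return z.clip(-3.0, 3.0)              # returns a Series, one Arrow crossing

df = df.withColumn("feat", standardize_clip("raw_value"))   # lazy, runs on executors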
Code 7.8.2: The same standardize-then-clip transform as Code 7.8.1, now as a vectorized pandas_udf that runs across a cluster. The decorator and one column expression replace all the manual batching and serialization logic; Spark moves each partition's column to a Python worker through Arrow, calls standardize_clip once per batch, and writes the result back. A plain @udf here would run the row-at-a-time path of Output 7.8.1 on every executor.
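Because the body of a pandas_udf is ordinary pandas code, it can be checked on a plain Series before any cluster is involved. The sketch below does that for the standardize-then-clip logic. The constants and the function name `standardize_clip_body` are hypothetical values chosen for illustration, standing in for statistics computed on the real column.

```python
import pandas as pd

# Hypothetical statistics; in a real job these come from the full column.
MEAN, STD = 10.0, 2.0

def standardize_clip_body(col: pd.Series) -> pd.Series:
    """Same logic as the UDF body, with no Spark dependency."""
    z = (col - MEAN) / STD
    return z.clip(-3.0, 3.0)

batch = pd.Series([10.0, 12.0, 30.0, -40.0])   # one small stand-in batch
out = standardize_clip_body(batch)

print(out.tolist())            # [0.0, 1.0, 3.0, -3.0]
print(len(out) == len(batch))  # a Series-to-Series UDF must preserve length
```

A local check like this catches shape and dtype mistakes cheaply, since a Series-to-Series pandas_udf has to return exactly one output value per input value.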

3. Spark MLlib: Classical ML That Already Lives on the Cluster (Beginner)

Not every model needs a UDF. For classical machine learning, Spark ships MLlib, a library of distributed estimators and transformers that run natively on the JVM and never touch the Python boundary on the data path. MLlib's Pipeline abstraction chains feature transformers and an estimator into one object that fits across the whole cluster: a StringIndexer and OneHotEncoder prepare categorical columns, a VectorAssembler packs features into a single vector column, and an estimator such as LogisticRegression or GBTClassifier trains on the partitioned DataFrame using distributed optimization. Calling fit is an eager action that launches Spark jobs, while the fitted model's transform is lazy and benefits from the same DAG planning as any other DataFrame operation.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression

idx = StringIndexer(inputCol="country", outputCol="country_idx")
ohe = OneHotEncoder(inputCols=["country_idx"], outputCols=["country_vec"])
asm = VectorAssembler(inputCols=["country_vec", "feat"], outputCol="features")
lr  = LogisticRegression(featuresCol="features", labelCol="label")

pipeline = Pipeline(stages=[idx, ohe, asm, lr])    # one distributed fit
model = pipeline.fit(train_df)                      # trains across all executors
preds = model.transform(test_df)                    # distributed scoring
Code 7.8.3: A Spark MLlib pipeline that indexes a categorical column, one-hot encodes it, assembles a feature vector, and fits a logistic-regression model, all distributed across the cluster without a single Python UDF on the data path. This is the right tool when the model itself is classical; deep learning needs the handoff described in Section 4.
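To make the three feature stages less abstract, the following pure-Python sketch mimics what they produce for a handful of rows. It is an illustration only, not MLlib code. It assumes MLlib's default settings as commonly documented (indices assigned by descending frequency with alphabetical tie-breaking, and a one-hot encoding that drops the last category), and it uses dense lists where Spark would use sparse vectors. All function names and sample values are hypothetical.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each category to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}

def one_hot(index, n_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the final category is all zeros."""
    size = n_categories - 1 if drop_last else n_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

def assemble(*parts):
    """Concatenate vector and scalar pieces into one flat feature vector."""
    out = []
    for p in parts:
        out.extend(p if isinstance(p, list) else [float(p)])
    return out

countries = ["US", "DE", "US", "FR", "US", "DE"]   # sample categorical column
feat = [0.5, -1.2, 3.0, 0.0, -3.0, 1.1]            # sample numeric column

index = fit_string_indexer(countries)
features = [
    assemble(one_hot(index[c], len(index)), f)
    for c, f in zip(countries, feat)
]

print(index)         # {'US': 0, 'DE': 1, 'FR': 2}
print(features[0])   # [1.0, 0.0, 0.5]
print(features[3])   # [0.0, 0.0, 0.0]
```

The fourth row shows why dropping the last category loses no information: the least frequent value is represented by the all-zeros pattern.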

MLlib is the natural home for distributed linear models, tree ensembles, clustering, and the like, and it is a full topic in its own right. The distributed-optimization machinery underneath these estimators, and the gradient-boosting and tree methods that scale particularly well on Spark, are developed in Chapter 12. For this chapter the point is narrower: when your model is classical, you often do not need a UDF at all, because MLlib already distributes the fit. The interesting case for AI at scale is the one MLlib does not cover, deep learning, and that is where Spark changes role.

4. The Production Pattern: Spark Preps, the GPU Cluster Trains (Intermediate)

Deep-learning training typically does not run inside Spark. A modern training job lives on GPUs, synchronizes gradients with the collectives of Chapter 4, and is driven by a framework such as PyTorch, not by a JVM query engine. The dominant production pattern therefore splits the work cleanly across two systems: Spark performs the distributed, data-heavy preparation (cleaning, joining, deduplicating, featurizing, and shaping the dataset), then writes the result to sharded columnar files that a separate deep-learning cluster reads. Spark's job ends where the tensors begin. Figure 7.8.2 traces this handoff.

[Figure 7.8.2 diagram: a Spark cluster (clean, join, dedup, featurize with vectorized UDFs; Sections 1 to 3) writes sharded columnar files (shard 1.parquet, shard 2.parquet, ..., shard K.parquet). GPU workers 1 through K, each a loader plus model, read the shards and are joined by all-reduce gradient sync (Chapters 4 and 15).]
Figure 7.8.2: The Spark-to-training handoff. Spark does the distributed feature engineering of Sections 1 to 3 and writes sharded columnar files (one shard per intended worker). A separate GPU cluster reads those shards through its data loaders and runs data-parallel training, synchronizing gradients with the all-reduce of Chapter 4. The two systems never share a process; the sharded files are the contract between them.

This division of labor is not a workaround; it is the design. Spark is built for wide, shuffle-heavy data transformation on commodity CPUs, and a GPU spends most of a Spark job idle. A training cluster is built to keep accelerators saturated and is wasteful at the irregular, IO-bound work of data cleaning. Letting each system do what it is good at, and connecting them through sharded files on shared storage, is how the largest training pipelines are actually built. The seam between them, getting bytes off storage and into a GPU fast enough to keep it fed, is its own hard problem: the data-loading bottleneck that Chapter 8 is devoted to. The data-parallel training loop on the far side is the subject of Chapter 15.
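On the training side, the sharded files act as a contract only if every worker agrees on who reads what. A minimal sketch of one common convention, round-robin assignment by rank, is shown below. The file names are hypothetical, and real data loaders may shuffle shards per epoch or assign them differently.

```python
def shards_for_worker(shard_paths, rank, world_size):
    """Return the shards that worker `rank` should read.

    Sorting first means every worker derives the same plan independently,
    without any coordination beyond knowing its rank and the world size.
    """
    if not 0 <= rank < world_size:
        raise ValueError("rank must be in [0, world_size)")
    return sorted(shard_paths)[rank::world_size]

# Hypothetical output of the Spark job: eight shard files.
shards = [f"features/part-{i:05d}.parquet" for i in range(8)]
world_size = 4

plan = {rank: shards_for_worker(shards, rank, world_size)
        for rank in range(world_size)}

for rank, files in plan.items():
    print(rank, files)
```

When the shard count is a multiple of the worker count, as here, every worker gets the same number of files, which keeps the workers in step during data-parallel training.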

Thesis Thread: One Job, Split Across Two Distributed Systems

The handoff in Figure 7.8.2 is the book's distribution thesis applied at the level of whole systems rather than a single algorithm. The same dataset is distributed twice for two different reasons: across Spark executors because the data does not fit on one machine and the featurization must scale on CPUs, then across GPU workers because the model training must scale on accelerators. The sharded files are the boundary where one form of distribution ends and another begins, and the shuffle that built those shards in Section 7.7 reappears, transformed, as the all-reduce that synchronizes the workers reading them. Recognizing where one distribution axis hands off to the next is exactly the design fluency the book is building toward.

Fun Note: The GPU That Watched Spark Work

A common and expensive mistake is to run Spark feature engineering on a GPU-equipped cluster "to save a step." The accelerators sit at near-zero utilization for the entire shuffle-heavy job while you pay GPU-hour prices for CPU-bound work, then the actual training cannot start until the prep finishes anyway. The cheap fix is the boring one in Figure 7.8.2: prep on CPU instances, write shards, and only then wake the GPUs.

Practical Example: The Feature Pipeline That Starved the GPUs

Who: A data engineer at a streaming-media company building the training pipeline for a recommendation model.

Situation: A nightly job featurized two billion interaction records in Spark, then trained a deep ranking model on an eight-GPU node.

Problem: The featurization step ran a per-row Python UDF that called a small NumPy normalization, and the Spark stage alone took over five hours, leaving the GPUs idle and waiting.

Dilemma: Rewrite the feature logic in Spark's native expressions where possible and convert the rest to a vectorized pandas_udf, or rent a larger Spark cluster to brute-force the slow UDF at higher cost.

Decision: They converted the normalization to an Arrow-backed pandas_udf and pushed the simple arithmetic into built-in column expressions, exactly the move Output 7.8.1 quantifies, rather than paying for more machines to run slow code.

How: The row-at-a-time UDF became a batched function receiving a pandas Series; two derived columns moved to native Spark expressions; the output was written as sharded Parquet for the training loader.

Result: The featurization stage fell from over five hours to about twenty minutes on the same cluster, the GPUs started training far earlier in the window, and the numerical output was unchanged.

Lesson: When a Spark AI job is slow, suspect the language boundary before the cluster size. A vectorized UDF often turns a "we need more machines" problem into a "we wrote the UDF wrong" problem.

5. Why Arrow Is the Enabling Layer (Intermediate)

It is worth naming explicitly what makes vectorized UDFs possible, because the same idea recurs throughout the data side of AI. Apache Arrow defines a language-independent, columnar in-memory format. When Spark hands a batch to a Python worker as Arrow, the JVM and Python agree, byte for byte, on how that column is laid out in memory, so the transfer is a bulk copy of contiguous buffers between the two processes rather than a value-by-value serialization through a slow encoding. For plain numeric columns, NumPy and pandas can often wrap the received buffer with little or no conversion. The cost model changes from "pay once per row to serialize" to "pay once per batch to copy a contiguous block," which is why the boundary stops being the bottleneck once batches are large enough.
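The phrase "wrap that buffer directly" has a precise meaning that a few lines of NumPy can show. In the sketch below a `bytearray` of packed float64 values stands in for an Arrow data buffer. That is an assumption made for illustration, since real Arrow buffers also carry validity bitmaps and metadata. The point is only the difference between viewing bytes in place and decoding them value by value.

```python
import struct

import numpy as np

# Eight little-endian float64 values in one contiguous block of bytes.
n = 8
buf = bytearray(struct.pack(f"<{n}d", *range(n)))

# Wrap the bytes: no per-value decoding and no copy.
view = np.frombuffer(buf, dtype="<f8")

# Decode value by value into freshly allocated memory.
copy = np.array(struct.unpack(f"<{n}d", buf))

# Overwrite the first value in the raw buffer.
struct.pack_into("<d", buf, 0, 42.0)

print(view[0])   # 42.0 -> the view shares memory with the buffer
print(copy[0])   # 0.0  -> the decoded copy does not
```

The view costs the same whether the buffer holds eight values or eight million, which is the property a columnar interchange format is designed to exploit.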

Arrow also sits right next to the columnar files Spark writes for the training handoff. Parquet is a separate, compressed on-disk format rather than Arrow itself, but it is columnar in the same way, and readers such as PyArrow decode it directly into Arrow batches, so data moves from Spark to the training loader without a detour through a row-oriented encoding. This is not a Spark-only convenience; Arrow has become the lingua franca that lets the data-prep world and the deep-learning world exchange columns cheaply, and it returns in Chapter 8 as an on-disk and over-the-wire format for sharded training data. When you choose a vectorized UDF over a row-at-a-time one, you are choosing to ride Arrow instead of fighting it.

Research Frontier: Closing the Spark-to-GPU Gap (2024 to 2026)

The clean two-system handoff of Figure 7.8.2 is under active pressure to become tighter and, where it pays, GPU-accelerated end to end. The RAPIDS Accelerator for Apache Spark pushes DataFrame operations and shuffles onto GPUs, and recent releases extend GPU execution to more of the feature-engineering path so that prep and training can share accelerators when the workload justifies it. On the loading side (2024 to 2025), the Ray Data ecosystem streams Arrow-format blocks from columnar storage toward GPU training without a separate Spark write step, and NVIDIA DALI moves decoding and preprocessing onto the GPU itself; both blur the boundary further and attack the data-loading bottleneck of Chapter 8 head on. A parallel thread treats the entire pipeline as one schedulable computation, so the featurization in Section 2 and the data-parallel training in Section 4 can overlap instead of running strictly in sequence. The open question is when fusing the two systems beats keeping them separate; the answer still depends on how shuffle-heavy the prep is relative to how compute-heavy the model is, which is exactly the cost reasoning Section 7.9 makes precise.

With the language boundary understood and the prep-then-train handoff in place, the remaining question for a PySpark AI workload is the one any distributed system eventually asks: where is the time actually going, and how do I make it go away? Partition counts, shuffle sizes, serialization formats, and executor memory all bear on whether a feature pipeline like the one in this section runs in twenty minutes or five hours. That measurement-and-tuning discipline is the subject of Section 7.9.

Exercise 7.8.1: Native, Vectorized, or Row-at-a-Time? (Conceptual)

For each transformation, decide whether it should be a built-in Spark DataFrame expression (no boundary crossing), an Arrow-backed pandas_udf (one crossing per batch), or whether a plain row-at-a-time UDF is unavoidable, and justify each choice using the boundary argument of Section 1: (a) multiply a numeric column by a constant and add another column; (b) standardize a column using its global mean and standard deviation; (c) run a pretrained Python tokenizer that returns a variable-length list of token ids per text row; (d) clip a column to the range $[-3, 3]$. For any case where you chose a UDF, explain what stops Spark's native expression language from doing the job inside the JVM.

Exercise 7.8.2: Measure the Batch-Size Curve (Coding)

Extend Code 7.8.1 so that, instead of comparing only the full per-row loop against the full vectorized call, it processes the data in batches of size $B$ and calls the vectorized transform once per batch, for $B \in \{1, 10, 100, 10^3, 10^4, 10^5\}$. Plot or print elapsed time against $B$. Identify the batch size beyond which further increases stop helping, and explain the shape of the curve in terms of fixed per-call Python overhead amortized over $B$ elements. Relate your finding to why Spark exposes a configurable Arrow batch size for pandas_udf (the setting spark.sql.execution.arrow.maxRecordsPerBatch) rather than processing one row or one whole partition at a time.

Exercise 7.8.3: Size the Handoff (Analysis)

A Spark job featurizes $2 \times 10^9$ records into feature vectors of $256$ float32 values each and writes them as sharded files for a data-parallel training run on $K = 16$ GPU workers. Estimate the total featurized dataset size in bytes, the size of one shard if shards are equal, and, given a shared-storage read bandwidth of $2$ gigabytes per second per worker, the time for all workers to read one full epoch in parallel. Compare that read time to a plausible per-epoch GPU compute time of a few minutes, and argue from the two numbers whether the data loader of Chapter 8 or the training compute is more likely to bound this pipeline. State which way your conclusion flips if the read bandwidth is ten times lower.