"They told me to apply a Python function to a billion rows. I applied it one row at a time, called Python a billion times, and serialized my way to retirement."
A JVM Executor That Discovered Arrow Too Late
In an AI pipeline, Spark rarely trains the final deep model; it does the distributed work that feeds the model, turning raw rows into clean, featurized, sharded tensors that a GPU cluster can consume. PySpark is the Python gateway to that work, but it sits on a JVM engine, and the boundary between Python and the JVM is where naive AI code goes to die: a per-row Python function forces the engine to serialize one value, cross the language boundary, call Python, and serialize back, billions of times. The fix is to move whole columns across that boundary at once using Apache Arrow, so a single Python call sees a batch and NumPy does the work at native speed. This section shows that boundary, measures the cost of crossing it the wrong way, and places Spark in the larger pipeline where it prepares data and then hands sharded files to deep-learning training that lives on different machines entirely.
The previous sections of this chapter treated Spark as a distributed data engine: DataFrames, lazy DAGs, partitioning, and the shuffle that Section 7.7 dissected. That machinery is general; it processes web logs and financial records as happily as machine-learning features. This section narrows the lens to AI specifically. The question is no longer "how does Spark execute a join?" but "how do I run my Python and NumPy logic, the code that actually computes a feature or applies a model, across a cluster without paying a crippling per-row tax?" The answer turns on one architectural fact about PySpark, and on a serialization format named Arrow that makes the fact survivable.
1. PySpark Sits on a JVM, and That Boundary Has a Price Beginner
Spark is written in Scala and runs on the Java Virtual Machine. When you write PySpark, your Python driver program builds a query plan and ships it to JVM executors that do the heavy lifting in their own process. As long as your transformation is expressible in Spark's built-in DataFrame operations (a filter, a join, a group-by, an arithmetic column expression), the executor runs everything inside the JVM and Python is never invoked on the data at all; your Python code only described the plan. This is the fast path, and most data preparation should stay on it.
The trouble starts when the logic you need lives in Python: a feature that calls a NumPy routine, a tokenizer, a small model applied per record, anything Spark's expression language cannot represent. To run that code, the executor must hand each value out of the JVM, into a Python worker process, and back. A plain Python user-defined function (UDF) does this one row at a time: serialize a value, send it across the process boundary, call your function, serialize the result, send it back. The compute inside your function might be trivial; the cost is the billions of boundary crossings around it. Figure 7.8.1 shows where this boundary sits and why the row-at-a-time path is so much narrower than the batched one.
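The per-row cost described above can be felt on a single machine with a rough simulation. The sketch below uses Python's `pickle` as a stand-in for the serialization step and a trivial doubling as the "feature"; the function names `per_row_round_trip` and `batched_round_trip` are hypothetical, and nothing here reproduces Spark's actual wire protocol. It only contrasts one serialize-call-serialize cycle per value with one cycle for the whole column.

```python
import pickle
import time

import numpy as np


def per_row_round_trip(values):
    """Serialize, compute, and serialize back once per value."""
    out = []
    for v in values:
        x = pickle.loads(pickle.dumps(v))               # value crosses to the "worker"
        result = x * 2.0                                # trivial per-row compute
        out.append(pickle.loads(pickle.dumps(result)))  # result crosses back
    return out


def batched_round_trip(column):
    """Serialize the whole column once, compute once, serialize back once."""
    col = pickle.loads(pickle.dumps(column))
    result = col * 2.0
    return pickle.loads(pickle.dumps(result))


values = np.arange(200_000, dtype=np.float64)

t0 = time.perf_counter()
row_out = per_row_round_trip(values.tolist())
t_row = time.perf_counter() - t0

t0 = time.perf_counter()
batch_out = batched_round_trip(values)
t_batch = time.perf_counter() - t0

print(f"per-row round trips : {t_row:.3f} s")
print(f"one batched trip    : {t_batch:.3f} s")
print("same results        :", np.array_equal(np.asarray(row_out), batch_out))
```

The arithmetic is identical in both paths; whatever gap the timings show comes from the cycles wrapped around each value, which is the cost the rest of this section sets out to remove.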
2. Vectorized UDFs: Move Columns, Not Rows Intermediate
The remedy is to stop crossing the boundary one value at a time. A vectorized UDF (in PySpark, a pandas_udf) receives not a single value but a whole batch of them, delivered as a pandas Series through Apache Arrow's columnar memory layout. Arrow gives the JVM and the Python worker a common in-memory representation of a column, so the transfer is a bulk copy of contiguous bytes rather than a per-row serialize-and-deserialize. Your function is then called once per batch, not once per row, and inside it you run NumPy or pandas at native, vectorized speed. The number of boundary crossings drops from the number of rows to the number of batches; with Spark's default Arrow batch limit of 10,000 records (spark.sql.execution.arrow.maxRecordsPerBatch), that is smaller by a factor of up to ten thousand.
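The drop in boundary crossings is simple arithmetic, sketched below for the billion-row case from the opening quote. The helper name `boundary_crossings` is hypothetical, and the batch size of 10,000 is assumed to match the default of `spark.sql.execution.arrow.maxRecordsPerBatch`; check the value for your Spark version and configuration.

```python
def boundary_crossings(n_rows: int, batch_size: int) -> int:
    """Python invocations needed to cover n_rows (integer ceiling division)."""
    return -(-n_rows // batch_size)


n_rows = 1_000_000_000
row_at_a_time = boundary_crossings(n_rows, 1)   # plain UDF: one call per row
batched = boundary_crossings(n_rows, 10_000)    # assumed Arrow batch size

print(f"row-at-a-time calls : {row_at_a_time:,}")
print(f"batched calls       : {batched:,}")
print(f"reduction factor    : {row_at_a_time // batched:,}x")
```

Under these assumptions a billion calls become one hundred thousand, a reduction of 10,000x. In practice a partition's final batch is usually smaller than the limit, so the true count is slightly higher.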
To see the size of that effect without needing a cluster, we can isolate the part that matters: the difference between calling a Python function once per element and running the identical math once over a whole batch. The code below applies a standardize-then-clip feature transform, the kind of column preparation a Spark feature pipeline performs before training, two ways on four million values: a pure-Python row-at-a-time loop, then a single vectorized NumPy expression. Both compute exactly the same numbers; only the boundary-crossing pattern differs.
import numpy as np
import time
# A standardize-then-clip feature transform, the kind Spark applies to a
# column of model inputs before training. We run it two ways on the same data.
rng = np.random.default_rng(0)
N = 4_000_000
col = rng.standard_normal(N).astype(np.float64)
mean, std = col.mean(), col.std()
def transform_scalar(x):
    # Pure-Python per-row logic: this is what a row-at-a-time UDF runs.
    z = (x - mean) / std
    if z > 3.0: return 3.0
    if z < -3.0: return -3.0
    return z
# Row-at-a-time path: one Python function call per element.
t0 = time.perf_counter()
out_row = [transform_scalar(x) for x in col]
t_row = time.perf_counter() - t0
# Vectorized path: the identical math expressed once over the whole batch,
# the way an Arrow-backed pandas UDF receives and returns a column.
t0 = time.perf_counter()
z = (col - mean) / std
out_vec = np.clip(z, -3.0, 3.0)
t_vec = time.perf_counter() - t0
print(f"rows transformed : {N:,}")
print(f"row-at-a-time UDF : {t_row:.3f} s")
print(f"vectorized UDF : {t_vec:.3f} s")
print(f"speedup : {t_row / t_vec:.1f}x")
print(f"max abs difference : {np.max(np.abs(np.asarray(out_row) - out_vec)):.2e}")
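Code 7.8.1: The same feature transform run as a per-element Python loop and as the single batched call that a pandas_udf performs. The final line confirms the two paths produce identical values.
Output 7.8.1:
rows transformed : 4,000,000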
row-at-a-time UDF : 1.079 s
vectorized UDF : 0.022 s
speedup : 49.3x
max abs difference : 0.00e+00
The forty-nine-fold gap in Output 7.8.1 comes entirely from amortizing per-element Python overhead, and it is a lower bound on what you gain inside Spark, where the row-at-a-time path additionally pays serialization across the process boundary that Code 7.8.1 never touched. The lesson generalizes past this one transform: whenever your AI logic must run in Python on a Spark column, express it so a single call sees a whole batch.
In a PySpark AI pipeline, the expensive thing is rarely the arithmetic inside your feature function; it is the act of moving data across the JVM-to-Python boundary and invoking Python around each value. A row-at-a-time UDF pays that cost once per row; an Arrow-backed vectorized UDF pays it once per batch and runs the math at NumPy speed. Before reaching for any UDF at all, check whether a built-in DataFrame expression can do the job inside the JVM, where the boundary is never crossed; only when the logic is genuinely Python-only should you write a UDF, and then it should be vectorized whenever the logic can operate on a batch.
Code 7.8.1 measured the batched-versus-per-row gap in one process. In PySpark you express the batched path with the pandas_udf decorator, and Spark handles the Arrow transfer, the batching, and the distribution across executors for you. The same transform, now running on a distributed DataFrame:
import pandas as pd
from pyspark.sql.functions import pandas_udf
# MEAN and STD are the column's global mean and standard deviation, computed
# beforehand (for example with a Spark aggregation) and captured by the UDF.
@pandas_udf("double")  # Arrow-backed; receives a batch, not a row
def standardize_clip(col: pd.Series) -> pd.Series:
    z = (col - MEAN) / STD       # vectorized over the whole batch
    return z.clip(-3.0, 3.0)     # returns a Series, one Arrow crossing
df = df.withColumn("feat", standardize_clip("raw_value"))  # lazy, runs on executors
The transform of Code 7.8.1 expressed as a pandas_udf that runs across a cluster. The decorator and one column expression replace all the manual batching and serialization logic; Spark moves each partition's column to a Python worker through Arrow, calls standardize_clip once per batch, and writes the result back. A plain @udf here would run the row-at-a-time path of Output 7.8.1 on every executor.
3. Spark MLlib: Classical ML That Already Lives on the Cluster Beginner
Not every model needs a UDF. For classical machine learning, Spark ships MLlib, a library of distributed estimators and transformers that run natively on the JVM and never touch the Python boundary on the data path. MLlib's Pipeline abstraction chains feature transformers and an estimator into one object that fits across the whole cluster: a StringIndexer and OneHotEncoder prepare categorical columns, a VectorAssembler packs features into a single vector column, and an estimator such as LogisticRegression or GBTClassifier trains on the partitioned DataFrame using distributed optimization. The whole sequence is lazy and benefits from the same DAG planning as any other Spark job.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
idx = StringIndexer(inputCol="country", outputCol="country_idx")
ohe = OneHotEncoder(inputCols=["country_idx"], outputCols=["country_vec"])
asm = VectorAssembler(inputCols=["country_vec", "feat"], outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[idx, ohe, asm, lr]) # one distributed fit
model = pipeline.fit(train_df) # trains across all executors
preds = model.transform(test_df) # distributed scoring
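To make the three feature stages above concrete, here is a single-machine sketch of what they compute, written in plain Python rather than MLlib. It assumes MLlib's default settings as I understand them: StringIndexer assigns indices by descending frequency with ties broken alphabetically, and OneHotEncoder drops the last category so that it encodes as all zeros. The sample rows and helper names are hypothetical, and MLlib stores these vectors in a sparse form rather than as Python lists.

```python
from collections import Counter

rows = [
    {"country": "US", "feat": 0.5},
    {"country": "DE", "feat": -1.2},
    {"country": "US", "feat": 2.0},
    {"country": "FR", "feat": 0.1},
    {"country": "DE", "feat": 0.7},
    {"country": "US", "feat": -0.3},
]


def fit_string_index(values):
    """Map each label to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, n_categories):
    """One-hot vector with the last category dropped (it becomes all zeros)."""
    vec = [0.0] * (n_categories - 1)
    if index < n_categories - 1:
        vec[index] = 1.0
    return vec


# "Fit" learns the label-to-index mapping from the data.
index = fit_string_index(r["country"] for r in rows)

# "Transform" encodes each row, then assembles one flat feature vector.
features = [one_hot(index[r["country"]], len(index)) + [r["feat"]] for r in rows]

print(index)
for r, f in zip(rows, features):
    print(r["country"], f)
```

The split between learning the mapping and applying it mirrors the fit and transform calls of the pipeline: the fitted mapping is state that must be reused unchanged when scoring new data.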
MLlib is the natural home for distributed linear models, tree ensembles, clustering, and the like, and it is a full topic in its own right. The distributed-optimization machinery underneath these estimators, and the gradient-boosting and tree methods that scale particularly well on Spark, are developed in Chapter 12. For this chapter the point is narrower: when your model is classical, you often do not need a UDF at all, because MLlib already distributes the fit. The interesting case for AI at scale is the one MLlib does not cover, deep learning, and that is where Spark changes role.
4. The Production Pattern: Spark Preps, the GPU Cluster Trains Intermediate
Deep-learning training does not run inside Spark. A modern training job lives on GPUs, synchronizes gradients with the collectives of Chapter 4, and is driven by PyTorch, not by a JVM query engine. The dominant production pattern therefore splits the work cleanly across two systems: Spark performs the distributed, data-heavy preparation (cleaning, joining, deduplicating, featurizing, and shaping the dataset), then writes the result to sharded columnar files that a separate deep-learning cluster reads. Spark's job ends where the tensors begin. Figure 7.8.2 traces this handoff.
This division of labor is not a workaround; it is the design. Spark is built for wide, shuffle-heavy data transformation on commodity CPUs, and a GPU spends most of a Spark job idle. A training cluster is built to keep accelerators saturated and is wasteful at the irregular, IO-bound work of data cleaning. Letting each system do what it is good at, and connecting them through sharded files on shared storage, is how the largest training pipelines are actually built. The seam between them, getting bytes off storage and into a GPU fast enough to keep it fed, is its own hard problem: the data-loading bottleneck that Chapter 8 is devoted to, and the data-parallel training loop on the far side is the subject of Chapter 15.
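The handoff through sharded files can be sketched in miniature. In the illustration below, NumPy `.npy` files stand in for the columnar shards Spark would write, the `part-00000` naming is only a convention chosen for the example, and the strided assignment of shards to workers is one common scheme rather than the only one. The helpers `write_shards` and `shards_for_worker` are hypothetical names.

```python
import tempfile
from pathlib import Path

import numpy as np


def write_shards(features, n_shards, out_dir):
    """Prep side: split a feature matrix into n_shards files."""
    paths = []
    for i, shard in enumerate(np.array_split(features, n_shards)):
        path = Path(out_dir) / f"part-{i:05d}.npy"
        np.save(path, shard)
        paths.append(path)
    return paths


def shards_for_worker(paths, rank, world_size):
    """Training side: each worker takes a disjoint, strided subset of shards."""
    return paths[rank::world_size]


features = np.random.default_rng(0).standard_normal((1_000, 8)).astype(np.float32)

with tempfile.TemporaryDirectory() as out_dir:
    paths = write_shards(features, n_shards=8, out_dir=out_dir)
    world_size = 4
    rows_read = 0
    for rank in range(world_size):
        mine = shards_for_worker(paths, rank, world_size)
        n_rows = sum(np.load(p).shape[0] for p in mine)
        rows_read += n_rows
        print(f"worker {rank}: {len(mine)} shards, {n_rows} rows")

print("rows read by all workers:", rows_read)
```

Choosing a shard count that is a multiple of the worker count keeps the per-worker load even, which matters once every worker must finish its share of an epoch before gradients are synchronized.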
The handoff in Figure 7.8.2 is the book's distribution thesis applied at the level of whole systems rather than a single algorithm. The same dataset is distributed twice for two different reasons: across Spark executors because the data does not fit on one machine and the featurization must scale on CPUs, then across GPU workers because the model training must scale on accelerators. The sharded files are the boundary where one form of distribution ends and another begins, and the shuffle that built those shards in Section 7.7 reappears, transformed, as the all-reduce that synchronizes the workers reading them. Recognizing where one distribution axis hands off to the next is exactly the design fluency the book is building toward.
A common and expensive mistake is to run Spark feature engineering on a GPU-equipped cluster "to save a step." The accelerators sit at near-zero utilization for the entire shuffle-heavy job while you pay GPU-hour prices for CPU-bound work, then the actual training cannot start until the prep finishes anyway. The cheap fix is the boring one in Figure 7.8.2: prep on CPU instances, write shards, and only then wake the GPUs.
Who: A data engineer at a streaming-media company building the training pipeline for a recommendation model.
Situation: A nightly job featurized two billion interaction records in Spark, then trained a deep ranking model on an eight-GPU node.
Problem: The featurization step ran a per-row Python UDF that called a small NumPy normalization, and the Spark stage alone took over five hours, leaving the GPUs idle and waiting.
Dilemma: Rewrite the feature logic in Spark's native expressions where possible and convert the rest to a vectorized pandas_udf, or rent a larger Spark cluster to brute-force the slow UDF at higher cost.
Decision: They converted the normalization to an Arrow-backed pandas_udf and pushed the simple arithmetic into built-in column expressions, exactly the move Output 7.8.1 quantifies, rather than paying for more machines to run slow code.
How: The row-at-a-time UDF became a batched function receiving a pandas Series; two derived columns moved to native Spark expressions; the output was written as sharded Parquet for the training loader.
Result: The featurization stage fell from over five hours to about twenty minutes on the same cluster, the GPUs started training far earlier in the window, and the numerical output was unchanged.
Lesson: When a Spark AI job is slow, suspect the language boundary before the cluster size. A vectorized UDF often turns a "we need more machines" problem into a "we wrote the UDF wrong" problem.
5. Why Arrow Is the Enabling Layer Intermediate
It is worth naming explicitly what makes vectorized UDFs possible, because the same idea recurs throughout the data side of AI. Apache Arrow defines a language-independent, columnar in-memory format. When Spark hands a batch to a Python worker as Arrow, the JVM and Python agree, byte for byte, on how that column is laid out in memory, so the transfer is a bulk copy of contiguous column buffers between the two processes rather than a value-by-value serialization through a slow encoding. NumPy and pandas can then wrap those buffers with little or no further conversion. The cost model changes from "pay once per row to serialize" to "pay once per batch to copy a contiguous block," which is why the boundary stops being the bottleneck once batches are large enough.
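The claim that NumPy can wrap a column buffer directly is easy to check in isolation. The sketch below uses a plain `bytearray` of little-endian float64 values as a stand-in for one column's data buffer; it is not a real Arrow buffer and does not use the Arrow libraries. It shows only the underlying idea: an agreed byte layout lets the receiver view the bytes as an array without converting each value.

```python
import struct

import numpy as np

n = 5
# A contiguous block of little-endian float64 bytes: a stand-in for the data
# buffer of one column in a batch (assumption: not an actual Arrow buffer).
buffer = bytearray(np.arange(n, dtype="<f8").tobytes())

# Wrap the bytes as an array without copying or parsing individual values.
col = np.frombuffer(buffer, dtype="<f8")
print(col)

# Overwrite the first 8 bytes in place. The change is visible through the
# array, which shows that col is a view over the buffer rather than a copy.
buffer[0:8] = struct.pack("<d", 42.0)
print(col[0])

# Vectorized math then runs directly over the wrapped column.
print(float(col.sum()))
```

Because both sides agree on the layout, interpreting the column costs the same whether the batch holds five values or five million; only the copy of the bytes themselves scales with size.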
Arrow also sits close to the columnar files Spark writes for the training handoff. Parquet, the usual choice for those shards, is a separate on-disk format with its own encoding and compression, but it is column-oriented in the same way, and Arrow-based readers decode it straight into Arrow column batches, so the data reaches the training loader without passing through a row-by-row representation. This is not a Spark-only convenience; Arrow has become the lingua franca that lets the data-prep world and the deep-learning world exchange columns cheaply, and it returns in Chapter 8 as the on-disk and over-the-wire format for sharded training data. When you choose a vectorized UDF over a row-at-a-time one, you are choosing to ride Arrow instead of fighting it.
The clean two-system handoff of Figure 7.8.2 is under active pressure to become tighter and, where it pays, GPU-accelerated end to end. The RAPIDS Accelerator for Apache Spark pushes DataFrame operations and shuffles onto GPUs, and recent releases extend GPU execution to more of the feature-engineering path so that prep and training can share accelerators when the workload justifies it. On the loading side, NVIDIA DALI and the Ray Data ecosystem (2024 to 2025) blur the boundary further by streaming Arrow batches directly from columnar storage into GPU training without a separate Spark write step, attacking the data-loading bottleneck of Chapter 8 head on. A parallel thread treats the entire pipeline as one schedulable computation, so the featurization in Section 2 and the data-parallel training in Section 4 can overlap instead of running strictly in sequence. The open question is when fusing the two systems beats keeping them separate; the answer still depends on how shuffle-heavy the prep is relative to how compute-heavy the model is, which is exactly the cost reasoning Section 7.9 makes precise.
With the language boundary understood and the prep-then-train handoff in place, the remaining question for a PySpark AI workload is the one any distributed system eventually asks: where is the time actually going, and how do I make it go away? Partition counts, shuffle sizes, serialization formats, and executor memory all bear on whether a feature pipeline like the one in this section runs in twenty minutes or five hours. That measurement-and-tuning discipline is the subject of Section 7.9.
For each transformation, decide whether it should be a built-in Spark DataFrame expression (no boundary crossing), an Arrow-backed pandas_udf (one crossing per batch), or whether a plain row-at-a-time UDF is unavoidable, and justify each choice using the boundary argument of Section 1: (a) multiply a numeric column by a constant and add another column; (b) standardize a column using its global mean and standard deviation; (c) run a pretrained Python tokenizer that returns a variable-length list of token ids per text row; (d) clip a column to the range $[-3, 3]$. For any case where you chose a UDF, explain what stops Spark's native expression language from doing the job inside the JVM.
Extend Code 7.8.1 so that, instead of comparing only the full per-row loop against the full vectorized call, it processes the data in batches of size $B$ and calls the vectorized transform once per batch, for $B \in \{1, 10, 100, 10^3, 10^4, 10^5\}$. Plot or print elapsed time against $B$. Identify the batch size beyond which further increases stop helping, and explain the shape of the curve in terms of fixed per-call Python overhead amortized over $B$ elements. Relate your finding to why a real pandas_udf exposes a configurable Arrow batch size rather than processing one row or one whole partition at a time.
A Spark job featurizes $2 \times 10^9$ records into feature vectors of $256$ float32 values each and writes them as sharded files for a data-parallel training run on $K = 16$ GPU workers. Estimate the total featurized dataset size in bytes, the size of one shard if shards are equal, and, given a shared-storage read bandwidth of $2$ gigabytes per second per worker, the time for all workers to read one full epoch in parallel. Compare that read time to a plausible per-epoch GPU compute time of a few minutes, and argue from the two numbers whether the data loader of Chapter 8 or the training compute is more likely to bound this pipeline. State which way your conclusion flips if the read bandwidth is ten times lower.