"I am an object, living in plasma, waiting to be pulled. I was serialized exactly once, and I intend to be read a thousand times without anyone copying me again."
A Tensor at Rest in the Object Store
Ray is a general-purpose distributed-compute substrate: it turns an ordinary Python function into a cluster-wide task and an ordinary Python class into a cluster-wide stateful service, and it ties them together with a shared object store that moves large tensors between machines without re-serializing them. The schedulers of the earlier sections place containers and pods; Ray sits one layer above, placing fine-grained Python work onto whatever those schedulers gave it, and stitching the results into a single dataflow graph. Almost every higher-level AI system in this book that needs to glue heterogeneous pieces together (a data-parallel trainer, an inference fleet, a population of reinforcement-learning actors, a hyperparameter sweep, a swarm of language-model agents) can be expressed as Ray tasks and actors sharing objects through one store. This section builds that substrate from its three parts: the cluster topology, the task-and-actor execution model, and the plasma object store whose zero-copy reads are the reason large-tensor AI workloads run on it at all.
The previous sections of this chapter scheduled coarse units: a container onto a node, a gang of pods onto a topology, a slice of a GPU onto a tenant. Those mechanisms answer the question "where does this process run?" but they say nothing about how the Python objects inside those processes find each other, share a forty-gigabyte embedding table, or accumulate state across thousands of calls. A modern AI application is rarely one monolithic program; it is a graph of interacting pieces, a preprocessing stage feeding a trainer feeding an evaluator feeding a server, often written by different teams in the same language. Ray exists to make that graph a first-class object. You write functions and classes that look local, annotate them, and Ray distributes their execution and their data across the cluster while preserving the illusion that you are calling them in one process.
That illusion has a precise cost structure, and the rest of the section is about paying it cheaply. Distributing function calls is easy; distributing the large arrays they pass to each other without copying those arrays over and over is the hard part, and it is exactly the part that decides whether an AI workload is fast or hopeless. Ray's answer is an object store shared by all the workers on a node, so that a tensor produced once is read by many tasks through a zero-copy memory map rather than a fresh deserialization per reader. We will see why that single design choice is what makes Ray, rather than a generic task queue, the substrate of choice for tensor-heavy AI.
1. Ray as a Distributed-Compute Substrate Beginner
It helps to fix what Ray is and is not before opening the machinery. Ray is not a training framework, not a serving system, and not a scheduler for containers; it is a substrate, a thin distributed runtime on which all of those are built. Its promise is small and sharp: take Python work that you could run in one process, and let it run across a cluster with the same call syntax, while a runtime handles placement, data movement, and reference tracking. Everything distinctive about Ray follows from honoring that promise for two kinds of work, stateless function calls and stateful objects, and for the data that flows between them.
Because the substrate is general, the AI systems built on it are libraries rather than separate products, and they reappear throughout this book. Ray Train wraps data-parallel deep learning of the kind introduced in Chapter 15; Ray Serve runs the inference fleets of Chapter 23; RLlib drives the actor-learner reinforcement-learning systems of Chapter 20 (which lives in Part IV); and Ray Tune runs the distributed hyperparameter searches of Chapter 21. Each library is a few thousand lines of orchestration over the same tasks, actors, and object store we develop here. Learning the substrate once therefore demystifies all four at the same time, which is why we treat it as foundational infrastructure rather than as a tool tied to any one workload.
Ray's surface reduces to a remarkably small vocabulary: a task is a stateless function call placed anywhere on the cluster, an actor is a stateful object that lives on one worker and serializes its method calls, and an object is an immutable value in the shared store named by a reference. Tasks and actors are the two verbs (do something, do something with memory); the stored object is the one noun (a value many computations point at). Every higher Ray library, and every system you build on Ray, is some composition of these three. If you can read a Ray program as a graph of tasks and actors exchanging object references, you can read all of them.
2. Cluster Anatomy: Head Node, Workers, and the GCS Beginner
A Ray cluster has one head node and any number of worker nodes, and the asymmetry matters. The head node runs the cluster-wide control plane; the workers run the actual Python tasks and actors and contribute their memory to the object store. Figure 33.7.1 lays out the pieces. The single most important control-plane component is the Global Control Store (the GCS), a key-value service on the head node that holds the cluster's metadata: which nodes are alive, what resources each one offers, where each actor lives, and the registry of named resources. Early versions of Ray also kept the location of every object in the GCS; current versions hand that bookkeeping to each object's owner (Section 4) so that the head node stays off the data path. The GCS remains the authoritative map of the cluster: when a caller needs to find an actor, or a raylet needs a cluster-wide view of free resources, it is the GCS (cached aggressively on each node) that answers.
Each worker node runs a per-node agent called the raylet, which bundles two responsibilities: a local scheduler that decides which of this node's tasks run next, and the object-store manager that owns this node's slice of shared memory. The Python worker processes that actually execute your code are children of the raylet. This two-level structure, one global control plane plus a raylet on every node, is what lets Ray schedule millions of tiny tasks per second: the common case is decided locally by the raylet without ever consulting the head node, and the GCS is touched only when a decision needs cluster-wide knowledge. We made the same point about avoiding a central bottleneck for the parameter server in Chapter 11; Ray applies it to scheduling and metadata rather than to gradients.
Standing up the head node, raylets, object stores, and GCS by hand would be hundreds of lines of process management. Ray collapses cluster bring-up to one call, and the same call connects a client to an already-running cluster:
import ray
ray.init() # start a local single-node cluster, or ...
# ray.init(address="auto") # ... attach to the existing cluster on this node
# ray.init(address="ray://head.example.com:10001") # ... attach to a remote head
print(ray.cluster_resources()) # {'CPU': 64.0, 'GPU': 8.0, 'memory': ..., ...}
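ray.init brings up a local head node, raylet, object store, and GCS, or joins the ones an existing cluster already runs, and exposes the cluster's aggregate resources. On a multi-node deployment the nodes themselves are started separately (for example with the ray start command or Ray's cluster launcher), so the autoscaling and provisioning that would otherwise be bespoke infrastructure stay out of application code.
3. The Distributed Scheduler: Tasks and Actors Intermediate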
With the topology in place, the question becomes how work is described and placed. Ray exposes exactly two units. A task is a stateless remote function: you mark a Python function with @ray.remote, call it with .remote(...), and instead of running it the call returns immediately with an object reference, a future for the eventual result. The function then runs on some worker the scheduler chooses, possibly on another machine, and its return value lands in that worker's object store. An actor is a stateful remote class: you mark a class with @ray.remote, instantiate it with .remote(...), and Ray places one long-lived instance on a worker; method calls on that instance are tasks too, but they all run on the same worker and execute one at a time, so the actor's fields behave like ordinary mutable state guarded by an implicit lock.
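The two units can be imitated on one machine with nothing but the standard library. The sketch below is an analogy under stated assumptions, not Ray's implementation: a thread pool stands in for the cluster's workers, a concurrent.futures.Future stands in for an object reference, and a one-thread executor stands in for the worker that hosts an actor. The names remote_task, square, and CounterActor are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the cluster's pool of workers (an assumption of this sketch).
_workers = ThreadPoolExecutor(max_workers=4)

def remote_task(fn, *args):
    """Submit a stateless call and return a future at once, like fn.remote()."""
    return _workers.submit(fn, *args)

def square(x):
    return x * x

class CounterActor:
    """A toy actor: private state plus a mailbox drained by a single thread."""
    def __init__(self):
        self._total = 0
        self._mailbox = ThreadPoolExecutor(max_workers=1)  # one call at a time

    def _add(self, v):
        self._total += v
        return self._total

    def add(self, v):
        # Returns a future; calls execute in submission order on one thread.
        return self._mailbox.submit(self._add, v)

# Tasks: four independent calls, resolved later (the analogue of ray.get).
futures = [remote_task(square, i) for i in range(4)]
task_results = [f.result() for f in futures]

# Actor: three calls against the same state, executed serially.
actor = CounterActor()
actor_futures = [actor.add(v) for v in (10, 5, 100)]
running_totals = [f.result() for f in actor_futures]

print(task_results)    # [0, 1, 4, 9]
print(running_totals)  # [10, 15, 115]
```

The single-thread mailbox is what makes the actor's state safe without an explicit lock: no two add calls can ever overlap, which is the property the text describes as an implicit lock.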
The scheduler's job is to place each of these units subject to resource requests. A task or actor can declare it needs, say, one GPU and two CPUs (@ray.remote(num_gpus=1, num_cpus=2)), and the raylet will only run it where those resources are free, decrementing the node's logical inventory while it runs. Placement is bottom-up and locality-aware: a task is preferentially scheduled on a node that already holds its large input objects, because moving the task to the data is far cheaper than moving the data to the task. When the local raylet cannot satisfy a request, it forwards the decision upward using the GCS's view of the cluster. This is the same locality principle that drove computation toward data in MapReduce (Chapter 6), now applied at the granularity of a single function call.
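A toy version of this placement rule fits in a few lines. The sketch below is illustrative only: it collapses the raylet and the cluster-wide view into a single function, and the names Node, fits, and place, along with the node and object names, are hypothetical. It keeps the two ideas from the paragraph, feasibility against a logical resource inventory and a preference for the node that already holds the inputs.

```python
from dataclasses import dataclass, field

@dataclass
class Node:
    name: str
    free: dict                                  # logical resources, e.g. {"CPU": 4}
    objects: set = field(default_factory=set)   # ids of objects held locally

def fits(node, request):
    """True if every requested resource is currently free on the node."""
    return all(node.free.get(r, 0) >= n for r, n in request.items())

def place(nodes, request, inputs=()):
    """Pick a feasible node, preferring the one that holds the most inputs."""
    feasible = [n for n in nodes if fits(n, request)]
    if not feasible:
        return None                             # a real scheduler would queue the task
    best = max(feasible, key=lambda n: len(n.objects & set(inputs)))
    for r, amount in request.items():
        best.free[r] -= amount                  # decrement the logical inventory
    return best.name

nodes = [
    Node("node-a", {"CPU": 4, "GPU": 0}),
    Node("node-b", {"CPU": 4, "GPU": 1}, objects={"embeddings"}),
    Node("node-c", {"CPU": 8, "GPU": 1}),
]

# A CPU-only task that reads "embeddings" follows its data to node-b.
first = place(nodes, {"CPU": 2}, inputs=["embeddings"])
# A GPU task with the same input also lands on node-b and takes its only GPU.
second = place(nodes, {"CPU": 2, "GPU": 1}, inputs=["embeddings"])
# node-b is now full, so the next GPU task falls back to node-c and must fetch.
third = place(nodes, {"CPU": 1, "GPU": 1}, inputs=["embeddings"])

print(first, second, third)  # node-b node-b node-c
```

The third placement is the case the scheduler tries to make rare: the task runs away from its data, so the object has to cross the network.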
The choice between a task and an actor is the choice between statelessness and state, and it is the most consequential modeling decision in any Ray program. Use tasks when work is embarrassingly parallel and each unit is independent: data preprocessing, a batch of inference calls, the rollouts of a search. Use actors when a computation must remember something across calls: a model kept warm in GPU memory, a replay buffer accumulating experience, a counter, a shard of a parameter table. Tasks give you throughput because the scheduler can run thousands at once on any free worker; actors give you a stable home for mutable state at the cost of serializing their own method calls. Most real systems are a few actors (the stateful spine) surrounded by clouds of tasks (the parallel muscle).
This actor model is not a Ray curiosity; it is the same abstraction that organizes several systems elsewhere in the book, which is why Ray became their natural host. A parameter server (Chapter 11) is an actor that owns a shard of the weights and exposes push and pull methods; the learner and the rollout workers of a distributed reinforcement-learning system (Chapter 20) are actors, one holding the policy under optimization and many generating experience against it; and an orchestrated fleet of language-model agents (Chapter 32) is a population of actors, each an agent with private memory, exchanging messages and tool results. In every case the actor gives the stateful entity a single addressable home on the cluster, and tasks supply the surrounding parallelism.
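The parameter-server case can be made concrete with a single-process sketch. It assumes a plain SGD update and uses a lock to stand in for the actor's serialized mailbox; ParameterShard, its push and pull methods, and the learning rate are illustrative choices, not an API from Ray or from Chapter 11.

```python
import threading

class ParameterShard:
    """Toy parameter-server shard: one home for a slice of the weights."""
    def __init__(self, weights, lr=0.5):
        self._weights = list(weights)
        self._lr = lr
        self._lock = threading.Lock()   # stands in for one-at-a-time method calls

    def push(self, grads):
        """Apply one worker's gradient to the shard (plain SGD step)."""
        with self._lock:
            self._weights = [w - self._lr * g
                             for w, g in zip(self._weights, grads)]

    def pull(self):
        """Return a snapshot of the current weights."""
        with self._lock:
            return list(self._weights)

shard = ParameterShard([1.0, 2.0])

# Four workers push the same gradient concurrently; the shard serializes them.
workers = [threading.Thread(target=shard.push, args=([1.0, -1.0],))
           for _ in range(4)]
for t in workers:
    t.start()
for t in workers:
    t.join()

final_weights = shard.pull()
print(final_weights)  # [-1.0, 4.0]
```

Because every update goes through the one object that owns the weights, the result is the same whatever order the four pushes arrive in.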
4. The Plasma Object Store and Zero-Copy Reads Advanced
The scheduler decides where work runs; the object store decides how the data it consumes and produces moves, and for tensor-heavy AI this is where the performance lives. Every worker on a node shares one object store (historically named Plasma) implemented as a region of shared memory. When you call ray.put(x), or when a task returns a value, Ray serializes the object once into that shared region, records where it lives so that other nodes can find it, and hands you back an object reference. Crucially, the bytes are laid out so that a NumPy array or a tensor is stored in its native buffer format. Any worker on the same node that later calls ray.get(ref) receives a read-only view that maps directly onto those shared bytes: no deserialization, no per-reader copy. This is the zero-copy read, and it is the single feature that makes Ray viable for AI.
To see why it matters, count the copies. Suppose a stage produces an object of $B$ bytes that $R$ downstream tasks must each read, all colocated on one node. A naive task system that passes arguments by value would serialize and ship $B$ bytes to each reader, moving $R \cdot B$ bytes in total. With a shared-memory store and zero-copy reads, the object is materialized once and every reader maps it, so the data movement is $$\text{bytes moved} \;=\; B \;+\; R \cdot c,$$ where $c$ is the small per-reader cost of attaching to the mapping and receiving the object's metadata, on the order of kilobytes and essentially independent of $B$. For a 64 MB embedding batch read by 16 tasks, the difference is $16 \times 64$ MB, roughly 1 GB of copying, versus a single 64 MB write; the ratio between the two stays near $R$, so the absolute gap grows in proportion to tensor size. The same accounting is why passing a large object by reference, rather than handing each task its own copy of the value, is the cardinal Ray performance rule.
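The accounting above can be checked in a few lines. This is a worked instance of the two formulas, not a measurement: the per-reader cost c = 4096 bytes is an illustrative assumption, MB is taken as $10^6$ bytes, and the function names are hypothetical.

```python
MB = 10**6  # decimal megabytes, matching an 8,000,000-element float64 array

def bytes_by_value(B, R):
    """Naive pass-by-value: every reader receives its own copy."""
    return R * B

def bytes_by_reference(B, R, c=4096):
    """Shared store: one write of B bytes plus a small per-reader cost c.

    c = 4096 bytes is an assumed figure for illustration only.
    """
    return B + R * c

B, R = 64 * MB, 16
naive = bytes_by_value(B, R)
shared = bytes_by_reference(B, R)

print(naive // MB)               # 1024  (about 1 GB of copying)
print(shared)                    # 64065536  (one 64 MB write plus 64 KiB)
print(round(naive / shared, 1))  # 16.0
```

The ratio is essentially $R$ itself, which is why the saving is largest exactly when many readers share one large tensor.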
Objects in the store are immutable and reference-counted, and Ray uses an ownership model to decide when each one can be freed. The worker that created an object reference is its owner; it tracks every copy of the reference that has been passed to other tasks, and when the last reference goes out of scope across the cluster the owner tells the raylets holding the object that they may evict it. This distributed reference counting is what lets a Ray program churn through millions of intermediate objects without leaking memory, and it is the quiet machinery behind the demo below. When an object is requested on a node that does not have it, the store pulls a copy from a node that does (the one cross-store arrow in Figure 33.7.1); locality-aware scheduling exists precisely to make that pull the exception rather than the rule.
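The bookkeeping an owner performs can be sketched as a counter per object. The toy below runs in one process and ignores everything that makes the real protocol hard (messages between workers, failures, borrowed references); the Owner class and its put, share, and release methods are hypothetical names chosen for the illustration.

```python
class Owner:
    """Toy owner: counts outstanding references to the objects it created."""
    def __init__(self):
        self.store = {}      # object id -> value (stand-in for the shared store)
        self.refcount = {}   # object id -> number of live references
        self.evicted = []    # ids released once their count reached zero

    def put(self, obj_id, value):
        self.store[obj_id] = value
        self.refcount[obj_id] = 1            # the creator's own reference
        return obj_id

    def share(self, obj_id):
        """A reference is handed to another task: one more holder."""
        self.refcount[obj_id] += 1
        return obj_id

    def release(self, obj_id):
        """A holder dropped its reference; evict when nobody is left."""
        self.refcount[obj_id] -= 1
        if self.refcount[obj_id] == 0:
            del self.store[obj_id], self.refcount[obj_id]
            self.evicted.append(obj_id)

owner = Owner()
ref = owner.put("batch-0", [1.0, 2.0, 3.0])
owner.share(ref)                            # passed to task A
owner.share(ref)                            # passed to task B
owner.release(ref)                          # the creator drops its handle
owner.release(ref)                          # task A finishes
still_alive = "batch-0" in owner.store      # task B still holds a reference
owner.release(ref)                          # task B finishes

print(still_alive, owner.evicted)  # True ['batch-0']
```

The object outlives the creator's own handle and disappears only when the last holder lets go, which is the behavior the paragraph attributes to distributed reference counting.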
The demo in Code 33.7.2 makes these semantics concrete without requiring a Ray installation. It builds a miniature object store on top of multiprocessing.shared_memory, so that put writes a large array once into shared memory and get returns a zero-copy NumPy view onto the same buffer. Four stateless task processes then read disjoint slices of that one shared tensor and return partial sums (the task-and-object-store pattern), and a separate actor process accumulates a running total across calls (the stateful-actor pattern). The accompanying Ray version is shown afterward so the correspondence is exact.
"""Conceptual Ray demo: object store (zero-copy), stateless tasks, a stateful actor.
No 'ray' needed; uses multiprocessing + shared_memory to mirror plasma semantics."""
import numpy as np
from multiprocessing import Process, Queue
from multiprocessing import shared_memory as shm

# ---- A tiny "plasma" object store: put() returns a ref (a name), get() is a
# zero-copy numpy view onto the SAME shared buffer. No re-serialization on read.
def put_array(a):
    block = shm.SharedMemory(create=True, size=a.nbytes)
    view = np.ndarray(a.shape, dtype=a.dtype, buffer=block.buf)
    view[:] = a                                    # one copy IN, at put time
    return {"name": block.name, "shape": a.shape, "dtype": str(a.dtype)}, block

def get_array(ref):
    block = shm.SharedMemory(name=ref["name"])     # attach, no data copied
    return np.ndarray(ref["shape"], dtype=ref["dtype"], buffer=block.buf), block

# ---- A stateless "task": reads the shared tensor by ref, returns a partial sum.
def task_partial(ref, lo, hi, out):
    arr, block = get_array(ref)                    # zero-copy read of the shard
    out.put(float(arr[lo:hi].sum()))
    del arr                                        # drop the view: close() raises
    block.close()                                  # BufferError while it is alive

# ---- A stateful "actor": its own process, owns mutable state across calls.
def actor_loop(inbox, outbox):
    total = 0.0                                    # private state lives here
    while True:
        msg = inbox.get()
        if msg is None:
            break
        total += msg                               # mutate in place, no reload
        outbox.put(total)

if __name__ == "__main__":
    big = np.arange(8_000_000, dtype=np.float64)   # ~64 MB tensor
    ref, owner = put_array(big)                    # ray.put: one shared copy
    print("object ref :", ref["name"][-12:], ref["shape"], ref["dtype"])

    # Fan out 4 stateless tasks over the SAME buffer; none re-serializes it.
    out = Queue()
    bounds = np.linspace(0, len(big), 5, dtype=int)
    procs = [Process(target=task_partial, args=(ref, bounds[i], bounds[i+1], out))
             for i in range(4)]
    for p in procs: p.start()
    parts = sorted(out.get() for _ in procs)       # results arrive in completion order
    for p in procs: p.join()
    print("task partial sums :", [f"{x:.3e}" for x in parts])
    print("combined (tasks) :", f"{sum(parts):.6e}")
    print("ground truth :", f"{big.sum():.6e}")

    # Drive a stateful actor: same handle, accumulating state across calls.
    inbox, outbox = Queue(), Queue()
    a = Process(target=actor_loop, args=(inbox, outbox)); a.start()
    running = []
    for v in (10.0, 5.0, 100.0):
        inbox.put(v); running.append(outbox.get())
    inbox.put(None); a.join()
    print("actor running totals :", running)
    owner.close(); owner.unlink()
put_array and get_array stand in for ray.put and ray.get: the 64 MB array is serialized into shared memory once, and each task reads a zero-copy view rather than receiving its own copy. The four task processes mirror stateless @ray.remote functions; the persistent actor_loop process mirrors a stateful @ray.remote actor accumulating state across calls.
object ref : nsm_b79b1ff1 (8000000,) float64
task partial sums : ['2.000e+12', '6.000e+12', '1.000e+13', '1.400e+13']
combined (tasks) : 3.200000e+13
ground truth : 3.200000e+13
actor running totals : [10.0, 15.0, 115.0]
The roughly fifty lines of shared-memory plumbing in Code 33.7.2 exist only to imitate what Ray gives you in three primitives. The equivalent Ray program is the following, and it works unchanged across many nodes rather than one:
import ray, numpy as np
ray.init()

@ray.remote
def partial(arr, lo, hi):  # a stateless TASK over a shared object
    return float(arr[lo:hi].sum())

@ray.remote
class Accumulator:  # a stateful ACTOR
    def __init__(self): self.total = 0.0
    def add(self, v): self.total += v; return self.total

big = np.arange(8_000_000, dtype=np.float64)
ref = ray.put(big)  # one shared, zero-copy-readable object
b = np.linspace(0, len(big), 5, dtype=int)
parts = ray.get([partial.remote(ref, b[i], b[i+1]) for i in range(4)])
print("combined :", sum(parts))  # -> 3.2e13
acc = Accumulator.remote()
print("totals :", ray.get([acc.add.remote(v) for v in (10., 5., 100.)]))
ray.put places the 64 MB array in the object store once; passing ref into four partial.remote tasks gives each a zero-copy view; the Accumulator actor carries state across add calls. Ray supplies the object store, ownership-based reference counting, locality-aware placement, and cross-node fetch that Code 33.7.2 only sketches, in a fraction of the lines.
Ray is where several axes of distribution from Chapter 1 meet on one runtime. Distribute training: Ray Train spreads data-parallel workers and lets them all-reduce. Distribute inference: Ray Serve replicates model actors across a fleet. Distribute intelligence: an agent swarm is a population of actors. The object store is the connective tissue that keeps all three honest about cost, because it refuses to re-serialize the large tensors they pass around. When you see a later system "built on Ray," read it as tasks and actors over a shared store, and the distribution axis it lives on becomes obvious.
5. How Ray Underpins the Higher Layers Intermediate
The payoff of a single substrate is that workloads which look unrelated turn out to be the same three primitives arranged differently, and they compose on one cluster. Ray Train expresses data-parallel deep learning as a set of worker actors that each hold a model replica and synchronize gradients with the collectives of Chapter 4; the training data flows to them as objects, and checkpoints land in the store. Ray Serve expresses an inference fleet as deployment actors, each a warm model replica, fronted by a router that load-balances requests, which is the fleet pattern of Chapter 23 with actors playing the role of replicas. RLlib expresses reinforcement learning as rollout-worker actors feeding experience objects to a learner actor, the actor-learner architecture of Chapter 20 realized directly. Ray Tune expresses hyperparameter search as a pool of trial tasks or actors whose results a scheduler prunes, the AutoML loop of Chapter 21.
Because they share the substrate, these libraries interlock without glue code. A single Ray program can preprocess data as tasks, hand the resulting objects to Ray Train actors, register the trained model with Ray Serve deployment actors, and run a Ray Tune sweep over the whole pipeline, all on the same cluster and all exchanging objects through the same store rather than through files or a message broker. That composability, not raw speed on any one task, is the strategic reason Ray is widely used to glue AI workloads together: it lets a team express an end-to-end system as one dataflow graph in one language on one runtime, instead of stitching four specialized systems across process and serialization boundaries.
Who: An ML platform engineer at a recommendation company assembling a nightly retraining pipeline.
Situation: Four stages (feature preprocessing, data-parallel training, a held-out evaluation, and a canary deployment) each ran as a separate service, passing multi-gigabyte feature tensors between them through object storage and a message queue.
Problem: Most of the wall-clock time was serialization: every stage wrote its tensors to disk, the next stage read and deserialized them, and the same embedding tables were re-encoded several times along the way.
Dilemma: Keep four decoupled services with clean boundaries but heavy serialization tax, or fuse them into one runtime that shares memory but couples the stages more tightly.
Decision: They moved the pipeline onto a single Ray cluster, expressing preprocessing as tasks, training as Ray Train actors, evaluation as a task, and deployment as a Ray Serve actor, with feature tensors passed by object reference between stages.
How: Each stage returned object references rather than file paths; downstream stages received zero-copy views of the same shared tensors, and only the final model artifact was written to durable storage.
Result: The repeated serialize-write-read-deserialize cycles vanished, the embedding tables were materialized once and read in place, and the nightly run finished comfortably inside its window with markedly lower I/O.
Lesson: When stages share large tensors, a shared object store can eliminate the dominant cost (re-serialization) that a broker-and-files architecture pays at every boundary; the substrate's job is to keep that data in place.
6. Limits, Failure, and the Research Frontier Advanced
A substrate this general has sharp edges worth naming. The GCS, even when its state is persisted to an external store so that it can be restarted without losing the cluster, is a control-plane component whose availability bounds the cluster's; Ray has steadily hardened GCS fault tolerance precisely because a substrate that hosts everything cannot afford a single point of failure. Object-store memory is finite, so a program that produces objects faster than it frees them will first spill to disk and then stall; the ownership-based reference counting of Section 4 is what keeps this in check, but a leaked reference defeats it. And the locality assumptions that make the scheduler fast degrade when objects must be fetched across nodes, so a poorly placed workload can drown in cross-store transfers, the very copies the store was built to avoid. None of these is a flaw so much as a cost that the cluster operator must measure, in the spirit of the performance models of Chapter 3.
The substrate is being pushed by the shape of current AI work. As language-model agents (Chapter 32) proliferate, the object store is being asked to hold and share long contexts and KV caches across actors, turning Ray into a backbone for agentic and tool-using fleets; libraries such as Ray Serve LLM and the broader push toward serving large models on Ray make the store's zero-copy reads matter for multi-gigabyte caches, not just training tensors. A second thread is heterogeneity: scheduling tasks across mixed CPU, GPU, and accelerator pools while respecting the fine-grained sharing of earlier sections, so that one Ray cluster can co-host preprocessing, training, and serving without static partitioning. A third is reliability under preemption, since substrates increasingly run on spot and elastic capacity (Chapter 18), which demands that lineage-based object reconstruction and actor restart be cheap and automatic. The common direction is clear: the substrate is absorbing more of the AI stack, and its object store is the component under the most pressure to scale.
Ownership-based reference counting produces a small philosophical comedy. The task that computed an object can return and be forgotten, yet the object lives on in the store as long as some other task still holds a reference to it. (The one party that must stay alive is the owner, the worker that created the reference, because an object shares its owner's fate.) The value persists not because the code that produced it is still running, but because someone, somewhere on the cluster, still intends to read it. The tensor in this section's epigraph is patient for a reason: in a Ray cluster, being referenced is the only form of immortality on offer.
With Ray we have a substrate that places fine-grained Python work onto the containers and GPUs the earlier sections scheduled, and shares the large tensors that work produces without re-serializing them. The next section turns from this general-purpose runtime to the question of cost, where the cheapest capacity in the cloud arrives with the right to revoke it: Section 33.8 shows how spot and preemptible scheduling turns the checkpoint-interval mathematics of Chapter 18 into a scheduling decision the substrate must survive.
For each component, decide whether you would implement it as a Ray task or a Ray actor, and justify the choice in terms of state and parallelism: (a) a function that tokenizes one document; (b) a replay buffer that accumulates transitions from many rollout workers in a reinforcement-learning system; (c) a warm large-language-model replica answering inference requests; (d) a shard of a billion-row embedding table serving lookups. For any you mark as an actor, name the mutable state it must hold, and connect each answer to the corresponding system in Chapter 11, Chapter 20, or Chapter 23.
Starting from Code 33.7.2, write a second version in which each task receives its own freshly copied slice of the array as an argument instead of reading the shared buffer by reference (have the parent slice big and pass the slice into the process). Time both versions for a 64 MB array read by 4, 8, and 16 tasks, and plot bytes moved against the model in Section 4, $B + R \cdot c$ versus $R \cdot B$. Explain how the measured gap supports the cardinal Ray rule that large objects must be passed by reference, and predict how the gap scales as the tensor grows to 1 GB.
Suppose a task needs an object of $B = 256$ MB that lives on another node, and the inter-node link moves data at 12.5 gigabytes per second. Estimate the time to fetch the object versus the time to instead schedule the task on the node that already holds it (treat task placement as effectively free). Now suppose the object is read by $R$ tasks: derive the threshold $R$ at which gathering the readers onto the object's node beats fetching the object to each reader, and relate your answer to why Ray's scheduler is locality-aware. Make the estimate rigorous with the alpha-beta model of Chapter 3.