I’ve been excited for this to make it to OSS: the PyTorch team at Meta recently soft-launched Monarch on GitHub.
pytorch-labs/monarch: PyTorch Single Controller
Back in 2022, Google’s Pathways paper revisited the single-controller approach to managing machine learning runs. Typically, ML jobs use an SPMD (Single Program, Multiple Data) approach, distributing identical code across multiple hosts. Each host runs independently, synchronizing during collective operations. This works, as evidenced by the many large training runs in the world. It also introduces complexity, especially with pipeline parallelism, where conditional logic for different ranks can clutter up your training code. Even without that, subtle issues can arise: for example, slight differences in torch.compile optimization have (in the past!) led to deadlocks by placing collectives differently on separate nodes.
The single-controller model simplifies this by centralizing program execution on one main node and using generic workers on the hosts that execute assigned tasks. This provides a consistent, global view of the entire computation, making it easier to get to a correct implementation of parallelisms and other distributed work. This doesn’t come for free though: the main node must efficiently manage (potentially!) thousands of GPUs without becoming a bottleneck, and existing code must adapt to this new centralized model.
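To make the contrast concrete, here is a toy sketch in plain Python (no Monarch, no GPUs). The names `spmd_program`, `worker_execute`, and `controller` are made up for illustration, and threads stand in for hosts. In the SPMD version every rank runs the same function and branches on its own rank. In the single-controller version one program builds the plan and hands each generic worker its task.

```python
from concurrent.futures import ThreadPoolExecutor

WORLD_SIZE = 4


def spmd_program(rank: int) -> str:
    """SPMD style: every rank runs this same function and branches on its rank."""
    if rank == 0:
        return "rank 0: first pipeline stage"
    if rank == WORLD_SIZE - 1:
        return f"rank {rank}: last pipeline stage"
    return f"rank {rank}: middle pipeline stage"


def worker_execute(rank: int, task: str) -> str:
    """Single-controller style: a generic worker just runs what it is handed."""
    return f"rank {rank}: {task}"


def controller() -> list[str]:
    """One program decides who does what, then dispatches to the workers."""
    plan = {rank: "middle pipeline stage" for rank in range(WORLD_SIZE)}
    plan[0] = "first pipeline stage"
    plan[WORLD_SIZE - 1] = "last pipeline stage"
    with ThreadPoolExecutor(max_workers=WORLD_SIZE) as pool:
        futures = [pool.submit(worker_execute, r, plan[r]) for r in range(WORLD_SIZE)]
        return [f.result() for f in futures]


spmd_results = [spmd_program(rank) for rank in range(WORLD_SIZE)]
controller_results = controller()
print(spmd_results == controller_results)
```

Both versions produce the same assignment. The difference is where the rank-specific logic lives: scattered through the per-rank program, or gathered in one place in the controller.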
Monarch is the PyTorch team’s implementation of this single-controller concept. It provides a familiar PyTorch frontend, additional module wrappers, and a high-performance Rust-based actor system for distributing and managing work.
The fundamental abstraction in Monarch is the Actor. Each Actor runs on its own accelerator and maintains its own state and behavior. Actors communicate through async message passing, via methods decorated with @endpoint. The nice thing about the programming model is that you can interact with all of the actors in your mesh in a consistent way.
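The pattern can be illustrated without Monarch. The sketch below is a toy asyncio actor, not Monarch's API: `ToyActor`, `ToyMesh`, `send`, and `call` are hypothetical names, and everything runs in a single process. It only shows the shape of the idea: each actor owns its state, handles messages one at a time from a mailbox, and a mesh lets you call the same method on every actor and get one result per rank.

```python
import asyncio


class ToyActor:
    """Hypothetical stand-in for an actor: private state plus a mailbox."""

    def __init__(self, rank: int) -> None:
        self.rank = rank
        self.steps = 0  # state owned by this actor only
        self._mailbox: asyncio.Queue = asyncio.Queue()

    async def run(self) -> None:
        # Handle one message at a time, so the state never needs a lock.
        while True:
            method, reply = await self._mailbox.get()
            if method is None:  # stop signal
                break
            reply.set_result(getattr(self, method)())

    def step(self) -> dict:
        self.steps += 1
        return {"rank": self.rank, "steps": self.steps}

    def send(self, method: str) -> asyncio.Future:
        # Enqueue a message and return a future for the reply.
        reply = asyncio.get_running_loop().create_future()
        self._mailbox.put_nowait((method, reply))
        return reply

    def stop(self) -> None:
        self._mailbox.put_nowait((None, None))


class ToyMesh:
    """Hypothetical stand-in for an actor mesh: fan a call out to every actor."""

    def __init__(self, size: int) -> None:
        self.actors = [ToyActor(rank) for rank in range(size)]

    async def call(self, method: str) -> list:
        return await asyncio.gather(*(a.send(method) for a in self.actors))


async def main() -> list:
    mesh = ToyMesh(2)
    tasks = [asyncio.create_task(a.run()) for a in mesh.actors]
    await mesh.call("step")
    results = await mesh.call("step")  # state persists between calls
    for actor in mesh.actors:
        actor.stop()
    await asyncio.gather(*tasks)
    return results


results = asyncio.run(main())
print(results)
```

The second call returns `steps == 2` for both actors, because each one kept its own counter between messages.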
Monarch is appealing even if you’re not GPU-rich. For instance, at home, I have a machine equipped with two (mismatched) 3090s, and Monarch allows me to run and debug jobs directly in notebooks without relying on external services.
Installation had minor hurdles because I built from source rather than using the available pip package. Although the README specifies Python 3.10, Python 3.13 worked fine. The dependencies reference dnf (reflecting Meta’s internal Linux distro choice), so adapting commands to other Linux distributions (Ubuntu in my case) was necessary. Additionally, I had to set BINDGEN_EXTRA_CLANG_ARGS="-I/usr/include/c++/11 -I/usr/include/x86_64-linux-gnu/c++/11" to resolve Rust compilation issues.
Once installed, running Monarch’s distributed data-parallel notebook was straightforward (see: monarch/examples/notebooks/spmd_ddp.ipynb). The notebook shows that minimal code changes to the standard DDP example are required, mainly subclassing Actor (e.g., class DDPActor(Actor)), while keeping the training loop virtually identical. Monarch handles the rest, including distributed execution and debugging across multiple GPUs.
Setting up the environment means providing the mesh configuration and launching the actors, which can be done from a cell:
# Spawn a process mesh
local_proc_mesh = await proc_mesh(
    gpus=WORLD_SIZE,
    env={
        "MASTER_ADDR": "localhost",
        "MASTER_PORT": "12455",
    },
)
# Spawn our actor mesh on top of the process mesh
ddp_actor = await local_proc_mesh.spawn("ddp_actor", DDPActor)
I didn’t have to manually start any other services; it all happened under the hood. Triggering the run is just:
await ddp_actor.demo_basic.call()
Which output:
self.rank=0 Running basic DDP example
self.rank=1 Running basic DDP example
self.rank=1 Finished running basic DDP example
self.rank=0 Finished running basic DDP example
What I find really appealing is how easy it is to execute across ranks. For example, to query for system info:
print("Gathering system info from all ranks...")
# Call the endpoint on every actor in the mesh spawned earlier
system_info = await ddp_actor.get_system_info.call()
print("\n SYSTEM INFORMATION ACROSS ALL RANKS:")
print("=" * 60)
# Results come back as (point, value) pairs, one per rank
for point, rank_info in system_info:
    print(f"Rank {rank_info['rank']}: PID={rank_info['process_id']}, "
          f"Device={rank_info['device_name']}, "
          f"GPU Memory={rank_info['gpu_memory_allocated']/1e6:.1f}MB")
print(f"\nFound {len(system_info)} ranks in the mesh")
all_rank_info = [value for point, value in system_info]
print(f"Total GPU memory across all ranks: {sum(info['gpu_memory_allocated'] for info in all_rank_info)/1e6:.1f}MB")
Outputting:
Gathering system info from all ranks...
[Rank 0] System Info: PID=10519, CPU=0.1%, RAM=5.2%, GPU_MEM=0.0MB
[Rank 1] System Info: PID=10520, CPU=0.1%, RAM=5.2%, GPU_MEM=0.0MB
SYSTEM INFORMATION ACROSS ALL RANKS:
============================================================
Rank 0: PID=10519, Device=NVIDIA GeForce RTX 3090, GPU Memory=0.0MB
Rank 1: PID=10520, Device=NVIDIA GeForce RTX 3090, GPU Memory=0.0MB
Found 2 ranks in the mesh
Total GPU memory across all ranks: 0.1MB
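The loop above treats the result as an iterable of (point, value) pairs, so aggregation is ordinary Python. Here is a small sketch of a helper for that; `summarize` is a made-up name, the sample points are plain dicts (the real point objects may differ), and the byte counts are invented. The numbers are chosen to show one plausible reason a total can print as 0.1MB when each rank prints 0.0MB: each value rounds down on its own, while the sum rounds up.

```python
from typing import Any, Iterable


def summarize(results: Iterable[tuple[Any, dict]], key: str) -> dict:
    """Collect one field from per-rank results given as (point, value) pairs."""
    values = [value[key] for _point, value in results]
    return {"count": len(values), "total": sum(values), "per_rank": values}


# Made-up sample shaped like the loop above: (point, rank_info) pairs.
sample = [
    ({"gpus": 0}, {"rank": 0, "gpu_memory_allocated": 40_000}),
    ({"gpus": 1}, {"rank": 1, "gpu_memory_allocated": 40_000}),
]

summary = summarize(sample, "gpu_memory_allocated")
per_rank_mb = [f"{v / 1e6:.1f}MB" for v in summary["per_rank"]]
total_mb = f"{summary['total'] / 1e6:.1f}MB"
print(per_rank_mb, total_mb)
```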
I can also stop training and dump state if I need to, making it easy to check norms and debug:
print("Running training steps...")
for step in range(3):
    print(f"\n--- Step {step + 1} ---")
    # Run one training step on every rank and collect the per-rank results
    step_results = await ddp_actor.train_step.call()
    all_results = [value for point, value in step_results]
    losses = [result['loss'] for result in all_results]
    grad_norms = [result['grad_norm'] for result in all_results]
    throughputs = [result['throughput'] for result in all_results]
    print(f"Losses across ranks: {[f'{l:.4f}' for l in losses]}")
    print(f"Gradient norms: {[f'{g:.4f}' for g in grad_norms]}")
    print(f"Avg throughput: {sum(throughputs)/len(throughputs):.1f} samples/sec")
--- Step 1 ---
[Rank 1] Step 1: Loss=1.1128, GradNorm=0.3198, Time=0.241s
[Rank 0] Step 1: Loss=1.0414, GradNorm=0.3198, Time=0.253s
Losses across ranks: ['1.0414', '1.1128']
Gradient norms: ['0.3198', '0.3198']
Avg throughput: 129.8 samples/sec
--- Step 2 ---
[Rank 0] Step 2: Loss=1.1526, GradNorm=0.3096, Time=0.003s
[Rank 1] Step 2: Loss=1.0546, GradNorm=0.3096, Time=0.003s
Losses across ranks: ['1.1526', '1.0546']
Gradient norms: ['0.3096', '0.3096']
Avg throughput: 9800.9 samples/sec
--- Step 3 ---
[Rank 1] Step 3: Loss=0.9116, GradNorm=0.2243, Time=0.002s
[Rank 0] Step 3: Loss=0.9662, GradNorm=0.2243, Time=0.002s
Losses across ranks: ['0.9662', '0.9116']
Gradient norms: ['0.2243', '0.2243']
Avg throughput: 19977.5 samples/sec
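One thing worth noticing in the output above: the losses differ between ranks, while the gradient norms match exactly. That is what you would expect from DDP, since each rank computes its loss on its own batch and the gradients are then averaged across ranks. With all the per-rank results in one place, a sanity check for this is a few lines. The helper name `grads_in_sync` and the tolerance are my own choices, and the "diverged" data is made up to show a failing case.

```python
import math


def grads_in_sync(results: list, rel_tol: float = 1e-5) -> bool:
    """Return True if every rank reports the same gradient norm (within tolerance)."""
    norms = [r["grad_norm"] for r in results]
    return all(math.isclose(n, norms[0], rel_tol=rel_tol) for n in norms)


# Step 1 values copied from the output above.
step_1 = [
    {"rank": 0, "loss": 1.0414, "grad_norm": 0.3198},
    {"rank": 1, "loss": 1.1128, "grad_norm": 0.3198},
]
# A made-up failure case: rank 1 reports a different norm.
diverged = [
    {"rank": 0, "grad_norm": 0.3198},
    {"rank": 1, "grad_norm": 0.4100},
]

print(grads_in_sync(step_1))
print(grads_in_sync(diverged))
```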
While the distributed stuff here is cool, it’s not wildly different from what you get with a distributed framework like Ray plus a little bit of setup (though I am pretty allergic to setup). What is most interesting is how this changes the programming model of PyTorch, and makes it really easy to build out distributed experiments.
For example, if I were building a parameter server, the sync would only require an awaited read of the weights from another object, taking advantage of the RDMA support for an efficient copy [1]:
@endpoint
async def worker_sync_with_ps(self, param_server) -> bool:
    """Synchronize with parameter server and get RDMA handles"""
    self._log("Synchronizing with parameter server...")
    # Get RDMA buffer handles
    self.weight_buffers = await param_server.ps_get_weight_handles.call_one()
    self.gradient_buffers = await param_server.ps_get_gradient_handles.call_one()
    # Get metadata
    metadata = await param_server.ps_get_metadata.call_one()
    self.weight_metadata = metadata['weights']
    self.gradient_metadata = metadata['gradients']
    # Perform initial weight sync
    sync_time = await self._sync_weights_from_ps()
    self._log(f"Synchronized with parameter server (sync time: {sync_time:.3f}s)")
    return True
Getting those weight buffers is as simple as creating the right Monarch object:
def tensor_to_rdma_buffer(tensor: torch.Tensor) -> RDMABuffer:
    # RDMA requires 1D contiguous uint8 tensors
    byte_tensor = tensor.view(torch.uint8).flatten()
    return RDMABuffer(byte_tensor)
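Flattening to raw bytes throws away the element type and shape, which is presumably why the worker fetches metadata alongside the buffer handles. As a rough analogy using only the standard library (not Monarch or PyTorch), the sketch below does the same round trip with `array` and `memoryview`. It assumes the metadata amounts to a format code and a shape, which the code above does not confirm.

```python
from array import array

# A 2x3 "tensor" of float32 values stored as a flat array.
weights = array("f", [0.0, 1.0, 2.0, 3.0, 4.0, 5.0])
metadata = {"format": "f", "shape": [2, 3]}  # assumed metadata layout

# View the same memory as a flat run of unsigned bytes (no copy).
byte_view = memoryview(weights).cast("B")
print(len(byte_view))  # 6 elements * 4 bytes each

# The receiving side needs the metadata to reinterpret the bytes.
restored = byte_view.cast(metadata["format"], shape=metadata["shape"])
print(restored.tolist())

# Both views share memory: an update to the source is visible through them.
weights[0] = 7.0
print(restored[0, 0])
```

Because nothing is copied, the byte view stays in step with the original values, which is the property you want from a buffer that another process reads from directly.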
For an early preview of a library, Monarch is surprisingly complete, and definitely worth a look.
[1] Not that this would do anything for my 3090s!