Multi-GPU and Distributed Computing
When One GPU Is Not Enough
As models and datasets grow, a single GPU becomes insufficient. Multi-GPU training provides two scaling strategies:
- Data parallelism: Each GPU processes a different batch; gradients are averaged across GPUs. Scales batch size linearly with GPU count.
- Model parallelism: Different parts of the model live on different GPUs. Enables training models too large for a single GPU's memory.
This section covers PyTorch's DataParallel, DistributedDataParallel
(DDP), FSDP, and the communication primitives that enable multi-GPU computing.
Definition: DataParallel (DP)
torch.nn.DataParallel is the simplest multi-GPU wrapper:
model = nn.DataParallel(model) # wraps model for multi-GPU
output = model(input) # splits input across GPUs
How it works:
- Replicates model to each GPU
- Splits the input batch across GPUs
- Forward pass runs in parallel on each GPU
- Outputs are gathered to GPU 0
- Loss and backward run on GPU 0
- Gradients are broadcast to update all replicas
Limitations: GPU 0 is a bottleneck (gathers all outputs, computes loss); uses Python threads (GIL-limited); does not scale beyond a single machine.
DataParallel is deprecated in favor of DistributedDataParallel.
Use DP only for quick prototyping on a single machine.
Definition: DistributedDataParallel (DDP)
DistributedDataParallel is the production-grade multi-GPU solution:
import os
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

dist.init_process_group('nccl')              # one process per GPU
local_rank = int(os.environ['LOCAL_RANK'])   # set by torchrun
model = model.to(local_rank)
model = DDP(model, device_ids=[local_rank])
Key advantages over DataParallel:
- One process per GPU (no GIL contention)
- Gradient all-reduce overlaps with backward pass
- Scales to multiple machines via NCCL
- No GPU 0 bottleneck (symmetric across all GPUs)
Launch DDP with torchrun: torchrun --nproc_per_node=4 train.py.
Each process gets its own LOCAL_RANK environment variable.
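torchrun also coordinates multi-node jobs through a rendezvous endpoint. A hedged sketch for two 8-GPU nodes (the hostname node0 and port 29500 are placeholder values; run the same command on every node):
# torchrun --nnodes=2 --nproc_per_node=8 \
#     --rdzv_backend=c10d --rdzv_endpoint=node0:29500 train.py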
Definition: All-Reduce Communication Primitive
All-reduce is the fundamental operation in distributed training. It computes the element-wise sum (or average) of tensors across all GPUs and distributes the result to every GPU:
$\bar{g} = \frac{1}{N} \sum_{i=1}^{N} g_i$, where $g_i$ is the gradient on GPU $i$ and $N$ is the number of GPUs.
# Manual all-reduce
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
tensor /= dist.get_world_size()
DDP performs all-reduce automatically during backward(), overlapping
communication with gradient computation using bucketed all-reduce.
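The bucket size is tunable via the bucket_cap_mb argument of the DDP constructor (25 MB is the default):
model = DDP(model, device_ids=[local_rank], bucket_cap_mb=25)  # default bucket size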
NCCL implements all-reduce using a ring algorithm with a bandwidth cost of $2\frac{N-1}{N}P$ elements sent per GPU for $P$ parameters across $N$ GPUs, nearly independent of GPU count for large $N$.
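A quick back-of-the-envelope sketch of this cost (the $2(N-1)/N$ factor is the standard per-GPU traffic of a ring all-reduce; the 1B-parameter model is an illustrative assumption):
def ring_allreduce_bytes(num_params, world_size, bytes_per_param=4):
    """Per-GPU bytes sent for one ring all-reduce (fp32 by default)."""
    n = world_size
    return 2 * (n - 1) / n * num_params * bytes_per_param

for n in (2, 8, 64):
    gb = ring_allreduce_bytes(1_000_000_000, n) / 1e9
    print(f"N={n:3d}: {gb:.2f} GB per GPU per step")
# N=2: 4.00 GB, N=8: 7.00 GB, N=64: 7.88 GB -- approaching the 8 GB asymptote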
Definition: Model Parallelism (Pipeline and Tensor)
When a model does not fit on a single GPU, split it across GPUs:
Pipeline parallelism: Split layers across GPUs.
class PipelineModel(nn.Module):
    def __init__(self):
        super().__init__()  # required before assigning submodules
        self.part1 = nn.Sequential(...).to('cuda:0')
        self.part2 = nn.Sequential(...).to('cuda:1')

    def forward(self, x):
        x = self.part1(x.to('cuda:0'))
        x = self.part2(x.to('cuda:1'))  # move activations to the next GPU
        return x
Tensor parallelism: Split individual layers (e.g., split a large linear layer's weight matrix column-wise across GPUs).
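A minimal sketch of the column-wise split (illustrative only: the shards here run sequentially from one Python loop, whereas real tensor-parallel implementations such as Megatron-LM launch them concurrently and overlap the gather with communication):
import torch
import torch.nn as nn

class ColumnParallelLinear(nn.Module):
    """Toy column-parallel linear: each GPU holds a slice of the output dim."""
    def __init__(self, in_features, out_features, devices):
        super().__init__()
        shard = out_features // len(devices)  # assumes even divisibility
        self.devices = devices
        self.shards = nn.ModuleList(
            nn.Linear(in_features, shard).to(d) for d in devices)

    def forward(self, x):
        # Each shard computes its slice of the output; gather on the first device
        outs = [m(x.to(d)) for m, d in zip(self.shards, self.devices)]
        return torch.cat([o.to(self.devices[0]) for o in outs], dim=-1)

# Usage sketch: layer = ColumnParallelLinear(1024, 4096, ['cuda:0', 'cuda:1'])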
FSDP (Fully Sharded Data Parallel): Shards parameters, gradients, and optimizer states across GPUs, gathering them on-demand.
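Wrapping a model in FSDP looks much like DDP. A minimal sketch, assuming a process group is already initialized and local_rank is set as in the DDP example (FSDP lives in torch.distributed.fsdp as of PyTorch 1.11):
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP

model = FSDP(model.to(local_rank))  # parameters are sharded across ranks
# Forward/backward gather each shard on demand and free it afterwards, so peak
# per-GPU memory is roughly model_size / world_size plus activations.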
Definition: Communication Backends: NCCL, Gloo, MPI
PyTorch supports three communication backends:
| Backend | GPU-GPU | CPU-CPU | Multi-node | Recommended for |
|---|---|---|---|---|
| NCCL | Yes | No | Yes | GPU training |
| Gloo | No | Yes | Yes | CPU training |
| MPI | Yes | Yes | Yes | HPC clusters |
# Initialize with NCCL (best for GPU)
dist.init_process_group(
backend='nccl',
init_method='env://',
)
NCCL (NVIDIA Collective Communications Library) provides optimized GPU-to-GPU communication that bypasses the CPU entirely via NVLink or PCIe peer-to-peer.
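A common pattern is to choose the backend at runtime so the same script runs on both GPU and CPU-only machines (a small sketch under that assumption):
import torch
import torch.distributed as dist

# NCCL for GPU tensors, Gloo as the CPU fallback
backend = 'nccl' if torch.cuda.is_available() else 'gloo'
dist.init_process_group(backend=backend, init_method='env://')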
Theorem: Linear Scaling Rule for Distributed Training
When scaling from 1 GPU to $N$ GPUs with data parallelism, maintain the same convergence behavior by scaling the learning rate linearly with the batch size:
$$\eta_N = N \eta_1 \quad \text{when} \quad B_N = N B_1,$$
where $\eta$ is the learning rate and $B$ is the global batch size. This keeps the ratio $\eta / B$ constant. Use a linear warmup over the first few epochs to stabilize training at the higher learning rate.
Each GPU sees $1/N$ of the data per step. Scaling the learning rate compensates for the $N$-fold increase in effective batch size. The warmup prevents divergence at the start, when the model is far from a good minimum.
Gradient averaging
With $N$ GPUs, the gradient estimate is averaged: $\hat{g} = \frac{1}{N} \sum_{i=1}^{N} g_i$. Its variance decreases by a factor of $N$, but the step size should scale with the batch, giving $\eta \propto N$.
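A minimal sketch of the rule in code, assuming a process group is initialized and model exists; the base learning rate of 1e-3 and 500 warmup steps are illustrative values:
import torch
import torch.distributed as dist

base_lr, warmup_steps = 1e-3, 500            # tuned for 1 GPU (illustrative)
scaled_lr = base_lr * dist.get_world_size()  # linear scaling rule

optimizer = torch.optim.SGD(model.parameters(), lr=scaled_lr)
# Ramp the LR linearly from ~0 to scaled_lr over warmup_steps, then hold
scheduler = torch.optim.lr_scheduler.LambdaLR(
    optimizer, lambda step: min(1.0, (step + 1) / warmup_steps))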
Example: Complete DDP Training Script
Write a minimal but complete distributed training script using
DistributedDataParallel that works on 1-8 GPUs.
Initialize distributed
import os, torch, torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def setup():
    dist.init_process_group('nccl')
    local_rank = int(os.environ['LOCAL_RANK'])
    torch.cuda.set_device(local_rank)
    return local_rank
Create model and data
def main():
    local_rank = setup()
    model = nn.Linear(1024, 10).to(local_rank)
    model = DDP(model, device_ids=[local_rank])
    dataset = ...  # your dataset
    sampler = DistributedSampler(dataset)
    loader = DataLoader(dataset, sampler=sampler,
                        batch_size=64, num_workers=4)
Training loop
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    for epoch in range(10):
        sampler.set_epoch(epoch)  # essential for shuffling
        for x, y in loader:
            x, y = x.to(local_rank), y.to(local_rank)
            loss = nn.functional.cross_entropy(model(x), y)
            loss.backward()   # all-reduce happens here
            optimizer.step()
            optimizer.zero_grad()
    dist.destroy_process_group()

if __name__ == '__main__':
    main()

# Launch: torchrun --nproc_per_node=4 train.py
Example: Profiling Communication vs Computation
Measure the fraction of training time spent on gradient communication (all-reduce) vs computation to identify scaling bottlenecks.
Timing approach
import time
import torch

# Time one full step on the model *without* the DDP wrapper
# (computation only, no gradient synchronization)
torch.cuda.synchronize()
t0 = time.perf_counter()
output = model(x)
loss = criterion(output, y)
loss.backward()
torch.cuda.synchronize()
t_compute = time.perf_counter() - t0

# Repeat with the DDP-wrapped model; the difference estimates the
# all-reduce communication cost DDP adds to backward()
Using PyTorch profiler
with torch.profiler.profile(
    activities=[
        torch.profiler.ProfilerActivity.CPU,
        torch.profiler.ProfilerActivity.CUDA,
    ],
) as prof:
    for i, (x, y) in enumerate(loader):
        if i >= 5:
            break
        loss = model(x.cuda()).sum()
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

# NCCL all-reduce kernels in the table (names containing 'nccl')
# account for the communication time
print(prof.key_averages().table(
    sort_by="cuda_time_total", row_limit=10))
Distributed Training Scaling Efficiency (interactive demo)
Watch how training throughput and communication overhead evolve as GPUs are added, and observe the transition from compute-bound to communication-bound scaling as the GPU count grows.
Multi-GPU Training: see ch13/python/multi_gpu.py.
Performance Profiling: see ch13/python/profiling.py.
Quick Check
In DDP training with 4 GPUs and a dataset of 10000 samples, what does DistributedSampler do?
- Each GPU sees all 10000 samples per epoch
- Each GPU sees 2500 non-overlapping samples per epoch
- Samples are randomly assigned to GPUs each iteration
- Only GPU 0 loads data and broadcasts to others
Answer: each GPU sees 2500 non-overlapping samples per epoch. DistributedSampler partitions the dataset into world_size shards; each GPU processes its shard, and gradients are averaged via all-reduce.
Common Mistake: Forgetting sampler.set_epoch() in DDP
Mistake:
Not calling sampler.set_epoch(epoch) before each epoch in DDP training.
Without this, the sampler reuses the same random seed every epoch, so each
GPU sees its 1/N partition in the same order every epoch, with no fresh
shuffling across epochs.
Correction:
Always call sampler.set_epoch(epoch) at the start of each epoch.
This seeds the sampler's random shuffling, ensuring different data
ordering each epoch while maintaining non-overlapping partitions.
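In code, the fix is a single line at the top of the epoch loop (repeating the pattern from the full script above):
for epoch in range(num_epochs):
    sampler.set_epoch(epoch)  # re-seeds the shuffle for this epoch
    for x, y in loader:
        ...  # training step as usual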
Key Takeaway
Always use DistributedDataParallel over DataParallel. DDP uses
one process per GPU, overlaps communication with backward pass, and
scales to multiple machines. Remember: DistributedSampler +
sampler.set_epoch(epoch) are required for correct data partitioning.
Why This Matters: Distributed Computing for Large-Scale MIMO Simulations
Simulating a cell-free massive MIMO network with 128 access points, 1000 users, and full channel estimation requires processing $128 \times 1000$ channel matrices every coherence interval. Multi-GPU parallelism distributes users across GPUs (data parallelism) or distributes antenna processing across GPUs (model parallelism). The all-reduce operation for gradient averaging mirrors the fronthaul aggregation in real cell-free systems.
See full treatment in Domain-Specific Plot Types
Historical Note: The Rise of Distributed Training
The foundational paper on large-scale distributed deep learning was "Large Scale Distributed Deep Networks" by Dean et al. at Google (2012), which used model parallelism across 16,000 CPU cores. The shift to data-parallel GPU training was catalyzed by the ImageNet moment: in 2017, Facebook trained ResNet-50 on ImageNet in 1 hour using 256 GPUs with synchronized SGD and the linear scaling rule. Today, models like GPT-4 train on thousands of GPUs using a combination of data, tensor, and pipeline parallelism.
DistributedDataParallel (DDP)
PyTorch's production-grade multi-GPU training wrapper that uses one process per GPU, NCCL for communication, and overlaps gradient all-reduce with backward computation.
Related: All-Reduce
All-Reduce
A collective communication operation that computes the sum (or average) of tensors across all processes and distributes the result to every process.
Related: DistributedDataParallel (DDP)
NCCL
NVIDIA Collective Communications Library -- an optimized library for GPU-to-GPU communication via NVLink or PCIe, implementing all-reduce, broadcast, and other collective operations.
DataParallel vs DistributedDataParallel
| Feature | DataParallel | DDP |
|---|---|---|
| Process model | Single process, multi-thread | Multi-process (1 per GPU) |
| GIL limitation | Yes (threads share GIL) | No (separate processes) |
| GPU utilization | Asymmetric (GPU 0 bottleneck) | Symmetric |
| Gradient sync | Gather on GPU 0 | All-reduce (overlapped) |
| Multi-node | No | Yes (via NCCL/Gloo) |
| Recommended | No (deprecated) | Yes (production) |