Exercises
ex-sp-ch13-01
Easy: Write a function that prints the current GPU memory allocated, reserved, and peak allocated. Then create a 1000x1000 float32 tensor on the GPU, check the memory again, delete the tensor, and check once more. Explain why reserved memory may not decrease.
Hint: Use torch.cuda.memory_allocated(), memory_reserved(), and max_memory_allocated().
Implementation
import torch
def mem_stats():
    alloc = torch.cuda.memory_allocated() / 1e6
    res = torch.cuda.memory_reserved() / 1e6
    peak = torch.cuda.max_memory_allocated() / 1e6
    print(f"Allocated: {alloc:.1f} MB, Reserved: {res:.1f} MB, Peak: {peak:.1f} MB")
mem_stats() # baseline
x = torch.randn(1000, 1000, device='cuda') # ~4 MB
mem_stats() # allocated increases
del x
mem_stats() # allocated drops, reserved stays (caching allocator)
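Reserved memory stays high because the caching allocator holds freed blocks for reuse rather than returning them to the CUDA driver. Continuing the script above, a minimal sketch of forcing the cache to be released:

torch.cuda.empty_cache()  # return unused cached blocks to the driver
print(f"Reserved after empty_cache: {torch.cuda.memory_reserved() / 1e6:.1f} MB")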
ex-sp-ch13-02
Easy: Use torch.bmm to compute the product of 200 pairs of complex matrices. Compare the result with a Python loop.
Hint: Complex tensors: torch.randn(200, 8, 8, dtype=torch.complex64).
Implementation
import torch
A = torch.randn(200, 8, 8, dtype=torch.complex64, device='cuda')
B = torch.randn(200, 8, 8, dtype=torch.complex64, device='cuda')
# Batched
C_batch = torch.bmm(A, B)
# Loop
C_loop = torch.stack([A[i] @ B[i] for i in range(200)])
print(f"Max error: {(C_batch - C_loop).abs().max().item():.2e}")
ex-sp-ch13-03
Easy: Create a tensor of value 300.0 in FP16, FP32, and BF16. Square each and report which format overflows. What is the maximum representable value in each format?
Hint: Use torch.finfo(dtype).max to get the maximum value.
Implementation
import torch
for dtype in [torch.float16, torch.bfloat16, torch.float32]:
    x = torch.tensor(300.0, dtype=dtype)
    sq = x * x
    print(f"{dtype}: 300^2 = {sq.item()}, max = {torch.finfo(dtype).max}")
# FP16: inf (overflows past its max of 65504), BF16: 90112 (90000 rounded to the
# nearest representable value), FP32: 90000.0 (exact)
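Why 90112 rather than 90000: BF16 keeps 8 significand bits, so near 2^16 the spacing between representable values is 65536 * eps = 65536 * 2^-7 = 512, and 90000 rounds to 176 * 512 = 90112. A quick check of the format parameters with torch.finfo:

for dtype in [torch.float16, torch.bfloat16, torch.float32]:
    fi = torch.finfo(dtype)
    print(f"{dtype}: bits={fi.bits}, max={fi.max}, eps={fi.eps}")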
ex-sp-ch13-04
Easy: Create a DataLoader with num_workers=0 and num_workers=4 for a synthetic dataset of 10000 samples. Time iterating through the full dataset and report the speedup.
Hint: Use time.perf_counter() and iterate the full loader.
Implementation
import torch, time
from torch.utils.data import Dataset, DataLoader
class FakeData(Dataset):
    def __len__(self): return 10000
    def __getitem__(self, i):
        return torch.randn(256), torch.randint(0, 10, (1,))

for nw in [0, 4]:
    loader = DataLoader(FakeData(), batch_size=64, num_workers=nw)
    t0 = time.perf_counter()
    for x, y in loader: pass
    print(f"workers={nw}: {time.perf_counter()-t0:.3f}s")
ex-sp-ch13-05
Easy: Write a training loop that accumulates gradients over 4 mini-batches before calling optimizer.step(). This simulates a 4x larger effective batch size using the same GPU memory.
Hint: Divide the loss by accum_steps and call optimizer.step() every 4 iterations.
Implementation
import torch, torch.nn as nn
model = nn.Linear(100, 10).cuda()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
accum_steps = 4
for i in range(20):
    x = torch.randn(32, 100, device='cuda')
    loss = model(x).sum() / accum_steps  # scale so accumulated grads match a big batch
    loss.backward()                      # grads add up in .grad across micro-batches
    if (i + 1) % accum_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
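A quick sanity check (a sketch with a fresh model, here called model2): gradients accumulated over four 32-sample micro-batches should match the gradient of one 128-sample batch under the same 1/accum_steps scaling, up to floating-point rounding.

model2 = nn.Linear(100, 10).cuda()
data = torch.randn(128, 100, device='cuda')
for chunk in data.split(32):              # four micro-batches
    (model2(chunk).sum() / 4).backward()  # grads accumulate in .grad
g_accum = model2.weight.grad.clone()
model2.zero_grad()
(model2(data).sum() / 4).backward()       # one large batch, same scaling
print(torch.allclose(g_accum, model2.weight.grad, atol=1e-5))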
ex-sp-ch13-06
Medium: Implement gradient checkpointing for a 20-layer residual network. Measure peak GPU memory with and without checkpointing at batch size 128 and hidden dimension 512. Report the memory savings and the time overhead.
Hint: Use torch.utils.checkpoint.checkpoint with use_reentrant=False.
Hint: Use torch.cuda.reset_peak_memory_stats() before each run.
Implementation
import torch, torch.nn as nn, time
from torch.utils.checkpoint import checkpoint
class Block(nn.Module):
    def __init__(self, d):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(d, d), nn.ReLU(), nn.Linear(d, d))
    def forward(self, x): return self.net(x) + x

class Net(nn.Module):
    def __init__(self, d, L, ckpt=False):
        super().__init__()
        self.blocks = nn.ModuleList([Block(d) for _ in range(L)])
        self.ckpt = ckpt
    def forward(self, x):
        for b in self.blocks:
            x = checkpoint(b, x, use_reentrant=False) if self.ckpt else b(x)
        return x

for ckpt in [False, True]:
    torch.cuda.reset_peak_memory_stats()
    m = Net(512, 20, ckpt).cuda()
    x = torch.randn(128, 512, device='cuda')
    t0 = time.perf_counter()
    m(x).sum().backward()
    torch.cuda.synchronize()
    t = time.perf_counter() - t0
    peak = torch.cuda.max_memory_allocated() / 1e6
    print(f"ckpt={ckpt}: peak={peak:.0f}MB, time={t:.3f}s")
ex-sp-ch13-07
Medium: Use torch.einsum to implement batched MIMO detection: given channel matrices H of shape (B, Nr, Nt) and received vectors y of shape (B, Nr), compute the matched filter output x_hat = H^H y for every batch element.
Hint: The Hermitian transpose in einsum: conjugate first, then contract over the receive dimension.
Implementation
import torch
B, Nr, Nt = 100, 8, 4
H = torch.randn(B, Nr, Nt, dtype=torch.complex64, device='cuda')
y = torch.randn(B, Nr, dtype=torch.complex64, device='cuda')
# H^H @ y using einsum: conjugate H, contract over Nr
x_hat = torch.einsum('bnm,bn->bm', H.conj(), y)
# Equivalent: x_hat = (H.conj().transpose(-2,-1) @ y.unsqueeze(-1)).squeeze(-1)
print(f"Output shape: {x_hat.shape}") # (100, 4)
ex-sp-ch13-08
Medium: Implement a complete AMP training loop with BFloat16 for a 3-layer MLP. Compare training loss convergence (100 steps) between FP32 and BF16 to verify they match.
Hint: BF16 does not need GradScaler. Use torch.autocast("cuda", dtype=torch.bfloat16).
Implementation
import torch, torch.nn as nn
def train(use_amp):
    torch.manual_seed(42)
    model = nn.Sequential(
        nn.Linear(256, 1024), nn.ReLU(),
        nn.Linear(1024, 1024), nn.ReLU(),
        nn.Linear(1024, 10)).cuda()
    opt = torch.optim.Adam(model.parameters(), lr=1e-3)
    x = torch.randn(128, 256, device='cuda')
    y = torch.randint(10, (128,), device='cuda')
    losses = []
    for _ in range(100):
        if use_amp:
            with torch.autocast('cuda', dtype=torch.bfloat16):
                loss = nn.functional.cross_entropy(model(x), y)
        else:
            loss = nn.functional.cross_entropy(model(x), y)
        loss.backward()
        opt.step()
        opt.zero_grad()
        losses.append(loss.item())
    return losses

fp32_loss = train(False)
bf16_loss = train(True)
print(f"Final FP32: {fp32_loss[-1]:.4f}, BF16: {bf16_loss[-1]:.4f}")
ex-sp-ch13-09
Medium: Implement a custom IterableDataset that generates batches of random channel matrices on the fly (streaming). Show that it uses constant memory regardless of the total number of samples generated.
Hint: Subclass torch.utils.data.IterableDataset and implement __iter__.
Implementation
import torch
from torch.utils.data import IterableDataset, DataLoader
class ChannelStream(IterableDataset):
    def __init__(self, nr, nt, n_batches):
        self.nr, self.nt = nr, nt
        self.n_batches = n_batches
    def __iter__(self):
        for _ in range(self.n_batches):
            H = (torch.randn(self.nr, self.nt) +
                 1j * torch.randn(self.nr, self.nt)) / (2**0.5)
            y = H @ torch.randn(self.nt) + 0.1 * torch.randn(self.nr)
            yield H, y

ds = ChannelStream(8, 4, 10000)
loader = DataLoader(ds, batch_size=None)
for i, (H, y) in enumerate(loader):
    if i >= 3: break
    print(f"Batch {i}: H={H.shape}, y={y.shape}")
ex-sp-ch13-10
Medium: Write a memory-efficient attention computation using gradient checkpointing. Compare memory usage for sequence length 1024 with and without checkpointing.
Hint: Checkpoint the attention score computation (Q @ K^T).
Implementation
import torch, torch.nn as nn
from torch.utils.checkpoint import checkpoint
class Attention(nn.Module):
    def __init__(self, dim, use_ckpt=False):
        super().__init__()
        self.qkv = nn.Linear(dim, 3 * dim)
        self.proj = nn.Linear(dim, dim)
        self.use_ckpt = use_ckpt
        self.dim = dim
    def _attend(self, q, k, v):
        scores = q @ k.transpose(-2, -1) / (self.dim ** 0.5)
        return torch.softmax(scores, dim=-1) @ v
    def forward(self, x):
        qkv = self.qkv(x).chunk(3, dim=-1)
        if self.use_ckpt:
            attn = checkpoint(self._attend, *qkv, use_reentrant=False)
        else:
            attn = self._attend(*qkv)
        return self.proj(attn)

for ckpt in [False, True]:
    torch.cuda.reset_peak_memory_stats()
    m = Attention(256, ckpt).cuda()
    x = torch.randn(8, 1024, 256, device='cuda')
    m(x).sum().backward()
    print(f"ckpt={ckpt}: {torch.cuda.max_memory_allocated()/1e6:.0f} MB")
ex-sp-ch13-11
Hard: Implement a custom CUDA memory allocator that logs every allocation and free event. Use it to identify memory leaks in a training loop where tensors are accidentally kept alive.
Hint: Use torch.cuda.memory._record_memory_history() and _snapshot().
Approach
import torch
torch.cuda.memory._record_memory_history(max_entries=10000)
# Simulated "leaky" loop
leaked = []
for i in range(100):
    x = torch.randn(1000, 1000, device='cuda')
    if i % 10 == 0:
        leaked.append(x)  # intentional leak: the reference keeps x alive
snapshot = torch.cuda.memory._snapshot()
torch.cuda.memory._record_memory_history(enabled=None)  # stop recording
# Analyze: count active allocations
print(f"Leaked tensors: {len(leaked)}")
print(f"Memory: {torch.cuda.memory_allocated()/1e6:.1f} MB")
# In production, export the snapshot for visualization
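To inspect the snapshot visually, it can be dumped to disk (a sketch using a private API that may change between PyTorch releases) and loaded into the viewer at https://pytorch.org/memory_viz:

torch.cuda.memory._dump_snapshot('leak_snapshot.pickle')
# Drag and drop leak_snapshot.pickle into https://pytorch.org/memory_viz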
ex-sp-ch13-12
Hard: Implement a batched MMSE MIMO detector: for B = 1000 channel realizations with Nr = 16 and Nt = 4, compute the MMSE estimate x_hat = (H^H H + sigma^2 I)^(-1) H^H y. Use a batched Cholesky solve and measure throughput in channels/second.
Hint: Build A = H^H H + sigma^2 I as a batched (B, Nt, Nt) tensor.
Hint: Use torch.linalg.cholesky and torch.cholesky_solve.
Implementation
import torch, time
B, Nr, Nt = 1000, 16, 4
sigma2 = 0.1
device = 'cuda'
H = torch.randn(B, Nr, Nt, dtype=torch.complex64, device=device)
y = torch.randn(B, Nr, 1, dtype=torch.complex64, device=device)
I = torch.eye(Nt, dtype=torch.complex64, device=device)
torch.cuda.synchronize()
t0 = time.perf_counter()
A = H.conj().transpose(-2,-1) @ H + sigma2 * I
Hy = H.conj().transpose(-2,-1) @ y
L = torch.linalg.cholesky(A)
x_hat = torch.cholesky_solve(Hy, L)
torch.cuda.synchronize()
elapsed = time.perf_counter() - t0
print(f"Throughput: {B/elapsed:.0f} channels/s ({elapsed*1000:.2f} ms)")
ex-sp-ch13-13
Hard: Write a DDP training script that trains a simple model on 2 GPUs (simulated with torch.multiprocessing.spawn). Verify that gradients are identical across ranks after each step.
Hint: Use mp.spawn(worker_fn, nprocs=2) to simulate 2 GPUs.
Hint: Use dist.init_process_group("gloo") for CPU-only testing.
Implementation
import torch, torch.nn as nn, torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
def worker(rank, world_size):
    dist.init_process_group('gloo', rank=rank, world_size=world_size,
                            init_method='tcp://localhost:12355')
    torch.manual_seed(42)            # identical model init on every rank
    model = DDP(nn.Linear(10, 5))
    opt = torch.optim.SGD(model.parameters(), lr=0.01)
    torch.manual_seed(100 + rank)    # different data on each rank
    x = torch.randn(8, 10)
    loss = model(x).sum()
    loss.backward()                  # DDP all-reduces (averages) grads here
    # Check: after backward, every rank should hold identical gradients
    for p in model.parameters():
        g = p.grad.clone()
        dist.all_reduce(g)           # sum of the already-averaged grads
        assert torch.allclose(g / world_size, p.grad, atol=1e-6)
        print(f"Rank {rank}: grad_norm={p.grad.norm():.4f}")
    opt.step()
    dist.destroy_process_group()

if __name__ == '__main__':           # required: spawn re-imports this module
    mp.spawn(worker, args=(2,), nprocs=2, join=True)
ex-sp-ch13-14
Challenge: Implement a complete mixed-precision training pipeline for a MIMO autoencoder: the encoder maps symbols to transmitted signals, the channel applies y = Hx + n, and the decoder estimates the original symbols. Use BF16 autocast, gradient checkpointing on the decoder, and batched channel application. Benchmark FP32 vs. mixed precision in throughput and final loss.
Hint: The channel layer has no trainable parameters; draw H and the noise under torch.no_grad(), but apply y = Hx + n inside the graph so gradients still reach the encoder.
Architecture sketch
# See code supplement ch13/python/mimo_autoencoder.py
# Key elements:
# 1. Encoder: Linear -> ReLU -> Linear (BF16 autocast)
# 2. Channel: H @ x + noise (FP32 for numerical accuracy)
# 3. Decoder: checkpointed deep network (BF16)
# 4. Loss: MSE in FP32
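A minimal end-to-end sketch of those four elements, under stated assumptions: a real-valued stand-in for the complex channel, arbitrary layer widths, and an 8-block decoder (the supplement file above has the complete version):

import torch, torch.nn as nn
from torch.utils.checkpoint import checkpoint

B, Nt, Nr, D = 256, 4, 8, 64  # assumed shapes, for illustration only
enc = nn.Sequential(nn.Linear(Nt, D), nn.ReLU(), nn.Linear(D, Nt)).cuda()
dec = nn.ModuleList(
    [nn.Sequential(nn.Linear(Nr, Nr), nn.ReLU()) for _ in range(8)]).cuda()
head = nn.Linear(Nr, Nt).cuda()
opt = torch.optim.Adam(
    [*enc.parameters(), *dec.parameters(), *head.parameters()], lr=1e-3)

for step in range(100):
    s = torch.randn(B, Nt, device='cuda')             # symbols to transmit
    with torch.no_grad():                             # channel is not trainable:
        H = torch.randn(B, Nr, Nt, device='cuda')     # draw H and noise outside
        n = 0.1 * torch.randn(B, Nr, device='cuda')   # the parameter graph
    with torch.autocast('cuda', dtype=torch.bfloat16):
        x = enc(s)                                    # 1. encoder in BF16
    y = torch.einsum('bij,bj->bi', H, x.float()) + n  # 2. y = Hx + n in FP32
    with torch.autocast('cuda', dtype=torch.bfloat16):
        h = y
        for blk in dec:                               # 3. checkpointed decoder
            h = checkpoint(blk, h, use_reentrant=False)
        s_hat = head(h)
    loss = nn.functional.mse_loss(s_hat.float(), s)   # 4. loss in FP32
    loss.backward()
    opt.step()
    opt.zero_grad()
print(f"final loss: {loss.item():.4f}")

Note the split in the channel step: H and n are created under no_grad, but the matmul itself stays inside the graph so gradients flow back to the encoder.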
ex-sp-ch13-15
Challenge: Build a performance profiling dashboard that measures and visualizes (a) the GPU memory timeline, (b) the kernel execution time breakdown, (c) the data-loading vs. compute ratio, and (d) arithmetic intensity. Apply it to a batched MIMO simulation pipeline and identify the bottleneck.
Hint: Use torch.profiler.profile with on_trace_ready=torch.profiler.tensorboard_trace_handler.
Approach
import torch
with torch.profiler.profile(
    schedule=torch.profiler.schedule(wait=1, warmup=1, active=3),
    on_trace_ready=torch.profiler.tensorboard_trace_handler('./logs'),
    record_shapes=True,
    profile_memory=True,
    with_stack=True,
) as prof:
    for step, batch in enumerate(loader):  # loader: your batched MIMO pipeline
        if step >= 5: break
        # Your MIMO pipeline here
        prof.step()
# View: tensorboard --logdir=./logs
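For a quick summary without TensorBoard, the aggregated table can also be printed directly after profiling; sorting by "cuda_time_total" surfaces the most expensive kernels:

print(prof.key_averages().table(sort_by="cuda_time_total", row_limit=10))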