Part IV: Pretraining at Scale
Chapter 18

Distributed Training

Data parallelism, tensor parallelism, pipeline parallelism
20 Exercises
18.1

Chapter 15's training loop assumed a single GPU. Frontier models shatter that assumption. A modern LLM has billions of parameters and trains on trillions of tokens — neither the model nor the data fits or finishes on one device. This chapter builds the techniques that spread a single training run across thousands of GPUs while keeping it mathematically equivalent to the single-device version.

The Memory Budget of Training

To see why one GPU is not enough, count the memory. For a model with N parameters trained in mixed precision with AdamW, each parameter needs storage for several quantities at once:

Quantity | Bytes/param | Why
bf16 parameters | 2 | The weights used in forward/backward
bf16 gradients | 2 | Computed in the backward pass
fp32 master weights | 4 | High-precision copy for updates
fp32 Adam momentum (m) | 4 | First moment estimate
fp32 Adam variance (v) | 4 | Second moment estimate
Total (model+optimizer) | 16 | Per parameter, before activations
Text: The 16-bytes-per-parameter rule
Memory(model + optimizer) ≈ 16 · N bytes

7B model:   16 × 7e9   = 112 GB    (an 80GB GPU cannot hold it)
70B model:  16 × 70e9  = 1,120 GB  (needs ~14+ GPUs just for state)
+ activations, which grow with batch × sequence length
Dist Note: The Optimizer State Dominates
Notice that of the 16 bytes per parameter, 12 are optimizer state (fp32 master weights + two Adam moments). The model weights themselves are only 2 bytes in bf16. This is the key insight behind ZeRO: most of the memory is optimizer state, and that state can be sharded across GPUs since each GPU only needs the slice it updates.
Even before considering activations, a 7B model needs 112 GB just for parameters, gradients, and optimizer state, which is more than the 80 GB of an H100. Training a model of this size with this recipe on a single device is therefore not feasible; distribution is not an optimization but a necessity.
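The arithmetic above is easy to script. The following is a minimal sketch, assuming the byte counts from the table and 1 GB = 1e9 bytes; the function names `training_state_gb` and `min_gpus_for_state` are illustrative and not part of any library.

```python
import math

# Bytes per parameter for mixed-precision AdamW, as itemized in the table above.
BYTES_PER_PARAM = {
    "bf16 parameters": 2,
    "bf16 gradients": 2,
    "fp32 master weights": 4,
    "fp32 Adam momentum": 4,
    "fp32 Adam variance": 4,
}

def training_state_gb(n_params: float) -> float:
    """Model + optimizer memory in GB (1 GB = 1e9 bytes), excluding activations."""
    return sum(BYTES_PER_PARAM.values()) * n_params / 1e9

def min_gpus_for_state(n_params: float, gpu_gb: float = 80.0) -> int:
    """Smallest GPU count whose combined memory holds the state alone."""
    return math.ceil(training_state_gb(n_params) / gpu_gb)

for name, n in [("7B", 7e9), ("70B", 70e9)]:
    print(f"{name}: {training_state_gb(n):,.0f} GB of state, "
          f"at least {min_gpus_for_state(n)} GPUs of 80 GB")
```

The GPU counts are lower bounds only: they ignore activations, which grow with batch size and sequence length.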

Two Things to Distribute

Distribution solves two distinct problems, and the parallelism strategies map onto them. DATA parallelism addresses throughput — processing trillions of tokens in reasonable time by having many GPUs work on different data. MODEL parallelism (tensor and pipeline) addresses capacity — fitting a model too large for one GPU by splitting the model itself across devices. Real systems combine both.

18.2

Before the parallelism strategies, we need the primitives they use to coordinate. Distributed training is built on a small set of collective communication operations — implemented in libraries like NCCL (NVIDIA) and exposed by PyTorch's torch.distributed. Understanding these five operations is enough to understand every parallelism strategy in this chapter.

Operation | What it does
Broadcast | Send one GPU's tensor to all GPUs
Reduce | Combine (sum/mean) tensors from all GPUs onto one GPU
All-Reduce | Reduce, then broadcast result; all GPUs get the sum
All-Gather | Each GPU collects the shards from all GPUs (concatenate)
Reduce-Scatter | Reduce, then each GPU keeps only its shard of the result

All-reduce is the workhorse of data parallelism: it sums gradients across all GPUs, and after dividing by the GPU count every GPU holds the identical averaged gradient. The crucial efficiency fact is that all-reduce can be implemented as a reduce-scatter followed by an all-gather (the 'ring all-reduce' algorithm). The amount of data each GPU sends is then nearly independent of the number of GPUs, so the bandwidth cost scales beautifully; only the number of latency-bound steps grows with the GPU count.

Text: Ring all-reduce cost
Naive all-reduce: each GPU sends its data to all others → O(P) per GPU

Ring all-reduce:  reduce-scatter + all-gather around a ring
    each GPU sends 2(P-1)/P × (data size)  ≈ 2× data for large P
    ⇒ per-GPU bandwidth cost nearly independent of P (the step count, 2(P-1), still grows with P)
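The reduce-scatter plus all-gather decomposition can be checked with a single-process simulation. This is a sketch under stated assumptions: ranks are plain Python lists, "sending" is a list copy, and the chunk-indexing convention is one common choice rather than the scheme of any particular library.

```python
def ring_all_reduce(data):
    """data[r] is rank r's vector. Returns (result per rank, elements sent per rank)."""
    P = len(data)
    n = len(data[0])
    assert n % P == 0, "vector length must be divisible by the number of ranks"
    c = n // P                                   # chunk size
    buf = [list(v) for v in data]
    sent = [0] * P

    def exchange(step_to_chunk, accumulate):
        for s in range(P - 1):
            # Every rank prepares its message first, so all sends in a step are simultaneous.
            msgs = []
            for r in range(P):
                idx = step_to_chunk(r, s)
                msgs.append((idx, buf[r][idx * c:(idx + 1) * c]))
                sent[r] += c
            for r, (idx, payload) in enumerate(msgs):
                dst = (r + 1) % P                # right-hand neighbor on the ring
                for k, v in enumerate(payload):
                    j = idx * c + k
                    buf[dst][j] = buf[dst][j] + v if accumulate else v

    # Reduce-scatter: afterwards rank r holds the fully summed chunk (r + 1) mod P.
    exchange(lambda r, s: (r - s) % P, accumulate=True)
    # All-gather: circulate the finished chunks until every rank has all of them.
    exchange(lambda r, s: (r + 1 - s) % P, accumulate=False)
    return buf, sent

P, n = 4, 8
result, sent = ring_all_reduce([[float(r)] * n for r in range(P)])
print(result[0])                 # every entry is 0 + 1 + 2 + 3
print(sent[0], 2 * (P - 1) * n // P)
```

Each rank sends 2(P-1) chunks of size n/P, which is the 2(P-1)/P × (data size) figure from the cost summary.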
Python: Collective operations in PyTorch
import os
import torch
import torch.distributed as dist

# Initialize the process group (one process per GPU, e.g. launched with torchrun)
dist.init_process_group(backend='nccl')
rank = dist.get_rank(); world = dist.get_world_size()
torch.cuda.set_device(int(os.environ['LOCAL_RANK']))   # bind this process to its own GPU

x = torch.ones(4).cuda() * rank        # each GPU has different data

# All-reduce: every GPU ends up with the SUM across all GPUs
dist.all_reduce(x, op=dist.ReduceOp.SUM)
# With 4 GPUs (ranks 0,1,2,3): x becomes [6,6,6,6] on every GPU

# All-gather: collect each GPU's shard into a full tensor
gathered = [torch.zeros(4).cuda() for _ in range(world)]
dist.all_gather(gathered, x)            # every GPU now has all shards

# Reduce-scatter: sum across GPUs, but each keeps only its slice
# (x.numel() must be divisible by world; with 4 GPUs each slice has 1 element)
out = torch.zeros(x.numel() // world).cuda()
dist.reduce_scatter(out, list(x.chunk(world)))
Dist Note: Interconnect Bandwidth Is the Real Constraint
Compute is cheap; moving data between GPUs is expensive. Within a node, GPUs connect via NVLink (~900 GB/s on H100); across nodes, via InfiniBand or Ethernet (~100–400 Gb/s per link, that is, roughly 12–50 GB/s). The gap of one to two orders of magnitude between intra-node and inter-node bandwidth dictates the entire design of distributed training.
The golden rule: keep high-frequency communication (tensor parallelism's per-layer all-reduces) WITHIN a node over fast NVLink, and reserve low-frequency communication (data-parallel gradient sync, pipeline activations) for the slower inter-node links. This mapping of parallelism to hardware topology is the heart of 3D-parallel design.
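To get a feel for why placement matters, here is a bandwidth-only estimate of one gradient all-reduce. It is a rough sketch: latency, overlap, and protocol overhead are ignored, and both bandwidth figures are illustrative assumptions (a 400 Gb/s link moves 50 GB/s) rather than measurements.

```python
def ring_all_reduce_seconds(n_bytes: float, n_gpus: int, bandwidth_gb_s: float) -> float:
    """Bandwidth-only time for a ring all-reduce: each GPU sends 2(P-1)/P x n_bytes."""
    sent_bytes = 2 * (n_gpus - 1) / n_gpus * n_bytes
    return sent_bytes / (bandwidth_gb_s * 1e9)

grad_bytes = 2 * 7e9          # bf16 gradients of a 7B-parameter model
fast_link_gb_s = 900.0        # assumed intra-node bandwidth
slow_link_gb_s = 50.0         # assumed inter-node bandwidth (one 400 Gb/s link)

t_fast = ring_all_reduce_seconds(grad_bytes, 8, fast_link_gb_s)
t_slow = ring_all_reduce_seconds(grad_bytes, 8, slow_link_gb_s)
print(f"fast link: {t_fast * 1e3:.1f} ms, slow link: {t_slow * 1e3:.1f} ms, "
      f"ratio {t_slow / t_fast:.0f}x")
```

An operation that runs once per step can absorb the slower figure; one that runs on every layer cannot, which is the reasoning behind the golden rule.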
18.3

Data parallelism is the foundation. The idea is simple: replicate the entire model on every GPU, give each a different slice of the batch, and average the gradients. Because every replica processes different data and then synchronizes gradients, the result is mathematically identical to training on one giant batch on one device — but it runs many times faster.

The Data-Parallel Algorithm

Text: Distributed Data Parallel (DDP) (Pseudocode)
# Every GPU holds a full copy of the model
replicate model on all P GPUs

each step:
    split the global batch into P equal shards (one per GPU)
    each GPU: forward + backward on its shard
    all-reduce gradients across GPUs (sum, then divide by P)
    each GPU: optimizer.step()  with the SAME averaged gradient
    # all replicas stay identical because they apply identical updates
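The equivalence claim can be verified numerically on a toy problem. The sketch below assumes a linear model with mean-squared-error loss and equal-sized shards; the all-reduce is simulated by summing Python lists, and all names are illustrative.

```python
import random

def grad_mse(w, xs, ts):
    """Gradient of mean_i 0.5 * (w . x_i - t_i)^2 with respect to w."""
    g = [0.0] * len(w)
    for x, t in zip(xs, ts):
        err = sum(wi * xi for wi, xi in zip(w, x)) - t
        for j, xj in enumerate(x):
            g[j] += err * xj / len(xs)
    return g

def data_parallel_grad(w, xs, ts, P):
    """Each of P simulated GPUs computes a local gradient; all-reduce = sum, then / P."""
    n = len(xs) // P                              # equal shards assumed
    local = [grad_mse(w, xs[r * n:(r + 1) * n], ts[r * n:(r + 1) * n]) for r in range(P)]
    summed = [sum(g[j] for g in local) for j in range(len(w))]
    return [s / P for s in summed]

random.seed(0)
w = [0.5, -1.0, 2.0]
xs = [[random.gauss(0, 1) for _ in w] for _ in range(8)]
ts = [random.gauss(0, 1) for _ in range(8)]

full = grad_mse(w, xs, ts)                        # one device, whole batch
sharded = data_parallel_grad(w, xs, ts, P=4)      # four devices, two samples each
print(max(abs(a - b) for a, b in zip(full, sharded)))
```

The two gradients agree up to floating-point rounding, because a mean over the batch equals the mean of equal-sized shard means. With unequal shards the average would have to be weighted by shard size.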

Here is how data parallelism lays out across 4 GPUs: each holds the complete model, and they differ only in which batch shard they process. The gradients are synchronized so all GPUs apply the same update.

Device Grid: Data parallelism: full model replicated, batch sharded

      | GPU 0 | GPU 1 | GPU 2 | GPU 3
Model | full | full | full | full
Batch | shard 0 | shard 1 | shard 2 | shard 3
Python: Data parallelism with PyTorch DDP
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

dist.init_process_group('nccl')
local_rank = int(os.environ['LOCAL_RANK']); torch.cuda.set_device(local_rank)

model = GPT(...).cuda()
model = DDP(model, device_ids=[local_rank])   # wraps model; hooks gradients
opt   = torch.optim.AdamW(model.parameters(), lr=3e-4)

for batch in sharded_dataloader:   # each rank gets a different shard
    loss = lm_loss(model, batch.cuda())
    loss.backward()                   # DDP all-reduces grads automatically
    opt.step(); opt.zero_grad()

# DDP overlaps gradient all-reduce WITH the backward pass: as soon as
# a layer's gradients are ready, their all-reduce starts while earlier
# layers are still computing. This hides most communication latency.
Dist Note: DDP Overlaps Communication with Computation
A key DDP optimization: it does not wait for the entire backward pass to finish before synchronizing. As each layer's gradients become available (during backprop), DDP buckets them and launches the all-reduce immediately. Because backprop runs from the last layer to the first, communication for the later layers overlaps with the gradient computation of the earlier ones.
This overlap is why DDP scales so well: for large models the gradient sync is almost entirely hidden behind the backward pass. The limitation of plain DDP is memory — every GPU holds a full model and full optimizer state — which is exactly what ZeRO addresses next.
18.4

Plain data parallelism is wasteful: every GPU stores a full, identical copy of the model, gradients, and optimizer state. If you have 64 GPUs, you store 64 redundant copies. ZeRO (Zero Redundancy Optimizer; Rajbhandari et al., 2020) eliminates this redundancy by SHARDING the state across GPUs — each GPU holds only its slice — while preserving the simplicity and mathematics of data parallelism.

The Three ZeRO Stages

ZeRO comes in three progressively more aggressive stages, each sharding more of the per-parameter state. Recall from Section 18.1 that the 16 bytes/param split into optimizer state (12), gradients (2), and parameters (2). ZeRO shards them in that order:

Stage | Shards | Memory/GPU (vs DDP) | Extra comm.
ZeRO-1 | Optimizer state | 16 → 4 + 12/P | Minimal
ZeRO-2 | + Gradients | 16 → 2 + 14/P | Minimal
ZeRO-3 | + Parameters | 16/P (full shard) | All-gather params

ZeRO-1 shards only the optimizer state (the 12 redundant bytes), giving most of the memory savings for almost no extra communication. ZeRO-2 additionally shards gradients. ZeRO-3 shards the parameters themselves, so each GPU holds only 1/P of the model — the full per-parameter memory drops from 16 bytes to 16/P, enabling truly enormous models, at the cost of all-gathering parameters on demand during forward and backward.

Text: ZeRO memory scaling (per GPU, P GPUs)
DDP:     16N bytes        (no sharding)
ZeRO-1:  4N + 12N/P       (optimizer sharded)
ZeRO-2:  2N + 14N/P       (+ gradients sharded)
ZeRO-3:  16N/P            (everything sharded)

For N=7B, P=64:  DDP=112GB,  ZeRO-3 ≈ 1.75GB per GPU
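These formulas fit in a few lines of code. The following sketch assumes the 2/2/12 byte split from Section 18.1 and 1 GB = 1e9 bytes; stage 0 stands for plain DDP, and the function names are illustrative.

```python
def zero_bytes_per_param(stage: int, P: int) -> float:
    """Per-GPU bytes per parameter; stage 0 is plain DDP (nothing sharded)."""
    params, grads, optim = 2.0, 2.0, 12.0
    if stage >= 1:
        optim /= P      # ZeRO-1: shard optimizer state
    if stage >= 2:
        grads /= P      # ZeRO-2: also shard gradients
    if stage >= 3:
        params /= P     # ZeRO-3: also shard parameters
    return params + grads + optim

def zero_gb_per_gpu(n_params: float, stage: int, P: int) -> float:
    return zero_bytes_per_param(stage, P) * n_params / 1e9

for stage in range(4):
    label = "DDP   " if stage == 0 else f"ZeRO-{stage}"
    print(f"{label}: {zero_gb_per_gpu(7e9, stage, 64):8.2f} GB per GPU")
```

For N=7B and P=64 the printout shows ZeRO-1 already removing most of the footprint, which is why it is often the first stage to try.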
Dist Note: ZeRO-3 Trades Communication for Memory
ZeRO-3's catch: since each GPU holds only a slice of the parameters, it must all-gather the full parameters of each layer just before using them (in both forward and backward), then discard them. This adds communication that ZeRO-1 and ZeRO-2 avoid.
The trade-off is usually worth it: ZeRO-3 makes models fit that otherwise could not, and the extra all-gathers overlap with computation when tuned well. The communication volume of ZeRO-3 is roughly 1.5× that of plain DDP — a modest price for cutting per-GPU memory by a factor of P.
18.5

Fully Sharded Data Parallel (FSDP) is PyTorch's native implementation of ZeRO-3-style parameter sharding. It is the standard way to train large models in PyTorch today, having largely superseded the older approaches for most use cases. Understanding its mechanics makes the ZeRO ideas concrete.

How FSDP Works

Text: FSDP forward/backward for one layer (Pseudocode)
# Parameters are sharded across GPUs; each holds 1/P
forward(layer):
    all-gather the layer's full parameters from all GPUs
    compute the forward pass
    free the gathered parameters (keep only own shard)

backward(layer):
    all-gather the layer's full parameters again
    compute gradients
    reduce-scatter gradients (each GPU keeps its shard)
    free the gathered parameters
    # optimizer updates only the local shard
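The pseudocode can be traced end to end with a single-process simulation. This is a toy sketch under several assumptions: the "layer" is a dot product y = w · x, the loss on each rank is 0.5·y², the optimizer is plain SGD rather than AdamW, and the collectives are ordinary list operations. All function names are illustrative.

```python
def all_gather(shards):
    """Concatenate every rank's shard; each rank would receive this full list."""
    return [v for shard in shards for v in shard]

def reduce_scatter(per_rank_vectors, P):
    """Sum vectors elementwise across ranks, then hand rank r only slice r."""
    n = len(per_rank_vectors[0])
    c = n // P
    total = [sum(v[j] for v in per_rank_vectors) for j in range(n)]
    return [total[r * c:(r + 1) * c] for r in range(P)]

def fsdp_layer_step(w_shards, inputs, lr=0.1):
    """w_shards[r] is rank r's slice of w; inputs[r] is rank r's data sample."""
    P = len(w_shards)
    # Forward: gather the full weights, compute, then drop the gathered copy.
    w_full = all_gather(w_shards)
    ys = [sum(wi * xi for wi, xi in zip(w_full, x)) for x in inputs]
    del w_full
    # Backward: dL/dw on rank r is y_r * x_r. (A real layer would gather w again
    # here to compute input gradients; this toy loss does not need it.)
    local_grads = [[y * xi for xi in x] for y, x in zip(ys, inputs)]
    grad_shards = reduce_scatter(local_grads, P)
    # Optimizer: each rank updates only its own shard with the averaged gradient.
    return [[w - lr * g / P for w, g in zip(ws, gs)]
            for ws, gs in zip(w_shards, grad_shards)]

w_shards = [[1.0, 2.0], [3.0, 4.0]]                     # w = [1, 2, 3, 4] over 2 ranks
inputs = [[1.0, 0.0, 1.0, 0.0], [0.0, 1.0, 0.0, 1.0]]   # one sample per rank
new_shards = fsdp_layer_step(w_shards, inputs)
print(new_shards)
```

Concatenating the updated shards gives the same weights as an unsharded data-parallel SGD step; no rank ever keeps more than its slice between steps.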
Python: Wrapping a model with FSDP
import functools
import torch
from torch.distributed.fsdp import (
    FullyShardedDataParallel as FSDP,
    MixedPrecision,
    ShardingStrategy,
)
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy

# Assumes the process group is initialized and the CUDA device is set,
# as in the DDP example of Section 18.3.
model = GPT(...).cuda()

# Wrap each Transformer block as a separate FSDP unit -- so only ONE
# block's parameters are gathered at a time, minimizing peak memory.
# TransformerBlock is the model's own block class.
policy = functools.partial(
    transformer_auto_wrap_policy,
    transformer_layer_cls={TransformerBlock},
)
model = FSDP(
    model,
    auto_wrap_policy=policy,
    sharding_strategy=ShardingStrategy.FULL_SHARD,   # ZeRO-3 style
    mixed_precision=MixedPrecision(param_dtype=torch.bfloat16),
)

# Sharding strategies trade memory for communication:
#   FULL_SHARD   (ZeRO-3): shard params+grads+optimizer  -- least memory
#   SHARD_GRAD_OP (ZeRO-2): shard grads+optimizer only
#   NO_SHARD      (= DDP):  no sharding
#   HYBRID_SHARD: full-shard within a node, replicate across nodes
Effic Note: Wrap at the Block Level
The most important FSDP tuning knob is the wrapping granularity. Wrapping each Transformer block as its own FSDP unit means only one block's parameters are gathered (un-sharded) at any moment — keeping peak memory low. Wrapping the whole model as one unit would gather ALL parameters at once, defeating the purpose.
HYBRID_SHARD is increasingly popular for multi-node training: it fully shards within a node (over fast NVLink) but replicates across nodes (data-parallel over slower inter-node links), matching the communication pattern to the hardware topology of Section 18.2.
18.6

Data parallelism and ZeRO replicate or shard the model across the batch dimension. Tensor parallelism (Shoeybi et al., 2019, 'Megatron-LM') takes a different cut: it splits the computation WITHIN each layer across GPUs. A single matrix multiplication is partitioned so that each GPU computes part of it, and the results are combined. This is essential when even a single layer's weights and activations are too large for one GPU.

Splitting a Linear Layer

Consider a linear layer Y = XW. There are two ways to split W across GPUs. Column parallelism splits W by columns, so each GPU produces a slice of the output. Row parallelism splits W by rows, so each GPU produces a partial sum that must be all-reduced. Megatron cleverly pairs them so that an FFN or attention block needs only one all-reduce.

Text: Tensor-parallel linear layers
Column split:  W = [W₁ | W₂]    Y = X[W₁|W₂] = [XW₁ | XW₂]
    each GPU computes one output slice; concatenate (all-gather)

Row split:     W = [W₁; W₂]    Y = [X₁|X₂][W₁;W₂] = X₁W₁ + X₂W₂
    each GPU computes a partial sum; combine (all-reduce)

Megatron's insight: in the FFN (two linear layers, d→4d→d), make the first layer column-parallel and the second row-parallel. Then each GPU independently computes its slice through both layers, and only ONE all-reduce is needed at the end. The same trick applies to attention by splitting across heads. Here is how the FFN weights split across 2 GPUs:

Device Grid: Tensor-parallel FFN across 2 GPUs (Megatron)

          | GPU 0 | GPU 1
W1 (d→4d) | cols 0..2d | cols 2d..4d
GELU | local | local
W2 (4d→d) | rows 0..2d | rows 2d..4d
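The column-then-row pairing can be checked numerically without any GPUs. The sketch below simulates the tensor-parallel ranks in one process with plain Python lists as matrices; the helper names (`matmul`, `gelu`, `tp_ffn`) are illustrative, and biases are omitted for brevity.

```python
import math
import random

def matmul(A, B):
    return [[sum(a * b for a, b in zip(row, col)) for col in zip(*B)] for row in A]

def gelu(M):
    return [[0.5 * v * (1.0 + math.erf(v / math.sqrt(2.0))) for v in row] for row in M]

def ffn(X, W1, W2):
    """Reference FFN on one device: GELU(X W1) W2."""
    return matmul(gelu(matmul(X, W1)), W2)

def tp_ffn(X, W1, W2, tp):
    """Same FFN on `tp` simulated GPUs: W1 split by columns, W2 by rows."""
    c = len(W2) // tp                                    # hidden columns per GPU
    partials = []
    for r in range(tp):
        W1_r = [row[r * c:(r + 1) * c] for row in W1]    # column slice of W1
        W2_r = W2[r * c:(r + 1) * c]                     # matching row slice of W2
        # GELU is elementwise, so it applies to the local slice with no communication.
        partials.append(matmul(gelu(matmul(X, W1_r)), W2_r))
    # The single all-reduce of the forward pass: sum the partial outputs.
    return [[sum(p[i][j] for p in partials) for j in range(len(W2[0]))]
            for i in range(len(X))]

random.seed(0)
d, hidden = 4, 16
X = [[random.gauss(0, 1) for _ in range(d)] for _ in range(3)]
W1 = [[random.gauss(0, 1) for _ in range(hidden)] for _ in range(d)]
W2 = [[random.gauss(0, 1) for _ in range(d)] for _ in range(hidden)]

ref, par = ffn(X, W1, W2), tp_ffn(X, W1, W2, tp=2)
print(max(abs(a - b) for ra, rb in zip(ref, par) for a, b in zip(ra, rb)))
```

The pairing works because the nonlinearity sits between the two splits: a row split first would require summing partial results before GELU, forcing an extra all-reduce in the middle of the block.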
Python: Tensor-parallel linear layers (conceptual)
import torch
import torch.distributed as dist

class ColumnParallelLinear:
    """Y = X @ W, with W split by columns across GPUs."""
    def __init__(self, d_in, d_out, tp_size):
        # Local slice: all rows, d_out/tp_size columns (uninitialized placeholder)
        self.W = torch.empty(d_in, d_out // tp_size, device='cuda')
    def forward(self, x):
        return x @ self.W              # (B,T,d_out/tp) local output slice

class RowParallelLinear:
    """Y = X @ W, with W split by rows; output all-reduced."""
    def __init__(self, d_in, d_out, tp_size):
        # Local slice: d_in/tp_size rows, all columns (uninitialized placeholder)
        self.W = torch.empty(d_in // tp_size, d_out, device='cuda')
    def forward(self, x):  # x is already the local slice (B,T,d_in/tp)
        out = x @ self.W                 # partial sum (B,T,d_out)
        dist.all_reduce(out)             # sum partial results -> full output
        return out

# FFN = RowParallel(GELU(ColumnParallel(x)))
# Column produces local slices, row consumes them and all-reduces ONCE per forward pass.
# This is a forward-only sketch; the backward pass needs one matching all-reduce
# on the input gradient, so each FFN/attention block costs one all-reduce per direction.
Dist Note: Tensor Parallelism Lives Inside a Node
Tensor parallelism communicates an all-reduce on every layer, in both forward and backward — the highest-frequency communication of any strategy. This makes it viable ONLY over the fastest interconnect: NVLink within a single node. Tensor-parallel groups are almost always confined to the 8 GPUs of one node.
This is why tensor-parallel degree rarely exceeds 8: it is bounded by the number of NVLink-connected GPUs in a node. Beyond that, the all-reduces would cross slow inter-node links and cripple throughput. Tensor parallelism handles intra-node model splitting; pipeline parallelism handles the inter-node dimension.
18.7

Pipeline parallelism (Huang et al., 2019, 'GPipe') splits the model by LAYERS: GPU 0 holds the first few layers, GPU 1 the next few, and so on. Activations flow forward through the stages and gradients flow backward. Because each GPU holds only a contiguous block of layers, the model can be far larger than any single device — and because stages communicate only at their boundaries, the communication is infrequent and tolerant of slower inter-node links.

The Pipeline Bubble

Pipeline parallelism has a fundamental inefficiency: the bubble. While GPU 0 processes the first micro-batch, GPUs 1 through P−1 sit idle waiting for activations. At the end, GPU 0 is idle while the last stages finish. This idle time, the bubble, wastes compute. The fix is micro-batching: split the batch into many micro-batches and feed them through in a staggered pipeline, so all stages stay busy most of the time.

Device Grid: Pipeline parallelism: model layers split across 4 GPUs

       | GPU 0 | GPU 1 | GPU 2 | GPU 3
Layers | 1–8 | 9–16 | 17–24 | 25–32
Text: The pipeline bubble fraction
P pipeline stages, m micro-batches:

bubble fraction = (P - 1) / (m + P - 1)

P=4, m=1:   bubble = 3/4 = 75% wasted!
P=4, m=32:  bubble = 3/35 ≈ 8.6%   (more micro-batches → smaller bubble)

The bubble shrinks as you increase the number of micro-batches m relative to the number of stages P. This is why pipeline parallelism uses many micro-batches. Schedules differ in what they optimize: GPipe uses a simple fill-drain schedule; PipeDream's 1F1B (one-forward-one-backward) has the same bubble but bounds the activation memory each stage must hold; interleaved schedules, which assign several smaller layer chunks to each GPU, reduce the bubble itself.
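The formula can be confirmed by counting idle slots in a simulated schedule. This sketch assumes a fill-drain schedule in which every stage takes one time unit per micro-batch, and it counts the forward pass only; the backward pass mirrors it, so the fraction is unchanged. Function names are illustrative.

```python
from fractions import Fraction

def bubble_fraction(P: int, m: int) -> Fraction:
    """Closed form: (P - 1) / (m + P - 1)."""
    return Fraction(P - 1, m + P - 1)

def simulated_bubble(P: int, m: int) -> Fraction:
    """Count idle (stage, time-slot) cells when stage s runs micro-batch i at time s + i."""
    total_slots = m + P - 1
    busy = [[False] * total_slots for _ in range(P)]
    for s in range(P):
        for i in range(m):
            busy[s][s + i] = True
    idle = sum(row.count(False) for row in busy)
    return Fraction(idle, P * total_slots)

for P, m in [(4, 1), (4, 32), (8, 8), (8, 64)]:
    f = bubble_fraction(P, m)
    assert f == simulated_bubble(P, m)
    print(f"P={P}, m={m}: bubble = {f} = {float(f):.1%}")
```

Each stage is busy for m of the m + P − 1 slots, which is where the closed form comes from.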

Python: Pipeline parallelism schedule (1F1B, conceptual)
# 1F1B: once the pipeline is full, alternate one forward and one
# backward per step, keeping all stages busy and bounding memory.
# recv_from_prev, send_to_next, recv_grad_from_next and send_grad_to_prev
# stand for point-to-point transfers between neighboring stages; on the
# first stage, recv_from_prev() yields the next entry of micro_batches.

def pipeline_1f1b(stage, micro_batches, n_stages, rank):
    n_micro = len(micro_batches)
    warmup = min(n_stages - rank - 1, n_micro)   # forwards before first backward
    # Warmup phase: fill the pipeline
    for _ in range(warmup):
        act = stage.forward(recv_from_prev())
        send_to_next(act)
    # Steady state: 1 forward + 1 backward per iteration
    for _ in range(n_micro - warmup):
        act  = stage.forward(recv_from_prev()); send_to_next(act)
        grad = stage.backward(recv_grad_from_next()); send_grad_to_prev(grad)
    # Cooldown: drain remaining backwards
    for _ in range(warmup):
        grad = stage.backward(recv_grad_from_next()); send_grad_to_prev(grad)

# 1F1B keeps at most `warmup + 1` micro-batches' activations in memory per stage,
# vs GPipe which holds ALL micro-batches' activations at once.
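The memory claim can be checked by generating each stage's operation order and counting how many micro-batches are "in flight" (forwarded but not yet backpropagated). This is a sketch of the schedule's bookkeeping only, with no communication or timing; the function names are illustrative.

```python
def one_f_one_b(n_stages: int, n_micro: int, rank: int):
    """Operation order for one stage under 1F1B: a list of ('F', i) / ('B', i)."""
    warmup = min(n_stages - rank - 1, n_micro)
    ops = [("F", i) for i in range(warmup)]          # warmup: forwards only
    f, b = warmup, 0
    for _ in range(n_micro - warmup):                # steady state: one F, then one B
        ops.append(("F", f)); f += 1
        ops.append(("B", b)); b += 1
    while b < n_micro:                               # cooldown: drain backwards
        ops.append(("B", b)); b += 1
    return ops

def fill_drain(n_micro: int):
    """GPipe-style order: all forwards, then all backwards."""
    return [("F", i) for i in range(n_micro)] + [("B", i) for i in range(n_micro)]

def peak_in_flight(ops) -> int:
    """Largest number of micro-batches whose activations are held at once."""
    live = peak = 0
    for kind, _ in ops:
        live += 1 if kind == "F" else -1
        peak = max(peak, live)
    return peak

n_stages, n_micro = 4, 8
for rank in range(n_stages):
    print(f"stage {rank}: 1F1B peak = {peak_in_flight(one_f_one_b(n_stages, n_micro, rank))}, "
          f"fill-drain peak = {peak_in_flight(fill_drain(n_micro))}")
```

Under 1F1B the peak is n_stages − rank regardless of how many micro-batches there are, whereas fill-drain grows with n_micro. That independence is what lets m be raised to shrink the bubble without running out of activation memory.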
Dist Note: Pipeline Parallelism Spans Nodes
Pipeline parallelism communicates only at stage boundaries — sending activations forward and gradients backward, once per micro-batch. This is the LOWEST-frequency model-parallel communication, making it the right choice for splitting a model ACROSS nodes over slower inter-node interconnect.
The division of labor is now clear: tensor parallelism splits within a node (high-frequency, needs NVLink), pipeline parallelism splits across nodes (low-frequency, tolerates InfiniBand), and data parallelism replicates across the remaining dimension. Combining all three is 3D parallelism.
18.8

No single parallelism strategy suffices for the largest models. The state of the art combines all three — data, tensor, and pipeline parallelism — into 3D parallelism (Narayanan et al., 2021, 'Megatron-LM' at scale). Each strategy handles the dimension it is best suited to, and together they map a model onto thousands of GPUs efficiently.

The Three Dimensions

Dimension | Splits | Communication | Hardware
Tensor (TP) | Within each layer | Per-layer all-reduce | Within node (NVLink)
Pipeline (PP) | Across layer groups | Stage boundaries | Across nodes
Data (DP) | Across the batch | Gradient all-reduce | Outermost

The total GPU count is the product of the three degrees: total = TP × PP × DP. A typical large-model configuration might use TP=8 (one node), PP=8 (eight nodes form one pipeline), and DP=16 (sixteen such pipelines in parallel) — 8×8×16 = 1,024 GPUs training one model. ZeRO can be layered on the data-parallel dimension to shard its optimizer state too.

Text: Mapping a model onto a cluster
Total GPUs = TP × PP × DP

Example (1024 GPUs, 128 nodes × 8 GPUs):
    TP = 8    each node holds one tensor-parallel group
    PP = 8    8 nodes form one pipeline (the full model)
    DP = 16   16 pipelines train in data-parallel
    ⇒ 8 × 8 × 16 = 1024 GPUs
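One way to realize this mapping is to derive each GPU's three coordinates from its global rank. The sketch below assumes ranks are numbered consecutively within a node and that the TP index varies fastest; this is one possible layout, and real frameworks may order the dimensions differently.

```python
def coords(rank: int, tp: int, pp: int, dp: int):
    """Map a global rank to (dp_idx, pp_idx, tp_idx), with TP varying fastest."""
    assert 0 <= rank < tp * pp * dp
    tp_idx = rank % tp
    pp_idx = (rank // tp) % pp
    dp_idx = rank // (tp * pp)
    return dp_idx, pp_idx, tp_idx

def node_of(rank: int, gpus_per_node: int = 8) -> int:
    return rank // gpus_per_node

TP, PP, DP = 8, 8, 16
world = TP * PP * DP

# Collect the members of every tensor-parallel group (same dp_idx and pp_idx).
tp_groups = {}
for rank in range(world):
    dp_idx, pp_idx, _ = coords(rank, TP, PP, DP)
    tp_groups.setdefault((dp_idx, pp_idx), []).append(rank)

# Every TP group should sit on exactly one node, so its all-reduces stay on NVLink.
single_node = all(len({node_of(r) for r in ranks}) == 1 for ranks in tp_groups.values())
print(world, len(tp_groups), single_node)
print(coords(0, TP, PP, DP), coords(1023, TP, PP, DP))
```

With TP innermost, the eight ranks of a node form one tensor-parallel group, each run of 64 consecutive ranks forms one pipeline over 8 nodes, and the 16 pipelines are the data-parallel replicas.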
Dist Note: The Configuration Is a Real Optimization Problem
Choosing TP, PP, and DP degrees for a given model and cluster is a genuine optimization, balancing memory (must fit the model), communication (minimize slow inter-node traffic), and the pipeline bubble (PP should not be too large). Tools like Megatron-LM, DeepSpeed, and the Llama training infrastructure encode heuristics and auto-tuners for this.
The general recipe: set TP to the node size (8), set PP just large enough that the model fits, then fill the remaining GPUs with DP. ZeRO on the DP dimension shards optimizer state across data-parallel replicas. Getting this mapping right is worth large fractions of total throughput — and thus millions of dollars on a frontier run.
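The recipe can be written down as a small search. This is a deliberately crude sketch: it uses the 16 bytes/param rule, assumes TP and PP divide the state evenly, ignores ZeRO and activations, and reserves a hypothetical `state_fraction` of GPU memory for model state. The function name and defaults are illustrative, not taken from any tool.

```python
def choose_3d_config(n_params, total_gpus, gpus_per_node=8, gpu_gb=80.0, state_fraction=0.5):
    """TP = node size; PP = smallest divisor of the node count that fits; DP = the rest."""
    if total_gpus % gpus_per_node:
        raise ValueError("total_gpus must be a multiple of gpus_per_node")
    tp = gpus_per_node
    n_nodes = total_gpus // gpus_per_node
    budget_gb = gpu_gb * state_fraction          # memory allowed for model + optimizer state
    for pp in range(1, n_nodes + 1):
        if n_nodes % pp:
            continue                             # PP must divide the node count
        state_gb = 16 * n_params / (tp * pp) / 1e9
        if state_gb <= budget_gb:
            return {"TP": tp, "PP": pp, "DP": n_nodes // pp, "state_gb_per_gpu": state_gb}
    raise ValueError("model state does not fit even with PP = number of nodes")

cfg = choose_3d_config(70e9, 1024)
print(cfg)
```

A real tuner would also weigh the pipeline bubble, activation memory, and measured communication cost; this sketch captures only the "fit first, then replicate" ordering of the recipe.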
18.9

With all the strategies in hand, how do you choose? The decision depends on model size, cluster size, and interconnect. Here is a practical decision guide that captures current best practice.

Situation | Recommended strategy
Model fits on one GPU | Plain DDP (data parallel)
Model fits, optimizer state too big | ZeRO-1 / ZeRO-2 (or FSDP SHARD_GRAD_OP)
Model does not fit on one GPU | FSDP / ZeRO-3 (shard everything)
Model too big even for one node | + Tensor parallelism within nodes
Model spans many nodes | + Pipeline parallelism across nodes
Frontier scale (100B+) | Full 3D parallelism + ZeRO on DP dim

The Modern Default

For most practitioners training models up to tens of billions of parameters on a single multi-GPU node or a small cluster, FSDP (ZeRO-3) is the pragmatic default: it is built into PyTorch, requires no model surgery, and shards everything. Tensor and pipeline parallelism enter only at the largest scales, where they require deliberate model partitioning via frameworks like Megatron-LM or DeepSpeed.

Python: A complete distributed training launch
# Launch with torchrun across all GPUs on all nodes:
#   torchrun --nnodes=128 --nproc_per_node=8 train.py

import os
import torch
import torch.distributed as dist
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP, ShardingStrategy

def main():
    dist.init_process_group('nccl')
    rank = dist.get_rank(); local = int(os.environ['LOCAL_RANK'])
    torch.cuda.set_device(local)

    model = GPT(...).cuda()
    # block_policy: the block-level auto-wrap policy built in Section 18.5 (named `policy` there)
    model = FSDP(model, auto_wrap_policy=block_policy,
                 sharding_strategy=ShardingStrategy.HYBRID_SHARD)  # shard in-node, replicate cross-node
    opt = torch.optim.AdamW(model.parameters(), lr=3e-4)

    for step, batch in enumerate(distributed_loader(rank)):
        with torch.autocast('cuda', dtype=torch.bfloat16):
            loss = lm_loss(model, batch.cuda())
        loss.backward()                 # FSDP handles reduce-scatter
        model.clip_grad_norm_(1.0)
        opt.step(); opt.zero_grad()
        if rank == 0 and step % 100 == 0:
            print(f"step {step}: loss {loss.item():.3f}")

    dist.destroy_process_group()

if __name__ == '__main__':
    main()
18.10

A frontier training run uses thousands of GPUs for weeks or months. At that scale, hardware failures are not exceptional — they are routine. A GPU fails, a network link flaps, a node crashes. The infrastructure must detect, recover, and continue without losing the run. This operational reality is as important as the parallelism math.

The Scale of Failure

With thousands of GPUs running continuously, the mean time between failures is measured in hours, not months. Meta's LLaMA-3 training (Llama Team, 2024) reported hundreds of interruptions over its run, the majority from hardware faults. A single failed GPU can stall the entire synchronized run, so the system must handle failures gracefully.

Challenge | Mitigation
GPU/node failures | Frequent checkpointing; elastic restart from last checkpoint
Stragglers | Detect slow GPUs; the synchronized step runs at the speed of the slowest
Network congestion | Topology-aware placement; overlap comm. with compute
Silent data corruption | Checksums, redundant computation, monitoring for loss anomalies
Checkpoint cost | Asynchronous and sharded checkpointing to avoid stalling training
Memory fragmentation | Activation checkpointing, careful allocator tuning
Dist Note: Checkpointing Is the Lifeline
Just as in Chapter 15, frequent full-state checkpointing is what stands between a hardware failure and a ruined run. At distributed scale, checkpoints are SHARDED (each GPU saves its slice) and often written ASYNCHRONOUSLY (training continues while the checkpoint flushes to storage), because a synchronous full checkpoint of a large model can take minutes — minutes of idle GPUs at thousand-GPU scale is enormously expensive.
Elastic training frameworks (torchrun with --max-restarts, or systems like Meta's) automatically restart from the last checkpoint when a node fails, sometimes with a reduced GPU count, so the run survives the inevitable failures of long, large jobs.
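The core of sharded checkpointing with safe restart can be sketched with the standard library alone. The assumptions here are loud ones: JSON files stand in for real tensor serialization, the ranks are simulated in a loop, the directory layout is hypothetical, and the asynchronous flush is omitted. The point is the pattern: write each shard atomically, and resume only from a step for which every rank's shard exists.

```python
import json
import os
import tempfile

def save_shard(ckpt_dir, step, rank, world, shard):
    """Write one rank's shard; temp file + rename means no half-written shard is ever visible."""
    step_dir = os.path.join(ckpt_dir, f"step_{step:08d}")
    os.makedirs(step_dir, exist_ok=True)
    final = os.path.join(step_dir, f"rank_{rank:05d}.json")
    tmp = final + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"step": step, "rank": rank, "world": world, "shard": shard}, f)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, final)                       # atomic rename

def latest_complete_step(ckpt_dir, world):
    """Newest step for which all `world` shard files exist, or None."""
    steps = sorted((int(d.split("_")[1]) for d in os.listdir(ckpt_dir)
                    if d.startswith("step_")), reverse=True)
    for step in steps:
        step_dir = os.path.join(ckpt_dir, f"step_{step:08d}")
        done = [f for f in os.listdir(step_dir) if f.endswith(".json")]
        if len(done) == world:
            return step
    return None

def load_shard(ckpt_dir, step, rank):
    path = os.path.join(ckpt_dir, f"step_{step:08d}", f"rank_{rank:05d}.json")
    with open(path) as f:
        return json.load(f)["shard"]

with tempfile.TemporaryDirectory() as d:
    world = 4
    for r in range(world):                       # step 100: every rank finishes saving
        save_shard(d, 100, r, world, [float(r)] * 2)
    for r in range(2):                           # step 200: simulated crash after 2 ranks
        save_shard(d, 200, r, world, [float(r) + 0.5] * 2)
    resume = latest_complete_step(d, world)
    print("resume from step", resume, "rank 3 shard:", load_shard(d, resume, 3))
```

The incomplete step 200 is skipped on restart, so a failure in the middle of a checkpoint costs only the work since the previous complete one.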
ML Connection: The Hidden Cost of Frontier Training
The compute FLOPs of Chapter 16's 6ND rule tell only part of the story. Real training efficiency — 'Model FLOPs Utilization' (MFU) — is the fraction of theoretical peak compute actually achieved, often only 35–55% even on well-tuned runs. The gap is communication, bubbles, stragglers, and failures.
Pushing MFU higher — through better parallelism mapping, communication overlap, and fault tolerance — directly reduces the cost and time of training. This is why frontier labs invest enormous engineering effort in distributed-training infrastructure: a few percent of MFU on a nine-figure run is a very large sum.
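MFU is straightforward to compute from quantities a training job already logs. The sketch below applies the 6ND rule per second of training; the throughput and the peak-FLOPs figure are round hypothetical numbers chosen for illustration, not measurements or hardware specifications.

```python
def mfu(n_params: float, tokens_per_second: float, n_gpus: int,
        peak_flops_per_gpu: float) -> float:
    """Model FLOPs Utilization: achieved model FLOP/s over theoretical peak FLOP/s."""
    achieved = 6 * n_params * tokens_per_second      # 6ND rule, per second of training
    return achieved / (n_gpus * peak_flops_per_gpu)

# Hypothetical run: 70B parameters, 1,024 GPUs, 1M tokens/s across the cluster,
# and an assumed peak of 1e15 FLOP/s per GPU.
u = mfu(70e9, 1.0e6, 1024, 1e15)
print(f"MFU = {u:.1%}")

# Seen from the other side: the same tokens would take this fraction of the
# wall-clock time if the hardware ran at its theoretical peak.
print(f"time at 100% MFU = {u:.1%} of actual time")
```

Because cost scales inversely with MFU, moving from 40% to 44% shortens the run by roughly a tenth, which is the sense in which a few points of MFU are worth a very large sum.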
18.11

Parallelism Quick-Reference

Strategy | Splits | Comm. frequency | Scope
Data (DDP) | Batch | Per step (grad) | Any
ZeRO / FSDP | Optimizer/grad/params | Per layer | Any
Tensor | Within layer | Per layer | In node
Pipeline | Across layers | Stage boundary | Across nodes
3D | All three | Mixed | Whole cluster

Exercises

Exercises 1–10 are pen-and-paper or derivations; 11–20 require code.

Exercise 1: Pen & Paper
Derive the 16-bytes-per-parameter memory rule for mixed-precision AdamW training. List each component and its byte count.
Exercise 2: Pen & Paper
For a 13B model, compute the memory for parameters + optimizer state under DDP, ZeRO-1, ZeRO-2, and ZeRO-3 with P=32 GPUs.
Exercise 3: Pen & Paper
Explain why all-reduce equals reduce-scatter + all-gather, and why ring all-reduce has communication cost independent of the number of GPUs.
Exercise 4: Pen & Paper
Why is plain data parallelism memory-wasteful? Quantify the redundancy for a 7B model on 64 GPUs and explain how ZeRO removes it.
Exercise 5: Derive
Derive the pipeline bubble fraction (P-1)/(m+P-1) for P stages and m micro-batches. Compute it for P=8, m=8 and P=8, m=64.
Exercise 6: Pen & Paper
In Megatron tensor parallelism, why is the FFN made column-parallel then row-parallel? Show that this needs only one all-reduce per FFN.
Exercise 7: Pen & Paper
Explain why tensor parallelism is confined to a single node while pipeline parallelism can span nodes. Connect to interconnect bandwidth.
Exercise 8: Pen & Paper
A cluster has 512 GPUs (64 nodes × 8). Propose TP, PP, and DP degrees for a 70B model and justify the choice.
Exercise 9: Pen & Paper
Define Model FLOPs Utilization (MFU). List four reasons a real run achieves only 40% MFU despite the 6ND FLOP count.
Exercise 10: Pen & Paper
Why does ZeRO-3 add communication that ZeRO-1 does not? Describe the extra all-gather and estimate its relative cost.
Exercise 11: Code
Implement data-parallel training with torch.distributed: manually all-reduce gradients across processes and verify the result matches single-GPU training.
Exercise 12: Code
Implement the five collective operations (broadcast, reduce, all-reduce, all-gather, reduce-scatter) using only point-to-point send/recv, for 4 simulated ranks.
Exercise 13: Code
Implement a ring all-reduce from scratch and verify it produces the correct sum. Measure its communication volume and confirm it is ~2× the data size.
Exercise 14: Code Lab
Wrap a small Transformer in FSDP with block-level auto-wrapping. Compare peak GPU memory against plain DDP on the same model and report the savings.
Exercise 15: Code
Implement column-parallel and row-parallel linear layers. Build a tensor-parallel FFN across 2 simulated GPUs and verify it matches the single-GPU output.
Exercise 16: Code
Simulate the pipeline bubble: model P stages and m micro-batches, count idle vs busy stage-steps, and plot bubble fraction vs m. Confirm the formula.
Exercise 17: Code Lab
Implement a simple 1F1B pipeline schedule for a 2-stage model across 2 processes. Verify the gradients match non-pipelined training.
Exercise 18: Code
Measure communication/computation overlap: train with DDP and compare wall-clock time when gradient all-reduce is overlapped vs done after the full backward.
Exercise 19: Code
Implement sharded asynchronous checkpointing: each rank saves its parameter shard, and verify the model can be reconstructed and resumed correctly.
Exercise 20: Code (Challenge)
Build a mini 2D-parallel trainer combining data parallelism with tensor parallelism (TP=2, DP=2 across 4 processes). Train a small model, verify numerical equivalence to single-GPU training, and report the memory and throughput characteristics of each configuration.

Further reading: “ZeRO: Memory Optimizations Toward Training Trillion Parameter Models” (Rajbhandari et al., 2020). “Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism” (Shoeybi et al., 2019) and “Efficient Large-Scale Language Model Training on GPU Clusters” (Narayanan et al., 2021) for tensor and 3D parallelism. “GPipe” (Huang et al., 2019) and “PipeDream” (Narayanan et al., 2019) for pipeline schedules. The PyTorch FSDP paper and documentation (Zhao et al., 2023). The DeepSpeed and Megatron-LM codebases. The LLaMA-3 technical report (2024) for a candid account of training infrastructure at scale.


Next → Chapter 19: Architecture Variants

You can now spread a training run across thousands of GPUs. Chapter 19 returns to the model itself, surveying the architectural variants that make large models more capable and more efficient: the modern positional encodings (RoPE, ALiBi) for longer context, normalization and activation refinements (RMSNorm, SwiGLU), attention efficiency variants (multi-query and grouped-query attention) that shrink the memory bottleneck, and the design choices that distinguish GPT, LLaMA, and the other model families. These are the refinements that, layered on the stable Transformer core, define each generation of frontier models.
