数据并行 会员专享理解通信原语和 DDP 的梯度同步机制
配套代码 训练大模型的第一步,是让多块 GPU 一起干活。数据并行(Data Parallelism)是最直觉的方式:每块 GPU 持有完整的模型副本,各自处理不同的数据,最后把梯度汇总。本章我们先理解单卡的内存瓶颈,再学习多卡通信的基础,最后深入 DDP 的实现。
一个 7B 参数的模型,fp16 权重只需要 14 GB(7B × 2 bytes),但在 80GB 的 A100 上却训练不了。内存都去哪了?
答案是:训练时 GPU 上不只存参数,还要存梯度 和优化器状态 。
现代训练普遍采用混合精度(Mixed Precision):前向和反向用 fp16 计算(快),但参数更新用 fp32(精确)。以 Adam 优化器为例,假设模型有 Φ \Phi Φ 个参数:
fp16 部分 (前向/反向):
参数:2 Φ 2\Phi 2Φ bytes
梯度:2 Φ 2\Phi 2Φ bytes
fp32 部分 (优化器):
登录以继续阅读 这是一篇付费内容,请登录您的账户以访问完整内容。
参数副本:4 Φ 4\Phi 4Φ bytes(用于精确更新)
一阶矩 m m m :4 Φ 4\Phi 4Φ bytes(梯度的指数移动平均)
二阶矩 v v v :4 Φ 4\Phi 4Φ bytes(梯度平方的移动平均)
总计 :2 Φ + 2 Φ + 4 Φ + 4 Φ + 4 Φ = 16 Φ 2\Phi + 2\Phi + 4\Phi + 4\Phi + 4\Phi = 16\Phi 2Φ + 2Φ + 4Φ + 4Φ + 4Φ = 16Φ bytes
组件 精度 内存(bytes) 参数 fp16 2 Φ 2\Phi 2Φ 梯度 fp16 2 Φ 2\Phi 2Φ 参数副本 fp32 4 Φ 4\Phi 4Φ 一阶矩 m m m fp32 4 Φ 4\Phi 4Φ 二阶矩 v v v
为什么需要 fp32 参数副本?
训练时前向和反向都用 fp16 计算(速度快、省显存),但 fp16 精度只有约 3 位有效数字。当学习率很小时,参数 += 学习率 × 梯度 这个更新量可能小到被 fp16 直接舍入为零,模型就学不动了。所以 Adam 在内部维护一份 fp32 精度的参数副本,用 fp32 做更新,再把结果转回 fp16 给下一轮前向使用。
为什么需要 m m m 和 v v v ?
Adam 不是简单地"沿着梯度走一步"。它需要维护两个跨步骤的历史统计量:m m m (梯度的指数移动平均,相当于动量)和 v v v (梯度平方的移动平均,用来自适应调节每个参数的学习率)。梯度是"这一步往哪走",m m m 和 是"过去所有步的经验"。
参数:14 GB
梯度:14 GB
优化器状态:84 GB(参数副本 28GB + m m m 28GB + v v v 28GB)
总计:约 112 GB (还没算激活值)
一张 A100 只有 80 GB 显存,连模型的训练状态都放不下。
Training Memory per GPU Mixed Precision + Adam Total = 16Φ bytes
Optimizer States(fp32 params + m + v) Example: 7B Model (Φ = 7×10⁹)
Optimizer States 84 GB
Gradients 14 GB
Parameters 14 GB
Total 112 GB
既然单卡放不下,就需要多卡协作。分布式训练的核心是 GPU 之间的数据交换。PyTorch 通过 torch.distributed(下文简写为 dist)提供了一组集合通信 (Collective Communication)操作。我们只需要掌握四个:
# Rank 0 持有数据 [A, B, C, D],其他 rank 为空
# broadcast 后:所有 rank 都持有 [A, B, C, D]
dist.broadcast(tensor, src = 0 ) 用途 :训练开始前,确保所有 GPU 的模型参数完全一致。
所有节点上的数据先归约(如求和),然后每个节点都拿到完整的归约结果。
# Rank 0: [1, 2] Rank 1: [3, 4] Rank 2: [5, 6] Rank 3: [7, 8]
# all_reduce (SUM) 后:
# 所有 rank: [16, 20]
dist.all_reduce(tensor, op = dist.ReduceOp. SUM ) 用途 :反向传播后,把所有 GPU 上的梯度求平均,让每个 GPU 都拿到相同的结果。本章会详细介绍。
API vs 算法 :dist.all_reduce() 是 PyTorch 提供的接口,定义了"做什么"(归约并分发)。具体"怎么做"(用什么算法)由底层通信库(如 NCCL)根据数据大小、GPU 数量、网络拓扑等自动选择。在大规模训练中,NCCL 通常会选择 Ring All-Reduce 算法,因为它带宽最优。后文会详细介绍 Ring All-Reduce 的工作原理。
先归约,再按块分发。每个节点只拿到结果的一个切片。
# Rank 0: [1, 2] Rank 1: [3, 4]
# reduce_scatter (SUM) 后:
# Rank 0: [4] Rank 1: [6]
# (1+3=4 给 Rank 0, 2+4=6 给 Rank 1)
output = torch.empty( 1 ) # 每个 rank 只接收一个切片
input_list = [rank_0_tensor, rank_1_tensor] # 所有 rank 的完整数据
dist.reduce_scatter(output, input_list, op = 用途 :当每个 GPU 只需要结果的一部分时,归约后直接按块分发,省去存完整副本的内存。后续章节会用到。
每个节点贡献自己的一块数据,收集拼合后所有节点都拿到完整数据。
# Rank 0: [A] Rank 1: [B] Rank 2: [C] Rank 3: [D]
# all_gather 后:
# 所有 rank: [A, B, C, D]
output_list = [torch.empty( 1 ) for _ in range ( 4 )] # 预分配,用于接收所有切片
tensor = local_shard
用途 :每个 GPU 只存一部分参数,需要完整参数时再拼回来。后续章节会用到。
Collective Operations Broadcast All-Reduce Reduce-Scatter All-Gather
Rank 0 sends its data to all other ranks.
记住这个关系:All-Reduce = Reduce-Scatter + All-Gather。理解了这一点,后面 ZeRO 和 FSDP 的通信模式就清晰了。
PyTorch 提供了一个开箱即用的方案:nn.DataParallel,一行代码就能用多张 GPU 训练:
systems/distributed_training/00_data_parallel.py # 模型放到 GPU 0
model = GPT(cfg).to(device)
# 一行搞定多卡:
model = nn.DataParallel(model)
# 训练循环和单卡完全一样
for step in range ( 50 ):
x, y =
为什么需要 loss.mean()?因为 DataParallel 会自动 gather 所有 GPU 的输出到 GPU 0。模型在每张卡上各算出一个 loss 标量,gather 后拼成一个向量(比如 2 张卡就是 shape [2]),需要手动取平均得到最终 loss。
它的工作方式很直觉:每个前向传播时,GPU 0 把模型复制到其他 GPU,把输入 batch 沿 batch 维度均分发过去,各自算完后把输出收回 GPU 0。然后 loss.backward() 和 optimizer.step() 都只在 GPU 0 上执行 — 其他 GPU 的参数不会更新,反正下次 forward 会重新从 GPU 0 复制。
注意数据切分的方式:DataParallel 是"给我全部数据,我来分" — 你喂一个完整 batch 进去,它内部自动沿 batch 维度 split。后面的 DDP 正好相反,是"各自取各自的" — 每个进程独立运行,需要 DistributedSampler 在 DataLoader 层面让各进程拿到不重叠的数据子集。
GPU 0 瓶颈 :所有的模型复制、数据切分、结果汇总都经过 GPU 0。其他 GPU 在等 GPU 0 干完活才能开始算,GPU 0 的显存占用也比其他卡高得多。这种 master-worker 的通信模式本质上是 Tree-based Reduce (所有梯度汇聚到 GPU 0,再广播回去),存在带宽瓶颈
GIL 锁 :DataParallel 是单进程多线程实现。Python 的全局解释器锁(GIL)让多线程无法真正并行,性能打了折扣
通信机制对比 :DataParallel 使用 CUDA 的点对点通信(不涉及 NCCL),实现方式就是 Tree 结构。而 DDP 使用 NCCL 集合通信库,NCCL 会根据情况选择高效算法(通常是 Ring All-Reduce),实现带宽最优。
简单说,DataParallel 适合快速实验,但不适合正式训练。我们需要一个真正并行的方案。
nn.DataParallel Workflow Single Process 1. Scatter 2. Forward 3. Gather
GPU 0 splits batch & copies model to all GPUs
Bottleneck GPU 0 handles scatter & gather (2 of 3 steps). Other GPUs sit idle waiting. Backward & optimizer step also run only on GPU 0.
DistributedDataParallel(DDP)用多进程替代了多线程:每个 GPU 运行一个独立的进程,彻底绕开 GIL。GPU 之间不再有"主从"关系,而是通过集合通信对等协作。
复制模型 :每个 GPU 持有完整的模型副本
切分数据 :把一个 mini-batch 均匀分给 N N N 个 GPU
独立前向 :各 GPU 用自己的数据子集计算前向传播
同步梯度 :反向传播后,通过 all_reduce 将所有 GPU 的梯度求平均
独立更新 :每个 GPU 用相同的平均梯度更新参数(保证模型一致)
关键在第 4 步。DDP 在 loss.backward() 时自动插入梯度同步:
systems/distributed_training/01_ddp.py # 初始化分布式环境
dist.init_process_group( backend = "nccl" )
rank = dist.get_rank()
device = torch.device( f "cuda: { rank } " )
# 每个进程创建自己的模型副本
就这一行 loss.backward(),DDP 会在反向传播时自动对所有参数的梯度做 all_reduce,把所有 GPU 上的梯度求和再除以 N N N 得到平均值。
注意 DDP 需要 DistributedSampler:因为每个进程独立运行,如果直接用普通 DataLoader,所有进程会读到相同的数据。DistributedSampler 确保各进程拿到不重叠的数据子集。
DistributedDataParallel Workflow Multi-Process, Peer-to-Peer 1. Data 2. Forward 3. Backward 4. All-Reduce 5. Update
DistributedSampler ensures each rank gets different data
Each process loads different data via DistributedSampler
Advantages All ranks are peers (no bottleneck). Multi-process bypasses GIL. Each rank independently updates parameters with synchronized gradients.
DDP 的梯度同步使用 All-Reduce 通信原语:每个 GPU 持有一份梯度,归约后每个 GPU 都得到所有梯度的总和。
实现 All-Reduce 有多种算法。当使用 NCCL 作为通信后端时,NCCL 会根据数据大小、GPU 数量等因素自动选择最优算法,在大规模训练中通常采用 Ring All-Reduce 。
朴素实现(Tree-based All-Reduce)的问题:
所有 GPU 的数据都发送到主节点 → 主节点成为带宽瓶颈
通信复杂度 O ( N ) O(N) O ( N ) ,随 GPU 数量线性增长
无中心节点,GPU 组成环状拓扑,每个 GPU 只与左右邻居通信
充分利用所有 GPU 的带宽,实现带宽最优
通信复杂度 O ( 1 ) O(1) O ( 1 ) ,与 GPU 数量无关
Ring All-Reduce 分为两个阶段,每个阶段需要 N − 1 N-1 N − 1 轮通信(N N N 为 GPU 数量):
Scatter-Reduce :每个 GPU 的数据切分成 N N N 块,通过环形传递逐步归约。每轮每个 GPU 发送一块给右邻居,从左邻居接收一块并累加。N − 1 N-1 N − 1 轮后,每个 GPU 持有一个完整归约的块。
All-Gather :继续环形传递,但不再累加,而是直接覆盖。N − 1 N-1 N − 1 轮后,所有 GPU 都持有完整的归约结果。
Ring All-Reduce Algorithm Phase 1: Scatter-Reduce Initial state: each GPU holds 4 chunks of its gradient
Efficiency Total communication: 2(N-1)/N × data size ≈ 2× data size. Bandwidth-optimal with no single-GPU bottleneck.
假设梯度总大小为 S S S ,有 N N N 个 GPU:
Scatter-Reduce 阶段 :每个 GPU 发送 ( N − 1 ) (N-1) ( N − 1 ) 次,每次发送 S N \frac{S}{N} N S ,总通信量 ( N − 1 ) ⋅ S N \frac{(N-1) \cdot S}{N}
当 N N N 很大时,通信量约为 2 S 2S 2 S ,与 GPU 数量无关!这就是 Ring All-Reduce 的高效之处。
DDP 的梯度同步通信量是 2 ∣ grad ∣ 2|\text{grad}| 2∣ grad ∣ ,其中 ∣ grad ∣ |\text{grad}| ∣ grad ∣ 是梯度的总字节数。对于 7B 模型(fp16 梯度),∣ grad ∣ = 2 × 7 × 10 9 = 14 GB |\text{grad}| = 2 \times 7 \times 10^9 = 14\text{GB} ∣ grad ∣ = 2 × 7 ,通信量为 。
在 DDP 中,每个 GPU 独立完成前向和反向传播,但需要在参数更新前同步所有 GPU 的梯度。如果等到所有梯度计算完成后再统一通信,会浪费大量时间。为了提高效率,DDP 通过计算-通信重叠 来实现梯度同步:当某层的梯度计算完成时,立即发起该层的通信,同时继续计算前面层的梯度。
"DDP registers an autograd hook for each parameter in the model. When the backward pass is run, this hook fires and triggers gradient synchronization across all processes."
DDP 为每个参数注册 autograd hook
反向传播时,当某个参数的梯度计算完成,hook 被触发
Hook 发起该参数(或 bucket)的 AllReduce 操作
AllReduce 使用独立的 CUDA 流,与后续计算并行执行
Bucket 机制 :为了减少通信次数,DDP 将多个小参数的梯度打包成一个 bucket(默认 25MB),然后一次性做 AllReduce。这样可以显著提高通信效率。
梯度累积与 no_sync :在梯度累积场景中,只有最后一个 micro-batch 需要同步梯度。DDP 提供了 no_sync() 上下文管理器来跳过中间步骤的通信:
accumulation_steps = 4
for step in range ( 100 ):
DDP 解决了"多卡一起训练"的问题,但没有解决内存问题。每个 GPU 仍然需要存储:
完整的参数 :2 Φ 2\Phi 2Φ bytes
完整的梯度 :2 Φ 2\Phi 2Φ bytes
完整的优化器状态 :12 Φ 12\Phi 12Φ bytes
对于 7B 模型,每个 GPU 仍需约 112 GB。4 张 GPU 总共使用 448 GB,其中 336 GB 是重复存储。
这自然引出一个问题:既然 4 个 GPU 上存的东西一模一样,能不能让每个 GPU 只存 1 / 4 1/4 1/4 ?
四个通信原语 :Broadcast、All-Reduce、Reduce-Scatter、All-Gather 是所有并行策略的构建模块
DataParallel vs DDP :DP 使用 Tree 结构存在瓶颈,DDP 使用 NCCL 集合通信实现对等协作
Ring All-Reduce :无中心节点的环形拓扑,通信量 O ( 1 ) O(1) O ( 1 ) ,实现带宽最优
计算-通信重叠 :DDP 通过 autograd hook 在梯度计算完成时立即发起通信,与后续计算并行执行
DDP 的瓶颈 :内存冗余严重,每个 GPU 重复存储完整的参数、梯度和优化器状态
在下一章,我们将探讨 ZeRO 优化器 ,看它如何通过渐进式分片消除这些冗余。
v v v
dist.ReduceOp.
SUM
)
# 当前 rank 自己的那一块
dist.all_gather(output_list, tensor)
loader.next_batch(device)
_, loss = model(x, y)
loss = loss.mean()
optimizer.zero_grad()
loss.backward()
optimizer.step()
model = GPT(cfg).to(device)
model = DDP(model, device_ids = [rank])
# 训练循环
for step in range ( 50 ):
x, y = loader.next_batch(device)
_, loss = model(x, y)
optimizer.zero_grad()
loss.backward() # DDP 自动 all_reduce 梯度
optimizer.step()
N
( N − 1 ) ⋅ S
All-Gather 阶段 :每个 GPU 发送 ( N − 1 ) (N-1) ( N − 1 ) 次,每次发送 S N \frac{S}{N} N S ,总通信量 ( N − 1 ) ⋅ S N \frac{(N-1) \cdot S}{N} 总通信量 :2 ( N − 1 ) ⋅ S N \frac{2(N-1) \cdot S}{N} N 2 ( N − 1 ) ⋅ S
×
1 0 9 =
14 GB
for micro_step in range (accumulation_steps):
x, y = next_batch()
# 前 3 个 micro-batch 不同步梯度
if micro_step < accumulation_steps - 1 :
with model.no_sync():
_, loss = model(x, y)
loss.backward()
else :
# 最后一个 micro-batch 才同步
_, loss = model(x, y)
loss.backward()
optimizer.step()
optimizer.zero_grad()
N
( N − 1 ) ⋅ S