数据并行
Premium理解通信原语和 DDP 的梯度同步机制
Companion Code训练大模型的第一步,是让多块 GPU 一起干活。数据并行(Data Parallelism)是最直觉的方式:每块 GPU 持有完整的模型副本,各自处理不同的数据,最后把梯度汇总。本章我们先理解单卡的内存瓶颈,再学习多卡通信的基础,最后深入 DDP 的实现。
单卡训练的内存组成
一个 7B 参数的模型,fp16 权重只需要 14 GB(7B × 2 bytes),但在 80GB 的 A100 上却训练不了。内存都去哪了?
答案是:训练时 GPU 上不只存参数,还要存梯度和优化器状态。
混合精度训练的内存需求
现代训练普遍采用混合精度(Mixed Precision):前向和反向用 fp16 计算(快),但参数更新用 fp32(精确)。以 Adam 优化器为例,假设模型有 个参数:
fp16 部分(前向/反向):
- 参数: bytes
- 梯度: bytes
fp32 部分(优化器):
- 参数副本: bytes(用于精确更新)
- 一阶矩 : bytes(梯度的指数移动平均)
- 二阶矩 : bytes(梯度平方的移动平均)
总计: bytes
| 组件 | 精度 | 内存(bytes) |
|---|---|---|
| 参数 | fp16 | |
| 梯度 | fp16 | |
| 参数副本 | fp32 | |
| 一阶矩 | fp32 | |
| 二阶矩 | fp32 | |
| 总计 |
为什么需要 fp32 参数副本?
训练时前向和反向都用 fp16 计算(速度快、省显存),但 fp16 精度只有约 3 位有效数字。当学习率很小时,参数 += 学习率 × 梯度 这个更新量可能小到被 fp16 直接舍入为零,模型就学不动了。所以 Adam 在内部维护一份 fp32 精度的参数副本,用 fp32 做更新,再把结果转回 fp16 给下一轮前向使用。
为什么需要 和 ?
Adam 不是简单地"沿着梯度走一步"。它需要维护两个跨步骤的历史统计量:(梯度的指数移动平均,相当于动量)和 (梯度平方的移动平均,用来自适应调节每个参数的学习率)。梯度是"这一步往哪走", 和 是"过去所有步的经验"。
7B 模型的实际内存:
- 参数:
14 GB - 梯度:
14 GB - 优化器状态:
84 GB(参数副本 28GB + 28GB + 28GB) - 总计:约 112 GB(还没算激活值)
一张 A100 只有 80 GB 显存,连模型的训练状态都放不下。
Training Memory per GPU
Mixed Precision + Adam通信原语
既然单卡放不下,就需要多卡协作。分布式训练的核心是 GPU 之间的数据交换。PyTorch 通过 torch.distributed(下文简写为 dist)提供了一组集合通信(Collective Communication)操作。我们只需要掌握四个:
Broadcast
一个节点把数据发送给所有节点。
# Rank 0 持有数据 [A, B, C, D],其他 rank 为空
# broadcast 后:所有 rank 都持有 [A, B, C, D]
dist.broadcast(tensor, src=0)用途:训练开始前,确保所有 GPU 的模型参数完全一致。
All-Reduce
所有节点上的数据先归约(如求和),然后每个节点都拿到完整的归约结果。
# Rank 0: [1, 2] Rank 1: [3, 4] Rank 2: [5, 6] Rank 3: [7, 8]
# all_reduce (SUM) 后:
# 所有 rank: [16, 20]
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)用途:反向传播后,把所有 GPU 上的梯度求平均,让每个 GPU 都拿到相同的结果。本章会详细介绍。
API vs 算法:dist.all_reduce() 是 PyTorch 提供的接口,定义了"做什么"(归约并分发)。具体"怎么做"(用什么算法)由底层通信库(如 NCCL)根据数据大小、GPU 数量、网络拓扑等自动选择。在大规模训练中,NCCL 通常会选择 Ring All-Reduce 算法,因为它带宽最优。后文会详细介绍 Ring All-Reduce 的工作原理。
Reduce-Scatter
先归约,再按块分发。每个节点只拿到结果的一个切片。
# Rank 0: [1, 2] Rank 1: [3, 4]
# reduce_scatter (SUM) 后:
# Rank 0: [4] Rank 1: [6]
# (1+3=4 给 Rank 0, 2+4=6 给 Rank 1)
output = torch.empty(1) # 每个 rank 只接收一个切片
input_list = [rank_0_tensor, rank_1_tensor] # 所有 rank 的完整数据
dist.reduce_scatter(output, input_list, op=dist.ReduceOp.SUM)用途:当每个 GPU 只需要结果的一部分时,归约后直接按块分发,省去存完整副本的内存。后续章节会用到。
All-Gather
每个节点贡献自己的一块数据,收集拼合后所有节点都拿到完整数据。
# Rank 0: [A] Rank 1: [B] Rank 2: [C] Rank 3: [D]
# all_gather 后:
# 所有 rank: [A, B, C, D]
output_list = [torch.empty(1) for _ in range(4)] # 预分配,用于接收所有切片
tensor = local_shard # 当前 rank 自己的那一块
dist.all_gather(output_list, tensor)用途:每个 GPU 只存一部分参数,需要完整参数时再拼回来。后续章节会用到。
Collective Operations
记住这个关系:All-Reduce = Reduce-Scatter + All-Gather。理解了这一点,后面 ZeRO 和 FSDP 的通信模式就清晰了。
DataParallel:最朴素的多卡方案
PyTorch 提供了一个开箱即用的方案:nn.DataParallel,一行代码就能用多张 GPU 训练:
# 模型放到 GPU 0
model = GPT(cfg).to(device)
# 一行搞定多卡:
model = nn.DataParallel(model)
# 训练循环和单卡完全一样
for step in range(50):
x, y = loader.next_batch(device)
_, loss = model(x, y)
loss = loss.mean()
optimizer.zero_grad()
loss.backward()
optimizer.step()为什么需要 loss.mean()?因为 DataParallel 会自动 gather 所有 GPU 的输出到 GPU 0。模型在每张卡上各算出一个 loss 标量,gather 后拼成一个向量(比如 2 张卡就是 shape [2]),需要手动取平均得到最终 loss。
它的工作方式很直觉:每个前向传播时,GPU 0 把模型复制到其他 GPU,把输入 batch 沿 batch 维度均分发过去,各自算完后把输出收回 GPU 0。然后 loss.backward() 和 optimizer.step() 都只在 GPU 0 上执行 — 其他 GPU 的参数不会更新,反正下次 forward 会重新从 GPU 0 复制。
注意数据切分的方式:DataParallel 是"给我全部数据,我来分" — 你喂一个完整 batch 进去,它内部自动沿 batch 维度 split。后面的 DDP 正好相反,是"各自取各自的" — 每个进程独立运行,需要 DistributedSampler 在 DataLoader 层面让各进程拿到不重叠的数据子集。
但这种简单带来了两个问题:
- GPU 0 瓶颈:所有的模型复制、数据切分、结果汇总都经过 GPU 0。其他 GPU 在等 GPU 0 干完活才能开始算,GPU 0 的显存占用也比其他卡高得多。这种 master-worker 的通信模式本质上是 Tree-based Reduce(所有梯度汇聚到 GPU 0,再广播回去),存在带宽瓶颈
- GIL 锁:
DataParallel是单进程多线程实现。Python 的全局解释器锁(GIL)让多线程无法真正并行,性能打了折扣
通信机制对比:DataParallel 使用 CUDA 的点对点通信(不涉及 NCCL),实现方式就是 Tree 结构。而 DDP 使用 NCCL 集合通信库,NCCL 会根据情况选择高效算法(通常是 Ring All-Reduce),实现带宽最优。
简单说,DataParallel 适合快速实验,但不适合正式训练。我们需要一个真正并行的方案。
nn.DataParallel Workflow
Single ProcessDDP 的工作原理
DistributedDataParallel(DDP)用多进程替代了多线程:每个 GPU 运行一个独立的进程,彻底绕开 GIL。GPU 之间不再有"主从"关系,而是通过集合通信对等协作。
数据并行的核心思想很简单:
- 复制模型:每个 GPU 持有完整的模型副本
- 切分数据:把一个 mini-batch 均匀分给 个 GPU
- 独立前向:各 GPU 用自己的数据子集计算前向传播
- 同步梯度:反向传播后,通过
all_reduce将所有 GPU 的梯度求平均 - 独立更新:每个 GPU 用相同的平均梯度更新参数(保证模型一致)
关键在第 4 步。DDP 在 loss.backward() 时自动插入梯度同步:
# 初始化分布式环境
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
device = torch.device(f"cuda:{rank}")
# 每个进程创建自己的模型副本
model = GPT(cfg).to(device)
model = DDP(model, device_ids=[rank])
# 训练循环
for step in range(50):
x, y = loader.next_batch(device)
_, loss = model(x, y)
optimizer.zero_grad()
loss.backward() # DDP 自动 all_reduce 梯度
optimizer.step()就这一行 loss.backward(),DDP 会在反向传播时自动对所有参数的梯度做 all_reduce,把所有 GPU 上的梯度求和再除以 得到平均值。
注意 DDP 需要 DistributedSampler:因为每个进程独立运行,如果直接用普通 DataLoader,所有进程会读到相同的数据。DistributedSampler 确保各进程拿到不重叠的数据子集。
DistributedDataParallel Workflow
Multi-Process, Peer-to-PeerRing All-Reduce:高效的梯度同步
DDP 的梯度同步使用 All-Reduce 通信原语:每个 GPU 持有一份梯度,归约后每个 GPU 都得到所有梯度的总和。
实现 All-Reduce 有多种算法。当使用 NCCL 作为通信后端时,NCCL 会根据数据大小、GPU 数量等因素自动选择最优算法,在大规模训练中通常采用 Ring All-Reduce。
为什么需要 Ring All-Reduce?
朴素实现(Tree-based All-Reduce)的问题:
- 所有 GPU 的数据都发送到主节点 → 主节点成为带宽瓶颈
- 通信复杂度 ,随 GPU 数量线性增长
Ring All-Reduce 的特点:
- 无中心节点,GPU 组成环状拓扑,每个 GPU 只与左右邻居通信
- 充分利用所有 GPU 的带宽,实现带宽最优
- 通信复杂度 ,与 GPU 数量无关
算法流程
Ring All-Reduce 分为两个阶段,每个阶段需要 轮通信( 为 GPU 数量):
-
Scatter-Reduce:每个 GPU 的数据切分成 块,通过环形传递逐步归约。每轮每个 GPU 发送一块给右邻居,从左邻居接收一块并累加。 轮后,每个 GPU 持有一个完整归约的块。
-
All-Gather:继续环形传递,但不再累加,而是直接覆盖。 轮后,所有 GPU 都持有完整的归约结果。
Ring All-Reduce Algorithm
Phase 1: Scatter-Reduce2(N-1)/N × data size ≈ 2× data size. Bandwidth-optimal with no single-GPU bottleneck.通信量分析
假设梯度总大小为 ,有 个 GPU:
- Scatter-Reduce 阶段:每个 GPU 发送 次,每次发送 ,总通信量
- All-Gather 阶段:每个 GPU 发送 次,每次发送 ,总通信量
- 总通信量:
当 很大时,通信量约为 ,与 GPU 数量无关!这就是 Ring All-Reduce 的高效之处。
DDP 的梯度同步通信量是 ,其中 是梯度的总字节数。对于 7B 模型(fp16 梯度),,通信量为 。
梯度同步机制
在 DDP 中,每个 GPU 独立完成前向和反向传播,但需要在参数更新前同步所有 GPU 的梯度。如果等到所有梯度计算完成后再统一通信,会浪费大量时间。为了提高效率,DDP 通过计算-通信重叠来实现梯度同步:当某层的梯度计算完成时,立即发起该层的通信,同时继续计算前面层的梯度。
DDP 使用 autograd hook 系统来实现梯度同步。根据 PyTorch 官方文档:
"DDP registers an autograd hook for each parameter in the model. When the backward pass is run, this hook fires and triggers gradient synchronization across all processes."
工作流程:
- DDP 为每个参数注册 autograd hook
- 反向传播时,当某个参数的梯度计算完成,hook 被触发
- Hook 发起该参数(或 bucket)的 AllReduce 操作
- AllReduce 使用独立的 CUDA 流,与后续计算并行执行
Bucket 机制:为了减少通信次数,DDP 将多个小参数的梯度打包成一个 bucket(默认 25MB),然后一次性做 AllReduce。这样可以显著提高通信效率。
梯度累积与 no_sync:在梯度累积场景中,只有最后一个 micro-batch 需要同步梯度。DDP 提供了 no_sync() 上下文管理器来跳过中间步骤的通信:
accumulation_steps = 4
for step in range(100):
for micro_step in range(accumulation_steps):
x, y = next_batch()
# 前 3 个 micro-batch 不同步梯度
if micro_step < accumulation_steps - 1:
with model.no_sync():
_, loss = model(x, y)
loss.backward()
else:
# 最后一个 micro-batch 才同步
_, loss = model(x, y)
loss.backward()
optimizer.step()
optimizer.zero_grad()DDP 的局限
DDP 解决了"多卡一起训练"的问题,但没有解决内存问题。每个 GPU 仍然需要存储:
- 完整的参数: bytes
- 完整的梯度: bytes
- 完整的优化器状态: bytes
对于 7B 模型,每个 GPU 仍需约 112 GB。4 张 GPU 总共使用 448 GB,其中 336 GB 是重复存储。
这自然引出一个问题:既然 4 个 GPU 上存的东西一模一样,能不能让每个 GPU 只存 ?
这就是下一章 ZeRO 优化器要解决的问题。
总结
本章我们建立了分布式训练的基础:
- 四个通信原语:Broadcast、All-Reduce、Reduce-Scatter、All-Gather 是所有并行策略的构建模块
- DataParallel vs DDP:DP 使用 Tree 结构存在瓶颈,DDP 使用 NCCL 集合通信实现对等协作
- Ring All-Reduce:无中心节点的环形拓扑,通信量 ,实现带宽最优
- 计算-通信重叠:DDP 通过 autograd hook 在梯度计算完成时立即发起通信,与后续计算并行执行
- DDP 的瓶颈:内存冗余严重,每个 GPU 重复存储完整的参数、梯度和优化器状态
在下一章,我们将探讨 ZeRO 优化器,看它如何通过渐进式分片消除这些冗余。
Log in to continue reading
This is premium content. Please log in to access the full article.
CookLLM Docs