PyTorch实战:分布式训练

当前深度学习模型训练正在朝着大模型的趋势发展,这里包括海量的训练数据(预训练)以及亿级别的模型参数,而我们单张显卡的计算与存储资源是相对有限的,因此需要针对大规模数据进行分布式训练,甚至还要对模型进行并行化处理(有可能模型的参数多到 batch_size=1 的数据都无法喂入)。尽管分布式训练听起来如此高大上,但请记住一点,分布式会产生通信开销,所以深度学习框架与底层硬件的分布式技术都是在 GPU 的计算资源、显存资源与带宽资源上做文章。

集合通信

在展开分布各种分布式训练技术细节之前,我们先统一一些基础的概念,也就是分布式技术中涉及到的集合通信原语:

  • Broadcast(广播):当一台服务器计算完成了自己部分的参数数据,在分布式训练中想要把自己这部分数据同时发送给其他所有服务器,那么这种操作方式就叫做广播
  • Scatter(散射):当一台服务器计算完成自己部分的参数数据,但是因为有时候服务器上全部的参数数据过大,于是我们想要把这台服务器上的数据切分成几个同等大小的数据块(buffer),再按照序列(rank index)向其他服务器发送其中的一个数据块,这就叫散射
  • Gather(聚集):当服务器都做了散射之后,每个服务器获得了其他服务器的一个数据块,我们将一台服务器获得的数据块拼接在一起的操作就叫做聚集
  • AllGather(全聚集):所有的服务器都将自己收到的数据块拼接在一起(都做聚集的操作),那么就是全聚集
  • Reduce(规约):当所有服务器都做广播或散射的时候,我们作为接收方的服务器收到各服务器发来的数据,我们将这些收到的数据进行某种规约的操作(常见如求和,求最大值)后再存入自己服务器内存中,那么这就叫规约
  • AllReduce(全规约):对所有服务器上的数据做一次规约操作,那么就是全规约

在深度学习框架下,一张 GPU 就是上述的一台服务器,而 GPU 的底层计算架构(如CUDA)必须实现一定量的上述集合通信原语才能够支撑分布式的训练。

数据并行

当将神经网络的训练并行化到多个 GPU 上时,我们关注一种称为数据并行随机梯度下降(SGD)的技术。在数据并行训练中,每个 GPU 都有整个神经网络模型的完整副本;对于每次迭代,每个 GPU 在其数据上运行网络的前向传播,随后进行误差反向传播,以计算损耗相对于网络参数的梯度;最后,GPU 相互通信以平均由不同 GPU 计算的梯度,将平均梯度应用于参数的更新。其效果与在单个 GPU 上执行 SGD 是一致的,但是我们通过在多个 GPU 之间分发数据与并行执行计算实现了训练的加速。

在上述的基本原理中,汇总各 GPU 上的梯度求平均的过程就需要 AllReduce 集合通信原语来支撑,当模型有数亿个参数时,相应的梯度也需要几亿字节的空间,如果此时正在协调几十个 GPU 进行训练,实现 AllReduce 的通信机制就变得至关重要了。

DP 模式

Data Parallel 模式是最简单的一种单机多卡实践模式,其底层的分布式通信采用了参数服务器(Parameter Server)框架,以一台单机四卡的 GPU 服务器为例,假设 GPU-0(后面我们称其为主卡)被用作参数服务器,它负责将输入的 batch 向其他卡进行切分,并将网络结构向每张卡进行复制,同时存储着网络模型的参数。训练过程中,每张卡独立的进行前向传播,计算 loss,主卡会收集每张卡上的 loss 并以一个向量的形式返回;反向传播时,每张卡上的梯度都会与主卡进行通信与传输,主卡收集梯度进行归约(reduce),再进行参数更新,并将更新后的参数发送给每张卡。这种分布式通信机制又称为 Tree Allreduce(拓扑结构像一棵树)。

这种模式在实现时非常简单,仅仅需要两行代码就可以搞定,需要注意的一点是执行 loss 的反向传播前,要对 loss 求均值:

1
2
3
model = torch.nn.DataParallel(model, device_ids[0,1,2,3])
...
loss.mean().backward()

另外,在推理时加载 DP 模式的模型需要加入 .module 方可取到模型,因为此时的模型是 DP 模型(也可以在保存时只保存 .module 部分):
1
model = torch.load(model_path).module

可以看出,DP 模式存在负载均衡与通信方式的问题,主卡要负责对所有梯度的归约以及参数分发,这就造成了主卡的资源占用率过高,且参数更新依赖于主卡的通信传输,这在模型较大的情况下会严重降低速度。

DDP 模式

Distributed Data Parallel 模式不仅可以实现单机多卡的并行化,还可以实现多机多卡的并行化。该模式采用的分布式通信机制称为 Ring Allreduce(拓扑结构是无向环),其特点是网络单次通信量是一个恒定值,不会随着 GPU 数量的增加而增加。Ring Allreduce 机制将 GPU 集群组织成一个逻辑环,每个 GPU 只从其左邻居接收数据并发送给其右邻居,即每一次同步,每个 GPU 只获得最终结果的一个块(部分梯度更新),待走完一个完整的环,每个 GPU 就都获得了最终结果(完整的参数)。关于 Ring Allreduce 更详细的介绍可以参考简书上的这篇博文

我们还需要指出的一点是,在 DP 模式中,其参数服务器的设计理念加之受 Python GIL 的限制,使得只有一个进程来控制多个 GPU,这也是性能瓶颈的一部分。而 DDP 模式中已经没有了所谓主卡的概念,会启动多个进程,一个进程控制一个 GPU,它们都执行相同的任务,从一定程度上缓解了 GIL 带来的性能开销。

下面,我们需要澄清一些 DDP 中的常用术语的概念,这里假设我们有 2 个计算节点,每个节点有 8 个 GPU,即 2 机 8 卡,并行数为 16:

  • group:进程组,默认情况下只有一个组
  • world_size:全局的并行数,本例中该值为 16
    1
    torch.distributed.get_world_size() # 不同进程里都是一样的,其值为 16
  • rank:当前进程的序号,用于进程间的通信,本例中取值为 0,1,2,…,15。注意:rank=0 的进程可以看做是“master进程”
    1
    torch.distributed.get_rank() # 每个进程都有自己的序号,各不相同
  • local_rank:每个计算节点上的进程序号,本例中取值为 0,1,2,…,7
    1
    torch.distributed.local_rank() # 一般情况下,需要用来手动设置当前模型是跑在当前机器的哪块GPU上面

DDP 模式的代码相对 DP 模式多一些,首先要进行一番环境准备工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import torch,argparse
# 新增1:依赖
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# 新增2:从外面得到local_rank参数,在调用DDP的时候,其会自动给出这个参数
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1, type=int)
FLAGS = parser.parse_args()
local_rank = FLAGS.local_rank

# 新增3:DDP backend初始化
# a.根据local_rank来设定当前使用哪块GPU
torch.cuda.set_device(local_rank)
# b.初始化DDP,使用默认backend(nccl)就行
dist.init_process_group(backend='nccl')

# 新增4:定义并把模型放置到单独的GPU上
device = torch.device("cuda", local_rank)
model = nn.Linear(10, 10).to(device)

# 新增5:之后才是初始化DDP模型
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

环境准备的关键是 init_process_group 这一步,各个进程会在这一步与 “master进程” 进行握手,建立连接,如果连接上的进程数量不足约定的 word_size,进程会一直等待。在初始化组的过程中,需要指定 dist_backend 参数设置后端的通信模式,一般 NV 的显卡默认是 “nccl”;另外,如果是多机多卡还需要传入 init_method 参数用以设置通信主机的地址与端口(如使用 TCP 通信方式来分享信息,则设置:tcp://192.168.10.103:23456);local_rank 指定每个节点内启动的进程编号,以及 global_rank 指定显卡的全局编号。

接下来最重要的是数据的并行化,DDP 模式不同于 DP 模式,它同时启动了多个进程,但是这些进程使用的都是同一份训练数据,那么就会有数据上的冗余性,我们需要对 DataLoader 做一些改造,这里用到了一个特殊的 sampler,来使得各个进程上的数据各不相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 新增1:使用 DistributedSampler
train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
# 需要注意的是,这里的 batch_size 指的是每个进程下的 batch_size
trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size, sampler=train_sampler)

for epoch in range(num_epochs):
# 新增2:设置 sampler 的 epoch,DistributedSampler 需要这个来维持各个进程之间的相同随机数种子
trainloader.sampler.set_epoch(epoch)
# 后面这部分,则与单机单卡的写法一模一样
for data, label in trainloader:
prediction = model(data)
loss = loss_fn(prediction, label)
loss.backward()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.step()

模型的保存也需要进行改造,一方面保存的是 model.module,因为 model 已经变成了 DDP model 了,另一方面,只需要在某一个进程中保存就行了:

1
2
3
# 只需要在进程 0 上保存一次就行了,避免多次保存重复的东西
if dist.get_rank() == 0:
torch.save(model.module, "saved_model.ckpt")

最后,我们需要用 torch.distributed.launch 来启动训练,而且需要传入一些重要的参数:

  • —nnodes:有多少台机器
  • —node_rank:当前是哪台机器
  • —nproc_per_node:每台机器有多少个进程
  • —master_addr:当进行多机多卡训练时,需要指明主节点的地址
  • —master_port:对应主节点的端口
  • —backend:通信后端,可选的包括:nccl(NVIDIA推出)、gloo(Facebook推出)、mpi(OpenMPI)

单机多卡模式,假设我们只在一台机器上运行,可用卡数是 8,则执行命令:

1
2
3
4
python -m torch.distributed.launch --nproc_per_node 8 main.py

# 假设我们只用4,5,6,7号卡
CUDA_VISIBLE_DEVICES="4,5,6,7" python -m torch.distributed.launch --nproc_per_node 4 main.py

多机多卡模式,假设我们有 2 个计算节点,每个节点上有 8 张卡,则执行命令:

1
2
3
4
5
6
# 机器1:
python -m torch.distributed.launch --nnodes=2 --node_rank=0 --nproc_per_node 8 \
--master_adderss $my_address --master_port $my_port main.py
# 机器2:
python -m torch.distributed.launch --nnodes=2 --node_rank=1 --nproc_per_node 8 \
--master_adderss $my_address --master_port $my_port main.py

很显然上述执行 DDP 模式训练的命令行与单卡训练时有很大的区别(DP 模式则不存在此问题),所以 PyTorch 引入了 torch.multiprocessing.spawn,可以使得单卡、DDP 下的外部调用一致:

1
2
3
4
5
6
7
8
9
10
def main(world_size, ngpus_per_node):
mp.spawn(main_worker,
args=(world_size,),
nprocs=ngpus_per_node,
join=True)

def main_worker(rank, world_size):
dist.init_process_group("nccl", rank=rank, world_size=world_size)
# training process ...
...

该代码结构可以抽象为 主进程main函数+子进程main_worker函数,在主进程中用 spawn 函数启动每个子节点的子进程,而在子进程中需要对全部的进程分入一个 group 并初始化。

上述的 DDP 实践过程需要我们可以实际操作物理主机,然而我们现在的很多深度学习训练平台并不向用户开放操作物理主机的权限,它们往往以 docker 镜像的方式将训练环境分发到需要的计算节点上,训练结束后在讲模型从 docker 中拷贝出来,这里我们给出一种基于此类环境的开发范式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 主进程
def main():
parser = argparse.ArgumentParser()
parser.add_argument(...) # 增加需要解析的参数
args = parser.parse_args()

ngpus_per_node = torch.cuda.device_count() # 通过代码获取每个节点的GPU个数
args.world_size = ngpus_per_node * args.word_size # 计算进程总数(此例中从平台传入的word_size是节点数),计算法方法因平台而异

# 在主进程中通过torch多进程spawn启动多个子进程
# nprocs为每个节点启动的进程数,args为向子进程函数中传入的参数
mp.spawn(train, nprocs=ngpus_per_node, args=(ngpus_per_node, args))

# 仅在0号节点上获取模型
if args.rank == 0:
copy_model(src_path, dest_path)

# 子进程
def train(local_rank, ngpus_per_node, args):
## local rank参数为主进程中mp.spawn自动传入的,需要放在第一个位置接收
global_rank = args.rank * (ngpus_per_node) + local_rank

## 初始化进程组,需要使用DDP的六要素
dist.init_process_group(backend=args.dist_backend, init_method=args.init_method,
world_size=args.world_size, rank=global_rank)

## 计算每个进程的batch_size,这里默认batch_size为所有进程的,比如你想让每个进程的batch_size为1,而你有12个8卡的机器,那么你的初始batch_size需要设为96(因平台而异)
args.train_batch_size = int(config.batch_size / args.world_size)

## 锁定模型随机种子,保证每个进程的模型初始化相同
random.seed(args.seed)
np.random.seed(args.seed)
torch.manual_seed(args.seed)
torch.cuda.manual_seed_all(args.seed)

# 设置device
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)

# 对数据进行distributed sampler,保证每个进程采出的数据不一样
train_dataset = train_dataset.to(device)
train_sampler = DistributedSampler(train_dataset)
train_dataloader = DataLoader(train_dataset, sampler=train_sampler, batch_size=args.train_batch_size)

# 初始化DDP模型,模型分布在不同GPU上
model = MyModel().to(device)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], find_unused_parameters=True)

# 迭代训练
for e in range(int(args.num_train_epochs)):
train_sampler.set_epoch(e) # 保证每个epoch启动相同的random
for step, batch in enumerate(train_dataloader):
...
...
# 在global_rank = 0处保存模型即可(可以所有epoch迭代完再保存)
if global_rank == 0:
save(model, args.output_dir, str(e + 1))

可以看到, PyTorch 提供了数据并行的原生方法,所以其实践应该说是比较简单的,当我们面临不是很大的模型(即单个 GPU 能够存储完整的模型),且训练数据是海量的,在这样的场景下数据并行的训练不失为一个最佳实践方案。

但是我们也看到,近些年来超级大模型层出不穷,比如 GPT-3 模型的参数量达到 1750 亿,像这样的大模型根本无法在一张显卡上完整地存储,那么我们就要考虑对模型也进行并行化处理。

模型并行

当一个神经网络模型的参数多到无法在单一 GPU 上保存和计算的时候,我们关注一种将模型进行切分的训练模式,即模型在 worker 之间进行划分,以便每个 worker 仅对模型参数的一个子集进行评估和更新,而每部分模型的激活和梯度是需要跨机器通信的:

  • 层间模型并行:会在多个 worker 之间划分模型的各个层
  • 层内模型并行:把每层的模型参数切分到多个设备。层内模型并行在有的论文里被叫做 “Tensor 级别的模型并行” ,是对某一层的参数(矩阵) Tensor 切分(基于矩阵乘法原理按行/列拆分),从而将大的模型 Tensor 分成多个相对较小的 Tensor 进行并行计算

比较遗憾的是,目前的深度学习框架并没有提供对模型并行的原生支持,对模型的拆分需要我们手动构建传输过程,相当于直接对物理编程,所以对分布式使用的门槛更高。

我们从包含两个线性层的玩具模型(toy model)开始。要在两个 GPU 上运行此模型,只需将每个线性层放在不同的 GPU 上,然后移动输入(input)和中间输出(intermediate outputs)以匹配层设备(layer devices)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import torch
import torch.nn as nn
import torch.optim as optim

class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = torch.nn.Linear(10, 10).to('cuda:0') # 将net1放置在第1个GPU上
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to('cuda:1') # 将net2放置在第2个GPU上

def forward(self, x):
x = self.relu(self.net1(x.to('cuda:0')))
return self.net2(x.to('cuda:1'))

请注意对于 ToyModel ,除了四个用于将线性层和张量放置在适当的设备上的 to(device) 调用之外,以上内容与在单个 GPU 上实现该功能非常相似。backward() 和 torch.optim 会自动关注梯度,就好像模型是一个 GPU 一样。调用损失函数时,只需确保标签与输出在同一设备上。

1
2
3
4
5
6
7
8
9
model = ToyModel()
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.paraeters(), lr=0.001)

optimizer.zero_grad()
outputs = model(torch.randn(20, 10))
labels = torch.randn(20, 5).to('cuda:1') # ToyMode 的 output 是在 'cuda:1' 上,此处的 label 也应该置于 'cuda:1' 上
loss_fn(outputs,labels).backward()
optimizer.step()

对于模型太大而无法放入单个 GPU 的情况,上述实现解决了该问题。但是,如果模型合适,这种模型并行的解决方案将比在单个 GPU 上运行要慢。这是因为在任何时间点,两个 GPU 中只有一个在工作,而另一个在那儿什么也没做;中间结果还要再 GPU 间复制,这使得性能进一步恶化。

为了解决 GPU 闲置的问题,有一种选择是将每个批次进一步划分为拆分流水线,以便当一个拆分到达第二子网时,可以将下一个拆分馈入第一子网。这样,两个连续的拆分可以在两个GPU上同时运行

流水并行

Pipeline 模型并行将模型的按层分成多个 stage,再把各个 sage 映射到多台设备上。为了提高设备资源的利用率,又将 mini-batch 划分成多个 micro-batch, 这样就能够使得不同设备在同一时刻处理不同 micro-batch 的数据。

一种 Pipeline 并行方式(Gpipe) 要求反向计算要等所有设备的正向计算完成后才开始,而反向计算可能依赖于正向的输出,导致每个卡正向计算过程中累积的 activation 内存与 micro-batch 数量成正比,从而限制了 micro-batch 的数量。MindSpore 的 Pipeline 并行中,将反向提前,每个 micro-batch 计算完成后,就开始计算反向,有效降低 activation 存储时间,从而提升整体并行效率。

为了训练如此大的模型,GPipe把一个多层网络分割成若干个复合层,然后每个复合层被部署到GPU/TPU之上。但是这若干个复合层只能顺序并行,这就严重影响了训练速度。所以GPipe引入了流水线并行机制(pipeline parallelism),在不同的GPU设备之间对层进行流水线处理。另外,GPipe 也使用了重新计算这个技巧来降低内存,这样可以训练更大的模型。

微批量大小的选择会影响GPU的利用率。较小的微批量可以减少等待先前微批次输出的延迟,但较大的微批量可以更好地利用GPU。因此,关于微批次数量,存在了一个权衡,即每个微批次的GPU利用率和bubble总面积之间的权衡,用户需要为模型找到最佳的微批次数量。

要使用GPipe训练模块,只需将其用 torchgpipe.GPipe 来包装即可,但是用户的模块必须是 的实例。

GPipe 会将自动将模块分割为多个分区,分区是在单个设备上一起运行的一组连续层,其中:

balance参数确定每个分区中的层数。

chunks参数指定微批处理的数量。

下面的示例代码显示了如何将具有四层的模块拆分为两个分区,每个分区有两层。此代码还将一个小批次 mini-batch 拆分为8个微批次(micro-batches)

1
2
3
4
5
6
7
8
9
10
from torchgpipe import GPipe

model = nn.Sequential(a, b, c, d)
model = GPipe(model, balance=[2, 2], chunks=8)

# 1st partition: nn.Sequential(a, b) on cuda:0
# 2nd partition: nn.Sequential(c, d) on cuda:1

for input in data_loader:
output = model(input)

~torchgpipe.GPipe使用CUDA进行训练。用户不需要自己将模块移动到GPU,因为~torchgpipe.GPipe自动把每个分区移动到不同的设备上。默认情况下,可用的GPU从cuda:0开始,并且按顺序为每个分区选择可用GPU。用户也可以利用device 参数指定使用的GPU。

1
2
3
4
model = GPipe(model,
balance=[2, 2],
devices=[4, 2], # Specify GPUs.
chunks=8)

与典型module不同,GPipe之中,输入设备与输出设备不同,除非只有一个分区。这是因为第一个分区和最后一个分区被放置在不同的设备上。因此,必须将输入和目标移动到相应的设备。可以通过 torchgpipe.GPipe.devices 的属性来完成,这个属性保存了每个分区的设备列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
in_device = model.devices[0]
out_device = model.devices[-1]

for input, target in data_loader:
# input on in_device
input = input.to(in_device, non_blocking=True)

# target on out_device
target = target.to(out_device, non_blocking=True)

# output on out_device
output = model(input)
loss = F.cross_entropy(output, target)
loss.backward()
...

当~torchgpipe.GPipe拆分一个module时候,它将模块的每个子模块视为单一的、不可分割的层。然而,模型事实上并不一定这样,有些子模块可能是另一个顺序模块,可能需要进一步拆分它们。

GPipe 不会支持这些嵌套的 Sequentials module,所以用户需要把module打平(flatten the module)。还好,这在PyTorch中并不难。以下代码段显示了嵌套顺序模块如何展平:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
_3_layers = nn.Sequential(...)  # len(_3_layers) == 3
_4_layers = nn.Sequential(...) # len(_4_layers) == 4
model = nn.Sequential(_3_layers, _4_layers) # len(model) == 2

def flatten_sequential(module):
def _flatten(module):
for name, child in module.named_children():
if isinstance(child, nn.Sequential):
for sub_name, sub_child in _flatten(child):
yield (f'{name}_{sub_name}', sub_child)
else:
yield (name, child)
return nn.Sequential(OrderedDict(_flatten(module)))

model = flatten_sequential(model) # len(model) == 7
model = GPipe(model, balance=[2, 3, 2], chunks=4)

典型的模型并行(Typical Model Parallelism)是GPipe的一个特例。模型并行性是相当于禁用了微批处理和检查点的GPipe,可以通过chunks=1 和 checkpoint=’never’ 来做到。

1
model = GPipe(model, balance=[2, 2], chunks=1, checkpoint='never')