写在前面
大家好,我是刘聪NLP。
今天给大家带来一篇好友小杨的一篇pytorch单机多卡分布式训练学习笔记。
pytorch利用torch.distributed进行分布式训练,distributed会在内部开辟多个进程,进程数与可用的GPU数一致,多个进程分别加载数据集的一部分,在每个GPU上实现加载部分数据集的前向与反向传播,多个GPU上的反向传播得到的梯度会通过gpu间的all_reduce实现平均,再在每个gpu上进行模型的参数更新,这样保证了不同GPU之间的模型参数一致,同时实现了更大batch_size的训练。
这里尝试了利用distributed进行单机多卡的训练,并梳理了整个过程,希望可以帮到需要之人。深入理解建议查看官方文档:
文档:https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
一、环境配置
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
def dist\_setup(rank, world\_size):
os.environ['MASTER\_ADDR'] = 'localhost'
os.environ['MASTER\_PORT'] = '12345'
dist.init_process_group(backend='nccl', world_size=world_size, rank=rank)
首先要在环境变量中设置master ip和port,便于进程或多机间的通信,由于本次是单机,故MASTER_ADDR写成localhost即可,如果是多机,则配置成主节点机器的ip。
另外分布式环境需要dist.init_process_group()来启动,介绍一下其中主要的参数:backend表示进程或节点间的通信方式,gpu训练用nccl比较块;world_size表示启用的进程数量,与可用的GPU数量一致,rank表示进程编号。rank这个参数是由进程控制的,不用显性设置,后面可以看到。
二、数据集与加载器构造
数据集datasets按正常数据集构造,如下:
class Datasets(Dataset):
def \_\_init\_\_(self, data\_list):
self.data = data_list
def \_\_len\_\_(self):
return len(self.data)
def \_\_getitem\_\_(self, index):
return self.data[index]
在构造加载器dataloader的时候,需要用到DistributedSampler:
sampler = torch.utils.data.DistributedSampler(Datasets, num_replicas=2,
rank=dist.get_rank(), shuffle=True,
drop_last=True)
loader = DataLoader(Datasets, batch_size=8, num_workers=4, pin_memory=True,
sampler=sampler, shuffle=False, collate_fn=None)
在构造sampler时,num_replicas表示数据要分成几个部分,这与world_size的值一致,表示每个进程上分数据集的一部分;rank是进程编号,这里需要让每个进程自己获取该进程的编号,并根据编号来获取该进程需要负责的部分数据;在sampler中设置shuffle为True时,Dataloader中shuffle就应关掉;最后,这里的batch_size是指每个进程的batch大小,即每块GPU的batch大小,所以实际的batch_size=num_gpu * batch_size。
三、模型
模型的写法不用变,原来怎么写现在就怎么写,这里只写了伪代码
class Model(nn.Module):
def \_\_init\_\_(self):
super(Model, self).__init__()
self.layer = nn.xxx
def forward(self, x):
return self.layer(x)
四、训练
前面配置好后,到了重点的训练部分,整体上还是原来训练步骤的写法,中间有一些细节地方需要调整。
# 首先构建整体的训练逻辑框架
def main(rank, world\_size, *args):
# rank,world\_size必须作为参数传入,其他需要传入的参数可以放后面
# 启动分布式训练环境
dist_setup(rank, world_size)
# 设置随机种子,非必要
# set\_seed(config.seed)
# 加载数据集
datasets = Datasets(data_list)
# 构造sampler和dataloader
num_tasks = dist.get_world_size() # 获取进程数
sampler = torch.utils.data.DistributedSampler(Datasets, num_replicas=num_tasks,
rank=dist.get_rank(), shuffle=True,
drop_last=True)
loader = DataLoader(Datasets, batch_size=8, num_workers=4, pin_memory=True,
sampler=sampler, shuffle=False, collate_fn=None)
# 构造模型和优化器
model = Model()
optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-4)
# 如果继续训练,加载保存的模型参数与优化器参数
if init_checkpoint:
checkpoint = torch.load(init_checkpoint, map_location='cpu')
state_dict = checkpoint['model']
model.load_state_dict(state_dict)
model = model.to(rank) # 由于优化器的device和模型的device一致,所以这里需要将模型转到GPU上
optimizer.load_state_dict(checkpoint['optimizer'])
model = model.to(rank)
# 需要用DistributedDataParallel将model包装,实现分布式通信,即梯度平均
# find\_unused\_parameters最好设成True,避免模型中有些不参与梯度回传的参数影响平均梯度的计算与回传
model = DistributedDataParallel(model,device_ids=[rank],find_unused_parameters=True)
for epoch in range(max_epoch):
train_one_epoch(rank, model, dataloader, optimizer, epoch)
# 保存节点,只在进程0上保存节点,所以设置rank==0
if rank == 0:
save_obj = {
'model': model_without_ddp.state_dict(),
'optimizer': optimizer.state_dict()
}
torch.save(save_obj, '/your/checkpoint/path')
# 等待所有进程结束,类似于join
dist.barrier()
# 训练结束后关闭分布式环境
dist_cleanup()
这样主体的训练框架已构建完成,下面只剩train_one_epoch,里面也有一些细节需要注意。
def train(rank, model, dataloader, optimizer, epoch):
model.train()
# ddp\_loss是为了收集不同进程返回的loss,
# 以便我们记录并展示所有进程的平均loss,来看loss的下降趋势
ddp_loss = torch.zeros(1).to(rank)
# 每次epoch前调用sampler.set\_epoch,会产生不同的随机采样
dataloader.sampler.set_epoch(epoch)
for i, batch in enumerate(dataloader):
optimizer.zero_grad()
logits = model(batch)
# 伪代码
loss = loss_func(logits, targets)
loss.backward()
optimizer.step()
ddp_loss[0] += loss.item()
if (i+1) % 10 == 0:
# 用all\_reduce收集所有进程上的某个参数的值,op表示收集操作,这里使用SUM来求所有loss的和
dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)
size = dist.get_world_size()
batch_loss = ddp_loss[0].item() / (10 * size) # 求平均
if rank == 0: # 只在进程0上打印
print(f'*** loss: {batch\_loss} ***')
ddp_loss = torch.zeros(1).to(rank)
最后还有一步,启用多进程运行。pytorch distributed提供了两种多进程启用方法。一种是torch.multioricessing.spawn,另一种是torch.distributed.launch。后面一种是通过命令行启动,这里没有深入研究,下面只介绍前一种的方法,前一种仍然是代码的形式,如下:
if __name__ == "\_\_main\_\_":
# 设置可用GPU数量
os.environ['CUDA\_DEVICE\_ORDER'] = 'PCI\_BUS\_ID'
os.environ['CUDA\_VISIBLE\_DEVICES'] = '0,1,2,3'
world_size = 4 # 进程数,要与cuda\_visible\_devices的数量一致
torch.multiprocessing.spawn(main, args=(world_size,), nprocs=world_size, join=True)
总结
想要训练一个大模型,多卡训练是必不可少的一项。该篇笔记记录了pytorch单机多卡训练的初步流程,目前有些地方仍然不清楚具体逻辑,后面遇到问题会继续深入探索。
请多多关注知乎「刘聪NLP」,有问题的朋友也欢迎加我微信「logCong」私聊,交个朋友吧,一起学习,一起进步。我们的口号是“生命不止,学习不停”。
往期推荐:
- 中文NER数据集整理
- ACL2022|NoisyTune:微调前加入少量噪音可能会有意想不到的效果
- ACL2022论文分类汇总-Prompt、句子表征、检索排序&摘要
- 总结|Prompt在NER场景的应用
- NAACL2022-Prompt相关论文&对Prompt的看法
- PolyLoss:一种将分类损失函数加入泰勒展开式的损失函数
- PERT:一种基于乱序语言模型的预训练模型
- DiffCSE:结合句子间差异的无监督句子嵌入对比学习方法
- PairSCL:句子对级别的有监督对比学习方法
- OpenAI:基于对比学习的预训练文本&代码表征技术
- SNCSE:一种基于软负例的无监督句向量对比学习方法
- SimCSE论文精读