学习笔记 | PyTorch单机多卡分布式训练

人工智能与算法机器学习视频云

写在前面

大家好,我是刘聪NLP。

今天给大家带来一篇好友小杨的一篇pytorch单机多卡分布式训练学习笔记。

pytorch利用torch.distributed进行分布式训练,distributed会在内部开辟多个进程,进程数与可用的GPU数一致,多个进程分别加载数据集的一部分,在每个GPU上实现加载部分数据集的前向与反向传播,多个GPU上的反向传播得到的梯度会通过gpu间的all_reduce实现平均,再在每个gpu上进行模型的参数更新,这样保证了不同GPU之间的模型参数一致,同时实现了更大batch_size的训练。

这里尝试了利用distributed进行单机多卡的训练,并梳理了整个过程,希望可以帮到需要之人。深入理解建议查看官方文档:


          
文档:https://pytorch.org/tutorials/intermediate/ddp_tutorial.html  

      

一、环境配置


          
import torch.distributed as dist  
from torch.nn.parallel import DistributedDataParallel  
  
def dist\_setup(rank, world\_size):  
    os.environ['MASTER\_ADDR'] = 'localhost'  
    os.environ['MASTER\_PORT'] = '12345'  
    dist.init_process_group(backend='nccl', world_size=world_size, rank=rank)  

      

首先要在环境变量中设置master ip和port,便于进程或多机间的通信,由于本次是单机,故MASTER_ADDR写成localhost即可,如果是多机,则配置成主节点机器的ip。

另外分布式环境需要dist.init_process_group()来启动,介绍一下其中主要的参数:backend表示进程或节点间的通信方式,gpu训练用nccl比较块;world_size表示启用的进程数量,与可用的GPU数量一致,rank表示进程编号。rank这个参数是由进程控制的,不用显性设置,后面可以看到。

二、数据集与加载器构造

数据集datasets按正常数据集构造,如下:


          
class Datasets(Dataset):  
    def \_\_init\_\_(self, data\_list):  
        self.data = data_list  
  
    def \_\_len\_\_(self):  
        return len(self.data)  
  
    def \_\_getitem\_\_(self, index):  
        return self.data[index]  

      

在构造加载器dataloader的时候,需要用到DistributedSampler:


          
sampler = torch.utils.data.DistributedSampler(Datasets, num_replicas=2,  
                                              rank=dist.get_rank(), shuffle=True,  
                                              drop_last=True)  
loader = DataLoader(Datasets, batch_size=8, num_workers=4, pin_memory=True,  
                    sampler=sampler, shuffle=False, collate_fn=None)  

      

在构造sampler时,num_replicas表示数据要分成几个部分,这与world_size的值一致,表示每个进程上分数据集的一部分;rank是进程编号,这里需要让每个进程自己获取该进程的编号,并根据编号来获取该进程需要负责的部分数据;在sampler中设置shuffle为True时,Dataloader中shuffle就应关掉;最后,这里的batch_size是指每个进程的batch大小,即每块GPU的batch大小,所以实际的batch_size=num_gpu * batch_size。

三、模型

模型的写法不用变,原来怎么写现在就怎么写,这里只写了伪代码


          
class Model(nn.Module):  
    def \_\_init\_\_(self):  
        super(Model, self).__init__()  
        self.layer = nn.xxx  
  
    def forward(self, x):  
        return self.layer(x)  

      

四、训练

前面配置好后,到了重点的训练部分,整体上还是原来训练步骤的写法,中间有一些细节地方需要调整。


          
# 首先构建整体的训练逻辑框架  
def main(rank, world\_size, *args):  
    # rank,world\_size必须作为参数传入,其他需要传入的参数可以放后面  
    
 # 启动分布式训练环境  
    dist_setup(rank, world_size)  
  
    # 设置随机种子,非必要  
    # set\_seed(config.seed)  
  
    # 加载数据集  
    datasets = Datasets(data_list)  
    
 # 构造sampler和dataloader  
    num_tasks = dist.get_world_size()  # 获取进程数  
    sampler = torch.utils.data.DistributedSampler(Datasets, num_replicas=num_tasks,   
                                                  rank=dist.get_rank(), shuffle=True,  
                                                  drop_last=True)  
 loader = DataLoader(Datasets, batch_size=8, num_workers=4, pin_memory=True,  
                        sampler=sampler, shuffle=False, collate_fn=None)  
  
    # 构造模型和优化器  
    model = Model()  
    optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-4)  
    
 # 如果继续训练,加载保存的模型参数与优化器参数  
    if init_checkpoint:  
        checkpoint = torch.load(init_checkpoint, map_location='cpu')  
        state_dict = checkpoint['model']  
        model.load_state_dict(state_dict)  
        model = model.to(rank)  # 由于优化器的device和模型的device一致,所以这里需要将模型转到GPU上  
        optimizer.load_state_dict(checkpoint['optimizer'])  
  
    model = model.to(rank)  
 # 需要用DistributedDataParallel将model包装,实现分布式通信,即梯度平均  
 # find\_unused\_parameters最好设成True,避免模型中有些不参与梯度回传的参数影响平均梯度的计算与回传  
    model = DistributedDataParallel(model,device_ids=[rank],find_unused_parameters=True)  
  
    for epoch in range(max_epoch):  
   
        train_one_epoch(rank, model, dataloader, optimizer, epoch)  
      
  # 保存节点,只在进程0上保存节点,所以设置rank==0  
        if rank == 0:  
            save_obj = {  
                    'model': model_without_ddp.state_dict(),  
                    'optimizer': optimizer.state_dict()  
            }  
            torch.save(save_obj, '/your/checkpoint/path')  
      
  # 等待所有进程结束,类似于join  
        dist.barrier()  
  
 # 训练结束后关闭分布式环境  
    dist_cleanup()  

      

这样主体的训练框架已构建完成,下面只剩train_one_epoch,里面也有一些细节需要注意。


          
def train(rank, model, dataloader, optimizer, epoch):  
    model.train()  
    
 # ddp\_loss是为了收集不同进程返回的loss,  
 # 以便我们记录并展示所有进程的平均loss,来看loss的下降趋势  
    ddp_loss = torch.zeros(1).to(rank)  
  
 # 每次epoch前调用sampler.set\_epoch,会产生不同的随机采样  
    dataloader.sampler.set_epoch(epoch)  
  
    for i, batch in enumerate(dataloader):  
          
        optimizer.zero_grad()  
  
        logits = model(batch)  
  # 伪代码  
  loss = loss_func(logits, targets)  
  
        loss.backward()  
        optimizer.step()  
  
        ddp_loss[0] += loss.item()  
  
        if (i+1) % 10 == 0:  
   # 用all\_reduce收集所有进程上的某个参数的值,op表示收集操作,这里使用SUM来求所有loss的和  
            dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)  
            size = dist.get_world_size()  
            batch_loss = ddp_loss[0].item() / (10 * size)  # 求平均  
  
            if rank == 0:  # 只在进程0上打印  
                print(f'*** loss: {batch\_loss} ***')  
            ddp_loss = torch.zeros(1).to(rank)  

      

最后还有一步,启用多进程运行。pytorch distributed提供了两种多进程启用方法。一种是torch.multioricessing.spawn,另一种是torch.distributed.launch。后面一种是通过命令行启动,这里没有深入研究,下面只介绍前一种的方法,前一种仍然是代码的形式,如下:


          
if __name__ == "\_\_main\_\_":  
 # 设置可用GPU数量  
 os.environ['CUDA\_DEVICE\_ORDER'] = 'PCI\_BUS\_ID'  
    os.environ['CUDA\_VISIBLE\_DEVICES'] = '0,1,2,3'  
    world_size = 4  # 进程数,要与cuda\_visible\_devices的数量一致  
    torch.multiprocessing.spawn(main, args=(world_size,), nprocs=world_size, join=True)  

      

总结

想要训练一个大模型,多卡训练是必不可少的一项。该篇笔记记录了pytorch单机多卡训练的初步流程,目前有些地方仍然不清楚具体逻辑,后面遇到问题会继续深入探索。

请多多关注知乎「刘聪NLP」,有问题的朋友也欢迎加我微信「logCong」私聊,交个朋友吧,一起学习,一起进步。我们的口号是“生命不止,学习不停”。

往期推荐:

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
CV 技术在视频创作中的应用
本次演讲将介绍在拍摄、编辑等场景,我们如何利用 AI 技术赋能创作者;以及基于这些场景,字节跳动积累的领先技术能力。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论