• 【pytorch记录】模型的分布式训练DataParallel、DistributedDataParallel


    使用多GPU对神经网络进行训练时,pytorch有相应的api将模型放到多GPU上运行:
    nn.DataParalleltorch.nn.parallel.DistributedDataParallel。后者好处多多,下来开始记录两者区别


    !!!拖延症要好好克服了

    nn.DataParallel


    API说明

    gpus=[0,1]
    torch.nn.DataParallel(model.cuda(), decice_ids=gpus, output_device=gpus[0]) 
    
    • 1
    • 2

    参数【module】定义的模型。【device_ids】训练网络的GPU号。【output_device】输出结果的device,需要完成每个gpu的数据汇总等计算。默认为 gpus[0]


    DataParallel的并行处理机制是

    • [主gpu]将 数据与模型 的散播
      从硬盘中读取数据 到主机的页面锁定内存中,再传输到[主gpu]的显存中,再以batch的形式 分到每个gpu上;将模型加载到 [主gpu] 上,再将模型复制到其他gpu上
    • 每个gpu上的forward
      每个gpu在独立的线程上 对自己的数据独立的进行 forward 计算出output;
    • [主gpu] 收集网络的输出、计算loss、散播 loss
      在主gpu上收集每个gpu的网络的output;并通过将网络输出与批次中每个元素的真实数据标签进行比较来计算损失函数值loss;;然后将loss分散给每个gpu
    • 每个gpu进行backward
      每个gpu上分别进行反向传播,计算梯度
    • [主gpu] 梯度汇总、权重更新、权重同步到其他gpu
      所有梯度汇总到主gpu 进行相加,然后梯度下降 权重更新,然后在将更新好的权重分发到每个gpu上。

    其他说明
    使用单进程控制,将模型和数据加载到多个GPU中。dataparallel可以看做把训练参数从主gpu拷贝到其他的gpu,每个gpu都负责forward和backward(只计算梯度但不更新权重),主gpu还要额外负责:每个gpu的output的拷贝、loss的计算、梯度的汇总、权重的更新、权重复制到每个gpu上、冗余数据副本(数据从硬盘读取到主gpu,然后平分到其他gpu)。
    这样存在的问题:

    • 负载不均衡:GPU0作为master来进行各种数据的管理,它的显存和使用率就会比其它的高。使用 watch -n 1 nvidia-smi 观察gpu 的使用情况,第一块gpu上显存占用严重大于其他gpu。
    • 网络的通信 会称为一个瓶颈,且整体GPU使用率较低

    其他的

    • 不支持同步BN。假设batch_size=8,使用两张显卡训练的效果,要差于使用单卡batch_size=16的效果,毕竟这里bn的统计参数是按照单卡上的batch计算的 (但效果肯定强于单卡batch_size=8)
    • 想要指定显卡进行训练,比如在编号为1、2的显卡训练,需要如下设置
      os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"  # 按照PCI_BUS_ID顺序从0开始排列GPU设备 
      os.environ["CUDA_VISIBLE_DEVICES"] ="1, 2"  
      #设置当前使用的GPU设备为1,2号两个设备,名称依次为'/gpu:0'、'/gpu:1'。#表示优先使用1号设备,然后使用2号设备
      
      ...
      gpus = [0, 1] # 对应的 设备里的显卡编号为 1、2
      net = torch.nn.DataParallel(net.cuda(), device_ids=gpus)  # 其中 output_device默认的为 gpus[0]=0,也就是设备中的显卡1
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      如果使用的如下设置,一部分的操作在 设备中0号显卡上进行,此时如果0号显卡已用满的状态,程序便无法正常运行
      gpus = [1, 2] # 对应的 设备里的显卡编号为 1、2
      net = torch.nn.DataParallel(net.cuda(), device_ids=gpus)  # 其中 output_device默认的为 gpus[0]=1,此时对应的是设备中的显卡0
      
      • 1
      • 2

    代码使用

    import os
    os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID" 
    os.environ["CUDA_VISIBLE_DEVICES"] ="1, 2"   # 放在import torch之前
    import torch
    import torch distributed as dist
    
    gpus = [0, 1]
    torch.cuda.set_device("cuda".format(gpus[0]))
    
    train_dataset = ...
    train_loader = torch.utils.data.DataLoader(trian_dataset, ...)
    model = ...
    model = nn.DataParallel(model.to(device), device_ids=gpus)
    
    optimizer = optim.SGD(model.parameters())  # 注意,网络其他的设置,要在 nn.DataParallel之后
    for epoch in range(1000):
       for batch_idx, (data, target) in enumerate(train_loader):
           images, target = images.cuda(non_blocking=True), target.cuda(non_blocking=True)
           ...
           output = model(image)
           loss = criterion(output, target)
           ...
           optimizer.zero_grad()
           loss.backward()
           optimizer.step()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    torch.nn.parallel.DistributedDataParallel


    API说明

    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank])
    
    • 1

    在1.0之后,官方对分布式的常用方法进行了封装,支持了 all-reduce、broadcast、send、receive等等。通过MPI实现了CPU通信,通过 NCCL 实现了 GPU的通信。DistributedDataParallel 解决了 DataParallel 速度慢,GPU负载不均衡的问题。


    运行机制

    • 独立数据加载:
      每个进程加载自己的数据,从磁盘加载到主机上的页面锁定内存中,使用多个辅助进程来并行数据加载。 同时将小批量数据从页面锁定内存传输到每个GPU。不需要数据广播、也不需要模型广播(每个gpu都有一个相同的模型副本)。
      其中,分布式数据采样器(DistributedSampler)可确保加载的数据在各个进程之间不重叠
    • 每个gpu上的forward、backward
      每个gpu独立进行前向传播,计算网络的输出;
      每个gpu独立计算loss,进行反向计算梯度;
      各进程需要将梯度进行汇总平均,然后由 rank=0 的进程,将其 broadcast 到所有进程
    • 更新模型参数
      每个gpu用相同的梯度 独立更新参数。因为每个gpu都是从一个相同的模型副本开始的,初始参数一致,并且下降的梯度相同,所以所有gpu上的权重更新都是相同的。因此不需要模型同步

    其他说明
    torch.distributed使用的是多进程控制。在编写相应的代码后,torch就会自动将其分配给n个进行,分别在那个GPU上运行。不再有主GPU,每个GPU执行相同的任务。

    和单进程训练不同,多进程训练需要注意以下事项:

    • 要告诉每个进程自己使用了哪块GPU(args.local_rank
    • 初始化分布式的 dist
    • 数据分布式传送
      一个完整的batch被分到了多个进程,要确保加载的数据在各个进程之间不重叠(DistributedSamplerDistributedSampler),训练时在每个epoch后进行数据打乱 train_sampler.set_epoch(epoch)
    • 搭建模型 设计loss,然后网络进行SyncBN、构建 DDP 模型
      • 使用BN的好处是:训练时在网络内部进行了归一化,为训练过程提供了正则化,防止了中间层feature map的协方差偏移,有助于抑制过拟合。使用BN,不需要特别依赖于初始化参数,可以使用较大的学习率,因此可以加速模型的训练过程。
      • 现有api中的 Batch Normalization 实现的是单卡模式,是对单个卡上的样本进行归一化。当我们使用多卡训练时,4张卡总共batch_size为32,但实际的bn中参数仍是对8个样本完成的。如果单卡上本身批量就小,就会影响模型的收敛效果。
      • 跨卡同步 Batch Normalization 可以使用全局的样本进行归一化,这样是真正想要的多卡上增大了batch_size的训练方式。使用跨卡BN会显著的提高实验效果。
    • 优化器的设置
      每次迭代中,每个进程具有自己的 optimizer,并独立完成所有的优化步骤,进程内与一般的训练相一致。每个进程对应一个独立的训练过程,只有对梯度等少量数据进行信息交换。
      nn.DataParallel 中, 全程维护一个 optimizer,然后梯度求和,然后在主gpu上进行参数更新,再将更新后的参数广播到其他gpu上。比较而言,前者传输数据量更少,因此速度更快,效率更高
    • 训练
    • 记录loss
      在使用多进程时,每个进程有自己计算得到的loss,记录数据时 希望对不同进程上的loss取平均,其它数据也是想要平均。这时需要用到api如下,详细的参见源码
      def all_reduce(tensor, op=ReduceOp.SUM, group=group.WORLD, async_op=False):
          """
          Reduces the tensor data across all machines in such a way that all get
          the final result.
          """
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 模型保存
      由于使用 DDP 后,模型在每个GPU上都复制了一份,同时被封装了一层。所以保存模型时只需要保存 master 节点的模型,并将平时的 model 变成 model.module,具体如下:
      if dist.get_rank()==0:
          torch.save(model.module.state_dict(), "{}.ckpt".format(str(epoch)))
      
      • 1
      • 2
      在加载模型时,只需要在构造 DDP 模型之前,在master节点上加载:
      if dist.get_rank() == 0 and ckpt_path is not None:
          model.load_state_dict(torch.load(ckpt_path))
      
      • 1
      • 2

    补充

    • 每个进程包含独立的解释器和GIL
      一般使用的Python解释器 CPython:是用C语言实现Python,是目前应用最广泛的解释器。全局锁使Python在多线程效果升表现不佳。全局解释器锁(Global Interpreter Lock)是Python用于同步线程的工具,使得任何时刻仅有一个线程在执行。
      由于每个进程拥有独立的解释器和GIL,消除了来自单个Python进程中的多个执行线程,模型副本或GPU的额外解释器开销、线程颠簸,因此可以减少解释器和GIL使用冲突。这对于严重依赖 Python runtime 的 models 而言,比如说包含 RNN 层或大量小组件的 models 而言,这尤为重要。

    使用方式----代码编写

    import os
    import argparse
    
    import torch
    import torch.nn as nn
    import torch.distributed as dist
    
    def parse():
       parser = argparse.ArgumentParser()
       parser.add_argument('--local_rank', type=int, default=0)
       args = parser.parse_args()
       return args
       
    def reduce_tensor(tensor):
       rt = tensor.clone()
       dist.all_reduce(rt, op=dist.reduce_op.SUM)
       rt /= dist.get_world_size()
       return rt
    
    def record_loss(loss):
       reduced_loss = reduce_tensor(loss.data)
       train_epoch_loss += reduced_loss.item()
       # 注意在写入TensorBoard的时候只让一个进程写入就够了:
       # TensorBoard
       if args.local_rank == 0:
           writer.add_scalars('Loss/training', {'train_loss': train_epoch_loss,
                                                                                'val_loss': val_epoch_loss}, epoch + 1)
    
    def main():
       """=============================================================
       在启动器启动python脚本后,会通过参数 local_rank 来告诉当前进程使用的是哪个GPU,
       用于在每个进程中指定不同的device
       ================================================================"""
       args = parse()
       torch.cuda.set_device(args.local_rank)
       dist.init_process_group(
       'nccl',                                               # 初始化GPU通信方式(NCCL)
       init_menthod='env://'              # 参数的获取方式(env代表通过环境变量)
       )
    
       """=============================================================
       分布式数据读取,具体使用方式, 参考 https://blog.csdn.net/magic_ll/article/details/123294552
       ================================================================"""
       train_dataset = ...
       train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
       train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
    
       """=======分布式模型的调用:包括SynBN========================================"""
       model = ...
       model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
       model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank])
       optimizer = optim.SGD(model.parameters())
    
       """=======训练===================================================="""
       for epoch in range(100):
           train_sampler.set_epoch(epoch)
           for batch_idx, (data, target) in enumerate(train_loader):
               images = images.cuda(non_blocking=True)
               target = target.cuda(non_blocking=True)
               ...
               output = model(images)
               loss = criterion(output, target)
               ...
               optimizer.zero_grad()
               loss.backward()
               optimizer.step()
               record_loss(loss)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    代码的启动方式

    • 在多进程的启动方面,不用自己手写 multiprocess 进行一系列复杂的CPU/GPU分配任务,PyTorch提供了一个很方便的启动器 torch.distributed.launch 用于启动文件,故运行训练代码的方式如下:
      CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py

  • 相关阅读:
    存储过程遍历更新
    Rocket快速实战与高级原理详解
    uboot添加自定义命令
    【老生谈算法】matlab实现遗传算法在调节控制系统参数中的应用——遗传算法
    Docker-compose update db password
    WRF模型教程(ububtu系统)-WPS(WRF Pre-Processing System)概述
    Ajax
    VR建筑仿真场景编辑软件有助于激发创作者的灵感和创造力
    K8s Docker实践二
    CG广告牌
  • 原文地址:https://blog.csdn.net/magic_ll/article/details/123442178