• Pytorch 分布式训练中DP和DDP的原理和用法


    一、聊聊DP和DDP

    pytorch中的有两种分布式训练方式

    一种是常用的DataParallel(DP)

    另外一种是DistributedDataParallel(DDP)

    两者都可以用来实现数据并行方式的分布式训练

    两者的区别如下:

    • DP采用的是PS模式,DDP采用的是Ring-all-reduce模式
    • DP是单进程多线程的实现方式,DDP是采用多进程的方式
    • DP只能在单机上使用,DDP单机和多机都可以使用
    • DDP相比于DP训练速度要快

    二、聊聊PS模式和Ring-all-reduce模式

    PS模式

    即Parameter Server架构,主要由server节点和worker节点组成
    在这里插入图片描述

    server节点的主要功能是初始化和保存模型参数

    接受worker节点计算出的局部梯度、汇总计算全局梯度,并更新模型参数(DP)

    worker节点的主要功能是各自保存部分训练数据

    初始化模型,从server节点拉取最新的模型参数(pull),

    再读取参数,根据训练数据计算局部梯度,上传给server节点(push)

    PS模式下的DP,会造成负载不均衡

    因为充当server的GPU需要一定的显存用来保存worker节点计算出的局部梯度

    另外server还需要将更新后的模型参数broadcast到每个worker

    server的带宽就成了server与worker之间的通信瓶颈

    server与worker之间的通信成本会随着worker数目的增加而线性增加

    Ring-all-reduce模式

    ring-all-reduce模式没有server节点,worker与worker之间的通信构成一个环
    在这里插入图片描述
    ring-all-reduce模式下,所有worker只和自己相邻的两个worker进行通信

    该工作模式分为两个工作阶段:

    1. Scatter Reduce:在这个 Scatter Reduce阶段,GPU 会逐步交换彼此的梯度并融合,最后每个 GPU 都会包含完整融合梯度的一部分
    2. Allgather:GPU 会逐步交换彼此不完整的融合梯度,最后所有 GPU 都会得到完整的融合梯度

    首先将模型的梯度按照集群中GPU数量进行分块

    然后进行第一步scatter reduce,注意该过程中所有的GPU都是在进行通信的

    ring all reduce模式不存在网络通信带宽的瓶颈


    三、聊聊使用

    开始之前需要先熟悉几个概念。

    group:进程组

    默认情况下,只有一个组,一个 job 即为一个组,也即一个 world。

    world size:全局进程个数。

    如果是多机多卡就表示机器数量,如果是单机多卡就表示 GPU 数量。

    rank:进程序号,用于进程间通讯,表征进程优先级

    rank = 0 的主机为 master 节点

    如果是多机多卡就表示对应第几台机器

    如果是单机多卡,由于一个进程内就只有一个 GPU,所以 rank 也就表示第几块 GPU

    local_rank:表示进程内,GPU 编号,非显式参数

    由 torch.distributed.launch 内部指定

    多机多卡中 rank = 3,local_rank = 0 表示第 3 个进程内的第 1 块 GPU

    在使用 distributed 包的任何其他函数之前,需要使用 init_process_group 初始化进程组,同时初始化 distributed 包

    如果需要进行小组内集体通信,用 new_group 创建子分组

    创建分布式并行(DistributedDataParallel)模型 DDP(model, device_ids=device_ids)

    为数据集创建 Sampler

    使用启动工具 torch.distributed.launch 在每个主机上执行一次脚本,开始训练

    使用 destory_process_group() 销毁进程组

    """
    1. 添加参数  --local_rank
    每个进程分配一个 local_rank 参数,表示当前进程在当前主机上的编号。
    例如:rank=2, local_rank=0 表示第 3 个节点上的第 1 个进程。
    这个参数是torch.distributed.launch传递过来的,我们设置位置参数来接受,local_rank代表当前程序进程使用的GPU标号
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', default=-1, type=int)
    args = parser.parse_args()
    print(args.local_rank))
    
    # 检查nccl是否可用
    torch.distributed.is_nccl_available ()
    
    """
    2.初始化使用nccl后端
    When using a single GPU per process and per
    DistributedDataParallel, we need to divide the batch size
    ourselves based on the total number of GPUs we have
    """
    torch.distributed.init_process_group(backend='nccl') 
    
    device_ids=[1,3]
    ngpus_per_node=len(device_ids)
    args.batch_size = int(args.batch_size / ngpus_per_node)
    
    """
    3.使用DistributedSampler
    别忘了设置pin_memory=true
    使用 DistributedSampler 对数据集进行划分。
    它能帮助我们将每个 batch 划分成几个 partition,在当前进程中只需要获取和 rank 对应的那个 partition 进行训练
    """
    
    train_dataset = MyDataset(train_filelist, train_labellist, args.sentence_max_size, embedding, word2id)
    train_sampler = t.utils.data.distributed.DistributedSampler(train_dataset)
    
    # DataLoader:num_workers这个参数决定了有几个进程来处理data loading。0意味着所有的数据都会被load进主进程
    # testset不用sampler
    train_dataloader = DataLoader(train_dataset, pin_memory=true, shuffle=(train_sampler is None),
                                    batch_size=args.batch_size, num_workers=args.workers, sampler=train_sampler)
                                    
    """
    4.分布式训练
    使用 DistributedDataParallel 包装模型
    它能帮助我们为不同 GPU 上求得的梯度进行 all reduce(即汇总不同 GPU 计算所得的梯度,并同步计算结果)。
    all reduce 后不同 GPU 中模型的梯度均为 all reduce 之前各 GPU 梯度的均值. 注意find_unused_parameters参数
    """
    
    net = textCNN(args,vectors=t.FloatTensor(wvmodel.vectors))
    if args.cuda: net.cuda()
    if len(device_ids)>1: net=torch.nn.parallel.DistributedDataParallel(net,find_unused_parameters=True)   
               
    """
    5.最后,把数据和模型加载到当前进程使用的 GPU 中,正常进行正反向传播:  
    """
    for batch_idx, (data, target) in enumerate(train_loader):  
      if args.cuda:
        data, target = data.cuda(), target.cuda()
        output = net(images)
        loss = criterion(output, target)
        ...
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
    """
    6.在使用时,命令行调用 torch.distributed.launch 启动器启动:
    pytorch 为我们提供了 torch.distributed.launch 启动器,用于在命令行分布式地执行 python 文件。
    --nproc_per_node参数指定为当前主机创建的进程数。一般设定为=NUM_GPUS_YOU_HAVE当前主机的 GPU 数量,每个进程独立执行训练脚本。
    这里是单机多卡,所以node=1,就是一台主机,一台主机上--nproc_per_node个进程
    """
    
    CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    参考文献

    1. https://zhuanlan.zhihu.com/p/356967195
    2. https://www.cnblogs.com/yh-blog/p/12877922.html
    3. https://medium.com/huggingface/training-larger-batches-practical-tips-on-1-gpu-multi-gpu-distributed-setups-ec88c3e51255
  • 相关阅读:
    kube-apiserver鉴权源码简析
    Android根据bootmode设置usb config
    Axios 封装
    Ubuntu21.10升级22.04操作记录
    使用dockerfile部署springboot应用
    python实现人脸检测、分割、并计算人脸各个部分的颜色
    C语言入门基础题:奇偶 ASCII 值判断(C语言版)和ASCII码表,什么是ASCII码,它的特点和应用?
    SQL如何对数据进行排序
    Git Flow的简单使用
    基于Spark的智能餐饮推荐系统报告(只含部分代码)
  • 原文地址:https://blog.csdn.net/qq_38973721/article/details/127846151