• PyTorch使用多GPU并行训练及其原理和注意事项


    1. 常见的多GPU使用方法

    在这里插入图片描述

    • 模型并行(model parallel) -> 大型网络(对速度没有提升)
      当模型需要的显卡很大,一张GPU的显存放不下时,使用这种方式就可以训练一个大型的网络
    • 数据并行(data parallel)-> 加速训练速度
      可以将整个模型放到一张GPU时,我们可以将每一个模型放到每一个GPU上,让它们同时进行训练(正向传播+反向传播)

    2. 训练速度与GPU数量之间的关系

    性能实测:数据来源霹雳巴拉WZ

    • PyTorch 1.7
    • CUDA:10.1
    • Model:ResNet-34
    • Dataset:flower_photos(非常小的一个数据集)
    • BatchSize:16
    • Optimizer:SGD
    • GPU:Tesla V100(上一代卡皇)

    在这里插入图片描述

    从上图我们可以看到,训练速度和GPU数量并不是简单的倍乘关系。随着GPU数量的增加,提速效果越来越差,这时因为不同GPU之间需要进行通信,会有性能的损耗。

    3. 重点

    1. 数据集如何在不同设备间分配
    2. 误差梯度如何在不同设备之间通信
    3. Batch Normalization如何在不同设备之间同步

    3.1 数据集分配

    3.2 误差通信

    在使用多GPU训练时,每张GPU在进行完一个step后会产生梯度,我们需要将所有GPU的梯度求平均(并不是每张GPU各自学各自的,那样就没有意义了),这样才能将每张GPU的训练结果结合在一起。

    3.3 BN同步

    在这里插入图片描述

    假设BS=2,feature1和feature2是网络中的某一层经过卷积操作后得到特征图,因为BS=2,所以有两个feature。在正向传播过程中,BN会求特征矩阵每一个channel的均值和方差,再对每个通道上的数据进行“减均值除标准差”的操作,这样就得到经过BN的特征图了。


    在这里插入图片描述

    当我们使用多GPU训练时,每个GPU都会计算各自的均值和方差。这里我们同样假设每个GPU上数据的BS=2,那么每个BN层求的均值 μ i \mu_i μi和方差 σ i 2 \sigma^2_i σi2都是针对两个特征图而言的。

    之前我们说过,BS越大,BN求的均值和方差越接近全体样本,准确率越高。

    所以如果我们使用多GPU训练,我们就应该考虑“我们是不是应该去求每一个BN层在所有设备上的均值和方差”。这样我们所求得的均值和方差是更加有意义的。

    如果我们不考虑多GPU之间BN的参数关系,那么我们所求得的BN层的均值和方差都是针对输入的两个样本(BS=2)求解的。

    如果我们考虑到另外的设备呢?

    GPU1的BN层得到两个特征图1和2,GPU2的BN层得到两个特征图3和4。如果我们求的BN参数是特征图12+34,那么我们的BN就变相等于在BS=4的情况下求得的均值和方差。这样对我们最终训练的结果是有一定帮助的。

    霹雳巴拉WZ说,如果不使用同步的BN(即普通的nn.BatchNorm),那么得到的结果和使用单GPU的结果基本上是一致的。
    当然,使用不同BN的多GPU也对模型的训练速度有很大的帮助
    如果使用了同步的BN后,最终结果一般会有将近一个点的提升
    所有同步的BN确实的有一定的作用

    如果你的GPU显存很大,本来在一张GPU上就可以很大的BS,那么使用同步的BN也不会有很大的作用。

    同步BN主要用在:网络比较大,一张GPU它的BS不能设置很大的情况下,同步的BN对准确率的帮助比较大。

    注意:使用了具有同步BN的方法,多GPU的并行速度会下降。可能会降低30%的速度。

    • 想要更快的速度 -> 不使用同步的BN
    • 更高的精度 -> 使用同步的BN

    4. PyTorch实现多卡并行计算的方式

    分为两种:

    1. DataParallel
      PyTorch官方很久之前给出的一种方案
    2. DistributedDataParallel
      更新一代的多卡训练方法

    DDP不仅仅局限于单机多卡的情况,还适用于多级多卡的场景。

    1. PyTorch关于DP的文档
    2. [PyTorch关于DDP的文档](DistributedDataParallel — PyTorch 1.11.0 documentation)

    在这里插入图片描述

    • DP是一个单进程、多线程并且仅仅只能工作在单一的设备中(单节点,不适用于多机的情况)
    • DDP是一个多进程的,可以工作在单机或者多机的场景中
    • DP通常要慢于DDP(即便在单一的设备上)

    这里所说的单机和多机并不是单GPU和多GPU,而是单个服务器和多个服务器的意思


    DP和DDP都可以用在单机的情况下,单DDP可以用在多机的情况下,而且即便在单机下(一台机器有多个GPU),DDP的速度也要比DP的速度快。

    [PyTorch关于单机多卡和多级多卡的训练教程](PyTorch Distributed Overview — PyTorch Tutorials 1.11.0+cu102 documentation)

    5. PyTorch中使用多GPU训练的常用启动方式

    1. torch.distributed.launch:代码量少,启动速度快

      python -m torch.distributed.launch
      
      # -m: run library module as a script
      # --help: 可以通过`torch.distributed.launch --help`这样真的方式去查看使用方法
      
      • 1
      • 2
      • 3
      • 4

      在PyTorch官方实现的Faster R-CNN源码中,多GPU训练就是使用distributed.launch进行启动的,因此后面将的主要基于distributed.launch来启动

    2. torch.multiprocessing:代码量更多,但拥有更好的控制和灵活性


    注意事项:

    • 在使用torch.distributed.launch的方法进行训练。一旦训练开始后,手动强制终止训练程序(ctrl + c),会有小概率出现进程没有杀掉的情况。
      此时程序还会占用GPU的显存以及资源。所以需要我们将这些进程kill -9

    5.1 单机单卡的训练脚本

    5.1.1 main

    """
        单机单卡的训练脚本 —— 训练ResNet34/101
    """
    
    
    import os
    import math
    import argparse
    
    import torch
    import torch.optim as optim
    from torch.utils.tensorboard import SummaryWriter
    from torchvision import transforms
    import torch.optim.lr_scheduler as lr_scheduler
    
    from model import resnet34, resnet101
    from my_dataset import MyDataSet
    from utils import read_split_data
    from multi_train_utils.train_eval_utils import train_one_epoch, evaluate
    
    
    def main(args):
        # 检查机器的配置(是否有GPU,没有GPU则为CPU)
        device = torch.device(args.device if torch.cuda.is_available() else "cpu")
    
        print(args)  # 打印传入的参数
        print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
        tb_writer = SummaryWriter()  # 创建Tensorborad对象
        if os.path.exists("./weights") is False:  # 检查保存权值文件的文件夹是否存在,不存在则创建该文件夹
            os.makedirs("./weights")
    
        train_info, val_info, num_classes = read_split_data(args.data_path)
        train_images_path, train_images_label = train_info
        val_images_path, val_images_label = val_info
    
        # check num_classes
        assert args.num_classes == num_classes, "dataset num_classes: {}, input {}".format(args.num_classes,
                                                                                           num_classes)
    
        data_transform = {
            "train": transforms.Compose([transforms.RandomResizedCrop(224),
                                         transforms.RandomHorizontalFlip(),
                                         transforms.ToTensor(),
                                         transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
            "val": transforms.Compose([transforms.Resize(256),
                                       transforms.CenterCrop(224),
                                       transforms.ToTensor(),
                                       transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])}
    
        # 实例化训练数据集
        train_data_set = MyDataSet(images_path=train_images_path,
                                   images_class=train_images_label,
                                   transform=data_transform["train"])
    
        # 实例化验证数据集
        val_data_set = MyDataSet(images_path=val_images_path,
                                 images_class=val_images_label,
                                 transform=data_transform["val"])
    
        batch_size = args.batch_size
        
        # 根据BS的数量和训练设备CPU核心数来定义num_worker的大小
        nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])  # number of workers
        print('Using {} dataloader workers every process'.format(nw))
    
        # 读取训练数据
        train_loader = torch.utils.data.DataLoader(train_data_set,
                                                   batch_size=batch_size,
                                                   shuffle=True,
                                                   pin_memory=True,
                                                   num_workers=nw,
                                                   collate_fn=train_data_set.collate_fn)
    
        # 读取验证数据
        val_loader = torch.utils.data.DataLoader(val_data_set,
                                                 batch_size=batch_size,
                                                 shuffle=False,
                                                 pin_memory=True,
                                                 num_workers=nw,
                                                 collate_fn=val_data_set.collate_fn)
    
        # 定义模型对象并添加到所属设备中
        model = resnet34(num_classes=args.num_classes).to(device)
        # 如果存在预训练权重则载入
        if args.weights != "":
            if os.path.exists(args.weights):
                # 先使用torch.load加载指定文件中的权重
                weights_dict = torch.load(args.weights, map_location=device)
    
                # 只加载key和value元素相等的键值对
                load_weights_dict = {k: v for k, v in weights_dict.items()
                                     if model.state_dict()[k].numel() == v.numel()}
                # 1. 模型加载字典(不严格加载);2. 打印
                print(model.load_state_dict(load_weights_dict, strict=False))
            else:
                raise FileNotFoundError("not found weights file: {}".format(args.weights))
    
        # 是否冻结权重
        if args.freeze_layers:
            for name, para in model.named_parameters():  # name: 层的名字; para: 对应的参数
                # 除最后的全连接层外,其他权重全部冻结
                if "fc" not in name:  # 除了fc层外,所有层的参数都没有梯度(不进行反向传播,即不进行参数更新)
                    para.requires_grad_(False)
    
        # 将带有梯度的参数传入pg这个list中
        pg = [p for p in model.parameters() if p.requires_grad]
    
        # 定义参数优化器,第一个参数即为需要更新的参数,也就是上一行pg列表中的结果
        optimizer = optim.SGD(pg, lr=args.lr, momentum=0.9, weight_decay=0.005)
    
        # 定义学习率变化函数,参考:Scheduler https://arxiv.org/pdf/1812.01187.pdf -> 其实就是一个余弦函数[0, pi]
        lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf  # cosine
    
        # scheduler即为调整学习率变化的对象,将优化器和学习率变化曲线传给它就可以实现学习率的规律性变化
        scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)
    
        # 开始迭代训练
        for epoch in range(args.epochs):
            # train
            """
                可以看到,这里的train阶段只是返回平均loss,没有预测概率
            """
            mean_loss = train_one_epoch(model=model,
                                        optimizer=optimizer,
                                        data_loader=train_loader,
                                        device=device,
                                        epoch=epoch)
    
            # epoch进行了一个,学习率变化器需要更新一下(使得optimizer中的学习率进行变化)
            scheduler.step()
    
            # validate
            sum_num = evaluate(model=model,
                               data_loader=val_loader,
                               device=device)
            acc = sum_num / len(val_data_set)  # top-1准确率 = 预测正确数 / 验证样本数
            
            # 打印该epoch下的准确率
            print("[epoch {}] accuracy: {}".format(epoch, round(acc, 3)))
    
            # 将train和validation得到的数据添加到tensorboard中
            # optimizer.param_groups[0]["lr"]即为对应epoch的学习率
            tags = ["loss", "accuracy", "learning_rate"]
            tb_writer.add_scalar(tags[0], mean_loss, epoch)
            tb_writer.add_scalar(tags[1], acc, epoch)
            tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch)
    
            # 保存对应epoch的模型(这里没有选择最佳模型,可以通过acc达到只保留准确率高的权值文件)
            torch.save(model.state_dict(), "./weights/model-{}.pth".format(epoch))
    
    
    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument('--num_classes', type=int, default=5)
        parser.add_argument('--epochs', type=int, default=30)
        parser.add_argument('--batch-size', type=int, default=16)
        parser.add_argument('--lr', type=float, default=0.001)
        # lrf为倍率因子,即学习率最终降低到初始学习率lr的多少倍。
        # 最终学习率为lr * lrf
        parser.add_argument('--lrf', type=float, default=0.1)
    
        # 数据集所在根目录
        # http://download.tensorflow.org/example_images/flower_photos.tgz
        parser.add_argument('--data-path', type=str,
                            default="/home/w180662/my_project/my_github/data_set/flower_data/flower_photos")
    
        # resnet34 官方权重下载地址
        # https://download.pytorch.org/models/resnet34-333f7ec4.pth
        parser.add_argument('--weights', type=str, default='resNet34.pth',
                            help='initial weights path')  # 为""表示不使用预训练模型
        parser.add_argument('--freeze-layers', type=bool, default=False)
        parser.add_argument('--device', default='cuda', help='device id (i.e. 0 or 0,1 or cpu)')
    
        opt = parser.parse_args()
    
        main(opt)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177

    5.1.2 train_eval_utils

    import sys
    
    from tqdm import tqdm
    import torch
    
    from multi_train_utils.distributed_utils import reduce_value, is_main_process
    
    
    def train_one_epoch(model, optimizer, data_loader, device, epoch):
        model.train()  # 声明模型的状态
        loss_function = torch.nn.CrossEntropyLoss()  # 定义损失函数
        mean_loss = torch.zeros(1).to(device)  # 生成shape为[1, ]的全零矩阵,并添加到对应的设备中 -> 用于存储后续计算得到mean_loss
    
        # 清空优化器中的梯度残留
        optimizer.zero_grad()
    
        # 在进程0中打印训练进度
        if is_main_process():  # 判断是否为主进程
            # 使用tqdm库包装一下data_loader这个变量,一会儿在遍历data_loader时就会打印一个进度条
            data_loader = tqdm(data_loader, file=sys.stdout)
        
        # 迭代data_loader,获取step和对应的数据
        """
            在data_loader中就定义了BS的大小, 所以这里step就是$step = 所有图片的数量 / BS$
            step即为BS的数量, 每走过一次step即为处理了BS张图片
            
            第1个step对应第1个bs
            第2个step对应第2个bs
            第3个step对应第3个bs
            ...
        """
        for step, data in enumerate(data_loader):
            images, labels = data  # data包含两部分数据:1. 预处理后的图片;2. 对应的真实标签
    
            # 通过正向传播获取图片的预测结果
            pred = model(images.to(device))  
    
            # 根据预测结果与真实标签计算loss
            loss = loss_function(pred, labels.to(device))
    
            # 对loss进行反向传播
            loss.backward()
    
            # 对loss进行求平均处理
            """
                reduce_value:
                    def reduce_value(value, average=True):
                        world_size = get_world_size()
                        if world_size < 2:  # 单GPU的情况
                            return value  # 原值返回
    
                        with torch.no_grad():
                            dist.all_reduce(value)
                            if average:
                                value /= world_size  # 需要除以GPU数量后再返回
    
                            return value
            """
            loss = reduce_value(loss, average=True)
    
            # 对历史损失求平均
            mean_loss = (mean_loss * step + loss.detach()) / (step + 1)  # update mean losses
    
            # 在进程0中打印平均loss
            if is_main_process():  # 判断是否为主进程
                data_loader.desc = "[epoch {}] mean loss {}".format(epoch, round(mean_loss.item(), 3))
    
            # 判断loss是否为有限数据(不能是infty)
            if not torch.isfinite(loss):
                print('WARNING: non-finite loss, ending training ', loss)
                sys.exit(1)  # 如果loss为无穷,则退出训练
    
            optimizer.step()  # 参数优化器进行参数更新
            optimizer.zero_grad()  # 参数优化器更新完参数后,需要将梯度清空
    
        # 等待所有进程计算完毕
        if device != torch.device("cpu"):
            torch.cuda.synchronize(device)  # 等待CUDA设备上所有流中的所有内核完成。
        
        # 返回计算求得的平均loss
        return mean_loss.item()
    
    
    """
        这里使用了@torch.no_grad()这个装饰器对该方法进行修改,也可以使用 with torch.no_grad: 这个上下文管理器
    """
    @torch.no_grad()
    def evaluate(model, data_loader, device):
        model.eval()  # 声明模型状态 -> 1. 关闭BN; 2. Dropout
    
        # 用于存储预测正确的样本个数
        sum_num = torch.zeros(1).to(device)
    
        # 在进程0中打印验证进度
        if is_main_process():
            data_loader = tqdm(data_loader, file=sys.stdout)  # 在主进程中包装dataloader
    
        for step, data in enumerate(data_loader):
            images, labels = data
            pred = model(images.to(device))  # 获取预测概率
            pred = torch.max(pred, dim=1)[1]  # 获取预测概率最的max
            """
            torch.eq(tensor, tensor/value)
            对两个张量Tensor进行逐元素的比较,若相同位置的两个元素相同,则返回True;若不同,返回False。
            """
            sum_num += torch.eq(pred, labels.to(device)).sum()  # 计算所有预测正确的数量
    
        # 等待所有进程计算完毕
        if device != torch.device("cpu"):
            torch.cuda.synchronize(device)
    
        sum_num = reduce_value(sum_num, average=False)  # 统计所有预测正确的数量
    
        return sum_num.item()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115

    5.2 分布式训练

    5.2.1 main

    from cgi import test
    import os
    import math
    import tempfile
    import argparse
    
    import torch
    import torch.optim as optim
    import torch.optim.lr_scheduler as lr_scheduler
    from torch.utils.tensorboard import SummaryWriter
    from torchvision import transforms
    
    from model import resnet34
    from my_dataset import MyDataSet
    from utils import read_split_data, plot_data_loader_image
    from multi_train_utils.distributed_utils import init_distributed_mode, dist, cleanup
    from multi_train_utils.train_eval_utils import train_one_epoch, evaluate
    
    
    def main(args):
        if torch.cuda.is_available() is False:  # 没有GPU设备会直接报错
            raise EnvironmentError("not find GPU device for training.")
    
        # 初始化各进程环境 -> args容器中多了几个参数:1. args.rank; 2. args.world_size; 3. args.gpu
        init_distributed_mode(args=args)
    
        # 将args中新增的DDP参数赋值到全局变量中
        rank = args.rank
        device = torch.device(args.device)
        batch_size = args.batch_size
        weights_path = args.weights
        """
            当我们在使用多GPU并行训练时,梯度一般是将多块GPU的梯度求平均。
                在原本的单卡上,我们的每学习一个step,梯度前进1m(这里是为了方便理解),如果学习两部,则梯度前进2m
                假设我们的GPU数量为2。那么我们看起来是学习了一步,但因为有2块GPU,所以是两块一起运算,那么就是一次性
                算了两个step,但更新时我们对梯度进行了平均,所以这2个step的值只更新了依次,意味着梯度只前进了1m
                学习率变相地降低了,所以我们需要扩大学习率
        """
        args.lr *= args.world_size  # 学习率要根据并行GPU的数量进行倍增 -> 这里是简单粗暴的增大学习率
    
        """
            使用DDP时,一般的写入操作、打印操作都是放在第一个进程(主进程)中操作的(没有必要在每一个进程中执行相同的操作)
        """
        if rank == 0:  # 在第一个进程中打印信息,并实例化tensorboard(只在第一个进程中打印参数)
            print(args)
            print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
            tb_writer = SummaryWriter()
            if os.path.exists("./weights") is False:
                os.makedirs("./weights")
    
        train_info, val_info, num_classes = read_split_data(args.data_path)
        train_images_path, train_images_label = train_info
        val_images_path, val_images_label = val_info
    
        # check num_classes
        assert args.num_classes == num_classes, "dataset num_classes: {}, input {}".format(args.num_classes,
                                                                                           num_classes)
    
        data_transform = {
            "train": transforms.Compose([transforms.RandomResizedCrop(224),
                                         transforms.RandomHorizontalFlip(),
                                         transforms.ToTensor(),
                                         transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
            "val": transforms.Compose([transforms.Resize(256),
                                       transforms.CenterCrop(224),
                                       transforms.ToTensor(),
                                       transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])}
    
        # 实例化训练数据集
        train_data_set = MyDataSet(images_path=train_images_path,
                                   images_class=train_images_label,
                                   transform=data_transform["train"])
    
        # 实例化验证数据集
        val_data_set = MyDataSet(images_path=val_images_path,
                                 images_class=val_images_label,
                                 transform=data_transform["val"])
    
        # 给每个rank对应的进程分配训练的样本索引
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set)
        val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)
    
        # 将样本索引每batch_size个元素组成一个list
        # BatchSampler是对train_sampler做进一步的处理
        train_batch_sampler = torch.utils.data.BatchSampler(train_sampler, 
                                                            batch_size, 
                                                            drop_last=True)
    
        nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])  # number of workers
        if rank == 0:  # 打印也是在第一个进程中
            print('Using {} dataloader workers every process'.format(nw))
    
        train_loader = torch.utils.data.DataLoader(train_data_set,  # 传入的仍然是实例化的训练集(不是DistributedSampler)
                                                   batch_sampler=train_batch_sampler,  # 这里传入的是BatchSampler,而不是简单的batch_size
                                                   pin_memory=True,  # 直接将数据加载到GPU中,从而达到加速的效果
                                                   num_workers=nw,  # num workers
                                                   collate_fn=train_data_set.collate_fn)
    
        val_loader = torch.utils.data.DataLoader(val_data_set,  # 传入的仍然是实例化的验证集(不是DistributedSampler)
                                                 batch_size=batch_size,  # 这里传入的还是batch_size,而不是BatchSampler
                                                 sampler=val_sampler,  # 因为我们刚才没有使用BatchSampler对DistributedSampler进行处理,所以这里
                                                                       # 直接传入DistributedSampler,将验证集进行随机打乱后均匀地分配给不同设备
                                                                       # 这里重点不是随机打乱,而是均匀地分配给不同的设备。
                                                 pin_memory=True,
                                                 num_workers=nw,
                                                 collate_fn=val_data_set.collate_fn)
        
        # 实例化模型并指认到指定的设备中。
        # Note:刚才在定义args时,device默认的是cuda,而在初始化DDP环境时有 torch.cuda.set_device(args.gpu)
        # 将对应的GPU指认到args.gpu,后面还有device = torch.device(args.device)
        # 所以这里直接使用device就可以了 -> 会帮助我们自动分配到对应的GPU上
        model = resnet34(num_classes=num_classes).to(device)
    
        # 如果存在预训练权重则载入
        if os.path.exists(weights_path):
            weights_dict = torch.load(weights_path, map_location=device)
            load_weights_dict = {k: v for k, v in weights_dict.items()
                                 if model.state_dict()[k].numel() == v.numel()}
            model.load_state_dict(load_weights_dict, strict=False)
        
            """
                如果我们使用多GPU训练,我们必须保证每个设备上初始的权重的是一模一样的!这样我们使用多GPU训练
                才是正确的;如果我们初始化的权重都不一样的化,那么在训练过程中所求得的梯度其实就不是针对同一组
                参数而言的。
    
                那我们应该怎么做呢?这里使用的是以下的语句:
                    checkpoint_path = os.path.join(tempfile.gettempdir(), "initial_weights.pt")
                
                其中,tempfile.gettempdir()用于返回保存临时文件的文件夹路径。所以我们可以得到一个保存临时文件的路径:
                    xxxxtempfolder/initial_weights.pt
    
                之后我们再将主进程模型的权值文件保存到这个临时文件中
                    if rank == 0:  # 在主进程中保存模块的初始化权重
                        torch.save(model.state_dict(), checkpoint_path)
    
                最终让所有模型都读入该权重(不管是主进程还是其他进程 <=> 不管是生成pt文件的GPU还是其他GPU):
                    model.load_state_dict(torch.load(checkpoint_path, map_location=device))
            """
        else:  # 如果不存在权重
            checkpoint_path = os.path.join(tempfile.gettempdir(), "initial_weights.pt")
            # 如果不存在预训练权重,需要将第一个进程中的权重保存,然后其他进程载入,保持初始化权重一致
            if rank == 0:  # 在主进程中保存模块的初始化权重
                torch.save(model.state_dict(), checkpoint_path)
    
            dist.barrier()  # 等待所有GPU都执行到这步
    
            # 这里注意,一定要指定map_location参数,否则会导致第一块GPU占用更多资源
            # map_location用于重定向,参考:https://blog.csdn.net/qq_43219379/article/details/123675375
            model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    
        # 是否冻结权重
        if args.freeze_layers:
            for name, para in model.named_parameters():
                # 除最后的全连接层外,其他权重全部冻结(FC层中是没有BN的,所以使用同步功能的BN是没有意义的)
                if "fc" not in name:
                    para.requires_grad_(False)
        else:  # 不冻结权重
            # 只有训练带有BN结构的网络时使用SyncBatchNorm才有意义(不带BN的话就不用了)
            if args.syncBN:  # 使用带有同步功能的BN
                # 使用SyncBatchNorm后训练会更耗时
                # 这个将BN换为sync_BN是不用管BN是2d还是3d的
                model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
    
        # 转为DDP模型:使用torch.nn.parallel.DistributedDataParallel包装我们的模型,再指认对应的设备ID
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
    
        # optimizer
        pg = [p for p in model.parameters() if p.requires_grad]
        optimizer = optim.SGD(pg, lr=args.lr, momentum=0.9, weight_decay=0.005)
        # Scheduler https://arxiv.org/pdf/1812.01187.pdf
        lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf  # cosine
        scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)
    
        for epoch in range(args.epochs):
            """
                前面我们提到了,使用DistributedSampler会打乱数据并将数据均匀的分配给每个设备。
                但是设备的拥有的数据是不变的。设置该方法后,每次DistributedSampler打乱的顺序就不同了,这就导致
                每个设备所拥有的数据不同 -> 让每个设备都能接触到所有的样本
            """
            train_sampler.set_epoch(epoch)
    
            mean_loss = train_one_epoch(model=model,
                                        optimizer=optimizer,
                                        data_loader=train_loader,
                                        device=device,
                                        epoch=epoch)
    
            scheduler.step()
    
            sum_num = evaluate(model=model,
                               data_loader=val_loader,
                               device=device)
    
            # 这里的val_sampler.total_size为整个验证集样本的总数(包括补充的数据)
            acc = sum_num / val_sampler.total_size
    
            if rank == 0:  # 在主进程中给Tensorboard添加数据
                print("[epoch {}] accuracy: {}".format(epoch, round(acc, 3)))
                tags = ["loss", "accuracy", "learning_rate"]
                tb_writer.add_scalar(tags[0], mean_loss, epoch)
                tb_writer.add_scalar(tags[1], acc, epoch)
                tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch)
    
                # 在主进程中保存权重参数
                torch.save(model.module.state_dict(), "./weights/model-{}.pth".format(epoch))
    
        # 删除临时缓存文件
        """
            如果不使用预训练权重,那么会生成一个temp file用来保证所有设备模型的初始化一致
            所以我们要将其删除
        """
        if rank == 0:
            if os.path.exists(checkpoint_path) is True:
                os.remove(checkpoint_path)
    
        # 当训练完毕后,我们需要调用 cleanup 这个方法去摧毁进程组 -> 释放资源
        cleanup()
    
    
    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument('--num_classes', type=int, default=5)
        parser.add_argument('--epochs', type=int, default=30)
        parser.add_argument('--batch-size', type=int, default=16)
        parser.add_argument('--lr', type=float, default=0.001)
        parser.add_argument('--lrf', type=float, default=0.1)
        # 是否启用SyncBatchNorm
        parser.add_argument('--syncBN', type=bool, default=True)
    
        # 数据集所在根目录
        # http://download.tensorflow.org/example_images/flower_photos.tgz
        parser.add_argument('--data-path', type=str, default="/home/wz/data_set/flower_data/flower_photos")
    
        # resnet34 官方权重下载地址
        # https://download.pytorch.org/models/resnet34-333f7ec4.pth
        parser.add_argument('--weights', type=str, default='resNet34.pth',
                            help='initial weights path')
        parser.add_argument('--freeze-layers', type=bool, default=False)
    
        # 以下三个参数不要进行修改,系统会自动分配
        parser.add_argument('--device', default='cuda', help='device id (i.e. 0 or 0,1 or cpu)')
        # 开启的进程数(注意不是线程),不用设置该参数,会根据nproc_per_node自动设置
        parser.add_argument('--world-size', default=4, type=int,
                            help='number of distributed processes')
        parser.add_argument('--dist-url', default='env://', help='url used to set up distributed training')
        opt = parser.parse_args()
    
        main(opt)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249

    5.2.2 distributed_utils

    import os
    
    import torch
    import torch.distributed as dist
    
    
    def init_distributed_mode(args):
        """
            DDP可以使用在:
                1. 多机多卡
                2. 单机多卡
            
            + 如果是多机多卡的场景下,WORLD_SIZE对应所有机器中使用的进程数量(一个进程对应一块GPU)。
            + RANK代表所有进程中的第几个进程
            + LOCAL_RANK对应当前机器中第几个进程
    
            因为我们讲的是单机多卡的场景,所以这里的:
                + WORLD_SIZE就是有几块GPU
                + RANK代表哪块GPU
                + LOCAL_RANK和RANK是一样的
    
            因为我们的脚本使用的是 torch.distributed.launch 这个方法,所以需要使用 --use_env 这个方法(args.use_env==True)
        """
        if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:  # 我们使用的是这个
            args.rank = int(os.environ["RANK"])  # 将环境中的RANK强转为int后传入args.rank变量
            args.world_size = int(os.environ['WORLD_SIZE'])
            args.gpu = int(os.environ['LOCAL_RANK'])
        elif 'SLURM_PROCID' in os.environ:
            args.rank = int(os.environ['SLURM_PROCID'])
            args.gpu = args.rank % torch.cuda.device_count()
        else:
            print('Not using distributed mode')
            args.distributed = False
            return
    
        args.distributed = True  # 将是否使用分布式训练的flag设置为True
    
        # 指定当前进程使用的GPU。在使用单机多卡的时候,其实是针对每一个GPU起了一个进程
        torch.cuda.set_device(args.gpu)
    
        args.dist_backend = 'nccl'  # 通信后端,nvidia GPU推荐使用NCCL
    
        # 打印当前所使用GPU的RANK及其distributed url信息
        print('| distributed init (rank {}): {}'.format(args.rank, args.dist_url), flush=True)
    
        # 通过 init_process_group方法创建进程组
        dist.init_process_group(backend=args.dist_backend,   # 通信后端(N卡推荐使用nccl)
                                init_method=args.dist_url,  # 初始化方法(直接使用默认的方法 -> "env://")
                                world_size=args.world_size,  # 对于不同的进程而言(一块GPU分配一个进程),WORLD_SIZE是一样的
                                rank=args.rank)  # 但不同的进程中,RANK是不一样的(如果有两张GPU并行,第一张GPU的RANK=0,第二张GPU的RANK=1)
        
        # 调用barrier方法等待所有的GPU都运行到这个地方之后再往下走    
        dist.barrier()
    
    
    def cleanup():
        dist.destroy_process_group()
    
    
    def is_dist_avail_and_initialized():
        """检查是否支持分布式环境"""
        if not dist.is_available():
            return False
        if not dist.is_initialized():
            return False
        return True
    
    
    def get_world_size():
        if not is_dist_avail_and_initialized():
            return 1
        return dist.get_world_size()
    
    
    def get_rank():
        if not is_dist_avail_and_initialized():
            return 0
        return dist.get_rank()
    
    
    def is_main_process():
        return get_rank() == 0
    
    
    def reduce_value(value, average=True):
        world_size = get_world_size()
        if world_size < 2:  # 单GPU的情况
            return value
    
        with torch.no_grad():  # 多GPU的情况下
            dist.all_reduce(value)  # 通过 all_reduce 方法对不同设备的value进行求和操作
            # 通过 all_reduce 操作之后,value就成了所有设备value的之和
            if average:  # 如果要进行求平均
                value /= world_size  # world_size为GPU数量
    
            return value
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    5.2.3 train_eval_utils

    import sys
    
    from tqdm import tqdm
    import torch
    
    from multi_train_utils.distributed_utils import reduce_value, is_main_process
    
    
    def train_one_epoch(model, optimizer, data_loader, device, epoch):
        model.train()  # 声明模型的状态
        loss_function = torch.nn.CrossEntropyLoss()  # 定义损失函数
        mean_loss = torch.zeros(1).to(device)  # 生成shape为[1, ]的全零矩阵,并添加到对应的设备中 -> 用于存储后续计算得到mean_loss
    
        # 清空优化器中的梯度残留
        optimizer.zero_grad()
    
        # 在进程0中打印训练进度
        if is_main_process():  # 判断是否为主进程
            # 使用tqdm库包装一下data_loader这个变量,一会儿在遍历data_loader时就会打印一个进度条
            data_loader = tqdm(data_loader, file=sys.stdout)
        
        # 迭代data_loader,获取step和对应的数据
        """
            在data_loader中就定义了BS的大小, 所以这里step就是$step = 所有图片的数量 / BS$
            step即为BS的数量, 每走过一次step即为处理了BS张图片
            
            第1个step对应第1个bs
            第2个step对应第2个bs
            第3个step对应第3个bs
            ...
        """
        for step, data in enumerate(data_loader):
            images, labels = data  # data包含两部分数据:1. 预处理后的图片;2. 对应的真实标签
    
            # 通过正向传播获取图片的预测结果
            pred = model(images.to(device))  
    
            # 根据预测结果与真实标签计算loss
            loss = loss_function(pred, labels.to(device))
    
            # 对loss进行反向传播
            """
                这里计算的loss是针对当前GPU当前batch求得的损失。但是在单机多卡的环境中,我们想要求得所有GPU的平均损失。
                所有我们应该想办法求得不同设备之间求得的loss的均值。这里是通过 reduce_value这个方法实现的
            """
            loss.backward()
    
            # 对loss进行求平均处理
            """
                reduce_value:
                    def reduce_value(value, average=True):
                        world_size = get_world_size()
                        if world_size < 2:  # 单GPU的情况
                            return value  # 原值返回
    
                        with torch.no_grad():
                            dist.all_reduce(value)
                            if average:
                                value /= world_size  # 需要除以GPU数量后再返回
    
                            return value
            """
            loss = reduce_value(loss, average=True)  # 这行代码在单机单卡环境是不起作用的
    
            # 对历史损失求平均
            mean_loss = (mean_loss * step + loss.detach()) / (step + 1)  # update mean losses
    
            # 在进程0中打印平均loss
            if is_main_process():  # 判断是否为主进程
                data_loader.desc = "[epoch {}] mean loss {}".format(epoch, round(mean_loss.item(), 3))
    
            # 判断loss是否为有限数据(不能是infty)
            if not torch.isfinite(loss):
                print('WARNING: non-finite loss, ending training ', loss)
                sys.exit(1)  # 如果loss为无穷,则退出训练
    
            optimizer.step()  # 参数优化器进行参数更新
            optimizer.zero_grad()  # 参数优化器更新完参数后,需要将梯度清空
    
        # 等待所有进程计算完毕
        if device != torch.device("cpu"):
            torch.cuda.synchronize(device)  # 等待CUDA设备上所有流中的所有内核完成。
        
        # 返回计算求得的平均loss
        return mean_loss.item()
    
    
    """
        这里使用了@torch.no_grad()这个装饰器对该方法进行修改,也可以使用 with torch.no_grad: 这个上下文管理器
    """
    @torch.no_grad()
    def evaluate(model, data_loader, device):
        model.eval()  # 声明模型状态 -> 1. 关闭BN; 2. Dropout
    
        # 用于存储预测正确的样本个数
        sum_num = torch.zeros(1).to(device)
    
        # 在进程0中打印验证进度
        if is_main_process():
            data_loader = tqdm(data_loader, file=sys.stdout)  # 在主进程中包装dataloader
    
        for step, data in enumerate(data_loader):
            images, labels = data
            pred = model(images.to(device))  # 获取预测概率
            pred = torch.max(pred, dim=1)[1]  # 获取预测概率最的max
            """
            torch.eq(tensor, tensor/value)
            对两个张量Tensor进行逐元素的比较,若相同位置的两个元素相同,则返回True;若不同,返回False。
            """
            sum_num += torch.eq(pred, labels.to(device)).sum()  # 计算所有预测正确的数量
    
        # 等待所有进程计算完毕
        if device != torch.device("cpu"):
            torch.cuda.synchronize(device)
    
        # 统计所有预测正确的数量
        # 这里使用 reduce_value 方法实现所有GPU的sum_num变量之和
        sum_num = reduce_value(sum_num, average=False)
    
        return sum_num.item()
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121

    6. 补充知识

    6.1 Cosine学习率变化曲线

    lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - arg.lrf) + args.lrf
    
    • 1

    l r ( e p o c h ) = 1 2 [ 1 + cos ⁡ π ⋅ e p o c h e p o c h s ] × ( 1 − α ) + α \mathrm{lr(epoch)} = \frac{1}{2}[1 + \cos\frac{\pi \cdot \mathrm{epoch}}{\mathrm{epochs}}] \times (1 - \alpha) + \alpha lr(epoch)=21[1+cosepochsπepoch]×(1α)+α

    其中:

    • l f \mathrm{lf} lf为当前学习率
    • e p o c h \mathrm{epoch} epoch为当前迭代的epoch数
    • e p o c h s \mathrm{epochs} epochs为总的epoch数
    • α \alpha α为倍率因子

    其函数曲线为:

    在这里插入图片描述

    6.2 DistributedSampler的讲解

    # 实例化训练数据集
    train_data_set = MyDataSet(images_path=train_images_path,
                               images_class=train_images_label,
                               transform=data_transform["train"])
    
    # 实例化验证数据集
    val_data_set = MyDataSet(images_path=val_images_path,
                             images_class=val_images_label,
                             transform=data_transform["val"])
    
    # 给每个rank对应的进程分配训练的样本索引
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set)
    val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    现在对torch.utils.data.distributed.DistributedSampler进行讲解。

    在这里插入图片描述

    假设当前的数据集一共有11个样本 -> [0, 1, 2, 3, 4, …, 10]。

    首先DistributedSampler会对数据进行Shuffle处理,得到[6, 1, 9, 3, …, 7]这样随机的数据顺序。然后根据GPU的数量进行计算。假设我们使用2块GPU进行并行。那么DistributedSampler:

    1. 先会对总的样本数/2(向上取整)=6。
    2. 再对得到的数乘上GPU的个数,即6*2=12
    3. 这个12就是2块GPU需要的数据总的样本个数。但是我们的样本总数是11,那么就会对其进行补充。怎么补充?—— 对最开头的数据进行复制,再放到最后。这里我们缺了1个数据,所有就将6补充到最后。(如果我们差3个数据,就将[6, 1, 9]进行复制,补充到后面)

    这样我们就有12个数据了,就可以均衡的分配到每一个GPU设备中了。

    最后对数据进行分配。分配的方式也非常简单,就是间隔地将数据分配到不同的设备中。


    通过DistributedSampler,我们就将所有的数据平分到各个设备上了。设备只能使用分配的数据,不能使用其他设备拥有的数据。

    6.3 BatchSampler

    # 给每个rank对应的进程分配训练的样本索引
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set)
    val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)
    
    # -----------------------------------------------------------
    
    # 将样本索引每batch_size个元素组成一个list
    # BatchSampler是对train_sampler做进一步的处理
    train_batch_sampler = torch.utils.data.BatchSampler(train_sampler, 
                                                        batch_size, 
                                                        drop_last=True)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述

    假设我们有2块GPU,通过DistributedSampler后,数据被均匀地分配到两个设备中。我们拿第一块GPU为例,它分配到的数据索引为[6, 9, 10, 1, 8, 7]。假设我们的Batch Size=2,那么BatchSampler会将不同设备的数据以2个为一组组合在一起。


    在BatchSampler中还有一个参数drop_last。这个参数的意思是:

    如果我们的BS=4,那么我们发现,只有前4个数据可以打包为一组,后2个数据凑不够了:

    • 如果Droplast==True,那么剩下的2个数据就被丢弃了。
    • 如果Droplast==False,那么剩下的2个数据打包为一个batch。

    Note

    • 一般BatchSampler只处理训练集的DistributedSampler,验证集的不需要处理。

    7. 总结

    1. 打印、保存模型、计算时间等操作要放在主线程中执行 (rank==0)
    2. 别忘记给模型用DDP进行包装
    3. 加载数据集后别忘记使用DistributedSampler进行处理;还有BatchSampler对前者进行处理
    4. 记得对loss队形reduce_value的计算
    5. 对需要累加的东西都别忘了使用reduce_value进行计算
    6. loss.backward()直接进行梯度反向传播就行,不需要先进行reduce_value再传播。因为每张卡的loss是不一样的,是针对自己的数据求出来的值,mean loss只是展示用,并不是实际反向传播的loss
    7. to(device)的方法很好用,以后就可以不用.cuda()了 😂
    8. tensorboard里面东西排序是看英文顺序,不是语句顺序
    9. 如果追求速度,关闭同步的BN(SyncBN)

    知识来源

    1. https://www.bilibili.com/video/BV1yt4y1e7sZ?share_source=copy_pc
  • 相关阅读:
    国内 11 家通过备案的 AI 大模型产品
    Spring之注解开发
    MindSponge分子动力学模拟——计算单点能(2023.08)
    著名音乐app网易云推广运营策划案
    【八股】在Gradle和Maven之间抉择构建工具
    逗号分隔的字符串笔记
    用ffmpeg修改MP4文件头信息,使其支持流式加载及播放
    openGauss学习笔记-127 openGauss 数据库管理-设置账本数据库-修复账本数据库
    深度学习入门(十五)环境和分布偏移(了解)
    Python每日练习:使用百度AI识别表情包并抓取
  • 原文地址:https://blog.csdn.net/weixin_44878336/article/details/125412625