• DeepSpeed


    DeepSpeed概念

    DeepSpeed中用到的技术包括以下几个等级:
    ZeRO-1:只对optimizer进行切片后分布式保存
    ZeRO-2:对optimizer和grad进行切片后分布式保存
    ZeRO-3:对optimizer、grad和模型参数进行切片后分布式保存
    offload:将forward中间结果保存到内存、硬盘(NVMe)等缓存中,然后在需要时进行加载或重计算,进一步降低显存占用

    • 对应deepspeed参数
    zero_stage  : 这个对应者DeepSpeed工具中的zero方式,分别是0,1,2,3
    offload     : ZeRO-Offload 通过利用主机CPU上的计算和内存资源来执行优化器,从而减少此类模型的GPU计算和内存需求。
    local_rank  : 分布式训练时的一个变量,用于标识当前 GPU 设备的本地排名(本机排名,与global-rank不同)
    gradient_checkpointing : 降低深度学习模型训练过程中内存消耗的技术
    
    • 1
    • 2
    • 3
    • 4

    args.local_rank:local_rank 在分布式训练时使用的一个变量,用于标识当前 GPU 设备的本地排名(local rank)。

    • 当 args.local_rank 等于 -1 时,表示代码不在分布式设置下运行,仅使用单个 GPU 进行训练。

    • 如果 args.local_rank 不等于 -1,则表示代码在分布式设置下运行,当前 GPU 设备被分配了一个唯一的本地排名。代码会将设备设置为指定的 GPU(torch.device("cuda", args.local_rank)),并使用 deepspeed.init_distributed() 函数调用初始化分布式后端。

    • 注意: 在 PyTorch 中也有分布式初始化方法 torch.distributed.init_process_group() 函数。但是当使用 DeepSpeed 库时,不要替换为 deepspeed.init_distributed()。

    args.global_rank:在分布式训练中,每个进程都有一个唯一的全局排名,用于标识该进程在分布式环境中的位置。全局排名的范围是从0到world_size-1,其中 world_size 是整个分布式环境中进程的总数。

    • 本程序中通过 torch.distributed.get_rank() 来读取 global_rank, 本函数在初始化分布式后端之后才能调用。

    数据加载:

        train_phase = 1
        train_dataset, eval_dataset = create_prompt_dataset( # 自定义的函数
            args.local_rank, args.data_path, args.data_split,
            args.data_output_path, train_phase, args.seed, tokenizer,
            args.max_seq_len)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    然后是初始化sampler:

    • 单GPU使用RandomSampler和SequentialSampler
    • 分布式处理使用DistributedSampler。sampler 主要用来设置数据采用的顺序。比如随机采样来提高模型的鲁棒性。
        # DataLoaders creation:
        if args.local_rank == -1:
            train_sampler = RandomSampler(train_dataset)
            eval_sampler = SequentialSampler(eval_dataset)
        else:
            train_sampler = DistributedSampler(train_dataset)
            eval_sampler = DistributedSampler(eval_dataset)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    数据读取使用 PyTorch 标准的 DataLoader 来处理。使用Dataloader 不仅可以设置sampler定义采样方式,还可以自动进行批处理,并且支持多进程数据加载。

        train_dataloader = DataLoader(train_dataset,
                                      collate_fn=default_data_collator,
                                      sampler=train_sampler,
                                      batch_size=args.per_device_train_batch_size)
        eval_dataloader = DataLoader(eval_dataset,
                                     collate_fn=default_data_collator,
                                     sampler=eval_sampler,
                                     batch_size=args.per_device_eval_batch_size)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    model = create_hf_model(AutoModelForCausalLM, args.model_name_or_path,
                            tokenizer, ds_config)
    ## LoRA优化                        
    if args.lora_dim > 0:
         model = convert_linear_layer_to_lora(model, args.lora_module_name,
                                              args.lora_dim)
         if args.only_optimize_lora:
             model = only_optimize_lora_parameters(model)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 提取需要被优化的参数 optimizer_grouped_parameters
    • 在代码中,get_optimizer_grouped_parameters() 函数被用来将权重分成两组,一组需要应用权重衰减,另一组则不需要。该函数通过遍历模型的所有参数,并检查参数名称是否包含 bias 或 LayerNorm 等特殊字符串,来区分需要应用权重衰减的参数和不需要的参数。

    分组原因的解说: 一般来说,对于参数名称中不包含 bias 或 LayerNorm 等特殊字符串的参数,我们认为它们是需要应用权重衰减的参数。对于这些参数,通常会将它们的权重矩阵与权重衰减超参数相乘,以降低它们的权重。
    与此相反,对于参数名称中包含 bias 或 LayerNorm 等特殊字符串的参数,我们认为它们是不需要应用权重衰减的参数。这是因为 bias 或 LayerNorm 参数通常只是用来偏移或缩放其他层的输出,而不是真正的权重参数。通过将权重分成两组,并分别应用权重衰减和不应用权重衰减,我们可以更好地控制模型的复杂度,从而提高模型的泛化性能。

    • 然后设置Optimizer优化器,根据参数不同会选择 DeepSpeedCPUAdam 或者 FusedAdam 优化器。 并传入了一些参数,包括分组的参数、学习率和 betas。

    Adam优化器的解说: 在 Hugging Face 的 Transformers 库中,有两种 Adam 优化器可供选择:FusedAdam 和 DeepSpeedCPUAdam。它们都是基于 PyTorch 实现的优化器,但在不同的硬件上具有不同的优化和性能特征。FusedAdam 是使用 NVIDIA Apex 库实现的优化器,它支持混合精度训练,并且可以同时计算梯度和权重更新操作,从而提高训练效率。FusedAdam 优化器在使用支持 CUDA 的 NVIDIA GPU 时具有较好的性能。DeepSpeedCPUAdam 是一种 CPU 上的优化器,它是 DeepSpeed 框架中的一部分,支持分布式训练和模型平行化。DeepSpeedCPUAdam 优化器在使用 CPU 时具有较好的性能。在上面的代码中,如果 args.offload 为 True,则表示使用基于 CPU 的优化,因此会选择使用 DeepSpeedCPUAdam 优化器。

        # Split weights in two groups, one with weight decay and the other not.
        optimizer_grouped_parameters = get_optimizer_grouped_parameters(
            model, args.weight_decay)
     
        AdamOptimizer = DeepSpeedCPUAdam if args.offload else FusedAdam
        optimizer = AdamOptimizer(optimizer_grouped_parameters,
                                  lr=args.learning_rate,
                                  betas=(0.9, 0.95))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 初始化

    使用DeepSpeed进行优化是,需要使用deepspeed.initialize() 函数来初始化模型、优化器、学习率调度器等训练相关的组件。

    • 其中,model 和 optimizer 是必需的参数,而其他参数则是可选
    • deepspeed.initialize() 函数会对传入的参数进行检查和优化,并返回新的模型、优化器和学习率调度器等组件。例如,它会根据训练参数设置和硬件配置自动调整优化器和梯度累积的设置,并设置模型权重的分布式训练策略。dist_init_required=True 参数指示 DeepSpeed 是否需要进行分布式训练初始化。
        model, optimizer, _, lr_scheduler = deepspeed.initialize(
            model=model,
            optimizer=optimizer,
            args=args,
            config=ds_config,
            lr_scheduler=lr_scheduler,
            dist_init_required=True)    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述


    用法简介

    1. 使用DeepSpeed时,首先需要提供一个ds_config文件
    2. 然后参照上文基础用法,将model用deepspeed.initialize()包装起来
      • deepspeed.initialize()内部会初始化通信,所以就不需要手动调用dist.init_process_group()了(当然也可以手动调用deepspeed.init_distributed()来初始化)
      • 如果模型太大,不能在同一张卡上加载,可以在deepspeed.zero.Init()的context下初始化模型,让模型在初始化阶段就能加载到不同卡上(因为只有ZeRO-3才对模型参数进行切分,所以deepspeed.zero.Init()只在ZeRO-3下才有效)
      • deepspeed不属于数据并行(可以认为属于流水线并行),所以不需要使用DistributedSampler进行数据集分片(待确认?)

    ds_config 案例:

    {
        "train_batch_size": 32,
        "train_micro_batch_size_per_gpu": 8,
        "steps_per_print": 100,
        "wall_clock_breakdown": false,
        "fp16": {
            "enabled": false,
            "loss_scale": 0,
            "loss_scale_window": 1000,
            "initial_scale_power": 16,
            "hysteresis": 2,
            "min_loss_scale": 1
        },
        "bf16": {
            "enabled": true
        },
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": 3e-5,
                "betas": [
                    0.8,
                    0.999
                ],
                "eps": 1e-8,
                "weight_decay": 3e-7
            }
        },
        "scheduler": {
            "type": "WarmupLR",
            "params": {
                "warmup_min_lr": 0,
                "warmup_max_lr": 3e-5,
                "warmup_num_steps": 500
            }
        },
        "zero_optimization": {
            "stage": 1,
            "offload_optimizer": {
                "device": "cpu",
                "pin_memory": true
            },
            "allgather_partitions": true,
            "allgather_bucket_size": 2e8,
            "overlap_comm": true,
            "reduce_scatter": true,
            "reduce_bucket_size": 2e8,
            "contiguous_gradients": true
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    实战代码

    • 讲解链接:https://blog.csdn.net/u010751000/article/details/123516433

    • 代码链接:https://github.com/microsoft/DeepSpeedExamples/blob/master/training/cifar/cifar10_deepspeed.py

    • 使用DeepSpeed其实和写一个pytorch模型只有部分区别,一开始的流程是一样的。

    pip install deepspeed
    
    • 1

    初始化Deepspeed

    DeepSpeed 通过输入参数来启动训练,因此需要使用argparse解析参数:

    import argparse
     
     
    def add_argument():
        parser = argparse.ArgumentParser(description='CIFAR')
        parser.add_argument('-b',
                            '--batch_size',
                            default=32,
                            type=int,
                            help='mini-batch size (default: 32)')
        parser.add_argument('-e',
                            '--epochs',
                            default=30,
                            type=int,
                            help='number of total epochs (default: 30)')
        parser.add_argument('--local_rank',
                            type=int,
                            default=-1,
                            help='local rank passed from distributed launcher')
     
        parser.add_argument('--log-interval',
                            type=int,
                            default=2000,
                            help="output logging information at a given interval")
     
        parser = deepspeed.add_config_arguments(parser) # 参数传入deepspeed
        args = parser.parse_args()
        return args
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    加载数据

    import torch
    import torchvision
    import torchvision.transforms as transforms
     
    trainset = torchvision.datasets.CIFAR10(root='./data',
                                            train=True,
                                            download=True,
                                            transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset,
                                              batch_size=16,
                                              shuffle=True,
                                              num_workers=2)
    testset = torchvision.datasets.CIFAR10(root='./data',
                                           train=False,
                                           download=True,
                                           transform=transform)
    testloader = torch.utils.data.DataLoader(testset,
                                             batch_size=4,
                                             shuffle=False,
                                             num_workers=2)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 此外,模型初始化的时候除了参数,还需要model及其parameters,还有训练集,使用deepspeed初始化:
    args = add_argument()
    net = Net()
    parameters = filter(lambda p: p.requires_grad, net.parameters())
    model_engine, optimizer, trainloader, __ = deepspeed.initialize(
        args=args, model=net, model_parameters=parameters, training_data=trainset)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    训练

    下面的部分和我们平时训练模型是几乎一样的代码,请注意 local_rank 是你不需要管的参数,在后面启动模型训练的时候,DeepSpeed会自动给这个参数赋值。

    • local_rank是模型训练的gpu设备
    • model_engine就是模型,就model,optimizer,trainloader全部都使用deepspeed初始化之后的
    for epoch in range(2):
        running_loss = 0.0
        for i, data in enumerate(trainloader):
            inputs, labels = data[0].to(model_engine.local_rank), data[1].to(
                model_engine.local_rank)
            outputs = model_engine(inputs)
            loss = criterion(outputs, labels)
            model_engine.backward(loss)
            model_engine.step()
     
            # print statistics
            running_loss += loss.item()
            if i % args.log_interval == (args.log_interval - 1):
                print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, running_loss / args.log_interval))
                running_loss = 0.0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    测试

    correct = 0
    total = 0
    with torch.no_grad():
        for data in testloader:
            images, labels = data
            outputs = net(images.to(model_engine.local_rank))
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels.to(
                model_engine.local_rank)).sum().item()
     
    print('Accuracy of the network on the 10000 test images: %d %%' %
          (100 * correct / total))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    定义模型参数

    {
       "train_batch_size": 4,
       "steps_per_print": 2000,
       "optimizer": {
         "type": "Adam",
         "params": {
           "lr": 0.001,
           "betas": [
             0.8,
             0.999
           ],
           "eps": 1e-8,
           "weight_decay": 3e-7
         }
       },
       "scheduler": {
         "type": "WarmupLR",
         "params": {
           "warmup_min_lr": 0,
           "warmup_max_lr": 0.001,
           "warmup_num_steps": 1000
         }
       },
       "wall_clock_breakdown": false
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    运行

    export CUDA_VISIBLE_DEVICES="6,7,8,9"  # gpu编号
    deepspeed test.py --deepspeed_config config.json  #加载配置运行
    
    • 1
    • 2
  • 相关阅读:
    tensor.eq() tensor.item() tensor.argmax()
    【大数据】Hadoop在呼唤Hive(附一键部署Hive脚本)
    腾讯云服务器可用区是什么意思?可用区详细说明
    【Mongoose应用和文件上传】一.Express框架访问MongDB数据库;二.node实现文件上传
    QT资源文件导入
    uniapp 连接夜神安卓模拟器真机调试
    企业进行媒体宣传的重要性
    【qml】性能优化 | 常见的界面元素优化
    如何在虚幻引擎中渲染动画?
    PyTorch之MLP
  • 原文地址:https://blog.csdn.net/RandyHan/article/details/132630214