• Pytorch 多卡并行(3)—— 使用 DDP 加速 minGPT 训练



    0. 项目组织

    • 本文改写 MinGPT 库中的 chargpt 例程,这是一个 character-level 语言模型项目,组织如下
      在这里插入图片描述
    • 说明一下主要文件内容
      1. data/input.txt 是训练用的数据集
      2. char_dataset.py 定义了一个 char-level 的 torch.utils.data.Dataset
      3. gpt_snapshot.pt 是程序运行过程中保存的快照,使用 torchrun 时可以从此重启所有进程的训练
      4. gpt2_train_cfg.yaml 是 yaml 配置文件,记录了训练超参数
      5. main.log 是 hydra 生成的 logging 文件
      6. main.py 是程序入口,符合前文 使用 torchrun 进行容错处理 第1节给出的标准形式
      7. model.py 定义了 GPT 模型结构和 optimizer 的构造方法
      8. trainer.py 定义了训练过程,包括快照保存和加载等操作

    1. 参数准备

    • 本项目使用 YAML文件存储超参数设置。YAML 是一种轻量级的数据序列化格式。相较于JSON等其他格式,YAML 更加易读易写,也更加适合用于配置文件等场景。YAML的语法结构主要包含键值对、列表、注释等几种元素
      data_config:
        path: ./data/input.txt
        block_size: 128   # 输入序列长度
        train_split: 0.9  # 训练集和测试集划分
        truncate: 0.02    # 只用5%的数据进行训练
      gpt_config:
        n_layer: 8
        n_head: 8
        n_embd: 512       
      trainer_config:
        max_epochs: 10
        batch_size: 216
        data_loader_workers: 4
        grad_norm_clip: 1.0
        snapshot_path: gpt_snapshot.pt
        save_every: 3
        use_amp: True
      optimizer_config:
        weight_decay: 0.1
        learning_rate: 0.0003
      
      hydra:
        run:
          dir: ./
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24

      使用yaml文件时,可以使用 ${node.key} 的方式引用yaml中的其他变量;如果超参数的值缺失,可以使用 ??? 输入缺失值,或使用 null 输入空值。

    • 使用 Hydra 来管理超参数,它可以以装饰器的形式方便地加载不同路径下的 yaml 配置文件,最小用例如下
      import hydra
      from omegaconf import DictConfig
      
      @hydra.main(version_base=None, config_path='configs', config_name='config')
      def main(cfg: DictConfig) -> None:
          cfg['key'] # 获得对应的参数值
      
      if __name__ == '__main__':
          main()
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      这样就把 ./configs/config.yaml 文件的参数加载到 main 函数中了,使用 cfg['key'] 这样的形式获取参数值
    • 使用 Hydra 还有一个好处是它对 logging 标准库进行了包装。在 hydra.main 装饰器中对 log 输出格式规范为 "[%(asctime)s][%(name)s][%(levelname)s] - %(message)s",并设置 level 为 info,运行程序就会自动生成 main.log 日志文件。可以通过命令行的hydra.verbose 参数修改 log 的显示 level

    2. 数据准备

    • 使用的数据是 tiny-shakespear 数据集,它是一个记录了一些英文对话的文本文档,截取如下
      First Citizen:
      Before we proceed any further, hear me speak.
      
      All:
      Speak, speak.
      
      First Citizen:
      You are all resolved rather to die than to famish?
      
      All:
      Resolved. resolved.
      
      First Citizen:
      First, you know Caius Marcius is chief enemy to the people.
      
      All:
      We know't, we know't.
      
      First Citizen:
      Let us kill him, and we'll have corn at our own price.
      Is't a verdict?
      
      All:
      No more talking on't; let it be done: away, away!
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
    • 下面来构造数据集,思路是把 txt 文件中所有字符去重排序生成 vocab table;样本生成时先把 txt 内容全部读取进来,然后构造 n-gram 样本。如下
      import torch
      from torch.utils.data import Dataset
      import fsspec
      from dataclasses import dataclass
      
      """
      Adapted from https://github.com/karpathy/minGPT/blob/master/projects/chargpt/chargpt.py
      """
      
      @dataclass
      class DataConfig:
          path: str = None
          block_size: int = None      # 输入序列长度    
          train_split: float = None   # 训练集和测试集划分
          truncate: float = 1.0       # 用于训练的数据占全体数据的比例
      
      class CharDataset(Dataset):
      
          def __init__(self, data_cfg: DataConfig): #data_path: str, block_size):
              # 加载所需比例的数据
              data = fsspec.open(data_cfg.path).open().read().decode('utf-8')
              data = data[ : int(len(data) * data_cfg.truncate)]
      
              # Set 去重,转 list 后排序得到数据集中的唯一字符列表作为词表
              chars = sorted(list(set(data))) 
              data_size, vocab_size = len(data), len(chars)
              print('Data has %d characters, %d unique.' % (data_size, vocab_size))
      
              # 得到字符和词表索引之间的双射
              self.stoi = {ch: i for i, ch in enumerate(chars)}   # 字符 -> 词表索引
              self.itos = {i: ch for i, ch in enumerate(chars)}   # 词表索引 -> 字符
              
              self.block_size = data_cfg.block_size  	# 模型输入序列长度
              self.vocab_size = vocab_size			# 词表尺寸
              self.data = data
      
          def __len__(self):
              return len(self.data) - self.block_size
      
          def __getitem__(self, idx):
              # grab a chunk of (block_size + 1) characters from the data
              chunk = self.data[idx:idx + self.block_size + 1]
              
              # encode every character to an integer
              dix = [self.stoi[s] for s in chunk]
              x = torch.tensor(dix[:-1], dtype=torch.long)
              y = torch.tensor(dix[1:], dtype=torch.long)
              return x, y
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48

    3. 程序入口

    • 使用 torchrun 命令进行容错,按前文 使用 torchrun 进行容错处理 给出的标准形式来编写程序入口(mian.py),如下
      import os
      import torch
      from torch.utils.data import random_split
      from torch.distributed import init_process_group, destroy_process_group
      from model import GPT, GPTConfig, OptimizerConfig, create_optimizer
      from trainer import Trainer, TrainerConfig
      from char_dataset import CharDataset, DataConfig
      from omegaconf import DictConfig
      import hydra
      
      
      def ddp_setup():
      	os.environ["MASTER_ADDR"] = "localhost" # 由于这里是单机实验所以直接写 localhost
          os.environ["MASTER_PORT"] = "12355"     # 任意空闲端口
          init_process_group(backend="nccl")
          torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
      
      def get_train_objs(gpt_cfg: GPTConfig, opt_cfg: OptimizerConfig, data_cfg: DataConfig):
          dataset = CharDataset(data_cfg)
          train_len = int(len(dataset) * data_cfg.train_split)
          train_set, test_set = random_split(dataset, [train_len, len(dataset) - train_len])
      
          gpt_cfg.vocab_size = dataset.vocab_size
          gpt_cfg.block_size = dataset.block_size
          model = GPT(gpt_cfg)
          optimizer = create_optimizer(model, opt_cfg)
          
          return model, optimizer, train_set, test_set
       
      @hydra.main(version_base=None, config_path=".", config_name="gpt2_train_cfg")
      def main(cfg: DictConfig):
          # 初始化进程池
          ddp_setup()
      
          # 从 yaml 文件读取超参数
          gpt_cfg = GPTConfig(**cfg['gpt_config'])
          opt_cfg = OptimizerConfig(**cfg['optimizer_config'])
          data_cfg = DataConfig(**cfg['data_config'])
          trainer_cfg = TrainerConfig(**cfg['trainer_config'])
      
          # 创建训练对象
          model, optimizer, train_data, test_data = get_train_objs(gpt_cfg, opt_cfg, data_cfg)
          trainer = Trainer(trainer_cfg, model, optimizer, train_data, test_data)
          
          # 开始训练
          trainer.train()
      
          # 训练完成后,销毁进程池
          destroy_process_group()
      
      
      if __name__ == "__main__":
          main()
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
    • 注意其中使用 hydra.main 装饰器来加载参数;运行时使用以下命令指定 GPU 运行
      CUDA_VISIBLE_DEVICES=1,2 torchrun --standalone --nproc_per_node=gpu main.py
      
      • 1

    4. 定义模型

    • 整个模型定义部分相比 MinGPT 原始代码逻辑上没有区别,只是换了一下写法看起来更清晰一点。首先定义两个 @dataclass 保存模型和优化器参数
      from dataclasses import dataclass
      import math
      import torch
      import torch.nn as nn
      from torch.nn import functional as F
      
      @dataclass
      class GPTConfig:
          model_type: str = 'gpt2'
          # model configurations
          n_layer: int = None
          n_head: int = None
          n_embd: int =  None
          # openai's values for gpt2
          vocab_size: int = 50257 
          block_size: int = 1024
          # dropout hyperparameters
          embd_pdrop: float = 0.1
          resid_pdrop: float = 0.1
          attn_pdrop: float = 0.1
      
      @dataclass
      class OptimizerConfig:
          learning_rate: float = 3e-4
          weight_decay: float = 0.1
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
    • 定义多头 masked self-attention 模块,原本 MinGPT 库是全部手写的,这里则用了 pytorch 自己的多头注意力模块。具体做法是使用 torch.nn.MultiheadAttention 定义普通多头注意力层,在 forward 方法中用同一个序列输入构造 qkv 实现 self-attention,再用过对注意力输出设置遮盖实现 mask
      class MultiheadAttentionLayer(nn.Module):
          """
          A multi-head masked self-attention layer with a projection at the end.
          """
      
          def __init__(self, config, device="cpu", dtype=torch.float32):
              super().__init__()
              assert config.n_embd % config.n_head == 0
              self.resid_drop = nn.Dropout(config.resid_pdrop)
              
              # output projection
              self.c_proj = nn.Linear(config.n_embd, config.n_embd, device=device, dtype=dtype)
      
              # Causal mask。注意这个mask是通过 self.register_buffer 方法登记的
              # 这样登记过的张量可以求梯度也可以随模型在 CPU/GPU 之间移动,但是不进行参数优化
              self.register_buffer("mask", torch.tril(torch.ones(config.block_size, config.block_size))
                                   .view(1, 1, config.block_size, config.block_size))
              
              self.attn = torch.nn.MultiheadAttention(
                  embed_dim=config.n_embd,
                  num_heads=config.n_head,
                  dropout=config.attn_pdrop,
                  batch_first=True,
                  device=device,
                  dtype=dtype
              )
      
          def forward(self, x):
              _, seq_size, _ = x.size()   # batch size, sequence length, embedding dimensionality (n_embd)
              y = self.attn(x, x, x, attn_mask=self.mask[0, 0, :seq_size, :seq_size])[0]
              y = self.resid_drop(self.c_proj(y))
              return y
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32

      我感觉这里 self.attn(x, x, x, attn_mask=self.mask[0, 0, :seq_size, :seq_size])[0] 的调用有问题,因为 torch.nn.MultiheadAttention 的前向过程需要输入 query,key 和 value 三个 tensor,这里应该把 x 用三个线性层变换后再作为输入。如果读者有其他想法可以和我讨论。考虑到本文主要说明 DDP 并行,暂不关注此问题

    • 定义 Transformer block
      class Block(nn.Module):
          """ an unassuming Transformer block """
          def __init__(self, config: GPTConfig):
              super().__init__()
              self.ln1 = nn.LayerNorm(config.n_embd)
              self.ln2 = nn.LayerNorm(config.n_embd)
              self.attn = MultiheadAttentionLayer(config)
              self.mlp = nn.Sequential(
                  nn.Linear(config.n_embd, 4 * config.n_embd),
                  nn.GELU(),
                  nn.Linear(4 * config.n_embd, config.n_embd),
                  nn.Dropout(config.resid_pdrop),
              )
      
          def forward(self, x):
              x = x + self.attn(self.ln1(x))
              x = x + self.mlp(self.ln2(x))
              return x
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
    • 定义字符嵌入层,用 nn.Embedding 嵌入 token,再设置一个 nn.Parameter 作为可学习的位置编码
      class EmbeddingStem(nn.Module):
          def __init__(self, config: GPTConfig, device="cpu", dtype=torch.float32):
              super().__init__()
              self.tok_emb = nn.Embedding(config.vocab_size, config.n_embd, device=device, dtype=dtype)
              self.pos_emb = nn.Parameter(torch.zeros(1, config.block_size, config.n_embd, device=device, dtype=dtype))
              self.drop = nn.Dropout(config.embd_pdrop)
              self.block_size = config.block_size
      
          def reset_parameters(self): 
              self.tok_emb.reset_parameters() # 将 nn.Embedding 层参数初始化为正态分布采样
      
          def forward(self, idx):
              b, t = idx.size()
              assert t <= self.block_size, f"Cannot forward sequence of length {t}, block size is only {self.block_size}"
      
              token_embeddings = self.tok_emb(idx)            # each index maps to a (learnable) embedding vector
              position_embeddings = self.pos_emb[:, :t, :]    # each position maps to a (learnable) position vector
              return self.drop(token_embeddings + position_embeddings)
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
    • 把以上组件合在一起,定义 GPT 模型
      class GPT(nn.Module):
          """ GPT Language Model """
      
          def __init__(self, config: GPTConfig):
              super().__init__()
              self.block_size = config.block_size
              config = self._set_model_config(config)
      
              # input embedding stem
              self.emb_stem = EmbeddingStem(config)
              # transformer
              self.blocks = nn.Sequential(*[Block(config) for _ in range(config.n_layer)])
              # decoder head
              self.ln_f = nn.LayerNorm(config.n_embd)
              self.head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
      
              # init all weights, and apply a special scaled init to the residual projections, per GPT-2 paper
              self.apply(self._init_weights)
              for pn, p in self.named_parameters():
                  if pn.endswith('c_proj.weight'):
                      p.data.normal_(mean=0.0, std=0.02/math.sqrt(2 * config.n_layer))
      
              # report number of parameters (note we don't count the decoder parameters in lm_head)
              n_params = sum(p.numel() for p in self.blocks.parameters())
              print("number of parameters: %.2fM" % (n_params/1e6,))
      
          def _set_model_config(self, config):
              type_given = config.model_type is not None
              params_given = all([config.n_layer is not None, config.n_head is not None, config.n_embd is not None])
              # assert type_given ^ params_given # exactly one of these (XOR)
              if type_given and not params_given:
                  # translate from model_type to detailed configuration
                  config.__dict__.update({
                      # names follow the huggingface naming conventions
                      # GPT-1
                      'openai-gpt':   dict(n_layer=12, n_head=12, n_embd=768),  # 117M params
                      # GPT-2 configs
                      'gpt2':         dict(n_layer=12, n_head=12, n_embd=768),  # 124M params
                      'gpt2-medium':  dict(n_layer=24, n_head=16, n_embd=1024), # 350M params
                      'gpt2-large':   dict(n_layer=36, n_head=20, n_embd=1280), # 774M params
                      'gpt2-xl':      dict(n_layer=48, n_head=25, n_embd=1600), # 1558M params
                      # Gophers
                      'gopher-44m':   dict(n_layer=8, n_head=16, n_embd=512),
                      # (there are a number more...)
                      # I made these tiny models up
                      'gpt-mini':     dict(n_layer=6, n_head=6, n_embd=192),
                      'gpt-micro':    dict(n_layer=4, n_head=4, n_embd=128),
                      'gpt-nano':     dict(n_layer=3, n_head=3, n_embd=48),
                  }[config.model_type])
              return config
          
          def _init_weights(self, module):
              if isinstance(module, (nn.Linear, nn.Embedding)):
                  module.weight.data.normal_(mean=0.0, std=0.02)
                  if isinstance(module, nn.Linear) and module.bias is not None:
                      module.bias.data.zero_()
              elif isinstance(module, nn.LayerNorm):
                  module.bias.data.zero_()
                  module.weight.data.fill_(1.0)
      
          def forward(self, idx, targets=None):
              x = self.emb_stem(idx)
              x = self.blocks(x)
              x = self.ln_f(x)
              logits = self.head(x)
      
              # if we are given some desired targets also calculate the loss
              loss = None
              if targets is not None:
                  loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
      
              return logits, loss
      
          @torch.no_grad()
          def generate(self, idx, max_new_tokens, temperature=1.0, do_sample=False, top_k=None):
              """
              Take a conditioning sequence of indices idx (LongTensor of shape (b,t)) and complete
              the sequence max_new_tokens times, feeding the predictions back into the model each time.
              Most likely you'll want to make sure to be in model.eval() mode of operation for this.
              """
              for _ in range(max_new_tokens):
                  # if the sequence context is growing too long we must crop it at block_size
                  idx_cond = idx if idx.size(1) <= self.block_size else idx[:, -self.block_size:]
                  # forward the model to get the logits for the index in the sequence
                  logits, _ = self(idx_cond)
                  # pluck the logits at the final step and scale by desired temperature
                  logits = logits[:, -1, :] / temperature
                  # optionally crop the logits to only the top k options
                  if top_k is not None:
                      v, _ = torch.topk(logits, top_k)
                      logits[logits < v[:, [-1]]] = -float('Inf')
                  # apply softmax to convert logits to (normalized) probabilities
                  probs = F.softmax(logits, dim=-1)
                  # either sample from the distribution or take the most likely element
                  if do_sample:
                      idx_next = torch.multinomial(probs, num_samples=1)
                  else:
                      _, idx_next = torch.topk(probs, k=1, dim=-1)
                  # append sampled index to the running sequence and continue
                  idx = torch.cat((idx, idx_next), dim=1)
      
              return idx
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
    • 最后我们来定义优化器,
      def create_optimizer(model: torch.nn.Module, opt_config: OptimizerConfig):
          """
          This long function is unfortunately doing something very simple and is being very defensive:
          We are separating out all parameters of the model into two buckets: those that will experience
          weight decay for regularization and those that won't (biases, and layernorm/embedding weights).
          We are then returning the PyTorch optimizer object.
          """
      
          # separate out all parameters to those that will and won't experience regularizing weight decay
          decay = set()
          no_decay = set()
          whitelist_weight_modules = (torch.nn.Linear, )
          blacklist_weight_modules = (torch.nn.LayerNorm, torch.nn.Embedding)
          for mn, m in model.named_modules():
              for pn, p in m.named_parameters():
                  fpn = '%s.%s' % (mn, pn) if mn else pn # full param name
                  # random note: because named_modules and named_parameters are recursive
                  # we will see the same tensors p many many times. but doing it this way
                  # allows us to know which parent module any tensor p belongs to...
                  if pn.endswith('bias'):
                      # all biases will not be decayed
                      no_decay.add(fpn)
                  elif pn.endswith('weight') and isinstance(m, whitelist_weight_modules):
                      # weights of whitelist modules will be weight decayed
                      decay.add(fpn)
                  elif pn.endswith('in_proj_weight'):
                      # MHA projection layer
                      decay.add(fpn)
                  elif pn.endswith('weight') and isinstance(m, blacklist_weight_modules):
                      # weights of blacklist modules will NOT be weight decayed
                      no_decay.add(fpn)
                  elif pn.endswith('pos_emb'):
                      # positional embedding shouldn't be decayed
                      no_decay.add(fpn)
      
          # validate that we considered every parameter
          param_dict = {pn: p for pn, p in model.named_parameters()}
          inter_params = decay & no_decay
          union_params = decay | no_decay
          assert len(inter_params) == 0, "parameters %s made it into both decay/no_decay sets!" % (str(inter_params), )
          assert len(param_dict.keys() - union_params) == 0, "parameters %s were not separated into either decay/no_decay set!" \
                                                      % (str(param_dict.keys() - union_params), )
      
          # create the pytorch optimizer object
          optim_groups = [
              {"params": [param_dict[pn] for pn in sorted(list(decay))], "weight_decay": opt_config.weight_decay},
              {"params": [param_dict[pn] for pn in sorted(list(no_decay))], "weight_decay": 0.0},
          ]
          optimizer = torch.optim.AdamW(optim_groups, lr=opt_config.learning_rate, betas=(0.9, 0.95))
          return optimizer
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      这里主要是通过权重衰减方法来进行正则化避免过拟合。注意到作者通过一个二重遍历考察 GPT 模型所有 sub module 的所有 parameters,仅对所有 torch.nn.Linear 层的 weight 参数进行衰减,bias 参数及所有 torch.nn.LayerNormtorch.nn.Embedding 模块的参数都不做处理。由于模块是递归组织的,这个二重遍历会重复访问很多参数,所以通过 set 自动去重,最后根据处理结果定义 torch.optim.AdamW 优化器返回

      关于权重衰减的理论说明,参考:机器学习基础(6)—— 使用权重衰减和丢弃法缓解过拟合问题

    5. 定义 Trainer

    • Trainer 定义和原始 MinGPT 库主要有两个区别

      1. 按指定周期要求 rank0 进程保存 snapshot,本项目中应包含 epoch、模型参数和优化器参数三部分内容;初始化 Trainer 时应当加载可能存在的 snapshot 文件,这样在 torchrun 自动重启进程时可以从最近的 snapshot 恢复训练

      2. 可以使用 torch.cuda.amp.GradScaler 进行混合精度训练

        • 混合精度训练(Mixed Precision Training)是一种训练深度学习模型的技术,旨在提高模型的训练速度和效率。它利用了现代GPU可以混合计算精度的硬件特性,使用FP16数据类型对模型中的某些操作进行加速。具体而言,模型的参数通常使用FP32数据类型,而输入数据和梯度则使用FP16数据类型,从而减少内存开销,加速计算速度,提高模型的训练效率。此外,混合精度训练还可以通过减少浮点运算和内存访问,降低能源消
        • 混合精度训练的主要困难在于 fp16 的表示范围有限,在训练中常出现溢出问题,尤其是下溢出,因为在网络训练的后期,模型的梯度往往很小;另外还有舍入误差问题,这是指当梯度过小,小于当前区间内的最小间隔时,该次梯度更新可能会失效
        • 解决以上问题的方法包括损失缩放FP32权重备份等,前者对计算出的 loss 值进行缩放(scale),这样梯度也会被缩放进而平移到 FP16 的有效范围内存储,在进行梯度更新之前先将缩放后的梯度转化为 FP32 再unscale回去;后者将模型权重、激活值、梯度等数据用 FP16 来存储,同时维护一份 FP32 的模型权重副本用于更新。在反向传播得到 FP16 的梯度以后,将其转化成 FP32 并 unscale,最后更新 FP32 的模型权重。因为整个更新过程是在 FP32 的环境中进行的,所以不会出现舍入误差
        • 有一些代码库可以帮助我们快速实现混合精度训练,而无需大幅修改代码,包括 nvidia 的 apex 库和 pytorch 1.6 后引入的 amp 库等

        本项目使用 pytorch 的 amp 库进行混合精度训练,主要用到 GradScaler 和 autocast 两个组件。其中 Gradscalar 对会检查梯度是否发现溢出,并对优化器进行控制 (将丢弃的batches转换为 no-op);autocast 是一个上下文管理器,当进入 autocast 上下文后,tensor 的数据类型会自动转换为半精度浮点型,从而在不损失训练精度的情况下加快运算,而不需要手动调用 .half()。 一个最小实践示例为

        from torch.cuda.amp import autocast as autocast, GradScaler
        '''
        other code
        '''
         
        # 在训练最开始之前实例化一个GradScaler对象
        scaler = GradScaler()
        '''
        other code
        '''
                # 前向过程(model + loss)开启 autocast
                with autocast():
                    output = model(input)
                    loss = loss_fn(output, target)
         
                # Scales loss,这是因为半精度的数值范围有限,因此需要用它放大
                scaler.scale(loss).backward()
         
                # scaler.step() unscale之前放大后的梯度,但是scale太多可能出现inf或NaN
                # 故其会判断是否出现了inf/NaN
                # 如果梯度的值不是 infs 或者 NaNs, 那么调用optimizer.step()来更新权重,
                # 如果检测到出现了inf或者NaN,就跳过这次梯度更新,同时动态调整scaler的大小
                scaler.step(optimizer)
         
                # 查看是否要更新scaler,这个要注意不能丢
                scaler.update()
         
        '''
        other code
        '''
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
    • 下面开始分析 trainer 代码,首先定义两个 @dataclass 存储 Trainer 参数和 snapshot 参数

      @dataclass
      class TrainerConfig:
          max_epochs: int = None
          batch_size: int = None
          data_loader_workers: int = None
          grad_norm_clip: float = None
          snapshot_path: Optional[str] = None
          save_every: int = None
          use_amp: bool = None
      
      @dataclass
      class Snapshot:
          model_state: 'OrderedDict[str, torch.Tensor]'
          optimizer_state: Dict[str, Any]
          finished_epoch: int
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
    • 定义 Trianer 的初始化方法

      class Trainer:
          def __init__(self, trainer_config: TrainerConfig, model, optimizer, train_dataset, test_dataset=None):
              self.config = trainer_config
              # set torchrun variables
              self.local_rank = int(os.environ["LOCAL_RANK"]) # 在所有node的所有进程中当前GPU进程的rank
              self.global_rank = int(os.environ["RANK"])      # 在当前node中当前GPU进程的rank
              
              # data stuff
              self.train_dataset = train_dataset
              self.train_loader = self._prepare_dataloader(train_dataset)
              self.test_loader = self._prepare_dataloader(test_dataset) if test_dataset else None
              
              # initialize train states
              self.epochs_run = 0
              self.model = model.to(self.local_rank)
              self.optimizer = optimizer        
              self.save_every = self.config.save_every
      
              # load snapshot if available. only necessary on the first node.
              if self.config.snapshot_path is None:
                  self.config.snapshot_path = "snapshot.pt"
              self._load_snapshot()
      
              # wrap with DDP. this step will synch model across all the processes.
              self.model = DDP(self.model, device_ids=[self.local_rank])
      
              # torch.cuda.amp.GradScaler 是一个用于自动混合精度训练的 PyTorch 工具,它可以帮助加速模型训练并减少显存使用量
              # 具体来说,GradScaler 可以将梯度缩放到较小的范围,以避免数值下溢或溢出的问题,同时保持足够的精度以避免模型的性能下降
              if self.config.use_amp: 
                  self.scaler = torch.cuda.amp.GradScaler()
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30

      注意几点

      1. torchrun 帮助我们自动分发进程,通过环境变量获取当前运行代码的 GPU rank 信息
      2. 初始化 Trainer 时加载可能存在的 snapshot,实现断点续训
      3. 模型使用 DDP 进行包装
      4. 定义混合精度训练所需的 torch.cuda.amp.GradScaler()
    • 定义 DataLoder,注意使用 DistributedSampler 来分发训练数据

      def _prepare_dataloader(self, dataset: Dataset):
         return DataLoader(
              dataset,
              batch_size=self.config.batch_size,
              pin_memory=True,
              shuffle=False,
              num_workers=self.config.data_loader_workers,
              sampler=DistributedSampler(dataset)                 # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠
          )
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • 定义 snapshot 的加载和保存方法

      def _save_snapshot(self, epoch):
      	# capture snapshot
      	model = self.model
      	raw_model = model.module if hasattr(model, "module") else model
      	snapshot = Snapshot(
      	    model_state=raw_model.state_dict(),
      	    optimizer_state=self.optimizer.state_dict(),
      	    finished_epoch=epoch
      	)
      	# save snapshot
      	snapshot = asdict(snapshot)
      	torch.save(snapshot, self.config.snapshot_path)
      	print(f"Snapshot saved at epoch {epoch}")
      
      def _load_snapshot(self):
          try:
              snapshot = fsspec.open(self.config.snapshot_path)   # fsspec 为各种后端存储系统提供统一的 Python 接口,可以用相同的语法打开本地、AWS S3 和 GCS 等各种云存储平台的文件
              with snapshot as f:
                  snapshot_data = torch.load(f, map_location="cpu")
          except FileNotFoundError:
              print("Snapshot not found. Training model from scratch")
              return 
      
          snapshot = Snapshot(**snapshot_data)
          self.model.load_state_dict(snapshot.model_state)
          self.optimizer.load_state_dict(snapshot.optimizer_state)
          self.epochs_run = snapshot.finished_epoch
          print(f"Resuming training from snapshot at Epoch {self.epochs_run}")
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
    • 定义训练流程

      def _run_batch(self, source, targets, train: bool = True) -> float:
          with torch.set_grad_enabled(train), torch.cuda.amp.autocast(dtype=torch.float16, enabled=(self.config.use_amp)):
              _, loss = self.model(source, targets)
          
          if train:
              self.optimizer.zero_grad(set_to_none=True)
              if self.config.use_amp: 
                  self.scaler.scale(loss).backward()
                  torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_norm_clip)
                  self.scaler.step(self.optimizer)
                  self.scaler.update()
              else:
                  loss.backward()
                  torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_norm_clip)
                  self.optimizer.step()
          
          #return loss.item()
          return loss
      
      def _run_epoch(self, epoch: int, dataloader: DataLoader, train: bool = True):
          dataloader.sampler.set_epoch(epoch)
          for iter, (source, targets) in enumerate(dataloader):
              step_type = "Train" if train else "Eval"
              source = source.to(self.local_rank)
              targets = targets.to(self.local_rank)
              batch_loss = self._run_batch(source, targets, train)
              if iter % 100 == 0:
                  #print(f"[GPU{self.global_rank}] Epoch {epoch} | Iter {iter} | {step_type} Loss {batch_loss.item():.5f}")
                  if train:
                      print(f"[GPU{self.global_rank}] Epoch {epoch} | Iter {iter} | {step_type} Loss {batch_loss.item():.5f}")
                  else:
                      eval_loss_list = [torch.zeros_like(batch_loss) for _ in range(int(os.environ['WORLD_SIZE']))]
                      dist.gather(
                          batch_loss,
                          eval_loss_list if self.local_rank == 0 else None, 
                          dst=0
                      )
                      if self.local_rank == 0:
                          for i, loss in enumerate(eval_loss_list):
                              print(f"[GPU{i}] Epoch {epoch} | Iter {iter} | {step_type} Loss {loss.item():.5f}")
      
      def train(self):
          for epoch in range(self.epochs_run, self.config.max_epochs):
              epoch += 1
              
              # train for one epoch
              self._run_epoch(epoch, self.train_loader, train=True)
      
              # 各个 GPU 上都在跑一样的训练进程,这里指定 rank0 进程保存 snapshot 以免重复保存
              if self.local_rank == 0 and epoch % self.save_every == 0:
                  self._save_snapshot(epoch)
      
              # eval run
              if self.test_loader:
                  self._run_epoch(epoch, self.test_loader, train=False)
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55

      这里需要注意几点:

      1. 指定 rank0 进程保存 snapshot 以免重复保存

      2. _run_batch 方法中,计算 loss 的部分设置在 torch.amp.autocast 上下文中,启动混合精度训练

      3. _run_epoch 方法中,使用 torch.distributed.gather 原语汇聚各个 GPU 的验证损失信息到 rank0 上,常用这种操作进行 log 训练信息。除此以外 Pytorch 一共提供了六个进程通信原语,如下

        import torch.distributed as dist
        
        dist.broadcast(tensor, src, group)				# 将 tensor 从 src 复制到所有其他进程。
        dist.reduce(tensor, dst, op, group)				# 将 op 应用于每个 tensor 并将结果存储在 dst 中。
        dist.all_reduce(tensor, op, group)				# 与 reduce 相同,但结果存储在所有进程中。
        dist.scatter(tensor, scatter_list, src, group)	# 复制  tensor scatter_lost[i] 到  进程
        dist.gather(tensor,gather_list, dst, group)		# 从 dst 中的所有进程复制 tensor。
        dist.all_gather(tensor_list, tensor, group)		# 将所有进程的 tensor 复制到所有进程上的 tensor_list。
        dist.barrier(group)								# 阻塞组中的所有进程,直到每个进程都进入该函数。
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9

        在这里插入图片描述

        其中 op 操作有四种

        dist.ReduceOp.SUM,
        dist.ReduceOp.PRODUCT,
        dist.ReduceOp.MAX,
        dist.ReduceOp.MIN.
        
        • 1
        • 2
        • 3
        • 4

        这些方法在需要手动汇聚或分发信息时特别有用,具体用法可以参考 pytorch 官方文档

  • 相关阅读:
    SCS【1】今天开启单细胞之旅,述说单细胞测序的前世今生
    摩尔信使MThings的设备高级参数
    5个 GIS空间分析 空间查询与量算 的重要知识点
    实验一:单臂路由+端口安全
    MyBatis 笔记
    Java.lang.Class类 getSupperclass()方法有什么功能呢?
    【Hive SQL 每日一题】统计各个商品今年销售额与去年销售额的增长率及排名变化
    前端学习路线(二)
    2022年编程语言排名,官方数据来了,让人大开眼界
    作为主播如何打造个人IP的7个技巧
  • 原文地址:https://blog.csdn.net/wxc971231/article/details/132829661