• 16、Pytorch Lightning入门


    资源

    官方手册
    GitHub地址
    GItHub案例:Pytorch-Lightning-Template项目
    pytorch也是有缺陷的,例如要用半精度训练、BatchNorm参数同步、单机多卡训练,则要安排一下Apex。而pl则不同,这些全部都安排,而且只要设置一下参数就可以了。另外,还有一个特色,就是你的超参数全部保存到模型中,如果你要调巨多参数,那就不需要再对每个训练的模型进行参数标记了,而且恢复模型时可以直接恢复超参数,可以大大减小代码量和工作量

    基础案例实现ResNet

    import torch
    from torch import nn
    from torch import optim
    from torchvision import datasets, transforms
    from torch.utils.data import random_split, DataLoader
    import pytorch_lightning as pl
    from pytorch_lightning.metrics.functional import accuracy
    
    
    class ResNet(pl.LightningModule):
        def __init__(self):
            super().__init__()
            self.l1 = nn.Linear(28 * 28, 64)
            self.l2 = nn.Linear(64, 64)
            self.l3 = nn.Linear(64, 10)
            self.do = nn.Dropout(0.1)
            self.loss = nn.CrossEntropyLoss()
    
        def forward(self, x):
            # nn.ReLU()
            h1 = nn.functional.relu(self.l1(x))
            h2 = nn.functional.relu(self.l2(h1))
            do = self.do(h2 + h1)
            logits = self.l3(do)
            return logits
    
        def configure_optimizers(self):
            optimiser = optim.SGD(self.parameters(), lr=1e-2)
            return optimiser
    
        def training_step(self, batch, batch_idx):
            x, y = batch
            b = x.size(0)
            x = x.view(b, -1)
            logits = self(x)
            J = self.loss(logits, y)
    
            acc = accuracy(logits, y)
            pbar = {"train_acc": acc}
    
            return {"loss": J, "progress_bar": pbar}
    
        def validation_step(self, batch, batch_idx):
            results = self.training_step(batch, batch_idx)
            results['progress_bar']['val_acc'] = results['progress_bar']['train_acc']
            del results['progress_bar']['train_acc']
            return results
    
        def validation_epoch_end(self, val_step_outputs):
            avg_val_loss = torch.tensor([x['loss'] for x in val_step_outputs]).mean()
            avg_val_acc = torch.tensor([x['progress_bar']['val_acc'] for x in val_step_outputs]).mean()
    
            pbar = {"avg_val_acc": avg_val_acc}
    
            return {'val_loss': avg_val_loss, 'progress_bar': pbar}
    
        def prepare_data(self):
            datasets.MNIST('data', train=True, download=True, transform=transforms.ToTensor())
    
        def setup(self, stage: str):
            train_data = datasets.MNIST('data', train=True, download=False, transform=transforms.ToTensor())
            self.tra, self.val = random_split(train_data, [55000, 5000])
    
        def train_dataloader(self):
            train_loader = DataLoader(self.tra, batch_size=32, num_workers=48)
            return train_loader
    
        def val_dataloader(self):
            val_loader = DataLoader(self.val, batch_size=32, num_workers=48)
            return val_loader
    
    
    if __name__ == '__main__':
        model = ResNet()
        # Lightning 会自动保存最近训练的epoch的模型到当前的工作空间(or.getcwd()),也可以在定义Trainer的时候指定
        trainer = pl.Trainer(progress_bar_refresh_rate=20, max_epochs=1, gpus=1, default_root_dir="./root")
        trainer.fit(model)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    数据集

    数据集有两种实现方法,直接调用第三方公开数据集(如:MNIST等数据集)和 自定义数据集(继承torch.utils.data.dataset.Dataset)

    使用现有的公开数据集

        def prepare_data(self):
            datasets.MNIST('data', train=True, download=True, transform=transforms.ToTensor())
    
        def setup(self, stage: str):
            train_data = datasets.MNIST('data', train=True, download=False, transform=transforms.ToTensor())
            self.tra, self.val, self.test = random_split(train_data, [50000, 5000,5000])
    
        def train_dataloader(self):
            train_loader = DataLoader(self.tra, batch_size=32, num_workers=48)
            return train_loader
    
        def val_dataloader(self):
            val_loader = DataLoader(self.val, batch_size=32, num_workers=48)
            return val_loader
        def test_dataloader(self):
            test_loader = DataLoader(self.test, batch_size=32, num_workers=48)
            return test_loader     
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    自定义数据集

    Dataset

    import sys
    import pathlib
    import torch
    from torch.utils.data import Dataset
    from utils import sort_batch_by_len, source2ids
    
    abs_path = pathlib.Path(__file__).parent.absolute()
    sys.path.append(sys.path.append(abs_path))
    
    
    class SampleDataset(Dataset):
        """
        The class represents a sample set for training.
        """
    
        def __init__(self, data_pairs, vocab):
            self.src_texts = [data_pair[0] for data_pair in data_pairs]
            self.tgt_texts = [data_pair[1] for data_pair in data_pairs]
            self.vocab = vocab
            self._len = len(data_pairs)  # Keep track of how many data points.
    
        def __len__(self):
            return self._len
            
        def __getitem__(self, index):
            # print("\nself.src_texts[{0}] = {1}".format(index, self.src_texts[index]))
            src_ids, oovs = source2ids(self.src_texts[index], self.vocab)  # 将当前文本self.src_texts[index]转为ids,oovs为超出词典范围的词汇文本
            item = {
                'x': [self.vocab.SOS] + src_ids + [self.vocab.EOS],
                'y': [self.vocab.SOS] + [self.vocab[i] for i in self.tgt_texts[index]] + [self.vocab.EOS],
                'x_len': len(self.src_texts[index]),
                'y_len': len(self.tgt_texts[index]),
                'oovs': oovs,
                'len_oovs': len(oovs)
            }
    
            return item
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    DataLoader

    from torch.utils.data import DataLoader, random_split
    import pytorch_lightning as pl
    
    
    class MyDataModule(pl.LightningDataModule):
        def __init__(self):
            super().__init__()
    
        def prepare_data(self):
            # 在该函数里一般实现数据集的下载等,只有cuda:0 会执行该函数
            # download, split, etc...
            # only called on 1 GPU/TPU in distributed
            pass
    
        def setup(self, stage):
            # make assignments here (val/train/test split)
            # called on every process in DDP
            # 实现数据集的定义,每张GPU都会执行该函数, stage 用于标记是用于什么阶段
            if stage == 'fit' or stage is None:
                self.train_dataset = MyDataset(self.train_file_path, self.train_file_num, transform=None)
                self.val_dataset = MyDataset(self.val_file_path, self.val_file_num, transform=None)
            if stage == 'test' or stage is None:
                self.test_dataset = MyDataset(self.test_file_path, self.test_file_num, transform=None)
    
        def train_dataloader(self):
            return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=False, num_workers=0)
    
        def val_dataloader(self):
            return DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False)
    
        def test_dataloader(self):
            return DataLoader(self.test_dataset, batch_size=1, shuffle=True)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    pytorch-lightning流程

    初始化 def init(self) -->训练training_step(self, batch, batch_idx) --> 校验validation_step(self, batch, batch_idx) --> 测试 test_step(self, batch, batch_idx). 就完事了

    当然,除了这三个主要的,还有一些其他的函数,为了方便我们实现其他的一些功能,因此更为完整的流程是在training_step 、validation_step、test_step 后面都紧跟着其相应的 training_step_end(self,batch_parts)和training_epoch_end(self, training_step_outputs) 函数,当然,对于校验和测试,都有相应的_step_end和_epoch_end函数。**

    ** *_step_end – 即每一个 * 步完成后调用 **

    ** *_epoch_end – 即每一个 * 的epoch 完成之后会自动调用 **

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.model(x)
        loss = F.cross_entropy(y_hat, y)
        pred = ...
        return {'loss': loss, 'pred': pred}
    
    def training_step_end(self, batch_parts):
        '''
        当gpus=0 or 1时,这里的batch_parts即为traing_step的返回值(已验证)
        当gpus>1时,这里的batch_parts为list,list中每个为training_step返回值,list[i]为i号gpu的返回值(这里未验证)
        '''
        gpu_0_prediction = batch_parts[0]['pred']
        gpu_1_prediction = batch_parts[1]['pred']
    
        # do something with both outputs
        return (batch_parts[0]['loss'] + batch_parts[1]['loss']) / 2
    
    def training_epoch_end(self, training_step_outputs):
        '''
        当gpu=0 or 1时,training_step_outputs为list,长度为steps的数量(不包括validation的步数,当你训练时,你会发现返回list<训练时的steps数,这是因为训练时显示的steps数据还包括了validation的,若将limit_val_batches=0.,即关闭validation,则显示的steps会与training_step_outputs的长度相同)。list中的每个值为字典类型,字典中会存有`training_step_end()`返回的键值,键名为`training_step()`函数返回的变量名,另外还有该值是在哪台设备上(哪张GPU上),例如{device='cuda:0'}
        '''
        for out in training_step_outputs:
           # do something with preds
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    Train

    训练主要是重写def training_setp(self, batch, batch_idx)函数,并返回要反向传播的loss即可,其中batch 即为从 train_dataloader 采样的一个batch的数据,batch_idx即为目前batch的索引

    def training_setp(self, batch, batch_idx):
        image, label = batch
        pred = self.forward(iamge)
        loss = ...
        # 一定要返回loss
        return loss
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Validation

    • 每训练n个epochs 校验一次
      默认为每1个epoch校验一次,即自动调用validation_step()函数:check_val_every_n_epoch
    trainer = Trainer(check_val_every_n_epoch=1)
    
    
    • 1
    • 2
    • 单个epoch内校验频率
      当一个epoch 比较大时,就需要在单个epoch 内进行多次校验,这时就需要对校验的调动频率进行修改, 传入val_check_interval的参数为float型时表示百分比,为int时表示batch
    # 每训练单个epoch的 25% 调用校验函数一次,注意:要传入float型数
    trainer = Trainer(val_check_interval=0.25)
    # 当然也可以是单个epoch训练完多少个batch后调用一次校验函数,但是一定是传入int型
    trainer = Trainer(val_check_interval=100) # 每训练100个batch校验一次
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    校验和训练是一样的,重写def validation_step(self, batch, batch_idx)函数

    def validation_step(self, batch, batch_idx):
        results = self.training_step(batch, batch_idx)
        results['progress_bar']['val_acc'] = results['progress_bar']['train_acc']
        del results['progress_bar']['train_acc']
        return results
    
    • 1
    • 2
    • 3
    • 4
    • 5

    test

    在pytoch_lightning框架中,test 在训练过程中是不调用的,也就是说是不相关,在训练过程中只进行training和validation,因此如果需要在训练过中保存validation的一些信息,就要放到validation中。测试是在训练完成之后的

    # 获取恢复了权重和超参数等的模型
    model = MODEL.load_from_checkpoint(checkpoint_path='my_model_path/heiheihei.ckpt')
    # 修改测试时需要的参数,例如预测的步数等
    model.pred_step = 1000
    # 定义trainer, 其中limit_test_batches表示取测试集中的0.05的数据来做测试
    trainer = pl.Trainer(gpus=1, precision=16, limit_test_batches=0.05)
    # 测试,自动调用test_step(), 其中dm为数据集
    trainer.test(model=dck, datamodule=dm)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    模型、Trainer的保存与恢复

    Lightning 会自动保存最近训练的epoch的模型到当前的工作空间(or.getcwd()),也可以在定义Trainer的时候指定

    trainer = Trainer(default_root_dir='/your/path/to/save/checkpoints')
    
    
    • 1
    • 2

    也可以关闭自动保存模型

    trainer = Trainer(checkpoint_callback=False)
    
    
    • 1
    • 2

    利用ModelCheckpoint (callbacks)保存模型

    所有参数均为optional

    ModelCheckpoint(
        dirpath=None,
        filename=None,
        monitor=None,
        verbose=False,
        save_last=None,
        save_top_k=1,
        save_weights_only=False,
        mode="min",
        auto_insert_metric_name=True,
        every_n_train_steps=None,
        train_time_interval=None,
        every_n_epochs=None,
        save_on_train_epoch_end=None,
        every_n_val_epochs=None
    )
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在这里插入图片描述
    自动保存下,也可以自定义要监控的量来保存模型

    • 计算需要监控的量,例如校验误差:loss
    • 使用log()函数标记该要监控的量(直接在training_step、validation_step中添加)
    • 初始化ModelCheckpoint回调,并设置要监控的量
    • 将其传回到Trainer中
    from pytorch_lightning import Trainer, LightningDataModule, LightningModule, Callback, seed_everything
    from pytorch_lightning.callbacks import ModelCheckpoint
    from pytorch_lightning.loggers import TensorBoardLogger
    
    # Transformer LightningModule
    class GLUETransformer(LightningModule):
    	...
        def training_step(self, batch, batch_idx):
            # 1. 计算loss
            outputs = self(**batch)
            train_loss = outputs[0]
            # 2. 使用log()函数标记该要监控的量,名字叫'val_loss'
            self.log('ltrain_lossoss', train_loss)
            return train_loss
    	...
        def validation_step(self, batch, batch_idx, dataloader_idx=0):
            outputs = self(**batch)
    
            # 1. 计算需要监控的量
            val_loss, logits = outputs[:2]
    
            # 2. 使用log()函数标记该要监控的量,名字叫'val_loss'
            self.log('val_loss', val_loss)
    
            if self.hparams.num_labels >= 1:
                preds = torch.argmax(logits, axis=1)
            elif self.hparams.num_labels == 1:
                preds = logits.squeeze()
    
            labels = batch["labels"]
    
            return {"loss": val_loss, "preds": preds, "labels": labels}
    	...
    
    
    # Training & Test
    if __name__ == "__main__":
        seed_everything(42)
        # 定义数据集
        data_module = GLUEDataModule(model_name_or_path=r"D:\Pretrained_Model\albert-base-v2", task_name="cola")
        # 定义模型
        model = GLUETransformer(model_name_or_path=r"D:\Pretrained_Model\albert-base-v2", num_labels=data_module.num_labels, eval_splits=data_module.eval_splits, task_name=data_module.task_name)
        # 初始化`ModelCheckpoint`回调,并设置要监控的量
        checkpoint_callback = ModelCheckpoint(
            dirpath='saved_module',
            filename='sample-cola-{epoch:02d}-{val_loss:.2f}',
            monitor='val_loss'
        )
        # 定义trainer
        trainer = Trainer(max_epochs=20, gpus=AVAIL_GPUS, check_val_every_n_epoch=1, callbacks=[checkpoint_callback])  # 默认为每1个epoch校验一次,即自动调用validation_step()函数;将 checkpoint_callback 放到Trainer的callback 的list中
        # 开始训练
        trainer.fit(model=model, datamodule=data_module)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    在上面的 filename 参数中,定义了模型文件的保存格式,然后通过自动调用format_checkpoint_name 函数给其中的变量赋值的,返回 string 类型,文件名

    >>> tmpdir = os.path.dirname(__file__)
    >>> ckpt = ModelCheckpoint(dirpath=tmpdir, filename='{epoch}')
    >>> os.path.basename(ckpt.format_checkpoint_name(0, 1, metrics={}))
    'epoch=0.ckpt'
    >>> ckpt = ModelCheckpoint(dirpath=tmpdir, filename='{epoch:03d}')
    >>> os.path.basename(ckpt.format_checkpoint_name(5, 2, metrics={}))
    'epoch=005.ckpt'
    >>> ckpt = ModelCheckpoint(dirpath=tmpdir, filename='{epoch}-{val_loss:.2f}')
    >>> os.path.basename(ckpt.format_checkpoint_name(2, 3, metrics=dict(val_loss=0.123456)))
    'epoch=2-val_loss=0.12.ckpt'
    >>> ckpt = ModelCheckpoint(dirpath=tmpdir, filename='{missing:d}')
    >>> os.path.basename(ckpt.format_checkpoint_name(0, 4, metrics={}))
    'missing=0.ckpt'
    >>> ckpt = ModelCheckpoint(filename='{step}')
    >>> os.path.basename(ckpt.format_checkpoint_name(0, 0, {}))
    'step=0.ckpt'
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    因为根据上面保存的参数,可能保存了多个模型,根据 best_model_path 恢复最好的模型

    from pytorch_lightning import Trainer
    from pytorch_lightning.callbacks import ModelCheckpoint
    
    checkpoint_callback = ModelCheckpoint(dirpath='my/path/')
    trainer = Trainer(callbacks=[checkpoint_callback])
    model = ...
    trainer.fit(model)
    # 训练完成之后,保存了多个模型,下面是获得最好的模型,也就是将原来保存的模型中最好的模型权重apply到当前的网络上
    checkpoint_callback.best_model_path
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    手动保存模型

    除了自动保存,也可以手动保存、加载模型

    model = MyLightningModule(hparams)
    trainer.fit(model)
    trainer.save_checkpoint("example.ckpt")
    new_model = MyModel.load_from_checkpoint(checkpoint_path="example.ckpt")
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    当我们采用该 Pytorch Lightning 框架做强化学习的时候,由于强化学习的训练数据集不是固定的,是与环境实时交互生成的训练数据,因此在整个训练过程中,Epoch恒为0,模型就不会自动保存,这时候需要我们手动保存模型

    另外,保存的模型一般都挺大的,因此保存最好的三个模型就OK了,可以通过一个队列来进行维护,保存新的,删除旧的

    from collections import deque
    import os
    # 维护一个队列
    self.save_models = deque(maxlen=3)
    # 这里的self 是指这个函数放到继承了pl.LightningModule的类里,跟training_step()是同级的
    def manual_save_model(self):
        model_path = 'your_model_save_path_%s' % (your_loss)
        if len(self.save_models) >= 3:
            # 当队列满了,取出最老的模型的路径,然后删除掉
            old_model = self.save_models.popleft()
            if os.path.exists(old_model):
                os.remove(old_model)
        # 手动保存
        self.trainer.save_checkpoint(model_path)
        # 将保存的模型路径加入到队列中
        self.save_models.append(model_path)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    上面的函数,可以通过简单的判断,如果损失更小的,或者reward更大了,我们再调用,保存模型。

    为了保险起见,我们也可以每隔一段时间就保存一个最新的模型。

    这个函数是从pl的原码中抠出来的,因此保存的路径是我们前面在设置 checkpoint_callbacks 的时候设置的路径,也就是本文前面ModelCheckpoint (callbacks) 这一节中的 dir_path 路径,会在该路径下自动保存 latest.ckpt 文件

    # 保存最新的路径
    def save_latest_model(self):
            checkpoint_callbacks = [c for c in self.trainer.callbacks if isinstance(c, ModelCheckpoint)]
            print("Saving latest checkpoint...")
            model = self.trainer.get_model()
            [c.on_validation_end(self.trainer, model) for c in checkpoint_callbacks]
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    加载Checkpoint

    load_from_checkpoint 方法,该方法是从checkpoint 加载模型的主要方法

    pl.LightningModule.load_from_checkpoint(
                                                checkpoint_path=checkpoint_path,
                                                map_location=None,
                                                hparams_file=None,
                                                strict=True,
                                                **kwargs
                                            )
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    加载模型的权重、偏置和超参数

    class LitModel(LightningModule):
        def __init__(self, in_dim, out_dim):
            super().__init__()
            self.save_hyperparameters()
            # 在这里使用新的超参数,而不是从模型中加载的超参数
            self.l1 = nn.Linear(self.hparams.in_dim, self.hparams.out_dim)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    可以如下恢复模型

    # 例如训练的时候初始化in_dim=32, out_dim=10
    LitModel(in_dim=32, out_dim=10)
    # 下面的方式恢复模型,使用in_dim=32和out_dim=10为保存的参数
    model = LitModel.load_from_checkpoint(PATH)
    # 当然也可以覆盖这些参数,例如改成in_dim=128, out_dim=10
    model = LitModel.load_from_checkpoint(PATH, in_dim=128, out_dim10)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    恢复模型和Trainer,如果不仅仅是想恢复模型,而且还要接着训练,则可以恢复Trainer

    model = LitModel()
    trainer = Trainer(resume_from_checkpoint='some/path/to/my_checkpoint.ckpt')
    # 自动恢复模型、epoch、step、学习率信息(包括LR schedulers),精度等
    # automatically restores model, epoch, step, LR schedulers, apex, etc...
    trainer.fit(model)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    辅助工具

    Early Stopping

    监控 validation_step() 方法步骤中某一个量,如果其不能再变得更优,则提前停止训练

    pytorch_lightning.callbacks.early_stopping.EarlyStopping(
                                                                monitor='early_stop_on',
                                                                min_delta=0.0,
                                                                patience=3,
                                                                verbose=False,
                                                                mode='auto',
                                                                strict=True,
                                                                check_finite=True,
                                                                stopping_threshold=None,
                                                                divergence_threshold=None,
                                                                check_on_train_epoch_end=None
                                                            )
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    在这里插入图片描述
    比如设置如下,要监控的量为 val_loss

    pytorch_lightning.callbacks.early_stopping.EarlyStopping(monitor='val_loss', min_delta=0.1, patience=3)
    
    
    • 1
    • 2

    Logging

    这里只涉及Tensorboard, 其它有需要的可参考官方文档Logging,tensorboard 有两种基本的方法:一种是只适用于scaler,可直接使用self.log(),另一种是图像、权重等

    # 在定义Trainer对象的时候,传入tensorboardlogger
    logger = TensorBoardLogger(args['log_dir'], name='DCK_PL')
    trainer = pl.Trainer(logger=logger)
    # 获取tensorboard Logger, 以在validation_step()函数为例
    def validation_step():
        tensorboard = self.logger.experiment
        # 例如求得validation loss为:
        loss = ...
        # 直接log
        self.log('val_loss', loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        # 如果是图像等,就需要用到tensorboard的API
        tensorboard.add_image()
        # 同时log多个
        other_loss = ...
        loss_dict = {'val_loss': loss, 'loss': other_loss}
        tensorboard.add_scalars(loss_dict)
        # log 权重等
        tensorboard.add_histogram(...)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    注意如果是用anaconda的话,要先激活你的env,另外要注意的是,–logdir=my_log_dir/, 这里的logdir要到version_0/目录,该目录下保存有各种你log的变量的文件夹

    # 查看的方法跟tensorboard是一样的,在终端下
    (base) C:\whx-study-pytorch-lightning\my_logs\WHX_PL\version_0>tensorboard --logdir ./
    2022-03-19 20:18:17.460974: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library cudart64_110.dll
    Serving TensorBoard on localhost; to expose to the network, use a proxy or pass --bind_all
    TensorBoard 2.4.0 at http://localhost:6006/ (Press CTRL+C to quit)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    当然也可以继承 LightningLoggerBase 类来自定义Logger

    optimizer 和 lr_scheduler

    在训练过程中,对学习率的掌控也是非常重要的,合理设置学习率有利于提高效果,学习率衰减可查看 pytorch必须掌握的的4种学习率衰减策略。那在pytorch_lightning 中如何设置呢?其实跟pytorch是一样的,基本上不需要修改

    重写configure_optimizers()函数即可

    # 设置优化器
    def configure_optimizers(self):
        weight_decay = 1e-6  # l2正则化系数
        # 假如有两个网络,一个encoder一个decoder
        optimizer = optim.Adam([{'encoder_params': self.encoder.parameters()}, {'decoder_params': self.decoder.parameters()}], lr=learning_rate, weight_decay=weight_decay)
        # 同样,如果只有一个网络结构,就可以更直接了
        optimizer = optim.Adam(my_model.parameters(), lr=learning_rate, weight_decay=weight_decay)
        # 我这里设置2000个epoch后学习率变为原来的0.5,之后不再改变
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[2000], gamma=0.5)
        optim_dict = {'optimizer': optimizer, 'lr_scheduler': scheduler}
        return optim_dict
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    def configure_optimizers(self):
        """Prepare optimizer and schedule (linear warmup and decay)"""
        model = self.model
        no_decay = ["bias", "LayerNorm.weight"]
        optimizer_grouped_parameters = [
            {
                "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
                "weight_decay": self.hparams.weight_decay,
            },
            {
                "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
                "weight_decay": 0.0,
            },
        ]
        optimizer = AdamW(optimizer_grouped_parameters, lr=self.hparams.learning_rate, eps=self.hparams.adam_epsilon)
    
        scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=self.hparams.warmup_steps, num_training_steps=self.total_steps)
        scheduler = {"scheduler": scheduler, "interval": "step", "frequency": 1}
        return [optimizer], [scheduler]
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    这样就OK了,只要在 training_step() 函数中返回了loss,就会自动反向传播,并自动调用 loss.backward() 和 optimizer.step() 和 scheduler .step() 了

    多优化器用于多模型等网络结构

    当我们训练的是复杂的网络结构时,可能有多个模型,需要不同的训练顺序,不同的训练学习率等,这时候就需要设计多个优化器,并手动调用梯度反传函数

     # multiple optimizer case (e.g.: GAN)
     def configure_optimizers(self):
         opt_d = Adam(self.model_d.parameters(), lr=0.01)
         opt_g = Adam(self.model_g.parameters(), lr=0.02)
         return opt_d, opt_g
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    然后要关掉自动优化,这样就可以跟pytorch一样手动控制优化器的权重更新了,达到了跟pytorch一样可以进行复杂地更新顺序等地控制,同时pytorch lightning的优势还在,例如多GPU下batchnorm的参数同步等。

     # 在new Trainer对象的时候,把自动优化关掉
     trainer = Trainer(automatic_optimization=False)
    
    
    • 1
    • 2
    • 3

    这时候 training_step() 函数也就不是直接返回 loss 或者 字典了,而是不需要返回loss了,因为在该函数里就手动完成权重更新函数地调用

    另外需要注意的是,不再使用 loss.backward() 函数,改用 self.manual_backward(loss, opt),就可以实现半精度训练。并且忽略optimizer_idx参数

     def training_step(self, batch, batch_idx, opt_idx):
         # 获取在configure_optimizers()中返回的优化器
         (opt_d, opt_g) = self.optimizers()
         loss_g = self.acquire_loss_g()
         
         # 注意:不再使用loss.backward(). 另外以GAN为例,因为生成器的动态图还要保持给判别器用于更新,因此retain_graph=True.
         self.manual_backward(loss_g, opt_g, retain_graph=True)
         # 销毁动态图
         self.manual_backward(loss_g, opt_g)
         opt_g.step()
         # 在更新判别器的时候,保存生成器是0梯度的
         opt_g.zero_grad()
         
         # 更新判别器
         loss_d = self.acquire_loss_d()
         self.manual_backward(loss_d, opt_d)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    其他比较重要的设置主要有同步BatchNorm的参数、采用半精度训练(原来apex的特色,不过PL比apex更香),多gpu 训练等

    多GPU训练

    如果是CPU训练,在定义Trainer时不管gpus这个参数就可以了,或者设置该参数为0

    trainer = pl.Trainer(gpus=0)
    
    • 1

    多GPU训练,也是很方便,只要将该参数设置为你要用的gpu数就可以,例如用4张GPU

    trainer = pl.Trainer(gpus=4)
    
    
    • 1
    • 2

    而如果你有很多张GPU,但是要跟同学分别使用,只要在程序最前面设置哪些GPU可用就可以了,例如服务器有4张卡,但是你只能用0和2号卡

    import os
    os.environ['CUDA_VISIBLE_DEVICES'] = '0, 2'
    trainer = pl.Trainer(gpus=2)
    
    
    • 1
    • 2
    • 3
    • 4

    半精度训练

    半精度训练也是Apex的一大特色,可以在几乎不影响效果的情况下降低GPU显存的使用率(大概50%),提高训练速度,现在pytorch_lightning 统统都给你,可以只要设置一下参数就可以

    trainer = pl.Trainer(precision=16)
    
    
    • 1
    • 2

    累积梯度

    默认情况是每个batch 之后都更新一次梯度,当然也可以N个batch后再更新,这样就有了大batch size 更新的效果了,例如当你内存很小,训练的batch size 设置的很小,这时候就可以采用累积梯度

    # 默认情况下不开启累积梯度
    trainer = Trainer(accumulate_grad_batches=1)
    
    
    • 1
    • 2
    • 3

    自动缩放batch_size(不建议用)

    这方法还有很多限制,直接 trainer.fit(model) 是无效的,感觉挺麻烦,不建议用

    大的batch_size 通过可以获得更好的梯度估计。但同时也要更长的时间,另外,如果内存满了,电脑会卡住动不了。

    ‘power’ – 从batch size 为1 开始翻倍地往上找,例如 1–>2 --> 4 --> … 一直到内存溢出(out-of-memory, OOM);binsearch也是翻倍地找,直OOM,但是之后还要继续进行一个二叉搜索,找到一个更好的 batch size。另外,搜索的 batch size 最大不会超过数据集的尺寸

    # 默认不开启
    trainer = Trainer(auto_scale_batch_size=None)
    
    # 自动找满足内存的 batch size
    trainer = Trainer(auto_scale_batch_size=None|'power'|'binsearch')
    
    # 加载到模型
    trainer.tune(model)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    保存所有超参数到模型中

    将所有的模型超参数都保存到模型中,恢复模型时再也不用自己去拖动恢复模型中的超参数了

    # 例如你传入的超参数字典为params_dict
    self.hparams.update(params_dict)    # 直接将你的超参数更新到pl模型的超参数字典中
    # 这样,在保存的时候就会保存超参数了
    self.save_hyperparameters()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    当然,对于我们训练的不同的模型,我们还是需要查看其超参数,可以通过将超参数字典保存到本地txt的方法,来以便后期查看

    def save_dict_as_txt(list_dict, save_dir):
        with open(save_dir, 'w') as fw:
            if isinstance(list_dict, list):
                for dict in list_dict:
                    for key in dict.keys():
                        fw.writelines(key + ': ' + str(dict.get(key)) + '\n')
            else:
                for key in list_dict.keys():
                    fw.writelines(key + ': ' + str(list_dict.get(key)) + '\n')
            fw.close()
    # 保存超参数字典到txt        
    save_dict_as_txt(self.hparams, save_dir)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    梯度剪裁

    当需要避免发生梯度爆炸时,可以采用梯度剪裁的方法,这个梯度范数是通过所有的模型权重计算出来的:

    # 默认不剪裁
    trainer = Trainer(gradient_clip_val=0)
    
    # 梯度范数的上限为0.5
    trainer = Trainer(gradient_clip_val=0.5)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    设置训练的最小和最大epochs

    默认最小训练1个epoch,最大训练1000个epoch

    trainer = Trainer(min_epochs=1, max_epochs=1000)
    
    
    • 1
    • 2

    小数据集

    当我们的数据集过大或者当我们进行debug时,不想要加载整个数据集,则可以只加载其中的一小部分。默认是全部加载,即下面的参数值都为1.0

    # 参训练集、校验集和测试集分别只加载 10%, 20%, 30%,或者使用int 型表示batch
    trainer = Trainer(
        limit_train_batches=0.1, # 模型情况下是 1.0
        limit_val_batches=0.2,	# 模型情况下是 1.0
        limit_test_batches=0.3	# 模型情况下是 1.0
    )
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    其中比较需要注意的是训练集和测试集比例的设置,因为pytorch_lightning 每次validation和test时,都是会计算一个epoch,而不是一个step,因此在训练过程中,如果你的validation dataset比较大,那就会消耗大量的时间在validation上,而我们实际上只是想要知道在训练过程中,模型训练的怎么样了,不需要跑完整个epoch,因此就可以将limit_val_batches设置的小一些。对于test,在训练完成后,如果我们不希望对所有的数据都进行test,也可以通过这个参数来设置。

    提前校验,避免校验时出错导致浪费时间

    另外,该框架有个参数 num_sanity_val_steps,用于设置在开始训练前先进行num_sanity_val_steps个 batch 的 validation,以免你训练了一段时间,在校验的时候程序报错,导致浪费时间。该参数在获得trainer的时间传入

    # 默认为2个batch的validation
    trainer = Trainer(num_sanity_val_steps=2)
    
    # 关闭开始训练前的validaion,直接开始训练
    trainer = Trainer(num_sanity_val_steps=0)
    
    # 把校验集都运行一遍(可能会浪费很多时间)
    trainer = Trainer(num_sanity_val_steps=-1)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    参考资料

    https://zhuanlan.zhihu.com/p/319810661

  • 相关阅读:
    随笔
    【算法基础】TOPSIS法
    linux修改用户密码脚本
    Unity中Shader纹理的多级渐远Mipmap
    解决GPU显存句柄泄漏问题
    前端总结35.JS封装事件库
    Redis之主从复制,哨兵模式,集群
    Auth.js:多合一身份验证解决方案 | 开源日报 No.60
    广和通正式发布工业级低功耗单频双模GNSS模组G030&G031
    新版AndroidStudio的Gradle窗口显示task list not built 问题解决
  • 原文地址:https://blog.csdn.net/weixin_50973728/article/details/126227752