• 昇思25天学习打卡营第15天|ResNet50迁移学习


    迁移学习

    通常的做法是在非常大的数据集(如ImageNet,其中包含120万张图片和1000个类别)上对卷积神经网络预训练,再在实际任务中使用这个网络或再进行微调。

    有两种主要的迁移学习方案:
    1. 预训练模型作为固定特征提取器:自己训练或者拿别人的预训练模型,移除最后的全连接层(这层是用来对图片分类的),把这个网络作为固定特征提取器。再为新数据集替换和重新训练分类器
    2. 微调ConvNet:在1的基础上,还继续通过反向传播训练网络权重。这里可以保持某些底层的参数固定,只微调一些更高级别的部分。

    微调分成多个场景:
    1. 新数据集量级小,和原数据集相似:由于数据集较小,因此再去微调就会有过拟合的风险。这里最好是直接训练一个线性分类器
    2. 新数据集量级大,和原数据集相似:数据集较大时没有过拟合风险,因此可以对整个网络微调训练。
    3. 新数据量级小不同于原数据集:小数据集最好只训练线性分类器,而数据集差异很大,最好就从网络顶部训练分类器,因为网络中包含了很多训练数据集的特征。因此这里选择从网络的早期某处激活中训练SVM分类器。
    4. 新数据量级大不同于原数据集:可以对整个网络微调

    数据加载

    使用分类狼与狗的Canidae数据集,该数据集图像来自ImageNet,每个分类包含120张训练图和30张验证图。

    batch_size = 18                             # 批量大小
    image_size = 224                            # 训练图像空间大小
    num_epochs = 5                             # 训练周期数
    lr = 0.001                                  # 学习率
    momentum = 0.9                              # 动量
    workers = 4                                 # 并行线程个数
    

    创建训练数据集

    def create_dataset_canidae(dataset_path, usage):
        data_set = ds.ImageFolderDataset(dataset_path,
                                         num_parallel_workers=workers,
                                         shuffle=True,)
    
        # 数据增强操作
        mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]
        std = [0.229 * 255, 0.224 * 255, 0.225 * 255]
        scale = 32
    
        if usage == "train":
            #训练集则随机剪裁、解码、缩放、水平翻转、正则化、改变通道顺序。
            trans = [
                vision.RandomCropDecodeResize(size=image_size, scale=(0.08, 1.0), ratio=(0.75, 1.333)),
                vision.RandomHorizontalFlip(prob=0.5),
                vision.Normalize(mean=mean, std=std),
                vision.HWC2CHW()
            ]
        else:
            # 解码、重定义大小、中心裁剪、正则化、改变通道顺序
            trans = [
                vision.Decode(),
                vision.Resize(image_size + scale),
                vision.CenterCrop(image_size),
                vision.Normalize(mean=mean, std=std),
                vision.HWC2CHW()
            ]
    
    
        # 数据映射操作
        data_set = data_set.map(
            operations=trans,
            input_columns='image',
            num_parallel_workers=workers)
    
    
        # 批量操作
        data_set = data_set.batch(batch_size)
    
        return data_set
    
    
    dataset_train = create_dataset_canidae(data_path_train, "train")
    step_size_train = dataset_train.get_dataset_size()
    
    dataset_val = create_dataset_canidae(data_path_val, "val")
    step_size_val = dataset_val.get_dataset_size()
    

    模型搭建

    构建一个基础的残差块

    class ResidualBlockBase(nn.Cell):
        expansion: int = 1  # 最后一个卷积核数量与第一个卷积核数量相等
    
        def __init__(self, in_channel: int, out_channel: int,
                     stride: int = 1, norm: Optional[nn.Cell] = None,
                     down_sample: Optional[nn.Cell] = None) -> None:
            super(ResidualBlockBase, self).__init__()
            if not norm:
                self.norm = nn.BatchNorm2d(out_channel)
            else:
                self.norm = norm
    		# 创建两个3*3的卷积层
            self.conv1 = nn.Conv2d(in_channel, out_channel,
                                   kernel_size=3, stride=stride,
                                   weight_init=weight_init)
            self.conv2 = nn.Conv2d(in_channel, out_channel,
                                   kernel_size=3, weight_init=weight_init)
            self.relu = nn.ReLU()
            self.down_sample = down_sample
    
        def construct(self, x):
            identity = x  # shortcuts分支(跳跃连接)
    
            out = self.conv1(x)  # 主分支第一层:3*3卷积层
            out = self.norm(out)
            out = self.relu(out)
            out = self.conv2(out)  # 主分支第二层:3*3卷积层
            out = self.norm(out)
    
            if self.down_sample is not None:
                identity = self.down_sample(x)
            out += identity  # 输出为主分支与shortcuts之和
    		# 加在一起后再经过ReLU激活函数
            out = self.relu(out)
    
            return out
    

    构建残差层

    class ResidualBlock(nn.Cell):
        expansion = 4  # 最后一个卷积核的数量是第一个卷积核数量的4倍
    
        def __init__(self, in_channel: int, out_channel: int,
                     stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
            super(ResidualBlock, self).__init__()
    
            self.conv1 = nn.Conv2d(in_channel, out_channel,
                                   kernel_size=1, weight_init=weight_init)
            self.norm1 = nn.BatchNorm2d(out_channel)
            self.conv2 = nn.Conv2d(out_channel, out_channel,
                                   kernel_size=3, stride=stride,
                                   weight_init=weight_init)
            self.norm2 = nn.BatchNorm2d(out_channel)
            self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion,
                                   kernel_size=1, weight_init=weight_init)
            self.norm3 = nn.BatchNorm2d(out_channel * self.expansion)
    
            self.relu = nn.ReLU()
            self.down_sample = down_sample
    
        def construct(self, x):
    
            identity = x  # shortscuts分支
    
            out = self.conv1(x)  # 主分支第一层:1*1卷积层
            out = self.norm1(out)
            out = self.relu(out)
            out = self.conv2(out)  # 主分支第二层:3*3卷积层
            out = self.norm2(out)
            out = self.relu(out)
            out = self.conv3(out)  # 主分支第三层:1*1卷积层
            out = self.norm3(out)
    
            if self.down_sample is not None:
                identity = self.down_sample(x)
    
            out += identity  # 输出为主分支与shortcuts之和
            out = self.relu(out)
    
            return out
    

    构建堆叠的残差块。

    def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]],
                   channel: int, block_nums: int, stride: int = 1):
        down_sample = None  # shortcuts分支
    
        if stride != 1 or last_out_channel != channel * block.expansion:
    	# 构建 下采样模块:1个卷积层+批正则化层
    	# 下采样用于减少特征图的空间分辨率,同时增加特征的抽象程度
            down_sample = nn.SequentialCell([
                nn.Conv2d(last_out_channel, channel * block.expansion,
                          kernel_size=1, stride=stride, weight_init=weight_init),
                nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init)
            ])
    
        layers = []
        layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample))
    
        in_channel = channel * block.expansion
        
        # 堆叠残差网络
        for _ in range(1, block_nums):
            layers.append(block(in_channel, channel))
    
        return nn.SequentialCell(layers)
    

    构建ResNet网络

    class ResNet(nn.Cell):
        def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]],
                     layer_nums: List[int], num_classes: int, input_channel: int) -> None:
            super(ResNet, self).__init__()
    
            self.relu = nn.ReLU()
            # 第一个卷积层,输入channel为3(彩色图像),输出channel为64
            self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init)
    	# 使用批正则化
            self.norm = nn.BatchNorm2d(64)
            # 最大池化层,缩小图片的尺寸
            self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
            # 各个残差网络结构块定义,
            self.layer1 = make_layer(64, block, 64, layer_nums[0])
            self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2)
            self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2)
            self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2)
            # 平均池化层
            self.avg_pool = nn.AvgPool2d()
            # flattern层
            self.flatten = nn.Flatten()
            # 全连接层
            self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes)
    
        def construct(self, x):
    
            x = self.conv1(x)
            x = self.norm(x)
            x = self.relu(x)
            x = self.max_pool(x)
    		# 四个残差块
            x = self.layer1(x)
            x = self.layer2(x)
            x = self.layer3(x)
            x = self.layer4(x)
    		# 收尾:平均池化层、展平、全连接层
            x = self.avg_pool(x)
            x = self.flatten(x)
            x = self.fc(x)
    
            return x
    

    真正开始构建,可以选择使用预训练模型。

    def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]],
                layers: List[int], num_classes: int, pretrained: bool, pretrianed_ckpt: str,
                input_channel: int):
        model = ResNet(block, layers, num_classes, input_channel)
    
        if pretrained:
            # 加载预训练模型
            download(url=model_url, path=pretrianed_ckpt, replace=True)
            param_dict = load_checkpoint(pretrianed_ckpt)
            load_param_into_net(model, param_dict)
    
        return model
    
    
    def resnet50(num_classes: int = 1000, pretrained: bool = False):
          resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt"
        resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt"
        return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes,
                       pretrained, resnet50_ckpt, 2048)
    

    固定特征训练

    这里冻结最后一层意外的所有网络层,使用requires_grad == False 冻结参数

    import mindspore as ms
    import matplotlib.pyplot as plt
    import os
    import time
    
    # 实例化网络
    net_work = resnet50(pretrained=True)
    
    # 全连接层输入层的大小
    in_channels = net_work.fc.in_channels
    # 输出通道数大小为狼狗分类数2
    head = nn.Dense(in_channels, 2)
    # 重置全连接层
    net_work.fc = head
    
    # 平均池化层kernel size为7
    avg_pool = nn.AvgPool2d(kernel_size=7)
    # 重置平均池化层
    net_work.avg_pool = avg_pool
    
    # 冻结除最后一层外的所有参数
    for param in net_work.get_parameters():
        if param.name not in ["fc.weight", "fc.bias"]:
            param.requires_grad = False
    
    # 定义优化器和损失函数
    opt = nn.Momentum(params=net_work.trainable_params(), learning_rate=lr, momentum=0.5)
    loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    
    
    def forward_fn(inputs, targets):
        logits = net_work(inputs)
        loss = loss_fn(logits, targets)
    
        return loss
    
    grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters)
    
    def train_step(inputs, targets):
        loss, grads = grad_fn(inputs, targets)
        opt(grads)
        return loss
    
    # 实例化模型
    model1 = train.Model(net_work, loss_fn, opt, metrics={"Accuracy": train.Accuracy()})
    

    训练和评估

    import mindspore as ms
    import matplotlib.pyplot as plt
    import os
    import time
    # 开始循环训练
    print("Start Training Loop ...")
    
    best_acc = 0
    
    # 按照定好的epoch数进行迭代
    for epoch in range(num_epochs):
        losses = []
        net_work.set_train()
    
        # 为每轮训练读入数据
        for i, (images, labels) in enumerate(data_loader_train):
            labels = labels.astype(ms.int32)
            loss = train_step(images, labels)
            losses.append(loss)
    
        # 每个epoch结束后,验证准确率
        acc = model1.eval(dataset_val)['Accuracy']
        
        print("-" * 20)
        print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % (
            epoch+1, num_epochs, sum(losses)/len(losses), acc
        ))
       
       # 保存下来准确度最佳的模型
        if acc > best_acc:
            best_acc = acc
            if not os.path.exists(best_ckpt_dir):
                os.mkdir(best_ckpt_dir)
            ms.save_checkpoint(net_work, best_ckpt_path)
    
    print("=" * 80)
    print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, "
          f"save the best ckpt file in {best_ckpt_path}", flush=True)
    

    预测

    import matplotlib.pyplot as plt
    import mindspore as ms
    
    # 可视化预测结果
    def visualize_model(best_ckpt_path, val_ds):
        net = resnet50()
        # 全连接层输入层的大小
        in_channels = net.fc.in_channels
        # 输出通道数大小为狼狗分类数2
        head = nn.Dense(in_channels, 2)
        # 重置全连接层
        net.fc = head
        # 平均池化层kernel size为7
        avg_pool = nn.AvgPool2d(kernel_size=7)
        # 重置平均池化层
        net.avg_pool = avg_pool
        # 加载模型参数
        param_dict = ms.load_checkpoint(best_ckpt_path)
        ms.load_param_into_net(net, param_dict)
        model = train.Model(net)
        # 加载验证集的数据进行验证
        data = next(val_ds.create_dict_iterator())
        images = data["image"].asnumpy()
        labels = data["label"].asnumpy()
        class_name = {0: "dogs", 1: "wolves"}
        # 预测图像类别
        output = model.predict(ms.Tensor(data['image']))
        pred = np.argmax(output.asnumpy(), axis=1)
    
        # 显示图像及图像的预测值
        plt.figure(figsize=(5, 5))
        for i in range(4):
            plt.subplot(2, 2, i + 1)
            # 若预测正确,显示为蓝色;若预测错误,显示为红色
            color = 'blue' if pred[i] == labels[i] else 'red'
            plt.title('predict:{}'.format(class_name[pred[i]]), color=color)
            picture_show = np.transpose(images[i], (1, 2, 0))
            mean = np.array([0.485, 0.456, 0.406])
            std = np.array([0.229, 0.224, 0.225])
            picture_show = std * picture_show + mean
            picture_show = np.clip(picture_show, 0, 1)
            plt.imshow(picture_show)
            plt.axis('off')
    
        plt.show()
    

    最后得到模型的预测结果。
    在这里插入图片描述

    总结

    本章依然使用了ResNet50网络模型,完成了狼狗图片分类任务。

    打卡凭证

    在这里插入图片描述

  • 相关阅读:
    计算机操作系统 第六章:输入输出系统(3)
    前端工程师都应该掌握的抓包神器工具
    速卖通店铺流量下滑什么原因,如何做提升?(测评补单)
    lux和ffmpeg进行下载各大主流自媒体平台视频
    【计算机网络】HTTP/HTTPS协议基础知识汇总
    【Vue实战】使用elementui实现表格的增加、删除、跳转详情页功能
    【Pinia和Vuex区别】
    Nacos2.X源码阅读总结
    学习Source Generators之IncrementalValueProvider
    docker安装Nacos并配置MySQL
  • 原文地址:https://blog.csdn.net/ptyp222/article/details/140438921