• 【深度学习实验】卷积神经网络(六):自定义卷积神经网络模型(VGG)实现图片多分类任务


    目录

    一、实验介绍

    二、实验环境

    1. 配置虚拟环境

    2. 库版本介绍

    三、实验内容

    0. 导入必要的工具包

    1. 构建数据集(CIFAR10Dataset)

    a. read_csv_labels()

    b. CIFAR10Dataset

    2. 构建模型(FeedForward)

    3.整合训练、评估、预测过程(Runner)

    4. __main__

    预测结果

    5. 代码整合


    一、实验介绍

            本实验实现了一个简化版VGG网络,并基于此完成图像分类任务。
           

            VGG网络是深度卷积神经网络中的经典模型之一,由牛津大学计算机视觉组(Visual Geometry Group)提出。它在2014年的ImageNet图像分类挑战中取得了优异的成绩(分类任务第二,定位任务第一),被广泛应用于图像分类、目标检测和图像生成等任务。

            VGG网络的主要特点是使用了非常小的卷积核尺寸(通常为3x3)和更深的网络结构。该网络通过多个卷积层和池化层堆叠在一起,逐渐增加网络的深度,从而提取图像的多层次特征表示。VGG网络的基本构建块是由连续的卷积层组成,每个卷积层后面跟着一个ReLU激活函数。在每个卷积块的末尾,都会添加一个最大池化层来减小特征图的尺寸。VGG网络的这种简单而有效的结构使得它易于理解和实现,并且在不同的任务上具有很好的泛化性能。

            VGG网络有几个不同的变体,如VGG11、VGG13、VGG16和VGG19,它们的数字代表网络的层数。这些变体在网络深度和参数数量上有所区别,较深的网络通常具有更强大的表示能力,但也更加复杂。

    二、实验环境

        本系列实验使用了PyTorch深度学习框架,相关操作如下:

    1. 配置虚拟环境

    conda create -n DL python=3.7 
    conda activate DL
    pip install torch==1.8.1+cu102 torchvision==0.9.1+cu102 torchaudio==0.8.1 -f https://download.pytorch.org/whl/torch_stable.html
    
    conda install matplotlib
     conda install scikit-learn

    2. 库版本介绍

    软件包本实验版本目前最新版
    matplotlib3.5.33.8.0
    numpy1.21.61.26.0
    python3.7.16
    scikit-learn0.22.11.3.0
    torch1.8.1+cu1022.0.1
    torchaudio0.8.12.0.2
    torchvision0.9.1+cu1020.15.2

    三、实验内容

    ChatGPT:

            卷积神经网络(Convolutional Neural Network,简称CNN)是一种深度学习模型,广泛应用于图像识别、计算机视觉和模式识别等领域。它的设计灵感来自于生物学中视觉皮层的工作原理。

            卷积神经网络通过多个卷积层、池化层全连接层组成。

    • 卷积层主要用于提取图像的局部特征,通过卷积操作和激活函数的处理,可以学习到图像的特征表示。
    • 池化层则用于降低特征图的维度,减少参数数量,同时保留主要的特征信息。
    • 全连接层则用于将提取到的特征映射到不同类别的概率上,进行分类或回归任务。

            卷积神经网络在图像处理方面具有很强的优势,它能够自动学习到具有层次结构的特征表示,并且对平移、缩放和旋转等图像变换具有一定的不变性。这些特点使得卷积神经网络成为图像分类、目标检测、语义分割等任务的首选模型。除了图像处理,卷积神经网络也可以应用于其他领域,如自然语言处理和时间序列分析。通过将文本或时间序列数据转换成二维形式,可以利用卷积神经网络进行相关任务的处理。

    0. 导入必要的工具包

    1. import torch
    2. from torch import nn
    3. import torch.nn.functional as F
    4. from torch.utils.data import Dataset, DataLoader
    5. from torchvision.io import read_image
    6. import matplotlib.pyplot as plt
    7. import os

    1. 构建数据集(CIFAR10Dataset)

    a. read_csv_labels()

            从CSV文件中读取标签信息并返回一个标签字典。

    1. def read_csv_labels(fname):
    2. """读取fname来给标签字典返回一个文件名"""
    3. with open(fname, 'r') as f:
    4. # 跳过文件头行(列名)
    5. lines = f.readlines()[1:]
    6. tokens = [l.rstrip().split(',') for l in lines]
    7. return dict(((name, label) for name, label in tokens))
    •  使用open函数打开指定文件名的CSV文件,并将文件对象赋值给变量f。这里使用'r'参数以只读模式打开文件。

    • 使用文件对象的readlines()方法读取文件的所有行,并将结果存储在名为lines的列表中。通过切片操作[1:],跳过了文件的第一行(列名),将剩余的行存储在lines列表中。

    • 列表推导式(list comprehension):对lines列表中的每一行进行处理。对于每一行,使用rstrip()方法去除行末尾的换行符,并使用split(',')方法将行按逗号分割为多个标记。最终,将所有行的标记组成的子列表存储在tokens列表中。

    • 使用字典推导式(dictionary comprehension)将tokens列表中的子列表转换为字典。对于tokens中的每个子列表,将子列表的第一个元素作为键(name),第二个元素作为值(label),最终返回一个包含这些键值对的字典。

    b. CIFAR10Dataset

    1. class CIFAR10Dataset(Dataset):
    2. def __init__(self, folder_path, fname):
    3. self.labels = read_csv_labels(os.path.join(folder_path, fname))
    4. self.folder_path = os.path.join(folder_path, 'train')
    5. def __len__(self):
    6. return len(self.labels)
    7. def __getitem__(self, idx):
    8. img = read_image(self.folder_path + '/' + str(idx + 1) + '.png')
    9. label = self.labels[str(idx + 1)]
    10. return img, torch.tensor(int(label))
    • 构造函数:

      • 接受两个参数

        • folder_path表示数据集所在的文件夹路径

        • fname表示包含标签信息的文件名。

      • 调用read_csv_labels函数,传递folder_pathfname作为参数,以读取CSV文件中的标签信息,并将返回的标签字典存储在self.labels变量中。

      • 通过拼接folder_path和字符串'train'来构建数据集的文件夹路径,将结果存储在self.folder_path变量中。

    • def __len__(self)

      • 这是CIFAR10Dataset类的方法,用于返回数据集的长度,即样本的数量。

    • def __getitem__(self, idx): 这是CIFAR10Dataset类的方法,用于根据给定的索引idx获取数据集中的一个样本。它首先根据索引idx构建图像文件的路径,并调用read_image函数来读取图像数据,将结果存储在img变量中。然后,它通过将索引转换为字符串,并使用该字符串作为键来从self.labels字典中获取相应的标签,将结果存储在label变量中。最后,它返回一个元组,包含图像数据和经过torch.tensor转换的标签。

    2. 构建模型(FeedForward)

            参考前文:

    【深度学习实验】卷积神经网络(五):深度卷积神经网络经典模型——VGG网络(卷积层、池化层、全连接层)_QomolangmaH的博客-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/m0_63834988/article/details/133350927?spm=1001.2014.3001.5501

    1. # 每个卷积块由Conv2d卷积 + BatchNorm2d(批量标准化处理) + ReLU激活层组成
    2. def conv_layer(chann_in, chann_out, k_size, p_size):
    3. layer = nn.Sequential(
    4. nn.Conv2d(chann_in, chann_out, kernel_size=k_size, padding=p_size),
    5. nn.BatchNorm2d(chann_out),
    6. nn.ReLU()
    7. )
    8. return layer
    9. # vgg卷积模块是由几个相同的卷积块以及最大池化组成
    10. def vgg_conv_block(in_list, out_list, k_list, p_list, pooling_k, pooling_s):
    11. layers = [conv_layer(in_list[i], out_list[i], k_list[i], p_list[i]) for i in range(len(in_list)) ]
    12. layers += [nn.MaxPool2d(kernel_size = pooling_k, stride = pooling_s)]
    13. return nn.Sequential(*layers)
    14. # vgg全连接层由Linear + BatchNorm1d + ReLU组成
    15. def vgg_fc_layer(size_in, size_out):
    16. layer = nn.Sequential(
    17. nn.Linear(size_in, size_out),
    18. nn.BatchNorm1d(size_out),
    19. nn.ReLU()
    20. )
    21. return layer
    22. # 为了简化,我们少使用了几层卷积层,方便大家使用
    23. class VGG_S(nn.Module):
    24. def __init__ (self, num_classes):
    25. super().__init__()
    26. self.layer1 = vgg_conv_block([3,64], [64,64], [3,3], [1,1], 2, 2)
    27. self.layer2 = vgg_conv_block([64,128], [128,128], [3,3], [1,1], 2, 2)
    28. self.layer3 = vgg_conv_block([128,256,256], [256,256,256], [3,3,3], [1,1,1], 2, 2)
    29. # 全连接层
    30. self.layer4 = vgg_fc_layer(4096, 1024)
    31. # Final layer
    32. self.layer5 = nn.Linear(1024, num_classes)
    33. def forward(self, x):
    34. out = self.layer1(x)
    35. out = self.layer2(out)
    36. vgg16_features = self.layer3(out)
    37. out = vgg16_features.view(out.size(0), -1)
    38. out = self.layer4(out)
    39. out = self.layer5(out)
    40. return out

    3.整合训练、评估、预测过程(Runner)

            参考前文:

    【深度学习实验】前馈神经网络(九):整合训练、评估、预测过程(Runner)_QomolangmaH的博客-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/m0_63834988/article/details/133219448?spm=1001.2014.3001.5501

            (略有改动:)

    1. class Runner(object):
    2. def __init__(self, model, optimizer, loss_fn, metric=None):
    3. self.model = model
    4. self.optimizer = optimizer
    5. self.loss_fn = loss_fn
    6. # 用于计算评价指标
    7. self.metric = metric
    8. # 记录训练过程中的评价指标变化
    9. self.dev_scores = []
    10. # 记录训练过程中的损失变化
    11. self.train_epoch_losses = []
    12. self.dev_losses = []
    13. # 记录全局最优评价指标
    14. self.best_score = 0
    15. # 模型训练阶段
    16. def train(self, train_loader, dev_loader=None, **kwargs):
    17. # 将模型设置为训练模式,此时模型的参数会被更新
    18. self.model.train()
    19. num_epochs = kwargs.get('num_epochs', 0)
    20. log_steps = kwargs.get('log_steps', 100)
    21. save_path = kwargs.get('save_path','best_model.pth')
    22. eval_steps = kwargs.get('eval_steps', 0)
    23. # 运行的step数,不等于epoch数
    24. global_step = 0
    25. if eval_steps:
    26. if dev_loader is None:
    27. raise RuntimeError('Error: dev_loader can not be None!')
    28. if self.metric is None:
    29. raise RuntimeError('Error: Metric can not be None')
    30. # 遍历训练的轮数
    31. for epoch in range(num_epochs):
    32. total_loss = 0
    33. # 遍历数据集
    34. for step, data in enumerate(train_loader):
    35. x, y = data
    36. logits = self.model(x.float())
    37. loss = self.loss_fn(logits, y.long())
    38. total_loss += loss
    39. if step%log_steps == 0:
    40. print(f'loss:{loss.item():.5f}')
    41. loss.backward()
    42. self.optimizer.step()
    43. self.optimizer.zero_grad()
    44. # 每隔一定轮次进行一次验证,由eval_steps参数控制,可以采用不同的验证判断条件
    45. if eval_steps != 0 :
    46. if (epoch+1) % eval_steps == 0:
    47. dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
    48. print(f'[Evalute] dev score:{dev_score:.5f}, dev loss:{dev_loss:.5f}')
    49. if dev_score > self.best_score:
    50. self.save_model(f'model_{epoch+1}.pth')
    51. print(f'[Evaluate]best accuracy performance has been updated: {self.best_score:.5f}-->{dev_score:.5f}')
    52. self.best_score = dev_score
    53. # 验证过程结束后,请记住将模型调回训练模式
    54. self.model.train()
    55. global_step += 1
    56. # 保存当前轮次训练损失的累计值
    57. train_loss = (total_loss/len(train_loader)).item()
    58. self.train_epoch_losses.append((global_step,train_loss))
    59. self.save_model(f'{save_path}.pth')
    60. print('[Train] Train done')
    61. # 模型评价阶段
    62. def evaluate(self, dev_loader, **kwargs):
    63. assert self.metric is not None
    64. # 将模型设置为验证模式,此模式下,模型的参数不会更新
    65. self.model.eval()
    66. global_step = kwargs.get('global_step',-1)
    67. total_loss = 0
    68. self.metric.reset()
    69. for batch_id, data in enumerate(dev_loader):
    70. x, y = data
    71. logits = self.model(x.float())
    72. loss = self.loss_fn(logits, y.long()).item()
    73. total_loss += loss
    74. self.metric.update(logits, y)
    75. dev_loss = (total_loss/len(dev_loader))
    76. self.dev_losses.append((global_step, dev_loss))
    77. dev_score = self.metric.accumulate()
    78. self.dev_scores.append(dev_score)
    79. return dev_score, dev_loss
    80. # 模型预测阶段,
    81. def predict(self, x, **kwargs):
    82. self.model.eval()
    83. logits = self.model(x)
    84. return logits
    85. # 保存模型的参数
    86. def save_model(self, save_path):
    87. torch.save(self.model.state_dict(), save_path)
    88. # 读取模型的参数
    89. def load_model(self, model_path):
    90. self.model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))

    4. __main__

    1. if __name__ == '__main__':
    2. batch_size = 20
    3. # 构建训练集
    4. train_data = CIFAR10Dataset('cifar10_tiny', 'trainLabels.csv')
    5. train_iter = DataLoader(train_data, batch_size=batch_size)
    6. # 构建测试集
    7. test_data = CIFAR10Dataset('cifar10_tiny', 'trainLabels.csv')
    8. test_iter = DataLoader(test_data, batch_size=batch_size)
    9. # 模型训练
    10. num_classes = 10
    11. # 定义模型
    12. model = VGG_S(num_classes)
    13. # 定义损失函数
    14. loss_fn = F.cross_entropy
    15. # 定义优化器
    16. optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    17. runner = Runner(model, optimizer, loss_fn, metric=None)
    18. runner.train(train_iter, num_epochs=10, save_path='chapter_5')
    19. # 模型预测
    20. runner.load_model('chapter_5.pth')
    21. x, label = next(iter(test_iter))
    22. predict = torch.argmax(runner.predict(x.float()), dim=1)
    23. print('predict:', predict)
    24. print(' label:', label)

    预测结果

    1. predict: tensor([6, 1, 9, 6, 1, 1, 6, 7, 0, 3, 4, 7, 7, 1, 9, 0, 9, 5, 3, 6])
    2. label: tensor([6, 9, 9, 4, 1, 1, 2, 7, 8, 3, 4, 7, 7, 2, 9, 9, 9, 3, 2, 6])

    5. 代码整合

    1. # 导入必要的工具包
    2. import torch
    3. from torch import nn
    4. import torch.nn.functional as F
    5. from torch.utils.data import Dataset, DataLoader
    6. from torchvision.io import read_image
    7. import matplotlib.pyplot as plt
    8. import os
    9. def read_csv_labels(fname):
    10. """读取fname来给标签字典返回一个文件名"""
    11. with open(fname, 'r') as f:
    12. # 跳过文件头行(列名)
    13. lines = f.readlines()[1:]
    14. tokens = [l.rstrip().split(',') for l in lines]
    15. return dict(((name, label) for name, label in tokens))
    16. class CIFAR10Dataset(Dataset):
    17. def __init__(self, folder_path, fname):
    18. self.labels = read_csv_labels(os.path.join(folder_path, fname))
    19. self.folder_path = os.path.join(folder_path, 'train')
    20. def __len__(self):
    21. return len(self.labels)
    22. def __getitem__(self, idx):
    23. img = read_image(self.folder_path + '/' + str(idx + 1) + '.png')
    24. label = self.labels[str(idx + 1)]
    25. return img, torch.tensor(int(label))
    26. # 每个卷积块由Conv2d卷积 + BatchNorm2d(批量标准化处理) + ReLU激活层组成
    27. def conv_layer(chann_in, chann_out, k_size, p_size):
    28. layer = nn.Sequential(
    29. nn.Conv2d(chann_in, chann_out, kernel_size=k_size, padding=p_size),
    30. nn.BatchNorm2d(chann_out),
    31. nn.ReLU()
    32. )
    33. return layer
    34. # vgg卷积模块是由几个相同的卷积块以及最大池化组成
    35. def vgg_conv_block(in_list, out_list, k_list, p_list, pooling_k, pooling_s):
    36. layers = [conv_layer(in_list[i], out_list[i], k_list[i], p_list[i]) for i in range(len(in_list))]
    37. layers += [nn.MaxPool2d(kernel_size=pooling_k, stride=pooling_s)]
    38. return nn.Sequential(*layers)
    39. # vgg全连接层由Linear + BatchNorm1d + ReLU组成
    40. def vgg_fc_layer(size_in, size_out):
    41. layer = nn.Sequential(
    42. nn.Linear(size_in, size_out),
    43. nn.BatchNorm1d(size_out),
    44. nn.ReLU()
    45. )
    46. return layer
    47. # 为了简化,我们少使用了几层卷积层,方便大家使用
    48. class VGG_S(nn.Module):
    49. def __init__(self, num_classes):
    50. super().__init__()
    51. self.layer1 = vgg_conv_block([3, 64], [64, 64], [3, 3], [1, 1], 2, 2)
    52. self.layer2 = vgg_conv_block([64, 128], [128, 128], [3, 3], [1, 1], 2, 2)
    53. self.layer3 = vgg_conv_block([128, 256, 256], [256, 256, 256], [3, 3, 3], [1, 1, 1], 2, 2)
    54. # 全连接层
    55. self.layer4 = vgg_fc_layer(4096, 1024)
    56. # Final layer
    57. self.layer5 = nn.Linear(1024, num_classes)
    58. def forward(self, x):
    59. out = self.layer1(x)
    60. out = self.layer2(out)
    61. vgg16_features = self.layer3(out)
    62. out = vgg16_features.view(out.size(0), -1)
    63. out = self.layer4(out)
    64. out = self.layer5(out)
    65. return out
    66. class Runner(object):
    67. def __init__(self, model, optimizer, loss_fn, metric=None):
    68. self.model = model
    69. self.optimizer = optimizer
    70. self.loss_fn = loss_fn
    71. # 用于计算评价指标
    72. self.metric = metric
    73. # 记录训练过程中的评价指标变化
    74. self.dev_scores = []
    75. # 记录训练过程中的损失变化
    76. self.train_epoch_losses = []
    77. self.dev_losses = []
    78. # 记录全局最优评价指标
    79. self.best_score = 0
    80. # 模型训练阶段
    81. def train(self, train_loader, dev_loader=None, **kwargs):
    82. # 将模型设置为训练模式,此时模型的参数会被更新
    83. self.model.train()
    84. num_epochs = kwargs.get('num_epochs', 0)
    85. log_steps = kwargs.get('log_steps', 100)
    86. save_path = kwargs.get('save_path', 'best_model.pth')
    87. eval_steps = kwargs.get('eval_steps', 0)
    88. # 运行的step数,不等于epoch数
    89. global_step = 0
    90. if eval_steps:
    91. if dev_loader is None:
    92. raise RuntimeError('Error: dev_loader can not be None!')
    93. if self.metric is None:
    94. raise RuntimeError('Error: Metric can not be None')
    95. # 遍历训练的轮数
    96. for epoch in range(num_epochs):
    97. total_loss = 0
    98. # 遍历数据集
    99. for step, data in enumerate(train_loader):
    100. x, y = data
    101. logits = self.model(x.float())
    102. loss = self.loss_fn(logits, y.long())
    103. total_loss += loss
    104. if step % log_steps == 0:
    105. print(f'loss:{loss.item():.5f}')
    106. loss.backward()
    107. self.optimizer.step()
    108. self.optimizer.zero_grad()
    109. # 每隔一定轮次进行一次验证,由eval_steps参数控制,可以采用不同的验证判断条件
    110. if eval_steps != 0:
    111. if (epoch + 1) % eval_steps == 0:
    112. dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
    113. print(f'[Evalute] dev score:{dev_score:.5f}, dev loss:{dev_loss:.5f}')
    114. if dev_score > self.best_score:
    115. self.save_model(f'model_{epoch + 1}.pth')
    116. print(
    117. f'[Evaluate]best accuracy performance has been updated: {self.best_score:.5f}-->{dev_score:.5f}')
    118. self.best_score = dev_score
    119. # 验证过程结束后,请记住将模型调回训练模式
    120. self.model.train()
    121. global_step += 1
    122. # 保存当前轮次训练损失的累计值
    123. train_loss = (total_loss / len(train_loader)).item()
    124. self.train_epoch_losses.append((global_step, train_loss))
    125. self.save_model(f'{save_path}.pth')
    126. print('[Train] Train done')
    127. # 模型评价阶段
    128. def evaluate(self, dev_loader, **kwargs):
    129. assert self.metric is not None
    130. # 将模型设置为验证模式,此模式下,模型的参数不会更新
    131. self.model.eval()
    132. global_step = kwargs.get('global_step', -1)
    133. total_loss = 0
    134. self.metric.reset()
    135. for batch_id, data in enumerate(dev_loader):
    136. x, y = data
    137. logits = self.model(x.float())
    138. loss = self.loss_fn(logits, y.long()).item()
    139. total_loss += loss
    140. self.metric.update(logits, y)
    141. dev_loss = (total_loss / len(dev_loader))
    142. self.dev_losses.append((global_step, dev_loss))
    143. dev_score = self.metric.accumulate()
    144. self.dev_scores.append(dev_score)
    145. return dev_score, dev_loss
    146. # 模型预测阶段,
    147. def predict(self, x, **kwargs):
    148. self.model.eval()
    149. logits = self.model(x)
    150. return logits
    151. # 保存模型的参数
    152. def save_model(self, save_path):
    153. torch.save(self.model.state_dict(), save_path)
    154. # 读取模型的参数
    155. def load_model(self, model_path):
    156. self.model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    157. if __name__ == '__main__':
    158. batch_size = 20
    159. # 构建训练集
    160. train_data = CIFAR10Dataset('cifar10_tiny', 'trainLabels.csv')
    161. train_iter = DataLoader(train_data, batch_size=batch_size)
    162. # 构建测试集
    163. test_data = CIFAR10Dataset('cifar10_tiny', 'trainLabels.csv')
    164. test_iter = DataLoader(test_data, batch_size=batch_size)
    165. # 模型训练
    166. num_classes = 10
    167. # 定义模型
    168. model = VGG_S(num_classes)
    169. # 定义损失函数
    170. loss_fn = F.cross_entropy
    171. # 定义优化器
    172. optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    173. runner = Runner(model, optimizer, loss_fn, metric=None)
    174. runner.train(train_iter, num_epochs=10, save_path='chapter_5')
    175. # 模型预测
    176. runner.load_model('chapter_5.pth')
    177. x, label = next(iter(test_iter))
    178. predict = torch.argmax(runner.predict(x.float()), dim=1)
    179. print('predict:', predict)
    180. print(' label:', label)

  • 相关阅读:
    有什么好用的站内搜索SaaS能帮网站实现站内搜索功能?
    优雅而高效的JavaScript——扩展运算符
    【C++】笔试训练(四)
    [Python急救站]基于Transformer Models模型完成GPT2的学生AIGC学习训练模型
    Python计算均值、方差、标准差、协方差等常用指标的方法——Numpy模块+Pandas模块
    【探索C++】C++对C语言的扩展
    信息学奥赛一本通:1102:与指定数字相同的数的个数
    大部分人都缺乏一种特质
    linux快捷操作方式
    阿里云&腾讯云服务器安装oracle11g
  • 原文地址:https://blog.csdn.net/m0_63834988/article/details/133393522