写在最前面:
本次博客不涉及模型原理的解释,可以看作是一个纯工程性的一次实验。之前看了很多论文模型中的代码,我只是不求甚解,把大概的流程理解了就放下了。本次实验就是为了仔细的体会其中的细节。
大家都知道,pytorch已经将底层的代码封装的很好的,我们只需要写很少的代码就能跑一个模型。所以本次实验还有一个目的,让写的代码尽量能够复用。
在实验开始之前的第一步,就是选取数据集。我之前看到顶会论文中很多使用的是这个数据集,在这里我们也跟风一下,想要下载的小伙伴可以点击这里。这个数据集是一个关于数字彩色图像设别的数据集,可以理解为更加复杂的Mnist
数据集。给大家展示一下它的复杂度。有些样本我都看不清楚,真不知道大佬些是怎么干到90+的,可怕!
这两个类是将数据集加载过程与预处理过程封装,让上层忽略底层实现细节。
Dataset:
import scipy.io as sio
from torch.utils.data import Dataset
from torch.utils.data.dataset import T_co
class SVHN(Dataset):
def __init__(self, file_path) -> None:
super().__init__()
self.file_path = file_path
data_mat = sio.loadmat(self.file_path)
self.X = data_mat["X"]
self.y = data_mat["y"]
def __getitem__(self, index) -> T_co:
return self.X[:, :, :, index], self.y[index]
def __len__(self):
return self.y.shape[0]
值得注意的是,我们需要重写父类Dataset
的两个方法,__getitem__
, __len__
。__getitem__
方法就是返回一个训练样本与标签, __len__
方法是返回数据集的长度。
DataLoader:
dataLoader = DataLoader(dataset, batch_size=batchSize, shuffle=True)
有的同学看到这儿就会问了,Dataset不是已经有返回数据的接口了吗?为什么还有包一层DataLoader
呢?原因就是在网络训练的过程中,样本不是一个一个输入的,而是一个Batch一个Batch的输入。这里的Batch可以理解为是一个训练样本的集合(多个样本打包在一起)。DataLoader还有很多可选的参数,在这里就不详细介绍了,感兴趣的同学可以去查阅pytoch的API文档。
在这里就不自己写模型结构了,pytorch有官方的实现,我们这里偷一下懒。
from torchvision import models
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet18 = models.resnet18()
# 修改全连接层的输出
num_ftrs = resnet18.fc.in_features
# 十分类,将输出层修改成10
resnet18.fc = nn.Linear(num_ftrs, 10)
# 模型参数放大GPU上,加快训练速度
resnet18 = resnet18.to(device)
这部分其实才是本次主要的工作量。这其中充斥着大量的模板代码,几乎每个模型都会用上。这部分主要是计算损失,反向传播,优化器。其中优化器就优化反向传播的。比较无奈的是,这部分也已经有实现了,直接用就是了,非常的方便。
def train(model, dataLoader, optimizer, lossFunc, n_epoch):
start_time = time.time()
test_best_loss = float('inf')
last_improve = 0 # 记录上次验证集loss下降的batch数
flag = False # 记录是否很久没有效果提升
total_batch = 0 # 记录进行到多少batch
writer = SummaryWriter(log_dir=log_path + '/' + time.strftime('%m-%d_%H.%M', time.localtime()))
for epoch in range(n_epoch):
print('Epoch [{}/{}]'.format(epoch + 1, n_epoch))
model.train()
sum_loss = 0.0
correct = 0.0
total = 0.0
for batch_idx, dataset in enumerate(dataLoader):
length = len(dataLoader)
optimizer.zero_grad()
data, labelOrg = dataset
data = data.to(device)
label = F.one_hot(labelOrg.to(torch.long), 10).to(torch.float).to(device)
predict = model(data)
loss = lossFunc(predict, label)
loss.backward()
optimizer.step()
# Tensor.item() 类型转换,返回一个数
sum_loss += loss.item()
# maxIdx, maxVal = torch.max
_, predicted = torch.max(predict.data, dim=1)
total += label.size(0)
correct += predicted.cpu().eq(labelOrg.data).sum()
# 注意这里是以一个batch为一个单位
print("[epoch:%d, iter:%d] Loss: %.03f | Acc: %.3f%% "
% (epoch + 1, (batch_idx + 1 + epoch * length), sum_loss / (batch_idx + 1), 100. * correct / total))
# 每一百个batch计算模型再测试集或者验证集的正确率
if total_batch % 100 == 0:
testDataLoss, testDataAcc = evalTestAcc(model)
time_dif = get_time_dif(start_time)
if testDataLoss < test_best_loss:
test_best_loss = testDataLoss
torch.save(model.state_dict(), save_path)
improve = '*'
last_improve = total_batch
else:
improve = ''
msg = 'Iter: {0:>6}, Train Loss: {1:>5.2}, Train Acc: {2:>6.2%}, Test Loss: {3:>5.2}, Test Acc: {4:>6.2%}, Time: {5} {6}'
print(msg.format(total_batch, sum_loss / (batch_idx + 1), correct / total, testDataLoss, testDataAcc, time_dif, improve))
writer.add_scalar("loss/train", loss.item(), total_batch)
writer.add_scalar("loss/dev", testDataLoss, total_batch)
writer.add_scalar("acc/train", correct / total, total_batch)
writer.add_scalar("acc/dev", testDataAcc, total_batch)
# 提供训练程序的两个出口: n_epoch, require_improvement个batch没有提升
total_batch += 1
model.train()
if total_batch - last_improve > require_improvement:
# 验证集loss超过1000batch没下降,结束训练
print("No optimization for a long time, auto-stopping...")
flag = True
break
if flag:
break
writer.close()
def evalTestAcc(net):
net.eval()
totalAcc = 0.0
sumLoss = 0.0
total = 0.0
with torch.no_grad():
for idx, dataset in enumerate(testDataLoader):
data, labelOrg = dataset
predict = net(data.to(device))
_, predicted = torch.max(predict.data, dim=1)
totalAcc += predicted.cpu().eq(labelOrg).sum()
label = F.one_hot(labelOrg.to(torch.long), 10).to(torch.float).to(device)
sumLoss += lossFunc(predict, label).item()
total += label.size(0)
return sumLoss / len(testDataLoader), totalAcc / total
看了一下,感觉没什么讲的,几乎都是模板代码,放在任何一个模型中都可以使用。值得注意的是,在本次实验中没有区分测试集与验证集,可以理解为没有测试集,实验中的testDataset被用作是验证集,调整训练参数了。
if __name__ == '__main__':
# filePath = r"E:\dataset\SVHN\train_32x32.mat"
save_path = r"model_save/net.pt"
log_path = r"logs"
require_improvement = 1000
batchSize = 256
n_epoch = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet18 = models.resnet18()
# 修改全连接层的输出
num_ftrs = resnet18.fc.in_features
resnet18.fc = nn.Linear(num_ftrs, 10)
resnet18 = resnet18.to(device)
# SVHNTrainData = SVHN(filePath)
train_dataset = torchvision.datasets.SVHN(
root=r'E:\dataset\SVHN',
split='train',
download=False,
transform=torchvision.transforms.ToTensor()
)
test_dataset = torchvision.datasets.SVHN(
root=r'E:\dataset\SVHN',
split='test',
download=False,
transform=torchvision.transforms.ToTensor()
)
dataLoader = DataLoader(train_dataset, batch_size=batchSize, shuffle=True)
testDataLoader = DataLoader(test_dataset, batch_size=batchSize, shuffle=True)
optimizer = optim.SGD(resnet18.parameters(), lr=0.01, momentum=0.9)
lossFunc = nn.CrossEntropyLoss()
train(resnet18, dataLoader, optimizer, lossFunc, n_epoch)
这里把所有的内容串起来了。在运行完成后,在当前目录会产生于一个logs文件夹, 大家可以运行tensorboard --logdir 文件夹地址
,就可以看到如下图所示,记录训练过程中,损失与准确率在测试集与验证集上的变化曲线。
序列化与反序列化,我们可以理解为保存于加载。我们的模型训练好之后,就可以直接进行预测任务,这时候就不会在反向传播更新模型参数了。
参考,这篇博客讲得太清楚了,几乎包括了所有的内容,我都不想在讲了。我这里就记录一下我的反序列化过程吧。
import random
import numpy as np
import torch
import torchvision
from matplotlib import pyplot as plt
from torch import nn
from torch.utils.data import DataLoader
from torchvision import models
if __name__ == '__main__':
path = r"model_save/net.pt"
batchSize = 256
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet18 = models.resnet18()
# 修改全连接层的输出
num_ftrs = resnet18.fc.in_features
resnet18.fc = nn.Linear(num_ftrs, 10)
# resnet18 = resnet18.to(device)
resnet18.load_state_dict(torch.load(path, map_location=torch.device("cpu")))
resnet18.eval()
test_dataset = torchvision.datasets.SVHN(
root=r'E:\dataset\SVHN',
split='test',
download=False,
transform=torchvision.transforms.ToTensor()
)
testDataLoader = DataLoader(test_dataset, batch_size=batchSize, shuffle=True)
trains, labels = iter(testDataLoader).__next__()
predicts = resnet18(trains)
# 其实可以只用预测一个样本,而不是一个batch
# resnet18(trains[0].unsqueeze(0))
_, predictLabels = torch.max(predicts, dim=1)
fig, axs = plt.subplots(1, 5, figsize=(10, 10)) # 建立子图
print("predictLabels: {}".format(predictLabels))
print("labels: {}".format(labels))
print("Acc: {:.2f}".format(predictLabels.data.eq(labels).sum() / labels.shape[0]))
for i in range(5):
num = random.randint(0, batchSize) # 首先选取随机数,随机选取五次
npimg, nplabel = trains[num], labels[num]
axs[i].imshow(np.transpose(npimg, (1, 2, 0)))
axs[i].set_title("GroundTruth: {}, Predict: {}".format(nplabel, predictLabels[num])) # 给每个子图加上标签
axs[i].axis("off") # 消除每个子图的坐标轴
plt.show()