Randomly generate some data to simulate the training and validation datasets:
import torch

# Randomly generate data
n_samples = 1000
n_features = 784  # e.g., the number of pixels in a 28x28 image
train_data = torch.rand(n_samples, n_features)
val_data = torch.rand(int(n_samples * 0.1), n_features)
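In practice you would train on real images rather than noise; a minimal sketch, assuming torchvision is installed (MNIST is the usual source of 784-dimensional inputs, and mnist_train here could be used in place of train_data above):
from torchvision import datasets
# Flatten each 28x28 image into a 784-dimensional vector scaled to [0, 1]
mnist = datasets.MNIST(root="data", train=True, download=True)
mnist_train = mnist.data.float().view(-1, 784) / 255.0  # shape: (60000, 784)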
Define a simple one-hidden-layer autoencoder:
import torch.nn as nn

class Autoencoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(Autoencoder, self).__init__()
        # Encoder: compress the input down to hidden_size dimensions
        self.encoder = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.Tanh())
        # Decoder: reconstruct the input from the hidden representation
        self.decoder = nn.Sequential(
            nn.Linear(hidden_size, input_size),
            nn.Tanh())

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x
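As a quick sanity check (a minimal sketch, not part of the walkthrough itself), a forward pass should reproduce the input shape, while the encoder alone yields the compressed representation:
ae = Autoencoder(input_size=784, hidden_size=400)
x = torch.rand(8, 784)        # dummy batch of 8 samples
print(ae(x).shape)            # torch.Size([8, 784]), the reconstruction
print(ae.encoder(x).shape)    # torch.Size([8, 400]), the compressed code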
Define a function to train an autoencoder:
def train_ae(model, train_loader, val_loader, num_epochs, criterion, optimizer):
    for epoch in range(num_epochs):
        # Training
        model.train()
        train_loss = 0
        for batch_data in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_data)
            loss = criterion(outputs, batch_data)  # reconstruction loss against the input itself
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_loss /= len(train_loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {train_loss:.4f}")

        # Validation
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for batch_data in val_loader:
                outputs = model(batch_data)
                loss = criterion(outputs, batch_data)
                val_loss += loss.item()
        val_loss /= len(val_loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}")
Use the function defined above to train the autoencoders:
from torch.utils.data import DataLoader

# DataLoader (a tensor is itself indexable, so DataLoader can batch it directly)
batch_size = 32
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)

# Train the first autoencoder (784 -> 400)
ae1 = Autoencoder(input_size=784, hidden_size=400)
optimizer = torch.optim.Adam(ae1.parameters(), lr=0.001)
criterion = nn.MSELoss()
train_ae(ae1, train_loader, val_loader, 10, criterion, optimizer)
# Encode the data with the first autoencoder's encoder
# (no_grad so the encoded tensors carry no gradient history)
encoded_train_data = []
with torch.no_grad():
    for data in train_loader:
        encoded_train_data.append(ae1.encoder(data))
encoded_train_loader = DataLoader(torch.cat(encoded_train_data), batch_size=batch_size, shuffle=True)

encoded_val_data = []
with torch.no_grad():
    for data in val_loader:
        encoded_val_data.append(ae1.encoder(data))
encoded_val_loader = DataLoader(torch.cat(encoded_val_data), batch_size=batch_size, shuffle=False)
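Since this encode-then-rewrap pattern repeats for every layer, a small helper could replace the explicit loops; encode_loader is a hypothetical name, a minimal sketch rather than part of the original code:
def encode_loader(encoder, loader, batch_size, shuffle):
    # Run every batch through the (frozen) encoder and wrap the result in a new DataLoader
    encoded = []
    with torch.no_grad():
        for batch in loader:
            encoded.append(encoder(batch))
    return DataLoader(torch.cat(encoded), batch_size=batch_size, shuffle=shuffle)
The steps below keep the explicit loops so each stage stays visible, but every encoding block is equivalent to one encode_loader call.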
# Train the second autoencoder (400 -> 200) on the encoded data
ae2 = Autoencoder(input_size=400, hidden_size=200)
optimizer = torch.optim.Adam(ae2.parameters(), lr=0.001)
train_ae(ae2, encoded_train_loader, encoded_val_loader, 10, criterion, optimizer)

# Encode the once-encoded data with the second autoencoder's encoder.
# Note: we iterate over the encoded loaders here, not the raw ones,
# since ae2 expects 400-dimensional inputs.
encoded_train_data = []
with torch.no_grad():
    for data in encoded_train_loader:
        encoded_train_data.append(ae2.encoder(data))
encoded_train_loader = DataLoader(torch.cat(encoded_train_data), batch_size=batch_size, shuffle=True)

encoded_val_data = []
with torch.no_grad():
    for data in encoded_val_loader:
        encoded_val_data.append(ae2.encoder(data))
encoded_val_loader = DataLoader(torch.cat(encoded_val_data), batch_size=batch_size, shuffle=False)
# Train the third autoencoder (200 -> 100); its input size must match ae2's hidden size
ae3 = Autoencoder(input_size=200, hidden_size=100)
optimizer = torch.optim.Adam(ae3.parameters(), lr=0.001)
train_ae(ae3, encoded_train_loader, encoded_val_loader, 10, criterion, optimizer)

# Encode the data with the third autoencoder's encoder, again feeding it
# the 200-dimensional outputs of the previous stage
encoded_train_data = []
with torch.no_grad():
    for data in encoded_train_loader:
        encoded_train_data.append(ae3.encoder(data))
encoded_train_loader = DataLoader(torch.cat(encoded_train_data), batch_size=batch_size, shuffle=True)

encoded_val_data = []
with torch.no_grad():
    for data in encoded_val_loader:
        encoded_val_data.append(ae3.encoder(data))
encoded_val_loader = DataLoader(torch.cat(encoded_val_data), batch_size=batch_size, shuffle=False)
Stack the trained encoders and decoders into a single model:
class StackedAutoencoder(nn.Module):
    def __init__(self, ae1, ae2, ae3):
        super(StackedAutoencoder, self).__init__()
        # Encoders are applied shallow-to-deep, decoders in the reverse order
        self.encoder = nn.Sequential(ae1.encoder, ae2.encoder, ae3.encoder)
        self.decoder = nn.Sequential(ae3.decoder, ae2.decoder, ae1.decoder)

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x
sae = StackedAutoencoder(ae1, ae2, ae3)
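Note that nn.Sequential stores references to the existing modules rather than copies, so sae shares its parameters with ae1, ae2, and ae3; fine-tuning sae therefore updates the pretrained weights in place. A quick check (a minimal sketch):
print(sae.encoder[0][0].weight is ae1.encoder[0].weight)  # True: shared, not copied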
Finally, retrain the stacked autoencoder end-to-end on the whole dataset:
optimizer = torch.optim.Adam(sae.parameters(), lr=0.001)
train_ae(sae, train_loader, val_loader, 10, criterion, optimizer)
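After fine-tuning, the stacked encoder can also be used on its own as a feature extractor, for example to feed a downstream classifier (a hypothetical usage sketch, not part of the original walkthrough):
sae.eval()
with torch.no_grad():
    features = sae.encoder(val_data)  # a 100-dimensional code per validation sample
print(features.shape)                 # torch.Size([100, 100]) for the data above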