前一篇中介绍了一种双向的递归神经网络,将数据进行正序输入和倒序输入,兼顾向前的语义以及向后的语义,从而达到更好的分类效果。
之前的两篇使用递归神经网络做的是分类,可以发现做分类时我们不需要使用时序输入过程中产生的输出,只需关注每个时序输入产生隐藏信息,最后一个时序产生的输出即最后的输出。
这里将会介绍语言模型,这个模型中我们需要重点关注的是每个时序输入过程中产生的输出。可以理解为,我输入a,那么我需要知道这个时序的输出是不是b,如果不是那么我就要调整模型了。
- import torch
- import torch.nn as nn
- import numpy as np
- from torch.nn.utils import clip_grad_norm_
- from data_utils import Dictionary, Corpus
-
- device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
-
- embed_size = 128
- hidden_size = 1024
- num_layers = 1
- num_epochs = 5
- num_samples = 1000
- batch_size = 20
- seq_length = 30
- learning_rate = 0.002
-
- corpus = Corpus()
- ids = corpus.get_data('data/train.txt', batch_size)
- vocab_size = len(corpus.dictionary)
- num_batches = ids.size(1) // seq_length
-
- print(ids.size())
- print(vocab_size)
- print(num_batches)
-
- #torch.Size([20, 46479])
- #10000
- #1549
1、ids:从train.txt中获取的训练数据,总共为20条,下面的模型只对这20条数据进行训练。
2、vocab_size:词库,总共包含有10000个单词
3、num_batch:可能有人要问前面有batch_size,这里的num_batch是干嘛用的?前面的batch_size是从语料库中抽取20条,每条数据长度为46497,除以序列长度seq_length(输入时序为30),个num_batch可以理解为是输入时序块的个数,也就是一个epoch中我们将所有语料输入网络需要循环的次数。
模型很简单,但是参数比较难理解,这里在讲流程的时候依旧对参数进行解释。
1、Embedding层:保存了固定字典和大小的简单查找表,第一个参数是嵌入字典的大小,第二个是每个嵌入向量的大小。也就是说,每个时间序列的特征都被转化成128维的向量。假设一个序列维[20, 30],经过嵌入会变成[20, 30, 128]
2、LSTM层:3个重要参数,输入维度即为嵌入向量大小embed_size = 128,隐藏层神经元个数hidden_size = 1024,lstm单元个数num_layers = 1
3、LSTM的输出结果out中包含了30个时间序列的所有隐藏层输出,这里不仅仅只用最后一层了,要用到所有层的输出。
4、线性激活层:LSTM的隐藏层有1024个特征,要把这1024个特征通过全连接组合成我们词库特征10000,得到的就是这10000个词被选中的概率了。
- class RNNLM(nn.Module):
- def __init__(self,vocab_size,embed_size,hidden_size,num_layers):
- super(RNNLM,self).__init__()
- #parameters - 1、嵌入字典的大小 2、每个嵌入向量的大小
- self.embed = nn.Embedding(vocab_size,embed_size)
- self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first = True)
- self.linear = nn.Linear(hidden_size, vocab_size)
-
- def forward(self, x, h):
- #转化为词向量
- x = self.embed(x) #x.shape = torch.Size([20, 30, 128])
-
- #分成30个时序,在训练的过程中的循环中体现
- out,(h,c) = self.lstm(x,h) #out.shape = torch.Size([20, 30, 1024])
- #out中保存每个时序的输出,这里不仅仅要用最后一个时序,要用上一层的输出和下一层的输入做对比,计算损失
- out = out.reshape(out.size(0) * out.size(1), out.size(2))
-
- #输出10000是因为字典中存在10000个单词
- out = self.linear(out) #out.shape = torch.Size([600, 10000])
-
- return out,(h,c)
向前传播时,我们需要输入两个参数,分别是数据x,h0和c0。每个epoch都要将h0和c0重新初始化。
可以看到在训练之前对输入数据做了一些处理。每次取出长度为30的序列输入,相应的依次向后取一位做为target,这是因为我们的目标就是让每个序列输出的值和下一个字符项相近似。
输出的维度为(600, 10000),将target维度进行转化,计算交叉熵时会自动独热处理。
反向传播过程,防止梯度爆炸,进行了梯度修剪。
- model = RNNLM(vocab_size, embed_size, hidden_size, num_layers).to(device)
-
- criterion = nn.CrossEntropyLoss()
- optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
-
- def detach(states):
- return [state.detach() for state in states]
- for epoch in range(num_epochs):
- # Set initial hidden and cell states
- states = (torch.zeros(num_layers, batch_size, hidden_size).to(device),
- torch.zeros(num_layers, batch_size, hidden_size).to(device))
-
- for i in range(0, ids.size(1) - seq_length, seq_length):
- # Get mini-batch inputs and targets
- inputs = ids[:, i:i+seq_length].to(device) #input torch.Size([20, 30])
- targets = ids[:, (i+1):(i+1)+seq_length].to(device) #target torch.Size([20, 30])
-
- # Forward pass
- states = detach(states)
- #用前一层输出和下一层输入计算损失
- outputs, states = model(inputs, states) #output torch.Size([600, 10000])
-
- loss = criterion(outputs, targets.reshape(-1))
-
- # Backward and optimize
- model.zero_grad()
- loss.backward()
- clip_grad_norm_(model.parameters(), 0.5) #梯度修剪
- optimizer.step()
-
- step = (i+1) // seq_length
- if step % 100 == 0:
- print ('Epoch [{}/{}], Step[{}/{}], Loss: {:.4f}, Perplexity: {:5.2f}'
- .format(epoch+1, num_epochs, step, num_batches, loss.item(), np.exp(loss.item())))
测试时随机选择一个词作为输入,因为没有一个停止的标准,所以我们需要利用循环来控制到底输出多少个字符。
输入维度[1, 1],我们之前的输入是[20, 30]。
本来有一种想法:我们现在只有一个时序了,但是我们的训练时有30个时序,那么还有什么意义?忽然想起来我们训练的参数是公用的!!!所以只要输入一个数据就能预测下面的数据了,并不要所谓的30层。
这里的初始输入是1,那么能不能是2呢?或者是根据我们之前的输入取预测新的字符?其实是可以的,但是由于初始化h0和c0的问题,我们更改了输入的长度,相应的h0和c0也要改变的。
我们最后的输出结果需要转化成为概率,然后随机抽取
- # Test the model
- with torch.no_grad():
- with open('sample.txt', 'w') as f:
- # Set intial hidden ane cell states
- state = (torch.zeros(num_layers, 1, hidden_size).to(device),
- torch.zeros(num_layers, 1, hidden_size).to(device))
-
- # Select one word id randomly
- prob = torch.ones(vocab_size)
- input = torch.multinomial(prob, num_samples=1).unsqueeze(1).to(device)
-
- for i in range(num_samples):
- # Forward propagate RNN
- output, state = model(input, state) #output.shape = torch.Size([1, 10000])
-
- # Sample a word id
- prob = output.exp()
- word_id = torch.multinomial(prob, num_samples=1).item() #根据输出的概率随机采样
-
- # Fill input with sampled word id for the next time step
- input.fill_(word_id)
-
- # File write
- word = corpus.dictionary.idx2word[word_id]
- word = '\n' if word == '
' else word + ' ' - f.write(word)
-
- if (i+1) % 100 == 0:
- print('Sampled [{}/{}] words and save to {}'.format(i+1, num_samples, 'sample.txt'))