RNN的核心和精华就在于这个“R”。不同于普通的不保留前面时刻数据(这边不包括使用反向传播更新参数作为“保存前面数据”的另一种手段)的神经网络结构,RNN通过在自己内部记录下一定范围内历史的数据,从而达到和历史数据形成强关联的目标。
这种结构通常使用在序列预测中:比如自然语言处理,人的话语序有先后,顺序会有不同,同样含义也可能发生很大变化。通过循环结构对于历史数据的记忆功能,能够更好地让计算机去“理解”人类的话。
RNN在普通神经网络结构的基础之上,增加了循环记忆的结构。简单抽象如下:
循环神经网络的隐藏层的值s不仅仅取决于当前这次的输入x,还取决于上一次隐藏层的值s。权重矩阵 W就是隐藏层上一次的值作为这一次的输入的权重。
网络在
t
t
t时刻接收到输入
x
t
x_t
xt之后,隐藏层的值是
s
t
s_t
st,输出值是
o
t
o_t
ot。关键一点是,
o
t
o_t
ot的值不仅仅取决于
s
t
s_t
st,还取决于
s
t
−
1
s_{t-1}
st−1。
为了保证轮子的一致性,我们依然将RNN继承自LayerBase。作为一个神经网络中的角色,其由类初始化、变量初始化、前向传播、反向传播组成:
class RNN(LayerBase):
def __init__(self, ...):
...
def initialize(self, optimizer):
...
def forward(self, X):
...
def backward(self, _grad_sum):
...
在类构造方法中,我们只需要将RNN所需要的东西准备好即可:
def __init__(self, n_units, activation='tanh', bptt_trunc=5, in_channels=None):
self.in_channels = in_channels # 时间+输入通道数
self.n_units = n_units # 隐藏状态个数
self.activation = activation_functions[activation]() # task2 激活函数
self.bptt_trunc = bptt_trunc # 反向传播跨越的时间步长
self.W = None # 前时刻权重
self.V = None # 输出权重
self.U = None # 输入权重
初始化部分只需要将各个变量赋予初值并指定优化器即可。这边使用随机数进行初始化:
def initialize(self, optimizer):
timesteps, input_dim = self.in_channels
limit = 1 / math.sqrt(input_dim)
self.U = np.random.uniform(-limit, limit, (self.n_units, input_dim))
limit = 1 / math.sqrt(self.n_units)
self.V = np.random.uniform(-limit, limit, (input_dim, self.n_units))
self.W = np.random.uniform(-limit, limit, (self.n_units, self.n_units))
self.U_opt = copy.copy(optimizer)
self.V_opt = copy.copy(optimizer)
self.W_opt = copy.copy(optimizer)
前向传播由输入到输出。这边写的代码并没有为反向传播进行支持:
def forward(self, X):
batch_size, timesteps, input_dim = X.shape # (B, T, C)
self.state_input = np.zeros((batch_size, timesteps, self.n_units))
self.states = np.zeros((batch_size, timesteps+1, self.n_units))
self.outputs = np.zeros((batch_size, timesteps, input_dim))
self.states[:, -1] = np.zeros((batch_size, self.n_units))
for t in range(timesteps):
self.state_input[:, t] = X[:, t].dot(self.U.T) + self.states[:, t-1].dot(self.W.T)
self.states[:, t] = self.activation(self.state_input[:, t])
self.outputs[:, t] = self.states[:, t].dot(self.V.T)
return self.outputs
类的整体代码如下(不包括反向传播),可以整合进以前造的轮子中使用:
class RNN(LayerBase):
def __init__(self, n_units, activation='tanh', bptt_trunc=5, in_channels=None):
self.in_channels = in_channels # 时间+输入通道数
self.n_units = n_units # 隐藏状态个数
self.activation = activation_functions[activation]() # task2 激活函数
self.bptt_trunc = bptt_trunc # 反向传播跨越的时间步长
self.W = None # 前时刻权重
self.V = None # 输出权重
self.U = None # 输入权重
def initialize(self, optimizer):
timesteps, input_dim = self.in_channels
limit = 1 / math.sqrt(input_dim)
self.U = np.random.uniform(-limit, limit, (self.n_units, input_dim))
limit = 1 / math.sqrt(self.n_units)
self.V = np.random.uniform(-limit, limit, (input_dim, self.n_units))
self.W = np.random.uniform(-limit, limit, (self.n_units, self.n_units))
self.U_opt = copy.copy(optimizer)
self.V_opt = copy.copy(optimizer)
self.W_opt = copy.copy(optimizer)
def forward(self, X):
batch_size, timesteps, input_dim = X.shape # (B, T, C)
self.state_input = np.zeros((batch_size, timesteps, self.n_units))
self.states = np.zeros((batch_size, timesteps+1, self.n_units))
self.outputs = np.zeros((batch_size, timesteps, input_dim))
self.states[:, -1] = np.zeros((batch_size, self.n_units))
for t in range(timesteps):
self.state_input[:, t] = X[:, t].dot(self.U.T) + self.states[:, t-1].dot(self.W.T)
self.states[:, t] = self.activation(self.state_input[:, t])
self.outputs[:, t] = self.states[:, t].dot(self.V.T)
return self.outputs
这边使用如下数据进行测试:
测试代码如下:
input = np.array(
[[[1.,1.],
[1.,1.],
[2.,2.]]]
)
rnn = RNN(2, 'tanh',5,(1,2))
rnn.initialize('')
rnn.U = np.ones(rnn.U.shape)
rnn.V = np.ones(rnn.V.shape)
rnn.W = np.ones(rnn.W.shape)
rnn.forward(input)
输出结果如下:
由于torch中有相对应的算子,因此实现非常简单,这边直接使用如下代码:
import torch
batch_size = 1
seq_len = 3 # 序列长度
input_size = 2 # 输入序列维度
hidden_size = 2 # 隐藏层维度
output_size = 2 # 输出层维度
# RNNCell
cell = torch.nn.RNNCell(input_size=input_size, hidden_size=hidden_size)
# 初始化参数 https://zhuanlan.zhihu.com/p/342012463
for name, param in cell.named_parameters():
if name.startswith("weight"):
torch.nn.init.ones_(param)
else:
torch.nn.init.zeros_(param)
# 线性层
liner = torch.nn.Linear(hidden_size, output_size)
liner.weight.data = torch.Tensor([[1, 1], [1, 1]])
liner.bias.data = torch.Tensor([0.0])
seq = torch.Tensor([[[1, 1]],
[[1, 1]],
[[2, 2]]])
hidden = torch.zeros(batch_size, hidden_size)
output = torch.zeros(batch_size, output_size)
for idx, input in enumerate(seq):
print('=' * 20, idx, '=' * 20)
print('Input :', input)
print('hidden :', hidden)
hidden = cell(input, hidden)
output = liner(hidden)
print('output :', output)
输出结果如下:
Seq2Seq 是一种重要的 RNN 模型,也称为 Encoder-Decoder 模型,可以理解为一种 N×M的模型。模型包含两个部分:Encoder 用于编码序列的信息,将任意长度的序列信息编码到一个向量 c 里。而 Decoder 是解码器,解码器得到上下文信息向量 c 之后可以将信息解码,并输出为序列。
这边给出一张直观的Seq2Seq结构图:
Encoder 的 RNN 接受输入 x,最终输出一个编码所有信息的上下文向量 c,中间的神经元没有输出。Decoder 主要传入的是上下文向量 c,然后解码出需要的信息。
Decoder有多种结构,这边只介绍一种:将上下文向量 c当成是 RNN 的初始隐藏状态,输入到 RNN 中,后续只接受上一个神经元的隐藏层状态 h’ 而不接收其他的输入 x。
class E_D(torch.nn.Module):
def __init__(self, n_class, n_hidden):
super(E_D, self).__init__()
self.encoder = torch.nn.RNN(input_size=n_class, hidden_size=n_hidden)
self.decoder = torch.nn.RNN(input_size=n_class, hidden_size=n_hidden)
self.fc = torch.nn.Linear(n_hidden, n_class)
def forward(self, inp, hid, inp2): # 编码器输入,隐藏层,解码器输入
enc_input = inp.transpose(0, 1)
dec_input = inp2.transpose(0, 1)
_, h_t = self.encoder(enc_input, hid)
outputs,_=self.decoder(dec_input, h_t)
x = self.fc(outputs)
return h_t,,x
通过本次作业,我们了解了简单的RNN构成、特点、原理和实现方式。通过自己造轮子进行测试对比,我们对RNN的理解也进一步加深。在接下来的实验中,我们将重点探索并掌握循环神经网络中的各个组成部分。