Implementing a Deep Learning Framework from Scratch: LSTM from Theory to Practice (Hands-On)


    Introduction

    Following the idea that "what I cannot create, I do not understand", this series of articles builds a deep learning framework from scratch using pure Python and NumPy. Like PyTorch, the framework supports automatic differentiation.

    To understand deep learning in depth, the experience of building things from scratch matters a great deal: starting from what we can understand, we implement the models we want while avoiding fully fledged external frameworks as much as possible. The goal of this series is that, through this process, you truly master the low-level implementation of deep learning rather than just being a library user.

    In the previous article we studied the theory behind LSTM, and in the RNN practice article we saw how to implement multi-layer and bidirectional RNNs. The LSTM implemented here likewise supports multiple layers and bidirectionality.

    LSTMCell

    from typing import Tuple

    # Module, Linear, Tensor and the functional module F come from the framework built in this series

    class LSTMCell(Module):
        def __init__(self, input_size: int, hidden_size: int, bias: bool = True) -> None:
            super(LSTMCell, self).__init__()
            # Combined linear transformation for x -> input gate, x -> forget gate, x -> g and x -> output gate
            self.input_trans = Linear(input_size, 4 * hidden_size, bias=bias)
            # Combined linear transformation for h -> input gate, h -> forget gate, h -> g and h -> output gate
            self.hidden_trans = Linear(hidden_size, 4 * hidden_size, bias=bias)

        def forward(self, x: Tensor, h: Tensor, c: Tensor) -> Tuple[Tensor, Tensor]:
            # i: input gate
            # f: forget gate
            # o: output gate
            # g: g_t, the candidate cell state
            # Compute the three gates and g_t in one shot
            ifgo = self.input_trans(x) + self.hidden_trans(h)
            ifgo = F.chunk(ifgo, 4, -1)
            i, f, g, o = ifgo

            c_next = F.sigmoid(f) * c + F.sigmoid(i) * F.tanh(g)

            h_next = F.sigmoid(o) * F.tanh(c_next)

            return h_next, c_next
    
    

    Following the reference article listed at the end, the implementation keeps the linear transformations related to x and to h separate:

    $$\begin{aligned} i_t &= \text{Linear}^i_x(x_t) + \text{Linear}^i_h(h_{t-1}) \\ f_t &= \text{Linear}^f_x(x_t) + \text{Linear}^f_h(h_{t-1}) \\ g_t &= \text{Linear}^g_x(x_t) + \text{Linear}^g_h(h_{t-1}) \\ o_t &= \text{Linear}^o_x(x_t) + \text{Linear}^o_h(h_{t-1}) \end{aligned} \tag{1}$$

    For example, take formula $(3)$ of the theory article, $i_t = \sigma(U_i h_{t-1} + W_i x_t)$. The argument of $\sigma$ can be rewritten as
    $$U_i h_{t-1} + W_i x_t \Rightarrow \text{Linear}^i_x(x_t) + \text{Linear}^i_h(h_{t-1})$$
    We can then merge similar linear transformations. For instance, the four $x_t$-related transformations $\text{Linear}^i_x(x_t), \text{Linear}^f_x(x_t), \text{Linear}^g_x(x_t), \text{Linear}^o_x(x_t)$ are merged into:

    self.input_trans = Linear(input_size, 4 * hidden_size, bias=bias)
    

    hidden_trans is handled in the same way. The point of merging is speed: a single linear operation produces all four results at once.
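
    To see concretely why one merged transformation equals four separate ones, here is a small pure-NumPy sketch (the weight matrices are made up for illustration and are not the framework's actual parameters). Stacking the four weight matrices along the output dimension and doing a single matrix multiplication gives the same result as four separate multiplications, with np.split playing the role of F.chunk:

    import numpy as np

    np.random.seed(0)
    input_size, hidden_size, batch_size = 3, 5, 2
    x = np.random.randn(batch_size, input_size)

    # Four separate weight matrices, one per gate: i, f, g, o
    W_i, W_f, W_g, W_o = (np.random.randn(input_size, hidden_size) for _ in range(4))

    # Four separate transformations
    separate = [x @ W for W in (W_i, W_f, W_g, W_o)]

    # One merged transformation: stack the weights along the output dimension
    W_all = np.concatenate([W_i, W_f, W_g, W_o], axis=1)  # (input_size, 4 * hidden_size)
    merged = x @ W_all                                     # (batch_size, 4 * hidden_size)
    chunks = np.split(merged, 4, axis=-1)                  # analogous to F.chunk(ifgo, 4, -1)

    assert all(np.allclose(a, b) for a, b in zip(separate, chunks))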

    The line

    ifgo = self.input_trans(x) + self.hidden_trans(h)

    produces these four values in one go, but they come back concatenated in a single tensor. Then

    ifgo = F.chunk(ifgo, 4, -1)
    

    splits it along the last dimension into a tuple of four tensors, and

    i, f, g, o = ifgo
    

    unpacks them into the individual gate pre-activations; applying the corresponding Sigmoid or Tanh functions then gives the values we want.

    The next step computes $c_t$:
    $$c_t = \sigma(f_t) \odot c_{t-1} + \sigma(i_t) \odot \tanh(g_t) \tag{3}$$
    The corresponding code:

    c_next = F.sigmoid(f) * c + F.sigmoid(i) * F.tanh(g)
    

    Then the hidden state $h_t$ is computed from $o_t$ and $c_t$:
    $$h_t = \sigma(o_t) \odot \tanh(c_t) \tag{4}$$
    The corresponding code:

    h_next = F.sigmoid(o) * F.tanh(c_next)
    
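    Putting the pieces together, here is a minimal usage sketch of LSTMCell for a single time step. The sizes are arbitrary, and calling Tensor.zeros with just a shape is an assumption based on how it is used (with dtype and device) in LSTM.forward below:

    # A single LSTM step; sizes are arbitrary, Tensor.zeros defaults are assumed
    cell = LSTMCell(input_size=4, hidden_size=8)

    x = Tensor.zeros((2, 4))  # (batch_size, input_size)
    h = Tensor.zeros((2, 8))  # (batch_size, hidden_size)
    c = Tensor.zeros((2, 8))  # (batch_size, hidden_size)

    h_next, c_next = cell(x, h, c)  # both of shape (batch_size, hidden_size)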

    LSTM

    With LSTMCell in place, we can implement the full LSTM.

    import copy
    from typing import Optional, Tuple

    class LSTM(Module):
        def __init__(self, input_size: int, hidden_size: int, batch_first: bool = False, num_layers: int = 1,
                     bidirectional: bool = False, dropout: float = 0):
            super(LSTM, self).__init__()
            self.num_layers = num_layers
            self.hidden_size = hidden_size
            self.batch_first = batch_first
            self.bidirectional = bidirectional
    
            # Support multiple layers
            self.cells = ModuleList([LSTMCell(input_size, hidden_size)] +
                                    [LSTMCell(hidden_size, hidden_size) for _ in range(num_layers - 1)])
    
            if self.bidirectional:
                # Support bidirectionality
                self.back_cells = copy.deepcopy(self.cells)
    
            self.dropout = dropout
            if dropout:
                # Dropout layer applied between stacked layers
                self.dropout_layer = Dropout(dropout)
    
        def _one_directional_op(self, input, cells, n_steps, hs, cs, reverse=False):
            '''
            Run the stacked cells over every time step in one direction.

            Args:
                input: input of shape [n_steps, batch_size, input_size]
                cells: ModuleList of forward or backward LSTMCells
                hs: list of per-layer hidden states
                cs: list of per-layer cell states
                n_steps: number of time steps
                reverse: True for the backward direction

            Returns:
                output and the tuple (h_n, c_n) for this direction
            '''
            output = []
            for t in range(n_steps):
                inp = input[t]
    
                for layer in range(self.num_layers):
                    hs[layer], cs[layer] = cells[layer](inp, hs[layer], cs[layer])
                    inp = hs[layer]
                    if self.dropout and layer != self.num_layers - 1:
                        inp = self.dropout_layer(inp)
    
                # Collect the output of the last layer
                output.append(hs[-1])
    
            output = F.stack(output)  # (n_steps, batch_size, hidden_size) for a single direction
    
            if reverse:
                output = F.flip(output, 0)  # Reverse the time dimension so that step t=0 holds the state after seeing the whole sequence
    
            if self.batch_first:
                output = output.transpose((1, 0, 2))
    
            h_n = F.stack(hs)
            c_n = F.stack(cs)
    
            return output, (h_n, c_n)
    
        def forward(self, input: Tensor, state: Optional[Tuple[Tensor, Tensor]] = None):
            '''

            Args:
                input: shape [n_steps, batch_size, input_size] if batch_first=False; otherwise [batch_size, n_steps, input_size]
                state: tuple (h, c)

                num_directions = 2 if self.bidirectional else 1

                h: [num_directions * num_layers, batch_size, hidden_size]
                c: [num_directions * num_layers, batch_size, hidden_size]

            Returns:
                num_directions = 2 if self.bidirectional else 1

                output: (n_steps, batch_size, num_directions * hidden_size) if batch_first=False, or
                        (batch_size, n_steps, num_directions * hidden_size) if batch_first=True;
                        contains the last layer's output h_t for every time step
                h_n: (num_directions * num_layers, batch_size, hidden_size) the final hidden state
                c_n: (num_directions * num_layers, batch_size, hidden_size) the final cell state
            '''
    
            h_0, c_0 = None, None
            if state is not None:
                h_0, c_0 = state
    
            is_batched = input.ndim == 3
            batch_dim = 0 if self.batch_first else 1
            if not is_batched:
                # Treat the input as a batch of size 1
                input = input.unsqueeze(batch_dim)
                if state is not None:
                    h_0 = h_0.unsqueeze(1)
                    c_0 = c_0.unsqueeze(1)
    
            if self.batch_first:
                batch_size, n_steps, _ = input.shape
                input = input.transpose((1, 0, 2))  # move batch_size to the middle dimension: (n_steps, batch_size, input_size)
            else:
                n_steps, batch_size, _ = input.shape
    
            if state is None:
                num_directions = 2 if self.bidirectional else 1
                h_0 = Tensor.zeros((self.num_layers * num_directions, batch_size, self.hidden_size), dtype=input.dtype,
                                   device=input.device)
                c_0 = Tensor.zeros((self.num_layers * num_directions, batch_size, self.hidden_size), dtype=input.dtype,
                                   device=input.device)
    
            # Split the initial states into per-layer states
            hs, cs = list(F.split(h_0)), list(F.split(c_0))
    
            if not self.bidirectional:
                # Unidirectional case
                output, (h_n, c_n) = self._one_directional_op(input, self.cells, n_steps, hs, cs)
            else:
                output_f, (h_n_f, c_n_f) = self._one_directional_op(input, self.cells, n_steps, hs[:self.num_layers],
                                                                    cs[:self.num_layers])
    
                output_b, (h_n_b, c_n_b) = self._one_directional_op(F.flip(input, 0), self.back_cells, n_steps,
                                                                    hs[self.num_layers:], cs[self.num_layers:],
                                                                    reverse=True)
    
                output = F.cat([output_f, output_b], 2)
                h_n = F.cat([h_n_f, h_n_b], 0)
                c_n = F.cat([c_n_f, c_n_b], 0)
    
            return output, (h_n, c_n)
    
    

    The handling of multiple layers and of the two directions is essentially the same as in the RNN implementation; only the inputs and outputs differ slightly (there is an additional cell state), which follows from the LSTM architecture.
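
    As a quick sanity check of the interface, here is a hedged sketch of a two-layer bidirectional LSTM and the shapes it should return according to the docstring above (constructing the dummy input with Tensor.zeros from only a shape is again an assumption):

    lstm = LSTM(input_size=4, hidden_size=8, num_layers=2, bidirectional=True)

    x = Tensor.zeros((5, 2, 4))  # (n_steps, batch_size, input_size), batch_first=False
    output, (h_n, c_n) = lstm(x)

    # output: (n_steps, batch_size, num_directions * hidden_size) -> (5, 2, 16)
    # h_n:    (num_directions * num_layers, batch_size, hidden_size) -> (4, 2, 8)
    # c_n:    (num_directions * num_layers, batch_size, hidden_size) -> (4, 2, 8)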

    Part-of-Speech Tagging in Practice

    Based on the LSTM implemented above, we now build the part-of-speech tagging model, which is also simply named LSTM here:

    class LSTM(nn.Module):
        def __init__(self, vocab_size: int, embedding_dim: int, hidden_dim: int, output_dim: int, n_layers: int,
                     dropout: float, bidirectional: bool = False):
            super(LSTM, self).__init__()
            self.embedding = nn.Embedding(vocab_size, embedding_dim)
            self.rnn = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, num_layers=n_layers, dropout=dropout,
                               bidirectional=bidirectional)
    
            num_directions = 2 if bidirectional else 1
            self.output = nn.Linear(num_directions * hidden_dim, output_dim)
    
        def forward(self, input: Tensor, hidden: Tensor = None) -> Tensor:
            embedded = self.embedding(input)
            output, _ = self.rnn(embedded, hidden)  # POS tagging uses the output of every time step
            outputs = self.output(output)
            log_probs = F.log_softmax(outputs, axis=-1)
            return log_probs
    
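    Before training, a quick shape check of the tagging model can be useful. The sketch below uses made-up sizes and assumes the framework's Tensor can wrap a NumPy array of token ids (an assumption for illustration only):

    import numpy as np

    # Hypothetical sizes, only for a shape check
    model = LSTM(vocab_size=100, embedding_dim=16, hidden_dim=32, output_dim=10,
                 n_layers=2, dropout=0.2, bidirectional=True)

    # (batch_size, n_steps) token ids; wrapping a NumPy integer array is an assumption
    inputs = Tensor(np.random.randint(0, 100, size=(4, 7)))
    log_probs = model(inputs)  # (batch_size, n_steps, output_dim) -> (4, 7, 10)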

    The process is similar to the RNN case; the training code is as follows:

    embedding_dim = 128
    hidden_dim = 128
    batch_size = 32
    num_epoch = 10
    n_layers = 2
    dropout = 0.2
    
    # Load the data
    train_data, test_data, vocab, pos_vocab = load_treebank()
    train_dataset = RNNDataset(train_data)
    test_dataset = RNNDataset(test_data)
    train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=train_dataset.collate_fn, shuffle=True)
    test_data_loader = DataLoader(test_dataset, batch_size=batch_size, collate_fn=test_dataset.collate_fn, shuffle=False)
    
    num_class = len(pos_vocab)
    
    # Build the model
    device = cuda.get_device("cuda:0" if cuda.is_available() else "cpu")
    model = LSTM(len(vocab), embedding_dim, hidden_dim, num_class, n_layers, dropout, bidirectional=True)
    model.to(device)
    
    # Training
    nll_loss = NLLLoss()
    optimizer = SGD(model.parameters(), lr=0.1)
    
    model.train()  # make sure dropout is applied
    for epoch in range(num_epoch):
        total_loss = 0
        for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
            inputs, targets, mask = [x.to(device) for x in batch]
            log_probs = model(inputs)
        loss = nll_loss(log_probs[mask], targets[mask])  # boolean mask selection: padded positions are excluded from the loss
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Loss: {total_loss:.2f}")
    
    # Evaluation
    acc = 0
    total = 0
    model.eval()  # disable dropout
    for batch in tqdm(test_data_loader, desc=f"Testing"):
        inputs, targets, mask = [x.to(device) for x in batch]
        with no_grad():
            output = model(inputs)
            acc += (output.argmax(axis=-1).data == targets.data)[mask.data].sum().item()
            total += mask.sum().item()
    
    # Report accuracy on the test set
    print(f"Acc: {acc / total:.2f}")
    

    Output:

    Loss: 102.51
    Acc: 0.70
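
    The accuracy above comes from the masked selection in the test loop. Here is a pure NumPy illustration of that masked argmax computation, with made-up values:

    import numpy as np

    # Hypothetical log-probabilities of shape (batch_size=1, n_steps=4, num_class=3)
    output = np.log(np.array([[[0.7, 0.2, 0.1],
                               [0.1, 0.8, 0.1],
                               [0.3, 0.3, 0.4],
                               [0.1, 0.1, 0.8]]]))
    targets = np.array([[0, 1, 1, 2]])
    mask = np.array([[True, True, True, False]])  # the last position is padding

    pred = output.argmax(axis=-1)                  # (1, 4)
    acc = (pred == targets)[mask].sum() / mask.sum()
    print(acc)  # 2 correct out of 3 unmasked positions -> 0.666...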
    

    With the same configuration, the accuracy on the test set is again 70%; perhaps it simply needs more training epochs.

    For demonstration purposes, the model was only trained for 10 epochs here.

    References

    1. Long Short-Term Memory (LSTM)
    Original article: https://blog.csdn.net/yjw123456/article/details/125600972