• sam_out 应用到时序分类


    import math
    from glob import glob
    
    import numpy as np
    import paddle
    import paddle.nn as nn
    import paddle.nn.functional as F
    import pandas as pd
    from tqdm import tqdm
    
    from sklearn.metrics import f1_score,confusion_matrix
    
    
    class MaxState(paddle.nn.Layer):
        """Windowed causal running-maximum layer.

        The input is linearly projected, then the sequence is processed in
        chunks of ``win`` steps. Within a chunk, the output at step ``t`` is the
        element-wise maximum of the projected features at steps ``<= t`` of that
        chunk and of the state carried over from earlier chunks.

        Args:
            hidden_dim: Size of the feature axis; must be divisible by ``heads``.
            heads: Number of heads (only used to reshape the result).
            win: Number of time steps processed per chunk.
        """

        def __init__(self, hidden_dim, heads, win):
            super(MaxState, self).__init__()

            assert hidden_dim % heads == 0, "Hidden size must be divisible by the number of heads."

            self.head_size = hidden_dim // heads
            self.head = paddle.nn.Linear(hidden_dim, hidden_dim, bias_attr=False)
            self.head_num = heads
            self.win = win
            self.hidden = hidden_dim
            # Upper-triangular mask: entry [t, c] is True when step t may
            # contribute to output column c (t <= c). Stored as bool because
            # paddle.where documents its condition as a bool tensor.
            self.mask = paddle.triu(paddle.ones([win, win])).astype("bool")

        def forward(self, input_data, state=None):
            """Apply the running maximum.

            Args:
                input_data: Tensor of shape ``[batch, seq, hidden_dim]``.
                state: Optional running maximum from a previous call, shape
                    ``[batch, hidden_dim, 1, 1]``. Defaults to ``-inf``.

            Returns:
                Tuple ``(out, state)``: ``out`` has shape ``[batch, seq, hidden_dim]``
                and ``state`` is the updated running maximum.
            """
            b, s = input_data.shape[0], input_data.shape[1]
            k, h, w = self.head_num, self.head_size, self.win

            # Row of ones used to replicate a value across the w output columns.
            window = paddle.ones([1, w])
            neg_inf = paddle.to_tensor(-float('inf'))

            out = self.head(input_data)
            # [b, s, hidden] -> [b, s, hidden, w] -> [b, hidden, s, w]
            out = out.unsqueeze(-1) @ window
            out = out.transpose([0, 2, 1, 3])

            if state is None:
                state = paddle.ones([out.shape[0], out.shape[1], 1, 1]) * float("-inf")

            one_list = []
            for i in range(0, s, w):
                one = out[:, :, i:i + w]
                # r < w only for the last, partially filled chunk.
                r = one.shape[2]

                # Hide future steps; mask[:r] equals the full mask when r == w.
                one = paddle.where(self.mask[:r], one, neg_inf)

                # Append the carried state as one extra row, then reduce over rows.
                one = paddle.concat([one, state @ window], axis=2)
                state = paddle.max(one, axis=2, keepdim=True)
                one = state.reshape([b, k, h, w])
                # The last column has seen every row of this chunk plus the old state.
                state = state[..., -1:]
                if r != w:
                    # Drop columns that have no matching input step.
                    one = one[..., :r]

                one_list.append(one.transpose([0, 3, 1, 2]))
            out = paddle.concat(one_list, 1)
            out = out.reshape([b, s, -1])

            return out, state
    
    
    
    
    class FeedForward(nn.Layer):
        """Gated feed-forward block: ``ffn2(ffn1(x) * silu(gate(x)))``."""

        def __init__(self, hidden_size):
            super(FeedForward, self).__init__()
            inner_size = hidden_size * 2

            self.ffn1 = nn.Linear(hidden_size, inner_size)
            self.ffn2 = nn.Linear(inner_size, hidden_size)
            self.gate = nn.Linear(hidden_size, inner_size)
            # The attribute is called ``relu`` but the activation is SiLU.
            self.relu = nn.Silu()

        def forward(self, x):
            """Return the gated projection of ``x`` with the same last-axis size."""
            value = self.ffn1(x)
            gate = self.relu(self.gate(x))
            return self.ffn2(value * gate)
    
    
    class RMSNorm(nn.Layer):
        """Root-mean-square normalisation over the last axis with a learned scale."""

        def __init__(self, dim, eps: float = 1e-6):
            super(RMSNorm, self).__init__()
            self.eps = eps
            # Per-feature scale vector, initialised to 1.0.
            ones_init = nn.initializer.Constant(value=1.0)
            self.fc = paddle.create_parameter(shape=[dim], dtype='float32',
                                              default_initializer=ones_init)

        def norm(self, x):
            """Divide ``x`` by the RMS of its last axis (``eps`` added for stability)."""
            mean_square = x.pow(2).mean(-1, keepdim=True)
            inv_rms = paddle.rsqrt(mean_square + self.eps)
            return x * inv_rms

        def forward(self, x):
            """Normalise ``x`` and multiply by the learned scale."""
            return self.norm(x) * self.fc
    
    
    class GPTDecoderLayer(nn.Layer):
        """One decoder block: MaxState mixing, then a gated feed-forward network.

        Both sub-blocks use a residual connection followed by normalisation
        (LayerNorm after the mixer, RMSNorm after the feed-forward).
        """

        def __init__(self, hidden_size, num_heads):
            super(GPTDecoderLayer, self).__init__()
            # Kept under the name ``self_attention``; the layer is a MaxState
            # with a fixed window of 8 steps.
            self.self_attention = MaxState(hidden_size, num_heads, 8)
            self.ffn = FeedForward(hidden_size)
            self.norm = nn.LayerNorm(hidden_size)
            self.norm1 = RMSNorm(hidden_size)

        def forward(self, x, state=None, seq_len=None):
            """Run the block.

            Args:
                x: Input tensor passed to the MaxState layer.
                state: Optional MaxState running state from a previous call.
                seq_len: Accepted but not used by this layer.

            Returns:
                Tuple ``(x, state)`` with the transformed tensor and new state.
            """
            # Sequence mixing + residual, then LayerNorm.
            mixed, state = self.self_attention(x, state)
            x = self.norm(mixed + x)

            # Feed-forward + residual, then RMSNorm.
            x = self.norm1(self.ffn(x) + x)
            return x, state
    
    
    class PositionalEncoding(nn.Layer):
        """Additive sinusoidal positional encoding.

        Args:
            d_model: Size of the feature axis (even or odd).
            max_len: Number of positions to precompute.
        """

        def __init__(self, d_model, max_len=5000):
            super(PositionalEncoding, self).__init__()
            # Positions as a column vector: [max_len, 1].
            position = paddle.arange(max_len).unsqueeze(1).astype("float32")
            # One frequency per even feature index: ceil(d_model / 2) values.
            div_term = paddle.exp(paddle.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
            angles = position * div_term
            pe = paddle.zeros([max_len, d_model])
            pe[:, 0::2] = paddle.sin(angles)
            # For odd d_model there is one fewer odd column than even columns,
            # so drop the surplus frequency to keep the shapes aligned.
            pe[:, 1::2] = paddle.cos(angles[:, :d_model // 2])
            # Constant table of shape [1, max_len, d_model]; it is a plain
            # tensor attribute, not a trainable parameter.
            self.pe = pe.unsqueeze(0)

        def forward(self, x, seq_len=None):
            """Add positional encodings to ``x``.

            Args:
                x: Tensor of shape ``[batch_size, seq_len, d_model]``.
                seq_len: When ``None``, positions ``0 .. x.shape[1] - 1`` are
                    added. Otherwise only the encoding of position
                    ``seq_len - 1`` is added (broadcast over ``x``).

            Returns:
                ``x`` plus the selected positional encodings.
            """
            if seq_len is None:
                seq_len = x.shape[1]
                return x + self.pe[:, :seq_len, :]
            else:
                return x + self.pe[:, seq_len - 1:seq_len, :]
    
    
    # %%
    
    def sinusoidal_position_embedding(max_len, output_dim):
        """Build sine/cosine tables for rotary position embedding.

        Args:
            max_len: Number of positions to tabulate.
            output_dim: Feature size being rotated; ``output_dim // 2``
                frequencies are produced.

        Returns:
            Tuple ``(sin, cos)``, each of shape ``[max_len, output_dim // 2]``.
        """
        # Positions as a column vector: [max_len, 1].
        pos = paddle.arange(0, max_len, dtype="float32").unsqueeze(-1)
        # Frequency index i of the formula, i in [0, output_dim // 2).
        freq_idx = paddle.arange(0, output_dim // 2, dtype="float32")
        inv_freq = 10000 ** (-2 * freq_idx / output_dim)
        # angles[pos, i] = pos / 10000^(2i / d), shape [max_len, output_dim // 2].
        angles = pos * inv_freq
        return paddle.sin(angles), paddle.cos(angles)
    
    
    def rope(q, sin_em, cos_em, seq_len=None):
        """Rotate consecutive feature pairs of ``q`` by position-dependent angles.

        Args:
            q: 4-D tensor whose axis 2 is the sequence axis and whose last axis
                has even size.
            sin_em: Sine table indexed by position on axis 0.
            cos_em: Cosine table indexed by position on axis 0.
            seq_len: When ``None``, rows ``0 .. q.shape[2] - 1`` of the tables
                are used. Otherwise only row ``seq_len - 1`` is used.

        Returns:
            Tensor with the same shape as ``q``.
        """
        if seq_len is not None:
            sin_em = sin_em[seq_len - 1:seq_len]
            cos_em = cos_em[seq_len - 1:seq_len]
        else:
            steps = q.shape[2]
            sin_em = sin_em[:steps]
            cos_em = cos_em[:steps]

        # View the last axis as (pair, 2) and split into odd / even members.
        pairs = q.reshape([q.shape[0], q.shape[1], q.shape[2], -1, 2])
        odd = pairs[..., 1]
        even = pairs[..., 0]

        # 2-D rotation per pair: (even*cos - odd*sin, odd*cos + even*sin).
        rotated = paddle.stack([-odd * sin_em + even * cos_em, odd * cos_em + even * sin_em], -1)
        # Reshaping interleaves the two rotated components back into q's layout.
        return rotated.reshape(q.shape)
    
    
    class CvEm(nn.Layer):
        """1-D convolutional embedding for multi-channel time series.

        Args:
            hidden_size: Number of output channels (embedding size).
            in_channels: Number of input channels. Defaults to 3.
            kernel_size: Convolution kernel width. Defaults to 3.
            padding: Padding passed to ``nn.Conv1D``. Defaults to 2.
        """

        def __init__(self, hidden_size, in_channels=3, kernel_size=3, padding=2):
            super(CvEm, self).__init__()
            self.embedding = nn.Conv1D(in_channels, hidden_size, kernel_size, padding=padding)

        def forward(self, x):
            """Embed ``x`` and move channels to the last axis.

            Args:
                x: Tensor in channel-first layout ``[batch, in_channels, length]``.

            Returns:
                Tensor of shape ``[batch, out_length, hidden_size]``.
                NOTE(review): with the default kernel 3 / padding 2 the
                convolution output should be ``length + 2`` steps long —
                confirm this is intended.
            """
            x = self.embedding(x)
            return x.transpose([0, 2, 1])
    
    
    class GPT(nn.Layer):
        """GPT-style sequence classifier built from MaxState decoder layers.

        Args:
            vocab_size: Size of the output layer (number of logits).
            hidden_size: Feature size used throughout the network.
            num_heads: Number of heads passed to each decoder layer.
            num_layers: Number of stacked decoder layers.
        """

        def __init__(self, vocab_size, hidden_size, num_heads, num_layers):
            super(GPT, self).__init__()
            self.embedding = CvEm(hidden_size)

            self.decoder_layers = nn.LayerList([GPTDecoderLayer(hidden_size, num_heads) for _ in range(num_layers)])
            self.fc = nn.Linear(hidden_size, vocab_size, bias_attr=False)
            # Rotary tables for up to 50000 positions.
            self.sin_em, self.cos_em = sinusoidal_position_embedding(50000, hidden_size // num_heads // 2)

            self.layer_nor = paddle.nn.LayerNorm(hidden_size)

        def forward(self, x, state=None, seq_len=None):
            """Compute logits for a batch of sequences.

            Args:
                x: Input passed to the convolutional embedding.
                state: Optional sequence with one MaxState state per decoder
                    layer. It is copied, so the caller's object is not modified.
                seq_len: Optional position forwarded to ``rope``.

            Returns:
                Tuple ``(out, state)``: ``out`` holds the logits and ``state``
                is a list with the updated state of every layer.
            """
            x = self.embedding(x)

            # Work on a private list so the caller's container is left untouched
            # (this also allows tuples to be passed in).
            if state is None:
                state = [None] * len(self.decoder_layers)
            else:
                state = list(state)

            # Rotary position information, added to x as a residual.
            rope_dim = self.sin_em.shape[1] * 2
            grouped = x.reshape([x.shape[0], x.shape[1], -1, rope_dim]).transpose([0, 2, 1, 3])
            rotated = rope(grouped, self.sin_em, self.cos_em, seq_len)
            x = rotated.transpose([0, 2, 1, 3]).reshape(x.shape) + x

            for i, decoder_layer in enumerate(self.decoder_layers):
                x1, state[i] = decoder_layer(x, state[i])
                x = x1 + x

            # Max-pool over the sequence axis, normalise, then project to logits.
            out = self.fc(self.layer_nor(paddle.max(x, 1)))
            return out, state
    
    

    这段代码实现了一个基于PaddlePaddle的GPT风格(Generative Pre-trained Transformer)时序分类模型,其中用MaxState层代替了自注意力。主要包括以下几个部分:

    1. 引入依赖库:引入了一些需要使用的库,包括math、glob、numpy、paddle等。

    2. 定义MaxState类:这是一个自定义的PaddlePaddle层,用于计算输入数据的最大状态。它先对输入做一次线性映射,再按固定窗口(win)沿时间维计算因果的累计最大值,并通过state把前面窗口的最大值传递给后面的窗口。该层没有使用自注意力机制,也没有使用位置编码。

    3. 定义FeedForward类:这是一个前馈神经网络层,用于对输入数据进行非线性变换。

    4. 定义RMSNorm类:这是一个归一化层,用于对输入数据进行归一化处理。

    5. 定义GPTDecoderLayer类:这是一个GPT解码器层,包括MaxState层(代替自注意力机制)、前馈神经网络和归一化层(LayerNorm与RMSNorm)。

    6. 定义PositionalEncoding类:这是一个正弦位置编码层,用于为输入数据添加位置信息。注意:GPT类中对它的调用已被注释掉,模型实际使用的是rope函数。

    7. 定义sinusoidal_position_embedding函数:这是一个用于生成正弦位置编码和余弦位置编码的函数。

    8. 定义rope函数:这是旋转位置编码(RoPE)的实现,它把输入最后一维的元素两两成对,并按位置对应的正弦、余弦值进行旋转,从而把位置信息融入输入数据。

    9. 定义CvEm类:这是一个卷积神经网络层,用于将输入数据进行卷积操作。

    10. 定义GPT类:这是一个GPT模型的定义,包括嵌入层、解码器层和全连接层。

    11. forward函数:这是GPT模型的前向传播函数,用于计算输出结果。

    总体来说,这段代码实现了一个基于PaddlePaddle的GPT模型,并提供了相应的层和函数用于构建和训练模型。

  • 相关阅读:
    Java项目依赖释放模式
    【Hive SQL 每日一题】环比增长率、环比增长率、复合增长率
    ☕ Java IO 技术
    JVM虚拟机详解
    java版 Spring Cloud+uniapp b2b2c o2o 多商家入驻商城 直播带货商城 电子商务
    精通 VS 调试技巧,学习与工作效率翻倍!
    PCL 计算USC(UniqueShapeContext)特征描述子
    MATLAB实现AHP层次分析法——以情人节选取礼物为例
    java面试题:java中的单例设计模式及两种实现方法的代码举例
    NFT游戏有哪些?盘点当前热门的NFT游戏
  • 原文地址:https://blog.csdn.net/weixin_32759777/article/details/139729151