• 时间序列预测(9) — Informer源码详解与运行


    目录

    1 源码解析

    1.1 文件结构

    1.2 mian_informer.py文件

    1.3 模型训练

    1.4 模型测试

    1.5 模型预测

    2 Informer模型

    2.1 process_one_batch

    2.2 Informer函数

    2.3 DataEmbedding函数

    2.4 ProbAttention稀疏注意力机制

    2.5 Encoder编码器函数

    2.6 Decoder解码器函数

    3 官方数据集运行


    1 源码解析

    1.1 文件结构

    1.2 mian_informer.py文件

    首先导入代码的基本函数和数据类型,在exp文件下exp_informer import中的类Exp_Informer中,定义了模型参数、get_data、model_optim、train、test、eval等函数

    1. # 继承了exp文件下exp_informer import中的Exp_Informer
    2. from exp.exp_informer import Exp_Informer

    进一步解析模型需要的参数,参数的含义如下表格:

    参数名称    参数类型    参数讲解
    model  str  这是一个用于实验的参数设置,其中包含了三个选项: informer, informerstack, informerlight。根据实验需求,可以选择其中之一来进行实验,默认是使用informer模型。
    datastr    数据,这个并不是你理解的你的数据集文件,而是你想要用官方定义的方法还是你自己的数据集进行定义数据加载器,如果是自己的数据集就输入custom
    root_pathstr这个才是你文件的路径,不要到具体的文件,到目录级别即可。
    data_pathstr 这个填写你文件的名称。
    features str    这个是特征有三个选项M,MS,S。分别是多元预测多元,多元预测单元,单元预测单元。
    target str    这个是你数据集中你想要预测那一列数据,假设我预测的是油温OT列就输入OT即可。
    freq str 时间的间隔,你数据集每一条数据之间的时间间隔。
    checkpointsstr训练出来的模型保存路径
    seq_lenint用过去的多少条数据来预测未来的数据
    label_lenint 可以裂解为更高的权重占比的部分要小于seq_len
    pred_lenint  预测未来多少个时间点的数据
     
    enc_in    int  你数据有多少列,要减去时间那一列,这里我是输入8列数据但是有一列是时间所以就填写7
    dec_inint同上
    c_outint 这里有一些不同如果你的features填写的是M那么和上面就一样,如果填写的MS那么这里要输入1因为你的输出只有一列数据。
    d_modelint用于设置模型的维度,默认值为512。可以根据需要调整该参数的数值来改变模型的维度
    n_heads  
     
    int  用于设置模型中的注意力头数。默认值为8,表示模型会使用8个注意力头,我建议和的输入数据的总体保持一致,列如我输入的是8列数据不用刨去时间的那一列就输入8即可。
    e_layersint    
     
    用于设置编码器的层数
    d_layers 
     
    int  用于设置解码器的层数
    s_layersstr    用于设置堆叠编码器的层数
    d_ff 
     
    int模型中全连接网络(FCN)的维度,默认值为2048
    factorint    ProbSparse自注意力中的因子,默认值为5
     
    paddingint填充类型,默认值为0,这个应该大家都理解,如果不够数据就填写0.
     
    distil
     
    bool 是否在编码器中使用蒸馏操作。使用--distil参数表示不使用蒸馏操作,默认为True也是我们的论文中比较重要的一个改进。
    dropoutfloat这个应该都理解不说了,丢弃的概率,防止过拟合的。
     attnstr
     
    编码器中使用的注意力类型,默认为"prob"我们论文的主要改进点,提出的注意力机制。
    embed

     
    str   时间特征的编码方式,默认为"timeF"
    activation  str  激活函数
    output_attention    
     
    bool    是否在编码器中输出注意力,默认为False
    do_predict    bool是否进行预测,这里模型中没有给添加算是一个小bug我们需要填写一个default=True在其中。
    mix
     
    bool 在生成式解码器中是否使用混合注意力,默认为True
    cols 
     
    str从数据文件中选择特定的列作为输入特征,应该用不到
    num_workers    
     
    int线程windows大家最好设置成0否则会报线程错误,linux系统随便设置。
    itrint 
     
    实验运行的次数,默认为2,我们这里改成数字1.
    train_epochs  
     
    int训练的次数
    batch_size    int  一次往模型力输入多少条数据
    patience
     
    int 早停机制,如果损失多少个epochs没有改变就停止训练。
    learning_rate    
     
    float学习率。
    des 
     
    str实验描述,默认为"test"
    loss 
     
    str损失函数,默认为"mse"
     lradjstr学习率的调整方式,默认为"type1"
    use_amp    
     
    bool   混合精度训练,
    inverse 
     
    bool我们的数据输入之前会被进行归一化处理,这里默认为False,算是一个小bug,因为输出的数据模型没有给我们转化成我们的数据,我们要改成True。
    use_gpu    
     
    bool是否使用GPU训练,根据自身来选择
    gpu 
     
    int  GPU的编号
    use_multi_gpu  
     
    bool    是否使用多个GPU训练。
    devices str    GPU的编号

    接下来判断是否使用GPU设备进行训练

    1. # 判断是否使用GPU设备进行训练
    2. args.use_gpu = True if torch.cuda.is_available() and args.use_gpu else False
    3. if args.use_gpu and args.use_multi_gpu:
    4. args.devices = args.devices.replace(' ','')
    5. device_ids = args.devices.split(',')
    6. args.device_ids = [int(id_) for id_ in device_ids]
    7. args.gpu = args.device_ids[0]

    然后解析数据集的信息,字典data_parser中包含了不同数据集的信息,键值为数据集名称('ETTh1'等),对应一个包含.csv数据文件名。接着遍历字典data_parser,将数据信息存储在data_info变量中,并将相关信息存储在args中。最后将将args.s_layers中的字符串转换为整数列表。

    1. ## 解析数据集的信息 ##
    2. # 字典data_parser中包含了不同数据集的信息,键值为数据集名称('ETTh1'等),对应一个包含.csv数据文件名
    3. # 目标特征、M、S和MS等参数的字典
    4. data_parser = {
    5. 'ETTh1':{'data':'ETTh1.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
    6. 'ETTh2':{'data':'ETTh2.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
    7. 'ETTm1':{'data':'ETTm1.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
    8. 'ETTm2':{'data':'ETTm2.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
    9. 'WTH':{'data':'WTH.csv','T':'WetBulbCelsius','M':[12,12,12],'S':[1,1,1],'MS':[12,12,1]},
    10. 'ECL':{'data':'ECL.csv','T':'MT_320','M':[321,321,321],'S':[1,1,1],'MS':[321,321,1]},
    11. 'Solar':{'data':'solar_AL.csv','T':'POWER_136','M':[137,137,137],'S':[1,1,1],'MS':[137,137,1]},
    12. }
    13. # 遍历字典data_parser
    14. # 将数据信息存储在data_info变量中,并将相关信息存储在args中
    15. # data_path:数据存放路径
    16. # target:标签,也就是需要预测的值
    17. # enc_in, args.dec_in, args.c_out为特征
    18. if args.data in data_parser.keys():
    19. data_info = data_parser[args.data]
    20. args.data_path = data_info['data']
    21. args.target = data_info['T']
    22. args.enc_in, args.dec_in, args.c_out = data_info[args.features]
    23. # 首先将args.s_layers中的字符串转换为整数列表
    24. # 它首先使用replace函数去掉空格,然后使用split函数根据逗号分隔字符串,并使用列表推导式将分割后的字符串转换为整数列表。
    25. args.s_layers = [int(s_l) for s_l in args.s_layers.replace(' ','').split(',')]
    26. # 参数的赋值
    27. args.detail_freq = args.freq
    28. args.freq = args.freq[-1:]
    29. # 打印参数信息。
    30. print('Args in experiment:')
    31. print(args)

    最后声明Informer模型,开始训练、验证和预测,最后清楚GPU缓存。

    1. # 声明Informer模型对象
    2. Exp = Exp_Informer
    3. # 开始遍历循环训练args.itr次
    4. for ii in range(args.itr):
    5. # setting record of experiments
    6. setting = '{}_{}_ft{}_sl{}_ll{}_pl{}_dm{}_nh{}_el{}_dl{}_df{}_at{}_fc{}_eb{}_dt{}_mx{}_{}_{}'.format(args.model, args.data, args.features,
    7. args.seq_len, args.label_len, args.pred_len,
    8. args.d_model, args.n_heads, args.e_layers, args.d_layers, args.d_ff, args.attn, args.factor,
    9. args.embed, args.distil, args.mix, args.des, ii)
    10. exp = Exp(args) # set experiments
    11. # 开始训练
    12. print('>>>>>>>start training : {}>>>>>>>>>>>>>>>>>>>>>>>>>>'.format(setting))
    13. exp.train(setting)
    14. # 开始测试
    15. print('>>>>>>>testing : {}<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<'.format(setting))
    16. exp.test(setting)
    17. # 开始预测
    18. if args.do_predict:
    19. print('>>>>>>>predicting : {}<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<'.format(setting))
    20. exp.predict(setting, True)
    21. # 清空了GPU缓存,以释放内存。
    22. torch.cuda.empty_cache()

    1.3 模型训练

    train代码主要实现了Informer模型的训练过程,包括数据加载、模型训练、损失计算、学习率调整和提前停止训练过程。

    1. ## 训练函数 ##
    2. def train(self, setting):
    3. # 获取训练、验证、测试数据和加载器。
    4. train_data, train_loader = self._get_data(flag = 'train')
    5. vali_data, vali_loader = self._get_data(flag = 'val')
    6. test_data, test_loader = self._get_data(flag = 'test')
    7. # 创建路径用于保存训练过程中的信息
    8. path = os.path.join(self.args.checkpoints, setting)
    9. if not os.path.exists(path):
    10. os.makedirs(path)
    11. # 记录了当前的时间戳
    12. time_now = time.time()
    13. # 训练数据加载器的步数
    14. train_steps = len(train_loader)
    15. # 提前停止训练机制,如果损失多少个epochs没有改变就停止训练。
    16. early_stopping = EarlyStopping(patience=self.args.patience, verbose=True)
    17. # 获取优化器和损失函数
    18. model_optim = self._select_optimizer()
    19. criterion = self._select_criterion()
    20. # 是否使用混合精度训练,如果使用则创建了一个
    21. if self.args.use_amp:
    22. scaler = torch.cuda.amp.GradScaler()
    23. # 这行代码开始train_epochs次循环训练。
    24. for epoch in range(self.args.train_epochs):
    25. # 初始化迭代计数器和训练损失
    26. iter_count = 0
    27. train_loss = []
    28. # 将模型设置为训练模式
    29. self.model.train()
    30. # 记录每个epoch的训练时间
    31. epoch_time = time.time()
    32. # 遍历训练数据加载器中的每个批次,并进行模型训练
    33. for i, (batch_x,batch_y,batch_x_mark,batch_y_mark) in enumerate(train_loader):
    34. iter_count += 1
    35. # 梯度清零
    36. model_optim.zero_grad()
    37. # 计算模型的预测值和真实值、损失
    38. pred, true = self._process_one_batch(
    39. train_data, batch_x, batch_y, batch_x_mark, batch_y_mark)
    40. loss = criterion(pred, true)
    41. train_loss.append(loss.item())
    42. # 打印当前迭代的信息,包括迭代次数、epoch、损失值、训练速度等
    43. if (i+1) % 100==0:
    44. print("\titers: {0}, epoch: {1} | loss: {2:.7f}".format(i + 1, epoch + 1, loss.item()))
    45. speed = (time.time()-time_now)/iter_count
    46. left_time = speed*((self.args.train_epochs - epoch)*train_steps - i)
    47. print('\tspeed: {:.4f}s/iter; left time: {:.4f}s'.format(speed, left_time))
    48. iter_count = 0
    49. time_now = time.time()
    50. # 根据是否使用混合精度训练,选择不同的梯度更新方式
    51. if self.args.use_amp:
    52. scaler.scale(loss).backward()
    53. scaler.step(model_optim)
    54. scaler.update()
    55. else:
    56. loss.backward()
    57. model_optim.step()
    58. # 计算并打印每个epoch的训练时间、平均训练损失、验证损失和测试损失
    59. print("Epoch: {} cost time: {}".format(epoch+1, time.time()-epoch_time))
    60. train_loss = np.average(train_loss)
    61. vali_loss = self.vali(vali_data, vali_loader, criterion)
    62. test_loss = self.vali(test_data, test_loader, criterion)
    63. print("Epoch: {0}, Steps: {1} | Train Loss: {2:.7f} Vali Loss: {3:.7f} Test Loss: {4:.7f}".format(
    64. epoch + 1, train_steps, train_loss, vali_loss, test_loss))
    65. early_stopping(vali_loss, self.model, path)
    66. if early_stopping.early_stop:
    67. print("Early stopping")
    68. break
    69. # 根据当前epoch的情况,调整学习率
    70. adjust_learning_rate(model_optim, epoch+1, self.args)
    71. # 最后保存最佳模型的参数,并返回该模型
    72. best_model_path = path+'/'+'checkpoint.pth'
    73. self.model.load_state_dict(torch.load(best_model_path))
    74. return self.model

    1.4 模型测试

    test函数主要实现了Informer模型的测试过程,包括测试数据加载、模型测试、损失计算、保存测试结果的过程。

    1. ## 测试函数 ##
    2. def test(self, setting):
    3. # 获取测试数据和测试数据加载器
    4. test_data, test_loader = self._get_data(flag='test')
    5. # 将模型设置为验证模式
    6. self.model.eval()
    7. # 初始化,存储模型的预测值和真实值
    8. preds = []
    9. trues = []
    10. # 遍历测试数据加载器中的每个批次
    11. for i, (batch_x,batch_y,batch_x_mark,batch_y_mark) in enumerate(test_loader):
    12. # 对每个批次的数据进行预测,并将预测值和真实值存储到preds和trues列表中
    13. pred, true = self._process_one_batch(
    14. test_data, batch_x, batch_y, batch_x_mark, batch_y_mark)
    15. preds.append(pred.detach().cpu().numpy())
    16. trues.append(true.detach().cpu().numpy())
    17. # 将列表转换为NumPy数组
    18. preds = np.array(preds)
    19. trues = np.array(trues)
    20. # 这两行代码对预测值和真实值进行形状调整
    21. print('test shape:', preds.shape, trues.shape)
    22. preds = preds.reshape(-1, preds.shape[-2], preds.shape[-1])
    23. trues = trues.reshape(-1, trues.shape[-2], trues.shape[-1])
    24. print('test shape:', preds.shape, trues.shape)
    25. # 保存测试结果
    26. folder_path = './results/' + setting +'/'
    27. if not os.path.exists(folder_path):
    28. os.makedirs(folder_path)
    29. # 计算并打印误差
    30. mae, mse, rmse, mape, mspe = metric(preds, trues)
    31. print('mse:{}, mae:{}'.format(mse, mae))
    32. # 分别将评估指标、预测值和真实值保存为NumPy数组文件
    33. np.save(folder_path+'metrics.npy', np.array([mae, mse, rmse, mape, mspe]))
    34. np.save(folder_path+'pred.npy', preds)
    35. np.save(folder_path+'true.npy', trues)
    36. return

    1.5 模型预测

    predict函数利用Informer模型的最佳训练参数进行预测,包括预测数据加载、模型预测、保存预测结果的过程。

    1. ## 预测函数 ##
    2. def predict(self, setting, load=False):
    3. # 获取预测数据和预测数据加载器
    4. pred_data, pred_loader = self._get_data(flag='pred')
    5. # 加载了最佳模型的参数
    6. if load:
    7. path = os.path.join(self.args.checkpoints, setting)
    8. best_model_path = path+'/'+'checkpoint.pth'
    9. self.model.load_state_dict(torch.load(best_model_path))
    10. # 将模型设置为评估模式
    11. self.model.eval()
    12. # 初始化,存储模型的预测值
    13. preds = []
    14. # 遍历预测数据加载器中的每个批次
    15. for i, (batch_x,batch_y,batch_x_mark,batch_y_mark) in enumerate(pred_loader):
    16. # 每个批次的数据进行预测,并将预测值存储到preds列表中
    17. pred, true = self._process_one_batch(
    18. pred_data, batch_x, batch_y, batch_x_mark, batch_y_mark)
    19. preds.append(pred.detach().cpu().numpy())
    20. # 将列表转换为NumPy数组
    21. preds = np.array(preds)
    22. preds = preds.reshape(-1, preds.shape[-2], preds.shape[-1])
    23. # 保存预测结果
    24. folder_path = './results/' + setting +'/'
    25. if not os.path.exists(folder_path):
    26. os.makedirs(folder_path)
    27. np.save(folder_path+'real_prediction.npy', preds)
    28. return

    2 Informer模型

    2.1 process_one_batch

    前面介绍了训练、测试和预测的的流程,那么每一批次是数据是如何利用Informer模型进行训练的呢?首先看一下每一个批次的训练函数process_one_batch,包括数据加载、数据处理、模型训练、保存预测结果的过程。

    1. ## 每一批次数据训练过程 ##
    2. def _process_one_batch(self, dataset_object, batch_x, batch_y, batch_x_mark, batch_y_mark):
    3. # 将输入数据转换为浮点型,并移到GPU设备上
    4. batch_x = batch_x.float().to(self.device)
    5. batch_y = batch_y.float()
    6. batch_x_mark = batch_x_mark.float().to(self.device)
    7. batch_y_mark = batch_y_mark.float().to(self.device)
    8. # decoder解码器输入
    9. if self.args.padding==0:
    10. # padding=0,创建一个全零的张量作为解码器输入
    11. dec_inp = torch.zeros([batch_y.shape[0], self.args.pred_len, batch_y.shape[-1]]).float()
    12. elif self.args.padding==1:
    13. # padding=1,创建一个全1的张量作为解码器输入
    14. dec_inp = torch.ones([batch_y.shape[0], self.args.pred_len, batch_y.shape[-1]]).float()
    15. # 将目标数据的一部分与解码器输入拼接在一起,就是论文结构图 X_de={X_token,X_0}
    16. dec_inp = torch.cat([batch_y[:,:self.args.label_len,:], dec_inp], dim=1).float().to(self.device)
    17. # encoder - decoder
    18. # 是否启用了混合精度训练
    19. if self.args.use_amp:
    20. with torch.cuda.amp.autocast():
    21. # 根据是否在编码器中输出注意力,进行不同的模型训练
    22. if self.args.output_attention:
    23. outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
    24. else:
    25. outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
    26. else:
    27. if self.args.output_attention:
    28. outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
    29. else:
    30. outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)
    31. # 我们的数据输入之前会被进行归一化处理,这里对预测数据进行还原
    32. if self.args.inverse:
    33. outputs = dataset_object.inverse_transform(outputs)
    34. # 这个features有三个选项M,MS,S。分别是多元预测多元,多元预测单元,单元预测单元
    35. # 根据预测方式调整输出数据
    36. f_dim = -1 if self.args.features=='MS' else 0
    37. batch_y = batch_y[:,-self.args.pred_len:,f_dim:].to(self.device)
    38. return outputs, batch_y

    2.2 Informer函数

    Informer模型主函数进行Encoding -> Attention -> Encoder -> Decoder - > Linear过程,对应论文的流程图。

    1. ## Informer模型主函数 ##
    2. class Informer(nn.Module):
    3. def __init__(self, enc_in, dec_in, c_out, seq_len, label_len, out_len,
    4. factor=5, d_model=512, n_heads=8, e_layers=3, d_layers=2, d_ff=512,
    5. dropout=0.0, attn='prob', embed='fixed', freq='h', activation='gelu',
    6. output_attention = False, distil=True, mix=True,
    7. device=torch.device('cuda:0')):
    8. super(Informer, self).__init__()
    9. self.pred_len = out_len
    10. self.attn = attn
    11. self.output_attention = output_attention
    12. # Encoding
    13. self.enc_embedding = DataEmbedding(enc_in, d_model, embed, freq, dropout)
    14. self.dec_embedding = DataEmbedding(dec_in, d_model, embed, freq, dropout)
    15. # Attention
    16. Attn = ProbAttention if attn=='prob' else FullAttention
    17. # Encoder
    18. self.encoder = Encoder(
    19. [
    20. EncoderLayer(
    21. AttentionLayer(Attn(False, factor, attention_dropout=dropout, output_attention=output_attention),
    22. d_model, n_heads, mix=False),
    23. d_model,
    24. d_ff,
    25. dropout=dropout,
    26. activation=activation
    27. ) for l in range(e_layers)
    28. ],
    29. [
    30. ConvLayer(
    31. d_model
    32. ) for l in range(e_layers-1)
    33. ] if distil else None,
    34. norm_layer=torch.nn.LayerNorm(d_model)
    35. )
    36. # Decoder
    37. self.decoder = Decoder(
    38. [
    39. DecoderLayer(
    40. AttentionLayer(Attn(True, factor, attention_dropout=dropout, output_attention=False),
    41. d_model, n_heads, mix=mix),
    42. AttentionLayer(FullAttention(False, factor, attention_dropout=dropout, output_attention=False),
    43. d_model, n_heads, mix=False),
    44. d_model,
    45. d_ff,
    46. dropout=dropout,
    47. activation=activation,
    48. )
    49. for l in range(d_layers)
    50. ],
    51. norm_layer=torch.nn.LayerNorm(d_model)
    52. )
    53. # self.end_conv1 = nn.Conv1d(in_channels=label_len+out_len, out_channels=out_len, kernel_size=1, bias=True)
    54. # self.end_conv2 = nn.Conv1d(in_channels=d_model, out_channels=c_out, kernel_size=1, bias=True)
    55. self.projection = nn.Linear(d_model, c_out, bias=True)
    56. def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,
    57. enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):
    58. enc_out = self.enc_embedding(x_enc, x_mark_enc)
    59. enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)
    60. dec_out = self.dec_embedding(x_dec, x_mark_dec)
    61. dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)
    62. dec_out = self.projection(dec_out)
    63. # dec_out = self.end_conv1(dec_out)
    64. # dec_out = self.end_conv2(dec_out.transpose(2,1)).transpose(1,2)
    65. if self.output_attention:
    66. return dec_out[:,-self.pred_len:,:], attns
    67. else:
    68. return dec_out[:,-self.pred_len:,:] # [B, L, D]

    2.3 DataEmbedding函数

    一般Transformer框架的第一层都是embedding,把各种特征信息融合在一起,作者从3个角度进行特征融合,并执行两步DataEmbedding操作,对应论文原理图中有两部分输入 X_en 和 X_de:

    1. # 对应论文原理图中有两部分输入 X_en 和 X_de
    2. self.enc_embedding = DataEmbedding(enc_in, d_model, embed, freq, dropout)
    3. self.dec_embedding = DataEmbedding(dec_in, d_model, embed, freq, dropout)

    DataEmbedding操作过程如下:

    1. class DataEmbedding(nn.Module):
    2. def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
    3. super(DataEmbedding, self).__init__()
    4. self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
    5. self.position_embedding = PositionalEmbedding(d_model=d_model)
    6. self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
    7. freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
    8. d_model=d_model, embed_type=embed_type, freq=freq)
    9. self.dropout = nn.Dropout(p=dropout)
    10. def forward(self, x, x_mark):
    11. x = self.value_embedding(x) + self.temporal_embedding(x_mark) + self.position_embedding(x)
    12. return self.dropout(x)
    • TokenEmbedding:用来将输入的token序列转换为向量表示,这里使用了一个一维卷积层来进行处理。
    • PositionalEmbedding:用来对输入序列的位置信息进行编码,这里使用了sin和cos函数来生成位置编码。
    • TemporalEmbedding:用来对输入序列的时间信息进行编码,包括分钟、小时、星期几、日期和月份等。根据不同的时间频率,选择不同的Embedding方式来进行编码。
    1. ## 将输入的token序列转换为向量表示 ##
    2. # 这里使用了一个一维卷积层来进行处理
    3. class TokenEmbedding(nn.Module):
    4. # c_in:输入的特征维度
    5. # d_model:嵌入后的维度
    6. def __init__(self, c_in, d_model):
    7. super(TokenEmbedding, self).__init__()
    8. # 根据PyTorch的版本选择不同的padding值
    9. padding = 1 if torch.__version__ >= '1.5.0' else 2
    10. # 将输入的token序列进行卷积操作
    11. self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,
    12. kernel_size=3, padding=padding, padding_mode='circular', bias=False)
    13. for m in self.modules():
    14. if isinstance(m, nn.Conv1d):
    15. nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu')
    16. # 执行前向传播操作
    17. def forward(self, x):
    18. x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)
    19. return x
    1. ## 对输入序列的位置信息进行编码 ##
    2. # 这里使用了sin和cos函数来生成位置编码
    3. class PositionalEmbedding(nn.Module):
    4. def __init__(self, d_model, max_len=5000):
    5. super(PositionalEmbedding, self).__init__()
    6. # 创建了一个大小为(max_len, d_model)的全零张量
    7. pe = torch.zeros(max_len, d_model).float()
    8. # 不需要梯度更新
    9. pe.require_grad = False
    10. # 创建了一个长度为max_len的位置张量position,并将其转换为浮点型张量,并在其维度1上增加了一个维度。
    11. position = torch.arange(0, max_len).float().unsqueeze(1)
    12. # 计算得到一个长度为d_model的张量
    13. div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()
    14. # 计算了正弦和余弦位置编码
    15. # 并将结果分别赋值给pe张量的偶数索引和奇数索引位置。
    16. pe[:, 0::2] = torch.sin(position * div_term)
    17. pe[:, 1::2] = torch.cos(position * div_term)
    18. # 将pe张量增加一个维度
    19. pe = pe.unsqueeze(0)
    20. # 将位置编码pe作为模型的缓冲区
    21. self.register_buffer('pe', pe)
    22. # 执行前向传播
    23. def forward(self, x):
    24. return self.pe[:, :x.size(1)]
    1. ## 时间嵌入部分 ##
    2. class TemporalEmbedding(nn.Module):
    3. def __init__(self, d_model, embed_type='fixed', freq='h'):
    4. super(TemporalEmbedding, self).__init__()
    5. # 分别表示分钟、小时、星期、日期和月份的维度大小
    6. minute_size = 4
    7. hour_size = 24
    8. weekday_size = 7
    9. day_size = 32
    10. month_size = 13
    11. # embed_type参数选择使用固定嵌入(FixedEmbedding)还是普通嵌入(nn.Embedding)
    12. Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embedding
    13. # 是否初始化了分钟嵌入向量,并进行时间嵌入操作
    14. if freq == 't':
    15. self.minute_embed = Embed(minute_size, d_model)
    16. self.hour_embed = Embed(hour_size, d_model)
    17. self.weekday_embed = Embed(weekday_size, d_model)
    18. self.day_embed = Embed(day_size, d_model)
    19. self.month_embed = Embed(month_size, d_model)

    2.4 ProbAttention稀疏注意力机制

    主要思想是在计算每个quey稀疏性得分时,只需采样出的部分和key计算就可以了。就是找到这些重要的/稀疏的query,从而只计算这些queryl的attention值,来优化计算效率。

    1. ## 概率稀疏注意力机制 ##
    2. class ProbAttention(nn.Module):
    3. # mask_flag(是否使用掩码)
    4. # factor(用于计算稀疏性的因子)
    5. # scale(缩放因子)
    6. # attention_dropout(注意力机制的dropout率)
    7. # output_attention(是否输出注意力权重)
    8. def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
    9. super(ProbAttention, self).__init__()
    10. self.factor = factor
    11. self.scale = scale
    12. self.mask_flag = mask_flag
    13. self.output_attention = output_attention
    14. self.dropout = nn.Dropout(attention_dropout)
    15. # prob_QK方法用于计算概率稀疏注意力机制中的Q_K矩阵,其中包括了对Q和K的采样、稀疏性计算和计算Q_K矩阵
    16. def _prob_QK(self, Q, K, sample_k, n_top): # n_top: c*ln(L_q)
    17. # Q [B, H, L, D]
    18. # 获取张量K和Q的形状信息
    19. # 分别表示批量大小、头数、K序列长度和嵌入维度
    20. B, H, L_K, E = K.shape
    21. _, _, L_Q, _ = Q.shape
    22. ## calculate the sampled Q_K,对应论文中的公式(4)##
    23. # 张量K沿着第三个维度进行扩展,以便与Q相乘
    24. K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)
    25. # 生成一个(L_Q, sample_k)大小的随机整数张量,用于在K中进行采样
    26. index_sample = torch.randint(L_K, (L_Q, sample_k)) # real U = U_part(factor*ln(L_k))*L_q
    27. # 获取采样的K张量
    28. K_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]
    29. # 计算稀疏性测量值M
    30. Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze(-2)
    31. # find the Top_k query with sparisty measurement
    32. # 计算稀疏性测量值M,包括对Q_K采样进行最大值计算和求和计算,对应论文中的公式(4)
    33. M = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K)
    34. # 找到稀疏性最高的查询项,即M中top-k的索引
    35. M_top = M.topk(n_top, sorted=False)[1]
    36. # use the reduced Q to calculate Q_K
    37. # 计算最终的Q_K矩阵,即使用减少的Q和K进行点积计算
    38. Q_reduce = Q[torch.arange(B)[:, None, None],
    39. torch.arange(H)[None, :, None],
    40. M_top, :] # factor*ln(L_q)
    41. Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1)) # factor*ln(L_q)*L_k
    42. return Q_K, M_top
    43. def _get_initial_context(self, V, L_Q):
    44. # 张量V的形状信息
    45. B, H, L_V, D = V.shape
    46. # 判断是否使用掩码
    47. if not self.mask_flag:
    48. # 如果不使用掩码,计算V在倒数第二维上的均值,得到V的总和
    49. # V_sum = V.sum(dim=-2)
    50. V_sum = V.mean(dim=-2)
    51. # 将V_sum扩展为与查询序列相同长度的向量
    52. contex = V_sum.unsqueeze(-2).expand(B, H, L_Q, V_sum.shape[-1]).clone()
    53. else: # use mask
    54. # 如果使用掩码,需要L_Q == L_V,然后对V进行累积求和
    55. assert(L_Q == L_V) # requires that L_Q == L_V, i.e. for self-attention only
    56. contex = V.cumsum(dim=-2)
    57. return contex
    58. ## 更新
    59. def _update_context(self, context_in, V, scores, index, L_Q, attn_mask):
    60. B, H, L_V, D = V.shape
    61. # 如果使用掩码,更新注意力掩码
    62. if self.mask_flag:
    63. attn_mask = ProbMask(B, H, L_Q, index, scores, device=V.device)
    64. scores.masked_fill_(attn_mask.mask, -np.inf)
    65. # 对注意力分数进行softmax操作,得到注意力权重
    66. attn = torch.softmax(scores, dim=-1) # nn.Softmax(dim=-1)(scores)
    67. # 使用注意力权重对输入张量V进行加权求和
    68. context_in[torch.arange(B)[:, None, None],
    69. torch.arange(H)[None, :, None],
    70. index, :] = torch.matmul(attn, V).type_as(context_in)
    71. # 是否需要输出注意力权重
    72. if self.output_attention:
    73. attns = (torch.ones([B, H, L_V, L_V])/L_V).type_as(attn).to(attn.device)
    74. attns[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], index, :] = attn
    75. return (context_in, attns)
    76. else:
    77. return (context_in, None)
    78. ## 前项传播操作
    79. def forward(self, queries, keys, values, attn_mask):
    80. B, L_Q, H, D = queries.shape
    81. _, L_K, _, _ = keys.shape
    82. queries = queries.transpose(2,1)
    83. keys = keys.transpose(2,1)
    84. values = values.transpose(2,1)
    85. U_part = self.factor * np.ceil(np.log(L_K)).astype('int').item() # c*ln(L_k)
    86. u = self.factor * np.ceil(np.log(L_Q)).astype('int').item() # c*ln(L_q)
    87. U_part = U_part if U_partelse L_K
    88. u = u if uelse L_Q
    89. scores_top, index = self._prob_QK(queries, keys, sample_k=U_part, n_top=u)
    90. # add scale factor
    91. scale = self.scale or 1./sqrt(D)
    92. if scale is not None:
    93. scores_top = scores_top * scale
    94. # get the context
    95. context = self._get_initial_context(values, L_Q)
    96. # update the context with selected top_k queries
    97. context, attn = self._update_context(context, values, scores_top, index, L_Q, attn_mask)
    98. return context.transpose(2,1).contiguous(), attn

    2.5 Encoder编码器函数

    编码器旨在提取长顺序输入的稳健长程依赖性,Encoder编码器的实现

    1. ## 编码器Encoder ##
    2. class Encoder(nn.Module):
    3. def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
    4. super(Encoder, self).__init__()
    5. # 将输入的注意力层、卷积层列表转换为nn.ModuleList类
    6. self.attn_layers = nn.ModuleList(attn_layers)
    7. self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
    8. self.norm = norm_layer
    9. def forward(self, x, attn_mask=None):
    10. # x [B, L, D]
    11. # 保存每个注意力层的注意力权重
    12. attns = []
    13. # 判断是否存在卷积层
    14. # 遍历注意力层和卷积层,进行前向传播计算
    15. if self.conv_layers is not None:
    16. for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
    17. x, attn = attn_layer(x, attn_mask=attn_mask)
    18. x = conv_layer(x)
    19. attns.append(attn)
    20. x, attn = self.attn_layers[-1](x, attn_mask=attn_mask)
    21. attns.append(attn)
    22. else:
    23. for attn_layer in self.attn_layers:
    24. x, attn = attn_layer(x, attn_mask=attn_mask)
    25. attns.append(attn)
    26. if self.norm is not None:
    27. x = self.norm(x)
    28. return x, attns

    其中编码器层的实现如下:

    1. ## 编码器层 ##
    2. class EncoderLayer(nn.Module):
    3. def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
    4. super(EncoderLayer, self).__init__()
    5. # 如果未提供全连接层维度d_ff,则将其设置为4*d_model
    6. d_ff = d_ff or 4*d_model
    7. self.attention = attention
    8. # 定义卷积、归一化、Dropout、激活函数类型选择
    9. self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
    10. self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
    11. self.norm1 = nn.LayerNorm(d_model)
    12. self.norm2 = nn.LayerNorm(d_model)
    13. self.dropout = nn.Dropout(dropout)
    14. self.activation = F.relu if activation == "relu" else F.gelu
    15. # 定义了EncoderLayer类的前向传播方法
    16. def forward(self, x, attn_mask=None):
    17. # x [B, L, D]
    18. # x = x + self.dropout(self.attention(
    19. # x, x, x,
    20. # attn_mask = attn_mask
    21. # ))
    22. new_x, attn = self.attention(
    23. x, x, x,
    24. attn_mask = attn_mask
    25. )
    26. x = x + self.dropout(new_x)
    27. y = x = self.norm1(x)
    28. y = self.dropout(self.activation(self.conv1(y.transpose(-1,1))))
    29. y = self.dropout(self.conv2(y).transpose(-1,1))
    30. return self.norm2(x+y), attn

    2.6 Decoder解码器函数

    提出了生成式的decoder机制,在预测序列(也包括inferencel阶段)时一步得到结果,而不是step-by-step,直接将预测时间复杂度降低。

    1. ## 编码器Encoder ##
    2. class Encoder(nn.Module):
    3. def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
    4. super(Encoder, self).__init__()
    5. # 将输入的注意力层、卷积层列表转换为nn.ModuleList类
    6. self.attn_layers = nn.ModuleList(attn_layers)
    7. self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
    8. self.norm = norm_layer
    9. def forward(self, x, attn_mask=None):
    10. # x [B, L, D]
    11. # 保存每个注意力层的注意力权重
    12. attns = []
    13. # 判断是否存在卷积层
    14. # 遍历注意力层和卷积层,进行前向传播计算
    15. if self.conv_layers is not None:
    16. for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
    17. x, attn = attn_layer(x, attn_mask=attn_mask)
    18. x = conv_layer(x)
    19. attns.append(attn)
    20. x, attn = self.attn_layers[-1](x, attn_mask=attn_mask)
    21. attns.append(attn)
    22. else:
    23. for attn_layer in self.attn_layers:
    24. x, attn = attn_layer(x, attn_mask=attn_mask)
    25. attns.append(attn)
    26. if self.norm is not None:
    27. x = self.norm(x)
    28. return x, attns

    其中解码器层的实现如下:

    1. ## 编码器层 ##
    2. class DecoderLayer(nn.Module):
    3. def __init__(self, self_attention, cross_attention, d_model, d_ff=None,
    4. dropout=0.1, activation="relu"):
    5. super(DecoderLayer, self).__init__()
    6. # 如果未提供全连接层维度d_ff,则将其设置为4*d_model
    7. d_ff = d_ff or 4*d_model
    8. # 定义自注意、交叉注意力、卷积、归一化、Dropout、激活函数类型选择
    9. self.self_attention = self_attention
    10. self.cross_attention = cross_attention
    11. self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
    12. self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
    13. self.norm1 = nn.LayerNorm(d_model)
    14. self.norm2 = nn.LayerNorm(d_model)
    15. self.norm3 = nn.LayerNorm(d_model)
    16. self.dropout = nn.Dropout(dropout)
    17. self.activation = F.relu if activation == "relu" else F.gelu
    18. # 定义了DecoderLayer的前向传播方法
    19. def forward(self, x, cross, x_mask=None, cross_mask=None):
    20. x = x + self.dropout(self.self_attention(
    21. x, x, x,
    22. attn_mask=x_mask
    23. )[0])
    24. x = self.norm1(x)
    25. x = x + self.dropout(self.cross_attention(
    26. x, cross, cross,
    27. attn_mask=cross_mask
    28. )[0])
    29. y = x = self.norm2(x)
    30. y = self.dropout(self.activation(self.conv1(y.transpose(-1,1))))
    31. y = self.dropout(self.conv2(y).transpose(-1,1))
    32. return self.norm3(x+y)

    3 官方数据集运行

    其中定义了许多参数,在其中存在一些bug有如下的->

    这个bug是因为头两行参数的,中的required=True导致的,我们将其删除掉即可,改为如下:

    1. parser.add_argument('--model', type=str, default='informer',help='model of experiment, options: [informer, informerstack, informerlight(TBD)]')
    2. parser.add_argument('--data', type=str, default='ETTh1', help='data')

    最后写如下的脚本文件可视化预测结果:

    1. import numpy as np
    2. import pandas as pd
    3. import matplotlib.pyplot as plt
    4. # 指定.npy文件路径
    5. file_path1 = "results/informer_ETTh1_ftM_sl96_ll48_pl24_dm512_nh8_el2_dl1_df2048_atprob_fc5_ebtimeF_dtTrue_mxTrue_test_0/true.npy"
    6. file_path2 = "results/informer_ETTh1_ftM_sl96_ll48_pl24_dm512_nh8_el2_dl1_df2048_atprob_fc5_ebtimeF_dtTrue_mxTrue_test_1/pred.npy"
    7. # 使用NumPy加载.npy文件
    8. true_value = []
    9. pred_value = []
    10. data1 = np.load(file_path1)
    11. data2 = np.load(file_path2)
    12. print(data2)
    13. for i in range(24):
    14. true_value.append(data2[0][i][6])
    15. pred_value.append(data1[0][i][6])
    16. # 打印内容
    17. print(true_value)
    18. print(pred_value)
    19. #保存数据
    20. df = pd.DataFrame({'real': true_value, 'pred': pred_value})
    21. df.to_csv('results.csv', index=False)
    22. #绘制图形
    23. fig = plt.figure(figsize=( 16, 8))
    24. plt.plot(df['real'], marker='o', markersize=8)
    25. plt.plot(df['pred'], marker='o', markersize=8)
    26. plt.tick_params(labelsize = 28)
    27. plt.legend(['real','pred'],fontsize=28)
    28. plt.show()

  • 相关阅读:
    【毕业设计】天气预测与数据分析系统 - 机器学习 python
    【精讲】vue中的添加数据、监测数据(data)原理及拓展、监测数组及(对象,数组)监测概括
    milvus数据库-连接
    LeetCode 740.删除并获得点数---->打家劫舍
    Linux命令(22)之chage
    14:00面试,14:06就出来了,问的问题有点变态。。。
    java架构知识-设计模式与实践(学习笔记)
    使用docker安装配置oracle 11g
    .net-------数值、日期和字符串处理
    @Pointcut 使用
  • 原文地址:https://blog.csdn.net/qq_41921826/article/details/134509231