目录
增加一个1维度.unsqueeze(0) 删除一个1维度squeeze(0)
- import torch
- from torch import nn
-
- x = torch.randn(1,2,64)
- print(x.shape)
- y = x.expand(50,2,64)#此时做expand,可以发现(3,)和(2, 3)是第二个维度相同,因此按第一个维度扩张
- print(y.shape)
x = x.type(torch.FloatTensor)
- def forward(self, x, batch_size):
- x = x.type(torch.FloatTensor)
- x = x.to(device)
- print("137",x_input.shape,temp_aspect.shape)
- # 137 torch.Size([50, 2, 64]) torch.Size([50, 2, 64])
- x_input=torch.cat((x_input,temp_aspect),dim=2)
x_input=x_input.transpose(0,1)
lstm_out=lstm_out.reshape(batch_size,-1)
- # -*- coding: utf-8 -*-
- import pandas as pd
- import gensim
- import jieba
- import re
- import matplotlib.pyplot as plt
- import numpy as np
- from tqdm import tqdm
- from sklearn.model_selection import train_test_split
- from gensim.models import KeyedVectors
- from gensim.scripts.glove2word2vec import glove2word2vec
- import torch
- from torch import nn
- import torch.utils.data as data
- import torch.nn.functional as F
- from torch import tensor
- from sklearn.metrics import f1_score
- from datetime import datetime
- from torch.utils.data import Dataset, DataLoader
- from torch.utils.data import random_split
- from tqdm import tqdm
- def data_process():
- data=pd.read_excel("pre_process_level_2_table(1).xlsx")
- data_neirong=list(data['内容'].values)
- data_1_aspect=list(data['1_aspect'].values)
- data_label=list(data['label'].values)
-
- aspect_vec_dict={}
- with open("ceshi_1_aspect_vec.txt","r") as f:
- f=f.readlines()
- for line in f:
- temp_word=line.split("_||_")[0]
- temp_vec=line.split("_||_")[1].split(" ")[:-1]
- temp_vec=[float(i) for i in temp_vec]# 转化为数值型列表
- aspect_vec_dict[temp_word]=temp_vec
- print(aspect_vec_dict)
- data_neirong_word_list=[]
- text_len=[]
- for line in data_neirong:
- line=line.strip()
- line=line.split(" ")
- print(line)
- while 1 :
- print(1)
- if '' in line:line.remove('')
- if '' not in line:break
- data_neirong_word_list.append(line)
- text_len.append(len(line))
- print("48-----------------------")
- # print(max(text_len),np.mean(text_len))# 393 14.989528010696924
- # 对句子进行截断重复 设置句子长度是 50
- # pading_data_neirong_word_list=[]
- data_x = []
- temp_data_y=[]
- for idx,line in tqdm(enumerate(data_neirong_word_list)):
- # print("54",idx, len(line),line)
- temp_line = line.copy()
- # 会有数据只有空格这样子 这个while 循环会出问题
- temp_idx = 0 # 设置while循环标志位 来解决这个问题
- if len(line) <60:
- while 1:
- line=line+temp_line
- # print(len(line))
- temp_idx+=1
- if len(line)>=50:break
- if temp_idx==50:break
- if temp_idx != 50:
- line = line[:50]
- data_x.append(line + [data_1_aspect[idx]])
- temp_data_y.append(data_label[idx])
- print("62----数据数目:---------",len(data_x))
- # 矩阵生成
- wd2 = gensim.models.Word2Vec.load("wd2.bin")#print(wd2.wv['hotel'])
- data_x_vec=[]
- # data_x_aspect=[]
- data_y=[]
- for idx,line in tqdm(enumerate(data_x)):
- try:
- # print(line)
- temp_vec=[]
- line_neirong=line[:-1]
- line_1_aspect=line[-1]
- for word in line_neirong:
- temp_vec.append(wd2.wv[word])
-
- temp_vec.append(np.array(aspect_vec_dict[line_1_aspect]))
- data_x_vec.append(temp_vec)
- data_y.append(temp_data_y[idx])
- except KeyError:
- pass
- return np.array(data_y),np.array(data_x_vec)#,np.array(data_x_aspect)
-
-
- class mydataset(Dataset):
- def __init__(self): # 读取加载数据
- data_y,data_x=data_process()
- self._x = torch.tensor(np.array(data_x).astype(float))
- self._y = torch.tensor(np.array(data_y).astype(float))
- print(len(data_x),data_y.shape,data_y)
- # self._aspect= torch.tensor(np.array(data_x_aspect).astype(float))
- self._len = len(data_y)
- def __getitem__(self, item):
- return self._x[item], self._y[item]#,self._aspect[item]
- def __len__(self): # 返回整个数据的长度
- return self._len
- mydata = mydataset()
- # 划分 训练集 测试集
- train_data, test_data = random_split(mydata, [round(0.8 * mydata._len), round(0.2 * mydata._len)]) # 这个参数有的版本没有 generator=torch.Generator().manual_seed(0)
- # 随机混乱顺序划分的 四舍五入
- #
- # train_loader =DataLoader(train_data, batch_size =2, shuffle = True, num_workers = 0 , drop_last=False)
- #
- # # for step,(train_x,train_y) in enumerate(train_loader):
- # # print(step,':',(train_x.shape,train_y.shape),(train_x,train_y))
- # # break
- #
- # # 测试 loader
- # test_loader =DataLoader(test_data, batch_size = 2, shuffle = True, num_workers = 0 , drop_last=False)
- # # dorp_last 是说最后一组数据不足一个batch的时候 能继续用还是舍弃。 # num_workers 多少个进程载入数据
- #
- # # 测试
- # # for step,(test_x,test_y) in enumerate(test_loader):
- # # print(step,':',(test_x.shape,test_y.shape),(test_x,test_y))
- device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
-
-
- class LSTM_attention(nn.Module): # 注意Module首字母需要大写
- def __init__(self, ):
- super().__init__()
- input_size = 64
- hidden_size = 64
- output_size = 64
- # input_size:输入lstm单元向量的长度 ,hidden_size输出lstm单元向量的长度。也是输入、输出隐藏层向量的长度
- self.lstm = nn.LSTM(input_size, output_size, num_layers=1) # ,batch_first=True
- self.ReLU = nn.ReLU()
- self.attention = nn.Linear(6400,64)
- self.liner=nn.Linear(128,5)
- def forward(self, x, batch_size):
- x = x.type(torch.FloatTensor)
- x = x.to(device)
-
- x_input=x[:,:50]
- x_input=x_input.transpose(0,1)
-
- temp_aspect=x[:,-1]
- temp_aspect=temp_aspect.unsqueeze(0)
- temp_aspect =temp_aspect.expand(50,batch_size, 64)
-
- #print("137",x_input.shape,temp_aspect.shape)# 137 torch.Size([50, 2, 64]) torch.Size([50, 2, 64])
- x_input=torch.cat((x_input,temp_aspect),dim=2)
- #print("137",x_input.shape,temp_aspect.shape)# 137 torch.Size([50, 2, 128]) torch.Size([50, 2, 64])
- # 输入 lstm的矩阵形状是:[序列长度,batch_size,每个向量的维度] [序列长度,batch, 64]
- lstm_out, (h_n, c_n) = self.lstm(x, None)
- lstm_out=self.ReLU(lstm_out)
- last_lstm=lstm_out[:,-1]# 取最后一个
- lstm_out=lstm_out[:,:-1]
- lstm_out=lstm_out.transpose(0, 1)
- #print("154",lstm_out.shape,temp_aspect.shape)
- lstm_out=torch.cat((lstm_out,temp_aspect),dim=2)
- lstm_out=lstm_out.transpose(0, 1)
- lstm_out=lstm_out.reshape(batch_size,-1)
-
- lstm_out = self.ReLU(lstm_out)
- lstm_out = self.attention(lstm_out)
- lstm_out = self.ReLU(lstm_out)
-
- # print("157",lstm_out.shape,last_lstm.shape)
- out_sum= torch.cat((lstm_out,last_lstm), dim=1)
- # print(out_sum.shape)
- prediction=self.liner(out_sum)
- return prediction
-
-
- # 这个函数是测试用来测试x_test y_test 数据 函数
- def eval_test(model): # 返回的是这10个 测试数据的平均loss
- test_epoch_loss = []
- with torch.no_grad():
- optimizer.zero_grad()
- for step, (test_x, test_y) in enumerate(test_loader):
- y_pre = model(test_x, batch_size)
- test_y = test_y.to(device)
- test_loss = loss_function(y_pre, test_y.long())
- test_epoch_loss.append(test_loss.item())
- return np.mean(test_epoch_loss)
-
-
- epochs = 50
- batch_size = 128
- # 在模型测试中 这两个值:batch_size = 19 固定得 epochs = 随便设置
- test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True, num_workers=0, drop_last=True)
- train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=0, drop_last=True)
-
- # 创建LSTM()类的对象,定义损失函数和优化器
-
- model = LSTM_attention().to(device)
- loss_function = torch.nn.CrossEntropyLoss().to(device) # 损失函数的计算 交叉熵损失函数计算
- optimizer = torch.optim.SGD(model.parameters(), lr=0.01) # 建立优化器实例
- print(model)
-
- sum_train_epoch_loss = [] # 存储每个epoch 下 训练train数据的loss
- sum_test_epoch_loss = [] # 存储每个epoch 下 测试 test数据的loss
- best_test_loss = 10000
- for epoch in tqdm(range(epochs)):
- epoch_loss = []
- for step, (train_x, train_y) in enumerate(train_loader):
- y_pred = model(train_x, batch_size)
- # 训练过程中,正向传播生成网络的输出,计算输出和实际值之间的损失值
- # print(y_pred,train_y)
- single_loss = loss_function(y_pred.cpu(), train_y.long())
- # print("single_loss",single_loss)
- single_loss.backward() # 调用backward()自动生成梯度
- optimizer.step() # 使用optimizer.step()执行优化器,把梯度传播回每个网络
- epoch_loss.append(single_loss.item())
- train_epoch_loss = np.mean(epoch_loss)
- test_epoch_loss = eval_test(model) # 测试数据的平均loss
-
- if test_epoch_loss < best_test_loss:
- best_test_loss = test_epoch_loss
- print("best_test_loss", best_test_loss)
- best_model = model
- sum_train_epoch_loss.append(train_epoch_loss)
- sum_test_epoch_loss.append(test_epoch_loss)
- print("epoch:" + str(epoch) + " train_epoch_loss: " + str(train_epoch_loss) + " test_epoch_loss: " + str(
- test_epoch_loss))
-
- torch.save(best_model, 'best_model.pth')
-
- # 画图
- # sum_train_epoch_loss=[]
- # sum_test_epoch_loss=[]
- fig = plt.figure(facecolor='white', figsize=(10, 7))
- plt.xlabel('第几个epoch')
- plt.ylabel('loss值')
- plt.xlim(xmax=len(sum_train_epoch_loss), xmin=0)
- plt.ylim(ymax=max(sum_train_epoch_loss), ymin=0)
- # 画两条(0-9)的坐标轴并设置轴标签x,y
-
- x1 = [i for i in range(0, len(sum_train_epoch_loss), 1)] # 随机产生300个平均值为2,方差为1.2的浮点数,即第一簇点的x轴坐标
- y1 = sum_train_epoch_loss # 随机产生300个平均值为2,方差为1.2的浮点数,即第一簇点的y轴坐标
-
- x2 = [i for i in range(0, len(sum_test_epoch_loss), 1)]
- y2 = sum_test_epoch_loss
-
- colors1 = '#00CED4' # 点的颜色
- colors2 = '#DC143C'
- area = np.pi * 4 ** 1 # 点面积
- # 画散点图
- plt.scatter(x1, y1, s=area, c=colors1, alpha=0.4, label='train_loss')
- plt.scatter(x2, y2, s=area, c=colors2, alpha=0.4, label='val_loss')
- # plt.plot([0,9.5],[9.5,0],linewidth = '0.5',color='#000000')
- plt.legend()
- # plt.savefig(r'C:\Users\jichao\Desktop\大论文\12345svm.png', dpi=300)
- plt.show()
-
- import sklearn
- from sklearn.metrics import accuracy_score
- # 模型加载:
- model.load_state_dict(torch.load('best_model.pth').cpu().state_dict())
- model.eval()
- test_pred = []
- test_true = []
-
- with torch.no_grad():
- optimizer.zero_grad()
- for step, (test_x, test_y) in enumerate(test_loader):
- y_pre = model(test_x, batch_size).cpu()
- y_pre = torch.argmax(y_pre, dim=1)
- for i in y_pre:
- test_pred.append(i)
- for i in test_y:
- test_true.append(i)
-
- Acc = accuracy_score(test_pred, test_true)
- print(Acc)
-
-
-