在有些场景下,项目初期的数据量很小。如果只用几千条数据从头训练一个全新的深度学习文本分类模型,效果通常不会很好。这时你有两种选择:1. 使用传统的机器学习方法训练;2. 利用迁移学习,在一个预训练模型的基础上继续训练。本博客教你怎么用 TensorFlow Hub 和 Keras 在少量数据上训练一个文本分类模型。
Imdb影评的数据集介绍与下载_imdb影评数据集-CSDN博客
将下面代码中的 IMDB 数据目录(imdb_raw_data_dir)替换为你本地解压后的 aclImdb 路径,并创建 dataset 目录。
- import numpy as np
- import os as os
-
- import re
- from sklearn.model_selection import train_test_split
-
# Vocabulary size and maximum sequence length; declared here but not
# referenced by the functions shown in this script.
vocab_size = 30_000
maxlen = 200

# Root folder of the extracted aclImdb archive ("train" and "test" are
# appended to it when the data is read).
imdb_raw_data_dir = '/Users/harry/Documents/apps/ml/aclImdb'

# Folder that receives the generated train_test.npz file.
save_dir = 'dataset'
-
def _read_reviews(directory):
    """Return the cleaned text of every file in ``directory``, in sorted filename order."""
    texts = []
    # os.listdir() order is unspecified; sorting makes the output reproducible.
    for filename in sorted(os.listdir(directory)):
        with open(os.path.join(directory, filename), encoding='utf-8') as f:
            texts.append(process(f.read()))
    return texts


def get_data(datapath=r'D:\train_data\aclImdb\aclImdb\train'):
    """Load and clean all reviews under ``datapath/pos`` and ``datapath/neg``.

    Every file is read as UTF-8 and passed through ``process``. The two
    folders are read independently, so no review is dropped when they hold a
    different number of files (pairing them with ``zip`` would truncate to
    the shorter folder).

    Args:
        datapath: Directory containing the ``pos`` and ``neg`` sub-directories.

    Returns:
        Tuple ``(X_orig, Y_orig)``: an array with the cleaned texts (positive
        reviews first, then negative ones) and an array of matching labels,
        1 for positive and 0 for negative.
    """
    pos_all = _read_reviews(os.path.join(datapath, 'pos'))
    neg_all = _read_reviews(os.path.join(datapath, 'neg'))
    print(len(pos_all))
    print(len(neg_all))

    X_orig = np.array(pos_all + neg_all)
    Y_orig = np.array([1] * len(pos_all) + [0] * len(neg_all))
    print("X_orig:", X_orig.shape)
    print("Y_orig:", Y_orig.shape)

    return X_orig, Y_orig
-
def generate_dataset():
    """Merge the raw IMDB train/test folders and save a shuffled 70/30 split.

    Reads the ``train`` and ``test`` folders below ``imdb_raw_data_dir`` with
    ``get_data``, concatenates them, shuffles the result, splits it with
    ``train_test_split(test_size=0.3)`` and writes the four arrays to
    ``train_test.npz`` inside ``save_dir``.
    """
    X_part_a, Y_part_a = get_data(os.path.join(imdb_raw_data_dir, 'train'))
    X_part_b, Y_part_b = get_data(os.path.join(imdb_raw_data_dir, 'test'))
    X = np.concatenate([X_part_a, X_part_b])
    Y = np.concatenate([Y_part_a, Y_part_b])

    # Seed by CALLING np.random.seed. Assigning to it (``np.random.seed = 1``)
    # would replace the function with an int and leave the generator unseeded.
    np.random.seed(1)
    shuffled = np.random.permutation(len(X))
    X = X[shuffled]
    Y = Y[shuffled]

    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.3)
    print("X_train:", X_train.shape)
    print("y_train:", y_train.shape)
    print("X_test:", X_test.shape)
    print("y_test:", y_test.shape)

    # np.savez does not create missing parent folders.
    os.makedirs(save_dir, exist_ok=True)
    np.savez(os.path.join(save_dir, 'train_test'),
             X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test)
-
-
def rm_tags(text):
    """Return ``text`` with every ``<...>`` tag replaced by a single space."""
    return re.sub(r'<[^>]+>', ' ', text)
-
def clean_str(string):
    """Normalise raw English text into lowercase, space-separated tokens.

    Characters other than letters, digits and ``( ) , ! ? ' `` ` are replaced
    by spaces, common contraction suffixes are split off as separate tokens,
    and ``, ! ( ) ?`` are surrounded by spaces.

    The punctuation is inserted as the bare character. Using ``" \\( "`` as a
    regex replacement would put a literal backslash into the text and is an
    invalid escape sequence in a normal string literal.

    Args:
        string: Text to normalise.

    Returns:
        The cleaned text, stripped and lower-cased.
    """
    string = re.sub(r"[^A-Za-z0-9(),!?\'\`]", " ", string)

    # Split contraction suffixes off the preceding word, e.g.
    # it's -> it 's, I've -> I 've, doesn't -> does n't, you're -> you 're,
    # you'd -> you 'd, you'll -> you 'll, I'm -> I 'm.
    for suffix in ("'s", "'ve", "n't", "'re", "'d", "'ll", "'m"):
        string = string.replace(suffix, " " + suffix)

    # Turn each punctuation mark into its own token.
    for mark in (",", "!", "(", ")", "?"):
        string = string.replace(mark, " " + mark + " ")

    # Collapse the runs of whitespace introduced above.
    string = re.sub(r"\s{2,}", " ", string)
    return string.strip().lower()
-
def process(text):
    """Strip markup tags from ``text`` and normalise it with ``clean_str``.

    Tags must be removed first: ``clean_str`` replaces ``<``, ``>`` and ``/``
    with spaces, so running it beforehand would leave nothing for ``rm_tags``
    to match and tag names such as ``br`` would remain in the text.

    Args:
        text: Raw review text.

    Returns:
        The tag-free, cleaned and lower-cased text.
    """
    text = rm_tags(text)
    text = clean_str(text)
    return text
-
if __name__ == "__main__":
    # Running this file as a script builds and saves the train/test split.
    generate_dataset()
-
-
执行完后,会在 dataset 目录下生成 train_test.npz 文件。
1. 取数据集
def get_dataset_to_train():
    """Load the train/test arrays stored in ``dataset/train_test.npz``.

    Returns:
        Tuple ``(x_train, y_train, x_test, y_test)``.
    """
    # NOTE(review): allow_pickle=True lets a crafted archive run arbitrary
    # code on load; only open files you generated yourself.
    # The ``with`` block closes the archive's file handle once the arrays
    # have been read.
    with np.load('dataset/train_test.npz', allow_pickle=True) as train_test:
        x_train = train_test['X_train']
        y_train = train_test['y_train']
        x_test = train_test['X_test']
        y_test = train_test['y_test']

    return x_train, y_train, x_test, y_test
2. 创建模型
模型基于 nnlm-en-dim50/2 预训练的文本嵌入向量,在嵌入层之后加了两层全连接层(中间有一层 Dropout)。
def get_model():
    """Build and compile the transfer-learning text classifier.

    A trainable TF-Hub embedding layer loaded from ``embedding_url`` takes raw
    strings as input and feeds a 16-unit ReLU layer, a dropout of 0.5 and a
    2-unit softmax output.

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    hub_layer = hub.KerasLayer(embedding_url, input_shape=[], dtype=tf.string, trainable=True)

    model = Sequential([
        hub_layer,
        Dense(16, activation='relu'),
        Dropout(0.5),
        Dense(2, activation='softmax'),
    ])

    # summary() prints the table itself and returns None; wrapping it in
    # print() would add a stray "None" line to the output.
    model.summary()

    # Labels are integer class ids, hence the "sparse" loss and metric.
    model.compile(optimizer=keras.optimizers.Adam(),
                  loss=keras.losses.SparseCategoricalCrossentropy(),
                  metrics=[keras.metrics.SparseCategoricalAccuracy()])

    return model
还可以使用来自 TFHub 的许多其他预训练文本嵌入向量:
还有很多!在 TFHub 上查找更多文本嵌入向量模型。
3. 评估你的模型
def evaluate_model(test_data, test_labels):
    """Evaluate the saved model on the given data and print its accuracy.

    Args:
        test_data: Texts to evaluate on.
        test_labels: Labels aligned with ``test_data``.
    """
    scores = load_trained_model().evaluate(test_data, test_labels, verbose=2)

    # Entry 1 is reported as the accuracy; presumably entry 0 is the loss,
    # which depends on how the saved model was compiled.
    print("Test accuracy:", scores[1])
-
-
def load_trained_model():
    """Load and return the complete model saved under ``models_pb``."""
    return tf.keras.models.load_model('models_pb')
4. 测试几个例子
def predict(real_data):
    """Classify a single review and return its label name.

    Args:
        real_data: The review text to classify.

    Returns:
        The label string produced by ``get_label``.
    """
    classifier = load_trained_model()
    # The single review is wrapped in a list to form a batch of one.
    probabilities = classifier.predict([real_data])
    print("probabilities :", probabilities)
    return get_label(probabilities)
-
-
def get_label(probabilities):
    """Return the label name for the first row of ``probabilities``.

    Args:
        probabilities: Batch of per-class scores; only row 0 is used.

    Returns:
        The ``index_dic`` entry for the highest-scoring class, or ``None``
        when that index has no entry.
    """
    best_class = np.argmax(probabilities[0])
    print("index :" + str(best_class))

    # index_dic is keyed by the class index rendered as a string.
    return index_dic.get(str(best_class))
-
-
def predict_my_module():
    """Run one hard-coded sample review through ``predict`` and print the label."""
    # Other sample sentences that can be swapped in for a manual check:
    #   "I don't like it"
    #   "this is bad movie "
    #   "This is good movie"
    #   "This isn‘t great movie"
    #   "i think this is bad movie"
    #   "I'm not very disappoint for this movie"
    #   "I am very happy for this movie"
    # Class ids: negative = 0, positive = 1.
    review = " this is terrible movie"
    print(predict(review))
-
if __name__ == "__main__":
    # Pipeline: load the saved split, build and train the model, evaluate
    # the saved model, then classify one sample sentence.
    x_train, y_train, x_test, y_test = get_dataset_to_train()
    model = train(get_model(), x_train, y_train, x_test, y_test)
    evaluate_model(x_test, y_test)
    predict_my_module()
完整代码
- import numpy as np
- import tensorflow as tf
- from keras.models import Sequential
- from keras.layers import Dense, Dropout
- import keras as keras
- from keras.callbacks import EarlyStopping, ModelCheckpoint
- import tensorflow_hub as hub
-
# TF-Hub handle of the pre-trained nnlm-en-dim50 text embedding.
embedding_url = 'https://tfhub.dev/google/nnlm-en-dim50/2'

# Class index (as a string) -> sentiment label.
index_dic = {'0': 'negative', '1': 'positive'}
-
def get_dataset_to_train():
    """Load the train/test arrays stored in ``dataset/train_test.npz``.

    Returns:
        Tuple ``(x_train, y_train, x_test, y_test)``.
    """
    # NOTE(review): allow_pickle=True lets a crafted archive run arbitrary
    # code on load; only open files you generated yourself.
    # The ``with`` block closes the archive's file handle once the arrays
    # have been read.
    with np.load('dataset/train_test.npz', allow_pickle=True) as train_test:
        x_train = train_test['X_train']
        y_train = train_test['y_train']
        x_test = train_test['X_test']
        y_test = train_test['y_test']

    return x_train, y_train, x_test, y_test
-
-
def get_model():
    """Build and compile the transfer-learning text classifier.

    A trainable TF-Hub embedding layer loaded from ``embedding_url`` takes raw
    strings as input and feeds a 16-unit ReLU layer, a dropout of 0.5 and a
    2-unit softmax output.

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    hub_layer = hub.KerasLayer(embedding_url, input_shape=[], dtype=tf.string, trainable=True)

    model = Sequential([
        hub_layer,
        Dense(16, activation='relu'),
        Dropout(0.5),
        Dense(2, activation='softmax'),
    ])

    # summary() prints the table itself and returns None; wrapping it in
    # print() would add a stray "None" line to the output.
    model.summary()

    # Labels are integer class ids, hence the "sparse" loss and metric.
    model.compile(optimizer=keras.optimizers.Adam(),
                  loss=keras.losses.SparseCategoricalCrossentropy(),
                  metrics=[keras.metrics.SparseCategoricalAccuracy()])

    return model
-
def train(model, train_data, train_labels, test_data, test_labels,
          max_samples=2000, epochs=45, batch_size=45):
    """Fit ``model`` on a small slice of the training data.

    Training stops early when the validation accuracy has not improved for
    4 epochs, and the best model seen so far is saved to ``./models_pb/``.

    Args:
        model: Compiled Keras model, e.g. the one returned by ``get_model``.
        train_data: Iterable of training texts.
        train_labels: Labels aligned with ``train_data``.
        test_data: Iterable of texts used as validation data.
        test_labels: Labels aligned with ``test_data``.
        max_samples: Number of leading training examples to fit on
            (default 2000); ``None`` uses all of them.
        epochs: Maximum number of training epochs (default 45).
        batch_size: Mini-batch size (default 45).

    Returns:
        The same ``model`` object after fitting.
    """
    # Convert every element to a plain Python str before building the arrays.
    train_data = np.asarray([str(text) for text in train_data])
    test_data = np.asarray([str(text) for text in test_data])
    print(train_data.shape, test_data.shape)

    monitored = 'val_sparse_categorical_accuracy'
    early_stop = EarlyStopping(monitor=monitored, patience=4, mode='max', verbose=1)

    # Saves the whole model (not only the weights) whenever the monitored
    # metric improves.
    checkpoint_pb = ModelCheckpoint(filepath="./models_pb/", monitor=monitored,
                                    save_weights_only=False, save_best_only=True)

    history = model.fit(train_data[:max_samples], train_labels[:max_samples],
                        epochs=epochs, batch_size=batch_size,
                        validation_data=(test_data, test_labels), shuffle=True,
                        verbose=1, callbacks=[early_stop, checkpoint_pb])
    # Print the recorded per-epoch metrics rather than the object's repr.
    print("history", history.history)

    return model
-
-
def evaluate_model(test_data, test_labels):
    """Evaluate the saved model on the given data and print its accuracy.

    Args:
        test_data: Texts to evaluate on.
        test_labels: Labels aligned with ``test_data``.
    """
    scores = load_trained_model().evaluate(test_data, test_labels, verbose=2)

    # Entry 1 is reported as the accuracy; presumably entry 0 is the loss,
    # which depends on how the saved model was compiled.
    print("Test accuracy:", scores[1])
-
-
def predict(real_data):
    """Classify a single review and return its label name.

    Args:
        real_data: The review text to classify.

    Returns:
        The label string produced by ``get_label``.
    """
    classifier = load_trained_model()
    # The single review is wrapped in a list to form a batch of one.
    probabilities = classifier.predict([real_data])
    print("probabilities :", probabilities)
    return get_label(probabilities)
-
-
def get_label(probabilities):
    """Return the label name for the first row of ``probabilities``.

    Args:
        probabilities: Batch of per-class scores; only row 0 is used.

    Returns:
        The ``index_dic`` entry for the highest-scoring class, or ``None``
        when that index has no entry.
    """
    best_class = np.argmax(probabilities[0])
    print("index :" + str(best_class))

    # index_dic is keyed by the class index rendered as a string.
    return index_dic.get(str(best_class))
-
def load_trained_model():
    """Load and return the complete model saved under ``models_pb``."""
    return tf.keras.models.load_model('models_pb')
-
def predict_my_module():
    """Run one hard-coded sample review through ``predict`` and print the label."""
    # Other sample sentences that can be swapped in for a manual check:
    #   "I don't like it"
    #   "this is bad movie "
    #   "This is good movie"
    #   "This isn‘t great movie"
    #   "i think this is bad movie"
    #   "I'm not very disappoint for this movie"
    #   "I am very happy for this movie"
    # Class ids: negative = 0, positive = 1.
    review = " this is terrible movie"
    print(predict(review))
-
-
if __name__ == "__main__":
    # Pipeline: load the saved split, build and train the model, evaluate
    # the saved model, then classify one sample sentence.
    x_train, y_train, x_test, y_test = get_dataset_to_train()
    model = train(get_model(), x_train, y_train, x_test, y_test)
    evaluate_model(x_test, y_test)
    predict_my_module()
-
-
-
-
-
-
-
-
-
-
-