• NLP (14) -- Text Matching Task


    Preface

    These are just notes from my own learning process; questions and discussion are welcome.

    Steps:
    * 1. Take the user's question as input
    * 2. Match it against the question base (base resources, FAQ)
    * 3. Return the answer
    

    Text matching algorithms:

    • Edit distance (drawbacks)

      • No semantic similarity between characters;
        heavily affected by irrelevant words / stop words;
        heavily affected by word order
    • Jaccard similarity (size of the element intersection / size of the element union)

    • Word vectors (window-based; solves the semantic-similarity problem; convert text to numbers and use the cosine value to judge similarity)
      (a minimal sketch of these three classical metrics appears after this list)

    • Deep learning, representation-based (well suited to question-to-question matching: both sides are questions, so turning them into vectors is convenient)
      Both sentences go through the same encoder to get vectors; semantically similar pairs should get score = 1, much like binary classification
      (this is the approach implemented in model.py below)

      • Triplet Loss:
        pulls samples with the same label close together in the embedding space (the anchor is pulled toward the positive and pushed away from the negative)
      • loss = max(D(a, p) - D(a, n) + margin, 0)
      • Advantage: the trained model can pre-compute vectors for every question in the knowledge base; at lookup time only the input text needs to be vectorized, once
      • Drawback: while vectorizing, the model does not know which parts of the text matter for a given comparison
    • Deep learning, interaction-based

      • The two samples are concatenated into a single input, and the attention mechanism judges whether they match (the question and the candidate are concatenated and learned jointly; see the cross-encoder sketch after this list)
      • Advantage: by comparing the two texts directly, the model can grasp the key points of each sentence
      • Drawback: every computation requires both inputs (so the knowledge base cannot be pre-encoded)
    • Contrastive learning

      • Take one sample and modify it with some function so that it stays similar, giving two similar samples; run both through a BERT encoder and a pooling step (see the sketch after this list)
    • Searching a huge number of vectors:

      • Use an existing open-source library such as Faiss or Pinecone (a minimal Faiss sketch appears after the evaluate.py code below)
      • Avoid a full traversal, i.e. avoid computing the distance to every vector (partition the space with a KD-tree or k-means clustering)
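
    A minimal sketch of the three classical metrics above (edit distance, Jaccard similarity, cosine similarity over sentence vectors); the function names and example strings are illustrative only:

    import numpy as np

    # Levenshtein edit distance via dynamic programming over characters
    def edit_distance(s1, s2):
        dp = [[0] * (len(s2) + 1) for _ in range(len(s1) + 1)]
        for i in range(len(s1) + 1):
            dp[i][0] = i
        for j in range(len(s2) + 1):
            dp[0][j] = j
        for i in range(1, len(s1) + 1):
            for j in range(1, len(s2) + 1):
                cost = 0 if s1[i - 1] == s2[j - 1] else 1
                dp[i][j] = min(dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost)
        return dp[-1][-1]

    # Jaccard similarity: |intersection| / |union| over character sets
    def jaccard_similarity(s1, s2):
        set1, set2 = set(s1), set(s2)
        return len(set1 & set2) / len(set1 | set2)

    # cosine similarity between two sentence vectors (e.g. averaged word vectors)
    def cosine_similarity(v1, v2):
        return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))

    print(edit_distance("查话费", "查流量"), jaccard_similarity("查话费", "查流量"))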
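
    The interaction-style (cross-encoder) model is not implemented in the code below, which follows the representation/triplet approach. A rough sketch of the idea, assuming the Hugging Face transformers API and the same local bert-base-chinese checkpoint used in the config; the example sentences are made up, and the classification head would still need fine-tuning on matched/unmatched pairs:

    import torch
    from transformers import BertTokenizer, BertForSequenceClassification

    bert_path = r"D:\NLP\video\第六周\bert-base-chinese"
    tokenizer = BertTokenizer.from_pretrained(bert_path)
    # binary head: label 1 = the two texts match, label 0 = they do not
    model = BertForSequenceClassification.from_pretrained(bert_path, num_labels=2)

    # the question and the candidate are concatenated as "[CLS] q [SEP] cand [SEP]",
    # so self-attention can compare them token by token
    inputs = tokenizer("怎么查话费", "话费查询方法", return_tensors="pt", truncation=True, max_length=64)
    with torch.no_grad():
        logits = model(**inputs).logits
    match_prob = torch.softmax(logits, dim=-1)[0, 1].item()
    print(match_prob)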
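
    The contrastive-learning bullet is essentially the SimCSE recipe: encode the same sentence twice (dropout acts as the perturbation), pool, and pull the two views together against in-batch negatives. A hedged sketch of that loss, assuming two batches of pooled sentence vectors are already available:

    import torch
    import torch.nn.functional as F

    def info_nce_loss(vectors_a, vectors_b, temperature=0.05):
        # vectors_a / vectors_b: [batch, hidden], two encodings of the same sentences
        # (e.g. two forward passes of a BERT encoder with dropout on, followed by pooling)
        vectors_a = F.normalize(vectors_a, dim=-1)
        vectors_b = F.normalize(vectors_b, dim=-1)
        # similarity of every a_i with every b_j; the diagonal entries are the positive pairs
        sim = torch.mm(vectors_a, vectors_b.T) / temperature
        labels = torch.arange(sim.size(0), device=sim.device)
        return F.cross_entropy(sim, labels)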

    Code

    Implement a simple intelligent Q&A demo

    """
    配置参数信息
    """
    Config = {
        "model_path": "./output/",
        "model_name": "model.pt",
        "schema_path": r"D:\NLP\video\第八周\week8 文本匹配问题\data\schema.json",
        "train_data_path": r"D:\NLP\video\第八周\week8 文本匹配问题\data\data.json",
        "valid_data_path": r"D:\NLP\video\第八周\week8 文本匹配问题\data\valid.json",
        "vocab_path": r"D:\NLP\video\第七周\data\vocab.txt",
        "model_type": "rnn",
        # 正样本比例
        "positive_sample_rate": 0.5,
        "use_bert": False,
        # 文本向量大小
        "char_dim": 32,
        # 文本长度
        "max_len": 20,
        # 词向量大小
        "hidden_size": 128,
        # 训练 轮数
        "epoch_size": 15,
        # 批量大小
        "batch_size": 32,
        # 训练集大小
        "simple_size": 300,
        # 学习率
        "lr": 1e-3,
        # dropout
        "dropout": 0.5,
        # 优化器
        "optimizer": "adam",
        # 卷积核
        "kernel_size": 3,
        # 最大池 or 平均池
        "pooling_style": "max",
        # 模型层数
        "num_layers": 2,
        "bert_model_path": r"D:\NLP\video\第六周\bert-base-chinese",
        # 输出层大小
        "output_size": 2,
        # 随机数种子
        "seed": 987
    }
    
    
    

    load.py: loads the data files

    """
    数据加载
    """
    import json
    from collections import defaultdict
    import random
    
    import torch
    import torch.utils.data as Data
    from torch.utils.data import DataLoader
    from transformers import BertTokenizer
    
    
    # Load the character vocabulary
    def load_vocab(path):
        vocab = {}
        with open(path, 'r', encoding='utf-8') as f:
            for index, line in enumerate(f):
                word = line.strip()
                # index 0 is reserved for padding, so indices start from 1
                vocab[word] = index + 1
            vocab['unk'] = len(vocab) + 1
        return vocab
    
    
    # Pre-processing: truncate or pad to a fixed length
    def padding(input_ids, length):
        if len(input_ids) >= length:
            return input_ids[:length]
        else:
            padded_input_ids = input_ids + [0] * (length - len(input_ids))
            return padded_input_ids
    
    
    # Text pre-processing
    # convert text into a list of character indices
    def sentence_to_index(text, length, vocab):
        input_ids = []
        for char in text:
            input_ids.append(vocab.get(char, vocab['unk']))
        # pad or truncate
        input_ids = padding(input_ids, length)
        return input_ids
    
    
    class DataGenerator:
        def __init__(self, data_path, config):
            # load the json knowledge base
            self.load_know_base(config["train_data_path"])
            # load the schema, which serves as the answer/label set
            self.schema = self.load_schema(config["schema_path"])
            self.data_path = data_path
            self.config = config
            if self.config["model_type"] == "bert":
                self.tokenizer = BertTokenizer.from_pretrained(config["bert_model_path"])
            self.vocab = load_vocab(config["vocab_path"])
            self.config["vocab_size"] = len(self.vocab)
            self.train_flag = None
            self.load_data()
    
        def __len__(self):
            if self.train_flag:
                return self.config["simple_size"]
            else:
                return len(self.data)
    
        # during training, return a randomly generated sample
        def __getitem__(self, idx):
            if self.train_flag:
                # return self.random_train_sample()  # randomly generate one training sample
                # triplet loss:
                return self.random_train_sample_for_triplet_loss()
            else:
                return self.data[idx]
    
        # process the text loaded by load_know_base = {target: [questions]}
        # a pair is positive if both samples share the same target, negative otherwise
        # the training set and the validation set are handled differently
        def load_data(self):
            self.train_flag = self.config["train_flag"]
            dataset_x = []
            dataset_y = []
            self.knwb = defaultdict(list)
            if self.train_flag:
                for target, questions in self.target_to_questions.items():
                    for question in questions:
                        input_id = sentence_to_index(question, self.config["max_len"], self.vocab)
                        input_id = torch.LongTensor(input_id)
                        # self.schema[target] is the label index; each question is converted to a vector and appended under its target
                        self.knwb[self.schema[target]].append(input_id)
            else:
                with open(self.data_path, encoding="utf8") as f:
                    for line in f:
                        line = json.loads(line)
                        assert isinstance(line, list)
                        question, target = line
                        input_id = sentence_to_index(question, self.config["max_len"], self.vocab)
                        # input_id = torch.LongTensor(input_id)
                        label_index = torch.LongTensor([self.schema[target]])
                        # self.data.append([input_id, label_index])
                        dataset_x.append(input_id)
                        dataset_y.append(label_index)
                    self.data = Data.TensorDataset(torch.tensor(dataset_x), torch.tensor(dataset_y))
            return
    
        # Load the knowledge base
        def load_know_base(self, know_base_path):
            self.target_to_questions = {}
            with open(know_base_path, encoding="utf8") as f:
                for index, line in enumerate(f):
                    content = json.loads(line)
                    questions = content["questions"]
                    target = content["target"]
                    self.target_to_questions[target] = questions
            return
    
        # Load the schema (the answer/label set)
        def load_schema(self, param):
            with open(param, encoding="utf8") as f:
                return json.loads(f.read())
    
        # Randomly generate one training sample
        # With a certain probability, generate either a positive or a negative pair
        # Negative pair: pick one question each from two different standard questions
        # Positive pair: pick two questions from the same standard question
        def random_train_sample(self):
            target = random.choice(list(self.knwb.keys()))
            # positive pair
            if random.random() <= self.config["positive_sample_rate"]:
                if len(self.knwb[target]) <= 1:
                    return self.random_train_sample()
                else:
                    question1 = random.choice(self.knwb[target])
                    question2 = random.choice(self.knwb[target])
                    # one pair
                    # dataset_x.append([question1, question2])
                    # # binary classification: questions from the same group get target = 1
                    # dataset_y.append([1])
                    return [question1, question2, torch.LongTensor([1])]
            else:
                # negative pair
                p, n = random.sample(list(self.knwb.keys()), 2)
                question1 = random.choice(self.knwb[p])
                question2 = random.choice(self.knwb[n])
                # dataset_x.append([question1, question2])
                # dataset_y.append([-1])
                return [question1, question2, torch.LongTensor([-1])]
    
        # For triplet loss, randomly generate 3 samples: anchor A, positive P, negative N
        def random_train_sample_for_triplet_loss(self):
            target = random.choice(list(self.knwb.keys()))
            # question1 is the anchor, question2 a positive from the same target, question3 a sample from a randomly chosen target (which may occasionally coincide with the anchor's target)
            question1 = random.choice(self.knwb[target])
            question2 = random.choice(self.knwb[target])
            question3 = random.choice(self.knwb[random.choice(list(self.knwb.keys()))])
            return [question1, question2, question3]
    
    
    # Wrap the data with torch's built-in DataLoader
    def load_data_batch(data_path, config, shuffle=True):
        dg = DataGenerator(data_path, config)
        if config["train_flag"]:
            dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
        else:
            dl = DataLoader(dg.data, batch_size=config["batch_size"], shuffle=shuffle)
        return dl
    
    
    if __name__ == '__main__':
        from config import Config
    
        Config["train_flag"] = True
        # dg = DataGenerator(Config["train_data_path"], Config)
        dataset = load_data_batch(Config["train_data_path"], Config)
        # print(len(dg))
        # print(dg[0])
        for index, batch_data in enumerate(dataset):
            input_id1, input_id2, input_id3 = batch_data
            print(input_id1)
            print(input_id2)
            print(input_id3)
    
    
    

    main.py: main training routine

    import torch
    import os
    import random
    import numpy as np
    import logging
    from config import Config
    from model import TorchModel, choose_optimizer, SiameseNetwork
    from loader import load_data_batch
    from evaluate import Evaluator
    
    # [DEBUG, INFO, WARNING, ERROR, CRITICAL]
    
    
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)
    
    """
    模型训练主程序
    """
    # 通过设置随机种子来复现上一次的结果(避免随机性)
    seed = Config["seed"]
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    
    
    def main(config):
        # directory for saving the model
        if not os.path.isdir(config["model_path"]):
            os.mkdir(config["model_path"])
        # load the training data
        dataset = load_data_batch(config["train_data_path"], config)
        # build the model
        model = SiameseNetwork(config)
        # move to gpu if available
        if torch.cuda.is_available():
            logger.info("gpu可以使用,迁移模型至gpu")
            model.cuda()
        # choose the optimizer
        optim = choose_optimizer(config, model)
        # evaluation helper
        evaluator = Evaluator(config, model, logger)
        for epoch in range(config["epoch_size"]):
            epoch += 1
            logger.info("epoch %d begin" % epoch)
            epoch_loss = []
            # switch to training mode
            model.train()
            for batch_data in dataset:
                if torch.cuda.is_available():
                    batch_data = [d.cuda() for d in batch_data]
                # x, y = dataiter
                # clear the previous gradients
                optim.zero_grad()
                s1, s2, s3 = batch_data  # adjust here if the inputs change (e.g. more inputs or outputs)
                # compute the loss over (anchor, positive, negative)
                loss = model(s1, s2, s3)
                # back-propagate the gradients
                loss.backward()
                # optimizer updates the parameters
                optim.step()
                # record the loss
                epoch_loss.append(loss.item())
            logger.info("epoch average loss: %f" % np.mean(epoch_loss))
            # evaluate the model
            acc = evaluator.eval(epoch)
        # the saved file name combines model_path, epoch and model_type
        model_path = os.path.join(config["model_path"], "epoch_%d_%s.pth" % (epoch, config["model_type"]))
        torch.save(model.state_dict(), model_path)  # save the model weights
        return acc
    
    
    if __name__ == "__main__":
        from config import Config
    
        Config["train_flag"] = True
        main(Config)
    
        # for model in ["cnn"]:
        #     Config["model_type"] = model
        #     print("最后一轮准确率:", main(Config), "当前配置:", Config["model_type"])
    
        # 对比所有模型
        # 中间日志可以关掉,避免输出过多信息
        # 超参数的网格搜索
        # for model in ["gated_cnn"]:
        #     Config["model_type"] = model
        #     for lr in [1e-3, 1e-4]:
        #         Config["learning_rate"] = lr
        #         for hidden_size in [128]:
        #             Config["hidden_size"] = hidden_size
        #             for batch_size in [64, 128]:
        #                 Config["batch_size"] = batch_size
        #                 for pooling_style in ["avg"]:
        #                     Config["pooling_style"] = pooling_style
        # 可以把输出放入文件中 便于查看
        #                     print("最后一轮准确率:", main(Config), "当前配置:", Config)
    
    
    
    

    evaluate.py: model evaluation

    """
    模型效果测试
    """
    import torch
    from loader import load_data_batch
    
    
    class Evaluator:
        def __init__(self, config, model, logger):
            self.config = config
            self.model = model
            self.logger = logger
            # load the validation set
            config['train_flag'] = False
            self.valid_data = load_data_batch(config["valid_data_path"], config, shuffle=False)
            config['train_flag'] = True
            self.train_data = load_data_batch(config["train_data_path"], config)
            self.stats_dict = {"correct": 0, "wrong": 0}  # 用于存储测试结果
    
        def eval(self, epoch):
            self.logger.info("Evaluating the model after epoch %d:" % epoch)
            self.stats_dict = {"correct": 0, "wrong": 0}  # reset the previous round's results
            self.model.eval()
            self.knwb_to_vector()
            for index, batch_data in enumerate(self.valid_data):
                if torch.cuda.is_available():
                    batch_data = [d.cuda() for d in batch_data]
                input_id, labels = batch_data  # adjust here if the inputs change (e.g. more inputs or outputs)
                with torch.no_grad():
                    test_question_vectors = self.model(input_id)  # no labels passed: just encode with the current parameters
                self.write_stats(test_question_vectors, labels)
            return self.show_stats()
    
        def write_stats(self, test_question_vectors, labels):
            assert len(labels) == len(test_question_vectors)
            for test_question_vector, label in zip(test_question_vectors, labels):
                # one matrix multiplication computes the similarity between the input question and every question in the knowledge base
                # test_question_vector shape [vec_size]   knwb_vectors shape = [n, vec_size]
                res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
                hit_index = int(torch.argmax(res.squeeze()))  # index of the best-matching question
                hit_index = self.question_index_to_standard_question_index[hit_index]  # map to the standard-question index
                if int(hit_index) == int(label):
                    self.stats_dict["correct"] += 1
                else:
                    self.stats_dict["wrong"] += 1
            return
    
        # Vectorize every question in the knowledge base in preparation for matching
        # The model parameters change every epoch, so the vectors must be recomputed for every evaluation round
        def knwb_to_vector(self):
            self.question_index_to_standard_question_index = {}
            self.question_ids = []
            for standard_question_index, question_ids in self.train_data.dataset.knwb.items():
                for question_id in question_ids:
                    # record the mapping from question index to standard-question index, used to check whether the answer is correct
                    self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_index
                    self.question_ids.append(question_id)
            with torch.no_grad():
                question_matrixs = torch.stack(self.question_ids, dim=0)
                if torch.cuda.is_available():
                    question_matrixs = question_matrixs.cuda()
                self.knwb_vectors = self.model(question_matrixs)
                # normalize all vectors: v / |v|
                self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
            return
    
        def show_stats(self):
            correct = self.stats_dict["correct"]
            wrong = self.stats_dict["wrong"]
            self.logger.info("预测集合条目总量:%d" % (correct + wrong))
            self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))
            self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))
            self.logger.info("--------------------")
            return correct / (correct + wrong)
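
    evaluate.py compares the query vector against every knowledge-base vector with a single torch.mm, which is fine at this scale. For a much larger knowledge base, the notes at the top suggest an ANN library instead; a minimal Faiss sketch (assuming faiss-cpu is installed; the vectors here are random placeholders, and normalizing them makes inner product equal to cosine similarity):

    import numpy as np
    import faiss

    hidden_size = 128
    knwb_vectors = np.random.rand(1000, hidden_size).astype("float32")  # stand-in for the encoded knowledge base
    faiss.normalize_L2(knwb_vectors)

    index = faiss.IndexFlatIP(hidden_size)  # exact inner-product search
    # for truly large collections, an IVF index (k-means space partitioning) avoids the full scan
    index.add(knwb_vectors)

    query = np.random.rand(1, hidden_size).astype("float32")  # stand-in for the encoded user question
    faiss.normalize_L2(query)
    scores, ids = index.search(query, 5)  # top-5 most similar questions
    print(ids[0], scores[0])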
    
    

    model.py

    import torch
    import torch.nn as nn
    from torch.optim import Adam, SGD
    from transformers import BertModel
    
    """
    建立网络模型结构
    """
    
    
    class TorchModel(nn.Module):
        def __init__(self, config):
            super(TorchModel, self).__init__()
            hidden_size = config["hidden_size"]
            vocab_size = config["vocab_size"] + 1
            output_size = config["output_size"]
            model_type = config["model_type"]
            num_layers = config["num_layers"]
            self.use_bert = config["use_bert"]
            self.emb = nn.Embedding(vocab_size + 1, hidden_size, padding_idx=0)
            if model_type == 'rnn':
                self.encoder = nn.RNN(input_size=hidden_size, hidden_size=hidden_size, num_layers=num_layers,
                                      batch_first=True)
            elif model_type == 'lstm':
                # LSTM encoder; for a bidirectional LSTM (bidirectional=True) the output size becomes hidden_size * 2
                self.encoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers)
            elif self.use_bert:
                self.encoder = BertModel.from_pretrained(config["bert_model_path"])
                # use the pretrained model's hidden_size
                hidden_size = self.encoder.config.hidden_size
            elif model_type == 'cnn':
                self.encoder = CNN(config)
            elif model_type == "gated_cnn":
                self.encoder = GatedCNN(config)
            elif model_type == "bert_lstm":
                self.encoder = BertLSTM(config)
                # use the pretrained model's hidden_size
                hidden_size = self.encoder.config.hidden_size
    
            self.classify = nn.Linear(hidden_size, output_size)
            self.pooling_style = config["pooling_style"]
            self.loss = nn.functional.cross_entropy  # cross-entropy loss
    
        def forward(self, x, y=None):
            if self.use_bert:
                # input x: [batch_size, seq_len]
                # bert returns (sequence_output, pooler_output)
                # sequence_output: [batch_size, max_len, hidden_size]
                # pooler_output: [batch_size, hidden_size]
                x = self.encoder(x)[0]
            else:
                x = self.emb(x)
                x = self.encoder(x)
            # RNN/LSTM encoders return a tuple (output, hidden); keep only the output
            if isinstance(x, tuple):
                x = x[0]
            # pooling layer, built here so that the kernel size matches the current sequence length
            # (x.shape[1] is the sequence length)
            if self.pooling_style == "max":
                pooling_layer = nn.MaxPool1d(x.shape[1])
            elif self.pooling_style == "avg":
                pooling_layer = nn.AvgPool1d(x.shape[1])
            x = pooling_layer(x.transpose(1, 2)).squeeze()
    
            y_pred = self.classify(x)
            if y is not None:
                return self.loss(y_pred, y.squeeze())
            else:
                return y_pred
    
    
    # Siamese network (computes the similarity between two sentences)
    class SiameseNetwork(nn.Module):
        def __init__(self, config):
            super(SiameseNetwork, self).__init__()
            self.sentence_encoder = TorchModel(config)
            # cosine-based alternative:
            # self.loss = nn.CosineEmbeddingLoss()
            # here we use triplet loss
            self.triplet_loss = self.cosine_triplet_loss
    
        # Cosine distance: 1 - cos(a, b)
        # cos = 1 means the two vectors are identical (distance 0); cos = 0 means they are orthogonal (distance 1)
        def cosine_distance(self, tensor1, tensor2):
            tensor1 = torch.nn.functional.normalize(tensor1, dim=-1)
            tensor2 = torch.nn.functional.normalize(tensor2, dim=-1)
            cosine = torch.sum(torch.mul(tensor1, tensor2), axis=-1)
            return 1 - cosine
    
        # Triplet loss over 3 samples: anchor and positive from one class, negative from another
        def cosine_triplet_loss(self, a, p, n, margin=None):
            ap = self.cosine_distance(a, p)
            an = self.cosine_distance(a, n)
            if margin is None:
                diff = ap - an + 0.1
            else:
                diff = ap - an + margin.squeeze()
            # average only over triplets that violate the margin (diff > 0);
            # note: if no triplet in the batch violates the margin, this is the mean of an empty tensor
            return torch.mean(diff[diff.gt(0)])  # greater than
    
        # Forward pass used with triplet loss
        def forward(self, sentence1, sentence2=None, sentence3=None, margin=None):
            vector1 = self.sentence_encoder(sentence1)
            # all three samples may be passed in together
            if sentence2 is None:
                if sentence3 is None:
                    return vector1
                # compute the cosine distance
                else:
                    vector3 = self.sentence_encoder(sentence3)
                    return self.cosine_distance(vector1, vector3)
            else:
                vector2 = self.sentence_encoder(sentence2)
                if sentence3 is None:
                    return self.cosine_distance(vector1, vector2)
                else:
                    vector3 = self.sentence_encoder(sentence3)
                    return self.triplet_loss(vector1, vector2, vector3, margin)
    
        # Alternative forward pass using CosineEmbeddingLoss
        # def forward(self, sentence1, sentence2=None, target=None):
        #     # two sentences passed in together
        #     if sentence2 is not None:
        #         vector1 = self.sentence_encoder(sentence1)  # vec: (batch_size, hidden_size)
        #         vector2 = self.sentence_encoder(sentence2)
        #         # if a target label is given, compute the loss
        #         if target is not None:
        #             return self.loss(vector1, vector2, target.squeeze())
        #         # otherwise return the cosine distance
        #         else:
        #             return self.cosine_distance(vector1, vector2)
        #     # a single sentence means the caller just wants its vector
        #     else:
        #         return self.sentence_encoder(sentence1)
    
    
    # Optimizer selection
    def choose_optimizer(config, model):
        optimizer = config["optimizer"]
        learning_rate = config["lr"]
        if optimizer == "adam":
            return Adam(model.parameters(), lr=learning_rate)
        elif optimizer == "sgd":
            return SGD(model.parameters(), lr=learning_rate)
    
    
    # CNN encoder
    class CNN(nn.Module):
        def __init__(self, config):
            super(CNN, self).__init__()
            hidden_size = config["hidden_size"]
            kernel_size = config["kernel_size"]
            pad = int((kernel_size - 1) / 2)
            self.cnn = nn.Conv1d(hidden_size, hidden_size, kernel_size, bias=False, padding=pad)
    
        def forward(self, x):  # x : (batch_size, max_len, embedding_size)
            return self.cnn(x.transpose(1, 2)).transpose(1, 2)
    
    
    # Gated CNN encoder
    class GatedCNN(nn.Module):
        def __init__(self, config):
            super(GatedCNN, self).__init__()
            self.cnn = CNN(config)
            self.gate = CNN(config)
    
        # Forward pass: like a plain CNN, but with an extra gate branch passed through a sigmoid and multiplied element-wise
        def forward(self, x):
            a = self.cnn(x)
            b = self.gate(x)
            b = torch.sigmoid(b)
            return torch.mul(a, b)
    
    
    # BERT + LSTM encoder
    class BertLSTM(nn.Module):
        def __init__(self, config):
            super(BertLSTM, self).__init__()
            self.bert = BertModel.from_pretrained(config["bert_model_path"], return_dict=False)
            self.rnn = nn.LSTM(self.bert.config.hidden_size, self.bert.config.hidden_size, batch_first=True)
    
        def forward(self, x):
            x = self.bert(x)[0]
            x, _ = self.rnn(x)
            return x
    
    
    if __name__ == "__main__":
        from config import Config
    
        Config["vocab_size"] = 10
        Config["max_length"] = 4
        model = SiameseNetwork(Config)
        s1 = torch.LongTensor([[1, 2, 3, 0], [2, 2, 0, 0]])
        s2 = torch.LongTensor([[1, 2, 3, 4], [3, 2, 3, 4]])
        l = torch.LongTensor([[1], [0]])
        # with the triplet-loss forward, the third argument is simply encoded as another sentence
        y = model(s1, s2, l)
        print(y)
    
    
    
  • Original article: https://blog.csdn.net/njh1147394013/article/details/138766512