• NLP (15) - Sequence Labeling Tasks


    Preface

    These are personal study notes; questions and discussion are welcome.

    When should a pooling layer be used:

    • If you are labeling every character, no pooling is needed; if you are classifying the whole sentence, pooling is required.

    NER (data annotation): B/M/E mark the left/middle/right boundary of an entity (address/organization/person name); O marks characters that belong to no entity.

    CRF transition matrix: if a character is tagged B_location, the next tag is most likely M_location or E_location, and largely unrelated to the B/M/E tags of the other entity types.

    • Shape: num_labels * num_labels

    Emission matrix: shape sen_len * tag_size; it is simply the per-character tag score matrix produced by the encoder (see the short sketch below).
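    To make the two shapes concrete, below is a minimal sketch using the pytorch-crf package (torchcrf) that the model code later imports. The tag count 9 and sequence length 50 are taken from the NER config below; the random emissions are placeholders only.

    import torch
    from torchcrf import CRF

    num_tags, seq_len, batch = 9, 50, 2
    crf = CRF(num_tags, batch_first=True)

    # transition matrix: (num_tags, num_tags), score of moving from tag i to tag j
    print(crf.transitions.shape)  # torch.Size([9, 9])

    # emission matrix: per-character tag scores produced by the encoder + linear layer
    emissions = torch.randn(batch, seq_len, num_tags)           # (batch, seq_len, num_tags)
    tags = torch.randint(0, num_tags, (batch, seq_len))
    mask = torch.ones(batch, seq_len, dtype=torch.bool)

    loss = -crf(emissions, tags, mask=mask, reduction="mean")   # negative log-likelihood
    best_paths = crf.decode(emissions, mask=mask)               # Viterbi decoding, one tag list per sentence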

    With BERT:

    PERSON entities: precision 0.655738, recall 0.462428, F1 0.542368
    LOCATION entities: precision 0.655462, recall 0.406250, F1 0.501603
    TIME entities: precision 0.846847, recall 0.576687, F1 0.686127
    ORGANIZATION entities: precision 0.448276, recall 0.305882, F1 0.363631
    Macro-F1: 0.523432
    Micro-F1: 0.543495

    With LSTM:

    PERSON entities: precision 0.432000, recall 0.312139, F1 0.362411
    LOCATION entities: precision 0.512987, recall 0.411458, F1 0.456642
    TIME entities: precision 0.721804, recall 0.588957, F1 0.648644
    ORGANIZATION entities: precision 0.450000, recall 0.423529, F1 0.436359
    Macro-F1: 0.476014
    Micro-F1: 0.479633

    Code

    Implement NER code that assigns a B/M/E/O tag to every character of each sentence.

    config.py

    """
    配置参数信息
    """
    Config = {
        "model_path": "./output/",
        "model_name": "model.pt",
        "schema_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\schema.json",
        "train_data_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\train.txt",
        "valid_data_path": r"E:\Anlp\week9 序列标注问题\ner\ner_data\test.txt",
        "vocab_path": r"E:\Anlp\week9 序列标注问题\ner\chars.txt",
        "model_type": "bert",
        # whether to use a CRF layer for the sequence-labeling loss
        "use_crf": True,
        # character embedding size
        "char_dim": 20,
        # maximum text length
        "max_len": 50,
        # hidden layer size
        "hidden_size": 64,
        # number of training epochs
        "epoch_size": 15,
        # batch size
        "batch_size": 25,
        # number of training samples
        "simple_size": 300,
        # learning rate
        "lr": 0.0001,
        # dropout
        "dropout": 0.5,
        # optimizer
        "optimizer": "adam",
        # convolution kernel size
        "kernel_size": 3,
        # max or average pooling
        "pooling_style": "max",
        # number of model layers
        "num_layers": 3,
        "bert_model_path": r"E:\Anlp\week6语言模型和预训练\bert-base-chinese",
        # output layer size (number of tags)
        "output_size": 9,
        # random seed
        "seed": 987
    }
    
    
    
    

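    The schema.json referenced above is not included in the post. A mapping consistent with output_size = 9 and with the decode() regexes in evaluate.py below (B tags 0-3, I tags 4-7, O as 8) would look roughly like the following; the exact label names may differ (e.g. B/M/E as in the notes above), but the index layout has to follow this pattern:

    {
      "B-LOCATION": 0,
      "B-ORGANIZATION": 1,
      "B-PERSON": 2,
      "B-TIME": 3,
      "I-LOCATION": 4,
      "I-ORGANIZATION": 5,
      "I-PERSON": 6,
      "I-TIME": 7,
      "O": 8
    }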
    loader.py (data loading)

    """
    数据加载
    """
    import os
    import numpy as np
    import json
    import re
    import os
    import torch
    import torch.utils.data as Data
    from torch.utils.data import Dataset, DataLoader
    from transformers import BertTokenizer
    
    
    # Load the character vocabulary
    def load_vocab(path):
        vocab = {}
        with open(path, 'r', encoding='utf-8') as f:
            for index, line in enumerate(f):
                word = line.strip()
                # index 0 is reserved for padding, so indices start from 1
                vocab[word] = index + 1
            vocab['unk'] = len(vocab) + 1
        return vocab
    
    
    class DataGenerator:
        def __init__(self, data_path, config):
            self.data_path = data_path
            self.config = config
            self.schema = self.load_schema(config["schema_path"])
            if self.config["model_type"] == "bert":
                self.tokenizer = BertTokenizer.from_pretrained(config["bert_model_path"])
            self.vocab = load_vocab(config["vocab_path"])
            self.config["vocab_size"] = len(self.vocab)
            # list of the original sentences (used later by the evaluator)
            self.sentence_list = []
            self.data = self.load_data()
    
        def __len__(self):
            return len(self.data)
    
        def __getitem__(self, idx):
            return self.data[idx]
    
        def load_schema(self, path):
            with open(path, encoding="utf8") as f:
                return json.load(f)
    
        def load_data(self):
            dataset_x = []
            dataset_y = []
            with open(self.data_path, 'r', encoding='utf-8') as f:
                # sentences are separated by blank lines
                segments = f.read().split("\n\n")
                # each line holds one character and its label, e.g. "你 0"
                for segment in segments:
                    sentences = []
                    labels = []
                    for line in segment.split("\n"):
    
                        if line.strip() == "":
                            continue
                        char, label = line.split()
                        sentences.append(char)
                        labels.append(self.schema[label])
                    self.sentence_list.append(' '.join(sentences))
                    input_id = self.sentence_to_index(sentences)
                        # labels must be padded to the same length as the input
                        labels = self.padding(labels, -1)
                        # the text ids and the labels form one sample
                    dataset_x.append(input_id)
                    dataset_y.append(labels)
                data = Data.TensorDataset(torch.tensor(dataset_x), torch.tensor(dataset_y))
    
            return data
    
        # Text preprocessing:
        # convert characters to ids
        def sentence_to_index(self, text):
            input_ids = []
            vocab = self.vocab
            if self.config["model_type"] == "bert":
                # convert the text into BERT tokenizer ids (the tokenizer adds [CLS]/[SEP])
                input_ids = self.tokenizer.encode(text, padding="max_length", truncation=True,
                                                  max_length=self.config["max_len"])
            else:
                for char in text:
                    input_ids.append(vocab.get(char, vocab['unk']))
                # pad or truncate
                input_ids = self.padding(input_ids)
            return input_ids
    
        # Data preprocessing: truncate or pad to max_len
        def padding(self, input_ids, padding_dot=0):
            length = self.config["max_len"]
            padded_input_ids = input_ids
            if len(input_ids) >= length:
                return input_ids[:length]
            else:
                padded_input_ids += [padding_dot] * (length - len(input_ids))
                return padded_input_ids
    
    
    # Wrap the data with torch's built-in DataLoader
    def load_data_batch(data_path, config, shuffle=True):
        dg = DataGenerator(data_path, config)
        # besides the tensors, dg also carries extra information (e.g. sentence_list) that is used later
        dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
        return dl
    
    
    if __name__ == '__main__':
        from config import Config
    
        dg = DataGenerator(Config["train_data_path"], Config)
        print(len(dg))
        print(dg[0])
    
    
    
    
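    load_data() expects the training file to hold one character and its label per line, with a blank line between sentences. The real train.txt is not shown, so the fragment below is only an illustration whose label names assume the schema sketched after config.py:

    我 O
    在 O
    北 B-LOCATION
    京 I-LOCATION
    工 O
    作 O

    下 B-TIME
    午 I-TIME
    开 O
    会 O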

    evaluate.py (model evaluation)

    """
    模型效果测试
    """
    import re
    from collections import defaultdict
    
    import numpy as np
    import torch
    from loader import load_data_batch
    
    
    class Evaluator:
        def __init__(self, config, model, logger):
            self.config = config
            self.model = model
            self.logger = logger
            # load the validation set
            self.dataset = load_data_batch(config["valid_data_path"], config, shuffle=False)
            # self.stats_dict = {"correct": 0, "wrong": 0}  # used to store evaluation results
    
        def eval(self, epoch):
            self.logger.info("开始测试第%d轮模型效果:" % epoch)
            # evaluation mode
            self.model.eval()
            self.stats_dict = {"LOCATION": defaultdict(int),
                               "TIME": defaultdict(int),
                               "PERSON": defaultdict(int),
                               "ORGANIZATION": defaultdict(int)}
            for index, batch_data in enumerate(self.dataset):
                # take the batch_size sentences that belong to this batch
                sentences = self.dataset.dataset.sentence_list[
                            index * self.config["batch_size"]: (index + 1) * self.config["batch_size"]]
                if torch.cuda.is_available():
                    batch_data = [d.cuda() for d in batch_data]
                input_id, labels = batch_data
                with torch.no_grad():
                    pred_results = self.model(input_id)  # no labels passed: predict with the current model parameters
                self.write_stats(labels, pred_results, sentences)
            # report metrics once the whole validation set has been processed
            self.show_stats()
            return
    
        def write_stats(self, labels, pred_results, sentences):
            assert len(labels) == len(pred_results) == len(sentences)
            if not self.config['use_crf']:
                pred_results = torch.argmax(pred_results, dim=-1)
            for true_label, pred_label, sentence in zip(labels, pred_results, sentences):
                if not self.config["use_crf"]:
                    pred_label = pred_label.cpu().detach().tolist()
                true_label = true_label.cpu().detach().tolist()
                true_entities = self.decode(sentence, true_label)
                pred_entities = self.decode(sentence, pred_label)
                # precision = correctly recognized entities / recognized entities
                # recall = correctly recognized entities / gold entities in the sample
                for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]:
                    self.stats_dict[key]["正确识别"] += len([ent for ent in pred_entities[key] if ent in true_entities[key]])
                    self.stats_dict[key]["样本实体数"] += len(true_entities[key])
                    self.stats_dict[key]["识别出实体数"] += len(pred_entities[key])
            return
    
        def show_stats(self):
            F1_scores = []
            for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]:
                # precision = correctly recognized entities / recognized entities
                # recall = correctly recognized entities / gold entities in the sample
                precision = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["识别出实体数"])
                recall = self.stats_dict[key]["正确识别"] / (1e-5 + self.stats_dict[key]["样本实体数"])
                F1 = (2 * precision * recall) / (precision + recall + 1e-5)
                F1_scores.append(F1)
                self.logger.info("%s类实体,准确率:%f, 召回率: %f, F1: %f" % (key, precision, recall, F1))
            self.logger.info("Macro-F1: %f" % np.mean(F1_scores))
            correct_pred = sum([self.stats_dict[key]["正确识别"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
            total_pred = sum([self.stats_dict[key]["识别出实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
            true_enti = sum([self.stats_dict[key]["样本实体数"] for key in ["PERSON", "LOCATION", "TIME", "ORGANIZATION"]])
            micro_precision = correct_pred / (total_pred + 1e-5)
            micro_recall = correct_pred / (true_enti + 1e-5)
            micro_f1 = (2 * micro_precision * micro_recall) / (micro_precision + micro_recall + 1e-5)
            self.logger.info("Micro-F1 %f" % micro_f1)
            self.logger.info("--------------------")
            return
    
        # Decode a label sequence back into the entity text spans of the sentence
        def decode(self, sentence, labels):
            labels = "".join([str(x) for x in labels[:len(sentence)]])
            results = defaultdict(list)
            for location in re.finditer("(04+)", labels):
                s, e = location.span()
                results["LOCATION"].append(sentence[s:e])
            for location in re.finditer("(15+)", labels):
                s, e = location.span()
                results["ORGANIZATION"].append(sentence[s:e])
            for location in re.finditer("(26+)", labels):
                s, e = location.span()
                results["PERSON"].append(sentence[s:e])
            for location in re.finditer("(37+)", labels):
                s, e = location.span()
                results["TIME"].append(sentence[s:e])
            return results
    
    
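    To see what decode() does, here is a small self-contained walk-through. The label indices follow the schema assumed after config.py (B-LOCATION = 0, I-LOCATION = 4, O = 8), so treat it as an illustration rather than the exact data:

    import re
    from collections import defaultdict

    sentence = "我在北京工作"
    labels = [8, 8, 0, 4, 8, 8]                    # O O B-LOC I-LOC O O under the assumed schema
    label_str = "".join(str(x) for x in labels)    # "880488"

    results = defaultdict(list)
    for match in re.finditer("(04+)", label_str):  # a B-LOCATION digit followed by I-LOCATION digits
        s, e = match.span()
        results["LOCATION"].append(sentence[s:e])

    print(results["LOCATION"])                     # ['北京']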

    model.py

    import torch
    import torch.nn as nn
    from torch.optim import Adam, SGD
    from transformers import BertModel
    from torchcrf import CRF
    
    """
    建立网络模型结构
    """
    
    
    class TorchModel(nn.Module):
        def __init__(self, config):
            super(TorchModel, self).__init__()
            hidden_size = config["hidden_size"]
            vocab_size = config["vocab_size"] + 1
            output_size = config["output_size"]
            self.model_type = config["model_type"]
            num_layers = config["num_layers"]
            # self.use_bert = config["use_bert"]
            self.use_crf = config["use_crf"]
    
            self.emb = nn.Embedding(vocab_size + 1, hidden_size, padding_idx=0)
            if self.model_type == 'rnn':
                self.encoder = nn.RNN(input_size=hidden_size, hidden_size=hidden_size, num_layers=num_layers,
                                      batch_first=True)
            elif self.model_type == 'lstm':
                # bidirectional LSTM: the output dimension is hidden_size * 2
                self.encoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, bidirectional=True, batch_first=True)
                hidden_size = hidden_size * 2
    
            elif self.model_type == 'bert':
                self.encoder = BertModel.from_pretrained(config["bert_model_path"])
                # use the pretrained model's hidden_size
                hidden_size = self.encoder.config.hidden_size
            elif self.model_type == 'cnn':
                self.encoder = CNN(config)
            elif self.model_type == "gated_cnn":
                self.encoder = GatedCNN(config)
            elif self.model_type == "bert_lstm":
                self.encoder = BertLSTM(config)
                # use the pretrained model's hidden_size
                hidden_size = self.encoder.config.hidden_size
            self.classify = nn.Linear(hidden_size, output_size)
            self.pooling_style = config["pooling_style"]
            self.crf_layer = CRF(output_size, batch_first=True)
    
            self.loss = nn.functional.cross_entropy  # cross-entropy loss (only used when use_crf is False; the -1 label padding would then need an ignore_index, as in the punctuation version below)
    
        def forward(self, x, y=None):
            if self.model_type == 'bert':
                # input x: [batch_size, seq_len]
                # BERT returns (sequence_output, pooler_output)
                # sequence_output: batch_size, max_len, hidden_size
                # pooler_output: batch_size, hidden_size
                x = self.encoder(x)[0]
            else:
                x = self.emb(x)
                x = self.encoder(x)
            # RNN/LSTM encoders return a tuple (output, hidden); keep only the output
            if isinstance(x, tuple):
                x = x[0]
    
            # # pooling layer (not needed for per-character tagging, kept for reference)
            # if self.pooling_style == "max":
            #     # x.shape[1] is the sequence length
            #     self.pooling_style = nn.MaxPool1d(x.shape[1])
            # elif self.pooling_style == "avg":
            #     self.pooling_style = nn.AvgPool1d(x.shape[1])
            # x = self.pooling_style(x.transpose(1, 2)).squeeze()
    
            y_pred = self.classify(x)
            if y is not None:
                # whether to use the CRF:
                if self.use_crf:
                    mask = y.gt(-1)
                    return - self.crf_layer(y_pred, y, mask, reduction="mean")
                else:
                    # (number, class_num), (number)
                    return self.loss(y_pred.view(-1, y_pred.shape[-1]), y.view(-1))
            else:
                if self.use_crf:
                    return self.crf_layer.decode(y_pred)
                else:
                    return y_pred
    
    # Optimizer selection
    def choose_optimizer(config, model):
        optimizer = config["optimizer"]
        learning_rate = config["lr"]
        if optimizer == "adam":
            return Adam(model.parameters(), lr=learning_rate)
        elif optimizer == "sgd":
            return SGD(model.parameters(), lr=learning_rate)
    
    
    # CNN encoder
    class CNN(nn.Module):
        def __init__(self, config):
            super(CNN, self).__init__()
            hidden_size = config["hidden_size"]
            kernel_size = config["kernel_size"]
            pad = int((kernel_size - 1) / 2)
            self.cnn = nn.Conv1d(hidden_size, hidden_size, kernel_size, bias=False, padding=pad)
    
        def forward(self, x):  # x: (batch_size, max_len, embedding_size)
            return self.cnn(x.transpose(1, 2)).transpose(1, 2)
    
    
    # Gated CNN encoder
    class GatedCNN(nn.Module):
        def __init__(self, config):
            super(GatedCNN, self).__init__()
            self.cnn = CNN(config)
            self.gate = CNN(config)
    
        # forward pass: like the plain CNN, but a second convolution is passed through a sigmoid and gates the output element-wise
        def forward(self, x):
            a = self.cnn(x)
            b = self.gate(x)
            b = torch.sigmoid(b)
            return torch.mul(a, b)
    
    
    # BERT + LSTM encoder
    class BertLSTM(nn.Module):
        def __init__(self, config):
            super(BertLSTM, self).__init__()
            self.bert = BertModel.from_pretrained(config["bert_model_path"], return_dict=False)
            self.rnn = nn.LSTM(self.bert.config.hidden_size, self.bert.config.hidden_size, batch_first=True)
    
        def forward(self, x):
            x = self.bert(x)[0]
            x, _ = self.rnn(x)
            return x
    
    # if __name__ == "__main__":
    #     from config import Config
    #
    #     Config["output_size"] = 2
    #     Config["vocab_size"] = 20
    #     Config["max_length"] = 5
    #     Config["model_type"] = "bert"
    #     Config["use_bert"] = True
    #     # model = BertModel.from_pretrained(Config["bert_model_path"], return_dict=False)
    #     x = torch.LongTensor([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]])
    #     # sequence_output, pooler_output = model(x)
    #     # print(x[1], type(x[2]), len(x[2]))
    #
    #     model = TorchModel(Config)
    #     label = torch.LongTensor([0,1])
    #     print(model(x, label))
    
    
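    As a quick sanity check of the model above, the following sketch runs the LSTM path so that no pretrained BERT files are needed; the toy vocabulary size, sequence length and tag ids are made up for the example. With labels the forward pass returns the negative CRF log-likelihood (use_crf is True in the config); without labels it returns the decoded tag sequences.

    import torch
    from config import Config
    from model import TorchModel

    cfg = dict(Config)
    cfg["model_type"] = "lstm"
    cfg["vocab_size"] = 20          # normally set by the data loader
    cfg["max_len"] = 5
    cfg["output_size"] = 9

    model = TorchModel(cfg)
    x = torch.LongTensor([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]])   # (batch_size, seq_len) character ids
    y = torch.LongTensor([[8, 8, 0, 4, 8], [2, 6, 8, 8, 8]])    # tag ids, no -1 padding in this toy batch

    loss = model(x, y)   # scalar loss: negative CRF log-likelihood
    paths = model(x)     # list of decoded tag sequences, one per sentence
    print(loss, paths)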

    Code 2

    Punctuation restoration task: for every character, predict which punctuation mark (if any) should follow it.
    config

    """
    配置参数信息
    """
    Config = {
        "model_path": "./output/",
        "model_name": "model.pt",
        "schema_path": r"D:\NLP\video\第九周\week9 序列标注问题\加标点\data\schema.json",
        "train_data_path": r"D:\NLP\video\第九周\week9 序列标注问题\加标点\data\train_corpus",
        "valid_data_path": r"D:\NLP\video\第九周\week9 序列标注问题\加标点\data\valid_corpus",
        "vocab_path": r"D:\NLP\video\第七周\data\vocab.txt",
        "model_type": "lstm",
        # whether to use a CRF layer for the sequence-labeling loss
        "use_crf": False,
        # character embedding size
        "char_dim": 20,
        # maximum text length
        "max_len": 50,
        # hidden layer size
        "hidden_size": 64,
        # number of training epochs
        "epoch_size": 15,
        # batch size
        "batch_size": 25,
        # number of training samples
        "simple_size": 300,
        # learning rate
        "lr": 0.001,
        # dropout
        "dropout": 0.5,
        # optimizer
        "optimizer": "adam",
        # convolution kernel size
        "kernel_size": 3,
        # max or average pooling
        "pooling_style": "max",
        # number of model layers
        "num_layers": 2,
        "bert_model_path": r"E:\Anlp\week6语言模型和预训练\bert-base-chinese",
        # output layer size (number of tags)
        "output_size": 4,
        # random seed
        "seed": 987
    }
    
    
    
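    The schema.json for this task is not shown either. Given output_size = 4 and the fact that process_sentence() in the loader assigns label 0 whenever no target punctuation follows a character, one mapping consistent with the code would be the following; the concrete punctuation marks are an assumption:

    {
      "": 0,
      "，": 1,
      "。": 2,
      "？": 3
    }

    Index 0 then stands for "no punctuation after this character", and the remaining indices are the marks to be restored.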

    evaluate

    """
    模型效果测试
    """
    import re
    from collections import defaultdict
    
    import numpy as np
    import torch
    from loader import load_data_batch
    
    
    class Evaluator:
        def __init__(self, config, model, logger):
            self.config = config
            self.model = model
            self.logger = logger
            # load the validation set
            self.dataset = load_data_batch(config["valid_data_path"], config, shuffle=False)
            self.stats_dict = {"correct": 0, "wrong": 0}  # used to store evaluation results
            self.schema = self.dataset.dataset.schema
            self.index_to_label = dict((y, x) for x, y in self.schema.items())
    
        def eval(self, epoch):
            self.logger.info("开始测试第%d轮模型效果:" % epoch)
            self.stats_dict = dict(zip(self.schema.keys(), [defaultdict(int) for i in range(len(self.schema))]))
            self.model.eval()
            for index, batch_data in enumerate(self.dataset):
                # slice out the original sentences belonging to this batch
                sentence = self.dataset.dataset.sentence_list[
                           index * self.config["batch_size"]: (index + 1) * self.config["batch_size"]]
                if torch.cuda.is_available():
                    batch_data = [d.cuda() for d in batch_data]
                input_id, labels = batch_data
                with torch.no_grad():
                    pred_results = self.model(input_id)
                    self.write_stats(labels, pred_results, sentence)
            # report statistics once the whole validation set has been processed
            self.show_stats()
            return
    
        # accumulate accuracy statistics
        def write_stats(self, true_labels, pred_results, sentences):
            assert len(true_labels) == len(pred_results) == len(sentences), print(len(true_labels), len(pred_results), len(sentences))
            if not self.config["use_crf"]:
                # take the index of the highest-scoring class
                pred_results = torch.argmax(pred_results, dim=-1)
            for true_label, pred_result, sentence in zip(true_labels, pred_results, sentences):
                if not self.config["use_crf"]:
                    # truncate to the sentence length
                    pred_result = pred_result.cpu().detach().tolist()[:len(sentence)]
                # truncate to the sentence length
                true_label = true_label.cpu().detach().tolist()[:len(sentence)]
                for pred, index in zip(pred_result, true_label):
                    if index == -1:
                        continue
                    key = self.index_to_label[index]
                    self.stats_dict[key]["correct"] += 1 if pred == index else 0
                    self.stats_dict[key]["total"] += 1
            return
    
        # display per-symbol accuracy
        def show_stats(self):
            total = []
            for key in self.schema:
                acc = self.stats_dict[key]["correct"] / (1e-5 + self.stats_dict[key]["total"])
                self.logger.info("符号%s预测准确率:%f" % (key, acc))
                total.append(acc)
            self.logger.info("平均acc:%f" % np.mean(total))
            self.logger.info("--------------------")
            return
    
    

    loader

    """
    数据加载
    """
    import os
    import numpy as np
    import json
    import re
    import os
    import torch
    import torch.utils.data as Data
    from torch.utils.data import Dataset, DataLoader
    from transformers import BertTokenizer
    
    
    # Load the character vocabulary
    def load_vocab(path):
        vocab = {}
        with open(path, 'r', encoding='utf-8') as f:
            for index, line in enumerate(f):
                word = line.strip()
                # index 0 is reserved for padding, so indices start from 1
                vocab[word] = index + 1
            vocab['unk'] = len(vocab) + 1
        return vocab
    
    
    class DataGenerator:
        def __init__(self, data_path, config):
            self.data_path = data_path
            self.config = config
            self.schema = self.load_schema(config["schema_path"])
            self.max_len = config["max_len"]
            if self.config["model_type"] == "bert":
                self.tokenizer = BertTokenizer.from_pretrained(config["bert_model_path"])
            self.vocab = load_vocab(config["vocab_path"])
            self.config["vocab_size"] = len(self.vocab)
            # list of the original (punctuation-stripped) sentences
            self.sentence_list = []
            self.load_data()
    
        def __len__(self):
            return len(self.data)
    
        def __getitem__(self, idx):
            return self.data[idx]
    
        def load_schema(self, path):
            with open(path, encoding="utf8") as f:
                return json.load(f)
    
        def load_data(self):
            self.data = []
            with open(self.data_path, 'r', encoding='utf-8') as f:
                for line in f:
                    if len(line) > self.max_len:
                        for i in range(len(line) // self.max_len):
                            input_id, label = self.process_sentence(line[i * self.max_len:(i + 1) * self.max_len])
                            self.data.append([torch.LongTensor(input_id), torch.LongTensor(label)])
    
                    else:
                        input_id, label = self.process_sentence(line)
                        self.data.append([torch.LongTensor(input_id), torch.LongTensor(label)])
            return
    
        # Process a sentence: return the text with the target punctuation removed, plus labels that say which target punctuation (if any) follows each remaining character
        def process_sentence(self, sentence):
            sentence_without_target = []
            labels = []
            # iterate over sentence[:-1] because next_char looks one position ahead
            for index, char in enumerate(sentence[:-1]):
                if char in self.schema:
                    continue
                # the current char is not a target punctuation mark, so keep it
                sentence_without_target.append(char)
                next_char = sentence[index + 1]
                if next_char in self.schema:
                    labels.append(self.schema[next_char])
                else:
                    labels.append(0)
            # convert to ids
            input_id = self.sentence_to_index(sentence_without_target)
            labels = self.padding(labels, -1)
            # keep the original (punctuation-stripped) sentence for evaluation
            self.sentence_list.append(' '.join(sentence_without_target))
            return input_id, labels
    
        # Text preprocessing:
        # convert characters to ids
        def sentence_to_index(self, text):
            input_ids = []
            vocab = self.vocab
    
            for char in text:
                input_ids.append(vocab.get(char, vocab['unk']))
            # pad or truncate
            input_ids = self.padding(input_ids)
            return input_ids
    
        # Data preprocessing: truncate or pad to max_len
        def padding(self, input_ids, padding_dot=0):
            length = self.config["max_len"]
            padded_input_ids = input_ids
            if len(input_ids) >= length:
                return input_ids[:length]
            else:
                padded_input_ids += [padding_dot] * (length - len(input_ids))
                return padded_input_ids
    
    
    # Wrap the data with torch's built-in DataLoader
    def load_data_batch(data_path, config, shuffle=True):
        dg = DataGenerator(data_path, config)
        # besides the tensors, dg also carries extra information (e.g. sentence_list, schema) that is used later
        dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
        return dl
    
    
    if __name__ == '__main__':
        from config import Config
    
        dg = DataGenerator(Config["train_data_path"], Config)
        print(len(dg))
        print(dg[0])
    
    
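    A self-contained walk-through of the process_sentence() logic above, using the punctuation schema assumed earlier; the sample sentence is made up:

    schema = {"": 0, "，": 1, "。": 2, "？": 3}   # assumed mapping, see the note after the config
    sentence = "今天开会，请准时到。"

    chars, labels = [], []
    for i, ch in enumerate(sentence[:-1]):
        if ch in schema:                    # drop target punctuation from the input text
            continue
        chars.append(ch)
        nxt = sentence[i + 1]
        labels.append(schema.get(nxt, 0))   # label = punctuation that follows the char, 0 if none

    print("".join(chars))   # 今天开会请准时到
    print(labels)           # [0, 0, 0, 1, 0, 0, 0, 2]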

    main

    import torch
    import os
    import random
    import numpy as np
    import logging
    from config import Config
    from model import TorchModel, choose_optimizer
    from loader import load_data_batch
    from evaluate import Evaluator
    
    # [DEBUG, INFO, WARNING, ERROR, CRITICAL]
    
    
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)
    
    """
    模型训练主程序
    """
    # 通过设置随机种子来复现上一次的结果(避免随机性)
    seed = Config["seed"]
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    
    
    def main(config):
        # directory for saving the model
        if not os.path.isdir(config["model_path"]):
            os.mkdir(config["model_path"])
        # load the training data
        dataset = load_data_batch(config["train_data_path"], config)
        # build the model
        model = TorchModel(config)
        # move the model to gpu if one is available
        if torch.cuda.is_available():
            logger.info("gpu可以使用,迁移模型至gpu")
            model.cuda()
        # choose the optimizer
        optim = choose_optimizer(config, model)
        # evaluator used to measure performance after each epoch
        evaluator = Evaluator(config, model, logger)
        for epoch in range(config["epoch_size"]):
            epoch += 1
            logger.info("epoch %d begin" % epoch)
            epoch_loss = []
            # training mode
            model.train()
            for index, batch_data in enumerate(dataset):
                if torch.cuda.is_available():
                    batch_data = [d.cuda() for d in batch_data]
                # reset gradients before the backward pass
                optim.zero_grad()
                x, y = batch_data     # adjust here if the model takes multiple inputs or produces multiple outputs
                # forward pass: compute the loss
                loss = model(x, y)
                # back-propagate the gradients
                loss.backward()
                # optimizer updates the model parameters
                optim.step()
                # record the loss
                epoch_loss.append(loss.item())
            logger.info("epoch average loss: %f" % np.mean(epoch_loss))
            # evaluate the model on the validation set
            acc = evaluator.eval(epoch)
        # the model can be saved using model_type, model_path and the epoch number
        # model_path = os.path.join(config["model_path"], "epoch_%d_%s.pth" % (epoch, config["model_type"]))
        # torch.save(model.state_dict(), model_path)  # save the model weights
        return
    
    
    if __name__ == "__main__":
        main(Config)
    
        # for model in ["cnn"]:
        #     Config["model_type"] = model
        #     print("最后一轮准确率:", main(Config), "当前配置:", Config["model_type"])
    
        # 对比所有模型
        # 中间日志可以关掉,避免输出过多信息
        # 超参数的网格搜索
        # for model in ["gated_cnn"]:
        #     Config["model_type"] = model
        #     for lr in [1e-3, 1e-4]:
        #         Config["learning_rate"] = lr
        #         for hidden_size in [128]:
        #             Config["hidden_size"] = hidden_size
        #             for batch_size in [64, 128]:
        #                 Config["batch_size"] = batch_size
        #                 for pooling_style in ["avg"]:
        #                     Config["pooling_style"] = pooling_style
        # 可以把输出放入文件中 便于查看
        #                     print("最后一轮准确率:", main(Config), "当前配置:", Config)
    
    
    
    

    model

    import torch
    import torch.nn as nn
    from torch.optim import Adam, SGD
    from transformers import BertModel
    from torchcrf import CRF
    
    """
    建立网络模型结构
    """
    
    
    class TorchModel(nn.Module):
        def __init__(self, config):
            super(TorchModel, self).__init__()
            hidden_size = config["hidden_size"]
            vocab_size = config["vocab_size"] + 1
            output_size = config["output_size"]
            self.model_type = config["model_type"]
            num_layers = config["num_layers"]
            # self.use_bert = config["use_bert"]
            self.use_crf = config["use_crf"]
    
            self.emb = nn.Embedding(vocab_size + 1, hidden_size, padding_idx=0)
            if self.model_type == 'rnn':
                self.encoder = nn.RNN(input_size=hidden_size, hidden_size=hidden_size, num_layers=num_layers,
                                      batch_first=True)
            elif self.model_type == 'lstm':
                # bidirectional LSTM: the output dimension is hidden_size * 2
                self.encoder = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers, bidirectional=True, batch_first=True)
                hidden_size = hidden_size * 2
    
            elif self.model_type == 'bert':
                self.encoder = BertModel.from_pretrained(config["bert_model_path"])
                # use the pretrained model's hidden_size
                hidden_size = self.encoder.config.hidden_size
            elif self.model_type == 'cnn':
                self.encoder = CNN(config)
            elif self.model_type == "gated_cnn":
                self.encoder = GatedCNN(config)
            elif self.model_type == "bert_lstm":
                self.encoder = BertLSTM(config)
                # use the pretrained model's hidden_size
                hidden_size = self.encoder.config.hidden_size
            self.classify = nn.Linear(hidden_size, output_size)
            self.pooling_style = config["pooling_style"]
            self.crf_layer = CRF(output_size, batch_first=True)
            # instantiate torch.nn.CrossEntropyLoss so the -1 label padding can be ignored
            self.loss = torch.nn.CrossEntropyLoss(ignore_index=-1)  # cross-entropy loss
    
        def forward(self, x, y=None):
            if self.model_type == 'bert':
                # input x: [batch_size, seq_len]
                # BERT returns (sequence_output, pooler_output)
                # sequence_output: batch_size, max_len, hidden_size
                # pooler_output: batch_size, hidden_size
                x = self.encoder(x)[0]
            else:
                x = self.emb(x)
                x = self.encoder(x)
            # RNN/LSTM encoders return a tuple (output, hidden); keep only the output
            if isinstance(x, tuple):
                x = x[0]
    
            # # pooling layer (not needed for per-character tagging, kept for reference)
            # if not self.use_crf:
            #     if self.pooling_style == "max":
            #         # x.shape[1] is the sequence length
            #         self.pooling_style = nn.MaxPool1d(x.shape[1])
            #     elif self.pooling_style == "avg":
            #         self.pooling_style = nn.AvgPool1d(x.shape[1])
            #     x = self.pooling_style(x.transpose(1, 2)).squeeze()
    
            y_pred = self.classify(x)
            if y is not None:
                # whether to use the CRF:
                if self.use_crf:
                    mask = y.gt(-1)
                    return - self.crf_layer(y_pred, y, mask, reduction="mean")
                else:
                    # (number, class_num), (number)
                    # y_pred has shape (batch_size, max_len, class_num)
                    # y has shape (batch_size, max_len)
                    # so for the loss, flatten y_pred to (batch_size*max_len, class_num) and y to (batch_size*max_len)
                    return self.loss(y_pred.view(-1, y_pred.shape[-1]), y.view(-1))
            else:
                if self.use_crf:
                    return self.crf_layer.decode(y_pred)
                else:
                    return y_pred
    
    # Optimizer selection
    def choose_optimizer(config, model):
        optimizer = config["optimizer"]
        learning_rate = config["lr"]
        if optimizer == "adam":
            return Adam(model.parameters(), lr=learning_rate)
        elif optimizer == "sgd":
            return SGD(model.parameters(), lr=learning_rate)
    
    
    # CNN encoder
    class CNN(nn.Module):
        def __init__(self, config):
            super(CNN, self).__init__()
            hidden_size = config["hidden_size"]
            kernel_size = config["kernel_size"]
            pad = int((kernel_size - 1) / 2)
            self.cnn = nn.Conv1d(hidden_size, hidden_size, kernel_size, bias=False, padding=pad)
    
        def forward(self, x):  # x: (batch_size, max_len, embedding_size)
            return self.cnn(x.transpose(1, 2)).transpose(1, 2)
    
    
    # Gated CNN encoder
    class GatedCNN(nn.Module):
        def __init__(self, config):
            super(GatedCNN, self).__init__()
            self.cnn = CNN(config)
            self.gate = CNN(config)
    
        # forward pass: like the plain CNN, but a second convolution is passed through a sigmoid and gates the output element-wise
        def forward(self, x):
            a = self.cnn(x)
            b = self.gate(x)
            b = torch.sigmoid(b)
            return torch.mul(a, b)
    
    
    # BERT + LSTM encoder
    class BertLSTM(nn.Module):
        def __init__(self, config):
            super(BertLSTM, self).__init__()
            self.bert = BertModel.from_pretrained(config["bert_model_path"], return_dict=False)
            self.rnn = nn.LSTM(self.bert.config.hidden_size, self.bert.config.hidden_size, batch_first=True)
    
        def forward(self, x):
            x = self.bert(x)[0]
            x, _ = self.rnn(x)
            return x
    
    # if __name__ == "__main__":
    #     from config import Config
    #
    #     Config["output_size"] = 2
    #     Config["vocab_size"] = 20
    #     Config["max_length"] = 5
    #     Config["model_type"] = "bert"
    #     Config["use_bert"] = True
    #     # model = BertModel.from_pretrained(Config["bert_model_path"], return_dict=False)
    #     x = torch.LongTensor([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]])
    #     # sequence_output, pooler_output = model(x)
    #     # print(x[1], type(x[2]), len(x[2]))
    #
    #     model = TorchModel(Config)
    #     label = torch.LongTensor([0,1])
    #     print(model(x, label))
    
    

    predict:

    # -*- coding: utf-8 -*-
    import torch
    import re
    import json
    import numpy as np
    from collections import defaultdict
    from config import Config
    from model import TorchModel
    """
    模型效果测试
    """
    
    class SentenceLabel:
        def __init__(self, config, model_path):
            self.config = config
            self.schema = self.load_schema(config["schema_path"])
            self.index_to_sign = dict((y, x) for x, y in self.schema.items())
            self.vocab = self.load_vocab(config["vocab_path"])
            self.model = TorchModel(config)
            self.model.load_state_dict(torch.load(model_path))
            self.model.eval()
            print("模型加载完毕!")
    
        def load_schema(self, path):
            with open(path, encoding="utf8") as f:
                schema = json.load(f)
                self.config["class_num"] = len(schema)
            return schema
    
        # load the character or word vocabulary
        def load_vocab(self, vocab_path):
            token_dict = {}
            with open(vocab_path, encoding="utf8") as f:
                for index, line in enumerate(f):
                    token = line.strip()
                    token_dict[token] = index + 1  # index 0 is reserved for padding, so indices start from 1
            self.config["vocab_size"] = len(token_dict)
            return token_dict
    
        def predict(self, sentence):
            input_id = []
            for char in sentence:
                input_id.append(self.vocab.get(char, self.vocab[""]))
            with torch.no_grad():
                res = self.model(torch.LongTensor([input_id]))[0]
                res = torch.argmax(res, dim=-1)
            labeled_sentence = ""
            for char, label_index in zip(sentence, res):
                labeled_sentence += char + self.index_to_sign[int(label_index)]
            return labeled_sentence
    
    if __name__ == "__main__":
        sl = SentenceLabel(Config, "model_output/epoch_10.pth")
    
        sentence = "客厅的颜色比较稳重但不沉重相反很好的表现了欧式的感觉给人高雅的味道"
        res = sl.predict(sentence)
        print(res)
    
        sentence = "双子座的健康运势也呈上升的趋势但下半月有所回落"
        res = sl.predict(sentence)
        print(res)
    
    
  • Original article: https://blog.csdn.net/njh1147394013/article/details/138861024