• Python - 深度学习系列38 重塑实体识别5-预测并行化改造


    说明

    在重塑实体识别4中梳理了数据流,然后我发现pipeline的串行效率太低了,所以做了并行化改造。里面还是有不少坑的,记录一下。

    内容

    1 pipeline

    官方的pipeline看起来的确是比较好用的,主要是实现了比较好的数据预处理。因为在训练/使用过程中都要进行数据的令牌化与反令牌化,有些字符会被特殊处理,例如 '##A’等。
    在这里插入图片描述
    在使用过程中,我用200条新闻数据进行测试,用pipeline方法花了11分钟处理完毕,期间CUDA的使用率大约为10%。按此估算,即使用多接口并行的方式,那么一分钟最多处理2000条,一天最多处理0.14*2000~30万条数据。这个效率太低了。

    2 并行化

    最终的结论是不到30秒处理200条,显存只占用2.6G,理论上可以支持3个服务并行(以确保GPU的完全利用)。按最保守的估计,改造后的并行化应该可以提升3倍的效率,稍微激进一点,可以提升10倍的效率。这个之后可以进行测试。

    一些主要的点如下

    2.1 结果解析

    结果可以分为:

    • 1 仅含解析出的实体列表,用逗号连接字符串表示。
    • 2 含实体及其起始位置的表示,这个用于标注反馈、二次增强处理。
    • 3 仅含BIO标签,主要用于和测试数据进行效果比对。

    对应的相关函数,看起来有点繁杂,我自己都不太想看第二眼。

    from datasets import ClassLabel
    # 定义标签列表
    label_list = ['B', 'I', 'O']
    # 创建 ClassLabel 对象
    class_label = ClassLabel(names=label_list)
    def convert_entity_label_batch(x):
        x1 = x
        return class_label.int2str(x1)
    # 定义函数将整数Tensor转换为字符串 | 反令牌函数,但是用不上;因为predict label列表的长度和 ss_padding相同
    def tensor_to_string(tensor, tokenizer = None , skip_special_tokens = True):
        return tokenizer.decode(tensor.tolist(), skip_special_tokens=skip_special_tokens).replace(' ','')
    
    from datasets import ClassLabel
    def detokenize(word_piece):
        """
        将 WordPiece 令牌还原为原始句子。
        """
        if word_piece.startswith('##'):
            x = word_piece[2:]
        else:
            x = word_piece
        return x
    import re
    def extract_bio_positions(bio_string):
        pattern = re.compile(r'B(I+)(O|$)')
        matches = pattern.finditer(bio_string)
        
        results = []
        for match in matches:
            start, end = match.span()
            results.append((start, end - 1))  # end-1 to include the last 'I'
        
        return results
    
    # 0.1ms
    def parse_ent_pos_map_batch(some_dict = None):
    
        word_list = some_dict['token_words']
        label_list = [int(x) for x in list(some_dict['label_list'])]
    
        min_len = min(len(word_list),len(label_list))
        word_list = word_list[:min_len]
        label_list = label_list[:min_len]
        
    
        label_list1 =  list(map(convert_entity_label_batch,label_list))
        oriword_list1 = list(map(detokenize,word_list))
        ori_word_str =''.join(oriword_list1)
        # 补到等长
        label_str = ''
        for i in range(len(label_list1)):
            len_of_ori_word = len(oriword_list1[i])
            if len_of_ori_word == 1:
                tem_str = label_list1[i]
            else:
                if label_list1[i] in ['I','O']:
                    tem_str = label_list1[i] * len_of_ori_word
                else:
                    tem_str = 'B' + 'I' * (len_of_ori_word -1)        
            label_str += tem_str
    
        pos_list = extract_bio_positions(label_str)
        part_ent_list = [(ori_word_str[x[0]:x[1]] , *x) for x in pos_list]
        return part_ent_list
    
    # =============
    def make_BIO_by_len(some_len):
        default_str = 'I' * some_len
        str_list = list(default_str)
        str_list[0] ='B'
        return str_list
    def gen_BIO_list2(some_dict):
        the_content = some_dict['clean_data']
        ent_list =  some_dict['ent_tuple_list']
    
        content_list = list(the_content)
        tag_list = list('O'* len(content_list))
        
        for ent_info in ent_list:
            start = ent_info[1]
            end = ent_info[2]
            label_len = end-start
            tem_bio_list = make_BIO_by_len(label_len)
            tag_list[start:end] = tem_bio_list
    
        res_dict = {}
        res_dict['x'] = content_list
        res_dict['y'] = tag_list
        return res_dict
    
    def trim_len(some_dict = None):
        padding_BIO = some_dict['padding_BIO']
        ss_len = some_dict['ss_len']
        return padding_BIO[:ss_len]
    

    2.2 批量预测

    看起来同样很繁杂,但是不得不细看。首先,数据会按照几个长度 20,50,198分为三部分处理,batch_predict每次仅处理一个批次。在这里,将数据转为定长的令牌长度,然后转入CUDA进行批量预测。结果再按照实体-位置 tuple, 实体列表和BIO三种方式进行解析。

    from functools import partial
    import transformers 
    import torch 
    from transformers import AutoModelForMaskedLM, AutoTokenizer,AutoModelForTokenClassification
    from functools import partial
    # some_batch 是原文经过padding的数据,['ss_hash','ss','ss_len', 'ss_padding'], 其中ss_padding的长度是固定的
    # 模型文件和令牌文件都放在model_path之下,model比较大,避免重载;而tokenize会有padding过程,必须重载
    # 模型先载入cuda
    def batch_predict(some_batch, ss_padding_len = None, model = None, model_path = None):
        # 因为tokenize会在令牌的前后加上分隔令牌,所以+2
        if ss_padding_len is None:
            ss_padding_len = some_batch['ss_padding'].apply(len).max()
        print('ss_padding_len is %s ' % ss_padding_len)
        max_len = ss_padding_len+2
        tokenizer = AutoTokenizer.from_pretrained(model_path)
        tencoder = partial(tokenizer.encode,truncation=True, max_length=max_len, is_split_into_words=True, return_tensors="pt",  padding='max_length')
        some_batch['ss_padding_token'] = some_batch['ss_padding'].apply(list).apply(tencoder)
    
        # 构成矩阵
        minput = torch.cat(list(some_batch['ss_padding_token'].values))
        # 将数据搬到GPU中处理再返回
        with torch.no_grad():
            input_cuda = minput.to(device)
        
            outputs_cuda = model(input_cuda).logits
            predictions = torch.argmax(outputs_cuda, dim=2)
        
            predictions_list = list(predictions.to('cpu').numpy())
        
        predict_list1 = []
        for predictions in predictions_list:
            tem_pred_tag = [int(x) for x in predictions[1:-1]]
            predict_list1.append(tem_pred_tag)
    
        some_batch['label_list'] = predict_list1
        _s = cols2s(some_df =some_batch, cols= ['ss_padding','label_list'], cols_key_mapping= ['token_words', 'label_list'])
        _s1 = _s.apply(parse_ent_pos_map_batch)
        some_batch['ent_tuple_list'] = list(_s1)
        some_batch['ent_list'] = some_batch['ent_tuple_list'].apply(lambda x: ','.join([a[0] for a in x ]))
    
        _s = cols2s(some_batch, cols= ['ss_padding', 'ent_tuple_list'], cols_key_mapping= ['clean_data', 'ent_tuple_list'])
        s1 = _s.apply(gen_BIO_list2)
        ent_tuple_res_df1 = pd.DataFrame(s1.to_list())
        some_batch['padding_BIO'] = list(ent_tuple_res_df1['y'].apply(lambda x: ''.join(x)))
        _s00 = cols2s(some_batch, cols = ['ss_len', 'padding_BIO'], cols_key_mapping=['ss_len', 'padding_BIO'])
        some_batch['BIO'] = list(_s00.apply(trim_len))
        return some_batch    
    

    3 迭代器

    在推送数据处理时,可以采用迭代器来控制不同的批次数据

    # 迭代器切分
    import pandas as pd
    class DataFrameBatchIterator:
        def __init__(self, dataframe, batch_size):
            self.dataframe = dataframe
            self.batch_size = batch_size
            # 【我增加的】
            self.fail_batch_list = []
        def __iter__(self):
            num_rows = len(self.dataframe)
            num_batches = (num_rows - 1) // self.batch_size + 1
            for i in range(num_batches):
                start_idx = i * self.batch_size
                end_idx = (i + 1) * self.batch_size
                batch_data = self.dataframe.iloc[start_idx:end_idx]
                yield batch_data
        # 【我增加的】
        def clear_fail(self):
            self.fail_batch_list = []
        # 【我增加的】
        def get_some_batch(self, batch_idx):
            return self.dataframe.iloc[self.batch_size * batch_idx: self.batch_size * (batch_idx + 1)]
        # 【我增加的】记录失败的批次
        def rec_fail_batch_idx(self, batch_idx):
            self.fail_batch_list.append(batch_idx)
    # 创建一个示例 DataFrame
    data = {'Name': ['John', 'Jane', 'Mike', 'Alice', 'Bob'],
            'Age': [25, 30, 35, 28, 32],
            'City': ['New York', 'Paris', 'London', 'Tokyo', 'Sydney']}
    df = pd.DataFrame(data)
    # 创建 DataFrame 迭代器
    batch_iterator = DataFrameBatchIterator(df, batch_size=2)
    import tqdm
    # 使用迭代器逐批次处理数据
    for i,batch in tqdm.tqdm(enumerate(batch_iterator)):
        try:
            # 在这里可以对当前批次的数据进行相应的操作
            # 例如进行数据清洗、特征处理、模型训练等
            # 示例:打印当前批次的数据
    #         raise Exception(e) 
            print(batch)
        except:
            print('>>> %s Fail' % i)
            batch_iterator.rec_fail_batch_idx(i)
    

    以下是实际的调度

    # 假设处理长度为1万的句子
    # 20 * 2000 ~ 4w
    # 50 * 800 ~  4w
    # 200 * 200 ~ 4w
    import warnings 
    warnings.filterwarnings('ignore')
    batch_slice_para = {20:2000, 50:800, 200:200}
    batch_len_list = sorted(list(batch_slice_para.keys()))
    batch_len_list.insert(0,0)
    
    batch_df_list = []
    for i in range(len(batch_len_list)):
        if i >0:
            sel = (ss_df['ss_len'] >=batch_len_list[i-1]) & (ss_df['ss_len'] < batch_len_list[i])
            if sel.sum():
                padding_len = batch_len_list[i]
                padding_batch = batch_slice_para[padding_len]
                
                tem_df= ss_df[sel]
                # tem_df['ss_padding'] = tem_df['ss'].apply(lambda x: x.ljust(padding_len,'a'))
                tem_df['ss_padding'] = tem_df['ss']
                tem_df_iterator = DataFrameBatchIterator(tem_df, padding_batch)
                batch_df_list.append(tem_df_iterator)
            else:
                batch_df_list.append(None)
    

    对每个批次执行处理,载入模型

    label_list = ['B','I','O']
    model_checkpoint = 'model03'
    model = AutoModelForTokenClassification.from_pretrained(model_checkpoint, num_labels=len(label_list))
    
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print('Device: %s' % device)
    # 自动切换设备
    if model.device.type != device:
        model.to(device)
        print('>>> 检测到模型设备与当前指定不一致,切换 %s' % device )
    else:
        print('>>> 模型设备一致,不切换 %s' % device)
    

    分批次预测(主要是确保显存不溢出)

    batch_res_list = []
    for some_iter in batch_df_list:
        for some_batch in some_iter:
            batch_res = batch_predict(some_batch, model = model, model_path = 'model03')
            batch_res_list.append(batch_res)
    

    结果合并

    batch_res_df = pd.concat(batch_res_list, ignore_index= True)
    mdf = pd.merge(input_df , batch_res_df[['ss_hash', 'ent_list']],how='left', on ='ss_hash')
    

    在这里插入图片描述

    4 总结

    一个在理论上证明可以显著提升效率的点在于,模型进行实体识别时先切分了短句,然后按短句进行了去重:相同短句的实体结果一定是相同的。

    实验中,200条新闻产生了约5万个短句,去重后只剩下约3.5万。所以即使在这一步也是有提升的。当然,这种方式同样也可以被用于pipeline。

    还有就是在处理填充时,并不按照最大长度统一填充。而是按照句子长度的统计特性分为了短、中、长三种方式。从统计上看,约70%的短句长度是在20个字符以内的,真正超过50个字符的短句(中间无分隔符),即使从语法上来看也是比较奇怪的。
    这样在填充数据时浪费就比pipeline要小,同样显存可以装下更多的数据。

  • 相关阅读:
    深度优先搜索dfs算法刷题笔记【蓝桥杯】
    静态pod 创建使用示例详解
    Http代理与socks5代理有何区别?如何选择?(一)
    buuctf web [极客大挑战 2019]BabySQL
    04.toRef 默认值
    娄底可靠性检测实验室建设知识概述
    JSP webshell免杀——JSP的基础
    防火墙导致Linux发送网络报文出现errno等于1的错误码
    分布式一致性算法Raft-理论篇
    Unity学习笔记--入门
  • 原文地址:https://blog.csdn.net/yukai08008/article/details/139432253