• Python - 深度学习系列36 重塑实体识别3


    说明

    从应用的角度,对实体识别的全流程进行进一步的明确。从全流程的角度上看,需要对数据做一些规范,并允许在不同的阶段插进来进行修改和迭代。

    内容

    1 原始数据

    假设这个阶段,通过较为简单的方式对数据做了标记

    在初始阶段,我们获得原始数据。然后,可能通过tf idf之类简单的方法获得一些实体列表。

    x = '小王喊小明吃饭,小王大声喊'
    y = '小王,小明'
    

    考虑到数据应该是以文档形式给到的,所以可以约定如下,必须具有4个字段:

    无论是x还是y,必须遵守最基本的utf-8 + 半角 + 两端无空格的规范。

    字段解释
    doc_id文档id,一篇文档包含n个句子
    ss_id句子排序id, sentence sort id, 整型,标题是0, 正文从1开始编号
    md5md5 hash id ,用于标记内容
    x句子文本内容,必须以强分隔符结尾(如果是标题类的,手动补上一个中文句号)。强制分隔符包含中文句号、中英文问号、中英文叹号、换行符
    y实体列表,使用英文逗号连接

    强分割函数

    import re
    def split_sentences_with_punctuation(text):
        # 定义句子分隔符
        punctuation = r'([。?!?!\n])'
        # 根据句子分隔符进行分割,并保留分隔符
        parts = re.split(punctuation, text)
        # 将分隔符与句子重新组合
        sentences = []
        for i in range(0, len(parts), 2):
            sentence = parts[i].strip()
            if i + 1 < len(parts):
                sentence += parts[i + 1]
            sentences.append(sentence)
        return sentences
    
    # 示例文本
    text = "我喜欢编程。你呢?这是一个很有趣的项目!\n我也喜欢读书。"
    
    # 分割句子,并保留分隔符
    sentences = split_sentences_with_punctuation(text)
    print(sentences)
    ['我喜欢编程。', '你呢?', '这是一个很有趣的项目!', '\n', '我也喜欢读书。', '']
    
    
    
    # 确保数据总是可以有一个强分割符
    
    import re
    
    def ensure_period(sentence= None):
        """
        如果句子不是以句号、问号、感叹号或者感叹问号结尾,则在结尾添加一个句号。
        
        参数:
        sentence (str): 待检查的句子。
        
        返回:
        str: 添加句号后的句子。
        """
        # 使用正则表达式匹配句子末尾的标点符号
        if not re.search(r'[。?!?!]$', sentence):
            sentence += '。'
        return sentence
    
    # 示例用法
    sentence1 = "这是一个例句"
    sentence2 = "这是一个例句!"
    sentence3 = "这是一个例句?"
    sentence4 = "这是一个例句."
    sentence5 = "这是一个例句?"
    sentence6 = "这是一个例句!"
    sentence7 = "这是一个例句!?"
    sentence8 = "这是一个例句?!"
    
    print(ensure_period(sentence1))  # 输出: "这是一个例句."
    print(ensure_period(sentence2))  # 输出: "这是一个例句!"
    print(ensure_period(sentence3))  # 输出: "这是一个例句?"
    print(ensure_period(sentence4))  # 输出: "这是一个例句."
    print(ensure_period(sentence5))  # 输出: "这是一个例句?"
    print(ensure_period(sentence6))  # 输出: "这是一个例句!"
    print(ensure_period(sentence7))  # 输出: "这是一个例句!?"
    print(ensure_period(sentence8))  # 输出: "这是一个例句?!"
    

    2 处理流程

    基本的规范是utf8字符集和半角字符集。

    # 标准处理函数
    import re
    
    def extract_utf8_chars(input_string = None):
        # 定义一个正则表达式,用于匹配所有的UTF-8字符
        utf8_pattern = re.compile(r'[\u0000-\U0010FFFF]')
        
        # 使用findall方法找到所有匹配的字符
        utf8_chars = utf8_pattern.findall(input_string)
        
        return ''.join(utf8_chars)
    
    def toDBC(some_char):
        tem_str_ord = ord(some_char)
        res = None 
        if tem_str_ord >65280 and tem_str_ord < 65375:
            res =tem_str_ord - 65248
        # 12288全角空格,160  空格
        if tem_str_ord in [12288,160]:
            res = 32
        res_var_ord = res or tem_str_ord
        return chr(res_var_ord)
    def tranform_half_widh(some_str = None):
        res_list = []
        return ''.join([toDBC(x) for x in some_str])
    
    

    使用pydantic进行规范转化

    # 强分割断句,确保末尾的强分隔符。
    from typing import List, Optional
    from pydantic import BaseModel,FieldValidationInfo, field_validator
    
    class Item(BaseModel):
        doc_id:str
        content:str
    
        # 验证器:确保 content 以强分隔符结尾
        @field_validator('content',mode='before')
        def ensure_utf8_and_halfwidth(cls, v):
            v = tranform_half_widh(extract_utf8_chars(v))  # 转换为半角字符
            return v
    
    # 给到一个document,将之分割为句子
    class DocumentSplit(BaseModel):
        input_data_listofdict: List[Item] = [{'doc_id':'1', 'content':'这是第一篇文章。'}, {'doc_id':'2', 'content':'这是第二篇文章。'}]
    

    先进行初始化

    ds = DocumentSplit(input_data_listofdict = [{'doc_id':'1','content':'这是第一篇文章。这是第一篇文章'}, {'doc_id':'2', 'content':'这是第二篇文章。这是第二篇文章'}])
    

    开始构造句子

    from Basefuncs import * 
    
    tem_df = pd.DataFrame([x.dict() for x in   ds.input_data_listofdict])
    tem_df['sentences'] = tem_df['content'].apply(split_sentences_with_punctuation)
    
    # 构造句子
    def make_sentences(some_dict = None):
        doc_id = some_dict['doc_id']
        sentences = some_dict['sentences']
    
        res_list = []
        for i, v in enumerate(sentences):
            tem_dict = {}
            tem_dict['doc_id'] = doc_id
            tem_dict['s_ord'] = i+1
            tem_dict['sentence'] = ensure_period(v)
            res_list.append(tem_dict)
        return res_list
    
    _s = cols2s(tem_df, cols = ['doc_id', 'sentences'], cols_key_mapping=['doc_id', 'sentences'])
    _s1 = _s.apply(make_sentences)
    tem_df1 = pd.DataFrame( flatten_list(_s1.to_list() ))
    
    	doc_id	s_ord	sentence
    0	1	1	这是第一篇文章。
    1	1	2	这是第一篇文章。
    2	2	1	这是第二篇文章。
    3	2	2	这是第二篇文章。
    

    以上处理完成了将原始数据转为标准格式的数据,每个句子将作为一个独立的个体。对于训练数据而言,可以无视句子在文章中的顺序;而作为预测来说,结果可以根据原文的顺序拼接起来。

    3 创建环境

    租用一个显卡主机,启动jupyter,然后将对应的模型考过去。

    • 1 拷贝老的模型,对无标签数据进行初级打标
    • 2 拷贝新的模型,根据数据进行下一步训练。

    4 训练

    4.1 数据预打标(prelabel_data.ipynb)

    这个过程是通过各种方法收集到的实体打标数据,通常是打标尚具有缺陷的数据。总体上可以认为 80%正确的标签数据。

    获取原始未达标数据,这部分数据应该已经存在于数据库中 clickhouse

    from Basefuncs import * 
    import pandas as pd 
    import requests as req 
    
    # 获取全部数据
    host = 'xxx'
    port = 19000
    database = 'my_database'
    user = 'xxx'
    password = 'xxx'
    name = 'tem'
    chc = CHClient(host = host, port = port , database = database, user = user, password = password, name = name )
    the_sql = 'show tables'
    chc._exe_sql(the_sql)
    
    # 直接获取全部
    query_data = chc.get_table('news_wz_retrain_ner')
    # 整合数据
    df = pd.DataFrame(query_data, columns = ['mid','title','content','task_id','task_rand'])
    mid	title	content	task_id	task_rand
    0	00007bfd99a62722a2ebc0bedef3c398	河南济源示范区:依托企业招才引智博聚兴产	714日,河南博士后科技服务团济源行活动启动,来自全省高等院校、科研院所的21名博士后、专...	42719	208
    1	000193b97b50083acb76de6128094916	Omicron有何新变化?将如何影响市场?	辉瑞和德国拜恩泰科(BioNTech)公司也发布声明称“三剂疫苗可以将抗体水平提升25倍”;...	85212	746
    

    数据规范化处理

    对应的处理函数,确保了基本字符集以及断句,分隔符的规范。

    import re
    
    def extract_utf8_chars(input_string = None):
        # 定义一个正则表达式,用于匹配所有的UTF-8字符
        utf8_pattern = re.compile(r'[\u0000-\U0010FFFF]')
        
        # 使用findall方法找到所有匹配的字符
        utf8_chars = utf8_pattern.findall(input_string)
        
        return ''.join(utf8_chars)
    
    def toDBC(some_char):
        tem_str_ord = ord(some_char)
        res = None 
        if tem_str_ord >65280 and tem_str_ord < 65375:
            res =tem_str_ord - 65248
        # 12288全角空格,160  空格
        if tem_str_ord in [12288,160]:
            res = 32
        res_var_ord = res or tem_str_ord
        return chr(res_var_ord)
    def tranform_half_widh(some_str = None):
        res_list = []
        return ''.join([toDBC(x) for x in some_str])
    
    # 强分割
    import re
    def split_sentences_with_punctuation(text):
        # 定义句子分隔符
        punctuation = r'([。?!?!\n])'
        # 根据句子分隔符进行分割,并保留分隔符
        parts = re.split(punctuation, text)
        # 将分隔符与句子重新组合
        sentences = []
        for i in range(0, len(parts), 2):
            sentence = parts[i].strip()
            if i + 1 < len(parts):
                sentence += parts[i + 1]
            sentences.append(sentence)
        return sentences
    
    
    import re
    
    def ensure_period(sentence= None):
        """
        如果句子不是以句号、问号、感叹号或者感叹问号结尾,则在结尾添加一个句号。
        
        参数:
        sentence (str): 待检查的句子。
        
        返回:
        str: 添加句号后的句子。
        """
        # 使用正则表达式匹配句子末尾的标点符号
        if not re.search(r'[。?!?!]$', sentence):
            sentence += '。'
        return sentence
    
    
    # 强分割断句,确保末尾的强分隔符。
    from typing import List, Optional
    from pydantic import BaseModel,FieldValidationInfo, field_validator
    
    
    class Item(BaseModel):
        doc_id:str
        content:str
    
        # 验证器:确保 content 以强分隔符结尾
        @field_validator('content',mode='before')
        def ensure_utf8_and_halfwidth(cls, v):
            v = tranform_half_widh(extract_utf8_chars(v))  # 转换为半角字符
            return v
    
    
    # 给到一个document,将之分割为句子
    class DocumentSplit(BaseModel):
        input_data_listofdict: List[Item] = [{'doc_id':'1', 'content':'这是第一篇文章。'}, {'doc_id':'2', 'content':'这是第二篇文章。'}]
    
    
    
    ds = DocumentSplit(input_data_listofdict = [{'doc_id':'1','content':'这是第一篇文章。这是第一篇文章'}, {'doc_id':'2', 'content':'这是第二篇文章。这是第二篇文章'}])
    

    标题类数据准备
    对于一篇文档来说,可能存在一个类似标题的数据。这类数据是高度概括的,从文字风格上,可能与正文不同;从作用上也不同,标题,也可能是摘要,目前是对文档的信息进行高度提炼。

    所以在实体识别任务中,这些是需要区别对待的。(对于title数据,只要保证强分隔符即可)

    通过DocumentSplit, 数据进行字符集的检查和转换。pydantic的处理效率接近pandas的apply方法,是可以量产的。

    title_s = cols2s(df, cols=['mid','title'], cols_key_mapping=['doc_id', 'content'])
    ds = DocumentSplit(input_data_listofdict = title_s.to_list())
    the_data = pd.DataFrame([x.dict() for x in ds.input_data_listofdict])
    doc_id	content
    0	00007bfd99a62722a2ebc0bedef3c398	河南济源示范区:依托企业招才引智博聚兴产
    1	000193b97b50083acb76de6128094916	Omicron有何新变化?将如何影响市场?
    

    句子序号和分隔符规范化。每篇文档,0的序号预留给标题。

    the_data.columns = ['doc_id', 'sentence']
    the_data['s_ord'] = 0
    the_data['sentence'] = the_data['sentence'].apply(ensure_period)
    part1_df = the_data.copy()
    

    part1_df就是标题部分的预处理结果。

    正文类数据准备

    对于content数据,需要转为短句列表。在识别实体时,一方面实体一定不会包含强分隔符,所以业务上可分;另一方面,服务在批量处理时是通过GPU并行计算的,那么限定每个向量的长度是必须的,这是技术上的要求。

    将句子按强分隔符分割

    def make_sentences(some_dict = None):
        doc_id = some_dict['doc_id']
        sentences = some_dict['sentences']
    
        res_list = []
        for i, v in enumerate(sentences):
            tem_dict = {}
            tem_dict['doc_id'] = doc_id
            tem_dict['s_ord'] = i+1
            tem_dict['sentence'] = ensure_period(v)
            res_list.append(tem_dict)
        return res_list
        
    content_s = cols2s(df, cols=['mid','content'], cols_key_mapping=['doc_id', 'content'])
    ds = DocumentSplit(input_data_listofdict = content_s.to_list())
    
    the_data1 = pd.DataFrame([x.dict() for x in ds.input_data_listofdict])
    
    the_data1['sentences'] = the_data1['content'].apply(split_sentences_with_punctuation)
    
    _s = cols2s(the_data1, cols = ['doc_id', 'sentences'], cols_key_mapping=['doc_id', 'sentences'])
    _s1 = _s.apply(make_sentences)
    the_data2 = pd.DataFrame( flatten_list(_s1.to_list() ))
    part2_df = the_data2.copy()
    

    数据合并

    ucs编号:通过将接口服务再封装为对象操作

    article_df = pd.concat([part1_df,part2_df], ignore_index=True)
    from typing import List, Optional
    from pydantic import BaseModel
    
    import requests as req 
    class UCS(BaseModel):
        gfgo_lite_server: str = 'http://172.17.0.1:24090/'
    
        def get_brick_name(self, some_id = None):
            some_dict = {}
            some_dict['rec_id'] = some_id
            url = self.gfgo_lite_server + 'get_brick_name/'
            res = req.post(url, json = some_dict).json()
            return res 
    
        def get_brick_name_s(self, some_id_list = None):
            some_dict = {}
            some_dict['rec_id_list'] = some_id_list
            url = self.gfgo_lite_server + 'get_brick_name_s/'
            res = req.post(url, json = some_dict).json()
            return res 
    
    ucs = UCS()
    article_df['pid'] = list(range(len(article_df)))
    pid_list2 = slice_list_by_batch2(list(article_df['pid']),10000)
    
    import tqdm
    res_list = []
    for some_pid_list in tqdm.tqdm(pid_list2):
        block_name_list = ucs.get_brick_name_s(some_pid_list)
        res_list.append(block_name_list)
    article_df['brick'] = flatten_list(res_list)
    
    doc_id	sentence	s_ord	pid	brick
    0	00007bfd99a62722a2ebc0bedef3c398	河南济源示范区:依托企业招才引智博聚兴产。	0	0	0.0.0.0
    1	000193b97b50083acb76de6128094916	Omicron有何新变化?将如何影响市场?	0	1	0.0.0.0
    

    分块存储并处理(打标)

    brick_list = sorted(list(article_df['brick'].unique()))
    !mkdir left
    !mkdir right
    

    worker.py

    import sys
    
    # brick
    arg1 = sys.argv[1]
    
    print('arg1', arg1)
    
    port_list = [10000,10001,10002]
    
    import random 
    from Basefuncs import * 
    the_port = random.choice(port_list)
    
    the_brick = arg1
    the_data = from_pickle(the_brick, './left/')
    
    the_data['sentence_clean'] = the_data['sentence'].apply(lambda x: x[:198])
    batch_list1 = cols2s(the_data, cols=['pid', 'sentence_clean'], cols_key_mapping= ['rec_id', 'data'])
    batch_list2 = slice_list_by_batch2(batch_list1.to_list(), 500)
    
    import requests as req
    
    resp_df_list = []
    for some_batch in batch_list2:
        the_url = 'http://172.17.0.1:%s/parse_ent/' % the_port
        para_dict = {}
        para_dict['task_for'] = 'test.test.ent'
        para_dict['data_listofdict'] = some_batch
        resp = req.post(the_url,json = para_dict).json()
        resp_df = pd.DataFrame(resp['data'])
        resp_df_list.append(resp_df)
    
    
    import pandas as pd 
    mdf = pd.concat(resp_df_list, ignore_index=True)
    
    mdf['pid'] = mdf['rec_id']
    mdf2 = pd.merge(the_data, mdf[['pid', 'ORG']], how='left', on ='pid')
    to_pickle(mdf2, the_brick, './right/')
    

    player.py

    import sys
    
    # brick
    arg1 = sys.argv[1]
    
    print('arg1', arg1)
    
    the_mod_num = int(arg1)
    
    from Basefuncs import * 
    
    left_file = list_file_names_without_extension('./left/')
    right_file = list_file_names_without_extension('./right/')
    
    gap_files = list(left_file - right_file)
    gap_files1 = [x for x in gap_files if  int(x[-1]) % 3 == the_mod_num]
    
    
    import os 
    for some_brick in gap_files1:
        os.system('python3 worker.py %s' % some_brick)
    

    执行player,对3个服务均匀发起请求

    python3 player.py 0 &
    python3 player.py 1 &
    python3 player.py 2 &
    

    收集结果并存库

    left_file = list_file_names_without_extension('./left/')
    right_file = list_file_names_without_extension('./right/')
    gap_files = list(left_file - right_file)
    如果gap_files为空列表,那么表示处理完毕
    

    读取所有文件合并

    right_file_list = right_file
    right_df_list = []
    for some_right in right_file_list:
        tem_df = from_pickle(some_right, './right/')
        right_df_list.append(tem_df)
    right_df = pd.concat(right_df_list, ignore_index=True)
    right_sel = (right_df['ORG'] != ',') &(right_df['ORG'].notnull())
    right_df1 = right_df[right_sel]
    

    建表

    # 获取全部数据
    host = 'xxx'
    port = 19000
    database = 'xxx'
    user = 'xxx'
    password = 'xxx'
    name = 'xxx'
    chc = CHClient(host = host, port = port , database = database, user = user, password = password, name = name )
    the_sql = 'show tables'
    chc._exe_sql(the_sql)
    
    # chc.del_table('train_ner_news_title_content_org_20240529')
    create_table_sql = '''
    CREATE TABLE train_ner_news_title_content_org_20240529
    (
        doc_id String,
        s_ord Int,
        pid Int,
        brick String,
        sentence_clean String,
        ORG String
    )
    ENGINE = MergeTree
    partition by (brick)
    PRIMARY KEY (pid)
    '''
    chc._exe_sql(create_table_sql)
    
    for slice_tuple in slice_list_by_batch1(0, len(right_df1), 100000):
        print(slice_tuple)
        _tem_df = right_df1.iloc[slice_tuple[0]:slice_tuple[1]]
        chc.insert_df2table(table_name = 'train_ner_news_title_content_org_20240529' , 
                            some_df = _tem_df[['doc_id','s_ord', 'pid','brick','sentence_clean', 'ORG']], 
                            pid_name = 'pid', cols =['doc_id','s_ord', 'pid','brick','sentence_clean', 'ORG'])
    

    到这里算是完成了规范化的一部分,太长了,还得继续开新的文章写。

  • 相关阅读:
    参数方程求导
    ACM算法学习路线、清单
    TCP实现网络通信(多进程与多线程版本)
    Day9:面试必考选择题目
    13JVM进阶
    i.MX6ULL - 问题解决:NFS挂载失败 - VFS: Unable to mount root fs on unknown-block(2,0)
    金仓数据库 KingbaseES V8.3至V8.6迁移最佳实践(3. KingbaseES移植能力支撑体系)
    数学分析_笔记_第3章:极限
    Go学习第十五章——Gin参数绑定bind与验证器
    SizeBalanceTree
  • 原文地址:https://blog.csdn.net/yukai08008/article/details/139149117