• 建模杂谈系列170 APIFunc继续实践-数据处理体系


    说明

    先说点题外话,这几年星星点点搞了很多东西,很像一个个的零部件:每一个都有用,但是似乎又没大用。同时也一直在探索一个工具,可以将日常繁杂的工作进行很大程度的自动化,想法很多,实验了也很多,但是似乎一直都是差一点。

    我觉得事物的发展还是有一定不变的基础规律的,简单理解也可以说量变引起质变。我觉得今年10月之后,到了一个有趣的质变点,我现在叫它APIFunc(前身还有很多个名字)。这个核心的数据处理工具提供了基础的逻辑处理机制,可以让使用者进行无限拓展,随之而来的就会有一系列基础的约定。

    在过去的探索中,当然知道有些问题无非就是一些关键点,但是具体怎么操作,甚至怎么命名都没有确定,所以那些实验没法称为工具,智能说是“概念机”。大概是翻来覆去的次数多了,干脆就固化一个可行版本下来,也就是APIFunc:毕竟探索还是受到资源限制的,找到一个可行解就可以退出探索了。

    我认为我工作/研究的核心价值是数据处理,也就是DataIn-【数据处理】->DataOut的过程。
    我的目标是提供端到端的服务实例,而不仅仅是解决方案,是可以开箱即用(SaaS)的服务。简单一点来说,就是给到服务器,然后用户就可以(通过前端、API)获得价值。这些价值是根据客户的业务和工作性质而不同的,但是很显然,它一定会大幅减少机械的重复时间

    要实现这些,也就意味着既要有核心的数据处理方法,也要有灵魂的算法(与业务结合),同时还要能够进行匹配的数据吞吐、服务运维等能力。对应的技术栈就包括了linux操作、运维工具(ssh、nginx、docker)的使用、微服务(体系)的搭建、前端技术群(js,css)、数据库(集群)的搭建以及数据处理及分析的工具。

    注意数据处理是广义的,可以说是数据治理、机器学习、深度学习等。这个本身是我专注的价值核心,而且这个领域目前也还没有特别公认的方法(不是没有,而是百花齐放,没有统一)。这个话题过于宽泛,所以干脆就简单说数据处理与分析。

    啰啰嗦嗦一大堆,简单概括一下我的感受:

    • 1 【水到渠成】我只是有一个模糊的目标,然后碰到相关的问题就去想办法解决,这些东西都ready的时候,似乎那个模糊的目标也就清晰了。(例如我是2年前开始搞flask,docker,mongo这些的,现在这些都是我成熟的基础工具了)
    • 2 【纲举目张】当APIFunc开始具体化各项规范时,其他的东西也很快具象化。只有具象化了,才是一个工具,可以用;否则就是胚子,只能想象,不能用。

    内容

    1 背景

    APIFunc从最近一次具象化概念以来,已经经过几次迭代实验(这几个实验在1个月之内完成):

    • 1 【概念】实体识别改造。这个是APIFunc之前的最后一次概念性改造,然后就明确了APIFunc的形态。
    • 2 【原型】某个手工处理的数据比对任务。我都有点忘了,大概是对一批文本处理的数据进行比对,这次实验确定了APIFunc可实用的模糊想法。简单、清晰的工具。
    • 3 【验证】图片识别服务的改造。这次实验非常重要,完全确定了APIFunc是可行的。这个图片识别服务是4年前写的,我在Review代码时发现那时候构建程序的方法和现在的APIFunc在底层上是完全一致的(这点我自己都没想到)。所以在重构时,基本就是一次全仿真的实例构建。这次实验特别还补充了「列式」处理的功能。
    • 4 【工程】某数据处理服务的重构。在前次实验时,还仅仅是有APIFunc的处理核心,但这次就增加了数据库、队列,使得常态化,服务化,并行化得以保证。当然,中间也再次使用APIFunc翻译了Java的一些逻辑,虽然我和Java没缘分,但这并不妨碍我看逻辑并用APIFunc进行转化。

    这篇文章的目的也就是对实现的功能进行再一次的梳理和打包,这次实验,也是工程也就算完成了。之后当然还有启动后的校验和改变,但那些并不会影响这次实验的结论。

    2 处理图

    我习惯流程从左到右画,所以看起来会比较宽一点。里面的细节就不必关注了,我们从整体上看这个图的涵义。

    • 1 原点:代表数据源,从概念上我们可以当成是一个数据表。
    • 2 方框:队列,用于缓冲。这在实战中很重要,因为数据库不适合做大量的零散操作。通过队列实现了零存整取。
    • 3 箭头框:Sniffer,用于推动整个体系活动。每个箭头框都可以理解是一个单线程的定时程序,会按间隔进行取数/存数,通常周期是1秒。
    • 4 六边形:Worker,处理的核心,用于实现数据处理的逻辑功能,APIFunc就是Worker的核心内容。Worker被假设为可具有多个分身,所以在核心的计算上,是可以并行拓展的。Worker是比较耗资源的,通常来说我会采用动态的方式来进行调度:如果没有任务,Worker将推迟启动的周期(可能1分钟或者5分钟才启动一次任务执行)。如果当前有数据,那么就不间断处理。(本次简单的实验,Worker也被设置为和Sniffer一样启动)

    在这里插入图片描述

    结构上的简单性

    在数据进入体系后(通常是Step1),后续的每次处理结构都是一样的。数据处理的结果是会随着Step逐级流转,而分叉出来不流转的点是这个流程处理的元数据。

    因为这种结构是高度简单的,所以也就可以无限的叠加下去。

    机制确定后,一些操作也随之确定,所以我写了StreamIO对象来方便基于机制的队列和数据库存取数。这样在进行拓展的时候,只要写一些简单的参数就可以了。

    3 机制

    从调度的便利性上,运行的部分分为两类。

    3.1 Sniffer

    负责IO,单线程。

    3.2 Worker

    负责处理,多线程。

    4 运行

    4.1 Sche调度

    在运行的调度中(sche.py),确保对应的mongo表存在默认索引。因为索引是不需要频繁确认的,只要在整个调度起来时检查一次就足够了。

    In [7]: sio.ensure_mongo_index(tier1 ='xjpg', tier2 ='step1_input', key_index='pdfkey')
    {'data': {'pdfkey': 'Existed And Passed'}, 'msg': 'ok', 'status': True}
    {'data': {'_is_enable_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
    {'data': {'_create_time_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
    {'data': {'_update_time_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
    {'data': {'_ch001_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
    {'data': {'_ch001_cnt_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
    Out[7]: True
    
    In [8]: sio.ensure_mongo_index(tier1 ='xjpg', tier2 ='step2_output',key_index='pdfkey')
    {'data': {'pdfkey': 'Existed And Passed'}, 'msg': 'ok', 'status': True}
    {'data': {'_is_enable_1': 'Existed And Passed'}, 'msg': 'ok', 'status': True}
    {'data': {'_create_time_1': 'Existed And Passed'}, 'msg': 'ok', 'status': True}
    {'data': {'_update_time_1': 'Existed And Passed'}, 'msg': 'ok', 'status': True}
    {'data': {'_ch001_1': 'Existed And Passed'}, 'msg': 'ok', 'status': True}
    {'data': {'_ch001_cnt_1': 'Existed And Passed'}, 'msg': 'ok', 'status': True}
    Out[8]: True
    
    In [9]: sio.ensure_mongo_index(tier1 ='xjpg', tier2 ='step3_nodup',key_index='pdfkey')
    {'data': {'pdfkey': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
    {'data': {'_is_enable_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
    {'data': {'_create_time_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
    {'data': {'_update_time_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
    {'data': {'_ch001_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
    {'data': {'_ch001_cnt_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}
    Out[9]: True
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    关于APS假死问题

    我发现在过去的应用中,使用APS起来的容器一般在运行一到两周后会进入假死状态。所以还有个小的约定/要实现的功能就是APS容器在运行一段时间后自动销毁,然后触发建立新的容器。

    BTW, 像Flask或者数据库容器从来都不会这样,虽然一段时间不用会进入类似休眠的状态(感觉第一次访问会比较慢一些),但是很快就会恢复正常。

    所以APS的调度会交给Flask_APS来管理。

    下一步:前端

    • 1 监控

    • 2 控制

    后记

    1 输入数据的问题

    在做的时候碰到了一个问题,算是业务层面的问题。

    我在观察数据的时候发现,不能使用pdfkey作为唯一主键,而是应该使用a,b,c三个作为联合主键。好,问题来了,现在怎么改?

    首先,数据从外部获取,到了两个队列。他们能有唯一主键当然好,但是碰到这种情况,我们并不方便让业务去按严格规范改。

    所以,从IO的角度,我并不能让其直接按写好的函数去调。所以约定,系统的入口处可以按相对自由的方式去写,最终要确保主键不重不漏就可以。

    内部的流转则可以直接使用S2M这样的规范函数。

    对应的处理方法:

    • 1 将之间建立的几个主键索引删掉,重新建立联合索引(建立唯一索引要在表为空,或者字段有非空非重复的时候建)
    • 2 修改入口的流,不需要其他人去改(因为给的信息也已足够)

    2 通道字段

    我发现如果是去重的话,完全可以通过加一个字段,而不是一张表。所以步骤可以砍掉一个。另外如果未来还有更多的需求,只要加通道就好了。

  • 相关阅读:
    Apollo 应用与源码分析:CyberRT-话题通信
    多线程---操作系统
    Python库之Scrapy的简介、安装、使用方法详细攻略
    VScode为什么选择了Electron,而不是QT?
    C++ 基础与深度分析 Chapter7 深入I/O(文件与内存操作、流的状态、流的定位、流的同步)
    抽象类和接口
    编程基础---C/C++基础知识
    Linux17 --- 消息队列
    CPU调度
    Taro vue3版本安装使用 NutUi 手动安装
  • 原文地址:https://blog.csdn.net/yukai08008/article/details/127595864