• Python 全栈系列200 Redis Agent


    说明

    果然还是要有Agent的思维

    最近在实现APIFunc的时候,使用Redis的Stream作为队列,因为之前没有把Redis提到这么高的高度(主服务),所以想着写个类就完事了。在性能测试的时候就发现性能大大低于预期(4C8G估计日吞吐3000万条数据-当然包括了Mongo和数据处理)。后来总结下来,我发现根源在于思路混乱了:我既站在了消费者的角度,又站在生产者的角度去设计,结果关键的地方想偏了

    效率来自于并行,而不是来自于并发;但是应用常以并发的方式出现。

    以下是犯错的过程:

    • 1 为了接受来自其他人的数据,我起了一个服务,可以将消息转存到redis,但是是单条的形式
    • 2 然后在进行接口压测的时候我用asyncio向该服务进行了并发(这时候还是在消费者的角度)
    • 3 用asyncio的时候发现有点问题,在连续并发数据超过3万时会断掉,但是如果每次一千,每次sleep一秒,那么就可以一直存
    • 4 在后台的APIFunc数据流转时我也使用这种方式(然后就发现效率提不上来)
    • 5 我想是不是因为asyncio工具的局限,就开始使用gevent(这时候就走入歧途)
    • 6 的确,在单次实验下gevent存储很快,一万条也不用一秒,但是在连续存储时就有问题了,变得特别慢,大概十几秒一万
    • 7 我猜测是因为gevent发起操作时,产生了大量的数据库连接,然后处理结束后,数据库又要释放这些连接,所以就很慢
    • 8 原来asyncio的那个服务,估计就占1个或者几个数据库连接

    所以其实我的思路是在考虑分布式的程序获得并发挥单个结果的场景,那么TPS1000已经很够够了吧,按redis每条消息1~2ms,其实也就差不多这样。单个核一天估计可以接受8000万条数据,偶尔借一借其他核的性能,那么一天一亿条入数据已经可以保证了。这种模式算是【Collect 收集】,或者说是“零存”。使用服务(Agent)的好处是没必要再另外创建数据库连接,也就不会产生额外的开销。

    但是,在密集的数据处理下就不能这么处理。巨多的数据库连接(gevent)和网络连接(asyncio)会把大量的资源耗费掉,所以要以列表形式进行传输:在一个请求中传输若干数据。这若干数据我希望可以是一万条(batchNum)。这种模式算是【Transfer 传输】,也可以想象成运钞车把储户零存的钱集中运走,或者分到某些网点。

    所以本身对于Redis的模式至少就有两种完全不同的应用场景,通过Agent实现即可方便使用者(API),也可以减少不必要的数据库连接管理开销。

    另外,我测试了一下Redis Stream在批量(列表)模式下的吞吐:

    • 1 读取一万条数据时间:0.2~0.5S
    • 2 删除一万条数据时间: < 0.1 S
    • 3 存一万条数据时间: 0.8S

    当然这些测试和数据的大小也会有关系,我这批的数据每条记录0.5~1KB左右,算是比较中规中矩的大小。

    内容

    1 关于APIFunc数据处理体系

    这次的性能优化也会对APIFunc的结构有一些微调,以下是单步的流转过程,可以理解为是APIFunc的基础单元(类似像深度学习基础单元单元)

    在这里插入图片描述

    以下均按一万条数据估计。

    在每个Stream需要经过增、删、改三个过程,大约1.5S。三个工作流需要约4.5秒的时间。在对三个Mongo表进行存取操作的时候,也需要经过类似的过程,耗时方面我觉得会更长一些,单表在一亿条以内,每万条操作应该是在1秒左右,比redis要高一倍,大概这样就是9秒。数据的总体吞吐在13.5秒左右。

    而一个小规模的数据处理(50~100规则函数)应该可在10秒内完成计算。所以每万条的总体吞吐应该在25秒以内完成。那么这样一个流程单核下可以完成3500万条/天。一台服务器按4个核估计,假设在处理操作时客户端和服务端都在这个机器上,那么应该可以达到2*3500 = 7000万每天的吞吐力。

    假设一台mongo服务器一天在不停的吞吐,按每秒万条来算的话,那么一天约是8.6亿条;除以3的话,约是2.8亿条。Redis的吞吐可以更高(因为任务简单),所以瓶颈最终不会在Redis。

    如果将Worker剥离这台主机,那么单机的吞吐可以更高,肯定可以超过1亿的吞吐。Worker一般将由局网内的其他主机完成。未来如果要更进一步拓展这个体系的结构,那么Redis、Mongo都会是(分片副本)集群,Sniffer和Worker可以在局网任意一台主机运行。我觉得提升十倍能力应该没问题。

    2 Redis Agent

    回到这个主题,我会建立一个Redis的代理服务,用来隔离用户和数据库,并通过代理进行统一,规范的操作。后续会补上一些元素的操作。

    初步只考虑给队列服务。

    功能列表

    名称API
    创建新连接create_connection_hash
    基本信息info
    队列长度len_of_queue
    获取队列消息xrange
    确保消费组ensure_group
    删除消费组destroy_group
    查看消费组info_group
    查看队列(stream)info_stream
    删掉队列del_stream
    获取未分配消息(消费者)fetch_msg
    确认一条消息ack_msg
    批量确认消息batch_ack_msg
    删除消息(列表)del_msg
    删除消费者(似乎会删除消费者的消息,待验证)del_consumer
    获取延误消息概况summary_pending_msg
    获取延误消息(返回的是延误消息的元数据)get_pending_msg
    添加一条消息add_msg
    批量添加消息batch_add_msg

    在之前的对象基础上,以上功能在一天左右开发完成。里面的很多都是细节,关键点有两个:

    • 1 使用原生的命令来进行消息的批量查询和删除
    • 2 在增加消息上,要使用pipeline

    在第二点上我被卡了很久,我一直以为redis的xadd命令应该会直接提供bulk方式的操作,但发现是要通过另一个pipeline方式来。比较巧的一点是,官方也建议批量的命令大约以1万条为批次。这样正好和我的整体规划匹配。

    官方文档
    在这里插入图片描述
    最终结果测下来还是不错的,万条操作都在几百毫秒,甚至几十毫秒。

  • 相关阅读:
    区块链技术与应用 - 学习笔记3【比特币数据结构】
    not read Username for “https://gitee.com“: Device not configured
    sqlserver数据库,创建作业,定时执行sql
    AVR单片机开发5——串口通信仿真
    S曲线步进电机控制算法 ChatGPT
    MySQL刷题的一些注意事项
    super关键字
    为什么gitlab runner 必须要映射docker.sock
    处理游戏提示找不到steam_api64.dll丢失的方法
    重新认识 IP地址
  • 原文地址:https://blog.csdn.net/yukai08008/article/details/127706301