果然还是要有Agent的思维
最近在实现APIFunc的时候,使用Redis的Stream作为队列,因为之前没有把Redis提到这么高的高度(主服务),所以想着写个类就完事了。在性能测试的时候就发现性能大大低于预期(4C8G估计日吞吐3000万条数据-当然包括了Mongo和数据处理)。后来总结下来,我发现根源在于思路混乱了:我既站在了消费者的角度,又站在生产者的角度去设计,结果关键的地方想偏了
效率来自于并行,而不是来自于并发;但是应用常以并发的方式出现。
以下是犯错的过程:
所以其实我的思路是在考虑分布式的程序获得并发挥单个结果的场景,那么TPS1000已经很够够了吧,按redis每条消息1~2ms,其实也就差不多这样。单个核一天估计可以接受8000万条数据,偶尔借一借其他核的性能,那么一天一亿条入数据已经可以保证了。这种模式算是【Collect 收集】,或者说是“零存”。使用服务(Agent)的好处是没必要再另外创建数据库连接,也就不会产生额外的开销。
但是,在密集的数据处理下就不能这么处理。巨多的数据库连接(gevent)和网络连接(asyncio)会把大量的资源耗费掉,所以要以列表形式进行传输:在一个请求中传输若干数据。这若干数据我希望可以是一万条(batchNum)。这种模式算是【Transfer 传输】,也可以想象成运钞车把储户零存的钱集中运走,或者分到某些网点。
所以本身对于Redis的模式至少就有两种完全不同的应用场景,通过Agent实现即可方便使用者(API),也可以减少不必要的数据库连接管理开销。
另外,我测试了一下Redis Stream在批量(列表)模式下的吞吐:
当然这些测试和数据的大小也会有关系,我这批的数据每条记录0.5~1KB左右,算是比较中规中矩的大小。
这次的性能优化也会对APIFunc的结构有一些微调,以下是单步的流转过程,可以理解为是APIFunc的基础单元(类似像深度学习基础单元单元)

以下均按一万条数据估计。
在每个Stream需要经过增、删、改三个过程,大约1.5S。三个工作流需要约4.5秒的时间。在对三个Mongo表进行存取操作的时候,也需要经过类似的过程,耗时方面我觉得会更长一些,单表在一亿条以内,每万条操作应该是在1秒左右,比redis要高一倍,大概这样就是9秒。数据的总体吞吐在13.5秒左右。
而一个小规模的数据处理(50~100规则函数)应该可在10秒内完成计算。所以每万条的总体吞吐应该在25秒以内完成。那么这样一个流程单核下可以完成3500万条/天。一台服务器按4个核估计,假设在处理操作时客户端和服务端都在这个机器上,那么应该可以达到2*3500 = 7000万每天的吞吐力。
假设一台mongo服务器一天在不停的吞吐,按每秒万条来算的话,那么一天约是8.6亿条;除以3的话,约是2.8亿条。Redis的吞吐可以更高(因为任务简单),所以瓶颈最终不会在Redis。
如果将Worker剥离这台主机,那么单机的吞吐可以更高,肯定可以超过1亿的吞吐。Worker一般将由局网内的其他主机完成。未来如果要更进一步拓展这个体系的结构,那么Redis、Mongo都会是(分片副本)集群,Sniffer和Worker可以在局网任意一台主机运行。我觉得提升十倍能力应该没问题。
回到这个主题,我会建立一个Redis的代理服务,用来隔离用户和数据库,并通过代理进行统一,规范的操作。后续会补上一些元素的操作。
初步只考虑给队列服务。
| 名称 | API |
|---|---|
| 创建新连接 | create_connection_hash |
| 基本信息 | info |
| 队列长度 | len_of_queue |
| 获取队列消息 | xrange |
| 确保消费组 | ensure_group |
| 删除消费组 | destroy_group |
| 查看消费组 | info_group |
| 查看队列(stream) | info_stream |
| 删掉队列 | del_stream |
| 获取未分配消息(消费者) | fetch_msg |
| 确认一条消息 | ack_msg |
| 批量确认消息 | batch_ack_msg |
| 删除消息(列表) | del_msg |
| 删除消费者(似乎会删除消费者的消息,待验证) | del_consumer |
| 获取延误消息概况 | summary_pending_msg |
| 获取延误消息(返回的是延误消息的元数据) | get_pending_msg |
| 添加一条消息 | add_msg |
| 批量添加消息 | batch_add_msg |
在之前的对象基础上,以上功能在一天左右开发完成。里面的很多都是细节,关键点有两个:
在第二点上我被卡了很久,我一直以为redis的xadd命令应该会直接提供bulk方式的操作,但发现是要通过另一个pipeline方式来。比较巧的一点是,官方也建议批量的命令大约以1万条为批次。这样正好和我的整体规划匹配。
官方文档

最终结果测下来还是不错的,万条操作都在几百毫秒,甚至几十毫秒。