• Python 全栈系列191 基于Redis队列的处理服务


    说明

    这大概是最简单,又最快的处理方式了。

    Mongo可以很好的借助内存,也可以利用硬盘,但总体上,Mongo更适合进行大容量或者高密度的数据吞吐。因为Mongo的功能多,所以就显得“重”。 所以在服务和用户之间,还需要有个高速缓存。Kafka当然是很好的选择,但是Redis简单多了。

    这篇文章介绍消息队列说的很好,也提到了Redis和Kafka以及RabbitMQ的优缺点

    综上,把 Redis 当作队列来使用时,始终面临的 2 个问题:
    
    Redis 本身可能会丢数据
    
    面对消息积压,Redis 内存资源紧张
    
    Redis 是否可以用作队列,我想这个答案应该会比较清晰了。
    
    如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。
    
    而且,Redis 相比于 Kafka、RabbitMQ,部署和运维也更加轻量。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    对我的设计来说,Redis更像是L2 Cache,所以在我的生态里使用Redis作为队列是完全可行的。

    内容

    1 批量模式

    用户会发起若干查询(query), 程序需要一定时间处理(异步),所以如果用户没有第一时间查到结果,就过一会再查(当然,CallBack也是一个选择,但前提是对方的处理也微服务化)。这类请求表现为:准实时要求,具有一定的时效容忍度

    这种模式下,处理端和客户端不直接打交道,而是通过消息队列这样的中间件。中间件的速度很快,这样客户端可以很快放下东西就走,服务端也可以直接从中间件获取数据,批量处理。

    2 部署的组件

    全部使用Docker部署

    2.1 Redis服务(24008)

    这块比较简单,但是也要部署一个。注意各组件统一即可,为了在多台机器部署方便,我做了一键部署。即执行一个脚本,从我的web服务器拉取文件夹压缩包,然后自动执行切换目录、解压和启动脚本。

    记得把镜像改为开放即可。

    2.2 RedisStream服务(24907)

    提供了对Redis的队列操作和存取操作,甚至还可以加上一些数据的预检查。API就向这个服务发起查询就可以了,速度会很快。在查询时,每个请求需要能够对要处理的核心内容计算MD5,查询时就通过这个MD5作为主键查询。

    Worker也会对这个队列执行消息的读取,并且我们假设Worker是不稳定的,也就是说:

    • 1 Worker可能有多个,且无法预先安排对应的处理通道
    • 2 Worker可能处理到一半掉线,从而导致任务被领取但是没有完成(超时)

    所以RedisStream会提供一个扎口(query_with_ack):

    • 1 任何Worker都会通过这个扎口获取消息,这样就保证了消息不会重复分发
    • 2 扎口在分发了消息之后,会对消息进行ACK,这样下次扎口就会拿到新的消息

    服务以一定概率触发重处理检查(例如 5%):

    • 1 基于Redis的Stream是以毫秒时间戳+序号作为消息的ID,所以在某一时刻的Worker处理可以往回看未删除的消息
    • 2 一般故障发生的概率不会很高,可以根据实际情况来设定重处理检查
    • 3 Worker在成功处理了(一批)数据后,会向服务批量提交删除消息请求
    • 4 Worker会将结果

    所以Worker就不必看了, 24907提供了对于队列操作的代理服务,这样可以使得Worker对于Redis的操作是接口透明的。

    3 RedisStream服务(24907)搭建

    3.1 LittleRQ对象

    这是个基础对象,用来简化后续服务中的数据库操作。包含有键值对和队列两类操作。

    Redis有16个逻辑数据库(db0-db15),每个逻辑数据库项目是隔离的,默认使用db0数据库。若选择第2个数据库

    键值对

    这个方法相当于 set, setnx和setex三个方法集成。

    set(name, value, ex=None, px=None, nx=False, xx=False)
    
    ex:过期时间(秒),时间到了后redis会自动删除
    px:过期时间(毫秒),时间到了后redis会自动删除。ex、px二选一即可
    nx:如果设置为True,则只有name不存在时,当前set操作才执行
    xx:如果设置为True,则只有name存在时,当前set操作才执行
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    方法解释
    setv避免和集合(set)关键字重合,加个v。简单设置,r.set(query_hash, json.dumps(resp)) ;允许存文本、数值和json字符串
    getv获取键值
    expire给某个键设置超时r.expire(query_hash, 300),过期时间是秒
    incr自增计数;redis_conn.set(‘num_2’, 2) , v = redis_conn.incr(‘num_2’), 设置一个可数值化的字符,在redis也会当成数值,可以自增
    decr自减计数
    list_keys将当前的键全部列出来
    delete可以传入单个字符,或者列表。返回的是数值。如果没有删任何数就是0,否则就是删除的个数。
    info数据库信息
    dbsize数据库的键数量

    队列

    方法解释
    add_msg向队列存入一条消息,返回boolean
    len_of_queue队列的长度,返回int
    ensure_group确保组的存在,返回boolean
    get_a_stream_state检查一个stream的状态, 返回boolean
    get_a_stream_groups检查一个group的状态
    del_a_stream删掉一个队列
    get_msg根据id获取消息
    get_pending_info获取被待处理的消息状态
    get_pending_msg获取待处理的消息
    ack_msg确认消息,这样就不是pending了
    del_msg删除消息(不然统计队列内消息的数量不会减少),ACK不会删除消息
    get_range_msg获取未删除的队列消息,与get_msg,这是一个列表。用来检查可能被miss掉的消息。

    其他

    redis官网的tutorial

  • 相关阅读:
    kubernetes-pod的更新策略与回滚策略
    Linux多线程之线程控制
    LeetCode515. Find Largest Value in Each Tree Row
    千行 MySQL 学习笔记总结大全,语法大全
    STM32 HAL库 利用CH376进行USB文件读写
    9月第4周榜单丨飞瓜数据B站UP主排行榜(哔哩哔哩平台)发布!
    『 MySQL数据库 』数据库之表的约束
    Windows下解决Loading composer repositories with package information 慢的问题。
    MySQL最新2023年面试题及答案,汇总版(3)【MySQL最新2023年面试题及答案,汇总版-第三十三刊】
    03Redis-五大基本数据类型
  • 原文地址:https://blog.csdn.net/yukai08008/article/details/127230444