• redis stream 实现消息队列


    redis stream 实现消息队列

    Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。

    基于redis实现消息队列的方式有很多:

    • PUB/SUB,订阅/发布模式
    • 基于List的 LPUSH+BRPOP 的实现

    redis 实现消息对列4中方法

    发布订阅

    发布订阅优点: 典型的一对的,所有消费者都能同时消费到消息。主动通知订阅者而不是订阅者轮询去读。

    发布订阅缺点: 不支持多个消费者公平消费消息,消息没有持久化,不管订阅者是否收到消息,消息都会丢失。

    使用场景:微服务间的消息同步,如 分布式webSocker,数据同步等。

    list 队列

    生产者通过lpush生成消息,消费者通过blpop阻塞读取消息。

    **list队列优点:**支持多个消费者公平消费消息,对消息进行存储,可以通过lrange查询队列内的消息。

    **list队列缺点:**blpop仍然会阻塞当前连接,导致连接不可用。一旦blpop成功消息就丢弃了,期间如果服务器宕机消息会丢失,不支持一对多消费者。

    zset 队列

    生产者通过zadd 创建消息时指定分数,可以确定消息的顺序,消费者通过zrange获取消息后进行消费,消费完后通zrem删除消息。

    zset优点: 保证了消息的顺序,消费者消费失败后重新入队不会打乱消费顺序。

    zset缺点: 不支持一对多消费,多个消费者消费时可能出现读取同一条消息的情况,得通过加锁或其他方式解决消费的幂等性。

    zset使用场景:由于数据是有序的,常常被用于延迟队列,如 redisson的DelayQueue

    Stream 队列

    Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。

    参考kafka的思想,通过多个消费者组和消费者支持一对多消费,公平消费,消费者内维护了pending列表防止消息丢失。

    提供消息ack机制。

    在这里插入图片描述

    基本命令

    xadd 生产消息

    往 stream 内创建消息 语法为:

    XADD key ID field string [field string …]

    # * 表示自动生成id redis会根据时间戳+序列号自动生成id,不建议我们自己指定id
    xadd stream1 * name zs age 23  
    
    • 1
    • 2

    读取消息

    读取stream内的消息,这个并不是消费,只是提供了查看数据的功能,语法为:

    XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

    #表示从 stream1 内取出一条消息,从第0条消息读取(0表示最小的id)
    xread count 1 streams stream1 0
    #表示从 stream1 内 id=1649143363972-0 开始读取一条消息,读取的是指定id的下一条消息
    xread count 1 streams msg 1649143363972-0
    
    #表示一直阻塞读取最新的消息($表示获取下一个生成的消息)
    xread count 1 block 0 streams stream1 $ 
    
    xrange stream - + 10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    XRANGE key startID endID count

    #表示从stream1内取10条消息 起始位置为 -(最小ID) 结束位置为+(最大ID)
    xrange stream1 - + 10 
    
    • 1
    • 2

    xgroup 消费者组

    redis stream 借鉴了kafka的设计,采用了消费者和消费者组的概念。允许多个消费者组消费stream的消息,每个消费者组都能收到完整的消息,例如:stream内有10条消息,消费者组A和消费者组B同时消费时,都能获取到这10条消息。

    每个消费者组内可以有多个消费者消费,消息会平均分摊给各个消费者,例如:stream有10条消息,消费者A,B,C同时在同一个组内消费,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9

    创建消费者组:

    #消费消息首先得创建消费者组
    # 表示为队列 stream1 创建一个消费者组 group1 从消息id=0(第一条消息)开始读取消息
    xgroup create stream1 group1 0
    
    #查询stream1内的所有消费者组信息
    xinfo groups stream1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    xreadgroup 消费消息

    通过xreadgroup可以在消费者组内创建消费者消费消息

    XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

    #创建消费者读取消息
    #在group1消费者组内通过consumer1消费stream1内的消息,消费1条未分配的消息 (> 表示未分配过消费者的消息)
    xreadgrup group group1 consumer1 count 1 streams stream1 > 
    
    • 1
    • 2
    • 3

    Pending 等待列表

    通过 xreadgroup 读取消息时消息会分配给对应的消费者,每个消费者内都维护了一个Pending列表用于保存接收到的消息,当消息ack后会从pending列表内移除,也就是说pending列表内维护的是所有未ack的消息id

    每个Pending的消息有4个属性:

    1. 消息ID
    2. 所属消费者
    3. IDLE,已读取时长
    4. delivery counter,消息被读取次数

    XPENDING key group [start end count] [consumer]

    #查看pending列表
    # 查看group1组内的consumer1的pending列表 - 表示最小的消息id + 表示最大的消息ID
    xpending stream1 group1 - + 10 consumer1
    # 查看group1组内的所有消费者pending类表
    xpending stream1 group1 - + 10 
    
    • 1
    • 2
    • 3
    • 4
    • 5

    消息确认

    当消费者消费了消息,需要通过 xack 命令确认消息,xack后的消息会从pending列表移除

    XACK key gruopName ID

    xack stream1 group1 xxx
    
    • 1

    消息转移

    当消费者接收到消息却不能正确消费时(报错或其他原因),可以使用 XCLAIM 将消息转移给其他消费者消费,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。

    通过xclaim转移的消息只是将消息移入另一个消费者的pending列表,消费者并不能通过xreadgroup读取到消息,只能通过xpending读取到。

    # 表示将ID为 1553585533795-1 的消息转移到消费者B消费,前提是消费
    XCLAIM stream1 group1 consumer1 3600000 1553585533795-1
    
    • 1
    • 2

    信息监控

    redis提供了xinfo来查看stream的信息

    #查看sream信息
    xinfo stream steam1
    #查询消费者组信息
    xinfo groups group1 
    
    #查询消费者信息
    xinfo consumers consumer1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    SpringBoot 整合

    1 引入依赖

    
        org.springframework.boot
        spring-boot-starter-data-redis
    
    
    • 1
    • 2
    • 3
    • 4

    2 编写消费者

    @Slf4j
    @Component
    public class EmailConsumer implements StreamListener> {
    
        public final String streamName      = "emailStream";
        public final String groupName       = "emailGroup";
        public final String consumerName    = "emailConsumer";
    
    
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        @Override
        public void onMessage(MapRecord message) {
    
            //log.info("stream名称-->{}",message.getStream());
            //log.info("消息ID-->{}",message.getId());
            log.info("消息内容-->{}",message.getValue());
    
            Map msgMap = message.getValue();
    
            if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){
                //消费异常导致未能ack时,消息会进入pending列表,我们可以启动定时任务来读取pending列表处理失败的任务
                log.info("消费异常-->"+message);
               return;
            }
    
            StreamOperations streamOperations = stringRedisTemplate.opsForStream();
            //消息应答
            streamOperations.acknowledge( streamName,groupName,message.getId() );
    
        }
    	//我们可以启动定时任务不断监听pending列表,处理死信消息
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    3 配置redis

    序列化配置

    @EnableCaching
    @Configuration
    public class RedisConfig {
    
    
        /**
         * 设置redis序列化规则
         */
        @Bean
        public Jackson2JsonRedisSerializer jackson2JsonRedisSerializer(){
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
    
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            jackson2JsonRedisSerializer.setObjectMapper(om);
    
            return jackson2JsonRedisSerializer;
        }
    
        /**
         * RedisTemplate配置
         */
        @Bean
        public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory,
                                                           Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) {
    
            // 配置redisTemplate
            RedisTemplate redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(redisConnectionFactory);
            RedisSerializer stringSerializer = new StringRedisSerializer();
    
            // key序列化
            redisTemplate.setKeySerializer(stringSerializer);
            // value序列化
            redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
    
            // Hash key序列化
            redisTemplate.setHashKeySerializer(stringSerializer);
            // Hash value序列化
            redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
    
            redisTemplate.afterPropertiesSet();
            return redisTemplate;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    消费者组和消费者配置

    @Slf4j
    @Configuration
    public class RedisStreamConfig {
    
        @Autowired
        private EmailConsumer emailConsumer;
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @Bean
        public StreamMessageListenerContainer.StreamMessageListenerContainerOptions> emailListenerContainerOptions(){
    
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
    
            return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder()
                    //block读取超时时间
                    .pollTimeout(Duration.ofSeconds(3))
                    //count 数量(一次只获取一条消息)
                    .batchSize(1)
                    //序列化规则
                    .serializer( stringRedisSerializer )
                    .build();
        }
    
        /**
         * 开启监听器接收消息
         */
        @Bean
        public StreamMessageListenerContainer> emailListenerContainer(RedisConnectionFactory factory,
                                                                     StreamMessageListenerContainer.StreamMessageListenerContainerOptions> streamMessageListenerContainerOptions){
    
            StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(factory,
                    streamMessageListenerContainerOptions);
    
            //如果 流不存在 创建 stream 流
            if( !redisTemplate.hasKey(emailConsumer.streamName)){
                redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", ""));
                log.info("初始化stream {} success",emailConsumer.streamName);
            }
    
            //创建消费者组
            try {
                redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName);
            } catch (Exception e) {
                log.info("消费者组 {} 已存在",emailConsumer.groupName);
            }
    
            //注册消费者 消费者名称,从哪条消息开始消费,消费者类
            // > 表示没消费过的消息
            // $ 表示最新的消息
            listenerContainer.receive(
                Consumer.from(emailConsumer.groupName, emailConsumer.consumerName),
                StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()),
                emailConsumer
            );
    
    
            listenerContainer.start();
            return listenerContainer;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    4.生产者生产消息

    @GetMapping("/redis/ps")
    public String redisPublish(String content,Integer count){
    
        StreamOperations streamOperations = redisTemplate.opsForStream();
    
        for (int i = 0; i < count; i++) {
            AtomicInteger num = new AtomicInteger(i);
    
            Map msgMap = new HashMap();
            msgMap.put("count", i);
            msgMap.put("sID", num);
            //新增消息
            streamOperations.add("emailStream",msgMap);
        }
        return "success";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    参考文档:

    redis Stream 消息队列

    SpringBoot整合redis stream 实现消息队列

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    [JS真好玩] 遇到表格,手动翻页太麻烦?我教你写脚本,一页展示所有数据
    使用轮廓分数提升时间序列聚类的表现
    【算法刷题day37】Leetcode:738. 单调递增的数字、968. 监控二叉树
    预训练word2vec--Word2Vec实现(二)
    Autosar Configuration(十) Ethernet之物理层PHY-EB配置
    文心一言,通营销之学,成一家之言,百度人工智能AI大数据模型文心一言Python3.10接入
    二手交易平台碳减排,有了评估标准
    jQuery,解决命名冲突的问题
    2022 谷歌出海创业加速器展示日: 见证入营企业成长收获
    Web安全——信息收集下篇
  • 原文地址:https://blog.csdn.net/m0_61083409/article/details/126113987