• springboot+rocketmq(4):实现延时消息


    一、延迟消息

    对于消息中间件来说,producer 将消息发送到mq的服务器上,但并不希望这条消息马上被消费,而是推迟到当前时间节点之后的某个时间点,再将消息投递到 queue 中让 consumer 进行消费。

    二、延迟消息的使用场景

    ​ 延迟消息的使用场景很多,一种比较常见的场景就是在电商系统中,订单创建后,会有一个等待用户支付的时间窗口,一般为30分钟,30分钟后 customer 会收到这条订单消息,然后程序去订单表中检查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉(取消订单释放库存),这样就不需要使用定时任务的方式去处理了

    三、Rocket的延迟消息 使用限制级别

    RocketMQ 支持定时的延迟消息,但是不支持任意时间精度仅支持特定的 level(共18级),例如定时 5s, 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。

    支持的level如下:level=0 级表示不延时,level=1 表示 延时1s,level=2 表示 延时5s

    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    
    • 1

    四、延迟消息 的整合实现

    4.1创建Springboot项目,添加rockermq 依赖

    <!--rocketMq依赖-->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    4.2配置rocketmq

    # 端口
    server:
      port: 8083
    
    # 配置 rocketmq
    rocketmq:
      name-server: 127.0.0.1:9876
      #生产者
      producer:
        #生产者组名,规定在一个应用里面必须唯一
        group: group1
        #消息发送的超时时间 默认3000ms
        send-message-timeout: 3000
        #消息达到4096字节的时候,消息就会被压缩。默认 4096
        compress-message-body-threshold: 4096
        #最大的消息限制,默认为128K
        max-message-size: 4194304
        #同步消息发送失败重试次数
        retry-times-when-send-failed: 3
        #在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
        retry-next-server: true
        #异步消息发送失败重试的次数
        retry-times-when-send-async-failed: 3
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    4.3 生产者 发送延时消息

    package com.example.springbootrocketdemo.controller;
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.time.LocalDateTime;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 延时消息
     * @author qzz
     */
    @RestController
    public class RocketMQDelayCOntroller {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        /**
         * 发送延时消息
         */
        @RequestMapping("/testDelaySend")
        public void testDelaySend(){
    
            Map<String,Object> orderMap = new HashMap<>();
            orderMap.put("orderNumber","1357890");
            orderMap.put("createTime", LocalDateTime.now());
    
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:Message<?>
            //参数三:消息发送超时时间
            //参数四:delayLevel 延时level  messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            rocketMQTemplate.syncSend("test-topic-delay", MessageBuilder.withPayload(JSONObject.toJSONString(orderMap)).build(),3000,3);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    4.4 消费者 监听延时消息,消费消息

    package com.example.springbootrocketdemo.config;
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    import java.util.Map;
    
    /**
     * 消费延时消息
     * 配置RocketMQ监听
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "test-delay",topic = "test-topic-delay")
    public class RocketMQDelayConsumerListener implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String s) {
            Map<String,Object> orderMap = JSONObject.parseObject(s,Map.class);
            String orderNumber = String.valueOf(orderMap.get("orderNumber"));
            String createTime = String.valueOf(orderMap.get("createTime"));
            //根据orderNumber 查询订单状态,若为未支付,则消息订单并修改库存
            //....
            System.out.println("consumer 延时消息消费 orderNumber:"+orderNumber+",createTime:"+createTime);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    消费者类要实现RocketMQListener接口,以及动态指定消息类型String。

    类上要加上@RocketMQMessageListener注解,指定topic主题test-topic-delay,以及消费者组test-delay

    延时消息发送与接收搭建完毕!

    4.5启动服务,测试延时消息

    在这里插入图片描述

    测试OK,成功消费!

  • 相关阅读:
    Shiro安全框架
    Linux C/C++实现SSL的应用层VPN (MiniVPN)
    kafka rebalance你真的了解吗
    【Python+C#】手把手搭建基于Hugging Face模型的离线翻译系统,并通过C#代码进行访问
    大厂在裁员,一本,985,211_该如何择业?
    【学习笔记】「JOI Open 2022」长颈鹿
    PAM从入门到精通(五)
    腾讯云数据库公有云市场稳居TOP 2!
    Neo4j图数据库实践——基于知识图谱方法开发构建猪类养殖疾病问答查询系统
    关于jQuery_DOM操作中的添加,删除,替换标签方法
  • 原文地址:https://blog.csdn.net/qq_26383975/article/details/125377874