• RocketMQ(三):集成SpringBoot


    RocketMQ系列文章

    RocketMQ(一):基本概念和环境搭建

    RocketMQ(二):原生API快速入门

    RocketMQ(三):集成SpringBoot


    一、搭建环境

    • 需要创建两个服务,消息生产服务和消息消费者服务
    • 生产消息存在多个服务,消费则统一由一个服务处理
    • 这样可以做到解耦

    pom.xml

    • 生产者和消费者都需要
    <dependency>
        <groupId>org.apache.rocketmqgroupId>
        <artifactId>rocketmq-spring-boot-starterartifactId>
        <version>2.2.2version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    生产者配置文件

    • 设置统一的生产者组,这样发送消息时就不用指定了
    rocketmq:
        name-server: 127.0.0.1:9876     # rocketMq的nameServer地址
        producer:
            group: boot-producer-group        # 生产者组别
            send-message-timeout: 3000  # 消息发送的超时时间
            retry-times-when-send-async-failed: 2  # 异步消息发送失败重试次数
            max-message-size: 4194304       # 消息的最大长度
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    生产者配置文件

    • 不能设置统一的消费者组,因为不同的消费者订阅关系不一致,需要设置不同的消费者组
    rocketmq:
        name-server: localhost:9876
    
    • 1
    • 2

    二、不同类型消息

    直接引入即可

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    • 1
    • 2

    1、同步消息

    生产消息

    • 消息由消费者发送到broker后,会得到一个确认,是具有可靠性的
    • 比如:重要的消息通知,短信通知等
    rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");
    
    • 1

    消费消息

    • RocketMQListener的泛型类型即消息类型
      • MessageExt类型是消息的所有内容
      • 其他类型则就只是消息体内容,没有消息头内容(keys、msgId、延迟时间、重试次数、主题名称...)
    • onMessage方法内没有报错就是签收了,报错就是拒收会重试
    @Component
    @RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group")
    public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt message) {
            System.out.println(new String(message.getBody()));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2、异步消息

    • 发送异步消息,发送完以后会有一个异步通知
    • 不影响程序往下执行
    rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("成功");
        }
        @Override
        public void onException(Throwable throwable) {
            System.out.println("失败" + throwable.getMessage());
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3、单向消息

    • 不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险
    • 例如日志信息的发送
    rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");
    
    • 1

    4、延迟消息

    • RocketMQ不支持任意时间的延时
    • 只支持以下18个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟
    • private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
    • 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费
    Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
    rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 4);
    
    • 1
    • 2

    5、顺序消息

    生产消息

    • 根据syncSendOrderly方法的第三个参数计算hash值决定消息放入哪个队列
    // 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去  消费者 需要单线程消费
    List<MsgModel> msgModels = Arrays.asList(
            new MsgModel("qwer", 1, "下单"),
            new MsgModel("qwer", 1, "短信"),
            new MsgModel("qwer", 1, "物流"),
    
            new MsgModel("zxcv", 2, "下单"),
            new MsgModel("zxcv", 2, "短信"),
            new MsgModel("zxcv", 2, "物流")
    );
    msgModels.forEach(msgModel -> {
        // 发送  一般都是以json的方式进行处理
        // 根据第三个参数计算hash值决定消息放入哪个队列
        rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    消费消息

    • 默认是并发消费模式,可以设置为单线程顺序模式
    • 设置消费重试次数
    @Component
    @RocketMQMessageListener(topic = "bootOrderlyTopic",
            consumerGroup = "boot-orderly-consumer-group",
            consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程
            maxReconsumeTimes = 5 // 消费重试的次数
    )
    public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt message) {
            MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);
            System.out.println(msgModel);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    6、带tag消息

    • tag带在主题后面用:来携带
    rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
    
    • 1

    7、带key消息

    Message<String> message = MessageBuilder
            .withPayload("我是一个带key的消息")
            .setHeader(RocketMQHeaders.KEYS, "10086")
            .build();
    rocketMQTemplate.syncSend("bootKeyTopic", message);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    获取带key和tag的消费者

    • 过滤模式有两种:正则表达式和sql92方式
    • keys从MessageExt对象中获取
    @Component
    @RocketMQMessageListener(topic = "bootTagTopic",
            consumerGroup = "boot-tag-consumer-group",
            selectorType = SelectorType.TAG,// tag过滤模式
            selectorExpression = "tagA || tagB"
    //        selectorType = SelectorType.SQL92,// sql92过滤模式
    //        selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
    )
    public class CTagMsgListener implements RocketMQListener<MessageExt> {
    
        @Override
        public void onMessage(MessageExt message) {
            System.out.println("获取keys: " + message.getKeys());
            System.out.println("消息内容: " + new String(message.getBody()));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    查看源码

    • destination目标 = 主题 : 标签
    • keys从消息头里面获取

    在这里插入图片描述

  • 相关阅读:
    Stm32_点灯
    我做了一个世界杯的可视化网站...
    Sping源码(九)—— Bean的初始化(非懒加载)— Bean的创建方式(factoryMethod)
    【网络安全产品】---网闸
    简单模拟与链表
    2022.9.1 SAP RFC
    《统计学习方法》啃书手册|字符串核函数动态规划的实现
    【C++】模板初阶 【 深入浅出理解 模板 】
    新一代开源免费的轻量级 SSH 终端,太棒了
    提高尼日利亚稻米产量 丰收节贸促会:国稻种芯百团计划行动
  • 原文地址:https://blog.csdn.net/qq_35512802/article/details/134408321