• RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ


    添加 RocketMQ 依赖

    1. 在 Maven 仓库【https://mvnrepository.com/】中搜索 RocketMQ 依赖:

      image-20230527214713414

    2. 在 SpringBoot 项目的 Pom.xml 文件中添加对应 MQ 版本的依赖:

      
      <dependency>
          <groupId>org.apache.rocketmqgroupId>
          <artifactId>rocketmq-spring-boot-starterartifactId>
          <version>2.2.2version>
      dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

    消费者 Consumer

    YAML 配置

    在 SpringBoot 项目的 yml 配置文件中添加以下配置:

    rocketmq:
      name-server: 192.168.68.121:9876     # rocketMq的nameServer地址
    
    • 1
    • 2

    创建监听器

    创建一个 MQMsgListener 类用于监听 RocketMQ 的消息,类上标注注解:@Component@RocketMQMessageListener,该类需要实现 RocketMQListener 接口,并使用泛型指定接收的消息类型:

    @Component
    @RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")
    public class MQMsgListener implements RocketMQListener<MessageExt> {
    
        @Override
        public void onMessage(MessageExt message) {
            String msgId = message.getMsgId();
            String msg = new String(message.getBody());
            System.out.println("消息id:"+msgId+"消息内容:"+msg);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    @RocketMQMessageListener 注解参数如下:

    参数描述
    topic消费者订阅的主题
    consumerGroup消费者组
    consumeMode消费模式:并发接收消息 | 有序接收消息【ConsumeMode.CONCURRENTLY or ConsumeMode.ORDERLY
    messageModel消息模式:集群模式 | 广播模式【MessageModel.CLUSTERING or MessageModel.BROADCASTING
    selectorType过滤消息的方式:Tag | SQL92【SelectorType.TAG or SelectorType.SQL92
    selectorExpression过滤消息的表达式:Tag | SQL92【`tag1
    maxReconsumeTimes消息消费失败后,可被重复投递的最大次数。消息重试只针对集群消费模式生效。
    delayLevelWhenNextConsume并发模式的消息重试策略。-1,无需重试,直接放入死信队列(%DLQ%+消费组)

    消息过滤

    Tag 过滤

    消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。

    编写并启动消费者项目订阅 tagTopic 主题:

    @Component
    @RocketMQMessageListener(topic = "tagTopic",
            consumerGroup = "boot-mq-group-consumer",
            selectorType = SelectorType.TAG,
            selectorExpression = "java")
    public class MQMsgListener implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String message) {
            System.out.println(message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    编写生产者 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送一个带 Tag 的同步消息:

    @RestController
    public class ProducerController {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/send/tag")
        public String sendSyncMessage() {
            SendResult result = rocketMQTemplate.syncSend("tagTopic:java", "这是一个带有 java tag 的消息");
            return "发送状态:" + result.getSendStatus() + "
    消息id:"
    + result.getMsgId(); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    运行项目,访问接口:http://localhost:8080/send/tag

    image-20230528191958989

    查看 RocketMQ 控制台,可以看到消息带有 java tag:

    image-20230528191938535

    查看消费者项目的 IDEA 控制台:

    image-20230528191142421

    生产者 Producer

    YAML 配置

    在 SpringBoot 项目的 yml 配置文件中添加以下配置:

    rocketmq:
      name-server: 192.168.68.121:9876     # rocketMq的nameServer地址
      producer:
        group: boot-mq-group-producer # 生产者组名
    
    • 1
    • 2
    • 3
    • 4

    注:生产者需要标注生产者组名,否则会报异常:'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.

    发送同步消息

    编写 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息,并将消息发送的结果进行打印:

    @RestController
    public class ProducerController {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/send/sync/{msg}")
        public String sendSyncMessage(@PathVariable String msg){
            SendResult result = rocketMQTemplate.syncSend("syncTopic", msg);
            return "发送状态:"+result.getSendStatus()+"
    消息id:"
    +result.getMsgId(); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    运行项目,访问接口:http://localhost:8080/send/sync/同步消息

    image-20230527231022909

    访问控制台,查看【syncTopic】主题,可以看到队列中存在一条消息:

    image-20230527231142472

    发送异步消息

    不同于同步消息,异步消息在发出后,并不会等待服务端返回响应,直接继续向下执行,发送方通过回调接口接收服务端响应,并处理响应结果。

    编写 Controller,使用 RocketMQTemplate 的 asyncSend() 方法发送异步消息,并使用回调接口打印发送的结果:

    @RestController
    public class ProducerController {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/send/async/{msg}")
        public String sendAsyncMessage(@PathVariable String msg) {
            rocketMQTemplate.asyncSend("asyncTopic", msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("异步消息发送成功");
                }
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("异步消息发送失败");
                }
            });
            System.out.println("异步消息已发送完成");
            return "发送异步消息";
        }
      
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    运行项目,访问接口:http://localhost:8080/send/async/异步消息,查看 IDEA 控制台:

    image-20230527232838438

    访问控制台,查看【asyncTopic】主题,可以看到队列中存在一条消息:

    image-20230527233249499

    发送单向消息

    编写 Controller,使用 RocketMQTemplate 的 sendOneWay() 方法发送单向消息:

    @RestController
    public class ProducerController {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/send/oneWay/{msg}")
        public String sendOneWayMessage(@PathVariable String msg) {
            rocketMQTemplate.sendOneWay("oneWayTopic",msg);
            return "单向消息发送成功";
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    运行项目,访问接口:http://localhost:8080/send/oneWay/单向消息

    image-20230527233640217

    访问控制台,查看【oneWayTopic】主题,可以看到队列中存在一条消息:

    image-20230527233751658

    发送延迟消息

    编写并启动消费者项目订阅 delayTopic 主题:

    @Component
    @RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")
    public class MQMsgListener implements RocketMQListener<MessageExt> {
    
        @Override
        public void onMessage(MessageExt message) {
            String msgId = message.getMsgId();
            String msg = new String(message.getBody());
            System.out.println("消息id:"+msgId+"\n消息内容:"+msg+"\n消息收到时间:"+new Date());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    编写生产者 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息:

    @RestController
    public class ProducerController {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/send/delay/{msg}")
        public String sendDelayMessage(@PathVariable String msg) {
            Message<String> message = MessageBuilder.withPayload(msg).build();
            // 延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
            SendResult result = rocketMQTemplate.syncSend("delayTopic", message, 2000, 3);
            return "发送状态:" + result.getSendStatus() + "
    消息id:"
    + result.getMsgId()+"
    消息发送时间:"
    +new Date(); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    运行项目,访问接口:http://localhost:8080/send/delay/延迟消息

    image-20230528141811562

    查看消费者项目的 IDEA 控制台,可以看到过去了10s,对应我们设置的延迟级别。

    image-20230528141834080

    发送顺序消息

    编写订单类,用于模拟【下订单->发短信->物流->签收】的顺序流程:

    public class Order {
        //订单号
        private String orderId;
        //订单名称
        private String orderName;
        //订单的流程顺序
        private String seq;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    编写并启动两个消费者项目订阅 orderlyTopic 主题,并将消费模式设置为顺序消费模式:

    @Component
    @RocketMQMessageListener(topic = "orderlyTopic",
            consumerGroup="boot-mq-group-consumer",
            consumeMode = ConsumeMode.ORDERLY)
    public class MQMsgListener implements RocketMQListener<Order> {
    
        @Override
        public void onMessage(Order message) {
            System.out.println("消费者:"+message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    编写生产者 Controller,使用 RocketMQTemplate 的 syncSendOrderly() 方法发送同步顺序消息:

    @RestController
    public class ProducerController {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/send/orderly")
        public String sendOrderlyMessage() {
            List<Order> orders = Arrays.asList(
                    new Order(UUID.randomUUID().toString(), "下订单", "1"),
                    new Order(UUID.randomUUID().toString(), "发短信", "1"),
                    new Order(UUID.randomUUID().toString(), "物流", "1"),
                    new Order(UUID.randomUUID().toString(), "签收", "1"),
    
                    new Order(UUID.randomUUID().toString(), "下订单", "2"),
                    new Order(UUID.randomUUID().toString(), "发短信", "2"),
                    new Order(UUID.randomUUID().toString(), "物流", "2"),
                    new Order(UUID.randomUUID().toString(), "签收", "2")
            );
            //控制流程:下订单->发短信->物流->签收
            //将 seq 作为 hashKey,这样 seq 相同的会放在同一个队列里面,顺序消费
            orders.forEach(order -> {
                rocketMQTemplate.syncSendOrderly("orderlyTopic",order,order.getSeq());
            });
            return "发送成功";
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    运行项目,访问接口:http:localhost:8080/send/orderly

    image-20230528152807514

    查看 RocketMQ 控制台,可以看到我们的消息分别存储在两个队列中:

    image-20230528152925141

    查看消费者项目的 IDEA 控制台,按照消息的顺序进行消费:

    image-20230528152848032

    发送批量消息

    编写并启动消费者项目订阅 batchOrderly 主题:

    @Component
    @RocketMQMessageListener(topic = "batchOrderly",
            consumerGroup="boot-mq-group-consumer")
    public class MQMsgListener implements RocketMQListener<Order> {
    
        @Override
        public void onMessage(Order message) {
            System.out.println(Thread.currentThread().getName()+":"+message);
        }
      
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    编写生产者 Controller,将消息打包成 Collection msgs 传入 syncSend() 方法中发送:

    @RestController
    public class ProducerController {
    
      @Autowired
      private RocketMQTemplate rocketMQTemplate;
    
      @GetMapping("/send/batch")
      public String sendOrderlyMessage() {
    
        List<Message> messages = Arrays.asList(
          MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
          MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
          MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
          MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build()
        );
        return rocketMQTemplate.syncSend("batchOrderly", messages).getSendStatus().toString();
        
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    运行项目,访问接口:http:localhost:8080/send/batch

    image-20230528161620859

    查看 RocketMQ 控制台,可以看到队列中一次传入4条消息:

    image-20230528161706194

    查看消费者项目的 IDEA 控制台,多个线程并发进行消费:

    image-20230528161804943

    发送集合消息

    编写并启动消费者项目订阅 listTopic 主题:

    @Component
    @RocketMQMessageListener(topic = "listTopic",
            consumerGroup="boot-mq-group-consumer")
    public class MQMsgListener implements RocketMQListener<List<Order>> {
    
        @Override
        public void onMessage(List<Order> orders) {
            orders.forEach(o -> {
                System.out.println(Thread.currentThread().getName()+":"+o);
            });
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    编写生产者 Controller,将集合传入 syncSend() 方法中发送:

    @RestController
    public class ProducerController {
    
      @Autowired
      private RocketMQTemplate rocketMQTemplate;
    
      @GetMapping("/send/list")
      public String sendOrderlyMessage() {
    
        List<Order> orders = Arrays.asList(
          new Order(UUID.randomUUID().toString(), "下订单", "1"),
          new Order(UUID.randomUUID().toString(), "下订单", "1"),
          new Order(UUID.randomUUID().toString(), "下订单", "1"),
          new Order(UUID.randomUUID().toString(), "下订单", "1")
        );
        rocketMQTemplate.syncSend("listTopic",orders);
        return "发送成功";
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    运行项目,访问接口:http:localhost:8080/send/list

    image-20230528161620859

    查看 RocketMQ 控制台,可以看到队列中一条消息:

    image-20230528163701846

    查看消费者项目的 IDEA 控制台,进行消费:

    image-20230528163745691

  • 相关阅读:
    服务器被攻击怎么选择更好的方式去防御,IDC说的集防和单机防御都是什么意思
    TCP/IP协议—HTTP
    Netty 3 - 组件和设计
    开源博客项目Blog .NET Core源码学习(23:App.Hosting项目结构分析-11)
    解决SpringBoot整合Mybatis和Mybatis-Plus不能公用(版本兼容性问题)
    cpu天梯图2022年11月 cpu排行榜天梯图2022
    24个Docker常见问题处理技巧
    CVE-2022-31137 Roxy-WI未经身份验证的远程代码执行漏洞复现
    YB506A是一款锂电池充、放电管理专用芯片,集成锂电池充电管理和降压DCDC电路
    [JavaScript]_[初级]_[使用HTMLElement.dataset快速读写自定义属性]
  • 原文地址:https://blog.csdn.net/qq_20185737/article/details/130915401