• SpringBoot 集成 RocketMQ


    一、RocketMQ基本概念

    消息模型(Message Model)

    RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的Broker。MessageQueue用于存储消息的物理地址,每个Topic中的消息地址存储于多个MessageQueue中。ConsumerGroup由多个Consumer实例构成。

     1、在springBoot项目中添加Maven依赖 

    1. <dependency>
    2. <groupId>org.apache.rocketmq</groupId>
    3. <artifactId>rocketmq-spring-boot-starter</artifactId>
    4. <version>2.0.4</version>
    5. </dependency>

    2、添加配置:

    application.yml 文件中添加如下配置:

    1. rocketmq:
    2.   name-server: 192.168.152.165:9876
    3.   producer:
    4.     group: my-group

    SpringBoot 集成 RocketMQ代码:

    生产者: 消息发送的三种方式

    1. package com.rocketmq.springbootrocketmq;
    2. import org.apache.rocketmq.client.producer.SendCallback;
    3. import org.apache.rocketmq.client.producer.SendResult;
    4. import org.apache.rocketmq.spring.core.RocketMQTemplate;
    5. import org.junit.Test;
    6. import org.junit.runner.RunWith;
    7. import org.springframework.beans.factory.annotation.Autowired;
    8. import org.springframework.boot.test.context.SpringBootTest;
    9. import org.springframework.messaging.Message;
    10. import org.springframework.messaging.support.MessageBuilder;
    11. import org.springframework.test.context.junit4.SpringRunner;
    12. import java.util.concurrent.TimeUnit;
    13. @RunWith(SpringRunner.class)
    14. @SpringBootTest
    15. public class T {
    16. @Autowired
    17. private RocketMQTemplate rocketMQTemplate;
    18. //同步消息
    19. @Test
    20. public void testRocketMQ() {
    21. Message msg = MessageBuilder.withPayload("boot发送同步消息").build();
    22. rocketMQTemplate.send("helloTopicBoot", msg);
    23. System.out.println("success send");
    24. }
    25. //异步消息
    26. @Test
    27. public void sendASYCMsg() throws InterruptedException {
    28. Message message = MessageBuilder.withPayload("boot发送异步消息").build();
    29. rocketMQTemplate.asyncSend("helloTopicBoot", message, new SendCallback() {
    30. @Override
    31. public void onSuccess(SendResult sendResult) {
    32. System.out.println("发送状态:"+sendResult.getSendStatus());
    33. }
    34. @Override
    35. public void onException(Throwable throwable) {
    36. System.out.println("消息发送失败");
    37. }
    38. });
    39. TimeUnit.SECONDS.sleep(5);
    40. }
    41. //一次性消息
    42. @Test
    43. public void sendOneWayRocketMQ() {
    44. Message msg = MessageBuilder.withPayload("boot发送一次性消息").build();
    45. rocketMQTemplate.sendOneWay("helloTopicBoot", msg);
    46. }
    47. }

    消费者:

    1. package com.example.springbooTRocketMQConsumer.listener;
    2. import org.apache.rocketmq.common.message.MessageExt;
    3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    4. import org.apache.rocketmq.spring.core.RocketMQListener;
    5. import org.springframework.stereotype.Component;
    6. import java.nio.charset.Charset;
    7. @Component
    8. @RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot")
    9. public class HelloTopicListener implements RocketMQListener {
    10. @Override
    11. public void onMessage(MessageExt messageExt) {
    12. System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset()));
    13. }
    14. }

    消息消费的两种模式

    集群模式(CLUSTERING):默认模式,同一个消费者组内,每条消息只会被其中一个消费者消费。
    广播模式(BROADCASTING):同一个消费者组内,每条消息都会被每一个消费者各消费一次。

    消费者:messageModel = MessageModel.BROADCASTING

    1. package com.example.springbooTRocketMQConsumer.listener;
    2. import org.apache.rocketmq.common.message.MessageExt;
    3. import org.apache.rocketmq.spring.annotation.MessageModel;
    4. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    5. import org.apache.rocketmq.spring.core.RocketMQListener;
    6. import org.springframework.stereotype.Component;
    7. import java.nio.charset.Charset;
    8. @Component
    9. @RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot",messageModel = MessageModel.BROADCASTING)
    10. public class HelloTopicListener implements RocketMQListener {
    11. @Override
    12. public void onMessage(MessageExt messageExt) {
    13. System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset()));
    14. }
    15. }

    顺序消息

    生产者:

    1. //顺序消息
    2. @Test
    3. public void sendOrderlyMsg(){
    4. //设置队列选择器
    5. rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
    6. @Override
    7. public MessageQueue select(List list, org.apache.rocketmq.common.message.Message message, Object o) {
    8. String orderIdStr = (String) o;
    9. long orderId = Long.parseLong(orderIdStr);
    10. int index = (int)orderId % list.size();
    11. return list.get(index);
    12. }
    13. });
    14. List orderSteps = OrderUtil.buildOrders();
    15. for (OrderStep orderStep : orderSteps) {
    16. Message msg = MessageBuilder.withPayload(orderStep.toString()).build();
    17. rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot",msg,String.valueOf(orderStep.getOrderId()));
    18. }
    19. }

    消费者:

    1. package com.example.springbooTRocketMQConsumer.listener;
    2. import org.apache.rocketmq.common.message.MessageExt;
    3. import org.apache.rocketmq.spring.annotation.ConsumeMode;
    4. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    5. import org.apache.rocketmq.spring.core.RocketMQListener;
    6. import org.springframework.stereotype.Component;
    7. import java.nio.charset.Charset;
    8. @Component
    9. @RocketMQMessageListener(consumerGroup = "orderlyConsumerBoot",topic = "orderlyTopicBoot",consumeMode = ConsumeMode.ORDERLY)
    10. public class OrderlyTopicListener implements RocketMQListener {
    11. @Override
    12. public void onMessage(MessageExt messageExt) {
    13. System.out.println("当前线程:" + Thread.currentThread() + "队列ID"+messageExt.getQueueId() + ",消息内容:" + new String(messageExt.getBody(),Charset.defaultCharset()));
    14. }
    15. }

    延迟消息

    生产者:

    1. //延迟消息
    2. @Test
    3. public void sendDelayRocketMQ() {
    4. Message msg = MessageBuilder.withPayload("boot发送延时消息,发送时间:"+new Date()).build();
    5. rocketMQTemplate.syncSend("helloTopicBoot", msg,3000,3);
    6. }

    消费者:

    1. package com.example.springbooTRocketMQConsumer.listener;
    2. import org.apache.rocketmq.common.message.MessageExt;
    3. import org.apache.rocketmq.spring.annotation.MessageModel;
    4. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    5. import org.apache.rocketmq.spring.core.RocketMQListener;
    6. import org.springframework.stereotype.Component;
    7. import java.nio.charset.Charset;
    8. import java.util.Date;
    9. @Component
    10. @RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot")
    11. public class DelayTopicListener implements RocketMQListener {
    12. @Override
    13. public void onMessage(MessageExt messageExt) {
    14. System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));
    15. }
    16. }

    消息Tag条件过滤

    生产者:

    1. //Tag消息
    2. @Test
    3. public void sendTagFilterRocketMQ() {
    4. Message msg1 = MessageBuilder.withPayload("消息A").build();
    5. rocketMQTemplate.sendOneWay("tagFilterBoot:TagA", msg1);
    6. Message msg2 = MessageBuilder.withPayload("消息B").build();
    7. rocketMQTemplate.sendOneWay("tagFilterBoot:TagB", msg2);
    8. Message msg3 = MessageBuilder.withPayload("消息C").build();
    9. rocketMQTemplate.sendOneWay("tagFilterBoot:TagC", msg3);
    10. }

    消费者:

    1. package com.example.springbooTRocketMQConsumer.listener;
    2. import org.apache.rocketmq.common.message.MessageExt;
    3. import org.apache.rocketmq.spring.annotation.MessageModel;
    4. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    5. import org.apache.rocketmq.spring.core.RocketMQListener;
    6. import org.springframework.stereotype.Component;
    7. import java.nio.charset.Charset;
    8. import java.util.Date;
    9. @Component
    10. @RocketMQMessageListener(consumerGroup = "tagFilterGroupBoot",topic = "tagFilterBoot",selectorExpression = "TagA || TagC")
    11. public class TagFilterTopicListener implements RocketMQListener {
    12. @Override
    13. public void onMessage(MessageExt messageExt) {
    14. System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));
    15. }
    16. }

    SQL92消息过滤

    生产者:

    1. //SQL92消息
    2. @Test
    3. public void sendSQL92FilterRocketMQ() {
    4. Message msg1 = MessageBuilder.withPayload("小红,年龄22,体重45").setHeader("age","22").setHeader("weight",45).build();
    5. rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg1);
    6. Message msg2 = MessageBuilder.withPayload("小明,年龄25,体重60").setHeader("age","25").setHeader("weight",60).build();
    7. rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg2);
    8. Message msg3 = MessageBuilder.withPayload("小蓝,年龄40,体重70").setHeader("age","40").setHeader("weight",70).build();
    9. rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg3);
    10. }

    消费者:

    1. package com.example.springbooTRocketMQConsumer.listener;
    2. import org.apache.rocketmq.common.message.MessageExt;
    3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    4. import org.apache.rocketmq.spring.annotation.SelectorType;
    5. import org.apache.rocketmq.spring.core.RocketMQListener;
    6. import org.springframework.stereotype.Component;
    7. import java.nio.charset.Charset;
    8. import java.util.Date;
    9. @Component
    10. @RocketMQMessageListener(consumerGroup = "SQL92FilterGroupBoot",topic = "SQL92FilterBoot",selectorType = SelectorType.SQL92,selectorExpression = "age > 23 and weight > 60")
    11. public class SQL92FilterTopicListener implements RocketMQListener {
    12. @Override
    13. public void onMessage(MessageExt messageExt) {
    14. System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));
    15. }
    16. }

  • 相关阅读:
    4316. 合适数对(思维 + 离散化 + 树状数组)
    素数筛法及其优化策略
    【1106】记录
    基于Matlab分析的电力系统可视化研究
    【PE】PE文件结构(一)
    SQL经典练习题(下)
    暑假补题【7-1】(codeforces)Educational Codeforces Round 121 (Rated for Div. 2)
    【华为机试真题详解】检查是否存在满足条件的数字组合
    postman教程-14-生成随机数
    动态规划:11分割等和子集
  • 原文地址:https://blog.csdn.net/weixin_42383680/article/details/134490287