• SpringCloud Stream整合RabbitMQ3.5.0


    前言

     

    点击进入Spring官网文档

    本文章为单体项目,将消费者和生产者写在同一个项目中,介意者不用向下看了。

    本文介绍三种应用方式:

    1:普通整合RabbitMQ

    2:消息分区

    3:按条件消费(多个消费者只消费同一队列中满足自己条件的消息)

    1:核心依赖

    1. <dependency>
    2. <groupId>org.springframework.cloudgroupId>
    3. <artifactId>spring-cloud-stream-binder-rabbitartifactId>
    4. <version>${spring.cloud.stream}version>
    5. dependency>
    6. <dependency>
    7. <groupId>org.springframework.cloudgroupId>
    8. <artifactId>spring-cloud-starter-stream-rabbitartifactId>
    9. <version>${spring.cloud.stream}version>
    10. dependency>

    全部依赖:

    项目目录图:

    2:基础版整合RabbitMQ

    ①:application.properties

    1. spring.rabbitmq.host=192.168.1.218
    2. spring.rabbitmq.port=5672
    3. spring.rabbitmq.username=admin
    4. spring.rabbitmq.password=123456
    5. spring.cloud.stream.bindings.dev-exchange.destination=dev-exchange
    6. spring.cloud.stream.bindings.dev-exchange.group=dev-queue
    7. spring.cloud.stream.bindings.dev-exchange.content-type=application/json
    8. spring.cloud.stream.bindings.dev-exchange.consumer.concurrency=1
    9. spring.cloud.stream.bindings.dev-exchange.consumer.max-attempts=1

    ②:定义生产者和消费者接口

    1. import com.boot.rabbitmq.constance.MQConstants;
    2. import org.springframework.cloud.stream.annotation.Input;
    3. import org.springframework.cloud.stream.annotation.Output;
    4. import org.springframework.messaging.MessageChannel;
    5. import org.springframework.messaging.SubscribableChannel;
    6. public interface RabbitStream {
    7. /**
    8. * 消息流入(消费)
    9. **/
    10. @Input(MQConstants.DEV_EXCHANGE)
    11. SubscribableChannel devConsumer();
    12. /**
    13. * 消息流出(生产)
    14. **/
    15. @Output(MQConstants.DEV_EXCHANGE)
    16. MessageChannel devProducer();
    17. }

    ③:生产者代码:

    1. @Component
    2. @EnableBinding(RabbitStream.class)
    3. public class DevProducer {
    4. private static final Logger logger = LoggerFactory.getLogger(DevProducer.class);
    5. private final RabbitStream rabbitStream;
    6. public DevProducer(RabbitStream rabbitStream) {
    7. this.rabbitStream = rabbitStream;
    8. }
    9. public void sendMsg(MQModel model) {
    10. logger.info("producer:{}", JSON.toJSONString(model));
    11. rabbitStream.devProducer()
    12. .send(MessageBuilder.withPayload(model).build());
    13. }
    14. }

    ④:费者代码:

    1. @Component
    2. @EnableBinding(RabbitStream.class)
    3. public class DevListener {
    4. private static final Logger logger = LoggerFactory.getLogger(DevListener.class);
    5. @StreamListener(MQConstants.DEV_EXCHANGE)
    6. public void receiveMsgAutoCommit(@Payload String payload) {
    7. logger.info("consumer:{}", payload);
    8. }
    9. }

    ⑥:controller代码:

    1. @PostMapping(value = "/dev")
    2. public void dev(@RequestBody MQModel model) {
    3. devProducer.sendMsg(model);
    4. }

    ⑦:测试

    发送请求:

    控制台日志:

    3:消息分区

    3.1:概念

    RabbitMQ 本身是不支持消息分区的,只是由于业务演变+代码控制的一种方案而已(参考spring官方开头文档理解)。
    个人理解:所谓消息分区就是将一个大队列拆分 0、1...n 个小队列,
    然后分解成 producer-A -> queue-A -> Consumer-A 的一种场景。

    3.2:如何在项目中使用

    ①:配置
    不需要改很多东西,只需要添加少部分配置即可

    1. ## RabbitMQ 消息分区配置
    2. spring.cloud.stream.bindings.partition-exchange.destination=partition-exchange
    3. spring.cloud.stream.bindings.partition-exchange.group=partition-queue
    4. spring.cloud.stream.bindings.partition-exchange.content-type=application/json
    5. spring.cloud.stream.bindings.partition-exchange.consumer.concurrency=1
    6. spring.cloud.stream.bindings.partition-exchange.consumer.max-attempts=1
    7. ## 消息分区
    8. spring.cloud.stream.bindings.partition-exchange.consumer.partitioned=true
    9. ## 分区数量
    10. spring.cloud.stream.bindings.partition-exchange.producer.partition-count=2
    11. ## 机器下标,最大值=partition-count-1
    12. spring.cloud.stream.instance-index=0
    13. ## 分区策略表达式
    14. spring.cloud.stream.bindings.partition-exchange.producer.partition-key-expression=payload.mid

    ②:路由规则
    然后消息的路由的时候会从payload拿到mid进行条件运算:
    mid/2=1则放在应用队列下标为1的队列,mid/2=0则放在队列下标为0的队列。

    ③:源码截图
    消息的入队前会计算出该消息应该进入哪个队列↓↓↓↓↓↓↓↓↓↓

    可以看到开启分区之后,payload 的类型不是String,而是具备键值对的实体对象。

    4:条件消费

    4.1:概念

    前面说过,Message 是由消息头和消息体组成的。因此可以在发送消息的时候自定义一个key存放在消息头,消费者可以根据自己的消费条件进行消费。
    对同一个队列中的消息按条件进行划分再派发给不同的消费者。我的示例就是在header 中设置了一个key。

    4.2:匹配条件讲解

    除了可以用 MessageHeader 中的数据进行匹配条件外,payload(消息体)中的数据也可以作为条件。

    消息实体结构:

    4.3:测试

    代码截图↓↓↓↓↓↓↓↓↓↓↓


    效果↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓

  • 相关阅读:
    小程序注册安装以及新手快速入门教程
    QTableWidget加载大文件数据
    Linux命令之usermod命令
    VS Code 这么牛,再次印证了一句名言
    操作系统:2.3.5经典同步问题(1)——生产者消费者问题,多生产者多消费者问题
    9/7 dp练习+01背包方案数+求背包具体方案
    【学习笔记77】ajax的函数封装
    CAN总线负载及CANoe查看总线负载率
    记一次 .NET 某工控自动化控制系统 卡死分析
    2023年11月15号期中测验选择题(Java)
  • 原文地址:https://blog.csdn.net/weixin_45985053/article/details/126022564