• SpringBoot整合RabbitMQ


    整合背景:

    1. RabbitMQ可动态配置,系统自动初始化rabbitMQ的交换机-路由-队列信息。
    2. 满足不同业务模块使用不同类型RabbitMQ交换机的需求。
    3. 统一生产者入口。
    4. 消费者集成MessageListener事件动态监听。

    一、Springboot版本和RabbitMQ依赖

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starterartifactId>
    4. <version>2.7.13version>
    5. dependency>
    6. <dependency>
    7. <groupId>org.springframework.bootgroupId>
    8. <artifactId>spring-boot-starter-amqpartifactId>
    9. dependency>

    二、RabbitMQ动态配置文件

    可根据不同的交换机,进行队列,路由的定义。也可以对队列路由进行扩充。目前这里只用到直连类型的交换机。扩充的属性可以看(三)中对应的实体类

    1. # 通过yaml配置文件,初始化rabbitMQ的交换机-路由-队列。
    2. # 交换机(exchange)通过路由(routing-key)来确定发送到那个队列(queue)
    3. # 对于 Direct直连类型交换机。匹配路由的规则为全匹配。可以理解为routing-key与queue一对一
    4. # 对于 Fanout扇形类型交换机。则直接分发给所有绑定的队列,此时可以忽略routing-key, 多用于广播消息
    5. # 对于 Topic 主题类型交换机。根据routing-key模糊匹配队列,*匹配任意一个字符,#匹配0个或多个字符
    6. # 在spring cloud多应用中。不同的应用可能需要不同的类型。该配置只是该应用下创建的一个或多个类型。
    7. rabbit:
    8. modules:
    9. - exchange:
    10. name: default.exchange
    11. #type为RabbitExchangeTypeEnum枚举中的值。不配置默认为Direct
    12. type: DIRECT
    13. queue:
    14. name: default.queue
    15. arguments:
    16. # 队列中所有消息的最大存活时间。单位毫秒。 1分钟
    17. x-message-ttl: 60000
    18. # routing-key可以为空
    19. routing-key: default.queue.key

    三、配置文件映射

    YamlAndPropertySourceFactory.class:该类为了解决读取yaml以及yml配置文件无法映射的问题。如果可以映射配置文件,可以忽略这个

    1. /**
    2. * 读取rabbitMQ配置中的交换机,队列等信息
    3. *
    4. * @author byp
    5. * @date 2023/7/27 11:30
    6. */
    7. @Component
    8. @ConfigurationProperties(prefix = "rabbit")
    9. @PropertySource(value = "${property.source.prefix:classpath}:rabbitmq/rabbit.yaml", factory =YamlAndPropertySourceFactory.class)
    10. @Data
    11. public class RabbitProperties {
    12. private List modules;
    13. }

    RabbitModuleInfo对象:

    1. /**
    2. * RabbitMQ 队列和交换机机绑定关系实体对象
    3. *
    4. * @author byp
    5. * @date 2023/7/27 11:31
    6. */
    7. @Data
    8. public class RabbitModuleInfo {
    9. /**
    10. * 路由Key
    11. */
    12. private String routingKey;
    13. /**
    14. * 队列信息
    15. */
    16. private Queue queue;
    17. /**
    18. * 交换机信息
    19. */
    20. private Exchange exchange;
    21. /**
    22. * 交换机信息类
    23. */
    24. @Data
    25. public static class Exchange {
    26. /**
    27. * 交换机类型
    28. * 默认直连交换机
    29. */
    30. private RabbitExchangeEnum type = RabbitExchangeEnum.DIRECT;
    31. /**
    32. * 交换机名称
    33. */
    34. private String name;
    35. /**
    36. * 是否持久化
    37. * 默认true持久化,重启消息不会丢失
    38. */
    39. private boolean durable = true;
    40. /**
    41. * 当所有队绑定列均不在使用时,是否自动删除交换机
    42. * 默认false,不自动删除
    43. */
    44. private boolean autoDelete = false;
    45. /**
    46. * 交换机其他参数
    47. */
    48. private Map arguments;
    49. }
    50. /**
    51. * 队列信息类
    52. */
    53. @Data
    54. public static class Queue {
    55. /**
    56. * 队列名称
    57. */
    58. private String name;
    59. /**
    60. * 是否持久化
    61. * 默认true持久化,重启消息不会丢失
    62. */
    63. private boolean durable = true;
    64. /**
    65. * 是否具有排他性
    66. * 默认false,可多个消费者消费同一个队列
    67. */
    68. private boolean exclusive = false;
    69. /**
    70. * 当消费者均断开连接,是否自动删除队列
    71. * 默认false,不自动删除,避免消费者断开队列丢弃消息
    72. */
    73. private boolean autoDelete = false;
    74. /**
    75. * 绑定死信队列的交换机名称
    76. */
    77. private String deadLetterExchange;
    78. /**
    79. * 绑定死信队列的路由key
    80. */
    81. private String deadLetterRoutingKey;
    82. private Map arguments;
    83. }
    84. }

    YamlAndPropertySourceFactory.class

    1. /**
    2. * 获取yaml配置
    3. *
    4. * @author byp
    5. * @date 2023/5/22 10:51
    6. */
    7. @Configuration
    8. public class YamlAndPropertySourceFactory extends DefaultPropertySourceFactory {
    9. private static final String YML_RESOURCE_FILE = ".yml";
    10. private static final String YAML_RESOURCE_FILE = ".yaml";
    11. @NotNull
    12. @Override
    13. public PropertySource createPropertySource(String name, EncodedResource resource) throws IOException {
    14. Resource resourceResource = resource.getResource();
    15. if (Objects.requireNonNull(resourceResource.getFilename()).endsWith(YML_RESOURCE_FILE) || Objects.requireNonNull(resourceResource.getFilename()).endsWith(YAML_RESOURCE_FILE)) {
    16. List> sources = new YamlPropertySourceLoader().load(resourceResource.getFilename(), resourceResource);
    17. return sources.get(0);
    18. }
    19. return super.createPropertySource(name, resource);
    20. }
    21. }

    四、初始化rabbitMQ的交换机-路由-队列

    动态参数配置校验(校验可不配置):可根据不同的交换机类型,做校验,这里是对直连交换机的参数做的校验。

    1. /**
    2. * rabbitMQ初始化交换机,队列 容器
    3. *
    4. * @author byp
    5. * @date 2023/7/27 11:05
    6. */
    7. @Slf4j
    8. public class RabbitModuleInitializer implements SmartInitializingSingleton {
    9. AmqpAdmin amqpAdmin;
    10. RabbitProperties rabbitProperties;
    11. public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {
    12. this.amqpAdmin = amqpAdmin;
    13. this.rabbitProperties = rabbitProperties;
    14. }
    15. @Override
    16. public void afterSingletonsInstantiated() {
    17. log.info("RabbitMQ 动态创建和绑定队列、交换机开始");
    18. declareRabbitModule();
    19. log.info("RabbitMQ 动态创建和绑定队列、交换机完成");
    20. }
    21. /**
    22. * RabbitMQ 根据配置动态创建和绑定队列、交换机
    23. */
    24. private void declareRabbitModule() {
    25. List rabbitModuleInfos = rabbitProperties.getModules();
    26. if (CollectionUtils.isEmpty(rabbitModuleInfos)) {
    27. return;
    28. }
    29. for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) {
    30. configParamValidate(rabbitModuleInfo);
    31. // 队列
    32. Queue queue = convertQueue(rabbitModuleInfo.getQueue());
    33. // 交换机
    34. Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());
    35. // 绑定关系
    36. String routingKey = rabbitModuleInfo.getRoutingKey();
    37. String queueName = rabbitModuleInfo.getQueue().getName();
    38. String exchangeName = rabbitModuleInfo.getExchange().getName();
    39. Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
    40. // 创建队列
    41. amqpAdmin.declareQueue(queue);
    42. // 创建交换机
    43. amqpAdmin.declareExchange(exchange);
    44. // 队列 绑定 交换机
    45. amqpAdmin.declareBinding(binding);
    46. }
    47. }
    48. /**
    49. * RabbitMQ动态配置参数校验
    50. *
    51. * @param rabbitModuleInfo 队列和交换机机绑定关系实体对象
    52. */
    53. public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {
    54. String routingKey = rabbitModuleInfo.getRoutingKey();
    55. Assert.isTrue(StringUtils.isNotBlank(routingKey), "RoutingKey 未配置");
    56. Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);
    57. Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);
    58. Assert.isTrue(rabbitModuleInfo.getQueue() != null, "routingKey:{}未配置queue", routingKey);
    59. Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getQueue().getName()), "routingKey:{}未配置exchange的name属性", routingKey);
    60. }
    61. /**
    62. * 转换生成RabbitMQ队列
    63. *
    64. * @param queue 队列
    65. * @return Queue
    66. */
    67. public Queue convertQueue(RabbitModuleInfo.Queue queue) {
    68. Map arguments = queue.getArguments();
    69. // 转换ttl的类型为long
    70. if (arguments != null && arguments.containsKey("x-message-ttl")) {
    71. arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
    72. }
    73. // 是否需要绑定死信队列
    74. String deadLetterExchange = queue.getDeadLetterExchange();
    75. String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
    76. if (StringUtils.isNotBlank(deadLetterExchange) && StringUtils.isNotBlank(deadLetterRoutingKey)) {
    77. if (arguments == null) {
    78. arguments = new HashMap<>(4);
    79. }
    80. arguments.put("x-dead-letter-exchange", deadLetterExchange);
    81. arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
    82. }
    83. return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
    84. }
    85. /**
    86. * 转换生成RabbitMQ交换机
    87. *
    88. * @param exchangeInfo 交换机信息
    89. * @return Exchange
    90. */
    91. public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) {
    92. AbstractExchange exchange = null;
    93. RabbitExchangeEnum exchangeType = exchangeInfo.getType();
    94. String exchangeName = exchangeInfo.getName();
    95. boolean isDurable = exchangeInfo.isDurable();
    96. boolean isAutoDelete = exchangeInfo.isAutoDelete();
    97. Map arguments = exchangeInfo.getArguments();
    98. switch (exchangeType) {
    99. case DIRECT:
    100. // 直连交换机
    101. exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
    102. break;
    103. case TOPIC:
    104. // 主题交换机
    105. exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
    106. break;
    107. case FANOUT:
    108. //扇形交换机
    109. exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
    110. break;
    111. case HEADERS:
    112. // 头交换机
    113. exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
    114. break;
    115. }
    116. return exchange;
    117. }
    118. }

    五、RabbitMQ配置初始化bean以及添加动态监听

    1. /**
    2. * rabbitMQ配置
    3. *
    4. * @author byp
    5. * @date 2023/7/27 9:14
    6. */
    7. @Configuration
    8. public class RabbitConfig {
    9. /**
    10. * 通过yaml配置,创建队列、交换机初始化器
    11. */
    12. @Bean
    13. @ConditionalOnMissingBean
    14. public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {
    15. return new RabbitModuleInitializer(amqpAdmin, rabbitProperties);
    16. }
    17. /**
    18. * 动态监听所有队列
    19. *
    20. * @param rabbitConsumerManager 用于消息监听的类
    21. * @param connectionFactory MQ连接
    22. * @return SimpleMessageListenerContainer
    23. */
    24. @Bean
    25. public SimpleMessageListenerContainer mqMessageContainer(RabbitConsumerManager rabbitConsumerManager,
    26. ConnectionFactory connectionFactory) {
    27. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    28. //所有的队列
    29. container.setQueueNames(Objects.requireNonNull(RabbitQueueEnum.getAllQueues()));
    30. container.setExposeListenerChannel(true);
    31. //设置每个消费者获取的最大的消息数量
    32. container.setPrefetchCount(100);
    33. //消费者个数
    34. container.setConcurrentConsumers(1);
    35. //设置确认模式为手工确认
    36. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    37. //监听处理类
    38. container.setMessageListener(rabbitConsumerManager);
    39. return container;
    40. }
    41. }

    以上五个步骤就完成了根据配置文件初始化rabbitMQ的交换机-路由-队列

    六、统一消息生产者入口

    1. /**
    2. * rabbit MQ 发送消息
    3. *
    4. * @author byp
    5. * @date 2023/7/27 8:58
    6. */
    7. @Component
    8. @RequiredArgsConstructor
    9. @Slf4j
    10. public class RabbitProducerManager {
    11. private final RabbitTemplate rabbitTemplate;
    12. private final AmqpAdmin amqpAdmin;
    13. /**
    14. * 指定队列发送消息
    15. * 【注】 rabbitQueueEnum 与 rabbitRoutingEnum 的值可以考虑使用相同的,方便发送消息与接受消息。
    16. *
    17. * @param message 消息体
    18. * @param rabbitQueueEnum 队列枚举
    19. */
    20. public void sendMessage(RabbitQueueEnum rabbitQueueEnum, RabbitRoutingEnum rabbitRoutingEnum, String message) {
    21. if (!isExistQueue(rabbitQueueEnum)) {
    22. //可以改成没有队列的话,自动创建队列。
    23. throw new BusinessException("发送消息失败,队列为空");
    24. }
    25. rabbitTemplate.convertAndSend("default.exchange", rabbitRoutingEnum.getCode(), message);
    26. log.info("向路由:{}, 发送消息成功:{}", rabbitRoutingEnum.getCode(), message);
    27. }
    28. /**
    29. * 判断队列是否存在
    30. *
    31. * @param rabbitQueueEnum 队列枚举
    32. * @return boolean true-存在
    33. */
    34. public boolean isExistQueue(RabbitQueueEnum rabbitQueueEnum) {
    35. boolean falg = true;
    36. if (StringUtils.isBlank(rabbitQueueEnum.getCode())) {
    37. throw new BusinessException("队列名称为空");
    38. }
    39. Properties queueProperties = amqpAdmin.getQueueProperties(rabbitQueueEnum.getCode());
    40. if (queueProperties == null) {
    41. falg = false;
    42. }
    43. return falg;
    44. }
    45. }

    七、(消费者)MessageListener动态监听

    1. /**
    2. * rabbitMQ 消费者
    3. * 不统一处理消费,各应用自己创建监听消息消费
    4. *
    5. * @author byp
    6. * @date 2023/7/27 9:39
    7. */
    8. @Component
    9. @Slf4j
    10. @RequiredArgsConstructor
    11. public class RabbitConsumerManager implements ChannelAwareMessageListener {
    12. private final ApplicationEventPublisher publisher;
    13. @Override
    14. public void onMessage(Message message, Channel channel) throws IOException {
    15. MessageProperties messageProperties = message.getMessageProperties();
    16. //消息唯一标识
    17. long deliveryTag = messageProperties.getDeliveryTag();
    18. try {
    19. //路由
    20. String routingKey = messageProperties.getReceivedRoutingKey();
    21. //渠道
    22. String consumerQueue = messageProperties.getConsumerQueue();
    23. //消息体
    24. String msg = new String(message.getBody());
    25. log.info("RabbitConsumerManager监听到消息标识:{}, 队列:{}, 路由:{}, 的消息:{}", deliveryTag, consumerQueue,
    26. routingKey, msg);
    27. //将消息发送到spring的事件监听中
    28. JSONObject json = new JSONObject();
    29. json.put("deliveryTag", deliveryTag);
    30. json.put("routingKey", routingKey);
    31. json.put("consumerQueue", consumerQueue);
    32. json.put("msg", msg);
    33. json.put("channel", channel);
    34. //这一步可以根据routingKey进行事件分类。在监听中根据不同的类进行事件处理
    35. publisher.publishEvent(json);
    36. //这里确认消息消费不合适,这里并非是真实的业务消费场景。
    37. //channel.basicAck(deliveryTag, true);
    38. } catch (Exception e) {
    39. channel.basicReject(deliveryTag, false);
    40. log.error("获取rabbitMQ消息报错");
    41. e.printStackTrace();
    42. }
    43. }
    44. }

    八、消息处理

    1. /**
    2. * rabbitMQ consumer事件监听
    3. *
    4. * @author byp
    5. * @date 2023/7/28 11:11
    6. */
    7. @Component
    8. @Slf4j
    9. @RequiredArgsConstructor
    10. public class MessageListener {
    11. @EventListener
    12. public void handleMessage(JSONObject json) throws IOException {
    13. log.info("接受到消息json:{}", json);
    14. //标识消息在队列中的顺序
    15. long deliveryTag = json.getLong("deliveryTag");
    16. Channel channel = (Channel) json.get("channel");
    17. try {
    18. //TODO 业务逻辑
    19. //标记消费成功, false-只确认当前消息 ture-批量确认,当前标记之前的都确认消费
    20. channel.basicAck(deliveryTag, false);
    21. } catch (Exception e) {
    22. //拒绝消费并放入queue中,这时消息会进入循环,直到消费掉。 第二个参数表示是否放回队列中
    23. channel.basicReject(deliveryTag, true);
    24. log.error("消息确认失败,消息标识:{}", deliveryTag);
    25. }
    26. }
    27. }

  • 相关阅读:
    基于JAVA旅游景点推荐系统计算机毕业设计源码+数据库+lw文档+系统+部署
    大数据:Shell的操作
    【深度学习3】线性回归与逻辑回归
    sql去重查询
    Dockerfile(容器与镜像 自定义nginx tomcat 镜像优化)
    【PyQt学习篇 · ⑨】:QWidget -控件交互
    基于springboot的果蔬配送商城
    CentOS 7安装Redis5.0.7
    2022年双十一数码产品排名,数码好物选购指南
    vue使用axios,mock请求假数据,脱离后端的限制!!!这一篇就够了
  • 原文地址:https://blog.csdn.net/ILoveController/article/details/132684920