• SpringBoot——整合RabbitMQ收发消息


    目录

    RabbitMQ消息队列 

    项目总结

    新建一个SpringBoot项目

    pom.xml

    application.properties配置文件

    index.html前端页面

    RabbitMQConfig配置类

    RabbitMQProducer生产者

    RabbitMQConsumer消费者

    IndexController控制器

    SpringbootRabbitmqApplication启动类

     测试


    RabbitMQ消息队列 

    • 消息中间件是位于两个或多个应用程序之间的中介软件,它允许应用程序相互通信和数据交换,而无需直接连接和交互。
    • RabbitMQ 作为消息中间件,提供了这种消息传递的机制和基础架构,使得不同系统之间能够通过异步消息队列进行有效通信。 
    • 它是一个基于AMQP(Advanced Message Queuing Protocol,高级消息队列协议)协议的消息代理,提供了跨应用和跨服务的异步通信能力。
    • RabbitMQ的核心概念包括:

      • 消息(Message):数据的最小单位,包含了消息头和消息体。
      • 队列(Queue):用于存储消息,等待被消费者接收和处理。
      • 交换器(Exchange):负责接收消息并将其路由到合适的队列,基于预定义的规则。
        • 直接交换(Direct Exchange):基于路由键的精确匹配。
        • 主题交换(Topic Exchange):基于路由键的模式匹配。
        • 扇形交换(Fanout Exchange):广播消息到所有绑定的队列。
        • 首选交换(Headers Exchange):基于消息头进行匹配。(headers交换机和direct交换机完全一致,但性能差很多,目前几乎用不到了)
      • 绑定(Binding):定义交换器与队列之间的关系,确定消息的路由规则。
      • 路由键(Routing Key):用于在交换器和队列之间路由消息。
      • 消费者(Consumer):从队列中接收和处理消息的应用程序或服务。
      • 发布者(Producer):向交换器发送消息的应用程序或服务。
    • RabbitMQ广泛用于构建分布式系统、微服务架构和事件驱动架构,以实现异步通信、负载分担、数据解耦和高可用性。

    项目总结

    1. 添加依赖:在Spring Boot项目的pom.xml文件中添加RabbitMQ的依赖

    2. 配置RabbitMQ连接信息:在application.propertiesapplication.yml文件中配置RabbitMQ的连接信息,包括主机名、端口号、用户名、密码等信息。

    3. 创建配置类

      1. 使用@Bean注解定义Exchange和Queue,并绑定它们的关系,以确保消息能够被正确路由和传递。

      2. 使用@Bean注入CachingConnectionFactory缓存 连接工厂对象,用于开启RabbitMQ的消息发送确认模式

    4. 创建生产者:编写一个发送消息的生产者(Producer),可以是一个服务类或控制器方法。使用RabbitTemplate来发送消息到RabbitMQ的Exchange。

    5. 创建消费者:编写一个接收消息的消费者(Consumer),使用@RabbitListener监听指定的Queue,并处理接收到的消息,比如打印在控制台上。

    6. 启动项目和RabbitMQ服务后,项目的工作流程

      1. 打开浏览器访问页面,在文本域填写要发送信息,点击“发送”,

      2. 后端IndexController控制器接收到用户提交的消息后,由生产者将消息发送给RabbitMQ的交换机,RabbitMQ再将消息根据路由键路由到队列中,并且将结果反馈给服务器,服务器将打印”消息发送成功“的日志

      3. 消费者中监听此队列就会立刻收到并处理接收到的消息,打印到控制台上

    自己从填写要发送的信息的地方开始分析,顺藤摸瓜,将项目的几个文件串连起来

    新建一个SpringBoot项目

    项目结构:

    pom.xml

    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    4. <modelVersion>4.0.0modelVersion>
    5. <parent>
    6. <groupId>org.springframework.bootgroupId>
    7. <artifactId>spring-boot-starter-parentartifactId>
    8. <version>2.3.12.RELEASEversion>
    9. <relativePath/>
    10. parent>
    11. <groupId>com.studygroupId>
    12. <artifactId>springboot_rabbitmqartifactId>
    13. <version>0.0.1-SNAPSHOTversion>
    14. <name>springboot_rabbitmqname>
    15. <description>Demo project for Spring Bootdescription>
    16. <properties>
    17. <java.version>8java.version>
    18. properties>
    19. <dependencies>
    20. <dependency>
    21. <groupId>org.springframework.bootgroupId>
    22. <artifactId>spring-boot-starter-amqpartifactId>
    23. dependency>
    24. <dependency>
    25. <groupId>org.springframework.bootgroupId>
    26. <artifactId>spring-boot-starter-thymeleafartifactId>
    27. dependency>
    28. <dependency>
    29. <groupId>org.springframework.bootgroupId>
    30. <artifactId>spring-boot-starter-webartifactId>
    31. dependency>
    32. <dependency>
    33. <groupId>org.springframework.bootgroupId>
    34. <artifactId>spring-boot-starter-testartifactId>
    35. <scope>testscope>
    36. dependency>
    37. <dependency>
    38. <groupId>org.springframework.amqpgroupId>
    39. <artifactId>spring-rabbit-testartifactId>
    40. <scope>testscope>
    41. dependency>
    42. dependencies>
    43. <build>
    44. <plugins>
    45. <plugin>
    46. <groupId>org.springframework.bootgroupId>
    47. <artifactId>spring-boot-maven-pluginartifactId>
    48. plugin>
    49. plugins>
    50. build>
    51. project>

    application.properties配置文件

    1. spring.rabbitmq.host=192.168.40.128
    2. spring.rabbitmq.port=5672
    3. spring.rabbitmq.username=admin
    4. spring.rabbitmq.password=123456
    5. # 指定队列,交换机,路由键的名称
    6. #队列用于存储消息,生产者发送消息到队列,消费者从队列接收消息进行处理。
    7. rabbit.queue.name=springboot.queue.test
    8. #交换机负责将消息路由到一个或多个队列中,根据指定的路由键来确定消息的路由规则。
    9. rabbit.exchange.name=springboot.exchange.test
    10. #在消息发送时,会指定消息的路由键,交换机根据这个路由键来决定将消息路由到哪些队列中。
    11. rabbit.routing.key=springboot.routingkey.test

    index.html前端页面

    1. html>
    2. <html lang="en">
    3. <head>
    4. <meta charset="UTF-8">
    5. <title>Titletitle>
    6. head>
    7. <body>
    8. <form action="/sendMessage" method="post">
    9. <textarea rows="4" cols="40" name="message">textarea>
    10. <br><input type="submit" value="发送"/>
    11. form>
    12. body>
    13. html>

    RabbitMQConfig配置类

    1. package com.study.springboot_rabbitmq.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.DirectExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.beans.factory.annotation.Value;
    7. import org.springframework.context.annotation.Bean;
    8. import org.springframework.context.annotation.Configuration;
    9. /**
    10. * 配置类: 绑定队列和交换器
    11. */
    12. @Configuration
    13. public class RabbitMQConfig {
    14. @Value("${rabbit.queue.name}")
    15. String queueName;
    16. @Value("${rabbit.exchange.name}")
    17. String exchangeName;
    18. @Value("${rabbit.routing.key}")
    19. String routingKey;
    20. @Bean()
    21. public Queue initQueue(){//创建队列
    22. return new Queue(queueName);
    23. }
    24. @Bean()
    25. public DirectExchange initDirectExchange(){//创建交换器
    26. return new DirectExchange(exchangeName);
    27. }
    28. @Bean
    29. public Binding bindingDirect(){//将队列与交换器绑定
    30. return BindingBuilder.bind(initQueue()).to(initDirectExchange()).with(routingKey);
    31. }
    32. }

    RabbitMQProducer生产者

    1. package com.study.springboot_rabbitmq.service;
    2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.beans.factory.annotation.Value;
    5. import org.springframework.stereotype.Service;
    6. /**
    7. * 消息生产者
    8. */
    9. @Service
    10. public class RabbitMQProducer {
    11. @Autowired
    12. RabbitTemplate rabbitTemplate;
    13. //读取配置文件中的交换器名称和路由键名称
    14. @Value("${rabbit.exchange.name}")
    15. String exchangeName;
    16. @Value("${rabbit.routing.key}")
    17. String routingKey;
    18. //将消息发送给RabbitMQ的交换机,交换机通过路由键决定将消息路由到哪些队列
    19. public void send(String message){
    20. rabbitTemplate.convertAndSend(exchangeName,routingKey,message);
    21. }
    22. }

    RabbitMQConsumer消费者

    1. package com.study.springboot_rabbitmq.service;
    2. import org.slf4j.Logger;
    3. import org.slf4j.LoggerFactory;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Service;
    6. /**
    7. * 消息消费者
    8. */
    9. @Service
    10. public class RabbitMQConsumer {
    11. private static final Logger log= LoggerFactory.getLogger(RabbitMQConsumer.class);
    12. //使用@RabbitListener监听配置文件中的队列,当收到消息后,将其打印在控制台上
    13. @RabbitListener(queues = "${rabbit.queue.name}")
    14. public void getMessage(String message){
    15. log.info("消费者收到消息:{}",message);
    16. }
    17. }

    IndexController控制器

    • 发送确认模式:在生产者向RabbitMQ发送消息后,RabbitMQ可以给生产者一个反馈消息,这个反馈消息中会包含接收是否成功、失败原因等一系列内容
    1. package com.study.springboot_rabbitmq.config;
    2. import org.slf4j.Logger;
    3. import org.slf4j.LoggerFactory;
    4. import org.springframework.amqp.core.*;
    5. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    6. import org.springframework.amqp.rabbit.connection.CorrelationData;
    7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    8. import org.springframework.beans.factory.annotation.Autowired;
    9. import org.springframework.beans.factory.annotation.Value;
    10. import org.springframework.cache.annotation.Caching;
    11. import org.springframework.context.annotation.Bean;
    12. import org.springframework.context.annotation.Configuration;
    13. /**
    14. * 配置类: 绑定队列和交换器
    15. */
    16. @Configuration
    17. public class RabbitMQConfig {
    18. private static final Logger log= LoggerFactory.getLogger(RabbitMQConfig.class);
    19. @Autowired
    20. private CachingConnectionFactory connectionFactory;
    21. @Value("${rabbit.queue.name}")
    22. String queueName;
    23. @Value("${rabbit.exchange.name}")
    24. String exchangeName;
    25. @Value("${rabbit.routing.key}")
    26. String routingKey;
    27. @Bean()
    28. public Queue initQueue(){//创建队列
    29. return new Queue(queueName);
    30. }
    31. @Bean()
    32. public DirectExchange initDirectExchange(){//创建交换器
    33. return new DirectExchange(exchangeName);
    34. }
    35. @Bean
    36. public Binding bindingDirect(){//将队列与交换器绑定
    37. return BindingBuilder.bind(initQueue()).to(initDirectExchange()).with(routingKey);
    38. }
    39. //RabbitMQ收到消息后,把结果反馈给服务器,服务器将打印日志
    40. @Bean
    41. public RabbitTemplate rabbitTemplate(){
    42. //消息发送成功后触发确认方法
    43. connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
    44. //消息发送失败后触发回调方法
    45. connectionFactory.setPublisherReturns(true);
    46. //通过连接工厂对象创建RabbitTemplate对象
    47. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    48. //若交换器无法匹配到指定队列,则取消发送消息
    49. rabbitTemplate.setMandatory(true);
    50. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    51. @Override
    52. public void confirm(CorrelationData correlationData, boolean ack, String cause) {//ack:RabbitMQ返回的应答
    53. if(ack){
    54. log.info("消息发送成功");
    55. }else{
    56. log.info("消息发送失败,原因: {}",cause);
    57. }
    58. }
    59. });
    60. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    61. @Override
    62. public void returnedMessage(Message message, int i, String s, String s1, String s2) {
    63. log.info("消息发送失败: {}",message);
    64. }
    65. });
    66. return rabbitTemplate;
    67. }
    68. }

    SpringbootRabbitmqApplication启动类

    1. package com.study.springboot_rabbitmq;
    2. import org.springframework.boot.SpringApplication;
    3. import org.springframework.boot.autoconfigure.SpringBootApplication;
    4. @SpringBootApplication
    5. public class SpringbootRabbitmqApplication {
    6. public static void main(String[] args) {
    7. SpringApplication.run(SpringbootRabbitmqApplication.class, args);
    8. }
    9. }

     测试

    开启RabbitMQ服务,启动项目

    从Spring Boot项目的角度来看,无论RabbitMQ运行在Windows还是Linux上,整合RabbitMQ的方式都是基本一致的,只要你的Spring Boot应用程序连接到RabbitMQ服务器,并使用RabbitMQ客户端库进行通信即可

  • 相关阅读:
    文字上屏展示,文本内容自动滚动下一行
    SpringMVC——基于MVC架构的Spring框架
    Python 将数据写入csv、xlsx、xls文件中(工厂方法、封装、优雅)
    自考本科,毕业八年,2023浙大MPA提面优秀分享
    顽皮恶魔(模拟)
    数据库学习之基础内容
    mysql学习笔记8——常用5个内置方法
    iSpring SDK 10.2.X Crack iSpring SDK 9.7.X
    Windows网络模型之重叠IO模型(事件通知和完成例程)
    在PHP8中使用instanceof操作符检测对象类型-PHP8知识详解
  • 原文地址:https://blog.csdn.net/weixin_72052233/article/details/139245231