RabbitMQ是shiyongERLANG语言编写的,实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
常见的消息中间件:KAFKA、RabbitMQ、ActiveMQ、RocketMQ(可以处理分布式事务)
【1、通过案例理解RabbitMQ】
你想和远方的好友实现通信,这个时候你给好友写封信,我们将写好的信封装成一个信封,将信放到邮箱中,邮差就会在规定的时间将这封信送到远方好友的手中。
寄件人是A系统,远方好友是B系统,RabbitMQ就是邮箱,邮差就是信道。A系统和B系统可以通过RabbitMQ实现通信。
【2、为什么要使用RabbitMQ? 】
1、RabbitMQ实现了AMQP标准的消息服务器
2、可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;
3、RabbitMQ使用了Erlang开发语言,高并发、高可用、集群部署简单等特性;
【3、RabbitMQ能做什么?】
1、业务解耦:下订单、扣减库存、生成订单、发红包、发短信、异步操作、日志处理、跨平台通信等。
2、流量消峰:控制请求量。
3、路由灵活:RabbitMQ有丰富的Exchange(交换器),通过关键字Routingkey判断消息的具体队列(Queue)。
1、生产者生产消息,发送给服务器端的Exchange交换器。
2、Exchange收到消息,根据用户发送的RoutingKey,将消息转发给匹配的Queue1或Queue2。
3、Queue1或Queue2收到消息后,将消息发送给订阅者(消费者1或消费者2)。
4、消费者收到消息,发送ACK给队列确认收到消息。
5、消费者收到ACK确认消息,然后删除队列中缓存的对应消息。
A系统和B系统之间如果不是使用同一架构或者语言编写的,这个时候可以通过RabbitMQ进行通信。
如A系统可以发送JSON类型的数据,B系统获取数据的时候可以解析JSON,达到跨系统交互。
核心包:spring-cloud-starter-bus-amqp
- "1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0modelVersion>
- <parent>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-parentartifactId>
- <version>2.7.17version>
- <relativePath/>
- parent>
- <groupId>com.txcgroupId>
- <artifactId>springbootrabbitmqdemoartifactId>
- <version>0.0.1-SNAPSHOTversion>
- <name>springbootrabbitmqdemoname>
- <description>springbootrabbitmqdemodescription>
- <properties>
- <java.version>1.8java.version>
- properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
-
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-testartifactId>
- <scope>testscope>
- dependency>
- <dependency>
- <groupId>org.springframework.amqpgroupId>
- <artifactId>spring-rabbit-testartifactId>
- <scope>testscope>
- dependency>
- dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-maven-pluginartifactId>
- <configuration>
- <excludes>
- <exclude>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- exclude>
- excludes>
- configuration>
- plugin>
- plugins>
- build>
-
- project>
- spring:
- rabbitmq:
- addresses: localhost #IP地址
- username: guest #用户名
- password: guest #密码
- dynamic: true #默认创建一个AmqpAdmin的Bean 默认:true
@RabbitListener(queuesToDeclare = @Queue("myQueue "))作用:
1、让监听关联到myQueue队列中
2、queuesToDeclare能够实现当myQueue队列不存在的时候,自动创建
3、queuesToDeclare:后面的参数需要的是数组类型使用@Queue创建
- @Configuration
- public class RabbitMQReceive {
- @RabbitListener(queuesToDeclare = @Queue("myQueue"))
- public void process(String message){
- System.out.println("======消费消息====="+message);
- }
- }
创建控制层,模拟向RabbitMQ消息中间件发送消息。
通过convertAndSend向RabbitMQ发送消息。
- @Controller
- public class TestRabbitMQController {
- @Resource
- private AmqpTemplate amqpTemplate;
-
- @RequestMapping("/sendMessageToQueue")
- @ResponseBody
- public void sendMessageToQueue(){
- amqpTemplate.convertAndSend("myQueue","{'code':'200','msg':'你想发的消息。'}");
- }
- }
访问地址:http://localhost:8080/sendMessageToQueue
我们要实现如果是华为的订单就发送到huaweiQueue队列中,如果是联想的订单就发送到lenovoQueue队列中。
主要就是通过RoutingKey(huaweiKey、lenovoKey)来实现。
通过@RabbitListener实现创建huaweiQueue队列和computerExchange交换器,并通过huaweiKey实现队列和交换器绑定。
在不指定交换器类型的情况下,默认的交换器类型是direct
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange("computerExchange"),
- key = "huaweiKey",
- value = @Queue("huaweiQueue")
- ))
- public void process1(String message){
- System.out.println("======消费消息====="+message);
- }
convertAndSend参数说明:convertAndSend(“交换器名称”,”绑定key”,”需要发送的消息”);
- @RequestMapping("/sendMessageToExchange")
- @ResponseBody
- public void sendMessageToExchange(){
- amqpTemplate.convertAndSend("computerExchange","huaweiKey","{'code':'200','msg':'测试Exchange和Queue绑定'}");
- }
访问地址:http://localhost:8080/sendMessageToExchange