(1)介绍
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
(2)工作流程
发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
(1)介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的。所有主要的编程语言均有与代理接口通讯的客户端库。
消息中间件作为分布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。
(2)特点
RabbitMQ两大核心特性:异步消息、队列。 异步消息:只要异步消息就不阻塞线程,减少了主线程执行时间。所有需要这种效果场景都可以使用MQ。 队列:进入队列的数据一定有先后之分。只要应用程序要对内容分先后的场景都可以使用MQ。
(3)适用场景
1.应用解耦
2. 排队算法
使用队列特性。把数据发送给MQ,进入到队列就有了排队的效果
3.秒杀活动
使用队列特性。例如:抢红包、限时秒杀、直播卖货时抢商品。使用了MQ按照顺序一个一个操作,当商品库存操作到0个时,秒杀结束。
4.消息分发
在程序中同时向多个其他程序发送消息。应用了AMQP中交换机,实现消息分发
5.异步处理
利用MQ异步消息特性。大大提升主线程效率。
6.数据同步
利用异步特性。我们电商中使用RabbitMQ绝大多数的事情就是在实现数据同步。
7.处理耗时任务
利用异步特性。可以把程序中耗时任务(例如:发送邮件、发送验证码)交给MQ去处理,减少当前项目的耗时时间。
8.流量削峰
在互联网项目中,可能会出现某一段时间范围内,访问流量骤增的情况(双11、品牌促销,10点抢购),如果使用监控工具,会发现这段时间访问出现顶峰。使用MQ可以把这些访问分摊到多个项目中,把流量分摊,去除了顶峰效果,这就叫做流量削锋。是利用RabbitMQ中交换机实现的。
发送者Publisher向RabbitMQ发送消息Message,在Message会包含路由键Routing Key、交换器名称、消息内容。交换器Exchange接收到消息Message后会根据交换器类型Exchange Type判断把消息如何发送给绑定的队列Queue中,如果交换器类型是Direct这个消息只放入到路由键对应的队列中,如果是topic交换器消息放入到routing key匹配的多个队列中,如果是fanout交换器消息会放入到所有绑定到交换器的队列中。放入到队列成功后会返回给发送者Publisher一个ACK确认消息,表示消息发送成功了。剩下的事情是由Consumer进行完成,Consumer一直在监听队列,当队列里面有消息就会把消息取出,取出后根据程序的逻辑对消息进行处理,处理完成后会返回给RabbitMQ一个ACK,表示消息处理完成,RabbitMQ会删除这个消息。以上这些就是RabbitMQ的运行原理。
1. Message
消息。它由消息头消息体组成。消息体是不透明的,而消息头则由一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。
2.Publisher
消息的生产者。也是一个向交换器发布消息的客户端应用程序。 通俗说明:哪些项目向RabbitMQ发送消息,哪些项目就是Publisher
3. Consumer
消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。Consumer会一直监听指定的队列,只要队列中有消息,就会按照顺序依次取出。使用MQ做耗时任务时,耗时任务就交给Consumer进行完成。
4. Exchange
交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。一共支持四种的交换器类型:
direct(发布与订阅 完全匹配)
fanout(广播)
topic(主题,规则匹配)
header(使用较少,相比direct就多了一些头信息)
5. Binding
绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 一个交换器里面可以绑定多个队列。一个队列一般都是只绑定到一个交换器上。消息发送给交换器,交换器会把效果按照特定规则发送给绑定的队列。
6. Queue
消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
7. Routing-key
路由键。RabbitMQ决定消息该投递到哪个队列的规则。(也可以理解为队列的映射,路由键是key,队列是value)。队列通过路由键绑定到交换器。 消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。 如果相匹配,消息将会投递到该队列。如果不匹配,消息将会进入黑洞(相当于丢弃)。 通俗理解:队列绑定到交换器时有路由键,这个路由键就相当于key-value中的key,value则是队列。当Publisher发送消息时一定会携带路由键(即使路由键是Null),有了路由键就让交换器知道了这个消息要发送给哪个队列。
8. Connection
链接。指Rabbit服务器和客户端建立的TCP链接。
9. Channel
Channel中文叫做信道,是TCP里面的虚拟链接。例如:电缆相当于TCP,信道是一个独立光纤束,一条TCP连接上创建多条信道是没有问题的。 在RabbitMQ中,TCP链接一旦打开,就会创建AMQP信道。无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。
10. Virtual Host
虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 '/'。 通俗理解:一个RabbitMQ可以包含多个虚拟主机,每个虚拟主机都是一个RabbitMQ。平时我们没有去创建虚拟主机,都是使用RabbitMQ里面默认的'/'虚拟主机,单实际上一个RabbitMQ可以包含多个虚拟主机主机的,也就是说一个RabbitMQ可以包含多个实例。就像MySQL可以创建多个数据库一样。
11. Borker
表示消息队列服务器实体。就是RabbitMQ服务器进程。
12. 交换器和队列的关系
交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟交换器中绑定的路由键匹配,那么消息就会被路由到该绑定的队列中。 也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由键匹配分发消息到具体的队列中。 路由键可以理解为匹配的规则。
13. RabbitMQ为什么需要信道?为什么不是TCP直接通信?
TCP的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。
如果不用信道,那应用程序就会以TCP链接RabbitMQ,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理TCP链接数也是有限制的,必定造成性能瓶颈。
信道的原理是一条线程一条通道,多条线程多条通道共用一条TCP链接。一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。
这也是为什么使用RabbitMQ去处理秒杀、流量削锋、海量请求时依然对RabbitMQ比较有信心的原因。
(1)拉取镜像
docker pull rabbitmq:management
(2)创建并启动容器
docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 --restart=always -e DEFAULT_USER=bjsxt -e DEFAULT_PASS=bjsxt rabbitmq:management
Spring AMQP是Spring的顶级项目。是基于AMQP协议的消息传递解决方案。此框架提供顶级抽象模板AmqpTemplate接口,用于抽象消息传递标准。提供基于容器的监听处理。暂时Spring AMQP只提供了基于RabbitMQ处理消息传递的解决方案。其具体接口是RabbitOperations、实现是RabbitTemplate。
(1)引入依赖
org.springframework.boot
spring-boot-starter-amqp
(2)编写配置文件
# 配置RabbitMQ相关信息
# 当创建RabbitMQ容器的时候,不提供用户名和密码配置,自动创建用户guest,密码guest。
# guest用户只能本地访问RabbitMQ。
spring:
rabbitmq:
host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
port: 5672 # RabbitMQ服务器的端口。
username: bjsxt # RabbitMQ的访问用户名。默认guest。
password: bjsxt # RabbitMQ的访问密码。默认guest
virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
(3)发送消息
- /**
- * 测试Spring AMQP框架中发送消息的方式。
- */
- @SpringBootTest
- public class TestPublisher {
- /**
- * 注入客户端对象。
- * 类型可以是: AmqpTemplate(顶级接口), RabbitOperations(专用子接口),RabbitTemplate(具体实现)
- * 建议使用接口: 优先级是 RabbitOperations > AmqpTemplate
- */
- @Autowired
- private RabbitOperations rabbitOperations;
-
- /**
- * 测试发送消息
- * 消息内容是字符串。
- * 注意:
- * Spring AMQP可以发送的消息类型必须是Message类型。
- * Spring AMQP可以帮助程序员自动封装消息类型Message对象。
- * 只要提供消息具体内容(消息体)即可实现默认封装。
- * Spring AMQP可以自动转换封装的消息体类型是Object。只要类型可序列化即可。
- */
- @Test
- public void testSendStringMessage(){
- String messageContent = "第一个字符串消息";
- String exchangeName = "direct.first.ex";
- String routingKey = "routing.key.1";
-
- // 发送消息的时候,只要指定要发送到的具体交换器名称,路由键,和消息内容即可。
- rabbitOperations.convertAndSend(exchangeName, routingKey, messageContent);
-
- System.out.println("消息发送完毕");
- }
- }
(4)基于Configuration配置创建交换器、队列及完成绑定
- @Configuration
- public class RabbitMQConfig {
- // 发送消息时如果不存在这个队列,会自动创建这个队列。
- // 注意:是发送消息时,而不是启动项目时。
- // 相当于:可视化操作时创建一个队列
- // 如果队列创建完成后,没有绑定(没有另外两个方法),默认绑定到AMQP default交换器
- @Bean
- public Queue queue(){
- return new Queue("queue.second");
- }
-
- // 如果没有这个交换器,在发送消息创建这个交换器
- // 配置类中方法名就是这个类型的实例名。相当于
的id属性,返回值相当于class - @Bean
- public DirectExchange directExchange(){
- return new DirectExchange("direct.first.ex");
- }
-
- // 配置类中方法参数,会由Spring 容器自动注入
- @Bean
- public Binding directBingding(DirectExchange directExchange,Queue queue){
- // with(“自定义路由键名称”)
- return BindingBuilder.bind(queue).to(directExchange).with("routing.key.2");
- // withQueueName() 表示队列名就是路由键名称
- // return BindingBuilder.bind(queue).to(directExchange).withQueueName();
- }
- }
(5)编辑消息消费者
- /**
- * 字符串类型消息体处理消费者
- * Spring AMQP和Spring Boot配合的启动器,可以自动注册监听。
- * 要求,当前类型的bean对象,必须被spring容器管理。
- */
- @Component
- public class StringMessageConsumer {
-
- /**
- * 常规开发中,都会先定义消息的消费者。后定义消息的发布者。
- * 典型的观察者设计模式。先有监听,后有事件。
- *
- * 注解RabbitListener
- * 可选属性: 常用属性
- * bindings - 定义绑定规则。属性类型是: QueueBinding[]
- * 注解QueueBinding,描述具体的绑定规则。就是交换器和队列的绑定规则。
- * 必要属性:
- * value - 监听的队列,属性类型是Queue
- * exchange - 队列绑定的交换器,属性是Exchange
- * 可选属性:
- * key - 绑定的路由键都是什么,类型是String[]
- * 注解Queue,描述一个具体的队列,如果队列在RabbitMQ中存在,直接使用并监听;如果不存在,
- * 创建队列并监听。
- * 可选属性:
- * name - 队列名称,String类型
- * autoDelete - 是否是自动删除的队列,String类型。可选值: "true" | "false"
- *
- * 注解Exchange,描述一个具体的交换器,如果交换器存在,直接使用;如果不存在,则创建,
- * 并基于key绑定队列
- * 可选属性:
- * name - 交换器名称,String类型
- * autoDelete - 是否是自动删除的交换器,String类型。可选值: "false" | "true"。默认false
- * type - 交换器的类型,String类型。默认是direct。可选: direct,fanout,topic,headers
- * 可以使用枚举类型中的常量赋值,具体是ExchangeTypes.XXX
- * @param messageBody
- */
- @RabbitListener(bindings = {
- @QueueBinding(
- value = @Queue(name = "queue.second", autoDelete = "false", durable = "true"),
- exchange = @Exchange(name = "direct.second.ex",
- autoDelete = "false", type = ExchangeTypes.DIRECT),
- key = {"routing.key.second.1", "routing.key.second.2"}
- )
- })
- public void onMessage(String messageBody){
- System.out.println("第二个消息消费者监听,处理消息:" + messageBody);
- }
- }
- /**
- * 处理自定义类型消息体
- */
- @Component
- public class MyMessageConsumer {
- /**
- * Spring AMQP可以自动实现消息体类型转换。
- * 使用的方式是强制类型转换。只要包装传输的消息体数据类型和方法参数类型匹配即可。
- */
- @RabbitListener(bindings = {
- @QueueBinding(
- value = @Queue(name = "pojo.queue1", autoDelete = "false"),
- exchange = @Exchange(name = "topic.pojo.ex", type = "topic"),
- key = {"routing.key.1"}
- )
- })
- public void onMessage1(MyMessage myMessage){
- System.out.println("处理自定义消息:" + myMessage);
- }
-
- /**
- * 可以通过统一消息类型Message处理消息内容
- * @param message
- */
- @RabbitListener(bindings = {
- @QueueBinding(
- value = @Queue(name = "pojo.queue2", autoDelete = "false"),
- exchange = @Exchange(name = "topic.pojo.ex", type = "topic"),
- key = {"routing.key.2"}
- )
- })
- public void onMessage2(Message message) throws Exception{
- // 获取消息体,消息体是字节数组。根据具体类型进行处理。
- byte[] body = message.getBody();
- ByteArrayInputStream byteArrayInputStream =
- new ByteArrayInputStream(body);
- ObjectInputStream inputStream =
- new ObjectInputStream(byteArrayInputStream);
- Object obj = inputStream.readObject();
- if(obj.getClass() == MyMessage.class){
- MyMessage myMessage = (MyMessage) obj;
- System.out.println(myMessage);
- }
- System.out.println("消息体中的对象类型是:" + obj.getClass().getName());
- }
- }
(1)修改配置
spring:
rabbitmq:
host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
port: 5672 # RabbitMQ服务器的端口。
username: bjsxt # RabbitMQ的访问用户名。默认guest。
password: bjsxt # RabbitMQ的访问密码。默认guest
virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
publisher-confirm-type: correlated # 开启到达交换器确认机制。默认值:none,不开启确认机制。
publisher-returns: true # 开启路由失败确认机制。默认值:false
(2)实现RabbitTemplate.ConfirmCallback和RabbitTemplate.ReturnsCallback接口重写不能到达交换器或路由失败时的回调处理逻辑
- @Component
- public class PublisherHandler implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- /**
- * 构造方法执行结束后立刻执行此方法。即初始化逻辑。
- */
- @PostConstruct
- public void init(){
- // 设置RabbitTemplate中的回调逻辑
- this.rabbitTemplate.setConfirmCallback(this);
- this.rabbitTemplate.setReturnsCallback(this);
- }
-
- /**
- * 消息路由失败回调逻辑
- * @param returned 路由失败的消息
- */
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- System.out.println("交换器 : " + returned.getExchange());
- System.out.println("路由键 : " + returned.getRoutingKey());
- System.out.println("路由失败编码 : " + returned.getReplyCode());
- System.out.println("路由失败描述 : " + returned.getReplyText());
- System.out.println("消息 : " + returned.getMessage());
- }
-
- /**
- * 当交换器不能到达时,具体的处理方案。
- * @param correlationData 消息唯一标记
- * @param ack 是否确认
- * @param cause 不能到达交换器(即ack为false)的具体原因
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println("消息唯一标记 : " + correlationData);
- System.out.println("是否确认到达交换器 : " + ack);
- System.out.println("不能到达交换器的原因 : " + cause);
- }
- }
在Spring AMQP中,消费者(Consumer)默认的ACK机制是自动确认,即消费代码正常执行结束,立刻确认消息已消费;消费代码发送异常,相当于消息未消费。如果希望关闭自动ACK机制,可使用两种处理方案实现。
(1)重试消费
可以在消费者中基于配置开启重试机制,并设置重试消费次数。当消费消息发生错误,导致未确认(NACK)时,消费者尝试重复消费消息;当重复消费次数到达设置阈值后,强制确认(ACK),RabbitMQ会移除队列中的消息。
修改配置文件
spring:
rabbitmq:
host: 192.168.91.128
port: 5672
username: bjsxt
password: bjsxt
listener:
simple:
retry:
enabled: true # 开启重试机制
max-attempts: 1 # 重试消费1次
(2)手工确认ACK
1.修改配置文件
spring:
rabbitmq:
host: 192.168.91.128
port: 5672
username: bjsxt
password: bjsxt
listener:
simple:
acknowledge-mode: manual # 手工确认。 默认AUTO,自动确认
2.正常消费
- @Component
- public class StringMessageConsumer {
- /**
- * 消费方法。实现手工ACK确认
- * @param messageBody 消息内容
- * @param channel 信道对象,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,
- * RabbitMQ 会用 basic.deliver 方法向消费者推送消息。
- * @param deliveryTag RabbitMQ向该Channel投递的这条消息的唯一标识 ID,
- * 是一个递增的正整数,delivery tag 的范围仅限于 Channel
- */
- @RabbitListener(queues = {"queue.first"})
- public void onMessage(String messageBody, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
- System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
- try {
- /*
- * 确认消息
- * 参数1 - 消息的唯一标识
- * 参数2 - 是否批量提交。为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,
- * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息
- */
- channel.basicAck(deliveryTag, false);
- System.out.println("消息已确认");
- }catch (IOException e){
- e.printStackTrace();
- }
- }
- }
3.错误消费
- @Component
- public class StringMessageConsumer {
- /**
- * 消费方法。实现手工NACK确认
- * @param messageBody 消息内容
- * @param channel 信道对象,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,
- * RabbitMQ 会用 basic.deliver 方法向消费者推送消息。
- * @param deliveryTag RabbitMQ向该Channel投递的这条消息的唯一标识 ID,
- * 是一个递增的正整数,delivery tag 的范围仅限于 Channel
- */
- @RabbitListener(queues = {"queue.first"})
- public void onMessage(String messageBody, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
- System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
- try {
- /*
- * 参数1 - 消息的唯一标识
- * 参数2 - 是否批量提交
- * 参数3 - 是否重新发出消息。false则废弃此消息。
- */
- channel.basicNack(deliveryTag, false, true);
-
- System.out.println("消息未确认-重复消费-deliveryTag=" + deliveryTag);
- }catch (IOException e){
- e.printStackTrace();
- }
- }
- }
4.Reject确认(丢弃)
- @Component
- public class StringMessageConsumer {
- /**
- * 消费方法。实现手工ACK确认
- * @param messageBody 消息内容
- * @param channel 信道对象,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,
- * RabbitMQ 会用 basic.deliver 方法向消费者推送消息。
- * @param deliveryTag RabbitMQ向该Channel投递的这条消息的唯一标识 ID,
- * 是一个递增的正整数,delivery tag 的范围仅限于 Channel
- */
- @RabbitListener(queues = {"queue.first"})
- public void onMessage(String messageBody, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
- System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
- try {
- /*
- * 参数1 - 消息的唯一标识
- * 参数3 - 是否重新发出消息。false则废弃此消息。
- */
- channel.basicReject(deliveryTag, false);
-
- System.out.println("消息未确认-重复消费-deliveryTag=" + deliveryTag);
- }catch (IOException e){
- e.printStackTrace();
- }
- }
- }
(1)修改配置文件
spring:
rabbitmq:
host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
port: 5672 # RabbitMQ服务器的端口。
username: bjsxt # RabbitMQ的访问用户名。默认guest。
password: bjsxt # RabbitMQ的访问密码。默认guest
virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
template:
reply-timeout: 100000 # 配置同步消息超时时长,单位毫秒
(2)消费者
- /**
- * 同步消息消费者。如:抢红包。
- */
- @Component
- public class SyncMessageConsumer {
- /**
- * 同步消息消费方法。和异步消息消费方法的唯一区别就是有返回值。类型不限。
- * @param message
- * @return
- */
- @RabbitListener(bindings = {
- @QueueBinding(
- value = @Queue(name = "queue.sync", autoDelete = "false"),
- exchange = @Exchange(name = "topic.sync", type = "topic"),
- key = {"routing.key"}
- )
- })
- public String onMessage(String message){
- System.out.println("处理同步消息:" + message);
- return "消息已处理";
- }
- }
(3)发送者
- /**
- * 转换,处理,发送消息,并等待接收消费者反馈。方法返回值就是消费者端返回的确认消息,即消费方法返回结果
- * Object convertSendAndReceive(String exchange, String routingKey, Object messageBody)
- * 如果消费者超时未返回,代码自动结束,并向下继续运行。
- * 消费者反馈结果使用null填充。
- */
- @Test
- public void testSyncMessage(){
- String message = "测试同步消息";
- Object result = rabbitOperations.convertSendAndReceive(
- "topic.sync",
- "routing.key",
- message
- );
- if(result == null){
- System.out.println("超时未返回");
- }else {
- System.out.println(result.getClass().getName());
- System.out.println(result);
- }
- }