MQ,它是一种应用间的通信方式(中间件),消息发送后可以立即返回,有消息系统来确保信息的可靠传递,消息发布者和使用者都不需要知道对方的存在,发布者只管发布,而使用者只管取消息进行使用。 
1.2.1 场景一: 异步处理

1.2.2 场景二: 应用解耦

1.2.3 场景三: 流量削峰

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
消息代理(message broker)和目的地(destination):当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
两种形式的目的地:
消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列。
消息只有唯一的发送者和接受者,但并不是说只能有一个接收者,队列可以允许很多人同时监听消息。但该模式将消息放到队列后,最终只会交给一个对象,谁先抢到就是谁的。
JMS(Java Message Service)JAVA消息服务:基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现。AMQP(Advanced Message Queuing Protocol):高级消息队列协议,也是一个消息代理的规范,兼容JMS,RabbitMQ是AMQP的实现。| JMS(Java Message Service) | AMQP(Advanced Message Queuing Protocol) | |
|---|---|---|
| 定义 | Java api | 网络线级协议 |
| 跨语言 | 否 | 是 |
| 跨平台 | 否 | 是 |
| Model | 提供两种消息模型: (1)、Peer-2-Peer (2)、Pub/sub | 提供了五种消息模型: (1)、direct exchange (2)、fanout exchange (3)、topic change (4)、headers exchange (5)、system exchange 本质来讲,后四种和JMS的pub/sub模型没有太大差别, 仅是在路由机制上做了更详细的划分; |
| 支持消息类 | 多种消息类型: TextMessage MapMessage BytesMessage StreamMessage ObjectMessage Message (只有消息头和属性) | byte[] 当实际应用时,有复杂的消息,可以将消息序列化后发 送。 |
| 综合评价 | JMS 定义了JAVA API层面的标准;在java体系中, 多个client均可以通过JMS进行交互,不需要应用修 改代码,但是其对跨平台的支持较差; | AMQP定义了wire-level层的协议标准;天然具有跨平 台、跨语言特性。 |

RabbitMQ 建立一条连接(Connection),且一个客户端只会建立一条连接。RabbitMQ就会实时的感知有消费者下线,消息没办法派发,它就会再次存储到队列(Queue)中,不会造成大面积的消息丢失。AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。# 安装
$ docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
# 设置自启动
$ docker update rabbitmq --restart=always
端口映射说明:
Erlang发现&集群端口)。AMQP端口)。web管理后台端口)。STOMP协议端口)。MQTT协议端口)。访问管理后台(可视化操作界面):
# 浏览器访问
ip=http://yourIp:15672/
# 用户名密码
Username=admin
Password=admin
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:
AMQP 消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类。JMS中说的点对点通信方式实现,而fanout、topic则是发布订阅的一些实现。

# 表示匹配 0 个或多个单词,* 表示匹配一个单词。#”作为绑定键时,它将接收所有消息,而不管路由键如何,类似于fanout型交换器。*”、”#”没有用到绑定时,topic型交换器就好比direct型交换器了。# 为例,所有 usa 开头的路由键会进入这个队列。#.news ,则是所有以 news 结尾的路由键会进入这个队列。
背景说明:
MQ 的宕机、资源耗尽等情况),以及无论是发消息的发布者、还是收消息的消费者,它们的卡顿、宕机等各种问题,都会导致消息的丢失。(比如发送者发消息的时候,给弄丢了 ,看起来消息是发出去了,MQ网络抖动没接到, 或者MQ接到了,但是它消费消息的时候,因为网络抖动又没拿到,等等各种问题…)MQ,还是收消息,MQ的消息可靠抵达到我们的消费端,我们一定要保证消息可靠抵达,包括如果出现错误,我们也应该知道哪些消息丢失了。MQ 建立一个连接,会在通道里面发消息,可以将通道设置成事务模式,这样发消息,只有整个消息发送过去,MQ消费成功给我们有完全的响应以后,我们才算消息成功,但是使用事务消息,会使性能下降的很严重。
MQ 服务器,服务器收到以后,消息该怎么存怎么存,该投给哪投给哪,所以 Broker 首先会将消息交给 Exhchange,再有 Exchange 送达给 Queue,所以整个发送消息的过程,牵扯到两个:
confirmCallback,就是P端给B端 发送消息的过程,Broker 一旦收到了消息,就会回调我们的方法 confirmCallback,这是第一个回调时机,这个时机就可以知道哪些消息到达服务器了。Exchange 交换机,最终投递给 Queue,但是投递给队列这个过程可能也会失败,比如我们指定的路由键有问题,或者我们队列正在使用的过程中,被其它的一些客户端删除等操作,可能都会投递失败,投递失败就会调用 returnCallback。ack 机制(消息确认机制),这个机制能保证, 让 Broker 知道哪些消息都被消费者正确的拿到了,如果消费者正确接到,这个消息就要从队列里面删除,如果没有正确接到,可能就需要重新投递消息。5.3.1 发送端-确认回调:ConfimCallback
# 确认消息已发送到交换机
spring.rabbitmq.publisher-confirm-type=correlated
5.3.2 发送端-退回回调:ReturnCallback
# 开启发送端消息抵达队列的确认,此处必须设置为true,否则消息消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=true
# 只要消息抵达了队列,以异步发送优先回调这个ReturnCallback
spring.rabbitmq.template.mandatory=true
5.3.3 消费端-Ack消息确认机制:
保证每个消息被正确消费,此时Broker才可以删除这个消息,消费端默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息。
存在问题:消费端收到很多消息,自动回复给服务器ack,只有一个消息处理成功,消费端突然宕机了,结果MQ中剩下的消息全部丢失。
解决方式:消费端如果无法确定此消息是否被处理完成,可以手动确认消息,即处理一个确认一个,未确认的消息不会被删除。
手动确认模式:只要我们没有明确告诉MQ收到消息。没有 Ack,消息就一直是 Unacked 状态,即使 Consumer 宕机,消息也不会丢失,会重新变为 Ready,等待下次有新的 Consumer 连接进来时,再发给新的 Consumer。消费者获取到消息,成功处理,可以回复 Ack 给 Broker。
ack() 用于肯定确认,Broker 将移除此消息。nack() 用于否定确认,可以指定 Broker 是否丢弃此消息,可以批量。reject() 用于否定确认,同上,但不能批量。6.1 依赖引入:
<!--RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.2 添加配置:
# 默认服务端口号为5672,而15672为可视化操作界面端口
spring.rabbitmq.host=yourIp
spring.rabbitmq.port=5672
# 配置虚拟机
spring.rabbitmq.virtual-host=/
6.3 启动类开启注解:
@EnableRabbit
6.4 基本功能测试:
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
/**
* amqp管理组件
*/
@Autowired
private AmqpAdmin amqpAdmin;
/**
* 消息发送处理组件
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 1. 创建交换机
*/
@Test
public void createExchange() {
Exchange directExchange = new DirectExchange("jan-exchange", true, false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]创建成功:", "jan-exchange");
}
/**
* 2. 创建队列
*/
@Test
public void createQueue() {
Queue queue = new Queue("jan-queue", true, false, false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功:", "jan-queue");
}
/**
* 3. 交换机与队列绑定
*/
@Test
public void createBinding() {
Binding binding = new Binding("jan-queue",
Binding.DestinationType.QUEUE,
"jan-exchange",
"hello.jan",
null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]创建成功:", "jan-binding");
}
/**
* 4. 发送消息测试
* 如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
* 发送的对象类型的消息,可以是一个json
*/
@Test
public void sendMessageTest() {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("jan");
reasonEntity.setStatus(1);
reasonEntity.setSort(2);
rabbitTemplate.convertAndSend("jan-exchange", "hello.jan",
reasonEntity, new CorrelationData(UUID.randomUUID().toString()));
log.info("消息发送完成:{}", reasonEntity);
}
/**
* 5. 创建延时队列
*/
@Test
public void createDelayQueue() {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "delay-exchange");
arguments.put("x-dead-letter-routing-key", "delay.key");
// 消息过期时间 1分钟(单位毫秒)
arguments.put("x-message-ttl", 60000);
Queue queue = new Queue("delay-queue", true, false, false, arguments);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功:", "delay-queue");
}
}
解锁库存场景:当订单操作失败(超时未支付 / 被取消等)后,我们需要将库存进行解锁并回滚。


MQ 的延时队列(RabbitMQ的消息TTL与死信Exchange结合)。处理思路:
解决问题:
MQ 服务器。7.4.1 TTL(Time To Live):
消息的TTL就是消息的存活时间。
RabbitMQ可以对队列和消息分别设置TTL。
对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。
7.4.2 死信路由(Dead Letter Exchanges(DLX)):
一个消息如果满足以下条件,那它就会成为死信:
Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。 ( basic.reject/ basic.nack ) requeue=false。TTL到了,消息过期了。Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
手动ack与异常消息统一放在一个队列处理建议的两种方式:
channel给RabbitMQ确认消息已消费。Queue绑定延时队列,使用nack(requque=false)确认消息消费失败。总结:我们既可以控制消息在一段时间后变成死信(TTL),又可以控制变成死信的消息被路由到某一个指定的交换机(死信路由),二者结合就可以实现一个延时队列。
7.5.1 设置队列过期时间:

x-message-ttl:消息的存活时间,它以毫秒为单位,相当于300秒,也就是5分钟以后消息过期 。x-dead-letter-exchange:死信交换机,就是死信路由,意思就是告诉服务器当前这个队列里边的消息死了,别乱丢,需要转给隔壁的死信路由 。x-dead-letter-routing-key:将消息转给死信队列的路由键。7.5.2 设置消息过期时间:

7.5.3 两种方式推荐:
Rabbit MQ 采用的是惰性检查机制,也就是懒检查:服务器来拿第一条消息,发现5分钟后过期,于是就放回了该消息,5分钟后将该消息路由到死信队列,最终被消费者拿到。此时又先后来拿后面两条过期设置为1分钟、1秒钟的消息发现都早已经过期,于是就将后面两条消息加入死信队列,但是后两条消息处理已经是五分钟以后了。8.1.1 场景一:消息发送出去,由于网络问题没有抵达服务器
try-catch),发送消息可能会网络失败,失败后要有重试机制。可记录到数据库,如果消息没有发送成功,定期去数据库扫描未成功的消息进行定期重发。CREATE TABLE `mq_message` (
`message_id` char(32) NOT NULL,
`content` text,
`to_exchane` varchar(255) DEFAULT NULL,
`routing_key` varchar(255) DEFAULT NULL,
`class_type` varchar(255) DEFAULT NULL,
`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
8.1.2 场景二:消息抵达Broker之后,Broker要将消息写入磁盘(持久化)时宕机
publisher必须加入确认回调机制,确认成功的消息,修改数据库消息状态。8.1.3 场景三:自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机
ACK,消费成功再移除,失败或者没来得及处理就noAck并重新入队。ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费。ack时宕机,消息由unack变为ready,Broker又重新发送。Redis/Mysql),发送消息每一个都有业务的唯一标识,处理过就不用处理。RabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的。
“-------怕什么真理无穷,进一寸有一寸的欢喜。”
微信公众号搜索:饺子泡牛奶。