消息发送方希望杜绝任何消息丢失或者投递失败,RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式
- confirm 确认模式
- return 退回模式
消息从 producer 到 exchange有问题了
则会返回一个 confirmCallback (确认模式)。
消息从 exchange–>queue 投递失败
则会返回一个 returnCallback(退回模式)
利用这两个 callback 控制消息的可靠性投递
测试confirm 确认模式(失败和成功都会触发)
开启支持不支持
publisher-confirms="true" 生产者支持确认模式
publisher-returns="true" 生产者支持退回模式
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
-
-
- <rabbit:connection-factory id="connectionFactory" host="192.168.2.108"
- port="5672"
- username="admin"
- password="123456"
- virtual-host="MyVirtualHost"
- publisher-confirms="true"
- publisher-returns="true"
- />
-
-
- <rabbit:admin connection-factory="connectionFactory"/>
-
-
-
- <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
-
-
-
- <rabbit:queue id="test_confirm_queueue" name="test_confirm_queue" durable="true" auto-delete="false" auto-declare="true" exclusive="false">rabbit:queue>
- <rabbit:direct-exchange name="test_confirm_ex" durable="true" auto-declare="true" auto-delete="false">
- <rabbit:bindings>
- <rabbit:binding queue="test_confirm_queueue" key="confirm">rabbit:binding>
- rabbit:bindings>
- rabbit:direct-exchange>
-
- beans>
经我测试:交换机和队列绑定标签的queue属性的值,是bean id,不是队列名
测试方法::
- 使用 rabbitTemplate.setConfirmCallback 设置回调函数。当消息发送到 exchange 后回调 confirm 方法。在方法中判断 ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
@Test public void t1() { //todo 测试确认模式 //定义回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ //接收成功 System.out.println("消息成功达到交换机" + cause); }else { //接收失败 System.out.println("接收失败消息" + cause); //做一些处理,让消息再次发送。 } } }); String message = "确认模式:test_confirm"; //发送消息 rabbitTemplate.convertAndSend("test_confirm_ex", "confirm", message); rabbitTemplate.convertAndSend("test_confirm_ex000", "confirm", message); }确认模式,不管成功失败都会触发,控制台打印输出。提示还是很不错的,上面的第二个交换机明显是不存在的
-----------------
测试return 退回模式(失败触发、成功不会触发)
退回投递失败的消息
- 使用 rabbitTemplate.setReturnCallback 设置退回函数,当消息从exchange 路由到 queue 失败后,如果设置了 rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer并执行回调函数returnedMessage
@Test public void t2() { //todo 测试退回模式 //手动开启退回模式,让当前方法支持 rabbitTemplate.setMandatory(true); //2.设置ReturnCallBack 退回投递失败的消息 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * @param message 消息对象 * @param replyCode 错误码 * @param replyText 错误信息 * @param exchange 交换机名称 * @param routingKey 路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return 执行了...."); System.out.println("消息对象"+message); System.out.println(replyCode); System.out.println("错误信息"+replyText); System.out.println("交换机"+exchange); System.out.println("路由key"+routingKey); } }); //3. 发送消息 rabbitTemplate.convertAndSend("test_confirm_ex", "confirm", "success:大河之剑天上来"); rabbitTemplate.convertAndSend("test_confirm_ex", "confirm00", "fail:奔流到海不复回"); }执行结果:退回模式,失败才触发,成功不会触发
Consumer Ack 消息的自动签收和手动签收
ack 表示消费端收到消息后的确认方式,有2种
自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是很可能会引起消息丢失
手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
手动签收具体实现
1、消费端的配置文件添加属性 acknowledge="manual"
2、监听器实现手动签收
最大的区别在于,实现的接口不同了。这个重写的onMessage方法里多了一个信道参数
没问题,调用channel.basicAck(),手动确认签收,
出现异常,则调用channel.basicNack()方法,拒绝签收
- @Component
- public class AckListener implements ChannelAwareMessageListener {
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- Thread.sleep(1000);
- // 获取消息传递标记
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- // 接收消息
- System.out.println("接收消息"+new String(message.getBody()));
- // 处理业务逻辑
- System.out.println("处理业务逻辑");
- int i = 3/0;//出现错误
- /**
- * 第一个参数:被消费的消息的唯一标识
- * 第二个参数:如果为true表示可以签收所有的消息
- */
- //手动签收
- channel.basicAck(deliveryTag,true);
- } catch (Exception e) {
- e.printStackTrace();
- /*
- 第三个参数:requeue:被拒收的消息返回原队列。
- 设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
- */
- // 拒绝签收
- channel.basicNack(deliveryTag,true,true);
- }
- }
- }
在监听器中我们设置了一个“”除0“”异常,则会拒绝签收, 第三个参数设置为true,则会把消息返还给原队列。
测试:
在rabbitmq的管理控制台删除队列,运行test方法
@Test public void t1() { //发送消息 rabbitTemplate.convertAndSend("test_confirm_ex", "confirm", "测试手动签收"); }运行消费端:
会执行监听器,并发生异常
因为在发生异常时设置了拒绝签收,所以该消息返回给了原队列
确保消息被确认。不确认是不继续处理其他消息的
prefetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉取下一条消息。
测试:
@Test public void t3() { //todo 消费端限流测试 for (int i = 1; i < 11; i++) { rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","梦琪梦琪~~~第"+i+"条"); } }管理控制台,
启动消费端
一瞬间就给我跳了这么多,服了,线程睡眠时间多设置点应该会更明显
RabbitMQ可以对消息设置过期时间,当消息到达过期时间后,还没有被消费,会被自动清除。
对整个队列(Queue)设置过期时间。
即设置队列中所有的消息的过期时间
-
- <rabbit:queue id="test_queue_ttl" name="test_queue_ttl">
-
- <rabbit:queue-arguments>
-
- <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer">entry>
- rabbit:queue-arguments>
- rabbit:queue>
-
- <rabbit:topic-exchange name="test_exchange_ttl">
-
- <rabbit:bindings>
- <rabbit:binding pattern="ttl.#" queue="test_queue_ttl">rabbit:binding>
- rabbit:bindings>
- rabbit:topic-exchange>
测试队列过期时间,在测试类里运行
// todo 测试消息的存活时间-设置队列的过期时间 for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.gg","message ttl"); }10s后
消息后置处理器-测试消息过期时间
如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
测试方法里,测试消息的单独过期时间
// todo 测试消息的存活时间-单独设置消息的过期时间 // 消息后置处理对象,发送消息到达队列之前,进行拦截 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //1.设置message的消息的过期时间 ,5秒之后过期 message.getMessageProperties().setExpiration("5000"); //2.返回该消息 加工完后放行 return message; } }; //消息单独过期 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。 rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.gg","message ttl....",messagePostProcessor);一会消息就过期了,自动从队列里清除
当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
死信队列
某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;
消息成为死信的三种情况:
死信的处理方式:
-------------------------
让正常队列绑定死信交换机
死信队列(queue_dlx)和死信交换机(exchange_dlx)绑定
给队列设置参数:
x-dead-letter-exchange 死信交换机名称
x-dead-letter-routing-key 发送给死信交换机的routingkey
测试死信队列,需要:
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
-
- <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
-
- <rabbit:queue-arguments>
-
- <entry key="x-dead-letter-exchange" value="exchange_dlx"/>
-
- <entry key="x-dead-letter-routing-key" value="dlx.hehe">entry>
-
- <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
-
- <entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
- rabbit:queue-arguments>
- rabbit:queue>
-
- <rabbit:topic-exchange name="test_exchange_dlx">
- <rabbit:bindings>
- <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx">rabbit:binding>
- rabbit:bindings>
- rabbit:topic-exchange>
-
- <rabbit:queue name="queue_dlx" id="queue_dlx">rabbit:queue>
- <rabbit:topic-exchange name="exchange_dlx">
- <rabbit:bindings>
- <rabbit:binding pattern="dlx.#" queue="queue_dlx">rabbit:binding>
- rabbit:bindings>
- rabbit:topic-exchange>
死信队列- 测试过期时间
test_exchange_dlx是正常的交换机,不是死信交换机
//todo 死信队列- 测试过期时间 //1. 测试过期时间,死信消息 rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");会现在正常队列中存活10s
10s后就存到死信队列里了
死信队列- 测试长度限制
//todo 死信队列- 测试长度限制 for (int i = 0; i < 20; i++) { //test_exchange_dlx是正常的交换机,不是死信交换机 rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "我是一条消息,我会死吗?"); }根据先进先出,前10条会自动进入死信队列
10s过后,正常队列里的10条消息过了存活时间,也到了死信队列
死信队列- 测试消息拒收
在消费端的监听器,需要把最后一个参数改为false,表示不把拒绝签收的信息放回原队列
// 拒绝签收 channel.basicNack(deliveryTag,true,false);测试:先开启消费端,然后在运行生产端
//todo 死信队列- 测试消息拒收 //test_exchange_dlx是正常的交换机,不是死信交换机 rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");消费端发生异常,拒绝签收
把拒绝签收的消息存储到了死信队列中
死信队列的客户端不写
延迟队列存储的是延时消息。
延时消息
所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
应用场景
下单后,30分钟未支付,取消订单,回滚库存。
修改一番,结果如下:
- <rabbit:queue name="order_queue" id="order_queue">
-
- <rabbit:queue-arguments>
-
- <entry key="x-dead-letter-exchange" value="order_exchange_dlx"/>
-
- <entry key="x-dead-letter-routing-key" value="dlx.order.cancel">entry>
-
- <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
- rabbit:queue-arguments>
- rabbit:queue>
-
- <rabbit:topic-exchange name="order_exchange">
- <rabbit:bindings>
- <rabbit:binding pattern="order.#" queue="order_queue">rabbit:binding>
- rabbit:bindings>
- rabbit:topic-exchange>
-
- <rabbit:queue name="order_queue_dlx" id="order_queue_dlx">rabbit:queue>
- <rabbit:topic-exchange name="order_exchange_dlx">
- <rabbit:bindings>
- <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx">rabbit:binding>
- rabbit:bindings>
- rabbit:topic-exchange>
在消费端一定要配置,延迟队列消费端监听的一定是死信队列
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx">rabbit:listener>延迟队列对应监听器OrderListener
@Component public class OrderListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //1.接收转换消息 System.out.println(new String(message.getBody())); //2. 处理业务逻辑 System.out.println("处理业务逻辑..."); System.out.println("根据订单id查询其状态..."); System.out.println("判断状态是否为支付成功"); System.out.println("取消订单,回滚库存...."); //3. 手动签收 channel.basicAck(deliveryTag,true); } catch (Exception e) { //e.printStackTrace(); System.out.println("出现异常,拒绝接受"); //4.拒绝签收,不重回队列 requeue=false channel.basicNack(deliveryTag,true,false); } } }
延迟队列-测试:
先开启消费端,再运行生产端运行测试
@Test public void t9() throws InterruptedException { //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息 rabbitTemplate.convertAndSend("order_exchange", "order.msg","订单信息:id=1,time=2022年07月19日18:41:47"); //2.打印倒计时10秒 for (int i = 10; i > 0 ; i--) { System.out.println(i+"..."); Thread.sleep(1000); } }发送消息后,跟我倒数10秒,10秒之后在“延迟队列”中的消息才能被消费端处理
0投递中,1投递成功,2投递失败
在消息确认这个过程中可能由于网络闪断、MQ Broker端异常等原因导致 回送消息失败或者异常。这个时候就需要发送方(生产者)对消息进行可靠性投递了,保障消息不丢失,100%的投递成功!(有一种极限情况是闪断,Broker返回的成功确认消息,但是生产端由于网络闪断没收到,这个时候重新投递可能会造成消息重复,需要消费端去做幂等处理)所以我们需要有一个定时任务,(比如每5分钟拉取一下处于中间状态的消息,当然这个消息可以设置一个超时时间,比如超过1分钟 Status = 0 ,也就说明了1分钟这个时间窗口内,我们的消息没有被确认,那么会被定时任务拉取
设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)出来)
1、防止单节点故障
2、扩容(节点越多,容量越多)
3、提供集群高并发的吞吐量
配置多机集群 Clustering Guide — RabbitMQ
代理服务器,也可以称为网关
RabbitMQ天然支持Clustering
单机多实例部署(模拟集群)
---------------
搭建集群,拍快照
/usr/lib/rabbitmq/bin
rabbit2 join_cluster rabbit1 认大哥
通过本地访问
localhost:15672
------------
镜像集群
同步镜像
Admin->Policies
加虚拟主机myhost,配置为镜像
Name:策略名称
Pattern:匹配的规则,如果是匹配所有的队列,是^.
Definition:使用ha-mode模式中的all,也就是同步所有匹配的队列。
问号链接帮助文档。