• 22-07-19 西安 RabbitMQ(03) 消息可靠投递、消费端限流、死信队列、延迟队列、集群搭建


    消息的可靠投递

    消息发送方希望杜绝任何消息丢失或者投递失败,RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式

    • confirm 确认模式
    • return 退回模式

    消息从 producer 到 exchange有问题了
    则会返回一个 confirmCallback (确认模式)。

    消息从 exchange–>queue 投递失败
    则会返回一个 returnCallback(退回模式

    利用这两个 callback 控制消息的可靠性投递

    测试confirm 确认模式(失败和成功都会触发)

    开启支持不支持
     publisher-confirms="true"  生产者支持确认模式
     publisher-returns="true"  生产者支持退回模式

    1. <beans xmlns="http://www.springframework.org/schema/beans"
    2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    3. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    4. <rabbit:connection-factory id="connectionFactory" host="192.168.2.108"
    5. port="5672"
    6. username="admin"
    7. password="123456"
    8. virtual-host="MyVirtualHost"
    9. publisher-confirms="true"
    10. publisher-returns="true"
    11. />
    12. <rabbit:admin connection-factory="connectionFactory"/>
    13. <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    14. <rabbit:queue id="test_confirm_queueue" name="test_confirm_queue" durable="true" auto-delete="false" auto-declare="true" exclusive="false">rabbit:queue>
    15. <rabbit:direct-exchange name="test_confirm_ex" durable="true" auto-declare="true" auto-delete="false">
    16. <rabbit:bindings>
    17. <rabbit:binding queue="test_confirm_queueue" key="confirm">rabbit:binding>
    18. rabbit:bindings>
    19. rabbit:direct-exchange>
    20. beans>

    经我测试:交换机和队列绑定标签的queue属性的值,是bean id,不是队列名

    测试方法::

    • 使用 rabbitTemplate.setConfirmCallback 设置回调函数。当消息发送到 exchange 后回调 confirm 方法。在方法中判断 ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
    1. @Test
    2. public void t1() {
    3. //todo 测试确认模式
    4. //定义回调
    5. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    6. @Override
    7. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    8. if (ack){
    9. //接收成功
    10. System.out.println("消息成功达到交换机" + cause);
    11. }else {
    12. //接收失败
    13. System.out.println("接收失败消息" + cause);
    14. //做一些处理,让消息再次发送。
    15. }
    16. }
    17. });
    18. String message = "确认模式:test_confirm";
    19. //发送消息
    20. rabbitTemplate.convertAndSend("test_confirm_ex", "confirm", message);
    21. rabbitTemplate.convertAndSend("test_confirm_ex000", "confirm", message);
    22. }

    确认模式,不管成功失败都会触发,控制台打印输出。提示还是很不错的,上面的第二个交换机明显是不存在的

    -----------------
    测试return 退回模式(失败触发、成功不会触发)
    退回投递失败的消息

    • 使用 rabbitTemplate.setReturnCallback 设置退回函数,当消息从exchange 路由到 queue 失败后,如果设置了 rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer并执行回调函数returnedMessage

    1. @Test
    2. public void t2() {
    3. //todo 测试退回模式
    4. //手动开启退回模式,让当前方法支持
    5. rabbitTemplate.setMandatory(true);
    6. //2.设置ReturnCallBack 退回投递失败的消息
    7. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    8. /**
    9. * @param message 消息对象
    10. * @param replyCode 错误码
    11. * @param replyText 错误信息
    12. * @param exchange 交换机名称
    13. * @param routingKey 路由键
    14. */
    15. @Override
    16. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    17. System.out.println("return 执行了....");
    18. System.out.println("消息对象"+message);
    19. System.out.println(replyCode);
    20. System.out.println("错误信息"+replyText);
    21. System.out.println("交换机"+exchange);
    22. System.out.println("路由key"+routingKey);
    23. }
    24. });
    25. //3. 发送消息
    26. rabbitTemplate.convertAndSend("test_confirm_ex", "confirm", "success:大河之剑天上来");
    27. rabbitTemplate.convertAndSend("test_confirm_ex", "confirm00", "fail:奔流到海不复回");
    28. }

    执行结果:退回模式,失败才触发,成功不会触发


    Consumer Ack 消息的自动签收和手动签收

    ack 表示消费端收到消息后的确认方式,有2种

    1. 自动确认:acknowledge=“none” 默认
    2. 手动确认:acknowledge=“manual”

    自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是很可能会引起消息丢失

    手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

    手动签收具体实现

    1、消费端的配置文件添加属性 acknowledge="manual" 

     2、监听器实现手动签收

    最大的区别在于,实现的接口不同了。这个重写的onMessage方法里多了一个信道参数

    没问题,调用channel.basicAck(),手动确认签收,

    出现异常,则调用channel.basicNack()方法,拒绝签收

    1. @Component
    2. public class AckListener implements ChannelAwareMessageListener {
    3. @Override
    4. public void onMessage(Message message, Channel channel) throws Exception {
    5. Thread.sleep(1000);
    6. // 获取消息传递标记
    7. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    8. try {
    9. // 接收消息
    10. System.out.println("接收消息"+new String(message.getBody()));
    11. // 处理业务逻辑
    12. System.out.println("处理业务逻辑");
    13. int i = 3/0;//出现错误
    14. /**
    15. * 第一个参数:被消费的消息的唯一标识
    16. * 第二个参数:如果为true表示可以签收所有的消息
    17. */
    18. //手动签收
    19. channel.basicAck(deliveryTag,true);
    20. } catch (Exception e) {
    21. e.printStackTrace();
    22. /*
    23. 第三个参数:requeue:被拒收的消息返回原队列。
    24. 设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
    25. */
    26. // 拒绝签收
    27. channel.basicNack(deliveryTag,true,true);
    28. }
    29. }
    30. }

    在监听器中我们设置了一个“”除0“”异常,则会拒绝签收, 第三个参数设置为true,则会把消息返还给原队列。

    测试:
    在rabbitmq的管理控制台删除队列,运行test方法

    1. @Test
    2. public void t1() {
    3. //发送消息
    4. rabbitTemplate.convertAndSend("test_confirm_ex", "confirm", "测试手动签收");
    5. }

    运行消费端:

    会执行监听器,并发生异常

     因为在发生异常时设置了拒绝签收,所以该消息返回给了原队列 


    消费端限流

    确保消息被确认。不确认是不继续处理其他消息的

     prefetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉下一条消息。

    测试:

    1. @Test
    2. public void t3() {
    3. //todo 消费端限流测试
    4. for (int i = 1; i < 11; i++) {
    5. rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","梦琪梦琪~~~第"+i+"条");
    6. }
    7. }

     管理控制台,

     启动消费端

    一瞬间就给我跳了这么多,服了,线程睡眠时间多设置点应该会更明显


    TTL   Time To Live(存活时间/过期时间)

    RabbitMQ可以对消息设置过期时间,当消息到达过期时间后,还没有被消费,会被自动清除。

    对整个队列(Queue)设置过期时间。

    即设置队列中所有的消息的过期时间

    1. <rabbit:queue id="test_queue_ttl" name="test_queue_ttl">
    2. <rabbit:queue-arguments>
    3. <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer">entry>
    4. rabbit:queue-arguments>
    5. rabbit:queue>
    6. <rabbit:topic-exchange name="test_exchange_ttl">
    7. <rabbit:bindings>
    8. <rabbit:binding pattern="ttl.#" queue="test_queue_ttl">rabbit:binding>
    9. rabbit:bindings>
    10. rabbit:topic-exchange>

    测试队列过期时间,在测试类里运行

    1. // todo 测试消息的存活时间-设置队列的过期时间
    2. for (int i = 0; i < 10; i++) {
    3. rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.gg","message ttl");
    4. }

    10s后

    消息后置处理器-测试消息过期时间 

    如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。

    测试方法里,测试消息的单独过期时间

    1. // todo 测试消息的存活时间-单独设置消息的过期时间
    2. // 消息后置处理对象,发送消息到达队列之前,进行拦截
    3. MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    4. @Override
    5. public Message postProcessMessage(Message message) throws AmqpException {
    6. //1.设置message的消息的过期时间 ,5秒之后过期
    7. message.getMessageProperties().setExpiration("5000");
    8. //2.返回该消息 加工完后放行
    9. return message;
    10. }
    11. };
    12. //消息单独过期 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
    13. rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.gg","message ttl....",messagePostProcessor);

    一会消息就过期了,自动从队列里清除


    死信队列 和 死信交换机 DLX

    当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

    死信队列

    某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;

    消息成为死信的三种情况:

    1. 队列消息数量到达限制,根据先进先出,最先发的消息会进入死信队列
    2. 消费者拒接消费消息(不返回原队列)
    3. 消息到达超时时间未被消费;

    死信的处理方式:

    1. 丢弃,如果不是很重要,可以选择丢弃(默认)
    2. 记录死信入库,然后做后续的业务分析或处理
    3. 通过死信队列,由负责监听死信的应用程序进行处理(我们用的)

    -------------------------

    让正常队列绑定死信交换机
    死信队列(queue_dlx)和死信交换机(exchange_dlx)绑定

    给队列设置参数:

      x-dead-letter-exchange  死信交换机名称

      x-dead-letter-routing-key 发送给死信交换机的routingkey

    测试死信队列,需要:

    1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)

    2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)

    1. <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
    2. <rabbit:queue-arguments>
    3. <entry key="x-dead-letter-exchange" value="exchange_dlx"/>
    4. <entry key="x-dead-letter-routing-key" value="dlx.hehe">entry>
    5. <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
    6. <entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
    7. rabbit:queue-arguments>
    8. rabbit:queue>
    9. <rabbit:topic-exchange name="test_exchange_dlx">
    10. <rabbit:bindings>
    11. <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx">rabbit:binding>
    12. rabbit:bindings>
    13. rabbit:topic-exchange>
    14. <rabbit:queue name="queue_dlx" id="queue_dlx">rabbit:queue>
    15. <rabbit:topic-exchange name="exchange_dlx">
    16. <rabbit:bindings>
    17. <rabbit:binding pattern="dlx.#" queue="queue_dlx">rabbit:binding>
    18. rabbit:bindings>
    19. rabbit:topic-exchange>

    死信队列- 测试过期时间

    test_exchange_dlx是正常的交换机,不是死信交换机

    1. //todo 死信队列- 测试过期时间
    2. //1. 测试过期时间,死信消息
    3. rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");

    会现在正常队列中存活10s

    10s后就存到死信队列里了

    死信队列- 测试长度限制

    1. //todo 死信队列- 测试长度限制
    2. for (int i = 0; i < 20; i++) {
    3. //test_exchange_dlx是正常的交换机,不是死信交换机
    4. rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "我是一条消息,我会死吗?");
    5. }

     根据先进先出,前10条会自动进入死信队列

    10s过后,正常队列里的10条消息过了存活时间,也到了死信队列

    死信队列- 测试消息拒收

     在消费端的监听器,需要把最后一个参数改为false,表示不把拒绝签收的信息放回原队列

    1. // 拒绝签收
    2. channel.basicNack(deliveryTag,true,false);

    测试:先开启消费端,然后在运行生产端

    1. //todo 死信队列- 测试消息拒收
    2. //test_exchange_dlx是正常的交换机,不是死信交换机
    3. rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");

    消费端发生异常,拒绝签收

    把拒绝签收的消息存储到了死信队列中

    死信队列的客户端不写


    延迟队列  底层 TTL+死信队列

    延迟队列存储的是延时消息。

    延时消息

    所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

    应用场景

    下单后,30分钟未支付,取消订单,回滚库存。

    修改一番,结果如下:

    1. <rabbit:queue name="order_queue" id="order_queue">
    2. <rabbit:queue-arguments>
    3. <entry key="x-dead-letter-exchange" value="order_exchange_dlx"/>
    4. <entry key="x-dead-letter-routing-key" value="dlx.order.cancel">entry>
    5. <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
    6. rabbit:queue-arguments>
    7. rabbit:queue>
    8. <rabbit:topic-exchange name="order_exchange">
    9. <rabbit:bindings>
    10. <rabbit:binding pattern="order.#" queue="order_queue">rabbit:binding>
    11. rabbit:bindings>
    12. rabbit:topic-exchange>
    13. <rabbit:queue name="order_queue_dlx" id="order_queue_dlx">rabbit:queue>
    14. <rabbit:topic-exchange name="order_exchange_dlx">
    15. <rabbit:bindings>
    16. <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx">rabbit:binding>
    17. rabbit:bindings>
    18. rabbit:topic-exchange>

    在消费端一定要配置,延迟队列消费端监听的一定是死信队列

    1. <rabbit:listener ref="orderListener" queue-names="order_queue_dlx">rabbit:listener>

    延迟队列对应监听器OrderListener

    1. @Component
    2. public class OrderListener implements ChannelAwareMessageListener {
    3. @Override
    4. public void onMessage(Message message, Channel channel) throws Exception {
    5. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    6. try {
    7. //1.接收转换消息
    8. System.out.println(new String(message.getBody()));
    9. //2. 处理业务逻辑
    10. System.out.println("处理业务逻辑...");
    11. System.out.println("根据订单id查询其状态...");
    12. System.out.println("判断状态是否为支付成功");
    13. System.out.println("取消订单,回滚库存....");
    14. //3. 手动签收
    15. channel.basicAck(deliveryTag,true);
    16. } catch (Exception e) {
    17. //e.printStackTrace();
    18. System.out.println("出现异常,拒绝接受");
    19. //4.拒绝签收,不重回队列 requeue=false
    20. channel.basicNack(deliveryTag,true,false);
    21. }
    22. }
    23. }

    延迟队列-测试:

    先开启消费端,再运行生产端运行测试

    1. @Test
    2. public void t9() throws InterruptedException {
    3. //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
    4. rabbitTemplate.convertAndSend("order_exchange",
    5. "order.msg","订单信息:id=1,time=2022年07月19日18:41:47");
    6. //2.打印倒计时10秒
    7. for (int i = 10; i > 0 ; i--) {
    8. System.out.println(i+"...");
    9. Thread.sleep(1000);
    10. }
    11. }

    发送消息后,跟我倒数10秒,10秒之后在“延迟队列”中的消息才能被消费端处理

     


    消息百分百成功投递

    0投递中,1投递成功,2投递失败

    在消息确认这个过程中可能由于网络闪断、MQ Broker端异常等原因导致 回送消息失败或者异常。这个时候就需要发送方(生产者)对消息进行可靠性投递了,保障消息不丢失,100%的投递成功!(有一种极限情况是闪断,Broker返回的成功确认消息,但是生产端由于网络闪断没收到,这个时候重新投递可能会造成消息重复,需要消费端去做幂等处理)所以我们需要有一个定时任务,(比如每5分钟拉取一下处于中间状态的消息,当然这个消息可以设置一个超时时间,比如超过1分钟 Status = 0 ,也就说明了1分钟这个时间窗口内,我们的消息没有被确认,那么会被定时任务拉取

    设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)出来)


    RabbitMQ集群搭建

    1、防止单节点故障
    2、扩容(节点越多,容量越多)
    3、提供集群高并发的吞吐量

    配置多机集群  Clustering Guide — RabbitMQ

     

     

    代理服务器,也可以称为网关

    RabbitMQ天然支持Clustering

    单机多实例部署(模拟集群)
    ---------------
    搭建集群,拍快照

    /usr/lib/rabbitmq/bin
    rabbit2 join_cluster rabbit1 认大哥

    通过本地访问
    localhost:15672

    ------------
    镜像集群
    同步镜像
    Admin->Policies

    加虚拟主机myhost,配置为镜像

    Name:策略名称
    Pattern:匹配的规则,如果是匹配所有的队列,是^.
    Definition:使用ha-mode模式中的all,也就是同步所有匹配的队列。
    问号链接帮助文档。


    HAProxy 高可用代理服务器

  • 相关阅读:
    【小程序图片水印】微信小程序图片加水印功能 canvas绘图
    golang中如何配置 sql.DB 以获得更好的性能
    SQL之回炉重造
    Shiro反序列化漏洞利用笔记
    坚持每天做一件事情的意义
    C# Winfrom 常用功能整合-2
    java计算机毕业设计在线影院系统MyBatis+系统+LW文档+源码+调试部署
    docker(Kubernetes)环境如何查看network namespace
    靠一颗火锅丸子弯道超车三全,安井到底凭什么?
    servlet交互过程图详解,servlet的常见问题,创建web项目(一)
  • 原文地址:https://blog.csdn.net/m0_56799642/article/details/125873808