• (rabbitmq的高级特性)消息可靠性


     对应的教程视频:

    高级篇Day5-01-MQ常见问题及消息可靠性_哔哩哔哩_bilibili

    一、生产者消息确认

     

     1.在生成者这个微服务的apllication.yml中添加配置

    1. spring:
    2. rabbitmq:
    3. publisher-confirm-type: correlated
    4. publisher-returns: true
    5. template:
    6. mandatory: true

    2.每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置

    代码:

    1. import lombok.extern.slf4j.Slf4j;
    2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    3. import org.springframework.beans.BeansException;
    4. import org.springframework.context.ApplicationContext;
    5. import org.springframework.context.ApplicationContextAware;
    6. import org.springframework.context.annotation.Configuration;
    7. /*
    8. ApplicationContextAware : spring 的bean工厂的通知
    9. */
    10. @Slf4j //记录日志
    11. @Configuration
    12. public class CommonConfig implements ApplicationContextAware {
    13. @Override
    14. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    15. // 从bean工厂中获取 RabbitTemplate 对象
    16. RabbitTemplate rabbitTemplate = applicationContext.getBean( RabbitTemplate.class );
    17. // 配置ReturnCallback
    18. // rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    19. // // 像这种里面只有一个方法的,把鼠标方法 new 前面,会提醒 推荐用lambda表达式,快捷键 Alt + Enter
    20. // @Override
    21. // public void returnedMessage(Message message, int i, String s, String s1, String s2) {
    22. //
    23. // }
    24. // });
    25. // 像这种里面只有一个方法的,把鼠标方法 new 前面,会提醒 推荐用lambda表达式,快捷键 Alt + Enter
    26. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    27. // 记录日志 【 {}是占位符,replyCode, replyText, exchange...会依次填进占位符里的 】
    28. log.error( "消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机:{}, 路由key:{}, 消息:{}",
    29. replyCode, replyText, exchange, routingKey, message.toString() );
    30. // 如果有需要的话,可以重发消息
    31. });
    32. }
    33. }

     3.发送消息,指定消息ID、消息ConfirmCallback

     代码:

    1. import lombok.extern.slf4j.Slf4j;
    2. import org.junit.Test;
    3. import org.junit.runner.RunWith;
    4. import org.springframework.amqp.rabbit.connection.CorrelationData;
    5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.boot.test.context.SpringBootTest;
    8. import org.springframework.test.context.junit4.SpringRunner;
    9. import org.springframework.util.concurrent.FailureCallback;
    10. import org.springframework.util.concurrent.SuccessCallback;
    11. import java.util.UUID;
    12. @Slf4j
    13. @RunWith(SpringRunner.class)
    14. @SpringBootTest
    15. public class SpringAmqpTest {
    16. @Autowired
    17. private RabbitTemplate rabbitTemplate;
    18. @Test
    19. public void testSendMessage2SimpleQueue() throws InterruptedException {
    20. // 1.准备消息
    21. String message = "hello, spring amqp!";
    22. // 2.准备CorrelationData
    23. // 2.1.消息ID
    24. CorrelationData correlationData = new CorrelationData( UUID.randomUUID().toString() );
    25. // 2.2.准备ConfirmCallback
    26. correlationData.getFuture().addCallback(result -> { //成功回调
    27. // 判断结果
    28. if( result.isAck() ) {
    29. // ACK
    30. log.debug( "消息成功投递到交换机!消息ID:{}", correlationData.getId() );
    31. } else {
    32. //NACK
    33. log.error( "消息投递到交换机失败!消息ID:{}", correlationData.getId() );
    34. // 重发消息
    35. }
    36. }, ex -> { //失败回调
    37. //记录日志
    38. log.error( "消息发送失败!", ex );
    39. });
    40. // 2.发送消息
    41. rabbitTemplate.convertAndSend("amq.topic", "simple.text", message);
    42. }
    43. }

    测试:

    (给交换机添加绑定关系,这一步看情况做,如果绑定关系已经有的了的表不需要这一步)

    测试错误例子

     4.总结

    SpringAMQP中处理消息确认的几种情况:

    publisher-comfirm
    •  消息成功发送到 exchange ,返回 ack
    •  消息发送失败,没有到达交换机,返回 nack
    •  消息发送过程中出现异常,没有收到回执
    消息成功发送到exchange,但没有路由到queue,调用ReturnCallback

           

    二、消息持久化 

    MQ默认的是内存存储,如果mq发生了宕机,数据是可能丢失。如果要想数据安全,就要做到持久化,也就是能将数据写进磁盘里

     代码:

    交换机和队列持久化 

    1. package cn.itcast.mq.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. @Configuration
    6. public class CommonConfig {
    7. // 交换机持久化
    8. @Bean
    9. public DirectExchange simpleDirect() {
    10. // 三个参数: 交换机名称、 是否持久化、 当没有queue与其绑定时是否自动删除
    11. return new DirectExchange( "simple.direct", true, false );
    12. }
    13. // 队列持久化
    14. @Bean
    15. public Queue simpleQueue() {
    16. // 使用QueueBuilder构建队列,durable就是持久化的 nonDurable()非持久化的
    17. return QueueBuilder.durable( "simple.queue" ).build();
    18. }
    19. }

     交换机、队列持久了,但并不代表消息就能持久了,所以必须做消息持久化

    1. // 1.准备消息 MessageDeliveryMode.PERSISTENT 消息持久化,这样重启mq消息也可以保留
    2. Message message = MessageBuilder.withBody( "hello, spring".getBytes(StandardCharsets.UTF_8) )
    3. .setDeliveryMode( MessageDeliveryMode.PERSISTENT )
    4. .build();

    交换机 和 队列 创建 以及 发送消息 的源码其实默认的就是 持久化 的

    而之所以学,是因为我们有时候为了提高性能,便可以将一些非必要的设置为 非持久化

     

    三、消费者消息确认 

    测试 auto:

    进入simple.queue生产一条消息 

    填写 消息并发送

     刷新

     当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

     auto模式 这种情况下,虽然也不好,mq一直在尝试,但是至少消息不会丢失,

    auto的这种遇到处理失败后一直投递再投递,这种处理方式不太友好,但是可以改的,看四、失败重试机制

    四、失败重试机制

     

     

     

     重试次数耗尽之后,其实会返回一个reject拒绝,然后就会把消息丢弃,这是重试机制的默认策略

     重试次数耗尽之后,会把消息丢弃,事实上丢弃也没事,因为已经重试了那么多次了,还是次失败的,即便把消息再丢回给mq,mq再投递给你,也还是会失败。

    那么除了丢弃,还有没有其它的策略呢?有的...

     这种方案是最健康的方案了,也建议在生产环境下 使用这种方案

     

    1. package cn.itcast.mq.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.DirectExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    7. import org.springframework.amqp.rabbit.retry.MessageRecoverer;
    8. import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
    9. import org.springframework.context.annotation.Bean;
    10. import org.springframework.context.annotation.Configuration;
    11. @Configuration
    12. public class ErrormessageConfig {
    13. //首先,定义接收失败消息的交换机、队列及其绑定关系:
    14. @Bean
    15. public DirectExchange errorMessageExchange() {
    16. return new DirectExchange( "error.direct" );
    17. }
    18. @Bean
    19. public Queue errorQueue() {
    20. return new Queue( "error.queue" );
    21. }
    22. @Bean
    23. public Binding errorMessageBinding() {
    24. return BindingBuilder.bind( errorQueue() ).to( errorMessageExchange() ).with( "errpr" );
    25. }
    26. //定义RepublishMessageRecoverer 会 覆盖spring默认的默bean (我们想覆盖spring默认的bean,重新定义一个bean即可)
    27. @Bean
    28. public MessageRecoverer republishMessageRecoverer( RabbitTemplate rabbitTemplate ) {
    29. return new RepublishMessageRecoverer( rabbitTemplate, "error.direct", "error" );
    30. }
    31. }

  • 相关阅读:
    Git基本使用
    File的常见成员方法
    Node.js的基本使用(三)数据库与身份认证
    无需CORS,用nginx解决跨域问题,轻松实现低代码开发的前后端分离
    影响服务器性能的主要因素是什么?
    数据结构学习笔记——查找算法中的树形查找(平衡二叉树)
    Android 移动记账管理系统
    【Java】static关键字,内部类
    ML/DL2022面试必备500知识点-《机器和深度学习纲要》免费分享
    mybatis参数为0识别为空字符串的查询处理
  • 原文地址:https://blog.csdn.net/QRLYLETITBE/article/details/126685420