• [MQ] 死信队列介绍与场景描述


    ✨✨个人主页:沫洺的主页

    📚📚系列专栏: 📖 JavaWeb专栏📖 JavaSE专栏 📖 Java基础专栏📖vue3专栏 

                               📖MyBatis专栏📖Spring专栏📖SpringMVC专栏📖SpringBoot专栏

                               📖Docker专栏📖Reids专栏📖MQ专栏📖SpringCloud专栏     

    💖💖如果文章对你有所帮助请留下三连✨✨

    🐣什么是死信

    在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。

    死信就是消息在特定场景下的一种表现形式,这些场景包括:

    1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时。 或者拒绝basicReject
    2. 消费者发生异常,超过重试次数 。
    3. 消息的 TTL 时间过期。
    4. 消息队列达到最大长度。

    上述场景经常产生死信,即消息在这些场景中时,被称为死信。

    🐤什么是死信队列 

    死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。

    死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。

    🐓使用死信队列

    死信队列基本使用,只需要在声明业务队列的时候,绑定指定的死信交换机和RoutingKey即可。

    1. //定义业务队列
    2. @Bean
    3. public Queue directQueue() {
    4. return QueueBuilder.durable(QNAME)
    5. .deadLetterExchange(DENAME) //通过这两个配置,使我们的业务队列与死信交换机有关系了
    6. .deadLetterRoutingKey(DKEY)
    7. .ttl(60*1000) //设置消息多久过期,消息超过这个时间就直接给死信交换机,让它处理
    8. .maxLength(1000) //设置队列最大容量,超过这个容量就直接给死信交换机,让它处理
    9. .build();
    10. }

    环境搭建

    消费者

    1. @Component
    2. public class DeadConsumer {
    3. //死信
    4. private static final String DENAME = "211-DeadExchage-死信";
    5. private static final String DQNAME = "211-DeadQueue-死信";
    6. private static final String DKEY = "211-DeadQueue-RoutingKey";
    7. //业务
    8. private static final String ENAME = "211-DirectExchage-业务";
    9. private static final String QNAME = "211-DirectQueue-业务";
    10. private static final String KEY = "211-DirectQueue-RoutingKey";
    11. //region 队列和交换机的注册
    12. //定义死信交换机,本质就是普通的直连交换机
    13. @Bean
    14. public DirectExchange deadExchange() {
    15. return new DirectExchange(DENAME, true, false);
    16. }
    17. //定义死信队列
    18. @Bean
    19. public Queue deadQueue() {
    20. return QueueBuilder.durable(DQNAME).build();
    21. }
    22. //创建死信队列和死信交换机的绑定关系
    23. @Bean
    24. public Binding binding1() {
    25. return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DKEY);
    26. }
    27. //定义业务交换机
    28. @Bean
    29. public DirectExchange directExchange() {
    30. return new DirectExchange(ENAME, true, false);
    31. }
    32. //定义业务队列
    33. @Bean
    34. public Queue directQueue() {
    35. return QueueBuilder.durable(QNAME)
    36. .deadLetterExchange(DENAME) //通过这两个配置,使我们的业务队列与死信交换机有关系了
    37. .deadLetterRoutingKey(DKEY)
    38. .ttl(60*1000) //设置消息多久过期,消息超过这个时间就直接给死信交换机,让它处理
    39. .maxLength(5) //设置队列最大容量,超过这个容量就直接给死信交换机,让它处理
    40. .build();
    41. }
    42. //创建业务队列和业务交换机的绑定关系
    43. @Bean
    44. public Binding binding2() {
    45. return BindingBuilder.bind(directQueue()).to(directExchange()).with(KEY);
    46. }
    47. //endregion
    48. //业务消费者
    49. @RabbitHandler
    50. @RabbitListener(queues = QNAME)
    51. public void process1(UserRegisterOk userRegisterOk) {
    52. System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss")
    53. +"消费者接收消息:" + userRegisterOk.getName()
    54. + "," + userRegisterOk.getAge());
    55. }
    56. }

    生产者

    1. @Component
    2. public class DeadProducer {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. public void sendMessage(){
    6. UserRegisterOk userRegisterOk1 = UserRegisterOk.builder().name("张三").age(18).build();
    7. //要将对象序列化,转成字符串,使用消息转换器MessageConverter
    8. rabbitTemplate.convertAndSend(
    9. "211-DirectExchage-业务",
    10. "211-DirectQueue-RoutingKey",
    11. userRegisterOk1);
    12. System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"生产者生产消息-->张三发送成功");
    13. }
    14. }

    启动类

    1. @SpringBootApplication
    2. public class App4 {
    3. public static void main(String[] args) {
    4. ConfigurableApplicationContext context = SpringApplication.run(App4.class, args);
    5. DeadProducer producer = context.getBean(DeadProducer.class);
    6. producer.sendMessage();
    7. }
    8. }

    场景描述: 当没有消费者时,消息的 TTL 时间过期

    ttl(60*1000) //设置消息多久过期,消息超过这个时间就直接给死信交换机,让它处理

    死信队列一般是没有消费者的,都是通过人工干预去处理

    场景描述: 没有消费者时,消息队列达到最大长度

     生产者发送8条消息,消息队列最大长度为5,多出的3条会放到死信队列里

     maxLength(5) //设置队列最大容量,超过这个容量就直接给死信交换机,让它处理

    场景描述: 手动应答下,消息被拒绝访问

    1. #开启消费者应答模式为 auto自动应答 manual手动应答
    2. spring.rabbitmq.listener.simple.acknowledge-mode=manual
    3. #开启消费者自动重试机制,也就是消费者函数只要抛出异常,就会触发重试
    4. spring.rabbitmq.listener.simple.retry.enabled=false

    还是生产者发送8条消息,区别是对象年龄不同

    1. @Component
    2. public class DeadProducer {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. public void sendMessage(){
    6. for (int i=1;i<=8;i++){
    7. UserRegisterOk userRegisterOk1 = UserRegisterOk.builder().name("张三").age(18+i).build();
    8. //要将对象序列化,转成字符串,使用消息转换器MessageConverter
    9. rabbitTemplate.convertAndSend(
    10. "211-DirectExchage-业务",
    11. "211-DirectQueue-RoutingKey",
    12. userRegisterOk1);
    13. System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"生产者生产消息-->张三发送成功");
    14. }
    15. }
    16. }

    消费者判断对象的年龄,符合就应答,不符合就拒绝应答

    1. //业务消费者
    2. @RabbitHandler
    3. @RabbitListener(queues = QNAME)
    4. public void process1(UserRegisterOk userRegisterOk,Message message, Channel channel) throws IOException {
    5. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    6. if(userRegisterOk.getAge()>23){
    7. System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss")
    8. +"消费者接收消息:" + userRegisterOk.getName()
    9. + "," + userRegisterOk.getAge()+"年龄符合");
    10. channel.basicAck(deliveryTag,false);
    11. }else {
    12. System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss")
    13. +"消费者接收消息:" + userRegisterOk.getName()
    14. + "," + userRegisterOk.getAge()+"年龄不符合");
    15. channel.basicNack(deliveryTag,false,false);
    16. }
    17. }

    场景描述: 消费者发生异常,超过重试次数

    重试5次

    1. #开启消费者应答模式为 auto自动应答 manual手动应答
    2. spring.rabbitmq.listener.simple.acknowledge-mode=auto
    3. #开启消费者自动重试机制,也就是消费者函数只要抛出异常,就会触发重试
    4. spring.rabbitmq.listener.simple.retry.enabled=true
    5. #设置重试最大次数
    6. spring.rabbitmq.listener.simple.retry.max-attempts=5
    7. #设置重试时间最大间隔
    8. spring.rabbitmq.listener.simple.retry.max-interval=8000ms
    9. #设置重试时间间隔
    10. spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
    11. #设置重试时间间隔的倍数
    12. spring.rabbitmq.listener.simple.retry.multiplier=2

    模拟消费者异常

    1. //业务消费者
    2. @RabbitHandler
    3. @RabbitListener(queues = QNAME)
    4. public void process1(UserRegisterOk userRegisterOk) {
    5. System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss")
    6. +"消费者接收消息:" + userRegisterOk.getName()
    7. + "," + userRegisterOk.getAge());
    8. int i=1/0;
    9. }

    5次之后就不再消费了,将消息放到死信队列里

    1. #超过重试次数后,进入死信队列,默认为false
    2. #重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)
    3. spring.rabbitmq.listener.simple.default-requeue-rejected = false
  • 相关阅读:
    Redis查询,设置超时时间
    高可用环境kafka消息未按顺序消费问题
    java版本企业电子招标采购系统源码Spring Cloud + Spring Boot +二次开发
    linux下命令行静默安装oracle11G简要步骤
    Python蒙特卡洛树搜索算法实现的黑白棋AI系统
    『现学现忘』Docker基础 — 31、实现MySQL同步数据
    java生成验证码返回前端图片,后端通过redis存储和校验
    【计算机毕业设计】基于SpringBoot+Vue记帐理财系统的设计与实现
    函数的节流和防抖?节流和防抖的区别及实现
    Python接口自动化测试post请求和get请求,获取请求返回值
  • 原文地址:https://blog.csdn.net/HeyVIrBbox/article/details/127881475