• RabbitMq高级特性-2


    一、死信队列

    死信队列,英文缩写:DLX。Dead Letter Exchange (死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

    1.1消息在什么情况下会成为死信?

    1.队列消息长度到达限制;
    ⒉消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
    3.原队列存在消息过期设置,消息到达超时时间未被消费;

    1.2如何绑定死信交换机?

    1.2.1 broker设置:

    给队列设置参数: x-dead-letter-exchange和x-dead-letter-routing-key

    1.2.2 代码设置

    1.2.2.1 消息过期进入死信队列

       /**
         * 正常队列
         * @return
         */
        @Bean
        public Binding normalBinding(){
            return BindingBuilder.bind(normalQueue()).to(topicExchange()).with("normal.#");
        }
    
        @Bean
        public Queue normalQueue(){
            Queue queue = QueueBuilder.durable("normal.queue").ttl(5000).deadLetterRoutingKey("dlx.hello").deadLetterExchange("dlx.exchange").build();
            return queue;
        }
    
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange("normal.exchange");
        }
    
        /**
         * 死信队列
         * @return
         */
    
        @Bean
        public Queue dlxQueue(){
            Queue queue = QueueBuilder.durable("dlx.queue").build();
            queue.setActualName("dlx.queue");
            return queue;
        }
    
        @Bean
        public TopicExchange dlxTopicExchange(){
            return new TopicExchange("dlx.exchange");
        }
    
        @Bean
        public Binding dlxBinding(){
            return BindingBuilder.bind(dlxQueue()).to(dlxTopicExchange()).with("dlx.#");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    1.2.2.2 队列的消息超过最大长度进入死信队列

      Queue queue = QueueBuilder.durable("normal.queue").maxLength(5).deadLetterRoutingKey("dlx.hello").deadLetterExchange("dlx.exchange").build();
    
    • 1
    public void  testDlx(){
            String msg = "这是发送的信息";
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend("normal.exchange","normal.test",msg+i);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    1.2.2.3 消费者拒收进入死信队列(注入设置requeue为false,不然会重试无法进入死信队列)

    @RabbitListener(queues = {"normal.queue"})
        public void normalReceieveMsg(Message message,Channel channel) throws IOException {
    
    
            System.out.println("正常队列接收到消息->"+messageConverter.fromMessage(message));
    
            System.out.println("拒收进入死信队列");
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicNack(deliveryTag,true,false);
    
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    二、延迟队列

    延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

    使用TTL加死信队列实现

    三、消息追踪

    在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
    在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。

    1、
    进入rabbitmq

    	rabbitmqctl trace_on
    
    • 1

    2、
    启用rabbitmq_tracing插件

    rabbitmq-plugins enable rabbitmq_tracing
    
    • 1
  • 相关阅读:
    ros发布节点和接收节点的编写
    Pandas教程(非常详细)(第六部分)
    大数据毕业设计选题推荐-智慧消防大数据平台-Hadoop-Spark-Hive
    (一)高并发压力测试调优篇——MYSQL数据库的调优
    mackdown语法
    基于web多媒体电子贺卡平台
    TensorRt推理部署优化方案及流程概述
    【ROS第十三课】《TF坐标系广播与监听的编程实现》详解
    链表剖析及自己手撸“单链表“实现基本操作(初始化、增、删、改等)
    力扣leetcode 827. 最大人工岛
  • 原文地址:https://blog.csdn.net/qq_46624276/article/details/125426723