• RabbitMQ(六)死信队列


    6.1概念

    ​ 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer 从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列
    ​ 应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

    6.2死信来源

    • 消息TTL(Time To Live)过期
    • 队列到达了最大长度(队列满了,无法再添加数据到mq中)
    • 消息被拒绝(basic.reject或basic.nack)并且requeue = false

    6.3死信实战

    6.3.1代码架构图

    image-20220820211507919

    6.3.2代码

    ​ 由上图可知,需要有两个交换机(普通交换机、死信交换机),两个队列(普通队列、死信队列)。模拟三种成为死信的方式。

    1、消息TTL过期

    • producer
    package com.example.demo08;
    
    import com.example.utils.RabbitMqUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author 我见青山多妩媚
     * @date Create on 2022/8/20 21:59
     * 死信队列之生产者
     */
    public class Producer {
        //普通交换机名称
       	private static final String NORMAL_EXCHANGE = "normal_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            //设置消息的过期时间 10s = 10000ms
            AMQP.BasicProperties properties = new AMQP.BasicProperties()
                    .builder()
                    .expiration("10000")
                    .build();
    
            //死信消息  设置TTL(time to live)时间
            for (int i = 0; i < 10; i++) {
                String message = "info:"+(i+1) + "";
                channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 消费者
    package com.example.demo08;
    
    import com.example.utils.RabbitMqUtils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author 我见青山多妩媚
     * @date Create on 2022/8/20 21:33
     * 死信队列实战:消费者1
     */
    public class Consumer01 {
        //普通交换机
        private static final String NORMAL_EXCHANGE = "normal_exchange";
    
        //死信交换机名称
        private static final String DEAD_EXCHANGE = "dead_exchange";
    
        //普通队列名称
        private static final String NORMAL_QUEUE = "normal_queue";
    
        //死信队列名称
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            //声明死信交换机和普通交换机,类型为direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    
            //声明普通队列,消息成为死信之后发送到死信交换机,发送到死信队列
            //最后一个参数:Map arguments
            Map<String, Object> arguments = new HashMap<>();
    
            //第一种成为死信的原因:过期时间,正常队列过期之后的死信交换机
            //过期时间: 10s = 10000ms 单位为ms,也可以在生产者设置过期时间,为了方便,此处在生产者放设置
    //        arguments.put("x-message-ttl",10000);
            //声明消息成为死信之后发送到的队列
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //设置死信routingKey:lisi
            arguments.put("x-dead-letter-routing-key","lisi");
    
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
    
            //声明死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
    
            //绑定普通交换机和队列
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
    
            //绑定死信交换机和队列
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
    
    
            //接收消息
            			channel.basicConsume(NORMAL_QUEUE,true,RabbitMqUtils.getDeliverCallback("Consume1"),RabbitMqUtils.getCancelCallback());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    演示:

    • 启动Consumer01,查看queue是否创建成功

    image-20220820221527057

    • 关闭Consumer01,模拟消费者假死
    • 打开生产者,发送10条消息

    image-20220820221648005

    可以看到生产者有10条消息,稍等之后刷新

    image-20220820221708464

    因为无法处理,所以进入了死信队列

    • 编写Consumer02
    package com.example.demo08;
    
    import com.example.utils.RabbitMqUtils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author 我见青山多妩媚
     * @date Create on 2022/8/20 21:33
     * 死信队列实战:消费者2
     */
    public class Consumer02 {
        //普通交换机名称 
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            System.out.println("Consumer02等待接收消息");
            //接收消息
            channel.basicConsume(DEAD_QUEUE,true,RabbitMqUtils.getDeliverCallback("Consumer02"),RabbitMqUtils.getCancelCallback());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    因为消费者2仅仅用来消费死信队列的内容,所以代码比较简单。

    • 启动Consumer02
    //运行结果
    
    Consumer02等待接收消息
    Consumer02接收到的消息:info:1
    Consumer02接收到的消息:info:2
    Consumer02接收到的消息:info:3
    Consumer02接收到的消息:info:4
    Consumer02接收到的消息:info:5
    Consumer02接收到的消息:info:6
    Consumer02接收到的消息:info:7
    Consumer02接收到的消息:info:8
    Consumer02接收到的消息:info:9
    Consumer02接收到的消息:info:10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    此时dead_queue也空了

    image-20220820222120398

    2、队列达到最大长度

    • produce去掉过期时间
    package com.example.demo08;
    
    import com.example.utils.RabbitMqUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author 我见青山多妩媚
     * @date Create on 2022/8/20 21:59
     * 死信队列之生产者
     */
    public class Producer {
        //普通队列名称
       	private static final String NORMAL_EXCHANGE = "normal_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            //死信消息  设置TTL(time to live)时间
            for (int i = 0; i < 10; i++) {
                String message = "info:"+(i+1);
                channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • Consumer01加一个最大参数长度,其他代码不变
    		Map<String, Object> arguments = new HashMap<>();
            //第一种成为死信的原因:过期时间,正常队列过期之后的死信交换机
            //过期时间: 10s = 10000ms 单位为ms,也可以在生产者设置过期时间,为了方便,此处在生产者放设置
    //        arguments.put("x-message-ttl",10000);
            //声明消息成为死信之后发送到的队列
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //设置死信routingKey:lisi
            arguments.put("x-dead-letter-routing-key","lisi");
    	
            //设置最大队列长度:6,普通队列长度比6大时,进入死信队列
            arguments.put("x-max-length",6);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 删除normal_queue,因为修改了他的参数,启动Consumer01

    image-20220820222837140

    • 关闭Consumer01,打开produce

    image-20220820222939667

    四条消息进入死信队列,剩下的就和上面的一样,开启消费者2消费掉其中的数据

    Consumer02等待接收消息
    Consumer02接收到的消息:info:1
    Consumer02接收到的消息:info:2
    Consumer02接收到的消息:info:3
    Consumer02接收到的消息:info:4
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3、消息被拒绝

    为了不被上方的影响,将Consumer01启动,消费掉6个消息

    Consume1接收到的消息:info:5
    Consume1接收到的消息:info:6
    Consume1接收到的消息:info:7
    Consume1接收到的消息:info:8
    Consume1接收到的消息:info:9
    Consume1接收到的消息:info:10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    自定义拒绝消息,拒绝info:5

    • produce无任何变换,和第二种情况一样
    • Consumer01,增加拒绝消息
    package com.example.demo08;
    
    import com.example.utils.RabbitMqUtils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author 我见青山多妩媚
     * @date Create on 2022/8/20 21:33
     * 死信队列实战:消费者1
     */
    public class Consumer01 {
        //普通交换机
        private static final String NORMAL_EXCHANGE = "normal_exchange";
    
        //死信交换机名称
        private static final String DEAD_EXCHANGE = "dead_exchange";
    
        //普通队列名称
        private static final String NORMAL_QUEUE = "normal_queue";
    
        //死信队列名称
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            //声明死信交换机和普通交换机,类型为direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    
            //声明普通队列,消息成为死信之后发送到死信交换机,发送到死信队列
            //最后一个参数:Map arguments
            Map<String, Object> arguments = new HashMap<>();
    
            //第一种成为死信的原因:过期时间,正常队列过期之后的死信交换机
            //过期时间: 10s = 10000ms 单位为ms,也可以在生产者设置过期时间,为了方便,此处在生产者放设置
    //        arguments.put("x-message-ttl",10000);
            //声明消息成为死信之后发送到的队列
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //设置死信routingKey:lisi
            arguments.put("x-dead-letter-routing-key","lisi");
            //设置最大队列长度:6,普通队列长度比6大时,进入死信队列
    //        arguments.put("x-max-length",6);
    
    
    
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
    
            //声明死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
    
            //绑定普通交换机和队列
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
    
            //绑定死信交换机和队列
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
    
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String msg = new String(message.getBody());
                if(msg.equals("info:5")){
                    System.out.println("Consume1拒收"+msg);
                    //拒绝消息,消息的标签;是否重新放回队列:false不放回普通队列-->成为死信队列
                    channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
                }else {
                    //打印消息内容
                    System.out.println("Consume1接收到的消息:"+new String(message.getBody()));
                    //手动应答消息
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                }
    
            };
    
            //接收消息,开启手动应答,不然无法拒绝消息
            channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,RabbitMqUtils.getCancelCallback());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 删除上一个案例的normal_queue,启动Consumer01,此时不再关闭
    • 开启produce

    image-20220820224128246

    死信队列中有一条消息

    • Consumer01的控制台
    Consume1接收到的消息:info:1
    Consume1接收到的消息:info:2
    Consume1接收到的消息:info:3
    Consume1接收到的消息:info:4
    Consume1拒收info:5
    Consume1接收到的消息:info:6
    Consume1接收到的消息:info:7
    Consume1接收到的消息:info:8
    Consume1接收到的消息:info:9
    Consume1接收到的消息:info:10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 启动Consumer02
    Consumer02等待接收消息
    Consumer02接收到的消息:info:5
    
    • 1
    • 2
  • 相关阅读:
    普林斯顿微积分读本05第四章--求解多项式的极限问题
    内存屏障(Memory Barrier)
    Java 设计模式中之装饰器与观察者模式
    华为OD机试真题- 非严格递增连续数字序列-2023年OD统一考试(B卷)
    批量手机号码归属地查询工具
    Java 24 Design Pattern 之 策略模式
    中国男装产业政府战略管理与区域发展战略研究咨询报告
    deepin换阿里镜像源
    基于flowable的upp(统一流程平台)运行性能优化
    SaaSBase:什么是SAP(思爱普) ERP?
  • 原文地址:https://blog.csdn.net/YSecret_Y/article/details/126776474