• RabbitMQ Dead Letter Queues: Principles and Project Code Examples


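    1. What causes a message to become a dead letter

    While messages are being consumed, a message in a queue becomes a dead letter in any of the following cases:

    • The message is negatively acknowledged with channel.basicNack or channel.basicReject, and the requeue parameter is set to false.

    • The message stays in the queue longer than its configured time to live (TTL).

    • The number of messages in the queue exceeds the configured maximum queue length.

    A dead letter queue (DLQ) is simply an ordinary queue that is bound to a dead letter exchange. When declaring the business queue, set the x-dead-letter-exchange and x-dead-letter-routing-key arguments to specify the dead letter exchange and the dead letter routing key.

    • x-dead-letter-exchange: the exchange to which dead letters are republished.

    • x-dead-letter-routing-key: the routing key used when dead letters are republished. The dead letter queue must be bound to the dead letter exchange with a matching key.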
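
    The two arguments above end up in a plain key/value map that is handed to the queue declaration. The following is a minimal sketch of building such a map using only the JDK; the class and method names (DeadLetterArgs, businessQueueArguments) are hypothetical and chosen for illustration, and the optional TTL and maximum length parameters correspond to the x-message-ttl and x-max-length arguments used later in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the project: collects the optional queue
// arguments discussed above into the map passed to the queue declaration.
class DeadLetterArgs {

    static Map<String, Object> businessQueueArguments(String deadLetterExchange,
                                                      String deadLetterRoutingKey,
                                                      Integer ttlMillis,
                                                      Integer maxLength) {
        Map<String, Object> args = new LinkedHashMap<>();
        // Exchange that receives messages dead-lettered from this queue
        args.put("x-dead-letter-exchange", deadLetterExchange);
        // Routing key used when the dead letter is republished
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        if (ttlMillis != null) {
            // Optional queue-wide message TTL in milliseconds
            args.put("x-message-ttl", ttlMillis);
        }
        if (maxLength != null) {
            // Optional maximum number of ready messages in the queue
            args.put("x-max-length", maxLength);
        }
        return args;
    }

    public static void main(String[] unused) {
        Map<String, Object> args =
                businessQueueArguments("dead.letter.exchange", "dead.letter.routing.key", 5000, null);
        args.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

    In the Spring configuration shown below, a map like this is what gets passed to QueueBuilder.durable(...).withArguments(args).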

    2、代码实现

    build.gradle

    plugins {
    	id 'java'
    	id 'org.springframework.boot' version '3.1.1'
    	id 'io.spring.dependency-management' version '1.1.0'
    }
    
    group = 'com.kexuexiong'
    version = '0.0.1-SNAPSHOT'
    
    java {
    	sourceCompatibility = '17'
    }
    
    configurations {
    	compileOnly {
    		extendsFrom annotationProcessor
    	}
    }
    
    repositories {
    //	mavenCentral()
    	maven {
    		url 'https://maven.aliyun.com/repository/public'
    	}
    }
    
    dependencies {
    	implementation 'org.springframework.boot:spring-boot-starter-jdbc'
    	implementation 'org.springframework.boot:spring-boot-starter-validation'
    	implementation 'org.springframework.boot:spring-boot-starter-web'
    	implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:3.0.2'
    	compileOnly 'org.projectlombok:lombok'
    	runtimeOnly 'com.mysql:mysql-connector-j'
    	annotationProcessor 'org.projectlombok:lombok'
    	testImplementation 'org.springframework.boot:spring-boot-starter-test'
    	testImplementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter-test:3.0.2'
    	// https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp
    	implementation 'org.springframework.boot:spring-boot-starter-amqp'
    	implementation 'cn.hutool:hutool-all:5.8.16'
    }
    
    tasks.named('test') {
    	useJUnitPlatform()
    }
    
    

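    YAML configuration file:
    The example runs against a RabbitMQ cluster: 192.168.49.10:5672, 192.168.49.9:5672, 192.168.49.11:5672
    For the detailed setup steps, see the earlier post on building a RabbitMQ cluster.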

    server:
      port: 8014
    
    spring:
      rabbitmq:
        username: admin
        password: 123456
        dynamic: true
    #    port: 5672
    #    host: 192.168.49.9
        addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672
        publisher-confirm-type: correlated
        publisher-returns: true
      application:
        name: shushan
      datasource:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://ip/shushan
          username: root
          password: 
          hikari:
            minimum-idle: 10
            maximum-pool-size: 20
            idle-timeout: 50000
            max-lifetime: 540000
            connection-test-query: select 1
            connection-timeout: 600000
    

    Constants file:

    package com.kexuexiong.shushan.common.mq;
    
    public class MqConstant {
    
        public final static String demoDirectQueue = "demoDirectQueue";
    
        public final static String demoDirectExchange = "demoDirectExchange";
    
        public final static String demoDirectRouting = "demoDirectRouting";
    
        public final static String lonelyDirectExchange = "lonelyDirectExchange";
    
        public final static String topicExchange = "topicExchange";
    
        public final static String BIG_CAR_TOPIC = "topic.big_car";
    
        public final static String SMALL_CAR_TOPIC = "topic.small_car";
    
        public final static String TOPIC_ALL = "topic.#";
    
        public final static String FANOUT_A = "fanout.A";
    
        public final static String FANOUT_B = "fanout_B";
    
        public final static String FANOUT_C = "fanout_c";
    
        public final static String FANOUT_EXCHANGE = "fanoutExchange";
    
        public final static String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    
        public final static String DEAD_LETTER_QUEUE = "dead.letter.queue";
    
        public final static String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key";
    
        public final static String BUSINESS_QUEUE = "business.queue";
    
        public final static String BUSINESS_ROUTING_KEY = "business.routing.key";
    
        public final static String BUSINESS_EXCHANGE = "business.exchange";
    
    
    }
    
    

    RabbitMQ configuration:

    package com.kexuexiong.shushan.common.mq;
    
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    
    
    @Configuration
    public class DirectDLXRabbitConfig {
    
        @Bean
        public Queue businessDirectQueue() {
            HashMap<String, Object> args = new HashMap<>();
            args.put("x-dead-letter-exchange",MqConstant.DEAD_LETTER_EXCHANGE); // Set the dead letter exchange
            args.put("x-dead-letter-routing-key",MqConstant.DEAD_LETTER_ROUTING_KEY);
            return QueueBuilder.durable(MqConstant.BUSINESS_QUEUE).withArguments(args).build();
        }
    
        @Bean
        DirectExchange businessDirectExchange() {
            return new DirectExchange(MqConstant.BUSINESS_EXCHANGE, true, false);
        }
    
        @Bean
        DirectExchange deadLetterDirectExchange() {
            return new DirectExchange(MqConstant.DEAD_LETTER_EXCHANGE, true, false);
        }
    
        @Bean
        public Queue deadLetterDirectQueue() {
            return new Queue(MqConstant.DEAD_LETTER_QUEUE, true, false, false);
        }
    
        @Bean
        Binding deadLetterBindingDirect() {
            return BindingBuilder.bind(deadLetterDirectQueue()).to(deadLetterDirectExchange()).with(MqConstant.DEAD_LETTER_ROUTING_KEY);
        }
    
    
        @Bean
        Binding businessBindingDirect() {
            return BindingBuilder.bind(businessDirectQueue()).to(businessDirectExchange()).with(MqConstant.BUSINESS_ROUTING_KEY);
        }
    
    }
    
    

    The queue bound to the dead letter exchange:
    [Screenshot: the business queue with its dead letter exchange binding]

    Producer:

    package com.kexuexiong.shushan.controller.mq;
    
    import cn.hutool.core.date.DateUtil;
    import cn.hutool.core.util.RandomUtil;
    import com.kexuexiong.shushan.common.mq.MqConstant;
    import com.kexuexiong.shushan.controller.BaseController;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    @Slf4j
    @RestController
    @RequestMapping("/mq/")
    public class MqController extends BaseController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @GetMapping("/deadLetterSendDirectMessage")
        public String deadLetterSendDirectMessage(){
    
            String msgId = UUID.randomUUID().toString();
            String msg = "demo msg ,dead letter!!";
            String createTime = DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss");
    
            Map<String,Object> map = new HashMap<>();
            map.put("msgId",msgId);
            map.put("msg",msg);
            map.put("createTime",createTime);
    
            rabbitTemplate.convertAndSend(MqConstant.BUSINESS_EXCHANGE,MqConstant.BUSINESS_ROUTING_KEY,map);
    
            return "ok";
        }
    
        @GetMapping("/deadLetterTimeOutSendDirectMessage")
        public String deadLetterTimeOutSendDirectMessage(){
    
            String msgId = UUID.randomUUID().toString();
            String msg = "demo msg ,dead letter!!";
            String createTime = DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss");
            log.info("msg time :" +createTime);
            Map<String,Object> map = new HashMap<>();
            map.put("msgId",msgId);
            map.put("msg",msg);
            map.put("createTime",createTime);
            Integer randomInt = RandomUtil.randomInt(30000);
            map.put("randomTime",randomInt);
            MessagePostProcessor messagePostProcessor = message -> {
                // Set the message expiration to 30 seconds (the value is in milliseconds)
                message.getMessageProperties().setExpiration("30000");
                return message;
            };
    
            rabbitTemplate.convertAndSend(MqConstant.BUSINESS_EXCHANGE,MqConstant.BUSINESS_ROUTING_KEY,map,messagePostProcessor);
    
            return "ok";
        }
    
       
    }
    
    

    Consumer:

    package com.kexuexiong.shushan.common.mq;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MessageListenerConfig {
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
        @Autowired
        private AckReceiver ackReceiver;
    
        @Bean
        public SimpleMessageListenerContainer simpleMessageListenerContainer() {
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
    
            simpleMessageListenerContainer.setConcurrentConsumers(2);
            simpleMessageListenerContainer.setMaxConcurrentConsumers(2);
            simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            //,MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPIC
    
            simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE);
    
            simpleMessageListenerContainer.setMessageListener(ackReceiver);
    
            return simpleMessageListenerContainer;
        }
    
    }
    
    

    AckReceiver:

    package com.kexuexiong.shushan.common.mq;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.stereotype.Component;
    
    import java.io.ByteArrayInputStream;
    import java.io.ObjectInputStream;
    import java.util.Map;
    import java.util.Objects;
    
    @Slf4j
    @Component
    public class AckReceiver implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            byte[] messageBody = message.getBody();
            try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(messageBody));) {
    
                Map<String, String> msg = (Map<String, String>) inputStream.readObject();
                log.info(message.getMessageProperties().getConsumerQueue()+"-ack Receiver :" + msg);
                log.info("header msg :"+message.getMessageProperties().getHeaders());
                if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.BUSINESS_QUEUE)){
                    // Reject the message without requeueing it
                    channel.basicNack(deliveryTag,false,false);
    
                }else if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.DEAD_LETTER_QUEUE)){
                    channel.basicAck(deliveryTag, true);
                }else {
                    channel.basicAck(deliveryTag, true);
                }
    
            } catch (Exception e) {
                channel.basicReject(deliveryTag, false);
                log.error(e.getMessage());
            }
        }
    }
    
    

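    Testing:

    1. A message is negatively acknowledged with channel.basicNack or channel.basicReject, and requeue is set to false

    For this case the listener container must also consume MqConstant.BUSINESS_QUEUE in addition to the dead letter queue; the log below shows the message being received from business.queue first.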

    2023-10-11T16:18:13.124+08:00  INFO 28104 --- [enerContainer-2] c.k.shushan.common.mq.AckReceiver        : business.queue-ack Receiver :{msg=demo msg ,dead letter!!, createTime=2023-10-11 16:18:13, msgId=9b31b4b3-c58f-47bd-8b27-cac4f53ae120}
    2023-10-11T16:18:13.125+08:00  INFO 28104 --- [enerContainer-2] c.k.shushan.common.mq.AckReceiver        : header msg :{spring_listener_return_correlation=995de3d2-d5a8-42fe-91e6-992fd485d20d}
    2023-10-11T16:18:13.125+08:00  INFO 28104 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
    2023-10-11T16:18:13.125+08:00  INFO 28104 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :true
    2023-10-11T16:18:13.125+08:00  INFO 28104 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :null
    2023-10-11T16:18:13.127+08:00  INFO 28104 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver        : dead.letter.queue-ack Receiver :{msg=demo msg ,dead letter!!, createTime=2023-10-11 16:18:13, msgId=9b31b4b3-c58f-47bd-8b27-cac4f53ae120}
    2023-10-11T16:18:13.127+08:00  INFO 28104 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver        : header msg :{spring_listener_return_correlation=995de3d2-d5a8-42fe-91e6-992fd485d20d, x-first-death-exchange=business.exchange, x-death=[{reason=rejected, count=1, exchange=business.exchange, time=Wed Oct 11 16:18:14 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=rejected, x-first-death-queue=business.queue}
    
    
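
    The consumer rejects the message in this branch of AckReceiver, and the broker then routes it to the dead letter queue:

    if (Objects.equals(message.getMessageProperties().getConsumerQueue(), MqConstant.BUSINESS_QUEUE)) {
        // Reject the message without requeueing it
        channel.basicNack(deliveryTag, false, false);
    }

    The headers of the message received from the dead letter queue: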

    {spring_listener_return_correlation=995de3d2-d5a8-42fe-91e6-992fd485d20d, x-first-death-exchange=business.exchange, x-death=[{reason=rejected, count=1, exchange=business.exchange, time=Wed Oct 11 16:18:14 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=rejected, x-first-death-queue=business.queue}
    

    The reason is rejected, and the header also records the original exchange: exchange=business.exchange.
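
    A consumer on the dead letter queue often wants to branch on that reason. The following is a small sketch, using only the JDK, of reading the reason from the first entry of an x-death header shaped like the one in the log above (a list of maps). The class and method names (XDeathReader, reasonOfFirstEntry) are hypothetical, and the sample headers in main are copied from the log rather than read from a live broker.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: extracts the "reason" field from the first entry of
// the x-death header, which appears in the log as a list of maps.
class XDeathReader {

    static Optional<String> reasonOfFirstEntry(Map<String, Object> headers) {
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List)) {
            return Optional.empty();
        }
        List<?> entries = (List<?>) xDeath;
        if (entries.isEmpty() || !(entries.get(0) instanceof Map)) {
            return Optional.empty();
        }
        Object reason = ((Map<?, ?>) entries.get(0)).get("reason");
        return Optional.ofNullable(reason).map(Object::toString);
    }

    public static void main(String[] unused) {
        // Sample headers mirroring the log output above
        Map<String, Object> entry = Map.of(
                "reason", "rejected",
                "count", 1L,
                "exchange", "business.exchange",
                "queue", "business.queue");
        Map<String, Object> headers = Map.of("x-death", List.of(entry));
        System.out.println(reasonOfFirstEntry(headers).orElse("not dead-lettered"));
    }
}
```

    The log also shows an x-first-death-reason header, which carries the reason as a plain value and may be simpler to read when only the first dead-lettering event matters.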

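    2. A message stays in the queue longer than its configured time to live (TTL)

    Remove MqConstant.BUSINESS_QUEUE from the queues the consumer listens to, so that nothing consumes the business queue, then run the test.
    The expiration time can be set on the queue by adding the x-message-ttl argument: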

        @Bean("businessQueue")
        public Queue businessQueue() {
            Map<String, Object> args = new HashMap<>(16);
            // Dead letter exchange bound to this queue
            args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
            // Routing key used for this queue's dead letters
            args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
            // Message time to live, in milliseconds
            args.put("x-message-ttl", 5000);
            return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
        }
    

    An expiration time can also be set on each individual message:

      @GetMapping("/deadLetterTimeOutSendDirectMessage")
        public String deadLetterTimeOutSendDirectMessage(){
    
            String msgId = UUID.randomUUID().toString();
            String msg = "demo msg ,dead letter!!";
            String createTime = DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss");
            log.info("msg time :" +createTime);
            Map<String,Object> map = new HashMap<>();
            map.put("msgId",msgId);
            map.put("msg",msg);
            map.put("createTime",createTime);
            Integer randomInt = RandomUtil.randomInt(30000);
            map.put("randomTime",randomInt);
            MessagePostProcessor messagePostProcessor = message -> {
                // Set the message expiration to 30 seconds (the value is in milliseconds)
                message.getMessageProperties().setExpiration("30000");
                return message;
            };
    
            rabbitTemplate.convertAndSend(MqConstant.BUSINESS_EXCHANGE,MqConstant.BUSINESS_ROUTING_KEY,map,messagePostProcessor);
    
            return "ok";
        }
    

    Testing:

    2023-10-11T16:28:49.052+08:00  INFO 25848 --- [nio-8014-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
    2023-10-11T16:28:49.052+08:00  INFO 25848 --- [nio-8014-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
    2023-10-11T16:28:49.053+08:00  INFO 25848 --- [nio-8014-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
    2023-10-11T16:28:49.099+08:00  INFO 25848 --- [nio-8014-exec-1] c.k.shushan.controller.mq.MqController   : msg time :2023-10-11 16:28:49
    2023-10-11T16:28:49.120+08:00  INFO 25848 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
    2023-10-11T16:28:49.121+08:00  INFO 25848 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :true
    2023-10-11T16:28:49.121+08:00  INFO 25848 --- [nectionFactory1] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :null
    2023-10-11T16:29:19.123+08:00  INFO 25848 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver        : dead.letter.queue-ack Receiver :{msg=demo msg ,dead letter!!, randomTime=22235, createTime=2023-10-11 16:28:49, msgId=4793dd5c-70c2-4c2f-a6ae-cdc6663e0b05}
    2023-10-11T16:29:19.123+08:00  INFO 25848 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver        : header msg :{spring_listener_return_correlation=22fdc49f-30da-4524-9fc8-d4f69eb4c2c3, x-first-death-exchange=business.exchange, x-death=[{reason=expired, original-expiration=30000, count=1, exchange=business.exchange, time=Wed Oct 11 16:29:20 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=expired, x-first-death-queue=business.queue}
    
    

    After 30 seconds the message expires, is routed to the dead letter queue, and is then consumed.
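
    The article shows two ways to set a TTL: x-message-ttl on the queue and an expiration on the message. As far as I know, RabbitMQ documents that when both are present the lower of the two values applies. The following is a minimal JDK-only sketch of that rule; the class and method names (EffectiveTtl, effectiveTtlMillis) are hypothetical, and the code only models the rule rather than talking to a broker.

```java
import java.util.OptionalLong;

// Hypothetical model of how a queue-level TTL and a per-message expiration
// combine: if both are set, the smaller value is the one that applies.
class EffectiveTtl {

    /**
     * @param queueTtlMillis    value of x-message-ttl, or null if the queue has none
     * @param messageExpiration per-message expiration as a string of milliseconds, or null
     */
    static OptionalLong effectiveTtlMillis(Long queueTtlMillis, String messageExpiration) {
        Long messageTtlMillis = (messageExpiration == null) ? null : Long.parseLong(messageExpiration);
        if (queueTtlMillis == null && messageTtlMillis == null) {
            return OptionalLong.empty(); // the message never expires
        }
        if (queueTtlMillis == null) {
            return OptionalLong.of(messageTtlMillis);
        }
        if (messageTtlMillis == null) {
            return OptionalLong.of(queueTtlMillis);
        }
        return OptionalLong.of(Math.min(queueTtlMillis, messageTtlMillis));
    }

    public static void main(String[] unused) {
        // Queue TTL of 5 s combined with the 30 s message expiration from the producer
        System.out.println(effectiveTtlMillis(5000L, "30000"));
        // Only the per-message expiration, as in the test run logged above
        System.out.println(effectiveTtlMillis(null, "30000"));
    }
}
```

    This matches the log above: the x-death entry records original-expiration=30000 and the message arrives in the dead letter queue 30 seconds after it was sent, which suggests the queue in that run had no shorter x-message-ttl.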

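    3. The number of messages in the queue exceeds the configured maximum queue length

    Modify the RabbitMQ configuration so that businessDirectQueue is declared with the x-max-length argument, which sets the queue capacity. If the queue already exists with different arguments, delete it first, because RabbitMQ rejects a redeclaration whose arguments differ from those of the existing queue.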

    package com.kexuexiong.shushan.common.mq;
    
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    
    
    @Configuration
    public class DirectDLXRabbitConfig {
    
        @Bean
        public Queue businessDirectQueue() {
            HashMap<String, Object> args = new HashMap<>();
            args.put("x-dead-letter-exchange",MqConstant.DEAD_LETTER_EXCHANGE);
            args.put("x-dead-letter-routing-key",MqConstant.DEAD_LETTER_ROUTING_KEY);
            args.put("x-max-length", 3);
            return QueueBuilder.durable(MqConstant.BUSINESS_QUEUE).withArguments(args).build();
        }
    
        @Bean
        DirectExchange businessDirectExchange() {
            return new DirectExchange(MqConstant.BUSINESS_EXCHANGE, true, false);
        }
    
        @Bean
        DirectExchange deadLetterDirectExchange() {
            return new DirectExchange(MqConstant.DEAD_LETTER_EXCHANGE, true, false);
        }
    
        @Bean
        public Queue deadLetterDirectQueue() {
            return new Queue(MqConstant.DEAD_LETTER_QUEUE, true, false, false);
        }
    
        @Bean
        Binding deadLetterBindingDirect() {
            return BindingBuilder.bind(deadLetterDirectQueue()).to(deadLetterDirectExchange()).with(MqConstant.DEAD_LETTER_ROUTING_KEY);
        }
    
    
        @Bean
        Binding businessBindingDirect() {
            return BindingBuilder.bind(businessDirectQueue()).to(businessDirectExchange()).with(MqConstant.BUSINESS_ROUTING_KEY);
        }
    
    }
    
    

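    Testing:

    [Screenshot: result of the third request]

    [Screenshot: result of the fourth request]

    Console output after the fourth request: the dead letter queue receives a message. With x-max-length set to 3 and RabbitMQ's default overflow behaviour (drop-head), the fourth publish pushes the oldest message out of the business queue, which is why the createTime in the log is earlier than the time at which the message is received.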

    2023-10-11T16:46:54.783+08:00  INFO 20444 --- [nectionFactory2] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
    2023-10-11T16:46:54.783+08:00  INFO 20444 --- [nectionFactory2] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :true
    2023-10-11T16:46:54.783+08:00  INFO 20444 --- [nectionFactory2] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :null
    2023-10-11T16:46:54.787+08:00  INFO 20444 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver        : dead.letter.queue-ack Receiver :{msg=demo msg ,dead letter!!, createTime=2023-10-11 16:44:35, msgId=db7d2d7f-12a0-4ca4-9e92-81758cda436e}
    2023-10-11T16:46:54.787+08:00  INFO 20444 --- [enerContainer-1] c.k.shushan.common.mq.AckReceiver        : header msg :{spring_listener_return_correlation=e2a1871a-7765-43ee-b44c-d80d0ac6323c, x-first-death-exchange=business.exchange, x-death=[{reason=maxlen, count=1, exchange=business.exchange, time=Wed Oct 11 16:46:56 CST 2023, routing-keys=[business.routing.key], queue=business.queue}], x-first-death-reason=maxlen, x-first-death-queue=business.queue}
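
    The maxlen case can be pictured with a tiny in-memory model. The following is a sketch under the assumption that the queue uses RabbitMQ's default overflow behaviour, where the message at the head of the queue is the one dropped or dead-lettered; MaxLengthQueueModel is a hypothetical class written for illustration and does not involve a broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of a queue with x-max-length and head-drop
// overflow: when the limit is exceeded, the oldest message is dead-lettered.
class MaxLengthQueueModel {

    private final int maxLength;
    private final Deque<String> ready = new ArrayDeque<>();
    private final List<String> deadLettered = new ArrayList<>();

    MaxLengthQueueModel(int maxLength) {
        this.maxLength = maxLength;
    }

    void publish(String message) {
        ready.addLast(message);
        // Evict from the head until the queue is back within its limit
        while (ready.size() > maxLength) {
            deadLettered.add(ready.removeFirst());
        }
    }

    List<String> readyMessages() {
        return new ArrayList<>(ready);
    }

    List<String> deadLetteredMessages() {
        return new ArrayList<>(deadLettered);
    }

    public static void main(String[] unused) {
        MaxLengthQueueModel queue = new MaxLengthQueueModel(3);
        for (int i = 1; i <= 4; i++) {
            queue.publish("request-" + i);
        }
        System.out.println("ready: " + queue.readyMessages());
        System.out.println("dead-lettered: " + queue.deadLetteredMessages());
    }
}
```

    In this model the fourth publish dead-letters the first message, not the fourth, which is consistent with the older createTime seen in the log above.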
    
    

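    That covers all three cases. Overall, RabbitMQ dead letter queues are simple to set up, yet they are quite powerful: they can be used to implement delayed messages, cancelling orders when they time out, and similar features.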
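
    One caveat is worth keeping in mind when building delayed messages on top of per-message TTL. To my understanding of RabbitMQ's documented behaviour, an expired message is only dead-lettered once it reaches the head of the queue, so a message with a short TTL can be held up behind one with a long TTL. The following JDK-only sketch models that ordering effect for a queue with no consumers; HeadOfQueueExpiry and deadLetterTimes are hypothetical names, and all times are milliseconds on an arbitrary clock.

```java
import java.util.Arrays;

// Hypothetical model: messages sit in FIFO order with per-message TTLs and
// no consumers. A message is dead-lettered at its own expiry time or when
// the message ahead of it leaves the queue, whichever happens later.
class HeadOfQueueExpiry {

    static long[] deadLetterTimes(long[] publishTimes, long[] ttls) {
        if (publishTimes.length != ttls.length) {
            throw new IllegalArgumentException("publishTimes and ttls must have the same length");
        }
        long[] result = new long[publishTimes.length];
        long previousLeftAt = Long.MIN_VALUE;
        for (int i = 0; i < publishTimes.length; i++) {
            long ownExpiry = publishTimes[i] + ttls[i];
            // The message cannot leave before everything ahead of it has left
            result[i] = Math.max(ownExpiry, previousLeftAt);
            previousLeftAt = result[i];
        }
        return result;
    }

    public static void main(String[] unused) {
        // A 30 s message published first, then a 5 s message one second later
        long[] times = deadLetterTimes(new long[] {0, 1000}, new long[] {30000, 5000});
        System.out.println(Arrays.toString(times));
    }
}
```

    If this effect matters, one common way around it is to use a queue-level x-message-ttl with one queue per delay value, so that every message in a given queue expires in publish order.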

  • Original article: https://blog.csdn.net/qq_22744093/article/details/133772586