• Spring-RabbitMQ 生产者消息确认案例分析


    测试环境

    1. Springboot 2.7.0
    2. 本机RabbitMQ Server端

    Springboot 集成 RabbitMQ

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    案例准备

    1. 推送成功(推送到Exchange并路由到Queue)。
    2. 推送到Exchange,但没有被路由到Queue。
    3. 没有被推送到Exchange。

    本次测试分析准备有3个案例,分别在不开启消息确认和开启消息确认的情况下测试程序运行状态和结果。且测试前提是RabbitMQ Server端是可达的,如不可达会直接报错,也就无法进行后续测试了。
    Spring-RabbitMQ 消息确认案例分析

    案例测试

    一、不开启消息确认

    配置文件 application.yaml, 仅包含rabbitMQ连接信息。

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: admin
        password: admin
        virtual-host: my_vhost
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    生产者:

    @Slf4j
    @Component
    public class SenderDemo {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(){
            log.info("======before========");
            CorrelationData correlationData = new CorrelationData();
            rabbitTemplate.convertAndSend("my.test.msg", (Object) "hello world", correlationData);
            log.info("======before========");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    消费者:

    @Slf4j
    @Component
    public class Consumer {
        
        @RabbitListener(queues = "myQueue")
        public void consume(@Payload String msg){
            log.info("消费者收到消息 --> {}", msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    案例一(推送到Exchange并路由到Queue)测试

    配置类:创建Queue,并将其绑定到Exchange。

    @Configuration
    public class RabbitNoConfirmConfiguration {
    
        private final static String TOPIC_EXCHANGE = "myExchange";
    
        private final static String QUEUE_NAME = "myQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange(TOPIC_EXCHANGE);
            return template;
        }
    
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange(TOPIC_EXCHANGE, false, true);
        }
        
        @Bean
        public Queue queue(){
            return new Queue(QUEUE_NAME, false, false, true);
        }
        
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    测试结果:毫无疑问,一切正常。
    Spring-RabbitMQ 消息确认案例分析

    案例二(推送到Exchange,但没有被路由到Queue)测试

    配置类:创建了Queue,但没有绑定到Exchange。

    @Configuration
    public class RabbitNoConfirmConfiguration {
    
        private final static String TOPIC_EXCHANGE = "myExchange";
    
        private final static String QUEUE_NAME = "myQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange(TOPIC_EXCHANGE);
            return template;
        }
    
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange(TOPIC_EXCHANGE, false, true);
        }
    
        @Bean
        public Queue queue(){
            return new Queue(QUEUE_NAME, false, false, true);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    测试结果:生产者正常发送消息并结束。消费者没有消费消息,因为消息没有被路由到Queue。
    Spring-RabbitMQ 消息确认案例分析

    案例三(消息没有到达Exchange)测试

    配置类:让生产者发送消息到一个不存在的Exchange。

    @Configuration
    public class RabbitNoConfirmConfiguration {
    
        private final static String QUEUE_NAME = "myQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange("not_exist_exchange");
            return template;
        }
    
        @Bean
        public Queue queue(){
            return new Queue(QUEUE_NAME, false, false, true);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    测试结果:
    生产者正常发送消息并结束。虽然控制台有错误输出,但是是在发送结束后异步打印出来的。

    2022-08-02 12:13:36.012  INFO 56200 --- [           main] i.k.j.spring.rabbitmq.sender.SenderDemo  : ======生产者发送开始========
    2022-08-02 12:13:36.029  INFO 56200 --- [           main] i.k.j.spring.rabbitmq.sender.SenderDemo  : ======生产者发送结束========
    2022-08-02 12:13:36.032 ERROR 56200 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'not_exist_exchange' in vhost 'my_vhost', class-id=60, method-id=40)
    
    • 1
    • 2
    • 3

    结论

    如果不开启消息确认,只要RabbitMQ连接可达,生产者就不会发现异常。

    二、开启Simple消息确认

    配置文件:增加confirmType配置。

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: admin
        password: admin
        virtual-host: my_vhost
        publisher-confirm-type: SIMPLE
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    生产者:Simple 消息确认模式下,生产者需要使用 rabbitTemplate.invoke 发送消息。

    public void simpleConfirmSend(){
        log.info("======生产者发送开始========");
        boolean success = rabbitTemplate.invoke(operations -> {
            rabbitTemplate.convertAndSend("my.test.msg", (Object) "hello world", new CorrelationData());
            try {
                return rabbitTemplate.waitForConfirms(5_1000);
            }catch (RuntimeException e){
                log.error(e.getMessage(), e);
                return false;
            }
        });
        log.info("消息发送成功了吗? --> {}", success);
        log.info("======生产者发送结束========");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    消费者:

    @Slf4j
    @Component
    public class Consumer {
        
        @RabbitListener(queues = "myQueue")
        public void consume(@Payload String msg){
            log.info("消费者收到消息 --> {}", msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    案例一(推送到Exchange并路由到Queue)测试

    配置类:

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
        private final static String TOPIC_EXCHANGE = "myExchange";
        private final static String QUEUE_NAME = "myQueue";
        
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange(TOPIC_EXCHANGE);
            return template;
        }
    
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange(TOPIC_EXCHANGE, false, true);
        }
    
        @Bean
        public Queue queue(){
            return new Queue(QUEUE_NAME, false, false, true);
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    测试结果:生成者发送成功,消费者消费成功。
    Spring-RabbitMQ 消息确认案例分析

    案例二(推送到Exchange,但没有被路由到Queue)测试

    配置类:

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
        private final static String TOPIC_EXCHANGE = "myExchange";
    
        private final static String QUEUE_NAME = "myQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange(TOPIC_EXCHANGE);
            return template;
        }
    
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange(TOPIC_EXCHANGE, false, true);
        }
    
        @Bean
        public Queue queue(){
            return new Queue(QUEUE_NAME, false, false, true);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    测试结果:生产者发送成功,消费者没有消费到消息。
    Spring-RabbitMQ 消息确认案例分析

    案例三(消息没有到达Exchange)测试

    配置类:

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
    
        private final static String QUEUE_NAME = "myQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange("not_exist_exchange");
            return template;
        }
    
        @Bean
        public Queue queue(){
            return new Queue(QUEUE_NAME, false, false, true);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    测试结果:生产者发送失败。
    Spring-RabbitMQ 生产者消息确认案例分析

    结论

    如果开启Simple消息确认,当消息没有送达Exchange的时候,生产者会失败。但是由生产者代码可见,生产者要等到RabbitMQ回调回来之后才能继续往下执行,所有效率不高但是实现简单。

    三、开启Correlated消息确认

    配置文件:

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: admin
        password: admin
        virtual-host: my_vhost
        publisher-confirm-type: CORRELATED
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    生产者:这里生产者就是简单的发送了,成功与失败在回调中异步确定。

    public void send(){
        log.info("======生产者发送开始========");
        CorrelationData correlationData = new CorrelationData();
        rabbitTemplate.convertAndSend("my.test.msg", (Object) "hello world", correlationData);
        log.info("======生产者发送结束========");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    消费者:

    @RabbitListener(queues = "myQueue")
    public void consume(@Payload String msg){
        log.info("消费者收到消息 --> {}", msg);
    }
    
    • 1
    • 2
    • 3
    • 4

    案例一(推送到Exchange并路由到Queue)测试

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
        private final static String TOPIC_EXCHANGE = "myExchange";
        private final static String QUEUE_NAME = "myQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange(TOPIC_EXCHANGE);
    
    
    		//增加confirm回调
            template.setConfirmCallback((correlationData, ack, cause) -> {
                log.info("ConfirmCallback:     关联数据:{}", correlationData);
                log.info("ConfirmCallback:     是否成功:{}", ack);
                log.info("ConfirmCallback:     失败原因原因:{}", cause);
            });
    
    
            return template;
        }
    
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange(TOPIC_EXCHANGE, false, true);
        }
    
        @Bean
        public Queue queue(){
            return new Queue(QUEUE_NAME, false, false, true);
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    测试结果:生产者发送成功,消费者消费成功。confirm回调确认结果为true。
    Spring-RabbitMQ 生产者消息确认案例分析

    案例二(推送到Exchange,但没有被路由到Queue)测试

    配置类:

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
        private final static String TOPIC_EXCHANGE = "myExchange";
        private final static String QUEUE_NAME = "myQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange(TOPIC_EXCHANGE);
    
            template.setConfirmCallback((correlationData, ack, cause) -> {
                log.info("ConfirmCallback:     关联数据:{}", correlationData);
                log.info("ConfirmCallback:     是否成功:{}", ack);
                log.info("ConfirmCallback:     失败原因原因:{}", cause);
            });
    
    
            return template;
        }
    
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE, false, true);
        }
    
        @Bean
        public Queue queue() {
            return new Queue(QUEUE_NAME, false, false, true);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    测试结果:生产者发送成功,消费者没有消费消息。 确认回调结果为true。
    Spring-RabbitMQ 生产者消息确认案例分析

    案例三(消息没有到达Exchange)测试

    配置类:

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
        private final static String QUEUE_NAME = "myQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange("not_exist_exchange");
    
            template.setConfirmCallback((correlationData, ack, cause) -> {
                log.info("ConfirmCallback:     关联数据:{}", correlationData);
                log.info("ConfirmCallback:     是否成功:{}", ack);
                log.info("ConfirmCallback:     失败原因原因:{}", cause);
            });
    
    
            return template;
        }
    
        @Bean
        public Queue queue() {
            return new Queue(QUEUE_NAME, false, false, true);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    测试结果:生产者收到发送失败的回调,即判定消息发送失败。
    Spring-RabbitMQ 生产者消息确认案例分析
    失败原因:

    channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'not_exist_exchange' in vhost 'my_vhost', class-id=60, method-id=40)
    
    • 1

    结论

    如果开启Correlated消息确认,当消息没有送达Exchange的时候,生产者会失败。这里生产者只管发送消息,所有回调都是异步执行,性能上有了非常大的提升。

    四、开启Returns消息确认

    上面我们看到,不论 SIMPLE 还是 CORRELATED 消息确认,都只能确认消息是否成功发送到了Exchange,而无法确认消息是否被正常路由到Queue。如果我们需要确保消息必须被路由到Queue,那么就需要配置publisherReturnstrue, 配置publisherReturns=true 的前提是必须开启消息确认,即conformType 必须是SIMPLE 或者CORRELATED 。conformType 默认为NONE。

    配置文件:

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: admin
        password: admin
        virtual-host: my_vhost
        # 消息确认(ACK)
        publisher-confirm-type: CORRELATED #correlated #确认消息已发送到交换机(Exchange)
        publisher-returns: true #确认消息已发送到队列(Queue)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    生产者消费者代码同 CORRELATED 确认时一样。

    案例一(推送到Exchange并路由到Queue)测试

    配置类:添加returns回调相关代码。

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
        private final static String TOPIC_EXCHANGE = "myExchange";
        private final static String QUEUE_NAME = "myQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange(TOPIC_EXCHANGE);
    
    
            template.setConfirmCallback((correlationData, ack, cause) -> {
                log.info("ConfirmCallback:     关联数据 -> {}", correlationData);
                log.info("ConfirmCallback:     是否成功 -> {}", ack);
                log.info("ConfirmCallback:     失败原因原因 -> {}", cause);
            });
    
    
            template.setMandatory(true);
            template.setReturnsCallback(returned -> {
                log.info("ReturnCallback -> {}", returned);
            });
    
            return template;
        }
    
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE, false, true);
        }
    
        @Bean
        public Queue queue() {
            return new Queue(QUEUE_NAME, false, false, true);
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    测试结果:生产者回调成功,消费着消费成功。returns回调没有被调用。
    Spring-RabbitMQ 生产者消息确认案例分析

    案例二(推送到Exchange,但没有被路由到Queue)测试

    配置类:

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
        private final static String TOPIC_EXCHANGE = "myExchange";
        private final static String QUEUE_NAME = "myQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange(TOPIC_EXCHANGE);
    
    
            template.setConfirmCallback((correlationData, ack, cause) -> {
                log.info("ConfirmCallback:     关联数据 -> {}", correlationData);
                log.info("ConfirmCallback:     是否成功 -> {}", ack);
                log.info("ConfirmCallback:     失败原因原因 -> {}", cause);
            });
    
    
            template.setMandatory(true);
            template.setReturnsCallback(returned -> {
                log.info("ReturnCallback -> {}", returned);
            });
    
            return template;
        }
    
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE, false, true);
        }
    
        @Bean
        public Queue queue() {
            return new Queue(QUEUE_NAME, false, false, true);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    测试结果:可以看到,confirm回调结果为true。而且return回调也有被执行,且告诉我们路由到Queue失败。
    Spring-RabbitMQ 生产者消息确认案例分析

    案例三(消息没有到达Exchange)测试

    配置类:

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
        private final static String QUEUE_NAME = "myQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange("not_exist_exchange");
    
    
            template.setConfirmCallback((correlationData, ack, cause) -> {
                log.info("ConfirmCallback:     关联数据 -> {}", correlationData);
                log.info("ConfirmCallback:     是否成功 -> {}", ack);
                log.info("ConfirmCallback:     失败原因原因 -> {}", cause);
            });
    
    
            template.setMandatory(true);
            template.setReturnsCallback(returned -> {
                log.info("ReturnCallback -> {}", returned);
            });
    
    
            return template;
        }
    
    
        @Bean
        public Queue queue() {
            return new Queue(QUEUE_NAME, false, false, true);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    测试结果:
    可以看到,confirm回调告诉客户端消息发送失败,return回调没有被调用。
    Spring-RabbitMQ 生产者消息确认案例分析

    结论

    returns回调只有在消息正确发送到Exchange,但没有被路由到Queue才会回调。

    另:当return回调被调用后,confirm回调中的关联数据中也可以获取到returnedMessage.

  • 相关阅读:
    JavaScript
    【数据库MongoDB】MongoDB与大数据关系以及MongoDB中重要的进程:mongod进程与mongo进程关系
    基础生态学名词解释
    MIPI 打怪升级之D-PHY篇
    SpringBoot学习
    基于MVT的医学图像处理平台设计与实现
    19.1 STL总述、发展史、组成与数据结构谈
    JDK JRE JVM 的区别
    不花一分钱,在 Mac 上跑 Windows(M1/M2 版)
    网络协议攻击
  • 原文地址:https://blog.csdn.net/u012359704/article/details/126174525