• RabbitMQ:发布确认高级



    📃个人主页:不断前进的皮卡丘
    🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
    🔥个人专栏:消息中间件
    有时候因为一些问题,会导致RabbitMQ重启,在重启期间,生产者消息投递失败,导致消息丢失,需要手动处理和恢复,我们需要保证消息的可靠传递。

    1.发布确认

    1.1发布确认机制方案

    在这里插入图片描述

    在这里插入图片描述

    1.2全局配置文件

    在application.properties全局配置文件中添加spring.rabbitmq.publish-confirm-type属性,这个属性有以下几种值

    • none:禁用发布确认模式(默认)0
    • correlated:发布消息成功到交换机后会触发回调方法
    • simple:有两种效果
      • 第一种效果是和correlated一样会触发回调方法
      • 第二种效果是在发布消息成功以后使用rabbitTemplate调用waitForConfirms或者waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判单下一步的逻辑
      • waitForConfirmsOrDie方法如果返回false则会关闭信道,那么接下来就无法发送消息到broker
    # RabbitMQ/配置
    #服务器地址
    spring.rabbitmq.host=192.168.88.136
    #服务端口号
    spring.rabbitmq.port=5672
    #虚拟主机名称
    spring.rabbitmq.virtual-host=/myhost
    #用户名
    spring.rabbitmq.username=admin
    #密码
    spring.rabbitmq.password=123456
    #设置生产者发布确认模式
    spring.rabbitmq.publisher-confirm-type=correlated
     
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    1.3配置类

    package com.zyh.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
    * @author zengyihong
    * @create 2022--10--06 10:06
    */
    @Configuration
        public class ConfirmConfig {
            //确认交换机
            public static final String CONFIRM_EXCHANGE = "confirm_exchange";
            //确认队列
            public static final String CONFIRM_QUEUE = "confirm_queue";
            //路由key
            public static final String CONFIRM_ROUTING_KEY = "key1";
    
            /**
    * 声明确认交换机
    *
    * @return
    */
            @Bean
            public DirectExchange confirmExchange() {
                return new DirectExchange(CONFIRM_EXCHANGE);
            }
    
            /**
    * 声明确认队列
    *
    * @return
    */
            @Bean
            public Queue confirmQueue() {
                return QueueBuilder.durable(CONFIRM_QUEUE).build();
            }
    
            /**
    * 把确认交换机和确认队列进行绑定
    * @param queue
    * @param exchange
    * @return
    */
            @Bean
            public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange){
                return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
            }
    
        }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    1.4生产者

    package com.zyh.controller;
    
    import com.zyh.config.ConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    import java.util.Date;
    
    /**
     * @author zengyihong
     * @create 2022--10--06 10:15
     */
    @Slf4j
    @RestController
    @RequestMapping("/confirm")
    public class ConfirmController {
        @Resource
        private RabbitTemplate rabbitTemplate;
        /**
         * 生产者发送消息
         *
         * @param message
         */
        @GetMapping("/sendConfirmMessage/{message}")
        public void sendMessage(@PathVariable String message) {
            log.info("生产者发送消息:{}",message);
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, ConfirmConfig.CONFIRM_ROUTING_KEY, message);
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    1.5消费者

    package com.zyh.consumer;
    
    import com.zyh.config.ConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.UnsupportedEncodingException;
    
    /**
     * @author zengyihong
     * @create 2022--10--06 10:20
     */
    @Slf4j
    @Component
    public class ConfirmConsumer {
        @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
        public void receiveConfirmMessage(Message message) {
    
            try {
                //获取消息
                String msg = new String(message.getBody(),"UTF-8");
                //记录日志
                log.info("消费者接收到确认队列中的消息:{}"+msg);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    1.6测试

    正常运行结果如图所示,如果rabbitmq出现故障的话,那么结果是不会显示出来的,我们可以通过回调接口来监测运行结果
    image.png

    1.7回调接口

    package com.zyh.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    
    /**
     * @author zengyihong
     * @create 2022--10--06 10:36
     */
    @Slf4j
    @Component
    public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        //依赖注入rabbitTemplate之后再设置它的回调对象
        @PostConstruct
        public void init() {
            //把当前类MyCallBack实现类注入到RabbitTemplate中确认回调接口中
            rabbitTemplate.setConfirmCallback(this);
        }
    
        /**
         * 不管交换机有没有接收到消息,都会执行这个回调方法
         * @param correlationData
         * @param ack
         * @param cause
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            //获取消息id
            String id = correlationData != null ? correlationData.getId() : "";
            //判断交换机是否接收到消息
            if (ack) {
                log.info("交换机已经收到id为{}的消息", id);
            } else {
                log.info("交换机还没有收到id为{}的消息,原因是{}", id, cause);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    1.8改写生产者代码

    package com.zyh.controller;
    
    import com.zyh.config.ConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
     
    
    /**
     * @author zengyihong
     * @create 2022--10--06 10:15
     */
    @Slf4j
    @RestController
    @RequestMapping("/confirm")
    public class ConfirmController {
        @Resource
        private RabbitTemplate rabbitTemplate;
        /**
         * 生产者发送消息
         *
         * @param message
         */
        @GetMapping("/sendConfirmMessage/{message}")
        public void sendMessage(@PathVariable String message) {
            //指定消息id为1的数据
            CorrelationData correlationData1 = new CorrelationData("1");
    
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData1);
            CorrelationData correlationData2 = new CorrelationData("2");
            //key2是一个不存在的路由key
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, "key2", message,correlationData2);
            log.info("生产者发送消息:{}",message);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    1.9测试

    image.png
    交换机收到两条信息,但是消费者只能消费一条消息,因为第二条消息的路由key和交换机的binding key不一样,也没有其他队列可以接收这条消息,所以就被丢弃了。

    2.回退消息

    2.1Mandatory参数

    如果我们仅仅开启了生产者确认机制,那么当交换机接收到消息以后,会直接给生产者发送确认消息,但是如果发现消息不可以路由,就会直接把消息丢弃,此时消费者接收不到消息,而且这个时候生产者也不知道消息被丢弃了,这样就导致消息丢失。我们可以通过设置mandatory参数,使得消息在传递过程中出现不可到达的目的地的时候可以把消息返回给生产者

    2.2在全局配置文件中开启回退消息

    # RabbitMQ/配置
    #服务器地址
    spring.rabbitmq.host=192.168.88.136
    #服务端口号
    spring.rabbitmq.port=5672
    #虚拟主机名称
    spring.rabbitmq.virtual-host=/myhost
    #用户名
    spring.rabbitmq.username=admin
    #密码
    spring.rabbitmq.password=123456
    #设置生产者发布确认模式
    spring.rabbitmq.publisher-confirm-type=correlated
    #开启消息回退
    spring.rabbitmq.publisher-returns=true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    2.3生产者

    package com.zyh.controller;
    
    import com.zyh.config.ConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    
    /**
     * @author zengyihong
     * @create 2022--10--06 10:15
     */
    @Slf4j
    @RestController
    @RequestMapping("/confirm")
    public class ConfirmController {
        @Resource
        private RabbitTemplate rabbitTemplate;
        /**
         * 生产者发送消息
         *
         * @param message
         */
        @GetMapping("/sendConfirmMessage/{message}")
        public void sendMessage(@PathVariable String message) {
            //指定消息id为1的数据
            CorrelationData correlationData1 = new CorrelationData("1");
    
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData1);
            CorrelationData correlationData2 = new CorrelationData("2");
            //key2是一个不存在的路由key
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, "key2", message,correlationData2);
            log.info("生产者发送消息:{}",message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    2.4回调接口

    package com.zyh.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    
    /**
     * @author zengyihong
     * @create 2022--10--06 10:36
     */
    @Slf4j
    @Component
    public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        //依赖注入rabbitTemplate之后再设置它的回调对象
        @PostConstruct
        public void init() {
            //把当前类MyCallBack实现类注入到RabbitTemplate中确认回调接口中
            rabbitTemplate.setConfirmCallback(this);
            //把当前类MyCallBack实现类注入到RabbitTemplate中消息回退接口中
            rabbitTemplate.setReturnsCallback(this);
        }
    
        /**
         * 不管交换机有没有接收到消息,都会执行这个回调方法
         * @param correlationData
         * @param ack
         * @param cause
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            //获取消息id
            String id = correlationData != null ? correlationData.getId() : "";
            //判断交换机是否接收到消息
            if (ack) {
                log.info("交换机已经收到id为{}的消息", id);
            } else {
                log.info("交换机还没有收到id为{}的消息,原因是{}", id, cause);
            }
        }
    
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            log.error("消息{}----->被交换机{}退回,退回原因:{},路由key:{}",
                    new String(returnedMessage.getMessage().getBody()),
                    returnedMessage.getExchange(),
                    returnedMessage.getReplyText(),
                    returnedMessage.getRoutingKey());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    2.5消费者

    package com.zyh.consumer;
    
    import com.zyh.config.ConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.UnsupportedEncodingException;
    
    /**
     * @author zengyihong
     * @create 2022--10--06 10:20
     */
    @Slf4j
    @Component
    public class ConfirmConsumer {
        @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
        public void receiveConfirmMessage(Message message) {
    
            try {
                //获取消息
                String msg = new String(message.getBody(),"UTF-8");
                //记录日志
                log.info("消费者接收到确认队列中的消息:{}",msg);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    2.6测试

    image.png

    3.备份交换机

    Rabbitmq——备份交换机

  • 相关阅读:
    【能效管理】电力监控系统在移动某分公司配电系统中的应用分析
    L2-032 彩虹瓶
    (一)JDK、转义字符、数据类型
    设备巡检的痛点和巡检方案
    Uniapp进行App云打包—安卓Android端
    向毕业妥协系列之机器学习笔记:监督学习-回归与分类(二)
    【04】概率图表示之贝叶斯网络
    第四章、实现用例
    ECCV 2022 | 石溪大学联合小鹏汽车提出先验知识指导的无监督领域自适应
    ElasticSearch(一)
  • 原文地址:https://blog.csdn.net/qq_52797170/article/details/127361814