• RabbitMQ 消息队列学习 (四)


    1. 延迟队列


    延迟队列就是用来存放需要在指定时间被处理的元素的队列。

    例如:订单在十分钟之内未支付则自动取消。

    • 订单消息进入了延迟队列,10分钟过后,该消息从延迟队列中传出,走自动取消订单的程序。

    例如:用户注册成功后,如果三天内没有登录则进行短信提醒。

    • 用户注册成功后,注册成功的消息进入了延迟队列,三天后,该消息从延迟队列中传出,判断三天内用户是否登录,没有就走发送短信提醒程序。
      在这里插入图片描述

    一个订单业务流程如下:
    在这里插入图片描述

    2. rabbitmq 整合springboot

    2.1 rabbitmq的环境搭建


    导入rabbitmq的相关配置:

    <!--支持rabbitmq依赖-->
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    <!--rabbitmq的测试依赖-->
    <dependency>
    	<groupId>org.springframework.amqp</groupId>
    	<artifactId>spring-rabbit-test</artifactId>
    	<scope>test</scope>
    </dependency>
    
    <!--支持web-->
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!--fastjson依赖-->
    <dependency>
    	<groupId>com.alibaba</groupId>
    	<artifactId>fastjson</artifactId>
    	<version>1.2.80</version>
    </dependency>
    
    <!--swagger对界面进行测试-->
    <dependency>
    	<groupId>io.springfox</groupId>
    	<artifactId>springfox-swagger2</artifactId>
    	<version>2.9.2</version>
    </dependency>
    <dependency>
    	<groupId>io.springfox</groupId>
    	<artifactId>springfox-swagger-ui</artifactId>
    	<version>2.9.2</version>
    </dependency>
    
    <dependency>
    	<groupId>org.projectlombok</groupId>
    	<artifactId>lombok</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    swagger2的作用:

    • Swagger2可以快速帮助我们编写最新的API接口文档。

    修改rabbitmq的相关配置文件:

    spring.rabbitmq.host=39.103.163.156
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=0818
    
    • 1
    • 2
    • 3
    • 4

    代码架构图:

    • 按照如下架构搭建代码。
      在这里插入图片描述

    springboot项目 和 一般项目 都可以单独声明一些交换机,队列:

    • springboot项目中可以通过使用配置类的方式来声明交换机和队列。

    2.2 基于死信的 TTL延迟队列 配置文件类代码


    配置文件类代码:

    • 提前将所有的交换机和队列以及它们之间的绑定关系,提前通过配置类的方式声明好。
    package com.itholmes.shopping.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * TTL队列 , 延迟队列
     *      配置文件类代码
     */
    
    @Configuration
    public class TtlQueueConfig {
    
        //普通交换机的名称
        public static final String X_EXCHANGE = "X";
        //死信交换机的名称
        public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
        //普通队列的名称
        public static final String QUEUE_A = "QA";
        public static final String QUEUE_B = "QB";
        //死信队列的名称
        public static final String DEAD_LETTER_QUEUE_D = "QD";
    
        //声明xExchange 普通直接交换机
        @Bean("xExchange")
        public DirectExchange xExchange(){
            return new DirectExchange(X_EXCHANGE);
        }
    
        //声明yExchange 死信直接交换机
        @Bean("yExchange")
        public DirectExchange yExchange(){
            return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
        }
    
        //声明queueA普通队列 , 10秒
        @Bean("queueA")
        public Queue queueA(){
            Map<String ,Object> arguments = new HashMap<>(3);
    
            //设置死信交换机
            arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //设置死信RoutingKey
            arguments.put("x-dead-letter-routing-key","YD");
            //设置过期时间ttl,单位是ms毫秒。
            arguments.put("x-message-ttl",10000);
    
            return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
        }
    
        //声明queueB普通队列 , 40秒
        @Bean("queueB")
        public Queue queueB(){
            Map<String ,Object> arguments = new HashMap<>(3);
    
            //设置死信交换机
            arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //设置死信RoutingKey
            arguments.put("x-dead-letter-routing-key","YD");
            //设置过期时间ttl,单位是ms毫秒。
            arguments.put("x-message-ttl",40000);
    
            //通过使用构建工具类QueueBuilder来声明队列。
            return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
        }
    
        //死信队列
        @Bean("queueD")
        public Queue queueD(){
            return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();
        }
    
        //绑定
        @Bean
        public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){
            //构建工具类来创建Binding,将queueA绑定给xExchange ,RoutingKey为XA。
            return BindingBuilder.bind(queueA).to(xExchange).with("XA");
        }
    
        @Bean
        public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){
            //构建工具类来创建Binding,将queueA绑定给xExchange ,RoutingKey为XA。
            return BindingBuilder.bind(queueB).to(xExchange).with("XB");
        }
    
        @Bean
        public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){
            //构建工具类来创建Binding,将queueA绑定给xExchange ,RoutingKey为XA。
            return BindingBuilder.bind(queueD).to(yExchange).with("YD");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96

    2.3 基于死信的 TTL延迟队列 接口生产者(发送消息)


    接口生产者(发送消息)代码:

    • 通过浏览器调用接口发送数据,传给controller再传给延迟队列。
    package com.itholmes.shopping.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    
    /**
     *  生产者:
     *      发送延迟消息
     *      通过controller层接口接受浏览器发送过来的信息,进而作为生产者操作。
     */
    
    @Slf4j //slf4j日志
    @RestController
    @RequestMapping("/ttl1")
    public class SendMsgController {
    
        //使用spring公司提供的RabbitTemplate操作
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/sendMsg/{message}")
        public void sendMsg(@PathVariable String message){
            //下面的大括号是占位符,对应后面两个占位符。
            log.info("当前事件:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);
    
            /**
             * rabbitTemplate.convertAndSend()的参数:
             *      参数exchange:交换机名
             *      参数routingKey:对应的routingKey绑定
             *      参数Object:对应发送的消息
             */
            rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
            rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    2.4 基于死信的 TTL延迟队列 消费者代码(处理ttl过时,死信队列里面的消息)


    消费者代码(处理ttl过时,死信队列里面的消息):

    • 监听某个队列。
    • 需要注意这里使用的各种包都是rabbitmq的包。
    package com.itholmes.shopping.consumer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    import java.util.Date;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    
    /**
     * 消费者:
     *      对于延迟队列已经过期的消息会发到死信队列中
     *      之后,消费者负责处理死信队列里面的消息。
     */
    
    @Slf4j
    @Component
    public class DeadLetterQueueConsumer {
    
        //接受消息
        @RabbitListener(queues = "QD")
        public void receiveD(Message message, Channel channel){
            //注意这里导入的要导入rabbitmq的相关包!!
            String msg = new String(message.getBody());
            log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    在这里插入图片描述

    3. 基于死信的 延时队列优化


    上面代码结构,一个过期时间对应一个队列,这样很冗余!

    因此就有了下面的这种优化结构:

    • 下图的QC是不设置TTL过期时间的队列
    • 并且让生产者来决定过期时间。
      在这里插入图片描述

    配置类添加:

    • 声明一个QC队列不单独设置过期时间,让生产者来设置过期时间。
    //优化队列-普通队列的名称
    public static final String QUEUE_C = "QC";
    
    //优化队列配置: 死信配置
    @Bean("queueC")
    public Queue queueC(){
        Map<String ,Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信routingkey
        arguments.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
    //优化队列配置:绑定配置
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    生产者代码:

    //优化后的消息,从生产者发送TTL
    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
        log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",new Date().toString(),ttlTime,message);
    
        /**
         *  确定权在生产者手中。
         *      通过写MessagePostProcessor的lamba表达式来操作
         */
    
        rabbitTemplate.convertAndSend("X","XC",message,msg->{
            //设置发送消息的时候 延迟时长 还是要注意这里的单位是毫秒数。
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    基于死信做延迟队列的巨大缺点:

    • 上面代码有个巨大缺陷就是在生产者发送了两个消息,一个20秒,一个5秒,因为rabbitmq只会检查第一个消息是否过期,如果第一个消息延迟时间很长,第二个消息的延迟时间很短,第二个消息并不会优先的得到执行。
      在这里插入图片描述

    这个问题没办法解决,因此就有了基于插件的延迟队列。

    4. 基于rabbitmq插件的延迟队列

    4.1 安装延时队列插件


    去官方下载rabbitmq_delayed_message_exchange插件。


    将下载的插件,放到rabbitmq目录下的plugins目录下:
    在这里插入图片描述


    在plugins目录下执行使插件生效的命令:

    • 命令:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
      在这里插入图片描述

    之后,重新启动rabbitmq的服务。
    在这里插入图片描述


    安装成功后,交换机就会多了一种类型:
    在这里插入图片描述

    4.2 基于插件 和 基于死信 的延迟队列 原理对比


    基于死信的延迟队列:(是在队列设置TTL实现的)
    在这里插入图片描述

    基于插件的延迟队列:(是在交换机实现延迟效果的)
    在这里插入图片描述

    4.3 基于插件的延迟队列 配置类代码


    在这里插入图片描述

    基于插件的延迟队列声明,配置类:

    package com.itholmes.shopping.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import java.util.HashMap;
    
    @Configuration
    public class DelayedQueueConfig {
    
        //队列
        public static final String DELAYED_QUEUE_NAME = "delayed.queue";
        //交换机
        public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
        //routingKey
        public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    
        //声明队列
        @Bean
        public Queue delayedQueue(){
            return new Queue(DELAYED_QUEUE_NAME);
        }
    
        //声明交换机,因为这次用的是插件引入的新交换机,所以使用的是自定义交换机
        @Bean
        public CustomExchange delayedExchange(){
    
            HashMap<String, Object> arguments = new HashMap<>();
    
            arguments.put("x-delayed-type","direct");//延迟类型,直接
    
            /**
             *  1.交换机名称
             *  2.交换机的类型
             *  3.是否需要持久化
             *  4.是否需要自动删除
             *  5.其他的参数map
             */
            return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
        }
    
        //声明绑定
        @Bean
        public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){
            return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    4.4 基于插件的延迟队列 生产者和消费者代码


    生产者代码:

    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //routingKey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    
    //基于插件的延迟队列生产者
    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    public void sendDelayMsg(@PathVariable String message,@PathVariable Integer delayTime){
        log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",new Date().toString(),delayTime,message);
    
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME,DELAYED_ROUTING_KEY,message,msg->{
            //这里就要设置 延迟时长 单位依然是毫秒。
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    消费者代码:

    package com.itholmes.shopping.consumer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * 基于插件的延迟消息
     */
    @Slf4j
    @Component
    public class DelayQueueConsumer {
    
        //队列
        public static final String DELAYED_QUEUE_NAME = "delayed.queue";
        
        //监听消息
        @RabbitListener(queues = DELAYED_QUEUE_NAME)
        public void receiveDelayQueue(Message message){
            String msg = new String(message.getBody());
            log.info("当前事件:{},收到延迟队列的消息:{}",new Date().toString(),msg);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    基于插件的延迟队列,就弥补了基于死信的延迟队里的巨大缺陷问题。因此,优先使用基于插件的延迟队列。

    5. 发布确认高级

    5.1 rabbitmq宕机或者重启的情况


    问题:RabbitMQ由于某些原因宕机或者重启,在这个期间生产者发送的消息投递失败,导致消息丢失,需要手动处理和恢复。
    在这里插入图片描述


    为了解决上面问题,就有了发布确认高级机制。

    5.2 发布确认高级 (配置类+生产者+消费者+回调接口) 解决rabbitmq的交换机出现问题

    在这里插入图片描述
    为了解决这一问题,rabbitmq出现问题,这种情况,我们必须要有一个回调接口!


    因为涉及到回调接口,所以要进行配置:

    • spring.rabbitmq.publisher-confirm-type=correlated 配置开启确认回调。
      在这里插入图片描述

    发布确认高级 配置类:

    package com.itholmes.shopping.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 发布确认高级 配置类
     */
    @Configuration
    public class ConfirmConfig {
    
        //交换机
        public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
        //队列
        public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
        //RoutingKey
        public static final String CONFIRM_ROUTING_KEY = "key1";
    
        //声明交换机
        @Bean("confirmExchange")
        public DirectExchange confirmExchange(){
            return new DirectExchange(CONFIRM_EXCHANGE_NAME);
        }
        //声明队列
        @Bean("confirmQueue")
        public Queue confirmQueue(){
            return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
        }
        //绑定
        @Bean
        public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange){
            return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    发布确认高级 生产者:

    package com.itholmes.shopping.controller;
    
    import com.itholmes.shopping.config.ConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * 发布确认高级:生产者 开始发消息 测试确认
     */
    @Slf4j
    @RestController
    public class ProducerController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        //发消息
        @GetMapping("/sendMessage/{message}")
        public void sendMessage(@PathVariable String message){
    
            //对应回调接口的correlationData
            CorrelationData correlationData = new CorrelationData("1");
    
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                    ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
    
            log.info("发送消息为:{}",message+"key1");
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    发布确认高级 消费者:

    package com.itholmes.shopping.consumer;
    
    import com.itholmes.shopping.config.ConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 发布确认高级:消费者
     */
    @Slf4j
    @Component
    public class Consumer {
        @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
        public void receiveConfirmMessage(Message message){
            String msg = new String(message.getBody());
            log.info("接受消费为confirm.queue的消息:{}",msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    发布确认高级 回调接口(重点):

    • 解决rabbitmq的交换机出现问题,产生的消息丢失。
    package com.itholmes.shopping.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    
    /**
     * 发布确认高级 回调接口
     *      必须实现RabbitTemplate.ConfirmCallback(函数式接口,可以写lamda表达式)
     */
    @Slf4j
    @Component
    public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    
        /**
         *  此外要将MyCallBack注入到RabbitTemplate.ConfirmCallback,这样才会起作用!!
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         *    注解:@PostConstruct注解
         *              这个注解是其他注解加载完成后,再执行!
         *              这里我们要等到rabbitTemplate注入后,在执行。
         *    可以使用@PostConstruct注解一个方法来完成初始化
         *    ,@PostConstruct注解的方法将会在依赖注入完成后被自动调用。
         */
        @PostConstruct
        public void init(){
            //注入
            rabbitTemplate.setConfirmCallback(this);
        }
    
        /**
         * 该方法就是 交换机确认回调方法
         *
         *      发消息,交换机接收到了的回调:
         *          correlationData:保存回调消息的ID以及相关信息
         *          b(ack): 因为交换机收到了消息,返回true
         *          s(cause): 失败原因,成功为null。
         *
         *      发消息,交换机接受失败了的回调:
         *          correlationData:保存回调消息的ID以及相关信息
         *          b(ack): 交换机未收到消息,返回false
         *          s(cause): 失败原因
         *
         *
         *      correlationData这个参数是生产者参数发出的,生产者不传参,这里就拿不到。
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            //三元运算,拿到ID,预防correlationData为null报错。
            String id = correlationData != null ? correlationData.getId() : "";
            if (ack){
                log.info("交换机已经收到了,ID为{}",id);
            }else {
                log.info("交换机还未收到ID为:{}的消息,由于原因:{}",id,cause);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    以上就仅仅解决了rabbitmq的交换机宕机或者出问题,丢失消息。接下来解决由于队列(routingkey)出现问题,引起的消息丢失。

    5.3 发布确认高级 回退消息


    交换机发送消息给队列,如果由于某些原因没有发送过去,交换机就应该回退消息给生产者,这样消息就不会丢失了。

    在这里插入图片描述


    配置springboot,开启回退消息:

    • spring.rabbitmq.publisher-returns=true
      在这里插入图片描述

    以下整合发布确认和回退消息全部代码:

    配置类:

    • 一定要注入rabbitTemplate!!
    package com.itholmes.shopping.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    
    /**
     * 发布确认高级
     *  回调接口
     *      必须实现RabbitTemplate.ConfirmCallback(函数式接口,可以写lamda表达式)
     *  回退接口
     *      必须实现RabbitTemplate.ReturnsCallback
     */
    @Slf4j
    @Component
    public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{
    
        /**
         *  此外要将MyCallBack注入到RabbitTemplate.ConfirmCallback,这样才会起作用!!
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         *    注解:@PostConstruct注解
         *              这个注解是其他注解加载完成后,再执行!
         *              这里我们要等到rabbitTemplate注入后,在执行。
         *    可以使用@PostConstruct注解一个方法来完成初始化
         *    ,@PostConstruct注解的方法将会在依赖注入完成后被自动调用。
         */
        @PostConstruct
        public void init(){
            //发布确认注入rabbitTemplate
            rabbitTemplate.setConfirmCallback(this);
            //回退消息注入rabbitTemplate
            rabbitTemplate.setReturnsCallback(this);
            //再次强调不注入是不行的!!
        }
    
        /**
         * 该方法就是 交换机确认回调方法
         *
         *      发消息,交换机接收到了的回调:
         *          correlationData:保存回调消息的ID以及相关信息
         *          b(ack): 因为交换机收到了消息,返回true
         *          s(cause): 失败原因,成功为null。
         *
         *      发消息,交换机接受失败了的回调:
         *          correlationData:保存回调消息的ID以及相关信息
         *          b(ack): 交换机未收到消息,返回false
         *          s(cause): 失败原因
         *
         *
         *      correlationData这个参数是生产者参数发出的,生产者不传参,这里就拿不到。
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            //三元运算,拿到ID,预防correlationData为null报错。
            String id = correlationData != null ? correlationData.getId() : "";
            if (ack){
                log.info("交换机已经收到了,ID为{}",id);
            }else {
                log.info("交换机还未收到ID为:{}的消息,由于原因:{}",id,cause);
            }
        }
    
        /**
         * 可以在消息传递过程中不可达目的地时将消息返回给生产者 ,达到回退效果。
         * returnedMessage对象对应属性:
         *      message:对应消息
         *      replyText:退回原因
         *      exchange:那个交换机
         *      routingKey:那个路由
         *  也别忘记注入到rabbitTemplate里面!!
         */
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            log.error("消息{},被交换机{}退出,退回原因:{},路由Key:{}"
                    ,new String(returnedMessage.getMessage().getBody()),
                    returnedMessage.getExchange(),
                    returnedMessage.getReplyText(),
                    returnedMessage.getRoutingKey());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    生产者:

    package com.itholmes.shopping.controller;
    
    import com.itholmes.shopping.config.ConfirmConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * 发布确认高级:生产者 开始发消息 测试确认
     */
    @Slf4j
    @RestController
    public class ProducerController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        //发消息
        @GetMapping("/sendMessage/{message}")
        public void sendMessage(@PathVariable String message){
    
            //对应回调接口的correlationData
            CorrelationData correlationData = new CorrelationData("1");
    
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                    ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
    
            log.info("发送消息为:{}",message+"key1");
    
    
            //模拟队列错误,专门绑定一个不存在的routingkey
            CorrelationData correlationData2 = new CorrelationData("2");
    
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                    ConfirmConfig.CONFIRM_ROUTING_KEY+"2",message,correlationData2);
    
            log.info("发送消息为:{}",message+"key2");
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    消费者和环境配置类都和发布确认高级一样。

    这样就解决了rabbitmq(交换机或者队列)出现问题,防止消息丢失了。

  • 相关阅读:
    vue基础语法02
    软件工程毕业设计课题(5)基于python的毕业设计python高校新生报到系统毕设作品源码
    图像处理用什么神经网络,神经网络图像处理
    MySQL数据库入门到精通1--基础篇(MySQL概述,SQL)
    ConnectTimeout 和 ConnectionError 的差异
    多线程使用处理数据库导致锁表解决办法
    百望云亮相服贸会 重磅发布业财税融Copilot
    路漫漫远修兮-GeoServer2.16.0版本跨域解决
    【数据结构】串(三)—— KMP 算法
    vue中如何快速的让某个元素全屏 ——利用screenfull插件
  • 原文地址:https://blog.csdn.net/IT_Holmes/article/details/124883148