• SpringACK对RabbitMQ消息的确认(消费)


    SpringAMQP对RabbitMQ消息的确认(消费)

    之前已经简单介绍了基本是从发送方去确认的,我们需要在配置文件当中开启发送方确认模式,共育两种,一种是相对于交换机一个是相对于队列

    本次的介绍是基于消费者对消息的确认,也就是基本的逻辑是消费者对消息处理的确认。

    基本上生产者这边的代码是不需要去改变的,但是我们需要让消费者去正确的人发送到消息。我们按照什么形式都可以,确认与不确认都可以,因为本次主要是为了测试消费端对消息的处理确认。

    首先生产者的配置和相关的代码

    spring:
    #  profiles:
    #    active: dev
      rabbitmq:
        host:  #远程主机外网地址
        username: shabi #远程用户名
        password:  #密码
        virtual-host: shabi #虚拟机名称
        port: 5672 #远程主机端口名称
        publisher-confirm-type: correlated #开启确认模式
        publisher-returns: true
    
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在这里插入图片描述

    然后就是之前我们在测试类当中写的一些发送的各种模式,包括一般的默认发送,以及发送者确认,以及发送者回执。
    然后具体的配置类就是真不要进行了队列和交换机的声明和创建,然后进行了具体绑定。

    package com.jgdabc.rabbitconfig;
    
    import com.rabbitmq.client.ConnectionFactory;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitConfig {
        //交换机
        public static final String Exchange_Name = "boot_rabbit_topic_ee";
        public static final String Queue_Name = "boot_rabbit_topic_qqq";
        @Bean("bootExchange") //交换机的创建
        public Exchange bootExchange()
        {
            return ExchangeBuilder.topicExchange(Exchange_Name).durable(true).build(); //绑定一个topic类型的交换机,持久化并构建
        }
    
    
        @Bean("bootQueue") //队列的创建
        public Queue bootQueue()
        {
            return QueueBuilder.durable(Queue_Name).build();
        }
    //    队列和交换机的绑定关系
    //    哪个队列
    //  哪个交换机
    //    routing key
    //    这里不写的话会按照方法名注入
        @Bean
        public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange)
        {
            return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    package com.jgdabc;
    
    import com.jgdabc.rabbitconfig.RabbitConfig;
    
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    
    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Import;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.util.*;
    import java.util.stream.IntStream;
    @Slf4j
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class DemoApplicationTests {
    
    
        //    注入RabbitTemplate
        @Autowired
        private RabbitTemplate template;
    
        @Test
        public void testSend() {
            template.convertAndSend(RabbitConfig.Exchange_Name, "boot.haha", "hi");
    
        }
    
        /**
         * 在yml配置文件当中开启去人模式
         * 在RabbitTemplate定义ConfirmCallBack回调函数
         */
        @Test
        public void testConfirm() {
            //定义回调
            template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    
                @Override
                public void confirm(CorrelationData correlationData, boolean b, String s) {
                    System.out.println(b);
    
                    System.out.println("confirm 方法被执行了");
                    if (!b) {
                        //接收成功
                        System.out.println("消息成功接收");
    
                    } else {
                        System.out.println("消息接受失败," + b);
    
                    }
                }
            });
            //发送一条消息
            template.convertAndSend(RabbitConfig.Exchange_Name, "boot.haha", "你好,我的小宝贝");
    
        }
    //    回退模式,当消息发送给Exchange后,Exchange路由到Queue失败后才会执行ReturnCallBack
    
        /**
         * 回退模式
         * 1:在yml文件当中开启回退模式
         * 2:设置ReturnCallBack
         * 3:设置Exchange处理消息的模式
         * <1:如果消息没有路由到Queue,那么丢弃掉消息(默认)
         * <2:如果路由没有回退到Queue,返回给消息发送方
         */
        @Test
        public void testReturn() {
    //        设置交换机处理消息的模式
            template.setMandatory(true);//设置为true交换机会将路由到队列失败的消息再返回给发送者
            template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returnedMessage) {
                    System.out.println("消息对象:" + returnedMessage.getMessage());
                    System.out.println("错误码:" + returnedMessage.getReplyCode());
                    System.out.println("错误信息:" + returnedMessage.getReplyText());
                    System.out.println("交换机:" + returnedMessage.getExchange());
                    System.out.println("路由键:" + returnedMessage.getRoutingKey());
    
    
                    System.out.println("return执行了...");
                }
            });
            template.convertAndSend(RabbitConfig.Exchange_Name, "boot.haha", "hi");
        }
    
    
    
        }
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99

    然后是这次主要介绍的消费端。

    先看配置

    spring:
      rabbitmq:
        host: 
        username: 
        password: 
        virtual-host:
        port: 5672
    #    publisher-confirm-type: correlated
    #    publisher-returns: true
    #    开启ack也就是手动消息确认
        listener:
    #      设置手动确认
          simple:
            acknowledge-mode: manual
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    具体的类,

    package com.jgdabc.boot_rabbit_consumer;
    
    import com.rabbitmq.client.Channel;
    
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
    
    
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.List;
    
    /**
     * consumer ack 机制
     * 设置手动签收,acknowledge = “manual”
     * 如果消息成功处理,则调用channel的basicAck签收
     * // * 如果消息处理失败,则调用channel的basicNack拒绝签收,broker重新发送给consumer
     */
    @Component
    public class ConsumerSpringbootApplication implements ChannelAwareMessageListener {
        @RabbitListener(queues = "boot_rabbit_topic_qqq") //指定要消费消息的队
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                System.out.println("接收转换消息:" + new String(message.getBody()));
    //            手动签收
                channel.basicAck(deliveryTag, true);
    
            } catch (IOException e) {
                channel.basicNack(deliveryTag, true, true);
    
            }
    //        第二个参数代表运行多条消息被签收
    //        拒绝签收,第三个参数重回队列,如果设置为true,则消息重新回到队列
    
    
        }
    
        public void onMessage(Message message) {
    
    //        System.out.println(message);
    
        }
    
    
    
    
        }
    
    
    
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    这个方法具体没有用,之所以写上,是因为我实现上边那个类的时候,如果不实现这个方法的话,那么启动就会报错。所以就写上了。
    在这里插入图片描述
    然后主要在说明一些参数

    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    message.getMessageProperties ().getMessageId () 获取 MessageID,获取的 MessageID 可以用来判断是否已经被消费者消费过了,如果已经消费则取消再次消费。

    下面这里加了一个异常的捕获,因为可能消费者这个处理消息出错,所以进行了异常的捕获。首先一定是接收了具体的消息。然后会进行一个签收

    channel.basicAck (long deliveryTag, boolean multiple)为消息确认,参数1:消息的id;参数2:是否批量应答。

    basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。

    try {
                System.out.println("接收转换消息:" + new String(message.getBody()));
    //            手动签收
                channel.basicAck(deliveryTag, true);
    
            } catch (IOException e) {
                channel.basicNack(deliveryTag, true, true);
    
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这里只是列举一些方法的使用,当然还有其他的方法,后面慢慢来熟悉好了。打开这个管理面板,可以看到没有队列,这里提前已经删除掉之前的创建好的队列和交换机了,为的是为了是运行展示后的效果比较明显一些。
    交换机和队列都是可以在程序中创建和绑定的。
    在这里插入图片描述
    现在我们在生产者测试类去生产一条消息。可以随便去用一个方法就可以了。
    我们就运行这个方法
    在这里插入图片描述
    因为没有做错误,所以不会有错误信息输出的。

    现在我们去面板看,可以看到这里就自动创建出来队列和生产了一条消息,当然交换机的创建和队列的绑定也是执行了。

    在这里插入图片描述
    现在我们在消费者去消费,执行的话,我们就去执行启动类就好。
    在这里插入图片描述
    因为我们这个类加上了这个注解,其实就是已经实例化给spring了。表明了已经成为spring的一个组件,所以直接去启动启动运行类就好了。
    在这里插入图片描述
    你看这里就接收到消息了,并且会处于一个持续运行的等待过程。
    在这里插入图片描述
    同时消费处理成功验证。
    在这里插入图片描述

    现在我们可以去让程序出错,来验证消息处理失败情况。
    我们在签收之前让代码出一个错。
    在这里插入图片描述

    哦对了,这个异常是算数异常,我们之前捕获一个大的异常算了。

    下面那段改成这样。

    在这里插入图片描述

    现在重新开始之前的步骤。然后这里器是会一直打印这段话,主要是因为我们设置basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue。我们这里出现异常,第二个参数为true,代表不确认,第三个代表重新让它回到队列,设置为true该行消息重新回到队列,但是我们这里会持续接收进行接收消费,于是来来回回就形成了死循环。

    在这里插入图片描述
    同时验证我们这里设置的重回队列确实生效。

    在这里插入图片描述

    大概就是这样的一个模式,当热这种处理模式并不是合适的,主要是举个例子,其他的方法处理模式顺着这个模板来就行了。

    主要是为了忘记后好回顾,必要的时候直接就地取材。

  • 相关阅读:
    Nuxt.js的详细使用
    求臻人故事 | 在求臻医学的沃土中,我像竹子般茁壮成长
    引用计数法
    Linux-管理文件系统及存储
    三、视频设备的枚举以及插拔检测
    Java 序列化和反序列化为什么要实现 Serializable 接口?
    【DevPress】V2.4.3版本发布,增加内容收录管理
    解决arm-none-eabi-gcc交叉编译helloworld程序segmentation fault 错误
    Revit翻模技巧丨怎么一次性翻转所有墙体?
    【苹果推位置信息推iMessage】 l AttributeIDs:NC希望读取的变量ID列表
  • 原文地址:https://blog.csdn.net/jgdabc/article/details/126251142