• RabbitMQ之死信队列解读


    目录

    基本介绍

    消息进入到死信队列的情况

    消息过期

    队列过期 

     队列达到最大长度(先入队的消息会被发送到DLX)

    消费者拒绝消息不进行重新投递

    消费者拒绝消息

    springboot代码实战

    实战架构

    工程概述

    RabbitConfigDeal 配置类:创建队列及交换机并进行绑定 

    MessageService业务类:发送消息及接收消息

    主启动类RabbitMq01Application:实现ApplicationRunner接口


    基本介绍

    什么是死信交换机

    在定义业务队列的时候,要考虑指定一个死信交换机,死信交换机可以和任何一个普通的队列进行绑定,然后在业务队列出现死信的时候就会将数据发送到死信队列。

    什么是死信队列

    死信队列实际上就是一个普通的队列,只是这个队列跟死信交换机进行了绑定,用来存放死信而已

    RabbitMQ 中有一种交换器叫 DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列。 

    要注意的是,DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。当这个队列存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

    消息进入到死信队列的情况

    消息过期

    1. MessageProperties messageProperties=new MessageProperties();
    2. //设置此条消息的过期时间为10
    3. messageProperties.setExpiration("10000");

    队列过期 

    1. Map<String, Object> arguments =new HashMap<>();
    2. //指定死信交换机,通过x-dead-letter-exchange 来设置
    3. arguments.put("x-dead-letter-exchange",EXCHANGE_DLX);
    4. //设置死信路由keyvalue 为死信交换机和死信队列绑定的key
    5. arguments.put("x-dead-letter-routing-key",BINDING_DLX_KEY);
    6. //队列的过期时间
    7. arguments.put("x-message-ttl",10000);
    8. return new Queue(QUEUE_NORMAL,true,false,false,arguments);

     TTL: Time to Live的简称,过期时间

    队列达到最大长度(先入队的消息会被发送到DLX)

    1. Map<String, Object> arguments = new HashMap<String, Object>();
    2. //设置队列的最大长度 ,对头的消息会被挤出变成死信
    3. arguments.put("x-max-length", 5);

    消费者拒绝消息不进行重新投递

    从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列。

    application.yml 启动手动确认

    1. spring:
    2. rabbitmq:
    3. listener:
    4. simple:
    5. acknowledge-mode: manual
    1. /**
    2. * 监听正常的那个队列的名字,不是监听那个死信队列
    3. * 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
    4. *
    5. * channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
    6. */
    7. @RabbitListener(queues = {RabbitConfig.QUEUE})
    8. public void process(Message message, Channel channel) {
    9. System.out.println("接收到的消息:" + message);
    10. //对消息不确认, ack单词是 确认 的意思
    11. try {
    12. System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
    13. //要开启rabbitm消息消费的手动确认模式,然后才这么写代码;
    14. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    15. } catch (IOException e) {
    16. e.printStackTrace();
    17. }
    18. }

    void basicNack(long deliveryTag, boolean multiple, boolean requeue)

    • deliveryTag:消息的一个数字标签
    • multiple:翻译成中文是多个的意思,如果是true表示对小于deliveryTag标签下的消息都进行Nack不确认,false表示只对当前deliveryTag标签的消息Nack
    • requeue:如果是true表示消息被Nack后,重新发送到队列,如果是false,消息被Nack后,不会重新发送到队列

    消费者拒绝消息

    开启手动确认模式,并拒绝消息,不重新投递,则进入死信队列

    1. /**
    2. * 监听正常的那个队列的名字,不是监听那个死信队列
    3. * 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
    4. *
    5. * channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
    6. */
    7. @RabbitListener(queues = {RabbitConfig.QUEUE})
    8. public void process(Message message, Channel channel) {
    9. System.out.println("接收到的消息:" + message);
    10. try {
    11. System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
    12. //要开启rabbitm消息消费的手动确认模式,然后才这么写代码;
    13. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    14. } catch (IOException e) {
    15. e.printStackTrace();
    16. }
    17. }
    18. }

     channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

    • basicReject是否定的交付,一般在消费消息时出现异常等的时候执行。可以将该消息丢弃或重排序去重新处理消息
    • 参数1: 消费消息的index
    • 参数2: 对异常消息的处理,true表示重排序,false表示丢弃
    • Reject 在拒绝消息时,可以使用 requeue 标识,告诉 RabbitMQ 是否需要重新发送给别的消费者。如果是 false 则不重新发送,一般这个消息就会被RabbitMQ 丢弃。Reject 一次只能拒绝一条消息。如果是 true 则消息发生了重新投递。
    • Nack 跟 Reject 类似,只是它可以一次性拒绝多个消息。也可以使用 requeue 标识,这是 RabbitMQ 对 AMQP 规范的一个扩展。
    • 通过 RejectRequeuConsumer 可以看到无论是使用 Reject 方式还是 Nack 方式,当 requeue
    • 参数设置为 true 时,消息发生了重新投递。当 requeue 参数设置为 false 时,消息丢失了。

    springboot代码实战

    实战架构

    如上图,消息到达正常的交换机exchange.nomal.a,通过与正常的队列queue.noaml.a绑定,消息会到达正常队列,如果消息变为死消息以后则会转发到与正常队列绑定的死信交换机中,死信交换机会转发到与其绑定的死信队列queue.deal.a。

    工程概述

     工程采用springboot架构,主要用到的依赖为:

    1. <!-- rabbit的依赖-->
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-starter-amqp</artifactId>
    5. </dependency>
    6. <dependency>
    7. <groupId>org.projectlombok</groupId>
    8. <artifactId>lombok</artifactId>
    9. </dependency>

    application.yml配置文件如下:

    1. server:
    2. port: 8080
    3. spring:
    4. rabbitmq:
    5. host: 123.249.70.148
    6. port: 5673
    7. username: admin
    8. password: 123456
    9. virtual-host: /

    RabbitConfigDeal 配置类:创建队列及交换机并进行绑定 

    1. @Configuration
    2. public class RabbitConfigDeal {
    3. }

    创建正常交换机

    1. @Bean
    2. public DirectExchange normalExchange(){
    3. return ExchangeBuilder.directExchange("exchange.normal.a").build();
    4. }

    创建死信交换机

    1. @Bean
    2. public DirectExchange deadExchange(){
    3. return ExchangeBuilder.directExchange("exchange.dead.a").build();
    4. }

     创建死信队列

    1. @Bean
    2. public Queue deadQueue(){
    3. return QueueBuilder.durable("queue.dead.a").build();
    4. }

     创建正常队列,设置他的绑定死信交换机,以及对应绑定的路由key为order

    1. @Bean
    2. public Queue normalQueue(){
    3. Map arguments =new HashMap<>();
    4. arguments.put("x-message-ttl",20000);
    5. arguments.put("x-dead-letter-exchange","exchange.dead.a");
    6. arguments.put("x-dead-letter-routing-key","order");
    7. return QueueBuilder.durable("queue.normal.a")
    8. .withArguments(arguments).build();
    9. }

    绑定正常交换机和正常队列

    1. @Bean
    2. public Binding bindingNormal(DirectExchange normalExchange,Queue normalQueue){
    3. return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
    4. }

     绑定死信交换机和死信队列

    1. @Bean
    2. public Binding bindingDeal(DirectExchange deadExchange,Queue deadQueue){
    3. return BindingBuilder.bind(deadQueue).to(deadExchange).with("order");
    4. }

    MessageService业务类:发送消息及接收消息

    1. @Component
    2. @Slf4j
    3. public class MessageService {
    4. @Resource
    5. private RabbitTemplate rabbitTemplate;
    6. }

     发送消息方法

    1. public void sendMsg(){
    2. //添加消息属性
    3. Message message = MessageBuilder.withBody("hello word!".getBytes(StandardCharsets.UTF_8))
    4. .build();
    5. rabbitTemplate.convertAndSend("exchange.normal.a","order",message);
    6. log.info("发送消息时间:{}",new Date());
    7. }

    这里用的路由key为info 

    MessageConvert

    • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
    • RabbitMQ 的序列化是指 Messagebody 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter默认)、Jackson2JsonMessageConverter

      接受消息

    1. @RabbitListener(queues = {"queue.dead.a"})
    2. public void receiveMsg(Message message){
    3. byte[] body = message.getBody();
    4. String queue = message.getMessageProperties().getConsumerQueue();
    5. String msg=new String(body);
    6. log.info("{}接收到消息时间:{},消息为{}",queue,new Date(),msg);
    7. }

     Message

    在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

    1. MessageProperties // 消息属性

    2. byte[] body // 消息内容

    @RabbitListener

    使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理

    • 消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

    • 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

      • application/octet-stream:二进制字节数组存储,使用 byte[]
      • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
      • text/plain:文本数据类型存储,使用 String
      • application/json:JSON 格式,使用 Object、相应类型

    主启动类RabbitMq01Application:实现ApplicationRunner接口

    1. /**
    2. * @author 风轻云淡
    3. */
    4. @SpringBootApplication
    5. public class RabbitMq01Application implements ApplicationRunner {
    6. public static void main(String[] args) {
    7. SpringApplication.run(RabbitMq01Application.class, args);
    8. }
    9. @Resource
    10. private MessageService messageService;
    11. /**
    12. * 程序一启动就会调用该方法
    13. * @param args
    14. * @throws Exception
    15. */
    16. @Override
    17. public void run(ApplicationArguments args) throws Exception {
    18. messageService.sendMsg();
    19. }
    20. }

    在SpringBoot中,提供了一个接口:ApplicationRunner。 该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。

    由于该方法是在容器启动完成之后,才执行的,所以,这里可以从spring容器中拿到其他已经注入的bean。

    启动主启动类后查看控制台:

    1. 2023-09-28 10:46:17.772 INFO 71700 --- [ main]
    2. c.e.rabbitmq01.service.MessageService :
    3. 发送消息时间:Thu Sep 28 10:46:17 CST 2023
    4. 2023-09-28 10:46:37.824 INFO 71700 --- [ntContainer#0-1]
    5. c.e.rabbitmq01.service.MessageService :
    6. queue.dead.a接收到消息时间:Thu Sep 28 10:46:37 CST 2023,消息为hello word!

    我们在这里可以看见17s的时候发送了消息,在经过了20s,即到37s的时候我们在死信队列queue.dead.a接受到了消息。

  • 相关阅读:
    【Linux 服务器运维】定时任务 crontab 详解 | 文末送书
    简单使用USB rndis驱动
    机器学习练习——熔池状态识别
    【Xilinx】基于MPSoC的OpenAMP实现(二)
    关于微前端,你理解到究极奥义了么?
    SQL中的CASE WHEN语句:从基础到高级应用指南
    使用EISeg自动标注数据,yolov5训练模型(保姆教程)
    java毕业设计校园美食评价系统mybatis+源码+调试部署+系统+数据库+lw
    理解TCP协议三次握手、四次挥手、流量控制、拥塞控制 、重传机制
    Windows安装SSH教程
  • 原文地址:https://blog.csdn.net/m0_62436868/article/details/133375821