在RabbitMQ里面有一些队列参数,在我们的图形化界面可以看到下面这些信息;
一共10个:


一个队列中的消息,在被丢弃之前能够存活多少毫秒.( key 为 "x-message-ttl").通俗讲就是,队列中的消息的生存周期,单位毫秒.
RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。
根据官方描述,它有key肯定是一个map键值对,用代码怎么定义呢。我们先对队列设置过期时间


对消息单独设置过期时间,在业务代码里面实现

- //给消息设置过期时间
- MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setExpiration("5000"); //设置过期时间
- message.getMessageProperties().setContentEncoding("UTF-8"); //设置编码
- return message;
- }
- };
- rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer,messagePostProcessor);
设置消息过期和设置队列参数还有一个区别,单独设置消息过期是直接到期删除消息,而队列是打上了个标签TTL,在队列中的消息到期后,之后有个死信队列来接收,而不会直接删除,这是它的好处。
官方 : Optional name of an exchange to which messages will be republished if they are rejected or expire.(Sets the "x-dead-letter-exchange" argument.)
翻译 : 该参数值为一个(死信)交换机的名称,当队列中的消息的生存期到了,或者因长度限制被丢弃时,消息会被推送到(绑定到)这台交换机(的队列中),而不是直接丢掉.
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。


Dead letter routing key是在需要routing key的交换机模式里面才需要配置的,如fanout模式就不用配置。
注意:如果你之前已经声明了这个队列,这次往队列里面新增了参数再去声明,它是不会去覆盖或者更新的,而是会报错,我们正常需要先删除这个队列,再去重新声明!

其实死信队列和普通队列是一样的,其实就是把普通队列绑定到过期队列。就相当于删除照片之后会到最近删除里,然后最近删除TTL为30天。死信队列在实际开发中可以用来实现订单超时业务。
可以看一下我在网上找的别人的一个绑定的案例,它的TTL是在消息里单独设置的。
- package com.xiaojie.springboot.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * @author xiaojie
- * @version 1.0
- * @description:死信队列配置
- * @date 2021/10/8 21:07
- */
- @Component
- public class DLXConfig {
- //定义队列
- private static final String MY_DIRECT_QUEUE = "snail_direct_queue";
- //定义队列
- private static final String MY_DIRECT_DLX_QUEUE = "xiaojie_direct_dlx_queue";
- //定义死信交换机
- private static final String MY_DIRECT_DLX_EXCHANGE = "xiaojie_direct_dlx_exchange";
- //定义交换机
- private static final String MY_DIRECT_EXCHANGE = "snail_direct_exchange";
- //死信路由键
- private static final String DIRECT_DLX_ROUTING_KEY = "msg.dlx";
-
- //绑定死信队列
- @Bean
- public Queue dlxQueue() {
- return new Queue(MY_DIRECT_DLX_QUEUE);
- }
- //绑定死信交换机
- @Bean
- public DirectExchange dlxExchange() {
- return new DirectExchange(MY_DIRECT_DLX_EXCHANGE);
- }
- @Bean
- public Queue snailQueue() {
- Map<String, Object> args = new HashMap<>();
- // 绑定我们的死信交换机
- args.put("x-dead-letter-exchange", MY_DIRECT_DLX_EXCHANGE);
- // 绑定我们的路由key
- args.put("x-dead-letter-routing-key", DIRECT_DLX_ROUTING_KEY);
- return new Queue(MY_DIRECT_QUEUE, true, false, false, args);
- }
- @Bean
- public DirectExchange snailExchange() {
- return new DirectExchange(MY_DIRECT_EXCHANGE);
- }
- //绑定队列到交换机
- @Bean
- public Binding snailBindingExchange(Queue snailQueue, DirectExchange snailExchange) {
- return BindingBuilder.bind(snailQueue).to(snailExchange).with("msg.send");
- }
- //绑定死信队列到死信交换机
- @Bean
- public Binding dlxBindingExchange(Queue dlxQueue, DirectExchange dlxExchange) {
- return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DIRECT_DLX_ROUTING_KEY);
- }
- }
官方 : How many (ready) messages a queue can contain before it starts to drop them from its head.(Sets the "x-max-length" argument.)
翻译 : 队列可以容纳的消息的最大条数,超过这个条数,队列头部的消息将会被丢弃.
Max length还是跟死信队列有关,超过设置最大长度的消息会被送进死信队列。如:发送了十一条消息,max length=5,超过的六条直接送进死信队列。如果没有死信队列,就直接丢失。

官方 : How long a queue can be unused for before it is automatically deleted (milliseconds).(Sets the "x-expires" argument.)
翻译 : 队列多长时间(毫秒)没有被使用(访问)就会被删除.换个说法就是,当队列在指定的时间内没有被使用(访问)就会被删除.
- @Bean
- public Queue autoexpirequeue(){
- Map<String, Object> args = new HashMap<String, Object>();
- args.put("x-expires", 1800000);
- return new Queue("myqueue", false, false, false, args);
-
- }
官方 : Maximum number of priority levels for the queue to support; if not set, the queue will not support message priorities.(Sets the "x-max-priority" argument.)
翻译 : 设置该队列中的消息的优先级最大值.发布消息的时候,可以指定消息的优先级,优先级高的先被消费.如果没有设置该参数,那么该队列不支持消息优先级功能.也就是说,就算发布消息的时候传入了优先级的值,也不会起什么作用.
优先级加载队列上,就是一个优先级队列 【堆,大根堆,小根堆】
1、给队列贴上一个优先级的标签 x-max-priority=10
2、给message打上具体的优先级的value value<=10
- Map<String, Object> args = new HashMap<String, Object>();
- args.put("x-max-priority",10);
- return new Queue("myqueue", false, false, false, args);
官方 : Set the queue into lazy mode, keeping as many messages as possible on disk to reduce RAM usage; if not set, the queue will keep an in-memory cache to deliver messages as fast as possible.(Sets the "x-queue-mode" argument.)
翻译 : 设置队列为懒人模式.该模式下的队列会先将交换机推送过来的消息(尽可能多的)保存在磁盘上,以减少内存的占用.当消费者开始消费的时候才加载到内存中;如果没有设置懒人模式,队列则会直接利用内存缓存,以最快的速度传递消息.