原理:rabbitmq的消息TTL和死信Exchange结合
消息的TTL就是消息的存活时间。
RabbitMQ可以对队列和消息分别设置TTL。
对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的
TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。
一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列, 一个路由可以对应很多队列。
一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false。
上面的消息的TTL到了,消息过期了。
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有 消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列。
案例:

- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.Exchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
-
- @Configuration
- public class MyMQConfig {
-
- //死信队列
- @Bean
- public Queue orderDelayQueue(){
- HashMap
arguments = new HashMap<>(); - /**
- * x-dead-letter-exchange: order-event-exchange
- * x-dead-letter-routing-key: order.release.order
- * x-message-ttl: 60000
- */
- arguments.put("x-dead-letter-exchange", "order-event-exchange");
- arguments.put("x-dead-letter-routing-key", "order.release.order");
- arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
- /**
- * String name, 队列名字
- * boolean durable, 是否持久化
- * boolean exclusive, 是否排他
- * boolean autoDelete, 是否自动删除
- * Map
arguments) 属性 - */
- //String name, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - return new Queue("order.delay.queue", true, false, false,arguments);
- }
-
- //普通队列
- @Bean
- public Queue orderReleaseOrderQueue(){
- return new Queue("order.release.order.queue", true, false, false);
- }
-
- @Bean
- public Exchange orderEventExchange(){
- //String name, boolean durable, boolean autoDelete
- return new TopicExchange("order-event-exchange", true, false);
- }
-
- @Bean
- public Binding orderCreateOrderBinding(){
- //String destination, Binding.DestinationType destinationType,
- //String exchange, String routingKey, Map
arguments - return new Binding("order.delay.queue",
- Binding.DestinationType.QUEUE,
- "order-event-exchange",
- "order.create.order",
- null);
- }
-
- @Bean
- public Binding orderReleaseOrderBinding(){
- return new Binding("order.release.order.queue",
- Binding.DestinationType.QUEUE,
- "order-event-exchange",
- "order.release.order",
- null);
- }
-
- }
使用注入bean的方式能自动生成没有的队列相关内容(注意,一旦生成需要手动删除才能重新生成,无法覆盖),启动项目后进入rabbitmq配置页可以看到:


监听队列:
- @RabbitListener(queues = "order.release.order.queue")
- public void listener(OrderEntity orderEntity, Channel channel, Message message){
- System.out.println("收到过期的订单,准备释放订单"+orderEntity.getOrderSn());
- try {
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
模拟生成订单:
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @ResponseBody
- @GetMapping(value = "/test/createOrder")
- public String createOrderTest() {
- //订单下单成功
- OrderEntity orderEntity = new OrderEntity();
- orderEntity.setOrderSn(UUID.randomUUID().toString());
- orderEntity.setModifyTime(new Date());
- //给MQ发送消息
- rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);
- return "ok";
- }
再看队列:

等待一分钟看控制台:

再返回查看队列,发现消息已经不见了
