前面将RabbitMQ时提到过延迟队列这个概念,延迟队列经典应用场景莫过于订单超时未支付自动关闭。想要使用延时队列实现该功能首先需要了解RabbitMQ以下两个特性:
队列消息自动删除(TTL)
死信队列(dead-letter-exchange)
RabbitMQ(三) – 消息与队列进阶一文中第五节详细讲过TTL分为三类:
自然本文中会通过Spring整合RabbitMQ的方式再重温以上内容设置与效果
单消息的TTL比较简单,只需要在生产者发送消息时设置MessageProperties对象的expiration属性即可。可以自行动手看看实践效果
@PostMapping("/messageTtl")
public String messageTtl(){
TestPojo messageBody = new TestPojo("zsl",23);
Gson gson = new Gson();
String bodyJson = gson.toJson(messageBody);
MessageProperties properties = new MessageProperties();
// 设置消息自动过期时长,单位ms
properties.setExpiration("10000");
Message message = new Message(bodyJson.getBytes(),properties);
rabbitTemplate.convertAndSend("baseFanoutExchange","",message);
return "success";
}
队列TTL并不等于队列中消息到时自动删除,而是当队列在超过多长时间没有被进行使用后自动删除。这个需要在声明队列时进行如下配置:
<!--定义TTL队列-->
<rabbit:queue id="ttlQueue" name="TTLQueue">
<rabbit:queue-arguments>
<!--声明队列TTL,单位ms-->
<entry key="x-expires">
<value type="java.lang.Integer">30000</value>
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
通过WEB控制面板可以看到该队列被标记为EXP,等待30S后该队列自动删除
如果想要将整个队列中的消息都设置为某个时间自动删除则可以使用队列消息TTL配置,该实现进行下述配置:
<!--定义队列-->
<rabbit:queue id="baseQueue" name="baseQueue">
<rabbit:queue-arguments>
<!--定义队列中消息存活时间,单位ms-->
<entry key="x-message-ttl">
<value type="java.lang.Integer">50000</value>
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
通过WEB控制面板可以看到该队列被标记为TTL,队列中消息存储50S后将自动删除
通过队列消息TTL实现可以完成订单关闭前半部分,后半部分就是当有队列消息过期时如何得知?这时候就需要用到死信队列,死信队列绑定以后如下三种情况丢失的消息会转发到死信队列中:
x-max-length/x-max-length-bytes:限制消息数量/大小的场景下,队列采用默认策略即x-overflow参数值为drop-head丢弃的消息
队列中过期的消息,包括单消息TTL、队列消息TTL等策略过期消息
消费者手动ACK的情况下,拒绝确认消息后未将消息返回到队列中的消息
通过以下配置就可以实现死信队列,有一点需要特别注意。参数x-dead-letter-routing-key声明的是发送到死信交换器的消息的routingKey,如果没有配置该属性,转发到死信交换器的消息将以原有的routingKey投递到死信队列。比如通过routingKey为base.routing投递到了队列中,过了30分钟后该消息自动过期,如果在没有配置死信路由x-dead-letter-routing-key参数的情况下,转发到死信交换器的消息的routingKey就是base.routing
<!--定义基础队列-->
<rabbit:queue id="baseQueue" name="baseQueue">
<rabbit:queue-arguments>
<!--定义队列中消息存活时间-->
<entry key="x-message-ttl">
<value type="java.lang.Integer">50000</value>
</entry>
<!--声明绑定死信交换器-->
<entry key="x-dead-letter-exchange">
<value type="java.lang.String">deadLetterExchange</value>
</entry>
<!--声明死信路由routingKey-->
<entry key="x-dead-letter-routing-key">
<value type="java.lang.String">dead.letter</value>
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
<!--死信交换器队列-->
<rabbit:queue id="deadLetterQueue" name="deadLetterQueue"/>
<!--定义死信交换器-->
<rabbit:direct-exchange id="deadLetterExchange" name="deadLetterExchange">
<rabbit:bindings>
<rabbit:binding key="dead.letter" queue="deadLetterQueue"/>
</rabbit:bindings>
</rabbit:direct-exchange>