目录
c)刚刚设定的RabbitMQ的数据卷名称为`mq-plugins`,所以我们使用下面命令查看数据卷:
想要知道什么是死信交换机,先来看看什么是死信(dead letter)~
当生产者发送了一个消息,经过交换机到达队列时,满足下列情况之一时,就可以成为死信:
那么如果这个时候,一个队列配置了 dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为 死信交换机.
TTL 就是过期时间. 如果一个队列中的消息到了过期时间还没有被消费, 就会变成死信.
这里的消息到了过期时间实际上有两种情况:
a)声明一个直接交换机和一个配置了过期时间(x-message-ttl 属性)以及配 deadLetterExchange、deadLetterRoutingKey 属性的普通队列,用来生成死信
- @Configuration
- public class TTLMessageConfig {
-
- @Bean
- public DirectExchange ttlDirectExchange() {
- return new DirectExchange("ttl.direct");
- }
-
- @Bean
- public Queue ttlQueue() {
- return QueueBuilder
- .durable("ttl.queue")
- .ttl(5000) //延时 5 s
- .deadLetterExchange("dl.direct") //消息如果超时没被消费就给这个死信交换机
- .deadLetterRoutingKey("dl")
- .build();
- }
-
- @Bean
- public Binding ttlBinding() {
- return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
- }
-
- }
b)这里我们基于注解的方式,声明一组死信交换机和队列
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "dl.queue", durable = "true"),
- exchange = @Exchange(name = "dl.exchange"),
- key = "dl"
- ))
- public void listenDlQueue(String msg) {
- log.info("消费者收到死信消息!msg=" + msg);
- }
c)生产者发送一个过期时间为 5s 的消息
- @Test
- public void testTTLMessage() {
- //1.构造一个消息
- Message message = MessageBuilder.withBody("hello ttl message".getBytes())
- .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
- .setExpiration("5000")
- .build();
- //2.发送消息
- rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
- //3.记录日志
- log.info("消息已经成功发送!");
- }
d)执行结果如下
Ps:通过执行结果,也可以看出,如果消息和队列都设置了过期时间,那么以时间短的为主.
刚刚我们利用 TTL 结合死信交换机,实现了当消息发出后,消费者延迟收到消息的效果。这种消息模式就成为 延迟队列(Delay Queue) 模式。
延迟队列经常用于以下场景:
由于 利用 TTL 结合死信交换机的方式实现起来比较麻烦,并且延迟队列的需求又非常多,因此 RabbitMQ 官方推出了一个插件,可以通过更简单的方式,达到延迟队列的效果.
我们在Centos7虚拟机中使用Docker来安装。
docker pull rabbitmq:3.8-management
- docker run \
- -e RABBITMQ_DEFAULT_USER=itcast \
- -e RABBITMQ_DEFAULT_PASS=123321 \
- -v mq-plugins:/plugins \
- --name mq \
- --hostname mq1 \
- -p 15672:15672 \
- -p 5672:5672 \
- -d \
- rabbitmq:3.8-management
Ps:此命令还额外配置了插件目录对应的数据卷.
docker volume inspect mq-plugins
结果如下
使用 cd 命令切换到 Mountpoint 指定的目录下.
我的容器名为`mq`,所以执行下面命令:
docker exec -it mq bash
进入容器内部后,执行以下命令开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
a)声明一个延迟队列. 这里实际上和声明普通交换机只多出了一个 delayed 属性,设置为 true 就表示为延迟队列.
以下是基于 注解的方式声明交换机、队列、绑定.
Ps:如果是通过 java 代码的方式声明交换机,只需要 ExchangeBuilder().directExhange.delay() 即可.
- @Component
- @Slf4j
- public class SpringRabbitListener {
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "delay.queue", durable = "true"),
- exchange = @Exchange(name = "delay.direct", delayed = "true"),
- key = "delay"
- ))
- public void listenDelayExchange(String msg) {
- log.info("消费者接收到到了延迟消息!msg=" + msg);
- }
-
- }
b)生产者只需要在生产消息的时候添加一个 header:"x-delay",对应的值就是延迟时间,单位是毫秒:
- @Test
- public void testDelayMessage() {
- //1.准备消息
- Message message = MessageBuilder.withBody("hello ttl message".getBytes())
- .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
- .setHeader("x-delay", 5000) // 消息延迟时间
- .build();
- //2.消息 ID 需要封装到 CorrelationData 中
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- //3.发送消息
- rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
- log.info("消息已经成功发送!");
- }
c)结果如下