• RabbitMQ - 死信、TTL原理、延迟队列安装和配置


    目录

    一、死信交换机

    1.1、什么是死信交换机

    1.2、TTL

    1.2.1、什么是 TTL 

    1.2.2、通过 TTL 模拟触发死信

    二、延迟队列

    2.1、什么是延迟队列

    2.2、配置延迟队列插件

    2.2.1、延迟队列配置

    a)下载镜像

    b)运行容器

    c)刚刚设定的RabbitMQ的数据卷名称为`mq-plugins`,所以我们使用下面命令查看数据卷:

    d)在此目录下,进入 MQ 容器内部.

    e)开启插件

    2.3、SpringAMQP 使用延迟队列插件


    一、死信交换机


    1.1、什么是死信交换机

    想要知道什么是死信交换机,先来看看什么是死信(dead letter)~

    当生产者发送了一个消息,经过交换机到达队列时,满足下列情况之一时,就可以成为死信:

    • 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false(消息不重新加入到队列中).
    • 消息设置了过期时间,到了时间没有被消费掉.
    • 要投递的队列消息堆积满了(队列设置了最大消息数目),最早的消息可能会成为死信(LRU 算法淘汰的消息).

    那么如果这个时候,一个队列配置了 dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为 死信交换机.

    1.2、TTL

    1.2.1、什么是 TTL 

    TTL 就是过期时间.  如果一个队列中的消息到了过期时间还没有被消费, 就会变成死信.

    这里的消息到了过期时间实际上有两种情况:

    • 消息所在的队列设置了消息过期时间(x_message_ttl).
    • 消息本身设置了存活时间.

    1.2.2、通过 TTL 模拟触发死信

    a)声明一个直接交换机和一个配置了过期时间(x-message-ttl 属性)以及配 deadLetterExchange、deadLetterRoutingKey 属性的普通队列,用来生成死信

    1. @Configuration
    2. public class TTLMessageConfig {
    3. @Bean
    4. public DirectExchange ttlDirectExchange() {
    5. return new DirectExchange("ttl.direct");
    6. }
    7. @Bean
    8. public Queue ttlQueue() {
    9. return QueueBuilder
    10. .durable("ttl.queue")
    11. .ttl(5000) //延时 5 s
    12. .deadLetterExchange("dl.direct") //消息如果超时没被消费就给这个死信交换机
    13. .deadLetterRoutingKey("dl")
    14. .build();
    15. }
    16. @Bean
    17. public Binding ttlBinding() {
    18. return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
    19. }
    20. }

    b)这里我们基于注解的方式,声明一组死信交换机和队列

    1. @RabbitListener(bindings = @QueueBinding(
    2. value = @Queue(name = "dl.queue", durable = "true"),
    3. exchange = @Exchange(name = "dl.exchange"),
    4. key = "dl"
    5. ))
    6. public void listenDlQueue(String msg) {
    7. log.info("消费者收到死信消息!msg=" + msg);
    8. }

    c)生产者发送一个过期时间为 5s 的消息

    1. @Test
    2. public void testTTLMessage() {
    3. //1.构造一个消息
    4. Message message = MessageBuilder.withBody("hello ttl message".getBytes())
    5. .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
    6. .setExpiration("5000")
    7. .build();
    8. //2.发送消息
    9. rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
    10. //3.记录日志
    11. log.info("消息已经成功发送!");
    12. }

    d)执行结果如下

    Ps:通过执行结果,也可以看出,如果消息和队列都设置了过期时间,那么以时间短的为主.

    二、延迟队列


    2.1、什么是延迟队列

    刚刚我们利用 TTL 结合死信交换机,实现了当消息发出后,消费者延迟收到消息的效果。这种消息模式就成为 延迟队列(Delay Queue) 模式。

    延迟队列经常用于以下场景:

    1. 延迟发送短信.
    2. 用户下单,如果再 5 分钟内没有支付,就自动取消.
    3. 预约工作会议,10 分钟后自动通知所有参会人员.

    2.2、配置延迟队列插件

    由于 利用 TTL 结合死信交换机的方式实现起来比较麻烦,并且延迟队列的需求又非常多,因此 RabbitMQ 官方推出了一个插件,可以通过更简单的方式,达到延迟队列的效果.

    2.2.1、延迟队列配置

    我们在Centos7虚拟机中使用Docker来安装。

    a)下载镜像
    docker pull rabbitmq:3.8-management

    b)运行容器
    1. docker run \
    2. -e RABBITMQ_DEFAULT_USER=itcast \
    3. -e RABBITMQ_DEFAULT_PASS=123321 \
    4. -v mq-plugins:/plugins \
    5. --name mq \
    6. --hostname mq1 \
    7. -p 15672:15672 \
    8. -p 5672:5672 \
    9. -d \
    10. rabbitmq:3.8-management

    Ps:此命令还额外配置了插件目录对应的数据卷.

    c)刚刚设定的RabbitMQ的数据卷名称为`mq-plugins`,所以我们使用下面命令查看数据卷:
    docker volume inspect mq-plugins

    结果如下 

    使用 cd 命令切换到 Mountpoint 指定的目录下.

    d)在此目录下,进入 MQ 容器内部.

    我的容器名为`mq`,所以执行下面命令:

    docker exec -it mq bash

    e)开启插件

    进入容器内部后,执行以下命令开启插件:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    2.3、SpringAMQP 使用延迟队列插件

    a)声明一个延迟队列.  这里实际上和声明普通交换机只多出了一个 delayed 属性,设置为 true 就表示为延迟队列.

    以下是基于 注解的方式声明交换机、队列、绑定.

    Ps:如果是通过 java 代码的方式声明交换机,只需要 ExchangeBuilder().directExhange.delay() 即可.

    1. @Component
    2. @Slf4j
    3. public class SpringRabbitListener {
    4. @RabbitListener(bindings = @QueueBinding(
    5. value = @Queue(name = "delay.queue", durable = "true"),
    6. exchange = @Exchange(name = "delay.direct", delayed = "true"),
    7. key = "delay"
    8. ))
    9. public void listenDelayExchange(String msg) {
    10. log.info("消费者接收到到了延迟消息!msg=" + msg);
    11. }
    12. }

    b)生产者只需要在生产消息的时候添加一个 header:"x-delay",对应的值就是延迟时间,单位是毫秒:

    1. @Test
    2. public void testDelayMessage() {
    3. //1.准备消息
    4. Message message = MessageBuilder.withBody("hello ttl message".getBytes())
    5. .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
    6. .setHeader("x-delay", 5000) // 消息延迟时间
    7. .build();
    8. //2.消息 ID 需要封装到 CorrelationData 中
    9. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    10. //3.发送消息
    11. rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
    12. log.info("消息已经成功发送!");
    13. }

    c)结果如下 

  • 相关阅读:
    2.4g无线收发芯片:Ci24R1(DFN8)
    Jackson+Feign反序列化问题排查
    Spring IoC容器初始化主体流程
    微信小程序数据加解密
    AI加速(八)| 循环展开Unrooling——你肯定能学会的程序加速方法
    Java 同步锁ReentrantLock与抽象同步队列AQS
    [渗透测试]—7.1 漏洞利用开发和Shellcode编写
    Security5实现鉴权访问【即有权限才能访问指定资源】前后端分离
    学习C语言的好处:
    Haproxy配合Nginx搭建Web集群
  • 原文地址:https://blog.csdn.net/CYK_byte/article/details/133090424