• 延迟队列的理解与使用


    目录

    一、场景引入

    二、延迟队列的三种场景

     1、死信队列+TTL对队列进行延迟

     2、创建通用延时消息+死信队列 对消息延迟

     3、使用rabbitmq的延时队列插件

    x-delayed-message使用

    父pom文件

    pom文件

    配置文件

    config

    生产者

    消费者

    结果


    一、场景引入

    我们知道可以通过TTL来对队列进行设置过期时间;通过后置处理器MessagePostProcessor对消息进行设置过期时间;

     那么根据TTL及MessagePostProcessor机制可以处理关于延迟方面的问题。

    比如:秒杀之后,给30分钟时间进行支付,如果30分钟后,没有支付,订单取消。

    比如:餐厅订座,A用户早上8点预定的某家餐厅下午6点的座位,B用户中午12点预定的下午5点的座位;根据场景我们需要的时先让B用户进行消费,然后A用户再消费;这时TTL和MessagePostProcessor延迟就已经不能满足订餐的场景了;

    因为TTL是对队列进行延迟,MessagePostProcessor是对消息进行延迟,但是MessagePostProcessor对消息延迟是不能根据订座的时间去排序消费的;

    /**
     * 比如当我们发送第一个消息时延迟的时间时50s,而发送的第二个消息延迟时间是30s,虽然延迟30s的消息比延迟50s发送的晚
     * 但按照我们设想的情况,延迟30s的消息应该先消费;可是实际情况却不是这样,而是延迟50s的消息到达时间后 30s的才能消费!(队列先进先出)
     * 那这样此方式的不足就出现了!
     * 场景:
     *      A用户和B用户预定餐厅,A用户先开始预定的,预定的是下午6点。B用户比A用户预定操作晚一些,但是B用户预定的时间是下午5点。通过此场景
     *      我们希望的是B用户先进行用餐(因为他预定的吃饭时间比A早一些,需要先安排吃饭。不能说A用户没到6点B用户预定5点的吃不了。),根据此
     *      场景 之前的队列延迟还是消息延迟都不能满足场景需求了,这样就需要另一种延迟方式进行解决了! ->使用rabbitmq的延时队列插件
     */
    
    /**
     * 注意:
     *      TTL是对队列进行延迟,只要是在此队列中的消息都会按照TTL设置时间进行延迟;
     *      MessagePostProcessor是对消息进行延迟;
     *
     *      如果我们不仅使用了消息延迟,而且还使用了队列延迟,那么延迟的时间就会以小的时间为准!
     *   理解:
     *      如果a消息设置的消息延迟是30s,b消息设置的延迟时间是90s,队列设置的延迟是60s。那么a消息最终的延迟是30s(a的消息延迟与队列延迟
     *      比对以延迟时间小的为准!),b消息最终延迟的时间是60s(b的消息延迟与队列延迟比对以延迟的时间小的为准!)
     */

     二、延迟队列的三种场景

    1、死信队列+TTL对队列进行延迟

    给队列设置TTL过期时间,此队列可不用绑定消费者,时间到后把消息投递到死信队列中,由死信队列的消费者进行消费,这样就能达到延迟消费的作用

    1. @Bean
    2. public Queue directQueueLong(){
    3. return QueueBuilder.durable("业务队列名称")
    4. .deadLetterExchange("死信交换机名称")
    5. .deadLetterRoutingKey("死信队列 RoutingKey")
    6. .ttl(20000) // 消息停留时间
    7. //.maxLength(500)
    8. .build();
    9. }

    监听死信队列,即可处理超时的消息队列

    缺点:

    上述实现方式中,ttl延时队列中所有的消息超时时间都是一样的,如果不同消息想设置不一样的超时时间,就需要建立多个不同超时时间的消息队列,比较麻烦,且不利于维护。

     2、创建通用延时消息+死信队列 对消息延迟

    这样的方式可对每一个消息设置指定的过期时间,不用像TTL那样给队列设置过期时间(若队列设置的过期时间达到后,其队列中的消息均会被删除或别的处理),但是由于队列是先进先出,若先投递的消息设置的过期时间是40s,后投递的消息过期时间是30s,那么设置过期时间为30s的并不会到30s时就投递到死信队列中,而是等40s到期后才会一起被投递。

    1. rabbitTemplate.convertAndSend("交换机名称", "RoutingKey","对象",
    2. message => {
    3. message.getMessageProperties().setExpiration(String.valueOf(5000))
    4. // 设置消息的持久化属性
    5. //这样发送的消息就会被持久化到 RabbitMQ 中,即使 RabbitMQ 重启,消息也不会丢失。 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    6. return message;
    7. }
    8. );

    缺点:

    该种方式可以创建一个承载不同超时时间消息的消息队列,但是这种方式有一个问题,如果消息队列中排在前面的消息没有到超时时间,即使后面的消息到了超时时间,先到超时时间的消息也不会进入死信队列,而是先检查排在最前面的消息队列是否到了超时时间,如果到了超时时间才会继续检查后面的消息。

     3、使用rabbitmq的延时队列插件

    使用rabbitmq的延时队列插件,实现同一个队列中有多个不同超时时间的消息,并按时间超时顺序出队

    1、下载延迟插件

    在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列 ,同时需保证 Erlang/OPT 版本为 18.0 之后。

    下载地址:

    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

    2、安装插件并启用

    下载完成后直接把插件放在 /home/rabbitmq 目录,然后拷贝到容器内plugins目录下(my-rabbit是容器的name,也可以使用容器id)

    docker cp /home/rabbitmq/rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins

     进入 Docker 容器

    docker exec -it rabbit /bin/bash

     在plugins内启用插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

     退出容器

    exit

    重启 RabbitMQ

    docker restart my-rabbit

     貌似不重启也能生效!

     结果:

     就多了一个x-delayed-message交换机

    x-delayed-message使用

    父pom文件

    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    4. <modelVersion>4.0.0modelVersion>
    5. <parent>
    6. <groupId>org.springframework.bootgroupId>
    7. <artifactId>spring-boot-starter-parentartifactId>
    8. <version>2.7.1version>
    9. <relativePath/>
    10. parent>
    11. <groupId>com.chensirgroupId>
    12. <artifactId>spring-boot-rabbitmqartifactId>
    13. <version>0.0.1-SNAPSHOTversion>
    14. <name>spring-boot-rabbitmqname>
    15. <properties>
    16. <java.version>8java.version>
    17. <hutool.version>5.8.3hutool.version>
    18. <lombok.version>1.18.24lombok.version>
    19. properties>
    20. <description>spring-boot-rabbitmqdescription>
    21. <packaging>pompackaging>
    22. <modules>
    23. <module>direct-exchangemodule>
    24. <module>fanout-exchangemodule>
    25. <module>topic-exchangemodule>
    26. <module>game-exchangemodule>
    27. <module>dead-letter-queuemodule>
    28. <module>delay-queuemodule>
    29. <module>delay-queue2module>
    30. modules>
    31. <dependencyManagement>
    32. <dependencies>
    33. <dependency>
    34. <groupId>cn.hutoolgroupId>
    35. <artifactId>hutool-allartifactId>
    36. <version>${hutool.version}version>
    37. dependency>
    38. <dependency>
    39. <groupId>org.projectlombokgroupId>
    40. <artifactId>lombokartifactId>
    41. <version>${lombok.version}version>
    42. dependency>
    43. dependencies>
    44. dependencyManagement>
    45. project>

    pom文件

    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    4. <modelVersion>4.0.0modelVersion>
    5. <parent>
    6. <groupId>com.chensirgroupId>
    7. <artifactId>spring-boot-rabbitmqartifactId>
    8. <version>0.0.1-SNAPSHOTversion>
    9. <relativePath>../pom.xml relativePath>
    10. parent>
    11. <artifactId>delay-queue2artifactId>
    12. <dependencies>
    13. <dependency>
    14. <groupId>org.springframework.bootgroupId>
    15. <artifactId>spring-boot-starter-amqpartifactId>
    16. dependency>
    17. <dependency>
    18. <groupId>org.springframework.bootgroupId>
    19. <artifactId>spring-boot-starter-webartifactId>
    20. dependency>
    21. <dependency>
    22. <groupId>cn.hutoolgroupId>
    23. <artifactId>hutool-allartifactId>
    24. dependency>
    25. <dependency>
    26. <groupId>org.projectlombokgroupId>
    27. <artifactId>lombokartifactId>
    28. <optional>trueoptional>
    29. dependency>
    30. <dependency>
    31. <groupId>org.springframework.bootgroupId>
    32. <artifactId>spring-boot-starter-testartifactId>
    33. <scope>testscope>
    34. dependency>
    35. <dependency>
    36. <groupId>org.springframework.amqpgroupId>
    37. <artifactId>spring-rabbit-testartifactId>
    38. <scope>testscope>
    39. dependency>
    40. dependencies>
    41. <build>
    42. <plugins>
    43. <plugin>
    44. <groupId>org.springframework.bootgroupId>
    45. <artifactId>spring-boot-maven-pluginartifactId>
    46. plugin>
    47. plugins>
    48. build>
    49. project>

    配置文件

    1. logging.level.com.chensir = debug
    2. server.port=8086
    3. #host
    4. spring.rabbitmq.host=121.40.100.66
    5. #默认5672
    6. spring.rabbitmq.port=5672
    7. #用户名
    8. spring.rabbitmq.username=guest
    9. #密码
    10. spring.rabbitmq.password=guest
    11. #连接到代理时用的虚拟主机
    12. spring.rabbitmq.virtual-host=/
    13. #每个消费者每次可最大处理的nack消息数量
    14. spring.rabbitmq.listener.simple.prefetch=1
    15. #表示消息确认方式,其有三种配置方式,分别是none、manual(手动)和auto(自动);默认auto
    16. spring.rabbitmq.listener.simple.acknowledge-mode=auto
    17. #监听重试是否可用
    18. spring.rabbitmq.listener.simple.retry.enabled=true
    19. #最大重试次数
    20. spring.rabbitmq.listener.simple.retry.max-attempts=5
    21. #最大重试时间间隔
    22. spring.rabbitmq.listener.simple.retry.max-interval=3000ms
    23. #第一次和第二次尝试传递消息的时间间隔
    24. spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
    25. #应用于上一重试间隔的乘数
    26. spring.rabbitmq.listener.simple.retry.multiplier=2
    27. #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
    28. spring.rabbitmq.listener.simple.default-requeue-rejected=false

    config

    1. package com.chensir.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    4. import org.springframework.amqp.support.converter.MessageConverter;
    5. import org.springframework.context.annotation.Bean;
    6. import org.springframework.context.annotation.Configuration;
    7. import java.util.HashMap;
    8. import java.util.Map;
    9. @Configuration
    10. public class RabbitConfig {
    11. @Bean
    12. public MessageConverter messageConverter(){
    13. return new Jackson2JsonMessageConverter();
    14. }
    15. @Bean
    16. public CustomExchange customExchange(){
    17. Map args = new HashMap<>();
    18. //延迟时以direct直连方式绑定
    19. args.put("x-delayed-type","direct");
    20. // name:交换机名称 type:类型 固定值x-delayed-message
    21. return new CustomExchange("chen-x-delayedExchange","x-delayed-message",true,false,args);
    22. }
    23. @Bean
    24. public Queue directQueueLong(){
    25. return QueueBuilder.durable("chen-DirectQueue")
    26. .build();
    27. }
    28. @Bean
    29. public Binding binding(){
    30. return BindingBuilder.bind(directQueueLong()).to(customExchange()).with("direct123").noargs();
    31. }
    32. }

    生产者

    1. package com.chensir.provider;
    2. import com.chensir.model.OrderIngOk;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    5. import org.springframework.stereotype.Component;
    6. import javax.annotation.Resource;
    7. @Component
    8. @Slf4j
    9. public class DirectProvider {
    10. @Resource
    11. private RabbitTemplate rabbitTemplate;
    12. public void send(){
    13. OrderIngOk orderIngOk = new OrderIngOk();
    14. orderIngOk.setOrderNo("202207149687-1");
    15. orderIngOk.setId(1);
    16. orderIngOk.setUserName("倪海杉前-延迟40秒");
    17. rabbitTemplate.convertAndSend("chen-x-delayedExchange", "direct123",orderIngOk,m->{
    18. m.getMessageProperties().setDelay(40*1000); //设置延迟时间,对延迟交换机有效
    19. // m.getMessageProperties().setExpiration(String.valueOf(30*1000)); 设置过期时间,对队列有效
    20. return m;
    21. });
    22. OrderIngOk orderIngOk2 = new OrderIngOk();
    23. orderIngOk2.setOrderNo("202207149687-2");
    24. orderIngOk2.setId(2);
    25. orderIngOk2.setUserName("倪海杉后-延迟30秒");
    26. rabbitTemplate.convertAndSend("chen-x-delayedExchange", "direct123",orderIngOk2,m->{
    27. m.getMessageProperties().setDelay(30*1000);
    28. return m;
    29. });
    30. log.debug("消息生产完成");
    31. }
    32. }

    消费者

    1. package com.chensir.consumer;
    2. import cn.hutool.json.JSONUtil;
    3. import com.chensir.model.OrderIngOk;
    4. import lombok.extern.slf4j.Slf4j;
    5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    7. import org.springframework.stereotype.Component;
    8. import java.io.IOException;
    9. @Component
    10. @Slf4j
    11. public class DirectConsumer {
    12. @RabbitHandler
    13. @RabbitListener(queues = "chen-DirectQueue" )
    14. public void process(OrderIngOk orderIngOk) throws IOException {
    15. try {
    16. // 处理业务开始
    17. log.debug("接受到消息,并正常处理结束,{}", JSONUtil.toJsonStr(orderIngOk));
    18. // 处理业务结束
    19. } catch (Exception ex){
    20. throw ex;
    21. }
    22. }
    23. }

    结果

    1. 2023-08-29 18:27:45.983 DEBUG 15568 --- [ main] com.chensir.provider.DirectProvider : 消息生产完成
    2. 2023-08-29 18:28:16.143 DEBUG 15568 --- [ntContainer#0-1] com.chensir.consumer.DirectConsumer : 接受到消息,并正常处理结束,{"id":2,"OrderNo":"202207149687-2","userName":"倪海杉后-延迟30秒"}
    3. 2023-08-29 18:28:26.057 DEBUG 15568 --- [ntContainer#0-1] com.chensir.consumer.DirectConsumer : 接受到消息,并正常处理结束,{"id":1,"OrderNo":"202207149687-1","userName":"倪海杉前-延迟40秒"}

    可见 延迟40s的是先发送的,但是最终结果是先消费延迟30s的。这样就能达到我们订座的场景需求了。

  • 相关阅读:
    【Redis】链表和字典
    (五)CSS前端开发面试会问到的问题有哪些?
    2023年了,java后端还有未来吗?
    现代循环神经网络 - 编码器-解码器架构
    LeetCode_35_搜索插入位置
    CMSC5707-高级人工智能之语音识别
    滤波数据分析
    2022千元无线蓝牙耳机,音质超高的千元蓝牙耳机品牌
    关于线程池概念使用
    C++菜鸟日记2
  • 原文地址:https://blog.csdn.net/weixin_45326523/article/details/132566475