• RabbitMQ之延迟队列解读


    目录

    基本介绍

    概述

     为什么需要引进RabbitMQ延迟队列

    应用场景

     springboot代码实战

    实战架构

    工程概述

    RabbitConfigDeal 配置类:创建队列及交换机并进行绑定 

    MessageService业务类:发送消息及接收消息

    主启动类RabbitMq01Application:实现ApplicationRunner接口


    基本介绍

    概述

    延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。 其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

    RabbitMQ 中并没有延时队列的概念,是通过 延时交换机与 死信队列实现。

    需要注意:如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡”,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先被执行。 

     为什么需要引进RabbitMQ延迟队列

    场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了,这类实现延迟任务的场景就可以采用延迟队列来实现,当然除了延迟队列来实现,也可以有一些其他办法实现;

    定时任务方式

    每隔5秒扫描一次数据库,查询过期的订单然后进行处理;

    优点:

    简单,容易实现;

    缺点:

    • 存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟;
    • 性能较差,每次扫描数据库,如果订单量很大

    被动取消 

    当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭);

    优点:

    对服务器而言,压力小;

    缺点:

    • 用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响;
    • 用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差;
    • JDK延迟队列(单体应用,不能分布式下)

    DelayedQueue

    无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素 

    优点:

    实现简单,任务延迟低;

    缺点:

    • 服务重启、宕机,数据丢失;
    • 只适合单机版,不适合集群;
    • 订单量大,可能内存不足而发生异常; oom

    采用消息中间件(rabbitmq)

    RabbitMQ本身不支持延迟队列,可以使用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。

    应用场景

    •         1. 订单超时处理:当用户下单后,可以将订单放入一个延迟队列中,在一定的时间后检查订单是否完成支付。如果订单未及时支付,可以触发相应的超时处理逻辑,如取消订单、释放库存等操作。
    •         2. 消息重试机制:当某个消息无法被立即处理时,可以将该消息放入延迟队列,并设置延迟时间。在延迟时间到达后,将消息重新发送到原始队列,供消费者重新处理。
    •         3. 定时任务调度:延迟队列可以用于实现定时任务的调度。将需要执行的任务放入延迟队列,并设置合适的延迟时间,当延迟时间到达时,任务会被获取并执行。
    •         4. 优惠券过期提醒:在发放优惠券时,可以同时将过期时间放入延迟队列。当优惠券过期时间到达时,系统可以发送提醒消息给用户,以提醒其使用优惠券。
    •         5. 延迟通知和提醒:在需要延迟通知或提醒的场景中,可以将通知信息放入延迟队列,并设置适当的延迟时间。当延迟时间到达后,系统会从队列中获取通知信息并发送给相应的用户

     springboot代码实战

    实战架构

    如上图,消息到达正常的交换机exchange.nomal.a,通过与正常的队列queue.noaml.a绑定,消息会到达正常队列,如果消息到期变为死消息以后会重新转发到正常的交换机exchange.nomal.a中,最后会转发延迟队列queue.delayed.a中进行消化。

    工程概述

     工程采用springboot架构,主要用到的依赖为:

    1. <!-- rabbit的依赖-->
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-starter-amqp</artifactId>
    5. </dependency>
    6. <dependency>
    7. <groupId>org.projectlombok</groupId>
    8. <artifactId>lombok</artifactId>
    9. </dependency>

    application.yml配置文件如下:

    1. server:
    2. port: 8080
    3. spring:
    4. rabbitmq:
    5. host: 123.249.70.148
    6. port: 5673
    7. username: admin
    8. password: 123456
    9. virtual-host: /

    RabbitConfigDeal 配置类:创建队列及交换机并进行绑定 

    1. @Configuration
    2. public class RabbitConfigDeal {
    3. }

    创建正常交换机

    1. @Bean
    2. public DirectExchange normalExchange(){
    3. return ExchangeBuilder.directExchange("exchange.normal.a").build();
    4. }

     创建延迟队列

    1. @Bean
    2. public Queue delayedQueue(){
    3. return new Queue("queue.delayed.a");
    4. }

     创建正常队列,设置他的绑定死信交换机,以及对应绑定的路由key为order

    1. @Bean
    2. public Queue normalQueue(){
    3. Map arguments =new HashMap<>();
    4. arguments.put("x-dead-letter-exchange","exchange.normal.a");
    5. arguments.put("x-dead-letter-routing-key","error");
    6. return QueueBuilder.durable("queue.normal.a")
    7. .withArguments(arguments).build();
    8. }

    绑定正常交换机和正常队列

    1. @Bean
    2. public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue){
    3. return BindingBuilder.bind(normalQueue).to(normalExchange).with("info");
    4. }

     绑定正常交换机和延迟队列

    1. @Bean
    2. public Binding bindingDelayed(DirectExchange normalExchange,Queue delayedQueue){
    3. return BindingBuilder.bind(delayedQueue).to(normalExchange).with("error");
    4. }

    MessageService业务类:发送消息及接收消息

    1. @Component
    2. @Slf4j
    3. public class MessageService {
    4. @Resource
    5. private RabbitTemplate rabbitTemplate;
    6. }

     发送消息方法

    1. public void sendMsg(){
    2. MessageProperties messageProperties = new MessageProperties();
    3. // 设置过期时间,单位:毫秒
    4. messageProperties.setExpiration("15000");
    5. Message message1 = new Message("hello word 15s".getBytes(), messageProperties);
    6. //发送消息
    7. rabbitTemplate.convertAndSend("exchange.normal.a", "info", message1);
    8. System.out.println("发送完毕:" + new Date());
    9. // 设置过期时间,单位:毫秒
    10. messageProperties.setExpiration("15000");
    11. Message message2 = new Message("hello word 5s".getBytes(), messageProperties);
    12. //发送消息
    13. rabbitTemplate.convertAndSend("exchange.normal.a", "info", message2);
    14. System.out.println("发送完毕:" + new Date());
    15. }

    这里发送了俩条消息,路由key为info,第一条消息的过期时间为15s,第二条消息的过期的时间为5s,按照分析,虽然第二条消息先过期,但是第一条消息过期以后再会对第二条处理,也就意味着:到达延迟队列的顺序为:hello word 15s       hello word 5s

    MessageConvert

    • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
    • RabbitMQ 的序列化是指 Messagebody 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter默认)、Jackson2JsonMessageConverter

      接受消息

    1. @RabbitListener(queues = {"queue.delayed.a"})
    2. public void receiveMsg(Message message){
    3. byte[] body = message.getBody();
    4. String queue = message.getMessageProperties().getConsumerQueue();
    5. String msg=new String(body);
    6. log.info("{}接收到消息时间:{},消息为{}",queue,new Date(),msg);
    7. }

     Message

    在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

    1. MessageProperties // 消息属性

    2. byte[] body // 消息内容

    @RabbitListener

    使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理

    • 消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

    • 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

      • application/octet-stream:二进制字节数组存储,使用 byte[]
      • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
      • text/plain:文本数据类型存储,使用 String
      • application/json:JSON 格式,使用 Object、相应类型

    主启动类RabbitMq01Application:实现ApplicationRunner接口

    1. /**
    2. * @author 风轻云淡
    3. */
    4. @SpringBootApplication
    5. public class RabbitMq01Application implements ApplicationRunner {
    6. public static void main(String[] args) {
    7. SpringApplication.run(RabbitMq01Application.class, args);
    8. }
    9. @Resource
    10. private MessageService messageService;
    11. /**
    12. * 程序一启动就会调用该方法
    13. * @param args
    14. * @throws Exception
    15. */
    16. @Override
    17. public void run(ApplicationArguments args) throws Exception {
    18. messageService.sendMsg();
    19. }
    20. }

    在SpringBoot中,提供了一个接口:ApplicationRunner。 该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。

    由于该方法是在容器启动完成之后,才执行的,所以,这里可以从spring容器中拿到其他已经注入的bean。

    启动主启动类后查看控制台:

    1. 发送完毕:Fri Sep 29 17:12:35 CST 2023
    2. 发送完毕:Fri Sep 29 17:12:35 CST 2023
    3. 2023-09-29 17:12:50.165 INFO 89972 --- [ntContainer#0-1]
    4. c.e.rabbitmq01.service.MessageService :
    5. queue.delayed.a接收到消息时间:Fri Sep 29 17:12:50 CST 2023,消息为hello word 15s
    6. 2023-09-29 17:12:50.166 INFO 89972 --- [ntContainer#0-1]
    7. c.e.rabbitmq01.service.MessageService :
    8. queue.delayed.a接收到消息时间:Fri Sep 29 17:12:50 CST 2023,消息为hello word 5s

    我们在这里可以看见到达延迟队列的顺序是hello word 15s    hello word 5s

  • 相关阅读:
    用stream流将list转为map
    C++---类型转换
    OpenCV4(C++)—— 图像噪声与图像滤波
    Tips--lib静态库调用外部函数
    html和css创建一个简单的网页
    QCC51XX---串口仿真协议( RFCOMM)
    MYSQL索引查询问题质疑
    如何通过拍照识别植物?试试这几个软件
    CopyOnWriteArrayList解析
    浅谈当下7个网页设计趋势 优漫动游
  • 原文地址:https://blog.csdn.net/m0_62436868/article/details/133418971