• RabbitMQ之延迟队列


            延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。

            例如下面的业务场景:在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留15分钟时间,如果15分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?

    1. 可以用定时任务每分钟扫一次,发现有占座超过15分钟还没付款的就释放掉。但是这样做很低效,很多时候做的都是些无用功;
    2. 可以用分布式锁、分布式缓存的被动过期时间,15分钟过期后锁也释放了,缓存key也不存在了;
    3. 还可以用延迟队列,锁座成功后会发送1条延迟消息,这条消息15分钟后才会被消费,消费的过程就是检查这个座位是否已经是“已付款”状态;

            你在公司的协同办公系统上面预约了一个会议,邀请汪产品和陈序员今晚22点准时参加会有。系统还比较智能,除了默认发会议邀请的邮件告知参会者以外,到了今晚21:45分的时候(提前15分钟)就会通知提醒参会人员做好参会准备,会议马上开始...

            同样的,这也可以通过轮询“会议预定表”来实现,比如我每分钟跑一次定时任务看看当前有哪些会议即将开始了。当然也可以通过延迟消息来实现,预定会议以后系统投递一条延迟消息,而这条消息比较特殊不会立马被消费,而是延迟到指定时间后再触发消费动作(发通知提醒参会人准备)。不过遗憾的是,在AMQP协议和RabbitMQ中都没有相关的规定和实现。不过,我们似乎可以借助上一小节介绍的“死信队列”来变相的实现。

    可以使用rabbitmq_delayed_message_exchange插件实现。

            这里和TTL方式有个很大的不同就是TTL存放消息在死信队列(delayqueue)里,二基于插件存放消息在延时交换机里(x-delayed-message exchange)。

    • 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
    • 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列(queue)并把消息给它
    • 队列(queue)再把消息发送给监听它的消费者(customer) 

    下载插件

    下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

    安装插件:

            将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins 

    启用插件

    rabbitmq-plugins list
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    重启rabbitmq-server

    systemctl restart rabbitmq-server

    SpringBoot代码案例

    (1)xml配置文件与properties连接配置

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework.bootgroupId>
    4. <artifactId>spring-boot-starter-amqpartifactId>
    5. dependency>
    6. <dependency>
    7. <groupId>org.springframework.bootgroupId>
    8. <artifactId>spring-boot-starter-webartifactId>
    9. dependency>
    10. <dependency>
    11. <groupId>org.springframework.bootgroupId>
    12. <artifactId>spring-boot-starter-testartifactId>
    13. <scope>testscope>
    14. dependency>
    15. <dependency>
    16. <groupId>org.springframework.amqpgroupId>
    17. <artifactId>spring-rabbit-testartifactId>
    18. <scope>testscope>
    19. dependency>
    20. dependencies>
    1. spring.application.name=delayed_exchange
    2. spring.rabbitmq.host=192.168.80.121
    3. spring.rabbitmq.virtual-host=/
    4. spring.rabbitmq.username=root
    5. spring.rabbitmq.password=123456
    6. spring.rabbitmq.port=5672
    7. # 设置手动确认消息
    8. #spring.rabbitmq.listener.simple.acknowledge-mode=manual

    (2)SpringBootApplication主入口类

    1. package com.lagou.rabbit.demo;
    2. import org.springframework.boot.SpringApplication;
    3. import org.springframework.boot.autoconfigure.SpringBootApplication;
    4. @SpringBootApplication
    5. public class Demo02RabbitmqDelayedApplication {
    6. public static void main(String[] args) {
    7. SpringApplication.run(Demo02RabbitmqDelayedApplication.class, args);
    8. }
    9. }

    (3)RabbitMQ的对象配置

    1. package com.lagou.rabbit.demo.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    4. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    5. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    6. import org.springframework.amqp.rabbit.core.RabbitAdmin;
    7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    8. import org.springframework.beans.factory.annotation.Autowired;
    9. import org.springframework.context.annotation.Bean;
    10. import org.springframework.context.annotation.ComponentScan;
    11. import org.springframework.context.annotation.Configuration;
    12. import java.util.HashMap;
    13. import java.util.Map;
    14. @Configuration
    15. @EnableRabbit
    16. @ComponentScan("com.lagou.rabbit.demo")
    17. public class RabbitConfig {
    18. @Bean
    19. public Queue queue() {
    20. return new Queue("queue.delayed", true, false, false, null);
    21. }
    22. @Bean
    23. public Exchange exchange() {
    24. Map arguments = new HashMap<>();
    25. // 使用x-delayed-type指定交换器的类型
    26. arguments.put("x-delayed-type", ExchangeTypes.DIRECT);
    27. // 使用x-delayed-message表示使用delayed exchange插件处理消息
    28. return new CustomExchange("ex.delayed", "x-delayed-message", true, false, arguments);
    29. }
    30. @Bean
    31. public Binding binding() {
    32. return BindingBuilder.bind(queue()).to(exchange()).with("key.delayed").noargs();
    33. }
    34. @Bean
    35. @Autowired
    36. public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
    37. return new RabbitAdmin(factory);
    38. }
    39. @Bean
    40. @Autowired
    41. public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
    42. return new RabbitTemplate(factory);
    43. }
    44. @Bean
    45. @Autowired
    46. public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    47. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    48. factory.setConnectionFactory(connectionFactory);
    49. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    50. return factory;
    51. }
    52. }

    (4)使用推消息模式接收延迟队列的广播

    1. package com.lagou.rabbit.demo.listener;
    2. import org.springframework.amqp.core.Message;
    3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4. import org.springframework.stereotype.Component;
    5. import com.rabbitmq.client.Channel;
    6. import java.io.IOException;
    7. @Component
    8. public class MyMeetingListener {
    9. @RabbitListener(queues = "queue.delayed")
    10. public void onMessage(Message message, Channel channel) throws IOException {
    11. System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));
    12. // 消息确认
    13. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    14. }
    15. }

    (5)开发RestController,用于向延迟队列发送消息,并指定延迟的时长

    1. package com.lagou.rabbit.demo.controller;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.web.bind.annotation.PathVariable;
    5. import org.springframework.web.bind.annotation.RequestMapping;
    6. import org.springframework.web.bind.annotation.RestController;
    7. import java.nio.charset.StandardCharsets;
    8. @RestController
    9. public class DelayedController {
    10. @Autowired
    11. private AmqpTemplate amqpTemplate;
    12. @RequestMapping("/meeting/{second}")
    13. public String bookMeeting(@PathVariable Integer second) {
    14. // RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
    15. // 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX
    16. // 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。
    17. // 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
    18. MessageProperties messageProperties = MessagePropertiesBuilder.newInstance()
    19. // 设置消息的过期时间
    20. .setHeader("x-delay", (second - 10) * 1000)
    21. .setContentEncoding("utf-8")
    22. .build();
    23. Message message = MessageBuilder.withBody("还有10s开始开会了".getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
    24. // 如果不设置message的properties,也可以使用下述方法设置x-delay属性的值
    25. // rabbitTemplate.convertAndSend("ex.delayed", "key.delayed", message, msg -> {
    26. // 使用定制的属性x-delay设置过期时间,也就是提前5s提醒
    27. // 当消息转换完,设置消息头字段
    28. // msg.getMessageProperties().setHeader("x-delay", (seconds - 5) * 1000);
    29. // return msg;
    30. // });
    31. amqpTemplate.send("ex.delayed","key.delayed",message);
    32. return "会议订好了";
    33. }
    34. }

    (6)访问,然后查看输出

  • 相关阅读:
    RabbitMQ原理(五):消费者的可靠性
    GNN+RA 文献阅读
    数据结构-并查集刷题
    VS2019创建动态库,包含普通函数,类,C语言函数
    26. 删除有序数组中的重复项-遍历数组
    国产数据库达梦Dm8部署
    198. 打家劫舍
    haproxy+keepalived实战
    OpenAI官方吴达恩《ChatGPT Prompt Engineering 提示词工程师》(5)转换 / Transforming翻译
    Pytest系列- assert断言详细使用(4)
  • 原文地址:https://blog.csdn.net/weixin_52851967/article/details/128134264