• [MQ] 延迟队列/延迟插件下载


    ✨✨个人主页:沫洺的主页

    📚📚系列专栏: 📖 JavaWeb专栏📖 JavaSE专栏 📖 Java基础专栏📖vue3专栏 

                               📖MyBatis专栏📖Spring专栏📖SpringMVC专栏📖SpringBoot专栏

                               📖Docker专栏📖Reids专栏📖MQ专栏📖SpringCloud专栏     

    💖💖如果文章对你有所帮助请留下三连✨✨

    🐕延迟队列

    使用rabbitmq的延时队列插件,实现同一个队列中有多个不同超时时间的消息,并按时间超时顺序出队

    🦢下载延迟插件

    在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列 ,同时需保证 Erlang/OPT 版本为 18.0 之后。

    我这里 MQ 的版本是 3.10.5,现在去 GitHub 上根据版本号下载插件

    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 

     安装插件并启用

    下载完成后直接把插件放在 /root/211 目录,然后拷贝到容器内plugins目录下(rabbitmq是容器的name,也可以使用容器id)

     

    docker cp /home/211/rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq:/plugins

    进入 Docker 容器

    docker exec -it rabbitmq /bin/bash

    在plugins内启用插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    退出容器

    exit

    重启 RabbitMQ

    docker restart rabbitmq

    安装成功

     通过UI查看

    🐬SpringBoot使用延迟队列

    消费者

    自定义交换机CustomExchange

    1. @Component
    2. public class DelayConsumer {
    3. private static final String ENAME = "211-DelayExchage-01";
    4. private static final String QNAME1 = "211-DelayQueue-01";
    5. //自定义交换机
    6. @Bean
    7. public CustomExchange customExchange() {
    8. HashMap args = new HashMap<>();
    9. args.put("x-delayed-type","direct");
    10. //延迟交换机
    11. return new CustomExchange(ENAME, "x-delayed-message", true, false, args);
    12. }
    13. //定义一个队列
    14. @Bean
    15. public Queue queue() {
    16. return QueueBuilder.durable(QNAME1).build();
    17. }
    18. //创建队列和交换机的绑定关系
    19. @Bean
    20. public Binding binding1() {
    21. return BindingBuilder.bind(queue()).to(customExchange()).with("diancan").noargs();
    22. }
    23. //消费者
    24. @RabbitHandler
    25. @RabbitListener(queues = QNAME1)
    26. public void process1(UserRegisterOk userRegisterOk) {
    27. System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"消费者收到:" + userRegisterOk.getName() + "," + userRegisterOk.getHeight());
    28. }
    29. }

    messages delayed:0

    默认延迟时间0s

    生产者

    设置延迟时间

    1.         message -> {
    2.             //设置消息延迟时间5秒,5秒之后投递给队列 针对的是交换机
    3.             message.getMessageProperties().setDelay(5*1000);
    4.             return message;
    5.         }
    1. @Component
    2. public class DelayProducer {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. public void sendMessage(){
    6. //延迟5秒
    7. UserRegisterOk userRegisterOk1 = UserRegisterOk.builder().name("张一").phone("123456").height("1.8.5").build();
    8. //要将对象序列化,转成字符串,使用消息转换器MessageConverter
    9. rabbitTemplate.convertAndSend("211-DelayExchage-01","diancan",userRegisterOk1,message -> {
    10. message.getMessageProperties().setDelay(5*1000);
    11. return message;
    12. });
    13. System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"生产者1生产-->张一发送成功");
    14. //延迟8秒
    15. UserRegisterOk userRegisterOk2 = UserRegisterOk.builder().name("张二").phone("123456").height("1.8.5").build();
    16. //要将对象序列化,转成字符串,使用消息转换器MessageConverter
    17. rabbitTemplate.convertAndSend("211-DelayExchage-01","diancan",userRegisterOk2,message -> {
    18. message.getMessageProperties().setDelay(8*1000);
    19. return message;
    20. });
    21. System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"生产者2生产-->张二发送成功");
    22. }
    23. }

    整个的流程就是生产者生产消息后,在交换机中停留指定的延迟时间,后发送到队列,消费者获取队列中的消息 

    补充延迟队列不常用的两种方式

    创建具有超时功能且绑定死信交换机的消息队列

    创建通用延时消息

  • 相关阅读:
    【仿牛客论坛java项目】第五章 Kafka,构建TB级异步消息系统:阻塞队列、Kafka入门、Spring整合Kafka、发送系统通知、显示系统通知
    idea模板设置
    Aocoda-RC F405V2 FC(STM32F405RGT6 v.s. AT32F435RGT7) IO Definitions
    解决svn update 产生Node remains in conflict的报错问题
    基于Java的医院管理系统设计与实现(源码+lw+部署文档+讲解等)
    Docker常用命令
    创建一个前后端分离项目:Vue+SpringBoot
    c#string常用方法总结
    linux centos7 rpm 安装 mysql5.7
    自媒体视频剪辑中的视频素材是从哪里找的?
  • 原文地址:https://blog.csdn.net/HeyVIrBbox/article/details/127874243