• 浅析 Redisson 的分布式延时队列 RedissonDelayedQueue 运行流程


    前言

    因为工作中需要用到分布式的延时队列,调研了一段时间,选择使用 RedissonDelayedQueue,为了搞清楚内部运行流程,特记录下来。

    总体流程大概是图中的这个样子,初看一眼有点不知从何下手,接下来我会通过以下几点来分析流程,相信看完本文你能了解整个运行流程。

    • 基本使用
    • 内部数据结构介绍
    • 基本流程
    • 发送延时消息
    • 获取延时消息
    • 初始化延时队列

    基本使用

    发送延迟消息代码如下,发送了一条延迟时间为 5s 的消息。

    1.    public void produce() {
    2.        String queuename = "delay-queue";
    3.        RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
    4.        RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
    5.        delayedQueue.offer("测试延迟消息", 5, TimeUnit.SECONDS);
    6.   }
    7. 复制代码

    接收消息代码如下,可以看到 delayedQueue 是没有用到的,那么为什么要加这一行呢,这个后面总结部分回答。

    1.    public void consume() throws InterruptedException {
    2.        String queuename = "delay-queue";
    3.        RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename);
    4.        RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
    5.        String msg = blockingQueue.take();
    6.        //收到消息进行处理...
    7.   }
    8. 复制代码

    这两段代码可以写在两个不同的 Java 工程里,只要连接的是同一个 Redis 就行。

    调用 comsume() 之后,如果队列里没有消息,会阻塞等待队列里有消息并且取到了才会返回。之所以这么说是因为可能有别的 Java 进程也在跟你一样取同一个队列里的消息,如果消息被另一个抢完了,那这时就还得阻塞等待。

    这时看上去的原理是这样的:

    生产者调用 offer() 后,自己内部开启一个定时器,等到了时间在发送到 redis 的 list 里。

    如果是这样设计的话,相信大家都能看出来一个很简单的问题,要是延时时间还没到,生产者自己挂了,那样消息就丢了。所以,还是让我们接着往下看。

    内部数据结构介绍

    redisson 源码里一共创建了三个队列:【消息延时队列】、【消息顺序队列】、【消息目标队列】。

    假设在同一时间按照 msg1、msg2、msg3 的顺序发消息到延时队列,这三条消息就会被保存在【消息延时队列】和【消息顺序队列】。

    可以看到【消息延时队列】的顺序是按照到期时间升序排列的,而不是像【消息顺序队列】按照插入顺序排。

    消息到期后会将消息从前两个队列移除(怎么移?谁来移?),插入【消息目标队列】,也就是图中第三个队列。

    消费者也是阻塞在【消息目标队列】上取消息。

    这时可以简单说明下每个队列的作用:

    • 【消息延时队列】利用按照到期时间排序的特性,可以很快找到下一个要到期的消息,客户端内部自己定时到【消息目标队列】取
    • 【消息顺序队列】这个队列对分析的流程关联不大,可以忽略
    • 【消息目标队列】存放到期的消息,供消费端取

    其实【消息延时队列】队列里存的时间(也就是 zet 的 score)是到期的时间戳,为了画图方便,图里就画的是延迟的时间,不过不影响理解。

    理解好这几个队列的名字和作用,后面还会一直用到,如果忘了可以翻回来回顾下。

    因为书写理解方便和【消息顺序队列】在本文没涉及到,后面部分好几次提到的内容:把到期的消息从【消息延时队列】移到【消息目标队列】里,这句话实际的代码逻辑是这样:把【消息延时队列】和【消息顺序队列】里的到期消息移除,把它们插入到【消息目标队列】。

    基本流程

    知道了内部所使用到的数据结构后,这里可以简单说下整体的基本流程。

    先说

  • 相关阅读:
    C语言-求一个整数储存在内存中的二进制中1的个数
    进销存管理对于企业的意义
    kubekey 离线部署 kubesphere v3.3.0
    固定时间刷新算法
    如何使用本地PHP搭建本地Imagewheel云图床在公网远程访问
    JavaEE——No.2 套接字编程(TCP)
    Docker从初学到进阶二(使用Docker命令,自定义镜像,部署微服务集群,配置自己的镜像仓库)
    助力工业物联网,工业大数据之服务域:node_exporter插件【三十七】
    wodP2P ActiveX 最新版 Crack
    2022/7/27 考试总结
  • 原文地址:https://blog.csdn.net/m0_73311735/article/details/127070042