• clickhouse 中 ReplicasMaxAbsoluteDelay 的计算


    小白上路,如有错误,还请指正,谢谢。

    一、 问题背景

    最近偶尔会收到延迟超时的告警,随后恢复

    时间:2022.09.30-10:04:48

    replication lag across all tables (ads_ch03:ch_params[ReplicasMaxAbsoluteDelay]): 52y 9m 15d

    历史告警日期如下

          可以看到ReplicasMaxAbsoluteDelay的时间,随着告警发送的日期在增长,但是始终是52年左右。微信群里的朋友发现52年前刚好是1970年,即unix的起始时间附近。

           因此大致可以猜测这个值是currenttime减去unix起始时间的秒数,表示一个无穷大而不是真正的延迟。

    二、 源码分析

           其实本来是想在网上直接搜资料的,但实在太少了,只能搜ClickHouse ReplicasMaxAbsoluteDelay source code看看。

    1. AsynchronousMetrics.cpp文件

           可以查到设置ReplicasMaxAbsoluteDelay的入口函数在AsynchronousMetrics.cpp文件,也就是它对应的系统表名字。

    1. // 循环检查各db
    2. for (const auto & db : databases)
    3. {
    4. // Check if database can contain MergeTree tables
    5. // 检查db中是否可以包含MergeTree引擎表,若不能则跳到下个db
    6. if (!db.second->canContainMergeTreeTables())
    7. continue;
    8. // 循环检查db中每个表
    9. for (auto iterator = db.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
    10. {
    11. ++total_number_of_tables;
    12. // 如果是ReplicatedMergeTree引擎表
    13. if (StorageReplicatedMergeTree * table_replicated_merge_tree = typeid_cast(table.get()))
    14. {
    15. StorageReplicatedMergeTree::Status status;
    16. table_replicated_merge_tree->getStatus(&status, false);
    17. // 如果不是只读状态
    18. if (!status.is_readonly)
    19. {
    20. try
    21. {
    22. time_t absolute_delay = 0;
    23. time_t relative_delay = 0;
    24. // 这步是主要函数,给两个变量赋值,下面会详细看
    25. table_replicated_merge_tree->getReplicaDelays(&absolute_delay, &relative_delay);
    26. // calculateMax是个获取最大值的函数:比较max_absolute_delay与absolute_delay,较大者存入max_absolute_delay
    27. calculateMax(&max_absolute_delay, absolute_delay);
    28. calculateMax(&max_relative_delay, relative_delay);
    29. }
    30. catch (...)
    31. {
    32. tryLogCurrentException(__PRETTY_FUNCTION__,
    33. "Cannot get replica delay for table: " + backQuoteIfNeed(db.first) + "." + backQuoteIfNeed(iterator->name()));
    34. }
    35. }
    36. }
    37. }
    38. }
    39. // 循环获取到每个库每个表的max_absolute_delay后,赋给ReplicasMaxAbsoluteDelay
    40. new_values["ReplicasMaxAbsoluteDelay"] = max_absolute_delay;

     很简单的calculateMax函数

    1. static void calculateMax(Max & max, T x)
    2. {
    3. if (Max(x) > max)
    4. max = x;
    5. }

    2. getReplicaDelays函数

            前面的getReplicaDelays(&absolute_delay, &relative_delay); 主要是调用该函数,并给absolute_delay和relative_delay变量赋值,这里我们只看absolute_delay。

    1. void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay)
    2. {
    3. assertNotReadonly();
    4. time_t current_time = time(nullptr);
    5. out_absolute_delay = getAbsoluteDelay();
    6. out_relative_delay = 0;
    7. }

             首先执行一个assertNotReadonly,判断表非只读

    1. void StorageReplicatedMergeTree::assertNotReadonly() const
    2. {
    3. if (is_readonly)
    4. throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (replica path: {})", replica_path);
    5. }

    然后是主要的,执行getAbsoluteDelay函数,获取延迟值。

    3. getAbsoluteDelay函数

    1. time_t StorageReplicatedMergeTree::getAbsoluteDelay() const
    2. {
    3. // 队列中未处理日志的最早插入时间
    4. time_t min_unprocessed_insert_time = 0;
    5. // 队列中已处理日志的最大插入时间
    6. time_t max_processed_insert_time = 0;
    7. // 获取这两个变量值
    8. queue.getInsertTimes(&min_unprocessed_insert_time, &max_processed_insert_time);
    9. // Load start time, then finish time to avoid reporting false delay when start time is updated between loading of two variables.
    10. // 获取队列更新的开始及结束时间
    11. time_t queue_update_start_time = last_queue_update_start_time.load();
    12. time_t queue_update_finish_time = last_queue_update_finish_time.load();
    13. // 当前时间,返回的是从纪元开始(1970-01-01)至今的秒数
    14. time_t current_time = time(nullptr);
    15. if (!queue_update_finish_time)
    16. {
    17. /// We have not updated queue even once yet (perhaps replica is readonly). As we have no info about the current state of replication log, return effectively infinite delay.
    18. /// 看到一篇文章的解释是:如果队列最近一次的更新一直没结束,表示正在向当前队列中加操作日志,则认为延迟时间是无穷大
    19. return current_time;
    20. }
    21. else if (min_unprocessed_insert_time)
    22. {
    23. /// There are some unprocessed insert entries in queue.
    24. /// 如果队列中有尚未处理的日志,且其时间早于当前时间,则输出两者差值作为delay;否则,说明是后生成的,delay记为0.
    25. return (current_time > min_unprocessed_insert_time) ? (current_time - min_unprocessed_insert_time) : 0;
    26. }
    27. /// 如果队列更新开始时间大于结束时间
    28. else if (queue_update_start_time > queue_update_finish_time)
    29. {
    30. /// Queue is empty, but there are some in-flight or failed queue update attempts (likely because of problems with connecting to ZooKeeper).
    31. /// Return the time passed since last attempt.
    32. /// 说明队列为空,但有一些正在运行或失败的队列更新尝试(可能是由于连接到ZooKeeper时出现问题)。
    33. // 返回自上次尝试后经过的时间。
    34. return (current_time > queue_update_start_time) ? (current_time - queue_update_start_time) : 0;
    35. }
    36. else
    37. {
    38. /// Everything is up-to-date. 否则,说明没有延迟
    39. return 0;
    40. }
    41. }

    4. queue_update_finish_time的计算

    上面函数中只简单提到了

    time_t queue_update_finish_time = last_queue_update_finish_time.load();

    其定义在

    1. /** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
    2. * In ZK entries in chronological order. Here it is not necessary.
    3. */
    4. ReplicatedMergeTreeQueue queue;
    5. std::atomic<time_t> last_queue_update_start_time{0};
    6. std::atomic<time_t> last_queue_update_finish_time{0};

    找到一张图

    在这里插入图片描述

    可以看到,其计算主要在queueUpdatingTask函数

    1. void StorageReplicatedMergeTree::queueUpdatingTask()
    2. {
    3. if (!queue_update_in_progress)
    4. {
    5. last_queue_update_start_time.store(time(nullptr));
    6. queue_update_in_progress = true;
    7. }
    8. try
    9. {
    10. queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), queue_updating_task->getWatchCallback(), ReplicatedMergeTreeQueue::UPDATE);
    11. last_queue_update_finish_time.store(time(nullptr));
    12. queue_update_in_progress = false;
    13. }
    14. catch (const Coordination::Exception & e)
    15. {
    16. ...
    17. }
    18. catch (...)
    19. {
    20. ...
    21. }
    22. }

    根据chatgpt的介绍:

    queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), queue_updating_task->getWatchCallback(), ReplicatedMergeTreeQueue::UPDATE);

    queue.pullLogsToQueue 函数用于从日志表中获取更新任务,并将其添加到队列中。具体来说,该函数会检查当前副本是否处于只读模式,如果是,则直接返回;否则,使用 ZooKeeper 客户端对象从日志表中获取更新任务,并将其添加到队列中。

    在调用 queue.pullLogsToQueue 函数时可能出现以下异常:

    • 如果当前副本处于只读模式,会抛出 Exception 异常。
    • 如果无法获取到 ZooKeeper 客户端对象,会抛出 zkutil::KeeperException 异常。
    • 如果从日志表中获取更新任务失败,会抛出 Exception 异常。

    此外,在使用客户端对象操作ZooKeeper时,还可能遇到网络连接问题或ZooKeeper服务器响应超时等问题,这些情况也有可能导致 zkutil::KeeperException 异常。

    last_queue_update_finish_time.store(time(nullptr));

    last_queue_update_finish_time 表示最近一次更新操作完成的时间。

    time(nullptr) 函数返回当前系统时间(以秒为单位),用于更新 last_queue_update_finish_time 的值。通过调用 store 函数将新的时间戳写入到原子变量中,在这个过程中使用了锁,以保证多线程环境下的原子性和可见性。

    这个时间戳可以用来计算当前更新操作的延迟,具体来说,可以通过与最近一次更新完成时间的比较,得到当前更新操作等待的时间长度,从而判断该更新操作是否需要加速执行。同时,由于该变量会被多个线程访问,因此需要使用原子变量来确保线程安全。

            因此可以猜测,当queue.pullLogsToQueue 函数抛异常时,会跳过last_queue_update_finish_time的计算,导致其值为0。具体可能包括:

    • 当前副本处于只读模式
    • 无法获取到 ZooKeeper 客户端对象
    • 从日志表中获取更新任务失败

    三、 遗留问题

            至此,我们清楚了ReplicasMaxAbsoluteDelay返回52年的原因,以及可能导致queue_update_finish_time0的场景。但结合到实例监控

    • clickhousezookeeper的日志中目前没发现报错
    • Read-Only Replicas监控也一直显示为0

    • 队列中的任务数也很少

           没有发现符合上述问题的具体场景,且从实际来看业务没反馈过有什么影响,或许可以改为连续出现两次或多次才告警?

           希望熟悉clickhouse的朋友指点指点~

    参考

    AsynchronousMetrics.cpp source code [ClickHouse/src/Interpreters/AsynchronousMetrics.cpp] - Woboq Code Browser

    ReplicatedMergeTreeQueue.h source code [ClickHouse/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h] - Woboq Code Browser

    StorageReplicatedMergeTree.cpp source code [ClickHouse/src/Storages/StorageReplicatedMergeTree.cpp] - Woboq Code Browser

    StorageReplicatedMergeTree.h source code [ClickHouse/src/Storages/StorageReplicatedMergeTree.h] - Woboq Code Browser

    https://www-programmersought-com.translate.goog/article/56754023733/?_x_tr_sl=en&_x_tr_tl=zh-CN&_x_tr_hl=zh-CN&_x_tr_pto=sc

    https://www.dounaite.com/article/62c907c8f4ab41be4873aadd.html

    Clickhouse ReplicatedMergeTree 后台任务的工作原理-pudn.com

    【ClickHouse源码】ReplicatedMergeTree之数据同步流程_一只努力的微服务的博客-CSDN博客

    chatgpt的回答

  • 相关阅读:
    有没有适用的kafka镜像
    Spring——AOP(很全很详细,耐心看完有收获)
    系列学习 SpringCloud-Alibaba 框架之第 4 篇 —— Sentinel 高可用流量控制组件
    1011 World Cup Betting
    【JavaScript】WebAPI详解
    uni-app学习(1)
    常用ADB指令
    【学习笔记38】JavaScript中的本地存储
    [NOIP2012 提高组] 开车旅行
    文章解读与仿真程序复现思路——电网技术EI\CSCD\北大核心《基于多任务学习和单任务学习组合模型的综合能源系统多元负荷预测》
  • 原文地址:https://blog.csdn.net/Hehuyi_In/article/details/127127727