• Apollo 应用与源码分析:Monitor监控-软件监控-时间延迟监控


     

    9bb78baf2ed9aa03ea73a8439259f5d2.png

     

    目录

     

    代码

    分析

    RunOnce 函数分析

    UpdateState函数分析

    发送时间延迟报告函数分析

    备注


     

    代码

    1. class LatencyMonitor : public RecurrentRunner {
    2. public:
    3. LatencyMonitor();
    4. void RunOnce(const double current_time) override;
    5. bool GetFrequency(const std::string& channel_name, double* freq);
    6. private:
    7. void UpdateStat(
    8. const std::shared_ptr& records);
    9. void PublishLatencyReport();
    10. void AggregateLatency();
    11. apollo::common::LatencyReport latency_report_;
    12. std::unordered_map<uint64_t,
    13. std::setuint64_t, uint64_t, std::string>>>
    14. track_map_;
    15. std::unordered_mapdouble> freq_map_;
    16. double flush_time_ = 0.0;
    17. };
    18. void LatencyMonitor::RunOnce(const double current_time) {
    19. static auto reader =
    20. MonitorManager::Instance()->CreateReader(
    21. FLAGS_latency_recording_topic);
    22. reader->SetHistoryDepth(FLAGS_latency_reader_capacity);
    23. reader->Observe();
    24. static std::string last_processed_key;
    25. std::string first_key_of_current_round;
    26. for (auto it = reader->Begin(); it != reader->End(); ++it) {
    27. const std::string current_key =
    28. absl::StrCat((*it)->module_name(), (*it)->header().sequence_num());
    29. if (it == reader->Begin()) {
    30. first_key_of_current_round = current_key;
    31. }
    32. if (current_key == last_processed_key) {
    33. break;
    34. }
    35. UpdateStat(*it);
    36. }
    37. last_processed_key = first_key_of_current_round;
    38. if (current_time - flush_time_ > FLAGS_latency_report_interval) {
    39. flush_time_ = current_time;
    40. if (!track_map_.empty()) {
    41. PublishLatencyReport();
    42. }
    43. }
    44. }

    分析

    分析之前先回忆一下,之前模块channel 的之间的时间延迟就是通过LatencyMonitor 实现的,所以LatencyMonitor的工作就是收集各种时间延迟,并且汇总形成一个报告。

    RunOnce 函数分析

    订阅latency_recording_topic,消息体是LatencyRecordMap

    1. message LatencyRecord {
    2. optional uint64 begin_time = 1;
    3. optional uint64 end_time = 2;
    4. optional uint64 message_id = 3;
    5. };
    6. message LatencyRecordMap {
    7. optional apollo.common.Header header = 1;
    8. optional string module_name = 2;
    9. repeated LatencyRecord latency_records = 3;
    10. };

    遍历订阅到的所有的信息,然后使用UpdateStat 进行状态更新

    UpdateState函数分析

    1. void LatencyMonitor::UpdateStat(
    2. const std::shared_ptr& records) {
    3. const auto module_name = records->module_name();
    4. for (const auto& record : records->latency_records()) {
    5. track_map_[record.message_id()].emplace(record.begin_time(),
    6. record.end_time(), module_name);
    7. }
    8. if (!records->latency_records().empty()) {
    9. const auto begin_time = records->latency_records().begin()->begin_time();
    10. const auto end_time = records->latency_records().rbegin()->end_time();
    11. if (end_time > begin_time) {
    12. freq_map_[module_name] =
    13. records->latency_records().size() /
    14. apollo::cyber::Time(end_time - begin_time).ToSecond();
    15. }
    16. }
    17. }
    1. 保存每个 msg 的耗时信息到 track_map_
    2. 更新 freq_map 中模块的频率信息

    发送时间延迟报告函数分析

    1. void LatencyMonitor::PublishLatencyReport() {
    2. static auto writer = MonitorManager::Instance()->CreateWriter(
    3. FLAGS_latency_reporting_topic);
    4. apollo::common::util::FillHeader("LatencyReport", &latency_report_);
    5. AggregateLatency();
    6. writer->Write(latency_report_);
    7. latency_report_.clear_header();
    8. track_map_.clear();
    9. latency_report_.clear_modules_latency();
    10. latency_report_.clear_e2es_latency();
    11. }
    12. void LatencyMonitor::AggregateLatency() {
    13. static const std::string kE2EStartPoint = FLAGS_pointcloud_topic;
    14. std::unordered_mapuint64_t>> modules_track;
    15. std::unordered_mapuint64_t>> e2es_track;
    16. std::unordered_set all_modules;
    17. // Aggregate modules latencies
    18. std::string module_name;
    19. uint64_t begin_time = 0, end_time = 0;
    20. for (const auto& message : track_map_) {
    21. auto iter = message.second.begin();
    22. while (iter != message.second.end()) {
    23. std::tie(begin_time, end_time, module_name) = *iter;
    24. modules_track[module_name].push_back(end_time - begin_time);
    25. all_modules.emplace(module_name);
    26. ++iter;
    27. }
    28. }
    29. // Aggregate E2E latencies
    30. std::unordered_mapuint64_t> e2e_latencies;
    31. for (const auto& message : track_map_) {
    32. uint64_t e2e_begin_time = 0;
    33. auto iter = message.second.begin();
    34. e2e_latencies.clear();
    35. while (iter != message.second.end()) {
    36. std::tie(begin_time, std::ignore, module_name) = *iter;
    37. if (e2e_begin_time == 0 && module_name == kE2EStartPoint) {
    38. e2e_begin_time = begin_time;
    39. } else if (module_name != kE2EStartPoint && e2e_begin_time != 0 &&
    40. e2e_latencies.find(module_name) == e2e_latencies.end()) {
    41. const auto duration = begin_time - e2e_begin_time;
    42. e2e_latencies[module_name] = duration;
    43. e2es_track[module_name].push_back(duration);
    44. }
    45. ++iter;
    46. }
    47. }
    48. // The results could be in the following fromat:
    49. // e2e latency:
    50. // pointcloud -> perception: min(500), max(600), average(550),
    51. // sample_size(1500) pointcloud -> planning: min(800), max(1000),
    52. // average(900), sample_size(1500) pointcloud -> control: min(1200),
    53. // max(1300), average(1250), sample_size(1500)
    54. // ...
    55. // modules latency:
    56. // perception: min(5), max(50), average(30), sample_size(1000)
    57. // prediction: min(500), max(5000), average(2000), sample_size(800)
    58. // control: min(500), max(800), average(600), sample_size(800)
    59. // ...
    60. auto* modules_latency = latency_report_.mutable_modules_latency();
    61. for (const auto& module : modules_track) {
    62. SetLatency(module.first, module.second, modules_latency);
    63. }
    64. auto* e2es_latency = latency_report_.mutable_e2es_latency();
    65. for (const auto& e2e : e2es_track) {
    66. SetLatency(absl::StrCat(kE2EStartPoint, " -> ", e2e.first), e2e.second,
    67. e2es_latency);
    68. }
    69. }

    可以看出主要是两个部分的时间延迟:

    1. 所有模块的时间延迟
    2. E2E 的时间延迟

    这里的E2E就是端到端的时间延迟,在apollo 中指的是电晕信息到各个模块输出的时间。

    E2E Latency 的逻辑:

    1. 记录第一条点云数据的开始时间
    2. 依次记录那些不是点云数据的记录的开始时间,计算它们之间的差值,就成了这一个测试周期的 E2E 时延。

    备注

    Latency 的运行基于依靠于 CyberRT 的通信,所以,也必须保证 CyberRT 的 Channel 通信机制足够可靠,不然会产生误差。

     

     

  • 相关阅读:
    如果焊接也需要打怪升级,你在哪一级?
    Windows『技巧』在不同前端项目中各种启动不同的Node环境、热启动 - nodemon代替node自动重启项目
    聚合诱导胶体SiO2@SiO2核壳微球/AIE邻碳硼烷-N-苯基咔唑区位异构体化合物制备
    达梦数据库配置SSL认证加密
    shiro反序列化漏洞(CVE-2016-4437)
    java汇美食电子商城计算机毕业设计MyBatis+系统+LW文档+源码+调试部署
    第一个vue-cli程序
    LeetCode每日一题(689. Maximum Sum of 3 Non-Overlapping Subarrays)
    Redis 最佳解析
    uniapp之uni-forms表单组件封装的双向数据绑定
  • 原文地址:https://blog.csdn.net/qq_32378713/article/details/128095329