```cpp
class LatencyMonitor : public RecurrentRunner {
 public:
  LatencyMonitor();
  void RunOnce(const double current_time) override;
  bool GetFrequency(const std::string& channel_name, double* freq);

 private:
  void UpdateStat(
      const std::shared_ptr<apollo::common::LatencyRecordMap>& records);
  void PublishLatencyReport();
  void AggregateLatency();

  apollo::common::LatencyReport latency_report_;
  std::unordered_map<uint64_t,
                     std::set<std::tuple<uint64_t, uint64_t, std::string>>>
      track_map_;
  std::unordered_map<std::string, double> freq_map_;
  double flush_time_ = 0.0;
};
```
```cpp
void LatencyMonitor::RunOnce(const double current_time) {
  static auto reader =
      MonitorManager::Instance()->CreateReader<LatencyRecordMap>(
          FLAGS_latency_recording_topic);
  reader->SetHistoryDepth(FLAGS_latency_reader_capacity);
  reader->Observe();

  static std::string last_processed_key;
  std::string first_key_of_current_round;
  for (auto it = reader->Begin(); it != reader->End(); ++it) {
    const std::string current_key =
        absl::StrCat((*it)->module_name(), (*it)->header().sequence_num());
    if (it == reader->Begin()) {
      first_key_of_current_round = current_key;
    }
    if (current_key == last_processed_key) {
      break;
    }
    UpdateStat(*it);
  }
  last_processed_key = first_key_of_current_round;

  if (current_time - flush_time_ > FLAGS_latency_report_interval) {
    flush_time_ = current_time;
    if (!track_map_.empty()) {
      PublishLatencyReport();
    }
  }
}
```
Before digging into the code, recall from the earlier analysis that the latency between module channels is measured by LatencyMonitor. Its job is therefore to collect the various latency records and aggregate them into a single report.
It subscribes to latency_recording_topic; the message body is a LatencyRecordMap:
```protobuf
message LatencyRecord {
  optional uint64 begin_time = 1;
  optional uint64 end_time = 2;
  optional uint64 message_id = 3;
};

message LatencyRecordMap {
  optional apollo.common.Header header = 1;
  optional string module_name = 2;
  repeated LatencyRecord latency_records = 3;
};
```
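To make the data flow concrete, here is a minimal, hypothetical producer-side sketch that only relies on the proto fields shown above plus a plain cyber writer. The include path, the channel string (`/apollo/common/latency_records`, standing in for FLAGS_latency_recording_topic), the module name, and the message_id value are illustrative assumptions, not the exact code the Apollo modules use.

```cpp
// Illustrative producer sketch (assumptions marked below), not Apollo's real recorder code.
#include <memory>

#include "cyber/cyber.h"
#include "cyber/time/time.h"
// Proto path is an assumption; it must provide apollo::common::LatencyRecordMap.
#include "modules/common/latency_recorder/proto/latency_record.pb.h"

void PublishOneRecordMap(const std::shared_ptr<apollo::cyber::Node>& node) {
  // Channel name assumed; the monitor reads whatever FLAGS_latency_recording_topic points to.
  auto writer = node->CreateWriter<apollo::common::LatencyRecordMap>(
      "/apollo/common/latency_records");

  apollo::common::LatencyRecordMap record_map;
  record_map.set_module_name("perception");  // illustrative module name

  auto* record = record_map.add_latency_records();
  record->set_message_id(12345);  // same id across modules ties records of one frame together
  record->set_begin_time(apollo::cyber::Time::Now().ToNanosecond());
  // ... the module's real work would happen here ...
  record->set_end_time(apollo::cyber::Time::Now().ToNanosecond());

  writer->Write(record_map);
}
```

The key contract is that every module stamps the same message_id for the same frame, which is what lets LatencyMonitor group records per frame in track_map_ later on.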
Each message observed in RunOnce is handed to UpdateStat, which updates the tracking state:
```cpp
void LatencyMonitor::UpdateStat(
    const std::shared_ptr<LatencyRecordMap>& records) {
  const auto module_name = records->module_name();
  for (const auto& record : records->latency_records()) {
    track_map_[record.message_id()].emplace(record.begin_time(),
                                            record.end_time(), module_name);
  }

  if (!records->latency_records().empty()) {
    const auto begin_time = records->latency_records().begin()->begin_time();
    const auto end_time = records->latency_records().rbegin()->end_time();
    if (end_time > begin_time) {
      freq_map_[module_name] =
          records->latency_records().size() /
          apollo::cyber::Time(end_time - begin_time).ToSecond();
    }
  }
}
```
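Besides filling track_map_, UpdateStat also derives a rough per-module publishing frequency from each batch: the record count divided by the batch's time span. Below is a tiny standalone sketch of that arithmetic, under the assumption that begin_time/end_time are nanosecond timestamps (as apollo::cyber::Time::ToNanosecond() would produce); the numbers are made up.

```cpp
// Standalone illustration of the freq_map_ arithmetic in UpdateStat, not Apollo code.
#include <cstdint>
#include <iostream>

int main() {
  const std::size_t record_count = 10;           // latency_records().size()
  const uint64_t begin_time = 1'000'000'000ULL;  // first record's begin_time (assumed ns)
  const uint64_t end_time = 1'500'000'000ULL;    // last record's end_time (assumed ns)

  // Mirrors: size / apollo::cyber::Time(end_time - begin_time).ToSecond()
  const double span_sec = static_cast<double>(end_time - begin_time) / 1e9;
  const double freq_hz = record_count / span_sec;  // 10 records over 0.5 s -> 20 Hz
  std::cout << freq_hz << " Hz" << std::endl;
  return 0;
}
```

GetFrequency then presumably just serves these cached values from freq_map_ to whoever asks for a channel's rate.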
```cpp
void LatencyMonitor::PublishLatencyReport() {
  static auto writer = MonitorManager::Instance()->CreateWriter<LatencyReport>(
      FLAGS_latency_reporting_topic);
  apollo::common::util::FillHeader("LatencyReport", &latency_report_);
  AggregateLatency();
  writer->Write(latency_report_);
  latency_report_.clear_header();
  track_map_.clear();
  latency_report_.clear_modules_latency();
  latency_report_.clear_e2es_latency();
}
```
```cpp
void LatencyMonitor::AggregateLatency() {
  static const std::string kE2EStartPoint = FLAGS_pointcloud_topic;
  std::unordered_map<std::string, std::vector<uint64_t>> modules_track;
  std::unordered_map<std::string, std::vector<uint64_t>> e2es_track;
  std::unordered_set<std::string> all_modules;

  // Aggregate modules latencies
  std::string module_name;
  uint64_t begin_time = 0, end_time = 0;
  for (const auto& message : track_map_) {
    auto iter = message.second.begin();
    while (iter != message.second.end()) {
      std::tie(begin_time, end_time, module_name) = *iter;
      modules_track[module_name].push_back(end_time - begin_time);
      all_modules.emplace(module_name);
      ++iter;
    }
  }

  // Aggregate E2E latencies
  std::unordered_map<std::string, uint64_t> e2e_latencies;
  for (const auto& message : track_map_) {
    uint64_t e2e_begin_time = 0;
    auto iter = message.second.begin();
    e2e_latencies.clear();
    while (iter != message.second.end()) {
      std::tie(begin_time, std::ignore, module_name) = *iter;
      if (e2e_begin_time == 0 && module_name == kE2EStartPoint) {
        e2e_begin_time = begin_time;
      } else if (module_name != kE2EStartPoint && e2e_begin_time != 0 &&
                 e2e_latencies.find(module_name) == e2e_latencies.end()) {
        const auto duration = begin_time - e2e_begin_time;
        e2e_latencies[module_name] = duration;
        e2es_track[module_name].push_back(duration);
      }
      ++iter;
    }
  }

  // The results could be in the following format:
  // e2e latency:
  //   pointcloud -> perception: min(500), max(600), average(550), sample_size(1500)
  //   pointcloud -> planning: min(800), max(1000), average(900), sample_size(1500)
  //   pointcloud -> control: min(1200), max(1300), average(1250), sample_size(1500)
  //   ...
  // modules latency:
  //   perception: min(5), max(50), average(30), sample_size(1000)
  //   prediction: min(500), max(5000), average(2000), sample_size(800)
  //   control: min(500), max(800), average(600), sample_size(800)
  //   ...

  auto* modules_latency = latency_report_.mutable_modules_latency();
  for (const auto& module : modules_track) {
    SetLatency(module.first, module.second, modules_latency);
  }
  auto* e2es_latency = latency_report_.mutable_e2es_latency();
  for (const auto& e2e : e2es_track) {
    SetLatency(absl::StrCat(kE2EStartPoint, " -> ", e2e.first), e2e.second,
               e2es_latency);
  }
}
```
As the aggregation shows, the report covers two kinds of latency: per-module latency (end_time - begin_time of each record) and end-to-end (E2E) latency.
E2E here means end-to-end latency; in Apollo it is the time from the point-cloud message (kE2EStartPoint, i.e. FLAGS_pointcloud_topic) to the output of each downstream module.
The E2E logic works per message_id: the begin_time of the point-cloud record is taken as the E2E start point, and every other module then contributes its own begin_time minus that start, with only its first record per message counted.
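Here is a standalone sketch of that per-message walk, using synthetic timestamps, made-up module names, and plain STL containers (no Apollo dependencies); only the traversal logic mirrors AggregateLatency().

```cpp
// Synthetic illustration of the E2E aggregation; values and names are invented.
#include <cstdint>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <tuple>
#include <vector>

int main() {
  const std::string kE2EStartPoint = "pointcloud";

  // message_id -> set of (begin_time, end_time, module), ordered by begin_time.
  std::set<std::tuple<uint64_t, uint64_t, std::string>> records;
  records.emplace(100, 110, "pointcloud");  // lidar frame enters the pipeline
  records.emplace(150, 180, "perception");  // starts 50 time units after the pointcloud
  records.emplace(400, 420, "planning");    // starts 300 time units after the pointcloud
  std::map<uint64_t, std::set<std::tuple<uint64_t, uint64_t, std::string>>> track_map;
  track_map[1] = records;

  std::map<std::string, std::vector<uint64_t>> e2es_track;
  for (const auto& [message_id, message_records] : track_map) {
    uint64_t e2e_begin_time = 0;
    std::set<std::string> seen;  // count each module only once per message
    for (const auto& [begin_time, end_time, module] : message_records) {
      if (e2e_begin_time == 0 && module == kE2EStartPoint) {
        e2e_begin_time = begin_time;  // E2E start = pointcloud's begin_time
      } else if (module != kE2EStartPoint && e2e_begin_time != 0 &&
                 seen.insert(module).second) {
        e2es_track[module].push_back(begin_time - e2e_begin_time);
      }
    }
  }

  // Prints: pointcloud -> perception: 50, then pointcloud -> planning: 300
  for (const auto& [module, latencies] : e2es_track) {
    std::cout << kE2EStartPoint << " -> " << module << ": " << latencies.front() << std::endl;
  }
  return 0;
}
```

Because the std::set orders the tuples by begin_time, the point-cloud record is encountered before the modules that consume it, so e2e_begin_time is set before any downstream module is measured.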
Latency monitoring itself rides on CyberRT communication, so the CyberRT channel transport must be reliable enough; otherwise the measurements will be skewed.