目录
代码
分析
RunOnce 函数分析
UpdateState函数分析
发送时间延迟报告函数分析
备注
代码
class LatencyMonitor : public RecurrentRunner {public:LatencyMonitor();void RunOnce(const double current_time) override;bool GetFrequency(const std::string& channel_name, double* freq);private:void UpdateStat(const std::shared_ptr<apollo::common::LatencyRecordMap>& records);void PublishLatencyReport();void AggregateLatency();apollo::common::LatencyReport latency_report_;std::unordered_map<uint64_t,std::set<std::tuple<uint64_t, uint64_t, std::string>>>track_map_;std::unordered_map<std::string, double> freq_map_;double flush_time_ = 0.0;
};void LatencyMonitor::RunOnce(const double current_time) {static auto reader =MonitorManager::Instance()->CreateReader<LatencyRecordMap>(FLAGS_latency_recording_topic);reader->SetHistoryDepth(FLAGS_latency_reader_capacity);reader->Observe();static std::string last_processed_key;std::string first_key_of_current_round;for (auto it = reader->Begin(); it != reader->End(); ++it) {const std::string current_key =absl::StrCat((*it)->module_name(), (*it)->header().sequence_num());if (it == reader->Begin()) {first_key_of_current_round = current_key;}if (current_key == last_processed_key) {break;}UpdateStat(*it);}last_processed_key = first_key_of_current_round;if (current_time - flush_time_ > FLAGS_latency_report_interval) {flush_time_ = current_time;if (!track_map_.empty()) {PublishLatencyReport();}}
}
分析
分析之前先回忆一下,之前模块channel 的之间的时间延迟就是通过LatencyMonitor 实现的,所以LatencyMonitor的工作就是收集各种时间延迟,并且汇总形成一个报告。
RunOnce 函数分析
订阅latency_recording_topic,消息体是LatencyRecordMap
message LatencyRecord {optional uint64 begin_time = 1;optional uint64 end_time = 2;optional uint64 message_id = 3;
};message LatencyRecordMap {optional apollo.common.Header header = 1;optional string module_name = 2;repeated LatencyRecord latency_records = 3;
};
遍历订阅到的所有的信息,然后使用UpdateStat 进行状态更新
UpdateState函数分析
void LatencyMonitor::UpdateStat(const std::shared_ptr<LatencyRecordMap>& records) {const auto module_name = records->module_name();for (const auto& record : records->latency_records()) {track_map_[record.message_id()].emplace(record.begin_time(),record.end_time(), module_name);}if (!records->latency_records().empty()) {const auto begin_time = records->latency_records().begin()->begin_time();const auto end_time = records->latency_records().rbegin()->end_time();if (end_time > begin_time) {freq_map_[module_name] =records->latency_records().size() /apollo::cyber::Time(end_time - begin_time).ToSecond();}}
}
- 保存每个 msg 的耗时信息到 track_map_
- 更新 freq_map 中模块的频率信息
发送时间延迟报告函数分析
void LatencyMonitor::PublishLatencyReport() {static auto writer = MonitorManager::Instance()->CreateWriter<LatencyReport>(FLAGS_latency_reporting_topic);apollo::common::util::FillHeader("LatencyReport", &latency_report_);AggregateLatency();writer->Write(latency_report_);latency_report_.clear_header();track_map_.clear();latency_report_.clear_modules_latency();latency_report_.clear_e2es_latency();
}
void LatencyMonitor::AggregateLatency() {static const std::string kE2EStartPoint = FLAGS_pointcloud_topic;std::unordered_map<std::string, std::vector<uint64_t>> modules_track;std::unordered_map<std::string, std::vector<uint64_t>> e2es_track;std::unordered_set<std::string> all_modules;// Aggregate modules latenciesstd::string module_name;uint64_t begin_time = 0, end_time = 0;for (const auto& message : track_map_) {auto iter = message.second.begin();while (iter != message.second.end()) {std::tie(begin_time, end_time, module_name) = *iter;modules_track[module_name].push_back(end_time - begin_time);all_modules.emplace(module_name);++iter;}}// Aggregate E2E latenciesstd::unordered_map<std::string, uint64_t> e2e_latencies;for (const auto& message : track_map_) {uint64_t e2e_begin_time = 0;auto iter = message.second.begin();e2e_latencies.clear();while (iter != message.second.end()) {std::tie(begin_time, std::ignore, module_name) = *iter;if (e2e_begin_time == 0 && module_name == kE2EStartPoint) {e2e_begin_time = begin_time;} else if (module_name != kE2EStartPoint && e2e_begin_time != 0 &&e2e_latencies.find(module_name) == e2e_latencies.end()) {const auto duration = begin_time - e2e_begin_time;e2e_latencies[module_name] = duration;e2es_track[module_name].push_back(duration);}++iter;}}// The results could be in the following fromat:// e2e latency:// pointcloud -> perception: min(500), max(600), average(550),// sample_size(1500) pointcloud -> planning: min(800), max(1000),// average(900), sample_size(1500) pointcloud -> control: min(1200),// max(1300), average(1250), sample_size(1500)// ...// modules latency:// perception: min(5), max(50), average(30), sample_size(1000)// prediction: min(500), max(5000), average(2000), sample_size(800)// control: min(500), max(800), average(600), sample_size(800)// ...auto* modules_latency = latency_report_.mutable_modules_latency();for (const auto& module : modules_track) {SetLatency(module.first, module.second, modules_latency);}auto* e2es_latency = latency_report_.mutable_e2es_latency();for (const auto& e2e : e2es_track) {SetLatency(absl::StrCat(kE2EStartPoint, " -> ", e2e.first), e2e.second,e2es_latency);}
}
可以看出主要是两个部分的时间延迟:
- 所有模块的时间延迟
- E2E 的时间延迟
这里的E2E就是端到端的时间延迟,在apollo 中指的是电晕信息到各个模块输出的时间。
E2E Latency 的逻辑:
- 记录第一条点云数据的开始时间
- 依次记录那些不是点云数据的记录的开始时间,计算它们之间的差值,就成了这一个测试周期的 E2E 时延。
备注
Latency 的运行基于依靠于 CyberRT 的通信,所以,也必须保证 CyberRT 的 Channel 通信机制足够可靠,不然会产生误差。