本文基于vsomeip 3.1.20.3总结而成
源码地址:https://github.com/GENIVI/vsomeip.git
本文主要涉及vsomeip库中的如下代码:
implementation/runtime/application_impl.cpp
implementation/routing/src/routing_manager_base.cpp
implementation/routing/src/routing_manager_impl.cpp
implementation/routing/src/routing_manager_proxy.cpp
implementation/routing/src/routing_manager_stub.cpp
implementation/endpoints/local_client_endpoint_impl.cpp
implementation/endpoints/local_server_endpoint_impl.cpp
整个流程过于复杂,里面的各类判断盘根错结,这里需要先了解里面几个offerService中常用的数据结构
//key是std::pair类型,value是类型为tuple的双端队列
std::map<std::pair<service_t, instance_t>,
std::deque<std::tuple<uint8_t, client_t,
major_version_t, minor_version_t>>> offer_commands_;
上述map在insert_offer_command函数中添加数据, 在erase_offer_command函数中删除数据
typedef std::map<service_t,
std::map<instance_t,
std::tuple<major_version_t, minor_version_t, client_t>>> local_services_map_t;
local_services_map_t local_services_;
在handle_local_offer_service中添加数据,在on_stop_offer_service中删除数据
// map to store pending offers.
// 1st client id in tuple: client id of new offering application
// 2nd client id in tuple: client id of previously/stored offering application
std::map<service_t,std::map<instance_t,
std::tuple<major_version_t, minor_version_t,
client_t, client_t>>> pending_offers_;
handle_local_offer_service 中添加数据,在on_pong,handle_client_error 中删除数据
offerService会根据路由模式的不同,走的流程也不一样。
void application_impl::offer_service(service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor) {
if (routing_)
routing_->offer_service(client_, _service, _instance, _major, _minor);
}
上面的routing_就是init流程中分析过的路由模块指针,路由分为host模式与proxy模式,接下来将两种模式分开分析
host模式中的offerService实现在routing_manager_impl中,下文将该模块缩略写为rtm_impl,看代码:
routing_manager_impl.hpp
public:
bool offer_service(client_t _client,
service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor);
private:
bool routing_manager_impl::offer_service(client_t _client,
service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor)
routing_manager_impl.cpp
private:
bool routing_manager_impl::offer_service(client_t _client,
service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor) {
return offer_service(_client, _service, _instance, _major, _minor, true);
}
这个对外暴露的这个接口实现中啥也没做,就调用了私有的offer_service方法,把参数传进入,同时传入的_must_queue属性为true, 然后主战场转入到私有方法的offer_service中。
bool routing_manager_impl::offer_service(client_t _client,
service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor,
bool _must_queue) {
VSOMEIP_INFO << "OFFER("
<< std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance
<< ":" << std::dec << int(_major) << "." << std::dec << _minor << "]"
<< " (" << std::boolalpha << _must_queue << ")";
// only queue commands if method was NOT called via erase_offer_command()
if (_must_queue) {
//添加offer命令到offer_commands_的map表中,key是一个_service,_instance的std::pair结构
//value是一个存储std::tuple类型的反向队列
if (!insert_offer_command(_service, _instance, VSOMEIP_OFFER_SERVICE,
_client, _major, _minor)) {
return false;
}
}
// Check if the application hosted by routing manager is allowed to offer
// offer_service requests of local proxies are checked in rms::on:message
if (_client == get_client()) {
#ifdef _WIN32
std::uint32_t its_routing_uid = ANY_UID;
std::uint32_t its_routing_gid = ANY_GID;
#else
std::uint32_t its_routing_uid = getuid();
std::uint32_t its_routing_gid = getgid();
#endif
//使用安全模块的配置校验是否允许当前的host路由提供offer动作,不允许的情况下,
//则将offer_service命令从offer_commands_中服务与实例ID对应的反向队列中移除
if (!security::get()->is_offer_allowed(its_routing_uid, its_routing_gid,
_client, _service, _instance)) {
VSOMEIP_WARNING << "routing_manager_impl::offer_service: "
<< std::hex << "Security: Client 0x" << _client
<< " isn't allowed to offer the following service/instance "
<< _service << "/" << _instance
<< " ~> Skip offer!";
erase_offer_command(_service, _instance);
return false;
}
}
/*check对应client端提供的服务与实例信息是否已经记录在local_services_中,如果服务实例已经存在,
且记录的信息一致,则判断当前client是重复提交offer_service命令,rtm_impl将本次offer命令从记
录表移除,并返回;如果本地缓存服务信息中存在服务实例信息,但是提供该实例的client端不一致,
则继续从预提交命令的数据结构pending_offers_中拿到服务实例,预提交中的实例信息存在,且client
与当前offer_service中的一致,则返回true, 否则返回false表明当前服务已经被其他client端提供了。
如果pending_offers_中找不到可用的服务实例信息,则通过clientid来找到客户端的endpoint,并发送
一个ping命令,去检测客户端是否还存活,发送ping命令的同时,将实例信息保存到pending_offers中
收到客户端回复的pong命令时,从pending_offers中移除。汇总流程如下:
一:判断提供的服务实例在路由中的状态
1.local_services中有缓存的相同内容的服务实例,且client_id一致,则返回false
2.local_services中有缓存的相同内容的服务实例,且client_id不一致:
- pending_offers中存在缓存的clientid与提供的clientid一致,命令处理正在ping流程中,返回false
- pending_offers中存在缓存的clientid与提供的clientid不一致,打印日志,返回false
- pending_offers中不存在缓存的服务实例
-- 从endpoint_mgr中查找对应clientid的endpoint,发送ping,并将服务实例的命令加入到pending_offers中后返回false
-- endpoint_mgr中没有clientid对应的endpoint,如果当前发送命令的client是host路由,返回false
3.local_services中有缓存的相同内容的服务实例,但是服务版本号或者clientid不一致,返回false
4.其他情况继续往下走,走到二阶段。
二:创建或者修改服务实例信息
三:服务实例信息创建或者更新成功之后,返回true
*/
if (!handle_local_offer_service(_client, _service, _instance, _major, _minor)) {
//从offer_command_中移除相关命令
erase_offer_command(_service, _instance);
return false;
}
{
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
if (if_state_running_) {
//通信网卡已就绪的情况下,初始化服务实例数据结构
init_service_info(_service, _instance, true);
} else {
//未就绪的情况下,将服务实例加到预提交的缓存中
pending_sd_offers_.push_back(std::make_pair(_service, _instance));
}
}
if (discovery_) {
//SD模块已启用,将服务信息提供给到SD
std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
if (its_info) {
discovery_->offer_service(its_info);
}
}
{
//处理服务中的事件订阅逻辑
std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
std::set<event_t> its_already_subscribed_events;
for (auto &ps : pending_subscriptions_) {
if (ps.service_ == _service
&& ps.instance_ == _instance
&& ps.major_ == _major) {
insert_subscription(ps.service_, ps.instance_,
ps.eventgroup_, ps.event_, client_, &its_already_subscribed_events);
#if 0
VSOMEIP_ERROR << __func__
<< ": event="
<< std::hex << ps.service_ << "."
<< std::hex << ps.instance_ << "."
<< std::hex << ps.event_;
#endif
}
}
send_pending_subscriptions(_service, _instance, _major);
}
stub_->on_offer_service(_client, _service, _instance, _major, _minor);
//给注册了服务可用状态监听的app模块回调服务状态为可用
on_availability(_service, _instance, true, _major, _minor);
//offerService流程已完成,从offer_commands中删除相关命令
erase_offer_command(_service, _instance);
return true;
}
整个rtm_impl中的大流程如上所述,中间包含了许多的子流程,下面将一一展开来跟。按照顺序来,首先第一个函数为insert_offer_command,代码如下:
bool routing_manager_impl::insert_offer_command(service_t _service, instance_t _instance, uint8_t _command,
client_t _client, major_version_t _major, minor_version_t _minor) {
std::lock_guard<std::mutex> its_lock(offer_serialization_mutex_);
// flag to indicate whether caller of this function can start directly processing the command
//查找当前路由模块中是否存在指定服务实例处理的命令,两种类型的命令(VSOMEIP_OFFER_SERVICE, VSOMEIP_STOP_OFFER_SERVICE)
bool must_process(false);
auto found_service_instance = offer_commands_.find(std::make_pair(_service, _instance));
if (found_service_instance != offer_commands_.end()) {
//如果offer_commands中存在服务实例,但是处理命令为空,则更新该命令,然后直接处理该命令
//如果已经有处理指令了,则仅更新指令信息,并返回false
// if nothing is queued
if (found_service_instance->second.empty()) {
must_process = true;
}
found_service_instance->second.push_back(
std::make_tuple(_command, _client, _major, _minor));
} else {
//如果不存在服务实例,则将命令加入队列,并直接处理该命令
// nothing is queued -> add command to queue and process command directly
offer_commands_[std::make_pair(_service, _instance)].push_back(
std::make_tuple(_command, _client, _major, _minor));
must_process = true;
}
return must_process;
}
根据offer_service传入的参数创建了VSOMEIP_OFFER_SERVICE指令信息并加入offer_commands_表后,接下来开始走到安全校验模块,这块暂时不跟,默认是通过的,那么接下来就是到了handle_local_offer_service这个函数的流程中, 这里为了节省空间,我把log打印的部分代码去掉了,主要流程如下:
bool routing_manager_impl::handle_local_offer_service(client_t _client, service_t _service,
instance_t _instance, major_version_t _major,minor_version_t _minor) {
{
std::lock_guard<std::mutex> its_lock(local_services_mutex_);
//根据服务ID从本地服务缓存中查找是否存在实例信息
auto found_service = local_services_.find(_service);
if (found_service != local_services_.end()) {
//获取服务实例信息
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
//获取到了缓存的服务实例信息(大小版本号,以及提供服务的客户端id)
const major_version_t its_stored_major(std::get<0>(found_instance->second));
const minor_version_t its_stored_minor(std::get<1>(found_instance->second));
const client_t its_stored_client(std::get<2>(found_instance->second));
if (its_stored_major == _major
&& its_stored_minor == _minor
&& its_stored_client == _client) {
//客户端id一致,服务实例信息一致,则该客户端重复提供服务,返回false
return false;
} else if ( its_stored_major == _major
&& its_stored_minor == _minor
&& its_stored_client != _client) {
//服务实例信息一致,但是提供服务的客户端不一样,则继续比对
// check if previous offering application is still alive
bool already_pinged(false);
{
std::lock_guard<std::mutex> its_lock(pending_offers_mutex_);
//从pending_offers中查看是否存在服务
auto found_service2 = pending_offers_.find(_service);
if (found_service2 != pending_offers_.end()) {
//pending_offers中存在服务,取出实例信息
auto found_instance2 = found_service2->second.find(_instance);
if (found_instance2 != found_service2->second.end()) {
//存在对应的服务实例信息,且之前在pending_offer中提供信息的客户端 与本次的
//客户端一致,则表示表示上一次该客户端提供过该服务信息,
//但是由于当时处于发送ping命令中的流程还没走完,又收到该
//命令,则默认对应客户端是运行的。
if(std::get<2>(found_instance2->second) == _client) {
already_pinged = true;
} else {
//此时有其他的vsomeip app模块正在提供该服务信息,此时路由模块
//拒绝处理此命令,直接返回false
return false;
}
}
}
}
//没有在pending_offer中找到预提交的服务信息或者实例信息
if (!already_pinged) {
// find out endpoint of previously offering application
//获取local_services_中获取的缓存服务对应的通信端点
std::shared_ptr<local_client_endpoint_base_impl>
its_old_endpoint
= std::dynamic_pointer_cast<local_client_endpoint_base_impl>(
find_local(its_stored_client));
if (its_old_endpoint) {
//获取到端点,发送一个ping命令看看客户端是否还存活,然后将服务实例信息
//加入到pending_offers结构中,并返回false, 这样offer_service就不会继续执行了,
//后续会如果客户端还存活,那host路由会收到on_pong命令,再处理这个服务实例
std::lock_guard<std::mutex> its_lock(pending_offers_mutex_);
if(stub_->send_ping(its_stored_client)) {
pending_offers_[_service][_instance] =
std::make_tuple(_major, _minor, _client,
its_stored_client);
return false;
}
} else if (its_stored_client == host_->get_client()) {
//缓存的offer服务实例信息是host自己发送的,这里就不再处理了
...
return false;
}
} else {
//命令处理正在ping流程中,返回false
return false;
}
} else {
...
return false;
}
}
}
// check if the same service instance is already offered remotely
//创建一个服务实例对象
if (routing_manager_base::offer_service(_client, _service, _instance,
_major, _minor)) {
//将本地服务加入缓存
local_services_[_service][_instance] = std::make_tuple(_major,
_minor, _client);
} else {
//已有对应client提供的缓存服务,重复提交返回false
return false;
}
}
return true;
}
上面的函数又有一个子流程了,这个子流程的函数定义在父类routing_manager_base中,它的实现如下:
bool routing_manager_base::offer_service(client_t _client,
service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor) {
(void)_client;
// Remote route (incoming only)
//从服务记录表services_中获取到服务对象serviceinfo
auto its_info = find_service(_service, _instance);
//当然第一次去拿的时候,这里为空,所以走else流程
if (its_info) {
//判断该服务对象是不是本地服务,如果不是本地服务则返回false
if (!its_info->is_local()) {
return false;
} else if (its_info->get_major() == _major
&& its_info->get_minor() == _minor) {
//设置本地服务的生命周期为DEFAULT_TTL
its_info->set_ttl(DEFAULT_TTL);
} else {
//本地服务的版本号与当前offer中的大小版本号不一致,返回false
VSOMEIP_ERROR << "rm_base::offer_service service property mismatch ("
<< std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << ":"
<< std::dec << static_cast<std::uint32_t>(its_info->get_major()) << ":"
<< std::dec << its_info->get_minor() << "] passed: "
<< std::dec << static_cast<std::uint32_t>(_major) << ":"
<< std::dec << _minor;
return false;
}
} else {
//创建一个std::shared_ptr 共享指针, 并将创建的服务信息记录到services_中
its_info = create_service_info(_service, _instance, _major, _minor,
DEFAULT_TTL, true);
}
//更新事件表中,该服务所对应的所有事件的主版本号
{
std::lock_guard<std::mutex> its_lock(events_mutex_);
// Set major version for all registered events of this service and instance
const auto found_service = events_.find(_service);
if (found_service != events_.end()) {
const auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
for (const auto &j : found_instance->second) {
j.second->set_version(_major);
}
}
}
}
return true;
}
总结一下handle_local_offer_service这个函数的作用就是各种状态对比,目的是要求各个client千万不要重复offer service。在没有重复提交的情况下,其会创建一个本地服务对象加入到local_services_ 中保存起来,同时创建的serviceinfo的智能指针也会保存在**service_**服务表中。
好了继续往下走,下面这块代码的作用是,如果当前host路由已经处于运行状态,则根据服务实例进行初始化动作,init_service_info的主要作用是按照服务id与实例id,到配置信息模块中找到配置的对应端口以及传输协议来配置通络通信的端点(endpoint), 如果当前host模块没有运行,那么就将服务先添加到预提交的pending_sd_offers集合中, 这个pending_sd_offer中的内容,最终在网卡起来后,里面的内容都会去运行一次init_service_info创造服务对应的通信端点
{
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
if (if_state_running_) {
init_service_info(_service, _instance, true);
} else {
pending_sd_offers_.push_back(std::make_pair(_service, _instance));
}
}
继续往下走,此时路由会调用sd模块的offerService方法,将服务实例信息提供过去,SD模块中首先回去查找一下该服务实例是否已经存在了,如果存在则不处理,如果没有则将其加入已收集的服务信息中,后续用于SD模块的在重复阶段,主阶段的信息轮播。这块后面会单独分析SD模块来跟。
void
service_discovery_impl::offer_service(const std::shared_ptr<serviceinfo> &_info) {
service_t its_service = _info->get_service();
service_t its_instance = _info->get_instance();
std::lock_guard<std::mutex> its_lock(collected_offers_mutex_);
// check if offer is in map
bool found(false);
const auto its_service_it = collected_offers_.find(its_service);
if (its_service_it != collected_offers_.end()) {
const auto its_instance_it = its_service_it->second.find(its_instance);
if (its_instance_it != its_service_it->second.end()) {
found = true;
}
}
if (!found) {
//这个就是SD模块中搜集到的要广播出去的服务信息,在主阶段或者重复阶段,SD模块会间隔一段事件往
//组播地址中广播所提供的服务信息
collected_offers_[its_service][its_instance] = _info;
}
}
服务加入到SD后,最后还剩三个函数,分别如下
//stub_指针指向的是routing_manager_stub对象,该对象只存在于rtm_impl中,用于host通知代理模块
//该服务实例状态
stub_->on_offer_service(_client, _service, _instance, _major, _minor);
//通知路由模块该服务状态
on_availability(_service, _instance, true, _major, _minor);
//offer命令处理完了,将该offer命令从offer_commands_中移除。
erase_offer_command(_service, _instance);
routing_manager_stub中的on_offer_service实现没有太多的逻辑,主要是经过安全模块的校验后,调用了inform_requesters方法
void routing_manager_stub::on_offer_service(client_t _client,
service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor) {
...
inform_requesters(_client, _service, _instance, _major, _minor,
routing_info_entry_e::RIE_ADD_SERVICE_INSTANCE, true);
}
void routing_manager_stub::inform_requesters(client_t _hoster, service_t _service,
instance_t _instance, major_version_t _major, minor_version_t _minor,
routing_info_entry_e _entry, bool _inform_service) {
//这里的service_requests_保存的是其他proxy端通过request_service请求的服务信息
for (auto its_client : service_requests_) {
auto its_service = its_client.second.find(_service);
if (its_service != its_client.second.end()) {
bool send(false);
for (auto its_instance : its_service->second) {
if (its_instance.first == ANY_INSTANCE ||
its_instance.first == _instance) {
send = true;
}
}
//其他APP有请求该服务,则发送client端的路由信息给到client端
//create_client_routing_info跟insert_client_routing_info是创建VSOMEIP_ROUTING_INFO命令
//send_client_routing_info是发送该命令给到其他app模块,发送之后该命令之后,请求该服务
//的本地客户端的路由代理rtm_proxy就能收到该命令了,然后如果该客户端的app注册了availability的回调,则能够实时收到服务实例可用的状态了。
if (send) {
if (_inform_service) {
if (_hoster != VSOMEIP_ROUTING_CLIENT &&
_hoster != host_->get_client()) {
if (!is_already_connected(_hoster, its_client.first)) {
//这两个函数是创建VSOMEIP_ROUTING_INFO命令
create_client_routing_info(_hoster);
insert_client_routing_info(_hoster,
routing_info_entry_e::RIE_ADD_CLIENT,
its_client.first);
//发送VSOMEIP_ROUTING_INFO命令,
send_client_routing_info(_hoster);
}
}
}
if (its_client.first != VSOMEIP_ROUTING_CLIENT &&
its_client.first != get_client()) {
//这两个函数是创建VSOMEIP_ROUTING_INFO命令
create_client_routing_info(its_client.first);
insert_client_routing_info(its_client.first, _entry, _hoster,
_service, _instance, _major, _minor);
//发送VSOMEIP_ROUTING_INFO命令,
send_client_routing_info(its_client.first);
}
}
}
}
}
上面分析了host模式路由的offerService的流程,这里来跟一跟proxy路由的offerService流程,其实这两块有一部分的逻辑是重叠的,具体可以看下如下的时序图
从上面时序图可以看到,proxy这边的offer_service的逻辑,就是构建了一个VSOMEIP_OFFER_SERVICE的指令,通过unix域通信的方式,把指令发给了host路由,host路由的模块响应该指令的地方就是在routing_manager_stub中的on_message方法中,最后丢给了host路由去走上一章节的逻辑,废话不说,上代码:
bool routing_manager_proxy::offer_service(client_t _client,
service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor) {
//代理app将服务实例保存到自己的模块的services_数据结构中
if(!routing_manager_base::offer_service(_client, _service, _instance, _major, _minor)) {
VSOMEIP_WARNING << "routing_manager_proxy::offer_service,"
<< "routing_manager_base::offer_service returned false";
}
{
std::lock_guard<std::mutex> its_lock(state_mutex_);
if (state_ == inner_state_type_e::ST_REGISTERED) {
//如果app已经处于注册状态,也就是start后,则直接发送offer命令
send_offer_service(_client, _service, _instance, _major, _minor);
}
//如果没有注册,则将其加入到预处理命令中,后续状态变更后,会再次调用send_offer_service发送
service_data_t offer = { _service, _instance, _major, _minor };
pending_offers_.insert(offer);
}
return true;
}
send_offer_service就是使用自己的local_client_endpoint去给host路由发命令
void routing_manager_proxy::send_offer_service(client_t _client,
service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor) {
(void)_client;
byte_t its_command[VSOMEIP_OFFER_SERVICE_COMMAND_SIZE];
uint32_t its_size = VSOMEIP_OFFER_SERVICE_COMMAND_SIZE
- VSOMEIP_COMMAND_HEADER_SIZE;
//命令类型:VSOMEIP_OFFER_SERVICE
its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_OFFER_SERVICE;
//省略拼包逻辑
...
{
std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
//发送包
sender_->send(its_command, sizeof(its_command));
}
}
}
消息传递给host路由后,由stub模块去解析命令:
void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
endpoint *_receiver, const boost::asio::ip::address &_destination,
client_t _bound_client,
credentials_t _credentials,
const boost::asio::ip::address &_remote_address,
std::uint16_t _remote_port) {
//省略N多代码
...
if (its_size <= _size - VSOMEIP_COMMAND_HEADER_SIZE) {
switch (its_command) {
case VSOMEIP_OFFER_SERVICE:
if (_size != VSOMEIP_OFFER_SERVICE_COMMAND_SIZE) {
VSOMEIP_WARNING << "Received a OFFER_SERVICE command with wrong size ~> skip!";
break;
}
...
if (security::get()->is_offer_allowed(its_sender_uid, its_sender_gid,
its_client, its_service, its_instance)) {
//走到上一章将的host路由中的offer_service处理流程中了
host_->offer_service(its_client, its_service, its_instance,
its_major, its_minor);
}
}
}
}
总结:vsomeip中的offerService流程中涉及到了众多复杂判断流程,以及很多的数据结构来缓存服务实例信息,我想这个可能跟协议栈的设计结构有关系,在一个操作系统中,routing, server, client三者可以是独立的进程,甚至有可能存在这种情况:在一个操作系统中,存在一个vsomeip的routing进程,多个server进程,多个client进程,那么这种情况中offer_service操作, service实例状态变更通知肯定涉及到多个进程的同步,以及管理服务的效率问题,所以流程才会如此繁杂,下一篇计划输出vsomeip中的offer_event以及stop_offer_event的流程,周拱一卒,继续前进。