• iceoryx源码阅读(八)——IPC通信机制


    0 导引

    本系列文章对iceoryx源码进行解读,索引如下:

    1 整体结构

    通过前面的介绍,订阅者、发布者与Roudi守护进程之间也需要通信,如上文介绍的,请求Roudi守护进村创建并配置端口数据。整体结构如下图所示:

    image

    由于通信层在类Unix操作系统和Windows操作系统下实现不同(见下面的代码片段),所以我们分开介绍其实现。

    #if defined(_WIN32)
    using IoxIpcChannelType = iox::posix::NamedPipe;
    #else
    using IoxIpcChannelType = iox::posix::UnixDomainSocket;
    #endif
    

    接下来我们从数据的序列化和反序列化开始。

    2 序列化与反序列化

    前一篇文章中,这部分通信没有使用三方框架,使用简单的字符串拼接的方式进行序列化,如下所示:

    template 
    void IpcMessage::addEntry(const T& entry) noexcept
    {
        std::stringstream newEntry;
        newEntry << entry;
    
        if (!isValidEntry(newEntry.str()))
        {
            LogError() << "\'" << newEntry.str().c_str() << "\' is an invalid IPC channel entry";
            m_isValid = false;
        }
        else
        {
            m_msg.append(newEntry.str() + m_separator);
            ++m_numberOfElements;
        }
    }
    
    template 
    IpcMessage& IpcMessage::operator<<(const T& entry) noexcept
    {
        addEntry(entry);
        return *this;
    }
    

    上面的代码较为简单,这里不作详细解释了。反序列化也很简单,这里就贴一下代码了,非常简单粗暴的实现:

    std::string IpcMessage::getElementAtIndex(const uint32_t index) const noexcept
    {
        std::string messageRemainder(m_msg);
        size_t startPos = 0u;
        size_t endPos = messageRemainder.find_first_of(m_separator, startPos);
    
        for (uint32_t counter = 0u; endPos != std::string::npos; ++counter)
        {
            if (counter == index)
            {
                return messageRemainder.substr(startPos, endPos - startPos);
            }
    
            startPos = endPos + 1u;
            endPos = messageRemainder.find_first_of(m_separator, startPos);
        }
    
        return std::string();
    }
    

    3 类Unix系统的实现

    正如在 引言 中介绍的,类Unix系统使用Unix域套接字实现IPC通信机制。由UnixDomainSocket封装初始化、销毁、发送和接收等逻辑,这里我们主要介绍发送和接收逻辑的具体实现。

    3.1 发送函数send

    职责:
    封装客户端的消息发送逻辑

    参数:

    • msg:待发送的消息。
    cxx::expected UnixDomainSocket::send(const std::string& msg) const noexcept
    {
        // we also support timedSend. The setsockopt call sets the timeout for all further sendto calls, so we must set
        // it to 0 to turn the timeout off
        return timedSend(msg, units::Duration::fromSeconds(0ULL));
    }
    

    发送函数send只是简单地调用地超时时间的发送函数timedSend。输入的超时时间为0,意味着立即发送。timedSend的实现如下所示:

    cxx::expected UnixDomainSocket::timedSend(const std::string& msg,
                                                               const units::Duration& timeout) const noexcept
    {
        if (msg.size() > m_maxMessageSize)
        {
            return cxx::error(IpcChannelError::MESSAGE_TOO_LONG);
        }
    
        if (IpcChannelSide::SERVER == m_channelSide)
        {
            std::cerr << "sending on server side not supported for unix domain socket \"" << m_name << "\"" << std::endl;
            return cxx::error(IpcChannelError::INTERNAL_LOGIC_ERROR);
        }
    
        auto tv = timeout.timeval();
        auto setsockoptCall = posixCall(iox_setsockopt)(m_sockfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv))
                                  .failureReturnValue(ERROR_CODE)
                                  .ignoreErrnos(EWOULDBLOCK)
                                  .evaluate();
    
        if (setsockoptCall.has_error())
        {
            return cxx::error(convertErrnoToIpcChannelError(setsockoptCall.get_error().errnum));
        }
        auto sendCall = posixCall(iox_sendto)(m_sockfd, msg.c_str(), msg.size() + NULL_TERMINATOR_SIZE, 0, nullptr, 0)
                            .failureReturnValue(ERROR_CODE)
                            .evaluate();
    
        if (sendCall.has_error())
        {
            return cxx::error(convertErrnoToIpcChannelError(sendCall.get_error().errnum));
        }
        return cxx::success();
    }
    

    逐段代码分析:

    • LINE 04 ~ LINE 13: 错误处理——消息长度过长、类型服务端。整体结构图中,黄色的

    • LINE 15 ~ LINE 24: 调用POSIX接口(类Unix系统调用)setsockopt,设置超时时间。

    • LINE 25 ~ LINE 32: 调用POSIX接口(类Unix系统调用)sendto发送数据。

    可以看到,Unix版本的发送实现就是简单地调用系统调用。

    3.2 接收函数receive

    职责:
    封装消息接收逻辑。

    返回:
    消息字符串或错误类型。

    cxx::expected UnixDomainSocket::receive() const noexcept
    {
        // we also support timedReceive. The setsockopt call sets the timeout for all further recvfrom calls, so we must set
        // it to 0 to turn the timeout off
        struct timeval tv = {};
        tv.tv_sec = 0;
        tv.tv_usec = 0;
    
        return timedReceive(units::Duration(tv));
    }
    

    接收函数receive只是简单地调用地超时时间的发送函数timedReceive。输入的超时时间为0,即没有结果立即返回。timedReceive的实现如下所示:

    cxx::expected
    UnixDomainSocket::timedReceive(const units::Duration& timeout) const noexcept
    {
        if (IpcChannelSide::CLIENT == m_channelSide)
        {
            std::cerr << "receiving on client side not supported for unix domain socket \"" << m_name << "\"" << std::endl;
            return cxx::error(IpcChannelError::INTERNAL_LOGIC_ERROR);
        }
    
        auto tv = timeout.timeval();
        auto setsockoptCall = posixCall(iox_setsockopt)(m_sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))
                                  .failureReturnValue(ERROR_CODE)
                                  .ignoreErrnos(EWOULDBLOCK)
                                  .evaluate();
    
        if (setsockoptCall.has_error())
        {
            return cxx::error(convertErrnoToIpcChannelError(setsockoptCall.get_error().errnum));
        }
        // NOLINTJUSTIFICATION needed for recvfrom
        // NOLINTNEXTLINE(hicpp-avoid-c-arrays, cppcoreguidelines-avoid-c-arrays)
        char message[MAX_MESSAGE_SIZE + 1];
    
        auto recvCall = posixCall(iox_recvfrom)(m_sockfd, &message[0], MAX_MESSAGE_SIZE, 0, nullptr, nullptr)
                            .failureReturnValue(ERROR_CODE)
                            .suppressErrorMessagesForErrnos(EAGAIN, EWOULDBLOCK)
                            .evaluate();
        message[MAX_MESSAGE_SIZE] = 0;
    
        if (recvCall.has_error())
        {
            return cxx::error(convertErrnoToIpcChannelError(recvCall.get_error().errnum));
        }
        return cxx::success(&message[0]);
    }
    

    逐段代码分析:

    • LINE 04 ~ LINE 08: 错误处理——通道类型服务端。整体结构图中,黄色的。

    • LINE 10 ~ LINE 19: 调用POSIX接口(类Unix系统调用)setsockopt,设置超时时间。

    • LINE 22 ~ LINE 33: 调用POSIX接口(类Unix系统调用)recvfrom接收数据。

    4 Windows系统的实现

    由于Windows不支持Unix域套接字,使用共享内存的方式来模拟。每引入一个发布者或订阅者,都需要开辟两条通道——收和发,每条通道会使用单独一块共享内存,即需要开辟两块共享内存。

    4.1 发送函数send

    职责:
    封装消息发送逻辑。

    参数:

    • msg:待发送的消息。
    cxx::expected NamedPipe::send(const std::string& message) const noexcept
    {
        if (!m_isInitialized)
        {
            return cxx::error(IpcChannelError::NOT_INITIALIZED);
        }
    
        if (message.size() > MAX_MESSAGE_SIZE)
        {
            return cxx::error(IpcChannelError::MESSAGE_TOO_LONG);
        }
    
        cxx::Expects(!m_data->sendSemaphore().wait().has_error());
        IOX_DISCARD_RESULT(m_data->messages.push(Message_t(cxx::TruncateToCapacity, message)));
        cxx::Expects(!m_data->receiveSemaphore().post().has_error());
    
        return cxx::success<>();
    }
    

    逐段代码分析:

    • LINE 03 ~ LINE 11: 错误处理——未初始化(消息队列共享内存未创建)、消息长度过长。这里没有判断是服务端还是客户端,估计是不同人实现的。

    • LINE 13 ~ LINE 15: 第14行,往消息队列(共享内存)中存入消息。第13行是通过发送信号量判断消息队列是否已满,若已满,则一直等待,直到接收端读取消息,唤醒发送端。第15行是唤醒接收端读取消息。

    iceoryx还提供了timedSend函数,带有超时机制,即超时则发送失败。还提供了不等待的版本trySend,若队列已满,则发送失败。这两个函数本文不做介绍。

    4.2 接收函数receive

    职责:
    封装消息接收逻辑。

    返回:
    消息字符串或错误类型。

    cxx::expected NamedPipe::receive() const noexcept
    {
        if (!m_isInitialized)
        {
            return cxx::error(IpcChannelError::NOT_INITIALIZED);
        }
    
        cxx::Expects(!m_data->receiveSemaphore().wait().has_error());
        auto message = m_data->messages.pop();
        if (message.has_value())
        {
            cxx::Expects(!m_data->sendSemaphore().post().has_error());
            return cxx::success(message->c_str());
        }
        return cxx::error(IpcChannelError::INTERNAL_LOGIC_ERROR);
    }
    

    逐段代码分析:

    • LINE 03 ~ LINE 06: 错误处理——未初始化(消息队列共享内存未创建)。这里没有判断是服务端还是客户端,估计是不同人实现的。

    • LINE 08 ~ LINE 14: 第14行,往消息队列(共享内存)中存入消息。第8行是通过接收信号量判断消息队列是否为空,若为空,则一直等待,直到发送端发送消息,唤醒发送端。第12行是唤醒发送端发送消息。

    iceoryx还提供了timedReceive函数,带有超时机制,即超时则接收失败。还提供了不等待的版本tryReceive,若队列为空,则接收失败。这两个函数本文不做介绍。

    5 Roudi的监听逻辑

    Roudi启动后,会开启一个线程来监听和处理来自客户端(订阅者、发布者)的请求,如下所示:

    void RouDi::startProcessRuntimeMessagesThread() noexcept
    {
        m_handleRuntimeMessageThread = std::thread(&RouDi::processRuntimeMessages, this);
        posix::setThreadName(m_handleRuntimeMessageThread.native_handle(), "IPC-msg-process");
    }
    

    线程执行函数为processRuntimeMessages,内部就是一个循环,如下所示:

    void RouDi::processRuntimeMessages() noexcept
    {
        runtime::IpcInterfaceCreator roudiIpcInterface{IPC_CHANNEL_ROUDI_NAME};
    
        // the logger is intentionally not used, to ensure that this message is always printed
        std::cout << "RouDi is ready for clients" << std::endl;
    
        while (m_runHandleRuntimeMessageThread)
        {
            // read RouDi's IPC channel
            runtime::IpcMessage message;
            if (roudiIpcInterface.timedReceive(m_runtimeMessagesThreadTimeout, message))
            {
                auto cmd = runtime::stringToIpcMessageType(message.getElementAtIndex(0).c_str());
                std::string runtimeName = message.getElementAtIndex(1);
    
                processMessage(message, cmd, RuntimeName_t(cxx::TruncateToCapacity, runtimeName));
            }
        }
    }
    

    通过上述代码可知,发送给Roudi的所有消息,第一项为请求类型,第二项为运行。这里调用了processMessage函数,这和上一篇文章中的 3.5 RouDi::processMessage 关联了。

  • 相关阅读:
    Linux系统之部署Linux命令大全搜索工具
    Vue基础知识(条件渲染、列表渲染、收集表单数据、过滤器)(三)
    XML解析入门
    快速创建微信小程序,注册即认证,无需300元认证费
    【Tableau Server 企业日常问题 24】Tableau server提示工具嵌入工作表,服务器字体乱码问题解决
    py 异步
    LINUX安装nginx
    计算机网络原理 谢希仁(第8版)第四章习题答案
    解决hide方法反射限制的问题
    Unity技术手册-脚本初始(上)
  • 原文地址:https://www.cnblogs.com/lijihong-jerry/p/18156475