• FASTRTPS(publisher-subscriber)实践及问题


    Fast-RTPS 提供了两个层次的 API

    	· publisher-Subscriber层:RTPS 上的简化抽象
    	· Writer-Reader层,对于RTPS端点的直接控制(更底层)
    
    • 1
    • 2

    Publisher-Subscriber层为大多数开发者提供了一个方便的抽象。允许定义与topic关联的发布者和订阅者,以及传输topic数据的简单方法。
    Writer-Reader层更接近于RTPS标准中定义的概念,并且可以进行更精准的控制,但是要求开发者直接与每个端点的历史记录缓存进行交互。

    这篇文章是基于 Publisher-Subscriber层 进行记录的,如果文中的问题有道友知道解决方法,劳烦指出,谢谢。

    首先安装就不说了,包括 fastrtpsgen 的安装,现在这个工具叫 fastddsgen,是一样的。看别的博客,也有人自己改用的 protobuf 做序列化,这个我还没研究,不太清除。

    先编写好 msg.idl 文件,里面是定义的通讯格式,比如一个结构体,如下

    struct MSG
    {
        unsigned long index;
        string rbuf;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5

    然后使用 fastddsgen 工具,使用命令fastddsgen msg.idl
    就会生成四个文件 xxx.cxx、 xxx.h、xxxPubSubTypes.cxx、 xxxPubSubTypes.h
    如果需要例程文件,也可以使用命令 fastrtpsgen -example CMake msg.idl
    就会生成几个 .cxx .h 文件,不过这里的 publihser.cpp subscriber.cpp我是自己写的

    然后再找个例程,把cmakelists.txt 拷贝一下,然后修改一下
    我这里简单贴一下,省得找

    # Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    cmake_minimum_required(VERSION 2.8.12)
    
    if(NOT CMAKE_VERSION VERSION_LESS 3.0)
        cmake_policy(SET CMP0048 NEW)
    endif()
    
    project("DDSTCP")
    
    # Find requirements
    if(NOT fastcdr_FOUND)
        find_package(fastcdr REQUIRED)
    endif()
    
    if(NOT foonathan_memory_FOUND)
        find_package(foonathan_memory REQUIRED)
    endif()
    
    if(NOT fastrtps_FOUND)
        find_package(fastrtps REQUIRED)
    endif()
    
    #Check C++11
    include(CheckCXXCompilerFlag)
    if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANG OR
            CMAKE_CXX_COMPILER_ID MATCHES "Clang")
        check_cxx_compiler_flag(-std=c++11 SUPPORTS_CXX11)
        if(NOT SUPPORTS_CXX11)
            message(FATAL_ERROR "Compiler doesn't support C++11")
        endif()
    endif()
    
    message(STATUS "Configuring DDSTCP example...")
    file(GLOB HELLOWORLD_EXAMPLE_SOURCES_CXX "*.cxx")
    file(GLOB HELLOWORLD_EXAMPLE_SOURCES_CPP "*.cpp")
    #file(GLOB XML_CONFIG_FILES "*.xml")
    #message(STATUS "XML Files: " ${XML_CONFIG_FILES})
    # configure_file("HelloWorldSubscriber.xml" "HelloWorldSubscriber.xml" COPYONLY)
    # configure_file("HelloWorldPublisher.xml" "HelloWorldPublisher.xml" COPYONLY)
    # configure_file("dh2048.pem" "dh2048.pem" COPYONLY)
    # configure_file("server.pem" "server.pem" COPYONLY)
    # configure_file("ca.pem" "ca.pem" COPYONLY)
    
    
    add_executable(ddstcp ${HELLOWORLD_EXAMPLE_SOURCES_CXX} ${HELLOWORLD_EXAMPLE_SOURCES_CPP})
    target_compile_definitions(ddstcp PRIVATE
        $<$<AND:$<NOT:$<BOOL:${WIN32}>>,$<STREQUAL:"${CMAKE_BUILD_TYPE}","Debug">>:__DEBUG>
        $<$<BOOL:${INTERNAL_DEBUG}>:__INTERNALDEBUG> # Internal debug activated.
    )
    target_link_libraries(ddstcp fastrtps fastcdr foonathan_memory fastdds::optionparser)
    #install(TARGETS ddstcp
    #    RUNTIME DESTINATION examples/C++/ddstcp/${BIN_INSTALL_DIR})
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    现在说一下publisher里面

    /**
     * @file msgPublisher.h
     *
     */
    #ifndef MSGPUBLISHER_H_
    #define MSGPUBLISHER_H_
    
    #include <fastrtps/fastrtps_fwd.h>
    #include <fastrtps/attributes/PublisherAttributes.h>
    #include <fastrtps/publisher/PublisherListener.h>
    
    #include "messagePubSubTypes.h"
    #include "message.h"
    #include <vector>
    
    class MsgPublisher
    {
        class PubListener : public eprosima::fastrtps::PublisherListener
        {
        public:
            PubListener() : n_matched(0), firstConnected(false){};
    
            ~PubListener(){};
    
            // 回调实现。每当发布者在网络上找到侦听同一主题的订阅者时,就会调用此函数
            void onPublicationMatched(eprosima::fastrtps::Publisher *pub, eprosima::fastrtps::rtps::MatchingInfo &info);
            // 回调函数,错过截止日期时调用的方法
            void on_offered_deadline_missed(eprosima::fastrtps::Publisher *pub, const eprosima::fastrtps::OfferedDeadlineMissedStatus &status);
            // 回调函数,当发布者的活力丧失时调用的方法
            void on_liveliness_lost(eprosima::fastrtps::Publisher *pub, const eprosima::fastrtps::LivelinessLostStatus &status);
    
            int n_matched;
            bool firstConnected;
        } listener_;
    
        DDSTCP message_;
        eprosima::fastrtps::Participant *participant_;
        eprosima::fastrtps::Publisher *publisher_;
        DDSTCPPubSubType type_;
    
        void runThread(uint32_t number, long sleep_ms);
    
    public:
        MsgPublisher();
    
        virtual ~MsgPublisher();
    
        //! Initialize
        bool init(const std::string &wan_ip, unsigned short port, std::string topicDataType, std::string topicName);
    
        // fill publisher buff
        bool getPublishMsg(char *p);
    
        //! Publish a sample
        bool publish(bool waitForListener = true);
    
        //! Run for number samples
        void run(uint32_t number, long sleep_ms);
    };
    
    #endif /* MSGPUBLISHER_H_ */
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    这里面的三个回调函数,都是基于父类里面找出来的,就只有 onPublicationMatched 这个函数,在找到订阅者的时候会调用一下,然后订阅者丢失,不会再次调用,就是这个坑。其他两个函数基本不会调用。

    所以就会导致问题,n_matched 匹配的个数只会增长,不会下降。

    void MsgPublisher::PubListener::onPublicationMatched(Publisher *, MatchingInfo &info)
    {
        printf("发现回调函数........\n ");
        if (info.status == MATCHED_MATCHING)
        {
            n_matched++;
            firstConnected = true;
            // logError(HW, "Matched");
            std::cout << "Publisher matched, online num = " << n_matched << std::endl;
        }
        else
        {
            n_matched--;
            std::cout << "Publisher unmatched, online num = " << n_matched << std::endl;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    就这样,记录完毕,下次记录 reader-writer层

  • 相关阅读:
    Android学习-网络编程
    VMware虚拟机-Ubuntu设置共享文件夹(超详细)
    微软博客上几篇 Semantic-kernel (SK)文章
    C#语言:散修笔记
    数据结构之队列
    Flask数据库_Column的常用参数与使用
    vue3 element-ui-plus Carousel 跑马灯 的使用 及 踩坑记录
    【Pytorch with fastai】第 18 章 :使用 CAM 进行 CNN 解释
    Electron项目中将CommonJS改成使用ES 模块(ESM)语法preload.js加载报错
    flutter webrtc搭建1v1通信服务
  • 原文地址:https://blog.csdn.net/tao_292/article/details/125515733