• Fast DDS之Subscriber


    在这里插入图片描述
    Subscriber扮演容器的角色,里面可以有很多DataReaders,它们使用Subscriber的同一份SubscriberQos配置。Subscriber可以承载不同Topic和数据类型的DataReader对象。

    Subscriber

    SubscriberQos

    默认Qos配置可以通过DomainParticipant实例的get_default_subscriber_qos()函数获取。先前创建的qos也可以通过 Subscriber::set_qos() 函数重新设置。值SUBSCRIBER_QOS_DEFAULT可以在创建sub或修改时(create_subscriber() or Subscriber::set_qos())直接使用

    SubscriberListener

    SubscriberListener是个抽象类,继承自DataReaderListener。

    class SubscriberListener : public DataReaderListener
    {
    public:
    
        /**
         * @brief Constructor
         */
        RTPS_DllAPI SubscriberListener()
        {
        }
    
        /**
         * @brief Destructor
         */
        RTPS_DllAPI virtual ~SubscriberListener()
        {
        }
    
        /**
         * Virtual function to be implemented by the user containing the actions to be performed when a new
         * Data Message is available on any reader.
         *
         * @param sub Subscriber
         */
        RTPS_DllAPI virtual void on_data_on_readers(
                Subscriber* sub)
        {
            (void)sub;
        }
    
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    用户需要根据自己的需要继承并实现。
    callback:on_data_on_readers()

    创建Subscriber

    1. 通过DomainParticipant的create_subscriber()函数创建,SubscriberQos参数是必需的,有个默认值SUBSCRIBER_QOS_DEFAULT可以使用,SubscriberListener和StatusMask参数可选。
    // 创建DomainParticipant实例participant
    Subscriber* subscriber_with_default_qos = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
    if (nullptr == subscriber_with_default_qos) {
        // Error
        return;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. 基于Profile创建Subscriber:用于标识subscriber的字符串名字参数是必需的,继承自SubscriberListener或DataReaderListener的Listener和StatusMask是可选的
    Subscriber* subscriber_with_profile = participant->create_subscriber_with_profile("subscriber_profile");
    if (nullptr == subscriber_with_profile) {
        // Error
        return;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    删除Subscriber

    需要先删除Subscriber中的所有实体(DataReaders),再调用 delete_subscriber() 删除Subscriber

    // Delete the entities the subscriber created
    if (subscriber->delete_contained_entities() != ReturnCode_t::RETCODE_OK) {
        // Subscriber failed to delete the entities it created
        return;
    }
    // Delete the Subscriber
    if (participant->delete_subscriber(subscriber) != ReturnCode_t::RETCODE_OK) {
        // Error
        return;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    DataReader

    DataReader只归属于创建它的Subscriber,每个DataReader在它创建支持绑定到单一的Topic上,所以Topic必须是在DataReader创建之前就已经创建。远端pub的消息可以通过DataReader::read_next_sample()DataReader::take_next_sample()函数获取。

    DataReaderQos

    DataReaderListener

    用于监听DataReader状态的改变。默认是空实现,需要用户重新继承实现。有以下callback成员函数:

    • on_data_available:回调函数,表示有数据收到,可以在这个回调中主动去获取数据
    • on_subscription_matched:服务发现回调
    • on_requested_deadline_missed()
    • on_requested_incompatible_qos()
    • on_liveliness_changed()
    • on_sample_rejected()
    • on_sample_lost()

    DataReader的创建

    1. 绑定到要传输数据的Topic和DataReaderQos是必需的参数,DataReaderListener和StatusMask是可选的
    DataReader* data_reader_with_default_qos = subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
    if (nullptr == data_reader_with_default_qos) {
        // Error
        return;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 基于Profile创建DataReader
    reader_ = subscriber_->create_datareader_with_profile(topic_, "test_timebasedfilter", &listener_);
    
    • 1
    1. 使用自定义的PayloadPool创建DataReader(为什么要使用自定义的PayloadPool?)
    // A DataReaderQos must be provided to the creation method
    DataReaderQos qos;
    
    // Create PayloadPool
    std::shared_ptr<CustomPayloadPool> payload_pool = std::make_shared<CustomPayloadPool>();
    DataReader* data_reader = subscriber->create_datareader(topic, qos, nullptr, StatusMask::all(), payload_pool);
    if (nullptr == data_reader) {
        // Error
        return;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    删除DataReader

    同理,删除DataReader之前需要删除所有属于DataReader的实体(QueryConditions)

    // Delete the entities the DataReader created
    if (data_reader->delete_contained_entities() != ReturnCode_t::RETCODE_OK) {
        // DataReader failed to delete the entities it created.
        return;
    }
    
    // Delete the DataReader
    if (subscriber->delete_datareader(data_reader) != ReturnCode_t::RETCODE_OK) {
        // Error
        return;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    SampleInfo

    一个重要的数据结构,为每个DataReader提供了相关数据的元数据信息,包括:

    • 数据样本的状态,如是否被修改或是否被读走
    • 数据样本的来源,如发布者的实例句柄或公开的实例句柄
    • 数据样本的序列号,这对确保数据样本的顺序接收非常重要
    • 数据样本的时间戳,即样本何时被写入或修改
    • 数据样本是否有效,无效的数据样本通常表示实例的生命周期已经结束

    读取数据

    通过reading或taking接收和消费DataReader读到的数据。reading和taking的这些函数实现都是一样的:

    • DataReader::read_next_sample / DataReader::take_next_sample
    • DataReader::read(), DataReader::read_instance(), DataReader::read_next_instance() / DataReader::take(), DataReader::take_instance(), DataReader::take_next_instance():获取符合特定条件的samples的集合

    通过callback获取数据

    两个Listener的callbacks,可以自定义listener继承自DataReaderListener:

    • on_data_available()
    • on_data_readers()

    通过启动一个等待线程处理

    1. 异步
    // Create a DataReader
    DataReader* data_reader =
            subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
    if (nullptr == data_reader)
    {
        // Error
        return;
    }
    
    // Prepare a wait-set to wait for data on the DataReader
    WaitSet wait_set;
    StatusCondition& condition = data_reader->get_statuscondition();
    condition.set_enabled_statuses(StatusMask::data_available());
    wait_set.attach_condition(condition);
    
    // Create a data and SampleInfo instance
    Foo data;
    SampleInfo info;
    
    //Define a timeout of 5 seconds
    eprosima::fastrtps::Duration_t timeout (5, 0);
    
    // Loop reading data as it arrives
    // This will make the current thread to be dedicated exclusively to
    // waiting and reading data until the remote DataWriter dies
    while (true) {
        ConditionSeq active_conditions;
        if (ReturnCode_t::RETCODE_OK == wait_set.wait(active_conditions, timeout)) {
            while (ReturnCode_t::RETCODE_OK == data_reader->take_next_sample(&data, &info)) {
                if (info.valid_data) {
                    // Do something with the data
                    std::cout << "Received new data value for topic "
                              << topic->get_name() << std::endl;
                } else {
                    // If the remote writer is not alive, we exit the reading loop
                    std::cout << "Remote writer for topic "
                              << topic->get_name() << " is dead" << std::endl;
                    break;
                }
            }
        } else {
            std::cout << "No data this time" << std::endl;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    1. 同步,通过DataReader::wait_for_unread_message() 函数等待直到数据到达或者时间超时
    // Create a DataReader
    DataReader* data_reader = subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
    if (nullptr == data_reader) {
        // Error
        return;
    }
    // Create a data and SampleInfo instance
    Foo data;
    SampleInfo info;
    //Define a timeout of 5 seconds
    eprosima::fastrtps::Duration_t timeout (5, 0);
    
    // Loop reading data as it arrives
    // This will make the current thread to be dedicated exclusively to
    // waiting and reading data until the remote DataWriter dies
    while (true) {
        if (data_reader->wait_for_unread_message(timeout)) {
            if (ReturnCode_t::RETCODE_OK == data_reader->take_next_sample(&data, &info)) {
                if (info.valid_data) {
                    // Do something with the data
                    std::cout << "Received new data value for topic "
                              << topic->get_name() << std::endl;
                } else {
                    // If the remote writer is not alive, we exit the reading loop
                    std::cout << "Remote writer for topic "
                              << topic->get_name() << " is dead" << std::endl;
                    break;
                }
            }
        } else {
            std::cout << "No data this time" << std::endl;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
  • 相关阅读:
    QT断点调试[通俗易懂]
    最新Python深度学习技术进阶与应用
    Flutter实践二:repository模式
    Springboot毕设项目城乡客运服务系统7y7y1(java+VUE+Mybatis+Maven+Mysql)
    解救Kubernetes混乱:Descheduler快速实现资源平衡
    MyBioSource p53 肿瘤抑制蛋白 (TP53),多克隆抗体
    2304. 网格中的最小路径代价 : 从「图论最短路」过渡到「O(1) 空间的原地模拟」
    Installing harbor-2.6.2 on openEuler
    机器学习02
    设计模式-结构型模式
  • 原文地址:https://blog.csdn.net/u010378559/article/details/133933962