Subscriber扮演容器的角色,里面可以有很多DataReaders,它们使用Subscriber的同一份SubscriberQos配置。Subscriber可以承载不同Topic和数据类型的DataReader对象。
默认Qos配置可以通过DomainParticipant实例的get_default_subscriber_qos()
函数获取。已创建的Subscriber的Qos也可以通过 Subscriber::set_qos()
函数重新设置。常量SUBSCRIBER_QOS_DEFAULT可以在创建Subscriber或修改其Qos时(create_subscriber() 或 Subscriber::set_qos())直接使用。
SubscriberListener是一个监听器基类(文档中称其为抽象类,但如下面的声明所示,其回调带有空的默认实现,并非纯虚函数),继承自DataReaderListener。
/**
 * Listener type for Subscriber events, derived from DataReaderListener.
 *
 * The callback declared here has an empty default implementation, so users
 * derive from this class and override only what they need.
 */
class SubscriberListener : public DataReaderListener
{
public:

    /**
     * @brief Constructor. Has nothing to initialise.
     */
    RTPS_DllAPI SubscriberListener() {}

    /**
     * @brief Destructor. Virtual, with an empty body.
     */
    RTPS_DllAPI virtual ~SubscriberListener() {}

    /**
     * @brief User hook invoked when a new Data Message is available on any reader.
     *
     * The default implementation does nothing; override it to react to the event.
     *
     * @param sub Subscriber
     */
    RTPS_DllAPI virtual void on_data_on_readers(
            Subscriber* sub)
    {
        // The default implementation deliberately ignores its argument.
        static_cast<void>(sub);
    }
};
用户需要根据自己的需要继承并实现。
callback:on_data_on_readers()
// Create the DomainParticipant instance `participant` first (creation code not shown here)

// Option 1: create a Subscriber using SUBSCRIBER_QOS_DEFAULT
Subscriber* subscriber_with_default_qos =
        participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
if (subscriber_with_default_qos == nullptr)
{
    // Error
    return;
}

// Option 2: create a Subscriber from the profile named "subscriber_profile"
Subscriber* subscriber_with_profile =
        participant->create_subscriber_with_profile("subscriber_profile");
if (subscriber_with_profile == nullptr)
{
    // Error
    return;
}
需要先删除Subscriber中的所有实体(DataReaders),再调用 delete_subscriber()
删除Subscriber
// Delete the entities the subscriber created
if (subscriber->delete_contained_entities() != ReturnCode_t::RETCODE_OK) {
// Subscriber failed to delete the entities it created
return;
}
// Delete the Subscriber
if (participant->delete_subscriber(subscriber) != ReturnCode_t::RETCODE_OK) {
// Error
return;
}
DataReader只归属于创建它的Subscriber,每个DataReader在创建时就绑定到单一的Topic上,所以Topic必须在DataReader创建之前就已经创建。远端Publisher发布的消息可以通过DataReader::read_next_sample()
或DataReader::take_next_sample()
函数获取。
DataReaderListener用于监听DataReader状态的改变。其回调默认是空实现,需要用户继承并重写。有以下callback成员函数:
// Create a DataReader on `topic` using DATAREADER_QOS_DEFAULT
DataReader* data_reader_with_default_qos =
        subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (data_reader_with_default_qos == nullptr)
{
    // Error
    return;
}
// Create the DataReader from the profile named "test_timebasedfilter",
// passing the address of listener_ as its listener.
reader_ = subscriber_->create_datareader_with_profile(
        topic_,
        "test_timebasedfilter",
        &listener_);
// The creation method requires a DataReaderQos; a default-constructed one is used here
DataReaderQos qos;

// Build the custom payload pool that is handed to the DataReader
auto payload_pool = std::make_shared<CustomPayloadPool>();

// No listener (nullptr), all statuses enabled in the mask, custom payload pool
DataReader* data_reader = subscriber->create_datareader(
        topic,
        qos,
        nullptr,
        StatusMask::all(),
        payload_pool);
if (data_reader == nullptr)
{
    // Error
    return;
}
同理,删除DataReader之前需要删除所有属于DataReader的实体(QueryConditions)
// Delete the entities the DataReader created
if (data_reader->delete_contained_entities() != ReturnCode_t::RETCODE_OK) {
// DataReader failed to delete the entities it created.
return;
}
// Delete the DataReader
if (subscriber->delete_datareader(data_reader) != ReturnCode_t::RETCODE_OK) {
// Error
return;
}
SampleInfo是一个重要的数据结构,为DataReader收到的每个数据样本提供相关的元数据信息,包括:
通过reading或taking接收和消费DataReader读到的数据。reading和taking的这些函数实现都是一样的:
两个Listener的callbacks,可以自定义listener继承自DataReaderListener:
// Create a DataReader
DataReader* data_reader =
        subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (nullptr == data_reader)
{
    // Error
    return;
}

// Prepare a wait-set to wait for data on the DataReader:
// enable only the data_available status on the reader's StatusCondition
// and attach that condition to the wait-set.
WaitSet wait_set;
StatusCondition& condition = data_reader->get_statuscondition();
condition.set_enabled_statuses(StatusMask::data_available());
wait_set.attach_condition(condition);

// Create a data and SampleInfo instance
Foo data;
SampleInfo info;

// Define a timeout of 5 seconds
eprosima::fastrtps::Duration_t timeout (5, 0);

// Loop reading data as it arrives.
// This will make the current thread to be dedicated exclusively to
// waiting and reading data until the remote DataWriter dies.
//
// A flag is used instead of `while (true)` because a `break` inside the inner
// take loop only leaves that inner loop; the flag lets the outer wait loop
// terminate as well.
bool writer_alive = true;
while (writer_alive)
{
    ConditionSeq active_conditions;
    if (ReturnCode_t::RETCODE_OK == wait_set.wait(active_conditions, timeout))
    {
        // Drain every sample that is currently available
        while (ReturnCode_t::RETCODE_OK == data_reader->take_next_sample(&data, &info))
        {
            if (info.valid_data)
            {
                // Do something with the data
                std::cout << "Received new data value for topic "
                          << topic->get_name() << std::endl;
            }
            else
            {
                // If the remote writer is not alive, we exit the reading loop
                std::cout << "Remote writer for topic "
                          << topic->get_name() << " is dead" << std::endl;
                writer_alive = false;
                break;
            }
        }
    }
    else
    {
        std::cout << "No data this time" << std::endl;
    }
}
// Create a DataReader
DataReader* data_reader =
        subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (data_reader == nullptr)
{
    // Error
    return;
}

// Buffers that receive each sample and its accompanying SampleInfo
Foo data;
SampleInfo info;

// Define a timeout of 5 seconds
eprosima::fastrtps::Duration_t timeout (5, 0);

// Loop reading data as it arrives.
// This will make the current thread to be dedicated exclusively to
// waiting and reading data until the remote DataWriter dies.
while (true)
{
    // Nothing unread within the timeout: report it and wait again
    if (!data_reader->wait_for_unread_message(timeout))
    {
        std::cout << "No data this time" << std::endl;
        continue;
    }

    // The take did not return RETCODE_OK: go back to waiting
    if (ReturnCode_t::RETCODE_OK != data_reader->take_next_sample(&data, &info))
    {
        continue;
    }

    if (!info.valid_data)
    {
        // If the remote writer is not alive, we exit the reading loop
        std::cout << "Remote writer for topic "
                  << topic->get_name() << " is dead" << std::endl;
        break;
    }

    // Do something with the data
    std::cout << "Received new data value for topic "
              << topic->get_name() << std::endl;
}