目录
RecordReader是用于在网络框架中读取消息的组件。每个记录阅读器可以通过Open方法打开现有记录文件,线程将异步读取记录文件中的信息。用户只需执行ReadMessage即可在RecordReader中提取最新消息,然后通过GetCurrentMessageChannelName、GetCurrentRawMessage、GetCurrentMessageTime获取消息信息。
RecordWriter是用于在网络框架中记录消息的组件。每个RecordWriter可以通过Open方法创建一个新的记录文件。用户只需执行WriteMessage和WriteChannel即可编写消息和通道信息,并且编写过程是异步的。
读取记录文件的接口用于读取记录文件中的消息和通道信息。
RecordReader(); bool Open(const std::string& filename, uint64_t begin_time = 0, uint64_t end_time = UINT64_MAX); void Close(); bool ReadMessage(); bool EndOfFile(); const std::string& CurrentMessageChannelName(); std::shared_ptrCurrentRawMessage(); uint64_t CurrentMessageTime();
写入记录文件的界面,用于将消息和通道信息记录到记录文件中。
RecordWriter(); bool Open(const std::string& file); void Close(); bool WriteChannel(const std::string& name, const std::string& type, const std::string& proto_desc); templatebool WriteMessage(const std::string& channel_name, const MessageT& message, const uint64_t time_nanosec, const std::string& proto_desc = ""); bool SetSizeOfFileSegmentation(uint64_t size_kilobytes); bool SetIntervalOfFileSegmentation(uint64_t time_sec);
通过test_write方法将100 RawMessage写入TEST_FILE,然后通过test_read方法宣读它们。
#include#include "cyber/cyber.h" #include "cyber/message/raw_message.h" #include "cyber/proto/record.pb.h" #include "cyber/record/record_message.h" #include "cyber/record/record_reader.h" #include "cyber/record/record_writer.h" using ::apollo::cyber::record::RecordReader; using ::apollo::cyber::record::RecordWriter; using ::apollo::cyber::record::RecordMessage; using apollo::cyber::message::RawMessage; const char CHANNEL_NAME_1[] = "/test/channel1"; const char CHANNEL_NAME_2[] = "/test/channel2"; const char MESSAGE_TYPE_1[] = "apollo.cyber.proto.Test"; const char MESSAGE_TYPE_2[] = "apollo.cyber.proto.Channel"; const char PROTO_DESC[] = "1234567890"; const char STR_10B[] = "1234567890"; const char TEST_FILE[] = "test.record"; void test_write(const std::string &writefile) { RecordWriter writer; writer.SetSizeOfFileSegmentation(0); writer.SetIntervalOfFileSegmentation(0); writer.Open(writefile); writer.WriteChannel(CHANNEL_NAME_1, MESSAGE_TYPE_1, PROTO_DESC); for (uint32_t i = 0; i < 100; ++i) { auto msg = std::make_shared ("abc" + std::to_string(i)); writer.WriteMessage(CHANNEL_NAME_1, msg, 888 + i); } writer.Close(); } void test_read(const std::string &readfile) { RecordReader reader(readfile); RecordMessage message; uint64_t msg_count = reader.GetMessageNumber(CHANNEL_NAME_1); AINFO << "MSGTYPE: " << reader.GetMessageType(CHANNEL_NAME_1); AINFO << "MSGDESC: " << reader.GetProtoDesc(CHANNEL_NAME_1); // read all message uint64_t i = 0; uint64_t valid = 0; for (i = 0; i < msg_count; ++i) { if (reader.ReadMessage(&message)) { AINFO << "msg[" << i << "]-> " << "channel name: " << message.channel_name << "; content: " << message.content << "; msg time: " << message.time; valid++; } else { AERROR << "read msg[" << i << "] failed"; } } AINFO << "static msg================="; AINFO << "MSG validmsg:totalcount: " << valid << ":" << msg_count; } int main(int argc, char *argv[]) { apollo::cyber::Init(argv[0]); test_write(TEST_FILE); sleep(1); test_read(TEST_FILE); return 0; }
I1124 16:56:27.248200 15118 record.cc:64] [record] msg[0]-> channel name: /test/channel1; content: abc0; msg time: 888 I1124 16:56:27.248227 15118 record.cc:64] [record] msg[1]-> channel name: /test/channel1; content: abc1; msg time: 889 I1124 16:56:27.248239 15118 record.cc:64] [record] msg[2]-> channel name: /test/channel1; content: abc2; msg time: 890 I1124 16:56:27.248252 15118 record.cc:64] [record] msg[3]-> channel name: /test/channel1; content: abc3; msg time: 891 I1124 16:56:27.248297 15118 record.cc:64] [record] msg[4]-> channel name: /test/channel1; content: abc4; msg time: 892 I1124 16:56:27.248378 15118 record.cc:64] [record] msg[5]-> channel name: /test/channel1; content: abc5; msg time: 893 ... I1124 16:56:27.250422 15118 record.cc:73] [record] static msg================= I1124 16:56:27.250434 15118 record.cc:74] [record] MSG validmsg:totalcount: 100:100