• Consuming protobuf-format data with Flink & kafka-connector


    I. Background and requirements

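    The passenger-flow simulation system is written in C# and writes the generated passenger-flow detail records to Kafka in real time. But...

    ...the writes were also very slow, and performance fell short of the requirements. After some discussion, since the simulation data is generated in a distributed fashion on a Redis cluster, the collected object data is now packed directly into protobuf and sent through the real-time data bus (an interface) into Kafka, where Flink consumes it in real time. The catch is that I now have to parse the protobuf format, split it into detail records, compute MD5 hashes, deduplicate, and join with the base data, so the pressure is on my side. No way around it, so let's test it first!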
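The processing steps listed above (MD5 hashing, then de-duplication) can be sketched with the JDK alone. This is an illustration only, not the author's implementation. The class name Md5Dedup is hypothetical. So are the choice of fields that form the key, the U+0001 separator, and the in-memory HashSet. Inside a Flink job, the set of seen keys would more likely be kept in keyed state with a TTL, so that it does not grow without bound.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: MD5-keyed de-duplication of detail records (in-memory sketch). */
final class Md5Dedup {
    // MD5 keys of the records seen so far (unbounded here; a real job needs expiry).
    private final Set<String> seen = new HashSet<>();

    /** Returns the lowercase hex MD5 digest of the UTF-8 bytes of text. */
    static String md5Hex(String text) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java SE platform must support MD5, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    /** Joins the fields with U+0001 (assumed never to occur in values) and hashes the result. */
    static String dedupKey(String... fields) {
        return md5Hex(String.join("\u0001", fields));
    }

    /** Returns true the first time a field combination is seen, false for duplicates. */
    boolean firstSeen(String... fields) {
        return seen.add(dedupKey(fields));
    }
}
```

Joining the fields with a separator before hashing keeps ("ab", "c") and ("a", "bc") from producing the same key.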

    II. Generating the protobuf template object

    1. Use protoc to generate a Java file from the template

    First download a Windows build of protobuf. I used protoc-2.5.0-win32.zip:
    https://github.com/protocolbuffers/protobuf/releases/download/v2.5.0/protoc-2.5.0-win32.zip

    Other, newer versions are available at:
    https://github.com/protocolbuffers/protobuf/releases

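    After unzipping, create a java folder (for the generated code) in the root directory, alongside the proto folder that holds the template.

    In the proto folder, create the template BaseTrains.proto.

    Its content is as follows (define the fields according to your own requirements):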

    syntax = "proto2";
    package dwd;
    option java_package = "com.xxx.train.dwd";
    option java_outer_classname = "BaseTrainsProto";
    message BaseTrains {
        required string line_id = 1;
        required string section_id = 2;
        required string station_id = 3;
        required string arriving_time = 4;
        required string departure_time = 5;
        required string timetable_type = 6;
        required string trip_id = 7;
        required string ed_num = 8;
        required string pdate = 9;
        required string period_end = 10;
    }
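To make the "binary data" concrete, here is how the protobuf wire format writes each string field of BaseTrains:

- First comes a tag varint, ((field_number << 3) | 2), where wire type 2 means length-delimited.
- Next comes the byte length as a varint.
- Last come the UTF-8 bytes.

The fields are then simply concatenated. The hand-rolled encoder below only illustrates that layout, and the class WireFormatSketch is hypothetical. In the project itself, the generated BaseTrainsProto does this work.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Illustrative, hand-rolled encoder for string fields (wire type 2); not a replacement for protoc output. */
final class WireFormatSketch {
    static final int WIRE_TYPE_LENGTH_DELIMITED = 2;

    /** Writes an unsigned varint: 7 bits per byte, high bit set on every byte except the last. */
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    /** Appends one string field: tag, byte length, then the UTF-8 bytes. */
    static void writeStringField(ByteArrayOutputStream out, int fieldNumber, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, ((long) fieldNumber << 3) | WIRE_TYPE_LENGTH_DELIMITED);
        writeVarint(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    /** Encodes values in field-number order: values[0] is field 1 (line_id), values[1] is field 2, and so on. */
    static byte[] encode(String... values) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < values.length; i++) {
            writeStringField(out, i + 1, values[i]);
        }
        return out.toByteArray();
    }
}
```

For example, encode("L1") yields the four bytes 0x0A 0x02 0x4C 0x31. The tag 0x0A is field 1 with wire type 2, followed by the length 2 and the characters "L1".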

    Open cmd, change to the directory containing protoc.exe, and run the following command:

    protoc -I=C:\protoc-2.5.0-win32\proto --java_out=C:\protoc-2.5.0-win32\java C:\protoc-2.5.0-win32\proto\BaseTrains.proto

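    On success, the generated Java class is in the java directory. protoc places it in subfolders matching java_package, so the path is com\xxx\train\dwd\BaseTrainsProto.java. Copy it straight into your project and it is ready to use.

    Note: the package path must match the package path in your project!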

    III. Consuming protobuf-format Kafka data with Flink

    1. pom.xml configuration

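    According to the Apache Flink documentation page "Custom Serializers", Google Protobuf needs the following Maven dependencies.

    chill-protobuf supplies a Kryo serializer for protobuf-generated classes, which is registered through ExecutionConfig.registerTypeWithKryoSerializer. protobuf-java is the runtime that the generated BaseTrainsProto class depends on.

    Keep the protobuf-java version in line with the protoc version used to generate the class. Code from protoc 2.5.0 targets the 2.5.0 runtime and is not guaranteed to work against 3.7.0.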

    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>chill-protobuf</artifactId>
        <version>0.7.6</version>
        <!-- exclusions for dependency conversion -->
        <exclusions>
            <exclusion>
                <groupId>com.esotericsoftware.kryo</groupId>
                <artifactId>kryo</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- We need protobuf for chill-protobuf -->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.7.0</version>
    </dependency>

    2. Flink code for consuming the Kafka data

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    object DWDTrainAppV2ProToBuf {
      def main(args: Array[String]): Unit = {
        // 1. Create the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // Set the parallelism
        env.setParallelism(1)
        env.disableOperatorChaining()
        // 2. Read data from Kafka
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "xxxx:9092,xxxx:9092,xxxx:9092")
        val sourceTopic: String = "test_protocol"
        // Record values are handed over as raw bytes (see ByteArrayDeserializationSchema)
        val consumer: FlinkKafkaConsumer[Array[Byte]] =
          new FlinkKafkaConsumer[Array[Byte]](sourceTopic, new ByteArrayDeserializationSchema[Array[Byte]](), properties)
        consumer.setStartFromEarliest()
        val trainMsgStream: DataStream[Array[Byte]] = env.addSource(consumer)
        // 1. First get the data as bytes
        // 2. Pass them to the custom entity class BaseTrains, which parses them
        val msg: DataStream[BaseTrains] = trainMsgStream.map(message => new BaseTrains(message))
        msg.print()
        env.execute("start")
      }
    }

    The ByteArrayDeserializationSchema class is defined below.

    3. The data written to Kafka is Protobuf-encoded binary, so it has to be received with a binary schema. You can implement a small class for this yourself.

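    import java.io.IOException

    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema

    // Pass-through schema: hands each Kafka record value to Flink unchanged, as a byte array.
    // The type parameter T is unused; it is kept only to match the existing call site.
    class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]] {
      @throws[IOException]
      override def deserialize(message: Array[Byte]): Array[Byte] = message
    }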
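Why use a byte-array schema rather than a string schema such as Flink's SimpleStringSchema? Protobuf output is not valid UTF-8 in general: any varint of 128 or more contains bytes at or above 0x80. Decoding such data to a String and encoding it back can therefore silently change the bytes. The JDK-only demonstration below performs such a round trip. The class name Utf8RoundTrip is hypothetical, and the class illustrates the problem rather than forming part of the job.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical demo: a String round trip is not lossless for arbitrary binary payloads. */
final class Utf8RoundTrip {
    /** Decodes bytes as UTF-8 text and re-encodes them, as a string-based schema effectively would. */
    static byte[] viaString(byte[] payload) {
        // Malformed UTF-8 sequences are replaced with U+FFFD by this constructor.
        String text = new String(payload, StandardCharsets.UTF_8);
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Returns true if the payload comes back byte-for-byte identical. */
    static boolean survives(byte[] payload) {
        return Arrays.equals(payload, viaString(payload));
    }
}
```

A payload like 0x0A 0x96 0x01 does not survive. It is field 1 followed by the varint length 150, and the lone 0x96 is not valid UTF-8. It is replaced by U+FFFD, which re-encodes as three different bytes.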

    4. The BaseTrains class

    import java.io.Serializable;

    import com.google.protobuf.InvalidProtocolBufferException;
    import com.xxx.train.dwd.BaseTrainsProto;

    // Protobufable is a project-specific interface that the original does not show;
    // judging by the @Override on encode(), it declares byte[] encode().
    public class BaseTrains implements Serializable, Protobufable {
        private String line_id;
        private String section_id;
        private String station_id;
        private String arriving_time;
        private String departure_time;
        private String timetable_type;
        private String trip_id;
        private String ed_num;
        private String pdate;
        private String period_end;

        public BaseTrains() {
        }

        public BaseTrains(String line_id, String section_id, String station_id, String arriving_time,
                          String departure_time, String timetable_type, String trip_id, String ed_num,
                          String pdate, String period_end) {
            super();
            this.line_id = line_id;
            this.section_id = section_id;
            this.station_id = station_id;
            this.arriving_time = arriving_time;
            this.departure_time = departure_time;
            this.timetable_type = timetable_type;
            this.trip_id = trip_id;
            this.ed_num = ed_num;
            this.pdate = pdate;
            this.period_end = period_end;
        }

        @Override
        public byte[] encode() {
            // Every field is proto2 "required": build() throws if one is unset,
            // and the generated setters throw NullPointerException for null values.
            BaseTrainsProto.BaseTrains.Builder builder = BaseTrainsProto.BaseTrains.newBuilder();
            builder.setLineId(line_id);
            builder.setSectionId(section_id);
            builder.setStationId(station_id);
            builder.setArrivingTime(arriving_time);
            builder.setDepartureTime(departure_time);
            builder.setTimetableType(timetable_type);
            builder.setTripId(trip_id);
            builder.setEdNum(ed_num);
            builder.setPdate(pdate);
            builder.setPeriodEnd(period_end);
            return builder.build().toByteArray();
        }

        public BaseTrains(byte[] bytes) {
            try {
                // 3. Use the protoc-generated class to parse the binary data into text fields
                BaseTrainsProto.BaseTrains trains = BaseTrainsProto.BaseTrains.parseFrom(bytes);
                this.line_id = trains.getLineId();
                this.section_id = trains.getSectionId();
                this.station_id = trains.getStationId();
                this.arriving_time = trains.getArrivingTime();
                this.departure_time = trains.getDepartureTime();
                this.timetable_type = trains.getTimetableType();
                this.trip_id = trains.getTripId();
                this.ed_num = trains.getEdNum();
                this.pdate = trains.getPdate();
                this.period_end = trains.getPeriodEnd();
            } catch (InvalidProtocolBufferException e) {
                // Malformed input is only logged; the object keeps null fields
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "{" + line_id + "," + section_id + "," + station_id + "," + arriving_time + ","
                    + departure_time + "," + timetable_type + "," + trip_id + "," + ed_num + ","
                    + pdate + "," + period_end + '}';
        }

        public String getLine_id() {
            return line_id;
        }

        public void setLine_id(String line_id) {
            this.line_id = line_id;
        }

        public String getSection_id() {
            return section_id;
        }

        public void setSection_id(String section_id) {
            this.section_id = section_id;
        }

        public String getStation_id() {
            return station_id;
        }

        public void setStation_id(String station_id) {
            this.station_id = station_id;
        }

        public String getArriving_time() {
            return arriving_time;
        }

        public void setArriving_time(String arriving_time) {
            this.arriving_time = arriving_time;
        }

        public String getDeparture_time() {
            return departure_time;
        }

        public void setDeparture_time(String departure_time) {
            this.departure_time = departure_time;
        }

        public String getTimetable_type() {
            return timetable_type;
        }

        public void setTimetable_type(String timetable_type) {
            this.timetable_type = timetable_type;
        }

        public String getTrip_id() {
            return trip_id;
        }

        public void setTrip_id(String trip_id) {
            this.trip_id = trip_id;
        }

        public String getEd_num() {
            return ed_num;
        }

        public void setEd_num(String ed_num) {
            this.ed_num = ed_num;
        }

        public String getPdate() {
            return pdate;
        }

        public void setPdate(String pdate) {
            this.pdate = pdate;
        }

        public String getPeriod_end() {
            return period_end;
        }

        public void setPeriod_end(String period_end) {
            this.period_end = period_end;
        }
    }
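BaseTrains(byte[]) hands the bytes to the generated parseFrom. To show roughly what that call undoes, here is a JDK-only sketch. It walks the tag/length pairs of a message made only of string fields and maps the field numbers to the names declared in BaseTrains.proto. This is a simplified illustration, and the class name WireDecodeSketch is hypothetical. Unlike parseFrom, it does not check required fields and handles only wire type 2.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical decoder for messages made only of string fields, like BaseTrains; not a substitute for parseFrom. */
final class WireDecodeSketch {
    /** Names of fields 1..10 as declared in BaseTrains.proto. */
    static final String[] FIELD_NAMES = {
        "line_id", "section_id", "station_id", "arriving_time", "departure_time",
        "timetable_type", "trip_id", "ed_num", "pdate", "period_end"
    };

    private final byte[] buf;
    private int pos;

    private WireDecodeSketch(byte[] buf) {
        this.buf = buf;
    }

    /** Reads an unsigned varint: low 7 bits first, high bit marks a continuation byte. */
    private long readVarint() {
        long result = 0;
        for (int shift = 0; shift < 64; shift += 7) {
            if (pos >= buf.length) throw new IllegalArgumentException("truncated varint");
            byte b = buf[pos++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) return result;
        }
        throw new IllegalArgumentException("malformed varint");
    }

    /** Returns field name to value; unknown field numbers are kept under "#<number>". */
    static Map<String, String> decode(byte[] bytes) {
        WireDecodeSketch r = new WireDecodeSketch(bytes);
        Map<String, String> fields = new LinkedHashMap<>();
        while (r.pos < bytes.length) {
            long tag = r.readVarint();
            int fieldNumber = (int) (tag >>> 3);
            int wireType = (int) (tag & 0x7);
            if (wireType != 2) throw new IllegalArgumentException("sketch only handles length-delimited fields");
            int len = (int) r.readVarint();
            if (len < 0 || len > bytes.length - r.pos) throw new IllegalArgumentException("truncated field");
            String value = new String(bytes, r.pos, len, StandardCharsets.UTF_8);
            r.pos += len;
            String name = (fieldNumber >= 1 && fieldNumber <= FIELD_NAMES.length)
                    ? FIELD_NAMES[fieldNumber - 1] : "#" + fieldNumber;
            fields.put(name, value); // a repeated field number overwrites: the last value wins
        }
        return fields;
    }
}
```

When the same field number appears twice, the later value wins, as it does in protobuf for singular fields. Unknown field numbers are kept under a "#n" key, loosely mirroring how protobuf preserves unknown fields rather than failing on them.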


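    Overall process:

    1. Run protoc.exe on the provided BaseTrains.proto template to generate BaseTrainsProto

    2. Copy BaseTrainsProto into the project directory

    3. Write the FlinkKafkaConsumer[Array[Byte]]

    4. Create the BaseTrains POJO, which calls BaseTrainsProto to parse the protobuf binary data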

  • Original article: https://blog.csdn.net/u014539465/article/details/125599657