I. Background and requirements
The passenger-flow simulation system is written in C#; it generates passenger-flow detail records and writes them to Kafka in real time. However, there is a problem:

writing this way is also slow and does not meet the performance requirements. After discussion, and because the simulation data is generated in a distributed fashion on a Redis cluster, the collected objects are now packaged as Protobuf and forwarded to Kafka through a real-time data bus (interface), with Flink consuming the data in real time. That leaves the work on my side: parse the Protobuf format, split it into detail records, compute MD5 hashes, deduplicate, and join against the base data (a sketch of the deduplication step is given at the end of this post). So the only option is to test it first.
II. Generating the Protobuf template object
1. Generate the Java file from the .proto template
First, download a Windows build of the protobuf compiler; the version used here is protoc-2.5.0-win32.zip:
https://github.com/protocolbuffers/protobuf/releases/download/v2.5.0/protoc-2.5.0-win32.zip
Newer releases can be found at:
https://github.com/protocolbuffers/protobuf/releases
After unzipping, create a java folder under the extracted root directory.

Then create the BaseTrains.proto template under the proto folder.

Its content is as follows (define the fields according to your actual requirements):
syntax = "proto2";

package dwd;
option java_package = "com.xxx.train.dwd";
option java_outer_classname = "BaseTrainsProto";

message BaseTrains{
    required string line_id = 1;
    required string section_id = 2;
    required string station_id = 3;
    required string arriving_time = 4;
    required string departure_time = 5;
    required string timetable_type = 6;
    required string trip_id = 7;
    required string ed_num = 8;
    required string pdate = 9;
    required string period_end = 10;
}
Open cmd, change to the directory containing protoc.exe, and run the following command:
protoc -I=C:\protoc-2.5.0-win32\proto --java_out=C:\protoc-2.5.0-win32 C:\protoc-2.5.0-win32\proto\BaseTrains.proto
On success, the generated Java template class appears under the java directory; copy it into your project and it is ready to use.
Note: the package path must match your project's package path!

III. Consuming Protobuf-formatted Kafka data with Flink
1. pom.xml configuration
For Google Protobuf, add the following Maven dependencies:
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>chill-protobuf</artifactId>
    <version>0.7.6</version>
    <!-- exclusions for dependency conversion -->
    <exclusions>
        <exclusion>
            <groupId>com.esotericsoftware.kryo</groupId>
            <artifactId>kryo</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.7.0</version>
</dependency>
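Note: chill-protobuf comes into play when a protoc-generated class itself is passed between Flink operators, because Flink then falls back to Kryo for serialization. A minimal sketch of registering its ProtobufSerializer on the execution environment, assuming the BaseTrainsProto class generated in section II (in the job below only byte arrays and the BaseTrains POJO cross operator boundaries, so this is only needed if you forward the generated type directly):

import com.twitter.chill.protobuf.ProtobufSerializer

// Tell Flink to serialize the generated Protobuf class with chill-protobuf instead of plain Kryo
env.getConfig.registerTypeWithKryoSerializer(classOf[BaseTrainsProto.BaseTrains], classOf[ProtobufSerializer])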
2. Flink code to consume the Kafka data
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object DWDTrainAppV2ProToBuf {

  def main(args: Array[String]): Unit = {

    // 1. Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism
    env.setParallelism(1)
    env.disableOperatorChaining()
    // 2. Read data from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "xxxx:9092,xxxx:9092,xxxx:9092")
    val sourceTopic: String = "test_protocol"
    val consumer: FlinkKafkaConsumer[Array[Byte]] = new FlinkKafkaConsumer[Array[Byte]](sourceTopic, new ByteArrayDeserializationSchema[Array[Byte]](), properties)
    consumer.setStartFromEarliest()
    val trainMsgStream: DataStream[Array[Byte]] = env.addSource(consumer)
    // 1. Receive the raw byte arrays
    // 2. Pass them to the custom entity class BaseTrains, which parses them
    val msg = trainMsgStream.map(message => new BaseTrains(message))
    msg.print()
    env.execute("start")
  }
}
The ByteArrayDeserializationSchema class is defined below.
3. Because the data written to Kafka is Protobuf-encoded, it has to be received with a binary (byte-array) schema, which you can implement yourself.
import java.io.IOException
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema

class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]] {
  @throws[IOException]
  override def deserialize(message: Array[Byte]): Array[Byte] = message
}
4. The BaseTrains class
import java.io.Serializable;

import com.google.protobuf.InvalidProtocolBufferException;

public class BaseTrains implements Serializable, Protobufable {

    private String line_id;
    private String section_id;
    private String station_id;
    private String arriving_time;
    private String departure_time;
    private String timetable_type;
    private String trip_id;
    private String ed_num;
    private String pdate;
    private String period_end;

    public BaseTrains() {
    }

    public BaseTrains(String line_id, String section_id, String station_id, String arriving_time, String departure_time, String timetable_type, String trip_id, String ed_num, String pdate, String period_end) {
        super();
        this.line_id = line_id;
        this.section_id = section_id;
        this.station_id = station_id;
        this.arriving_time = arriving_time;
        this.departure_time = departure_time;
        this.timetable_type = timetable_type;
        this.trip_id = trip_id;
        this.ed_num = ed_num;
        this.pdate = pdate;
        this.period_end = period_end;
    }

    @Override
    public byte[] encode() {
        // Serialize the fields with the protoc-generated builder
        BaseTrainsProto.BaseTrains.Builder builder = BaseTrainsProto.BaseTrains.newBuilder();
        builder.setLineId(line_id);
        builder.setSectionId(section_id);
        builder.setStationId(station_id);
        builder.setArrivingTime(arriving_time);
        builder.setDepartureTime(departure_time);
        builder.setTimetableType(timetable_type);
        builder.setTripId(trip_id);
        builder.setEdNum(ed_num);
        builder.setPdate(pdate);
        builder.setPeriodEnd(period_end);
        return builder.build().toByteArray();
    }

    public BaseTrains(byte[] bytes) {
        try {
            // Use the protoc-generated template class to parse the binary data back into fields
            BaseTrainsProto.BaseTrains trains = BaseTrainsProto.BaseTrains.parseFrom(bytes);
            this.line_id = trains.getLineId();
            this.section_id = trains.getSectionId();
            this.station_id = trains.getStationId();
            this.arriving_time = trains.getArrivingTime();
            this.departure_time = trains.getDepartureTime();
            this.timetable_type = trains.getTimetableType();
            this.trip_id = trains.getTripId();
            this.ed_num = trains.getEdNum();
            this.pdate = trains.getPdate();
            this.period_end = trains.getPeriodEnd();
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "{" + line_id + "," + section_id + "," + station_id + "," + arriving_time + "," + departure_time + "," + timetable_type + "," + trip_id + "," + ed_num + "," + pdate + "," + period_end + '}';
    }

    public String getLine_id() {
        return line_id;
    }

    public void setLine_id(String line_id) {
        this.line_id = line_id;
    }

    public String getSection_id() {
        return section_id;
    }

    public void setSection_id(String section_id) {
        this.section_id = section_id;
    }

    public String getStation_id() {
        return station_id;
    }

    public void setStation_id(String station_id) {
        this.station_id = station_id;
    }

    public String getArriving_time() {
        return arriving_time;
    }

    public void setArriving_time(String arriving_time) {
        this.arriving_time = arriving_time;
    }

    public String getDeparture_time() {
        return departure_time;
    }

    public void setDeparture_time(String departure_time) {
        this.departure_time = departure_time;
    }

    public String getTimetable_type() {
        return timetable_type;
    }

    public void setTimetable_type(String timetable_type) {
        this.timetable_type = timetable_type;
    }

    public String getTrip_id() {
        return trip_id;
    }

    public void setTrip_id(String trip_id) {
        this.trip_id = trip_id;
    }

    public String getEd_num() {
        return ed_num;
    }

    public void setEd_num(String ed_num) {
        this.ed_num = ed_num;
    }

    public String getPdate() {
        return pdate;
    }

    public void setPdate(String pdate) {
        this.pdate = pdate;
    }

    public String getPeriod_end() {
        return period_end;
    }

    public void setPeriod_end(String period_end) {
        this.period_end = period_end;
    }
}
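To test the pipeline end to end before the real data bus is in place, a few records can be written to the test_protocol topic by hand with the plain Kafka client and the encode() method above. A minimal sketch, assuming the same broker placeholders as in the consumer; the field values below are made-up sample data:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProtobufTestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "xxxx:9092,xxxx:9092,xxxx:9092")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

    val producer = new KafkaProducer[String, Array[Byte]](props)
    // Sample values only -- replace with real simulation output
    val train = new BaseTrains("line01", "sec01", "sta01", "07:00:00", "07:01:00",
      "1", "trip001", "01", "2023-01-01", "07:30:00")
    // encode() serializes the record through the protoc-generated builder
    producer.send(new ProducerRecord[String, Array[Byte]]("test_protocol", train.encode()))
    producer.flush()
    producer.close()
  }
}

Running the consumer job afterwards should print the same field values, which confirms the encode/parseFrom round trip.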
Overall workflow:
1. Run protoc.exe against the provided BaseTrains.proto template to generate BaseTrainsProto.
2. Copy BaseTrainsProto into the project directory.
3. Write the FlinkKafkaConsumer[Array[Byte]] job.
4. Create the BaseTrains POJO, which calls BaseTrainsProto to parse the Protobuf binary data.
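As mentioned in the background, the parsed records still need to be MD5-hashed and deduplicated before being joined with the base data. One possible sketch of that step, continuing from the msg stream inside the main method of the job above, assuming the hash is taken over toString() and that keyed ValueState is acceptable for remembering seen hashes (in practice the state would need a TTL or windowing to bound its growth):

import java.security.MessageDigest

import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

// Hex-encoded MD5 of a record's string form
def md5(s: String): String =
  MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")).map("%02x".format(_)).mkString

val deduped = msg
  .map(t => (md5(t.toString), t))   // attach the hash to each record
  .keyBy(_._1)                      // key by the hash so state is kept per hash value
  .filter(new RichFilterFunction[(String, BaseTrains)] {
    private var seen: ValueState[java.lang.Boolean] = _
    override def open(parameters: Configuration): Unit =
      seen = getRuntimeContext.getState(new ValueStateDescriptor("seen", classOf[java.lang.Boolean]))
    override def filter(value: (String, BaseTrains)): Boolean = {
      // Keep only the first record observed for this hash
      if (seen.value() == null) { seen.update(true); true } else false
    }
  })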