Flink(四十一)Flink + avro + 广播流 broadcast 实现流量的清洗


    背景简介

    本文简单模拟对流量的处理,大概步骤如下:

    1、通过获取一个维度流,内容是流量内容的元数据信息,获取解析并进行广播

    2、获取实时流量流,做延迟处理(防止数据关联不上)

    3、流量流关联元数据广播流,通过元数据信息获取对应的数据

    4、打包成avro格式(自行百度)数据并进行sink

    代码开发

    1、pom引入

    flink版本1.14

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>org.example</groupId>
        <artifactId>FlinkCode</artifactId>
        <version>1.0-SNAPSHOT</version>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <jdk.version>1.8</jdk.version>
            <jar.name>ubs-data-converter</jar.name>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <flink.version>1.14.4</flink.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.11</artifactId>
                <version>${flink.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka-clients</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web_2.11</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.10</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.8</version>
            </dependency>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.9.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpcore</artifactId>
                <version>4.4.1</version>
            </dependency>
            <!-- The original listing declared lombok twice; a single entry suffices. -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.16</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <!-- Generates Java classes from the .avsc schemas under src/main/resources. -->
                <plugin>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro-maven-plugin</artifactId>
                    <version>1.9.2</version>
                    <executions>
                        <execution>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>schema</goal>
                            </goals>
                            <configuration>
                                <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                                <outputDirectory>${project.basedir}/src/main/java/com/msxf</outputDirectory>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>${jdk.version}</source>
                        <target>${jdk.version}</target>
                        <encoding>${project.build.sourceEncoding}</encoding>
                    </configuration>
                </plugin>
                <!-- Builds the fat jar, relocating guava/kafka to avoid classpath clashes. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.1.1</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <finalName>${jar.name}</finalName>
                                <artifactSet>
                                    <excludes>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                        <exclude>org.slf4j:*</exclude>
                                        <exclude>log4j:*</exclude>
                                        <exclude>org.glassfish.jersey.core:jersey-common</exclude>
                                    </excludes>
                                </artifactSet>
                                <relocations>
                                    <relocation>
                                        <pattern>com.google.common</pattern>
                                        <shadedPattern>com.shade.google.common</shadedPattern>
                                    </relocation>
                                    <relocation>
                                        <pattern>org.apache.kafka</pattern>
                                        <shadedPattern>org.shade.apache.kafka</shadedPattern>
                                    </relocation>
                                </relocations>
                                <filters>
                                    <filter>
                                        <artifact>*</artifact>
                                        <includes>
                                            <include>org/apache/htrace/**</include>
                                            <include>org/apache/avro/**</include>
                                            <include>com/msxf/**</include>
                                            <include>org/apache/flink/streaming/**</include>
                                            <include>org/apache/flink/connector/**</include>
                                            <include>org/apache/kafka/**</include>
                                            <include>org/apache/hive/**</include>
                                            <include>org/apache/hadoop/hive/**</include>
                                            <include>org/apache/curator/**</include>
                                            <include>org/apache/zookeeper/**</include>
                                            <include>org/apache/jute/**</include>
                                            <include>org/apache/thrift/**</include>
                                            <include>org/apache/http/**</include>
                                            <include>org/I0Itec/**</include>
                                            <include>jline/**</include>
                                            <include>com/yammer/**</include>
                                            <include>kafka/**</include>
                                            <include>org/apache/hadoop/hbase/**</include>
                                            <include>com/alibaba/fastjson/**</include>
                                            <include>org/elasticsearch/action/**</include>
                                            <include>io/confluent/**</include>
                                            <include>com/fasterxml/**</include>
                                            <include>org/elasticsearch/**</include>
                                            <include>hbase-default.xml</include>
                                            <include>hbase-site.xml</include>
                                        </includes>
                                    </filter>
                                    <filter>
                                        <artifact>org.apache.hadoop.hive.*:*</artifact>
                                        <!-- NOTE(review): the original listing's exclude patterns were
                                             lost in extraction (empty <exclude/> elements); restore
                                             them from the project history before relying on this filter. -->
                                        <excludes>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

    2、模拟元数据流

    正常数据流应该通过其他方式(比如访问数据库、KAFKA流)获取,本次我们直接自定义source

    1. package source;
    2. import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    3. public class StringSource extends RichSourceFunction {
    4. Boolean running = true;
    5. @Override
    6. public void run(SourceContext ctx) throws Exception {
    7. while (running){
    8. String value = String.format("{\"data\":[{\"name\":\"id\",\"comment\":\"ID\"}" +
    9. ",{\"name\":\"age\",\"comment\":\"年龄\"}" +
    10. ",{\"name\":\"sex\",\"comment\":\"性别\"}]}");
    11. ctx.collect(value);
    12. running=false;
    13. }
    14. }
    15. @Override
    16. public void cancel() {
    17. running=false;
    18. }
    19. }

    3、模拟流量流

    模拟流量数据,本身也是通过KAFKA获取实时流量数据,本文是简单Demo,所以也通过自定义Source获取

    1. package source;
    2. import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    3. import java.util.Arrays;
    4. import java.util.List;
    5. import java.util.Random;
    6. public class FlowSourceFunction extends RichSourceFunction {
    7. Boolean running=true;
    8. private final List USERS = Arrays.asList("张三","李四","牛二");
    9. private final List BEHAVIOR = Arrays.asList("login", "out", "delete");
    10. private final List SEX = Arrays.asList("男", "女");
    11. Random random = new Random();
    12. @Override
    13. public void run(SourceContext ctx) throws Exception {
    14. while (running){
    15. String id = USERS.get(random.nextInt(USERS.size()));
    16. String age = String.valueOf(random.nextInt(100));
    17. String sex = SEX.get(random.nextInt(SEX.size()));
    18. String time = String.valueOf(System.currentTimeMillis());
    19. String res = String.format("{\"id\":\"%s\"," +
    20. "\"age\":\"%s\"," +
    21. "\"sex\":\"%s\"," +
    22. "\"time\":\"%s\"}",id,age,sex,time);
    23. ctx.collect(res);
    24. Thread.sleep(1000);
    25. }
    26. }
    27. @Override
    28. public void cancel() {
    29. running=false;
    30. }
    31. }

    4、MapFunction处理元数据信息

    处理元数据信息

    1. package func;
    2. import bean.SchemaInfo;
    3. import com.alibaba.fastjson.JSON;
    4. import com.alibaba.fastjson.JSONObject;
    5. import org.apache.avro.Schema;
    6. import org.apache.flink.api.common.functions.RichMapFunction;
    7. import org.apache.avro.SchemaBuilder;
    8. import org.apache.flink.util.StringUtils;
    9. import java.util.HashSet;
    10. import java.util.stream.Stream;
    11. public class MetaDataMapFunction extends RichMapFunction {
    12. private String db;
    13. private String table;
    14. private final String NAME="name";
    15. public MetaDataMapFunction(String db, String table) {
    16. this.db = db;
    17. this.table = table;
    18. }
    19. @Override
    20. public SchemaInfo map(String value) throws Exception {
    21. String[] aliases = {db.concat(".").concat(table)};
    22. //存储fields
    23. HashSet fieldsSet = new HashSet<>();
    24. //schema支持多种类型,这里我们选择构建常见的record类型
    25. //初始化结果:
    26. //{"type":"record","name":"flow","namespace":"com.flow","doc":"fow_event","fields":[],"aliases":["db.table"]}
    27. SchemaBuilder.RecordBuilder recordBuilder = SchemaBuilder.record("flow").namespace("com.flow").aliases(aliases).doc("fow_event");
    28. //初始化之后要完善schema中的fields,首先获取fields
    29. SchemaBuilder.FieldAssembler fields = recordBuilder.fields();
    30. //处理元数据流
    31. JSONObject obj = JSON.parseObject(value);
    32. //过滤掉不包含ID的数据
    33. Stream data= obj.getJSONArray("data").stream().filter(
    34. o -> !StringUtils.isNullOrWhitespaceOnly(JSON.parseObject(o.toString()).getOrDefault(NAME, "").toString())
    35. ).map(o->JSONObject.parseObject(o.toString()));
    36. data.forEach(
    37. o->{
    38. String name = o.get("name").toString();
    39. String comment = o.get("comment").toString();
    40. buildFields(fields,name,comment);
    41. fieldsSet.add(name);
    42. }
    43. );
    44. Schema schema = fields.endRecord();
    45. return new SchemaInfo(schema.toString(),fieldsSet);
    46. }
    47. public void buildFields(SchemaBuilder.FieldAssembler fields,String name,String comment){
    48. fields.name(name)//字段名称
    49. .doc(comment)//描述内容、注释
    50. .orderAscending()//排序方式无
    51. .type()//类型
    52. .optional()
    53. .stringType();
    54. }
    55. }
    56. 定义返回对象

      1. package bean;
      2. import lombok.AllArgsConstructor;
      3. import lombok.Data;
      4. import java.util.HashSet;
      5. @Data
      6. @AllArgsConstructor
      7. public class SchemaInfo {
      8. public String info;
      9. public HashSet set;
      10. }

      5、延迟处理主流(流量流)

      1. package func;
      2. import com.alibaba.fastjson.JSON;
      3. import org.apache.flink.api.common.functions.RichMapFunction;
      4. import org.apache.flink.configuration.Configuration;
      5. /**
      6. *
      7. * 1、延时map的初始化,阻塞主流数据,等待广播流schema写入广播状态,保证主流数据可以获取到schema
      8. * 2、标准化数据
      9. */
      10. public class DelayEtlMap extends RichMapFunction {
      11. private final long delayTime;
      12. public DelayEtlMap(long delayTime) {
      13. this.delayTime = delayTime;
      14. }
      15. @Override
      16. public void open(Configuration parameters) throws Exception {
      17. super.open(parameters);
      18. Thread.sleep(delayTime);
      19. }
      20. @Override
      21. public String map(String value) throws Exception {
      22. return value;
      23. }
      24. }

      6、双流关联,处理数据

      1. package func;
      2. import bean.FlowData;
      3. import bean.SchemaInfo;
      4. import com.alibaba.fastjson.JSON;
      5. import com.alibaba.fastjson.JSONObject;
      6. import org.apache.flink.api.common.state.BroadcastState;
      7. import org.apache.flink.api.common.state.MapStateDescriptor;
      8. import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
      9. import org.apache.flink.api.common.typeinfo.TypeInformation;
      10. import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
      11. import org.apache.flink.util.Collector;
      12. import java.text.SimpleDateFormat;
      13. import java.util.HashMap;
      14. /**
      15. *
      16. * processElement 处理业务流数据
      17. * processBroadcastElement 处理广播流数据
      18. */
      19. public class FlowBrodCastFunction extends BroadcastProcessFunction {
      20. @Override
      21. public void processElement(String value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector out) throws Exception {
      22. ReadOnlyBroadcastState flowMetaData = ctx.getBroadcastState(new MapStateDescriptor(
      23. "flowMetaData"
      24. , TypeInformation.of(String.class)
      25. , TypeInformation.of(SchemaInfo.class)));
      26. SchemaInfo schema = flowMetaData.get("schema");
      27. JSONObject jsonObject = JSON.parseObject(value);
      28. Long time = jsonObject.getLong("time");
      29. //创建时间格式
      30. SimpleDateFormat yyyyMMdd = new SimpleDateFormat("yyyyMMdd");
      31. String dt = yyyyMMdd.format(time);
      32. //获取元数据字段
      33. HashMap map = new HashMap<>();
      34. schema.getSet().forEach(
      35. o->{
      36. map.put(o.toString(),jsonObject.get(o).toString());
      37. }
      38. );
      39. out.collect(new FlowData(schema.getInfo(),map,time));
      40. }
      41. @Override
      42. public void processBroadcastElement(SchemaInfo value, BroadcastProcessFunction.Context ctx, Collector out) throws Exception {
      43. if(value != null){
      44. BroadcastState flowMetaData = ctx.getBroadcastState(new MapStateDescriptor(
      45. "flowMetaData"
      46. , TypeInformation.of(String.class)
      47. , TypeInformation.of(SchemaInfo.class)));
      48. flowMetaData.put("schema", value);
      49. }
      50. }
      51. }

      FlowData对象

      1. package bean;
      2. import lombok.AllArgsConstructor;
      3. import lombok.Data;
      4. import java.util.HashMap;
      5. @Data
      6. @AllArgsConstructor
      7. public class FlowData {
      8. public String schema;
      9. public HashMap values;
      10. private long time;
      11. }

      7、主代码

      1. package ubs.app;
      2. import bean.FlowData;
      3. import bean.SchemaInfo;
      4. import func.DelayEtlMap;
      5. import func.FlowBrodCastFunction;
      6. import func.FlowSinkFunction;
      7. import func.MetaDataMapFunction;
      8. import org.apache.flink.api.common.state.MapStateDescriptor;
      9. import org.apache.flink.api.common.typeinfo.TypeInformation;
      10. import org.apache.flink.api.java.utils.ParameterTool;
      11. import org.apache.flink.streaming.api.datastream.BroadcastStream;
      12. import org.apache.flink.streaming.api.datastream.DataStreamSource;
      13. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      14. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      15. import org.apache.flink.util.StringUtils;
      16. import source.FlowSourceFunction;
      17. import source.StringSource;
      18. import java.util.Objects;
      19. public class FlowApp {
      20. public static void main(String[] args) throws Exception{
      21. //解析参数
      22. ParameterTool parameterTool = ParameterTool.fromArgs(args);
      23. //获取参数 db
      24. String db = parameterTool.get("db")==null?"":parameterTool.get("db");
      25. //获取参数 table
      26. String table = parameterTool.get("table")==null?"":parameterTool.get("table");
      27. //初始化环境
      28. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      29. //模拟元数据流,后续获取到元数据进行广播
      30. DataStreamSource configSource = env.addSource(new StringSource());
      31. //设置并行度
      32. configSource.setParallelism(1);
      33. //获取实时流量数据流:kafka source
      34. // KafkaSource source = SourceGetter.getValueOnlySimpleStrDesSource(parameterTool);
      35. // DataStreamSource realData = env.fromSource(source, WatermarkStrategy.noWatermarks(), "liuliang");
      36. DataStreamSource realData = env.addSource(new FlowSourceFunction());
      37. realData.print("realData: ");
      38. // //处理元数据并广播:将元数据流接收并处理为schema
      39. BroadcastStream metaData = configSource.filter(data -> !StringUtils.isNullOrWhitespaceOnly(data))
      40. .setParallelism(1)
      41. .map(new MetaDataMapFunction("a", "b"))
      42. .broadcast(new MapStateDescriptor(
      43. "flowMetaData"
      44. , TypeInformation.of(String.class)
      45. , TypeInformation.of(SchemaInfo.class)
      46. ));
      47. //延迟主流
      48. SingleOutputStreamOperator realData2 = realData.map(new DelayEtlMap(1000))
      49. .filter(Objects::nonNull);
      50. //连接流,解析数据
      51. SingleOutputStreamOperator res = realData2.connect(metaData).process(new FlowBrodCastFunction());
      52. res.addSink(new FlowSinkFunction());
      53. res.print("res");
      54. env.execute();
      55. }
      56. }

    57. 相关阅读:
      消息队列MQ核心原理全面总结(11大必会原理)
      Java学习 - MySQL数据库中提到的 视图 是什么? 如何使用?
      【案例30】WebSphere诡异宕机
      mysql foreach 写法,多条 sql 与一条 sql 的性能消耗分析
      Java对List的操作
      【计算机网络——应用层】http协议
      【Arduino25】液晶模拟值实验
      Python重点数据结构基本用法
      【英雄哥七月集训】第 26天:并查集
      xshell不能通过账户密码连接虚拟机
    58. 原文地址:https://blog.csdn.net/qq_40771567/article/details/136379911