• Flink DataStream API


    DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成:

    1. package com.atguigu.env;
    2. import org.apache.flink.api.common.JobExecutionResult;
    3. import org.apache.flink.api.common.RuntimeExecutionMode;
    4. import org.apache.flink.api.common.typeinfo.Types;
    5. import org.apache.flink.api.java.tuple.Tuple2;
    6. import org.apache.flink.configuration.Configuration;
    7. import org.apache.flink.configuration.RestOptions;
    8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    11. import org.apache.flink.util.Collector;
    12. /**
    13. * TODO
    14. *
    15. * @author cjp
    16. * @version 1.0
    17. */
    18. public class EnvDemo {
    19. public static void main(String[] args) throws Exception {
    20. Configuration conf = new Configuration();
    21. conf.set(RestOptions.BIND_PORT, "8082");
    22. StreamExecutionEnvironment env = StreamExecutionEnvironment
    23. // .getExecutionEnvironment(); // 自动识别是 远程集群 ,还是idea本地环境
    24. .getExecutionEnvironment(conf); // conf对象可以去修改一些参数
    25. // .createLocalEnvironment()
    26. // .createRemoteEnvironment("hadoop102", 8081,"/xxx")
    27. // 流批一体:代码api是同一套,可以指定为 批,也可以指定为 流
    28. // 默认 STREAMING
    29. // 一般不在代码写死,提交时 参数指定:-Dexecution.runtime-mode=BATCH
    30. env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    31. env
    32. // .socketTextStream("hadoop102", 7777)
    33. .readTextFile("input/word.txt")
    34. .flatMap(
    35. (String value, Collector> out) -> {
    36. String[] words = value.split(" ");
    37. for (String word : words) {
    38. out.collect(Tuple2.of(word, 1));
    39. }
    40. }
    41. )
    42. .returns(Types.TUPLE(Types.STRING, Types.INT))
    43. .keyBy(value -> value.f0)
    44. .sum(1)
    45. .print();
    46. env.execute();
    47. /** TODO 关于execute总结(了解)
    48. * 1、默认 env.execute()触发一个flink job:
    49. * 一个main方法可以调用多个execute,但是没意义,指定到第一个就会阻塞住
    50. * 2、env.executeAsync(),异步触发,不阻塞
    51. * => 一个main方法里 executeAsync()个数 = 生成的flink job数
    52. * 3、思考:
    53. * yarn-application 集群,提交一次,集群里会有几个flink job?
    54. * =》 取决于 调用了n个 executeAsync()
    55. * =》 对应 application集群里,会有n个job
    56. * =》 对应 Jobmanager当中,会有 n个 JobMaster
    57. */
    58. // env.executeAsync();
    59. // ……
    60. // env.executeAsync();
    61. }
    62. }

    5.1 执行环境(Execution Environment

    Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。

    不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前Flink的运行环境,从而建立起与Flink框架之间的联系。

    5.1.1 创建执行环境

    我们要获取的执行环境,是StreamExecutionEnvironment类的对象,这是所有Flink程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。

    1)getExecutionEnvironment

    最简单的方式,就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    这种方式,用起来简单高效,是最常用的一种创建执行环境的方式。

    2)createLocalEnvironment

    这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。

    StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

    3createRemoteEnvironment

    这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。

    StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment

           .createRemoteEnvironment(

              "host",                   // JobManager主机名

              1234,                     // JobManager进程端口号

              "path/to/jarFile.jar"  // 提交给JobManagerJAR

          );

    在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

    5.1.2 执行模式(Execution Mode

    从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API。

    // 流处理环境

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream API执行模式包括:流执行模式、批执行模式和自动模式。

    1. 流执行模式(Streaming)

    这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下程序使用的就是Streaming执行模式。

    1. 批执行模式(Batch)

    专门用于批处理的执行模式。

    1. 自动模式(AutoMatic)

    在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

    批执行模式的使用。主要有两种方式:

    1)通过命令行配置

    bin/flink run -Dexecution.runtime-mode=BATCH ...

    在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。

    2)通过代码配置

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。

    实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。

    5.1.3 触发程序执行

    需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”

    所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。

    env.execute();

    5.2 源算子(Source

    Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。

    在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:

    DataStream stream = env.addSource(...);

    方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。

    从Flink1.12开始,主要使用流批统一的新Source架构:

    DataStreamSource stream = env.fromSource(…)

    Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

    5.2.1 准备工作

    为了方便练习,这里使用WaterSensor作为数据模型。

    字段名

    数据类型

    说明

    id

    String

    水位传感器类型

    ts

    Long

    传感器记录时间戳

    vc

    Integer

    水位记录

     具体代码如下:

    1. public class WaterSensor {
    2. public String id;
    3. public Long ts;
    4. public Integer vc;
    5. public WaterSensor() {
    6. }
    7. public WaterSensor(String id, Long ts, Integer vc) {
    8. this.id = id;
    9. this.ts = ts;
    10. this.vc = vc;
    11. }
    12. public String getId() {
    13. return id;
    14. }
    15. public void setId(String id) {
    16. this.id = id;
    17. }
    18. public Long getTs() {
    19. return ts;
    20. }
    21. public void setTs(Long ts) {
    22. this.ts = ts;
    23. }
    24. public Integer getVc() {
    25. return vc;
    26. }
    27. public void setVc(Integer vc) {
    28. this.vc = vc;
    29. }
    30. @Override
    31. public String toString() {
    32. return "WaterSensor{" +
    33. "id='" + id + '\'' +
    34. ", ts=" + ts +
    35. ", vc=" + vc +
    36. '}';
    37. }
    38. @Override
    39. public boolean equals(Object o) {
    40. if (this == o) {
    41. return true;
    42. }
    43. if (o == null || getClass() != o.getClass()) {
    44. return false;
    45. }
    46. WaterSensor that = (WaterSensor) o;
    47. return Objects.equals(id, that.id) &&
    48. Objects.equals(ts, that.ts) &&
    49. Objects.equals(vc, that.vc);
    50. }
    51. @Override
    52. public int hashCode() {
    53. return Objects.hash(id, ts, vc);
    54. }
    55. }

    这里需要注意,我们定义的WaterSensor,有这样几个特点:

    1. 类是公有(public)的
    2. 有一个无参的构造方法
    3. 所有属性都是公有(public)的
    4. 所有属性的类型都是可以序列化的

    Flink会把这样的类作为一种特殊的POJO(Plain Ordinary Java Object简单的Java对象,实际就是普通JavaBeans)数据类型来对待,方便数据的解析和序列化。另外我们在类中还重写了toString方法,主要是为了测试输出显示更清晰。

    我们这里自定义的POJO类会在后面的代码中频繁使用,所以在后面的代码中碰到,把这里的POJO类导入就好了。

    5.2.2 从集合中读取数据

    最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。

    1. package com.atguigu.source;
    2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    4. import java.util.Arrays;
    5. /**
    6. * TODO
    7. *
    8. * @author cjp
    9. * @version 1.0
    10. */
    11. public class CollectionDemo {
    12. public static void main(String[] args) throws Exception {
    13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    14. // TODO 从集合读取数据
    15. DataStreamSource source = env
    16. .fromElements(1,2,33); // 从元素读
    17. // .fromCollection(Arrays.asList(1, 22, 3)); // 从集合读
    18. source.print();
    19. env.execute();
    20. }
    21. }

    5.2.3 从文件读取数据

    真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

    读取文件,需要添加文件连接器依赖:

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-filesartifactId>
    4. <version>${flink.version}version>
    5. dependency>

     示例如下:

    1. package com.atguigu.source;
    2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    3. import org.apache.flink.connector.file.src.FileSource;
    4. import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    5. import org.apache.flink.core.fs.Path;
    6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    7. /**
    8. * TODO
    9. *
    10. * @author cjp
    11. * @version 1.0
    12. */
    13. public class FileSourceDemo {
    14. public static void main(String[] args) throws Exception {
    15. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    16. env.setParallelism(1);
    17. // TODO 从文件读: 新Source架构
    18. FileSource fileSource = FileSource
    19. .forRecordStreamFormat(
    20. new TextLineInputFormat(),
    21. new Path("input/word.txt")
    22. )
    23. .build();
    24. // filesource 随意取名
    25. env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource")
    26. .print();
    27. env.execute();
    28. }
    29. }
    30. /**
    31. * 新的Source写法:
    32. * env.fromSource(Source的实现类,Watermark,名字)
    33. */

    说明:

    1. 参数可以是目录,也可以是文件;还可以从HDFS目录下读取,使用路径hdfs://...;
    2. 路径可以是相对路径,也可以是绝对路径;
    3. 相对路径是从系统属性user.dir获取路径:idea下是project的根目录,standalone模式下是集群节点根目录;

    5.2.4 Socket读取数据

    不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。

    我们之前用到的读取socket文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

    DataStream stream = env.socketTextStream("localhost", 7777);

    5.2.5 Kafka读取数据

    Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。

    所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。这里我们需要导入的依赖如下。

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-kafkaartifactId>
    4. <version>${flink.version}version>
    5. dependency>

    代码如下:

    1. package com.atguigu.source;
    2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    3. import org.apache.flink.api.common.serialization.SimpleStringSchema;
    4. import org.apache.flink.connector.file.src.FileSource;
    5. import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    6. import org.apache.flink.connector.kafka.source.KafkaSource;
    7. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    8. import org.apache.flink.core.fs.Path;
    9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    10. import java.time.Duration;
    11. /**
    12. * TODO
    13. *
    14. * @author cjp
    15. * @version 1.0
    16. */
    17. public class KafkaSourceDemo {
    18. public static void main(String[] args) throws Exception {
    19. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    20. env.setParallelism(1);
    21. // TODO 从Kafka读: 新Source架构
    22. KafkaSource kafkaSource = KafkaSource.builder()
    23. .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 指定kafka节点的地址和端口
    24. .setGroupId("atguigu") // 指定消费者组的id
    25. .setTopics("topic_1") // 指定消费的 Topic
    26. .setValueOnlyDeserializer(new SimpleStringSchema()) // 指定 反序列化器,这个是反序列化value
    27. .setStartingOffsets(OffsetsInitializer.latest()) // flink消费kafka的策略
    28. .build();
    29. env
    30. // .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource")
    31. .fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource")
    32. .print();
    33. env.execute();
    34. }
    35. }
    36. /**
    37. * kafka消费者的参数:
    38. * auto.reset.offsets
    39. * earliest: 如果有offset,从offset继续消费; 如果没有offset,从 最早 消费
    40. * latest : 如果有offset,从offset继续消费; 如果没有offset,从 最新 消费
    41. *
    42. * flink的kafkasource,offset消费策略:OffsetsInitializer,默认是 earliest
    43. * earliest: 一定从 最早 消费
    44. * latest : 一定从 最新 消费
    45. *
    46. *
    47. *
    48. */

    5.2.6 从数据生成器读取数据

    Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。1.17提供了新的Source写法,需要导入依赖:

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-datagenartifactId>
    4. <version>${flink.version}version>
    5. dependency>

    代码如下:

    1. package com.atguigu.source;
    2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    3. import org.apache.flink.api.common.typeinfo.Types;
    4. import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
    5. import org.apache.flink.connector.datagen.source.DataGeneratorSource;
    6. import org.apache.flink.connector.datagen.source.GeneratorFunction;
    7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    8. /**
    9. * TODO
    10. *
    11. * @author cjp
    12. * @version 1.0
    13. */
    14. public class DataGeneratorDemo {
    15. public static void main(String[] args) throws Exception {
    16. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    17. // 如果有n个并行度, 最大值设为a
    18. // 将数值 均分成 n份, a/n ,比如,最大100,并行度2,每个并行度生成50个
    19. // 其中一个是 0-49,另一个50-99
    20. env.setParallelism(2);
    21. /**
    22. * 数据生成器Source,四个参数:
    23. * 第一个: GeneratorFunction接口,需要实现, 重写map方法, 输入类型固定是Long
    24. * 第二个: long类型, 自动生成的数字序列(从0自增)的最大值(小于),达到这个值就停止了
    25. * 第三个: 限速策略, 比如 每秒生成几条数据
    26. * 第四个: 返回的类型
    27. */
    28. DataGeneratorSource dataGeneratorSource = new DataGeneratorSource<>(
    29. new GeneratorFunction() {
    30. @Override
    31. public String map(Long value) throws Exception {
    32. return "Number:" + value;
    33. }
    34. },
    35. 100,
    36. RateLimiterStrategy.perSecond(1),
    37. Types.STRING
    38. );
    39. env
    40. .fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator")
    41. .print();
    42. env.execute();
    43. }
    44. }

    5.2.7 Flink支持的数据类型

    1Flink的类型系统

    Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

    2Flink支持的数据类型

    对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:

    1)基本类型

    所有Java基本类型及其包装类,再加上Void、String、Date、BigDecimal和BigInteger。

    2)数组类型

    包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。

    3)复合数据类型

    1. Java元组类型(TUPLE):这是Flink内置的元组类型,是Java API的一部分。最多25个字段,也就是从Tuple0~Tuple25,不支持空字段。
    2. Scala 样例类及Scala元组:不支持空字段。
    3. 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
    4. POJO:Flink自定义的类似于Java bean模式的类。

    4)辅助类型

    Option、Either、List、Map等。

    5)泛型类型(GENERIC

    Flink支持所有的Java类和Scala类。不过如果没有按照上面POJO类型的要求来定义,就会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由Flink本身序列化的,而是由Kryo序列化的

    在这些类型中,元组类型和POJO类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。所以,在项目实践中,往往会将流处理程序中的元素类型定为Flink的POJO类型。

    Flink对POJO类型的要求如下:

    1. 类是公有(public)的
    2. 有一个无参的构造方法
    3. 所有属性都是公有(public)的
    4. 所有属性的类型都是可以序列化的

    3)类型提示(Type Hints

    Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

    为了解决这类问题,Java API提供了专门的“类型提示”(type hints)。

    回忆一下之前的word count流处理程序,我们在将String类型的每个词转换成(word, count)二元组后,就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

    .map(word -> Tuple2.of(word, 1L))

    .returns(Types.TUPLE(Types.STRING, Types.LONG));

    Flink还专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明确地指定转换之后的DataStream里元素的类型。

    returns(new TypeHint>(){})

    5.3 转换算子(Transformation

    数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。

    5.3.1 基本转换算子(map/ filter/ flatMap

    5.3.1.1 映射(map)

    map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。

     

    我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改变。

    下面的代码用不同的方式,实现了提取WaterSensor中的id字段的功能。

    1. public class TransMap {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. DataStreamSource stream = env.fromElements(
    5. new WaterSensor("sensor_1", 1, 1),
    6. new WaterSensor("sensor_2", 2, 2)
    7. );
    8. // 方式一:传入匿名类,实现MapFunction
    9. stream.map(new MapFunction() {
    10. @Override
    11. public String map(WaterSensor e) throws Exception {
    12. return e.id;
    13. }
    14. }).print();
    15. // 方式二:传入MapFunction的实现类
    16. // stream.map(new UserMap()).print();
    17. env.execute();
    18. }
    19. public static class UserMap implements MapFunction {
    20. @Override
    21. public String map(WaterSensor e) throws Exception {
    22. return e.id;
    23. }
    24. }
    25. }

    上面代码中,MapFunction实现类的泛型类型,与输入数据类型和输出数据的类型有关。在实现MapFunction接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还需要重写一个map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。

    5.3.1.2 过滤(filter)

    filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。

    进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。

    案例需求:下面的代码会将数据流中传感器id为sensor_1的数据过滤出来。

    1. public class TransFilter {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. DataStreamSource stream = env.fromElements(
    5. new WaterSensor("sensor_1", 1, 1),
    6. new WaterSensor("sensor_1", 2, 2),
    7. new WaterSensor("sensor_2", 2, 2),
    8. new WaterSensor("sensor_3", 3, 3)
    9. );
    10. // 方式一:传入匿名类实现FilterFunction
    11. stream.filter(new FilterFunction() {
    12. @Override
    13. public boolean filter(WaterSensor e) throws Exception {
    14. return e.id.equals("sensor_1");
    15. }
    16. }).print();
    17. // 方式二:传入FilterFunction实现类
    18. // stream.filter(new UserFilter()).print();
    19. env.execute();
    20. }
    21. public static class UserFilter implements FilterFunction {
    22. @Override
    23. public boolean filter(WaterSensor e) throws Exception {
    24. return e.id.equals("sensor_1");
    25. }
    26. }
    27. }

    5.3.1.3 扁平映射(flatMap)

    flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

    同map一样,flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。

    案例需求:如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc

    实现代码如下:

    1. package com.atguigu.transfrom;
    2. import com.atguigu.bean.WaterSensor;
    3. import org.apache.flink.api.common.functions.FilterFunction;
    4. import org.apache.flink.api.common.functions.FlatMapFunction;
    5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    8. import org.apache.flink.util.Collector;
    9. /**
    10. * TODO 如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc
    11. *
    12. * @author cjp
    13. * @version 1.0
    14. */
    15. public class FlatmapDemo {
    16. public static void main(String[] args) throws Exception {
    17. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    18. env.setParallelism(1);
    19. DataStreamSource sensorDS = env.fromElements(
    20. new WaterSensor("s1", 1L, 1),
    21. new WaterSensor("s1", 11L, 11),
    22. new WaterSensor("s2", 2L, 2),
    23. new WaterSensor("s3", 3L, 3)
    24. );
    25. /**
    26. * TODO flatmap: 一进多出(包含0出)
    27. * 对于s1的数据,一进一出
    28. * 对于s2的数据,一进2出
    29. * 对于s3的数据,一进0出(类似于过滤的效果)
    30. *
    31. * map怎么控制一进一出:
    32. * =》 使用 return
    33. *
    34. * flatmap怎么控制的一进多出
    35. * =》 通过 Collector来输出, 调用几次就输出几条
    36. *
    37. *
    38. */
    39. SingleOutputStreamOperator flatmap = sensorDS.flatMap(new FlatMapFunction() {
    40. @Override
    41. public void flatMap(WaterSensor value, Collector out) throws Exception {
    42. if ("s1".equals(value.getId())) {
    43. // 如果是 s1,输出 vc
    44. out.collect(value.getVc().toString());
    45. } else if ("s2".equals(value.getId())) {
    46. // 如果是 s2,分别输出ts和vc
    47. out.collect(value.getTs().toString());
    48. out.collect(value.getVc().toString());
    49. }
    50. }
    51. });
    52. flatmap.print();
    53. env.execute();
    54. }
    55. }

    5.3.2 聚合算子(Aggregation

    计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操作。

    5.3.2.1 按键分区(keyBy)

    对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。

    keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。

    基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。

    在内部,是通过计算key的哈希值(hash code对分区数进行取模运算来实现的。所以这里key如果是POJO的话,必须要重写hashCode()方法。

    keyBy()方法需要传入一个参数,这个参数指定了一个或一组key。有很多不同的方法来指定key:比如对于Tuple数据类型,可以指定字段的位置或者多个位置的组合;对于POJO类型,可以指定字段的名称(String);另外,还可以传入Lambda表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取key的逻辑。

    我们可以以id作为key做一个分区操作,代码实现如下:

    1. package com.atguigu.aggreagte;
    2. import com.atguigu.bean.WaterSensor;
    3. import org.apache.flink.api.common.functions.FlatMapFunction;
    4. import org.apache.flink.api.java.functions.KeySelector;
    5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    6. import org.apache.flink.streaming.api.datastream.KeyedStream;
    7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    9. import org.apache.flink.util.Collector;
    10. /**
    11. * TODO
    12. *
    13. * @author cjp
    14. * @version 1.0
    15. */
    16. public class KeybyDemo {
    17. public static void main(String[] args) throws Exception {
    18. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    19. env.setParallelism(2);
    20. DataStreamSource sensorDS = env.fromElements(
    21. new WaterSensor("s1", 1L, 1),
    22. new WaterSensor("s1", 11L, 11),
    23. new WaterSensor("s2", 2L, 2),
    24. new WaterSensor("s3", 3L, 3)
    25. );
    26. // 按照 id 分组
    27. /**
    28. * TODO keyby: 按照id分组
    29. * 要点:
    30. * 1、返回的是 一个 KeyedStream,键控流
    31. * 2、keyby不是 转换算子, 只是对数据进行重分区, 不能设置并行度
    32. * 3、分组 与 分区 的关系:
    33. * 1) keyby是对数据分组,保证 相同key的数据 在同一个分区(子任务)
    34. * 2) 分区: 一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
    35. */
    36. KeyedStream sensorKS = sensorDS
    37. .keyBy(new KeySelector() {
    38. @Override
    39. public String getKey(WaterSensor value) throws Exception {
    40. return value.getId();
    41. }
    42. });
    43. sensorKS.print();
    44. env.execute();
    45. }
    46. }

    需要注意的是,keyBy得到的结果将不再是DataStream,而是会将DataStream转换为KeyedStream。KeyedStream可以认为是“分区流”或者“键控流”,它是对DataStream按照key的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定key的类型。

    KeyedStream也继承自DataStream,所以基于它的操作也都归属于DataStream API。但它跟之前的转换操作得到的SingleOutputStreamOperator不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如sum,reduce)。

    5.3.2.2 简单聚合(sum/min/max/minBy/maxBy)

    有了按键分区的数据流KeyedStream,我们就可以基于它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:

    1. sum():在输入流上,对指定的字段做叠加求和的操作。
    2. min():在输入流上,对指定的字段求最小值。
    3. max():在输入流上,对指定的字段求最大值。
    4. minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
    5. maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。

    简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称

    对于元组类型的数据,可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以f0、f1、f2、…来命名的。

    如果数据流的类型是POJO类,那么就只能通过字段名称来指定,不能通过位置来指定了。

    1. package com.atguigu.aggreagte;
    2. import com.atguigu.bean.WaterSensor;
    3. import org.apache.flink.api.java.functions.KeySelector;
    4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    5. import org.apache.flink.streaming.api.datastream.KeyedStream;
    6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    8. /**
    9. * TODO
    10. *
    11. * @author cjp
    12. * @version 1.0
    13. */
    14. public class SimpleAggregateDemo {
    15. public static void main(String[] args) throws Exception {
    16. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    17. env.setParallelism(1);
    18. DataStreamSource sensorDS = env.fromElements(
    19. new WaterSensor("s1", 1L, 1),
    20. new WaterSensor("s1", 11L, 11),
    21. new WaterSensor("s2", 2L, 2),
    22. new WaterSensor("s3", 3L, 3)
    23. );
    24. KeyedStream sensorKS = sensorDS
    25. .keyBy(new KeySelector() {
    26. @Override
    27. public String getKey(WaterSensor value) throws Exception {
    28. return value.getId();
    29. }
    30. });
    31. /**
    32. * TODO 简单聚合算子
    33. * 1、 keyby之后才能调用
    34. * 2、 分组内的聚合:对同一个key的数据进行聚合
    35. */
    36. // 传位置索引的,适用于 Tuple类型,POJO不行
    37. // SingleOutputStreamOperator result = sensorKS.sum(2);
    38. // SingleOutputStreamOperator result = sensorKS.sum("vc");
    39. /**
    40. * max\maxby的区别: 同min
    41. * max:只会取比较字段的最大值,非比较字段保留第一次的值
    42. * maxby:取比较字段的最大值,同时非比较字段 取 最大值这条数据的值
    43. */
    44. // SingleOutputStreamOperator result = sensorKS.max("vc");
    45. // SingleOutputStreamOperator result = sensorKS.min("vc");
    46. SingleOutputStreamOperator result = sensorKS.maxBy("vc");
    47. // SingleOutputStreamOperator result = sensorKS.minby("vc");
    48. result.print();
    49. env.execute();
    50. }
    51. }

     

    简单聚合算子返回的,同样是一个SingleOutputStreamOperator,也就是从KeyedStream又转换成了常规的DataStream。所以可以这样理解:keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个DataStream。而且经过简单聚合之后的数据流,元素的数据类型保持不变。

    一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,应该只用在含有有限个key的数据流上。

    5.3.2.3 归约聚合(reduce)

    reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。

    reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。

    1. package com.atguigu.aggreagte;
    2. import com.atguigu.bean.WaterSensor;
    3. import org.apache.flink.api.common.functions.ReduceFunction;
    4. import org.apache.flink.api.java.functions.KeySelector;
    5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    6. import org.apache.flink.streaming.api.datastream.KeyedStream;
    7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    9. /**
    10. * TODO
    11. *
    12. * @author cjp
    13. * @version 1.0
    14. */
    15. public class ReduceDemo {
    16. public static void main(String[] args) throws Exception {
    17. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    18. env.setParallelism(1);
    19. DataStreamSource sensorDS = env.fromElements(
    20. new WaterSensor("s1", 1L, 1),
    21. new WaterSensor("s1", 11L, 11),
    22. new WaterSensor("s1", 21L, 21),
    23. new WaterSensor("s2", 2L, 2),
    24. new WaterSensor("s3", 3L, 3)
    25. );
    26. KeyedStream sensorKS = sensorDS
    27. .keyBy(new KeySelector() {
    28. @Override
    29. public String getKey(WaterSensor value) throws Exception {
    30. return value.getId();
    31. }
    32. });
    33. /**
    34. * TODO reduce:
    35. * 1、keyby之后调用
    36. * 2、输入类型 = 输出类型,类型不能变
    37. * 3、每个key的第一条数据来的时候,不会执行reduce方法,存起来,直接输出
    38. * 4、reduce方法中的两个参数
    39. * value1: 之前的计算结果,存状态
    40. * value2: 现在来的数据
    41. */
    42. SingleOutputStreamOperator reduce = sensorKS.reduce(new ReduceFunction() {
    43. @Override
    44. public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
    45. System.out.println("value1=" + value1);
    46. System.out.println("value2=" + value2);
    47. return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);
    48. }
    49. });
    50. reduce.print();
    51. env.execute();
    52. }
    53. }

     

    调用KeyedStream的reduce方法时,需要传入一个参数,实现ReduceFunction接口。接口在源码中的定义如下:

    1. public interface ReduceFunction extends Function, Serializable {
    2. T reduce(T value1, T value2) throws Exception;
    3. }

    ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

    我们可以单独定义一个函数类实现ReduceFunction接口,也可以直接传入一个匿名类。当然,同样也可以通过传入Lambda表达式实现类似的功能

    为了方便后续使用,定义一个WaterSensorMapFunction:

    1. public class WaterSensorMapFunction implements MapFunction {
    2. @Override
    3. public WaterSensor map(String value) throws Exception {
    4. String[] datas = value.split(",");
    5. return new WaterSensor(datas[0],Long.valueOf(datas[1]) ,Integer.valueOf(datas[2]) );
    6. }
    7. }

    案例:使用reduce实现max和maxBy的功能。

    1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    2. env
    3. .socketTextStream("hadoop102", 7777)
    4. .map(new WaterSensorMapFunction())
    5. .keyBy(WaterSensor::getId)
    6. .reduce(new ReduceFunction()
    7. {
    8. @Override
    9. public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
    10. System.out.println("Demo7_Reduce.reduce");
    11. int maxVc = Math.max(value1.getVc(), value2.getVc());
    12. //实现max(vc)的效果 取最大值,其他字段以当前组的第一个为主
    13. //value1.setVc(maxVc);
    14. //实现maxBy(vc)的效果 取当前最大值的所有字段
    15. if (value1.getVc() > value2.getVc()){
    16. value1.setVc(maxVc);
    17. return value1;
    18. }else {
    19. value2.setVc(maxVc);
    20. return value2;
    21. }
    22. }
    23. })
    24. .print();
    25. env.execute();

    reduce同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以我们需要将reduce算子作用在一个有限key的流上。

    5.3.3 用户自定义函数(UDF

    用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。

    用户自定义函数分为:函数类、匿名函数、富函数类。

    5.3.3.1 函数类(Function Classes)

    Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。

    需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:

    方式一:实现FilterFunction接口

    1. public class TransFunctionUDF {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. DataStreamSource stream = env.fromElements(
    5. new WaterSensor("sensor_1", 1, 1),
    6. new WaterSensor("sensor_1", 2, 2),
    7. new WaterSensor("sensor_2", 2, 2),
    8. new WaterSensor("sensor_3", 3, 3)
    9. );
    10. DataStream filter = stream.filter(new UserFilter());
    11. filter.print();
    12. env.execute();
    13. }
    14. public static class UserFilter implements FilterFunction {
    15. @Override
    16. public boolean filter(WaterSensor e) throws Exception {
    17. return e.id.equals("sensor_1");
    18. }
    19. }
    20. }

    方式二:通过匿名类来实现FilterFunction接口:

    1. DataStream stream = stream.filter(new FilterFunction< WaterSensor>() {
    2. @Override
    3. public boolean filter(WaterSensor e) throws Exception {
    4. return e.id.equals("sensor_1");
    5. }
    6. });

    方式二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去。

    1. DataStreamSource stream = env.fromElements(
    2. new WaterSensor("sensor_1", 1, 1),
    3. new WaterSensor("sensor_1", 2, 2),
    4. new WaterSensor("sensor_2", 2, 2),
    5. new WaterSensor("sensor_3", 3, 3)
    6. );
    7. DataStream stream = stream.filter(new FilterFunctionImpl("sensor_1"));
    8. public static class FilterFunctionImpl implements FilterFunction {
    9. private String id;
    10. FilterFunctionImpl(String id) { this.id=id; }
    11. @Override
    12. public boolean filter(WaterSensor value) throws Exception {
    13. return thid.id.equals(value.id);
    14. }
    15. }

    方式三:采用匿名函数(Lambda)

    1. public class TransFunctionUDF {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. DataStreamSource stream = env.fromElements(
    5. new WaterSensor("sensor_1", 1, 1),
    6. new WaterSensor("sensor_1", 2, 2),
    7. new WaterSensor("sensor_2", 2, 2),
    8. new WaterSensor("sensor_3", 3, 3)
    9. );
    10. //map函数使用Lambda表达式,不需要进行类型声明
    11. SingleOutputStreamOperator filter = stream.filter(sensor -> "sensor_1".equals(sensor.id));
    12. filter.print();
    13. env.execute();
    14. }
    15. }
    5.3.3.2 富函数类(Rich Function Classes)

    “富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。

    与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

    Rich Function有生命周期的概念。典型的生命周期方法有:

    1. open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
    2. close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。

    需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。

    来看一个例子说明:

    1. package com.atguigu.transfrom;
    2. import org.apache.flink.api.common.functions.RichMapFunction;
    3. import org.apache.flink.api.common.functions.RuntimeContext;
    4. import org.apache.flink.configuration.Configuration;
    5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    8. /**
    9. * TODO
    10. *
    11. * @author cjp
    12. * @version 1.0
    13. */
    14. public class RichFunctionDemo {
    15. public static void main(String[] args) throws Exception {
    16. // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    17. StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    18. env.setParallelism(2);
    19. DataStreamSource source = env.socketTextStream("hadoop102", 7777);
    20. SingleOutputStreamOperator map = source.map(new RichMapFunction() {
    21. @Override
    22. public void open(Configuration parameters) throws Exception {
    23. super.open(parameters);
    24. System.out.println(
    25. "子任务编号=" + getRuntimeContext().getIndexOfThisSubtask()
    26. + ",子任务名称=" + getRuntimeContext().getTaskNameWithSubtasks()
    27. + ",调用open()");
    28. }
    29. @Override
    30. public void close() throws Exception {
    31. super.close();
    32. System.out.println(
    33. "子任务编号=" + getRuntimeContext().getIndexOfThisSubtask()
    34. + ",子任务名称=" + getRuntimeContext().getTaskNameWithSubtasks()
    35. + ",调用close()");
    36. }
    37. @Override
    38. public Integer map(String value) throws Exception {
    39. return Integer.parseInt(value) + 1;
    40. }
    41. });
    42. /**
    43. * TODO RichXXXFunction: 富函数
    44. * 1、多了生命周期管理方法:
    45. * open(): 每个子任务,在启动时,调用一次
    46. * close():每个子任务,在结束时,调用一次
    47. * => 如果是flink程序异常挂掉,不会调用close
    48. * => 如果是正常调用 cancel命令,可以close
    49. * 2、多了一个 运行时上下文
    50. * 可以获取一些运行时的环境信息,比如 子任务编号、名称、其他的.....
    51. */
    52. // DataStreamSource source = env.fromElements(1, 2, 3, 4);
    53. // SingleOutputStreamOperator map = source.map(new RichMapFunction() {
    54. //
    55. // @Override
    56. // public void open(Configuration parameters) throws Exception {
    57. // super.open(parameters);
    58. // System.out.println(
    59. // "子任务编号="+getRuntimeContext().getIndexOfThisSubtask()
    60. // +",子任务名称="+getRuntimeContext().getTaskNameWithSubtasks()
    61. // +",调用open()");
    62. // }
    63. //
    64. // @Override
    65. // public void close() throws Exception {
    66. // super.close();
    67. // System.out.println(
    68. // "子任务编号="+getRuntimeContext().getIndexOfThisSubtask()
    69. // +",子任务名称="+getRuntimeContext().getTaskNameWithSubtasks()
    70. // +",调用close()");
    71. // }
    72. //
    73. // @Override
    74. // public Integer map(Integer value) throws Exception {
    75. // return value + 1;
    76. // }
    77. // });
    78. map.print();
    79. env.execute();
    80. }
    81. }

    5.3.4 物理分区算子(Physical Partitioning

    常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)。

    5.3.4.1 随机分区(shuffle)

    最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。

    随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。

     

    经过随机分区之后,得到的依然是一个DataStream。

    我们可以做个简单测试:将数据读入之后直接打印到控制台,将输出的并行度设置为2,中间经历一次shuffle。执行多次,观察结果是否相同。

    1. public class ShuffleExample {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(2);
    5. DataStreamSource stream = env.socketTextStream("hadoop102", 7777);;
    6. stream.shuffle().print()
    7. env.execute();
    8. }
    9. }
    5.3.4.2 轮询分区(Round-Robin)

    轮询,简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

     stream.rebalance()

    5.3.4.3 重缩放分区(rescale)

    重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌

     

     stream.rescale()

    5.3.4.4 广播(broadcast)

    这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

    stream.broadcast()

    5.3.4.5 全局分区(global)

    全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

    stream.global()

    5.3.4.6 自定义分区(Custom)

    当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。

    1)自定义分区器

    1. public class MyPartitioner implements Partitioner {
    2. @Override
    3. public int partition(String key, int numPartitions) {
    4. return Integer.parseInt(key) % numPartitions;
    5. }
    6. }

    2)使用自定义分区

    1. public class PartitionCustomDemo {
    2. public static void main(String[] args) throws Exception {
    3. // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    5. env.setParallelism(2);
    6. DataStreamSource socketDS = env.socketTextStream("hadoop102", 7777);
    7. DataStream myDS = socketDS
    8. .partitionCustom(
    9. new MyPartitioner(),
    10. value -> value);
    11. myDS.print();
    12. env.execute();
    13. }
    14. }

    5.3.5 分流

    所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

    5.3.5.1 简单实现

    其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。

    案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

    代码实现:

    1. public class SplitStreamByFilter {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. SingleOutputStreamOperator ds = env.socketTextStream("hadoop102", 7777)
    5. .map(Integer::valueOf);
    6. //将ds 分为两个流 ,一个是奇数流,一个是偶数流
    7. //使用filter 过滤两次
    8. SingleOutputStreamOperator ds1 = ds.filter(x -> x % 2 == 0);
    9. SingleOutputStreamOperator ds2 = ds.filter(x -> x % 2 == 1);
    10. ds1.print("偶数");
    11. ds2.print("奇数");
    12. env.execute();
    13. }
    14. }

    这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流stream复制三份然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

    5.3.5.2 使用侧输出流

    关于处理函数中侧输出流的用法,我们已经在7.5节做了详细介绍。简单来说,只需要调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的id和类型。

    代码实现:将WaterSensor按照Id类型进行分流。

    1. /**
    2. * TODO
    3. *
    4. * @author cjp
    5. * @version 1.0
    6. */
    7. public class WaterSensorMapFunction implements MapFunction {
    8. @Override
    9. public WaterSensor map(String value) throws Exception {
    10. String[] datas = value.split(",");
    11. return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
    12. }
    13. }
    1. package com.atguigu.split;
    2. import com.atguigu.bean.WaterSensor;
    3. import com.atguigu.functions.WaterSensorMapFunction;
    4. import org.apache.flink.api.common.typeinfo.Types;
    5. import org.apache.flink.configuration.Configuration;
    6. import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
    7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    9. import org.apache.flink.streaming.api.functions.ProcessFunction;
    10. import org.apache.flink.util.Collector;
    11. import org.apache.flink.util.OutputTag;
    12. /**
    13. * TODO 分流: 奇数、偶数拆分成不同流
    14. *
    15. * @author cjp
    16. * @version 1.0
    17. */
    18. public class SideOutputDemo {
    19. public static void main(String[] args) throws Exception {
    20. // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    21. StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    22. env.setParallelism(1);
    23. SingleOutputStreamOperator sensorDS = env
    24. .socketTextStream("hadoop102", 7777)
    25. .map(new WaterSensorMapFunction());
    26. /**
    27. * TODO 使用侧输出流 实现分流
    28. * 需求: watersensor的数据,s1、s2的数据分别分开
    29. *
    30. * TODO 总结步骤:
    31. * 1、使用 process算子
    32. * 2、定义 OutputTag对象
    33. * 3、调用 ctx.output
    34. * 4、通过主流 获取 测流
    35. */
    36. /**
    37. * 创建OutputTag对象
    38. * 第一个参数: 标签名
    39. * 第二个参数: 放入侧输出流中的 数据的 类型,Typeinformation
    40. */
    41. OutputTag s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
    42. OutputTag s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));
    43. SingleOutputStreamOperator process = sensorDS
    44. .process(
    45. new ProcessFunction() {
    46. @Override
    47. public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
    48. String id = value.getId();
    49. if ("s1".equals(id)) {
    50. // 如果是 s1,放到侧输出流s1中
    51. /**
    52. * 上下文ctx 调用ouput,将数据放入侧输出流
    53. * 第一个参数: Tag对象
    54. * 第二个参数: 放入侧输出流中的 数据
    55. */
    56. ctx.output(s1Tag, value);
    57. } else if ("s2".equals(id)) {
    58. // 如果是 s2,放到侧输出流s2中
    59. ctx.output(s2Tag, value);
    60. } else {
    61. // 非s1、s2的数据,放到主流中
    62. out.collect(value);
    63. }
    64. }
    65. }
    66. );
    67. // 从主流中,根据标签 获取 侧输出流
    68. SideOutputDataStream s1 = process.getSideOutput(s1Tag);
    69. SideOutputDataStream s2 = process.getSideOutput(s2Tag);
    70. // 打印主流
    71. process.print("主流-非s1、s2");
    72. //打印 侧输出流
    73. s1.printToErr("s1");
    74. s2.printToErr("s2");
    75. env.execute();
    76. }
    77. }
    78. /**
    79. */

    5.3.6 基本合流操作

    在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。

    5.3.6.1 联合(Union)

    最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

    在代码中,我们只要基于DataStream直接调用.union()方法,传入其他DataStream作为参数,就可以实现流的联合了;得到的依然是一个DataStream:

    stream1.union(stream2, stream3, ...)

    注意:union()的参数可以是多个DataStream,所以联合操作可以实现多条流的合并。

    代码实现:我们可以用下面的代码做一个简单测试:

    1. package com.atguigu.combine;
    2. import org.apache.flink.streaming.api.datastream.DataStream;
    3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    5. /**
    6. * TODO
    7. *
    8. * @author cjp
    9. * @version 1.0
    10. */
    11. public class UnionDemo {
    12. public static void main(String[] args) throws Exception {
    13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    14. env.setParallelism(1);
    15. DataStreamSource source1 = env.fromElements(1, 2, 3);
    16. DataStreamSource source2 = env.fromElements(11, 22, 33);
    17. DataStreamSource source3 = env.fromElements("111", "222", "333");
    18. /**
    19. * TODO union:合并数据流
    20. * 1、 流的数据类型必须一致
    21. * 2、 一次可以合并多条流
    22. */
    23. // DataStream union = source1.union(source2).union(source3.map(r -> Integer.valueOf(r)));
    24. DataStream union = source1.union(source2, source3.map(r -> Integer.valueOf(r)));
    25. union.print();
    26. env.execute();
    27. }
    28. }
    5.3.6.2 连接(Connect)

    流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink还提供了另外一种方便的合流操作——连接(connect

    1)连接流(ConnectedStreams)\

     代码实现:需要分为两步:首先基于一条DataStream调用.connect()方法,传入另外一条DataStream作为参数,将两条流连接起来,得到一个ConnectedStreams;然后再调用同处理方法得到DataStream。这里可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法。

    1. public class ConnectDemo {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. // DataStreamSource source1 = env.fromElements(1, 2, 3);
    6. // DataStreamSource source2 = env.fromElements("a", "b", "c");
    7. SingleOutputStreamOperator source1 = env
    8. .socketTextStream("hadoop102", 7777)
    9. .map(i -> Integer.parseInt(i));
    10. DataStreamSource source2 = env.socketTextStream("hadoop102", 8888);
    11. /**
    12. * TODO 使用 connect 合流
    13. * 1、一次只能连接 2条流
    14. * 2、流的数据类型可以不一样
    15. * 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的
    16. */
    17. ConnectedStreams connect = source1.connect(source2);
    18. SingleOutputStreamOperator result = connect.map(new CoMapFunction() {
    19. @Override
    20. public String map1(Integer value) throws Exception {
    21. return "来源于数字流:" + value.toString();
    22. }
    23. @Override
    24. public String map2(String value) throws Exception {
    25. return "来源于字母流:" + value;
    26. }
    27. });
    28. result.print();
    29. env.execute(); }
    30. }

    上面的代码中,ConnectedStreams有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用.map()方法时传入的不再是一个简单的MapFunction,而是一个CoMapFunction,表示分别对两条流中的数据执行map操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1()就是对第一条流中数据的map操作,.map2()则是针对第二条流。

    2CoProcessFunction

    与CoMapFunction类似,如果是调用.map()就需要传入一个CoMapFunction,需要实现map1()、map2()两个方法;而调用.process()时,传入的则是一个CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。

    值得一提的是,ConnectedStreams也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个ConnectedStreams:

    connectedStreams.keyBy(keySelector1, keySelector2);

    这里传入两个参数keySelector1和keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的keyBy用法完全一致。ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。

    案例需求:连接两条流,输出能根据id匹配上的数据(类似inner join效果)

    1. package com.atguigu.combine;
    2. import org.apache.flink.api.java.tuple.Tuple2;
    3. import org.apache.flink.api.java.tuple.Tuple3;
    4. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
    5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    8. import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    9. import org.apache.flink.util.Collector;
    10. import java.util.ArrayList;
    11. import java.util.HashMap;
    12. import java.util.List;
    13. import java.util.Map;
    14. /**
    15. * TODO 连接两条流,输出能根据id匹配上的数据(类似inner join效果)
    16. *
    17. * @author cjp
    18. * @version 1.0
    19. */
    20. public class ConnectKeybyDemo {
    21. public static void main(String[] args) throws Exception {
    22. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    23. env.setParallelism(2);
    24. DataStreamSource> source1 = env.fromElements(
    25. Tuple2.of(1, "a1"),
    26. Tuple2.of(1, "a2"),
    27. Tuple2.of(2, "b"),
    28. Tuple2.of(3, "c")
    29. );
    30. DataStreamSource> source2 = env.fromElements(
    31. Tuple3.of(1, "aa1", 1),
    32. Tuple3.of(1, "aa2", 2),
    33. Tuple3.of(2, "bb", 1),
    34. Tuple3.of(3, "cc", 1)
    35. );
    36. ConnectedStreams, Tuple3> connect = source1.connect(source2);
    37. // 多并行度下,需要根据 关联条件进行 keyby,才能保证 key相同的数据到一起去,才能匹配上
    38. ConnectedStreams, Tuple3> connectKeyby = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);
    39. /**
    40. * 实现互相匹配的效果: 两条流,,不一定谁的数据先来
    41. * 1、每条流,有数据来,存到一个变量中
    42. * hashmap
    43. * =》key=id,第一个字段值
    44. * =》value=List<数据>
    45. * 2、每条流有数据来的时候,除了存变量中, 不知道对方是否有匹配的数据,要去另一条流存的变量中 查找是否有匹配上的
    46. */
    47. SingleOutputStreamOperator process = connectKeyby.process(
    48. new CoProcessFunction, Tuple3, String>() {
    49. // 每条流定义一个hashmap,用来存数据
    50. Map>> s1Cache = new HashMap<>();
    51. Map>> s2Cache = new HashMap<>();
    52. /**
    53. * 第一条流的处理逻辑
    54. * @param value 第一条流的数据
    55. * @param ctx 上下文
    56. * @param out 采集器
    57. * @throws Exception
    58. */
    59. @Override
    60. public void processElement1(Tuple2 value, Context ctx, Collector out) throws Exception {
    61. Integer id = value.f0;
    62. // TODO 1. s1的数据来了,就存到变量中
    63. if (!s1Cache.containsKey(id)) {
    64. // 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
    65. List> s1Values = new ArrayList<>();
    66. s1Values.add(value);
    67. s1Cache.put(id, s1Values);
    68. } else {
    69. // 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
    70. s1Cache.get(id).add(value);
    71. }
    72. // TODO 2.去 s2Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
    73. if (s2Cache.containsKey(id)) {
    74. for (Tuple3 s2Element : s2Cache.get(id)) {
    75. out.collect("s1:" + value + "<========>" + "s2:" + s2Element);
    76. }
    77. }
    78. }
    79. /**
    80. * 第二条流的处理逻辑
    81. * @param value 第二条流的数据
    82. * @param ctx 上下文
    83. * @param out 采集器
    84. * @throws Exception
    85. */
    86. @Override
    87. public void processElement2(Tuple3 value, Context ctx, Collector out) throws Exception {
    88. Integer id = value.f0;
    89. // TODO 1. s2的数据来了,就存到变量中
    90. if (!s2Cache.containsKey(id)) {
    91. // 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
    92. List> s2Values = new ArrayList<>();
    93. s2Values.add(value);
    94. s2Cache.put(id, s2Values);
    95. } else {
    96. // 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
    97. s2Cache.get(id).add(value);
    98. }
    99. // TODO 2.去 s1Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
    100. if (s1Cache.containsKey(id)) {
    101. for (Tuple2 s1Element : s1Cache.get(id)) {
    102. out.collect("s1:" + s1Element + "<========>" + "s2:" + value);
    103. }
    104. }
    105. }
    106. }
    107. );
    108. process.print();
    109. env.execute();
    110. }
    111. }

    5.4 输出算子(Sink

    Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

     

    5.4.1 连接到外部系统

    Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

    Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

    stream.addSink(new SinkFunction(…));

    addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

    Flink1.12开始,同样重构了Sink架构,

    stream.sinkTo(…)

    当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

    我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。

    除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。

    5.4.2 输出到文件

    Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

    FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

    1. 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
    2. 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

    示例:

    1. package com.atguigu.sink;
    2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    3. import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    4. import org.apache.flink.api.common.typeinfo.Types;
    5. import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
    6. import org.apache.flink.configuration.MemorySize;
    7. import org.apache.flink.connector.datagen.source.DataGeneratorSource;
    8. import org.apache.flink.connector.datagen.source.GeneratorFunction;
    9. import org.apache.flink.connector.file.sink.FileSink;
    10. import org.apache.flink.core.fs.Path;
    11. import org.apache.flink.streaming.api.CheckpointingMode;
    12. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    14. import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
    15. import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
    16. import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
    17. import java.time.Duration;
    18. import java.time.ZoneId;
    19. import java.util.TimeZone;
    20. /**
    21. * TODO
    22. *
    23. * @author cjp
    24. * @version 1.0
    25. */
    26. public class SinkFile {
    27. public static void main(String[] args) throws Exception {
    28. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    29. // TODO 每个目录中,都有 并行度个数的 文件在写入
    30. env.setParallelism(2);
    31. // 必须开启checkpoint,否则一直都是 .inprogress
    32. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    33. DataGeneratorSource dataGeneratorSource = new DataGeneratorSource<>(
    34. new GeneratorFunction() {
    35. @Override
    36. public String map(Long value) throws Exception {
    37. return "Number:" + value;
    38. }
    39. },
    40. Long.MAX_VALUE,
    41. RateLimiterStrategy.perSecond(1000),
    42. Types.STRING
    43. );
    44. DataStreamSource dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");
    45. // TODO 输出到文件系统
    46. FileSink fieSink = FileSink
    47. // 输出行式存储的文件,指定路径、指定编码
    48. .forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
    49. // 输出文件的一些配置: 文件名的前缀、后缀
    50. .withOutputFileConfig(
    51. OutputFileConfig.builder()
    52. .withPartPrefix("atguigu-")
    53. .withPartSuffix(".log")
    54. .build()
    55. )
    56. // 按照目录分桶:如下,就是每个小时一个目录
    57. .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
    58. // 文件滚动策略: 1分钟 或 1m
    59. .withRollingPolicy(
    60. DefaultRollingPolicy.builder()
    61. .withRolloverInterval(Duration.ofMinutes(1))
    62. .withMaxPartSize(new MemorySize(1024*1024))
    63. .build()
    64. )
    65. .build();
    66. dataGen.sinkTo(fieSink);
    67. env.execute();
    68. }
    69. }

    5.4.3 输出到Kafka

    (1)添加Kafka 连接器依赖

    由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。

    (2)启动Kafka集群

    (3)编写输出到Kafka的示例代码

    输出无key的record:

    1. public class SinkKafka {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. // 如果是精准一次,必须开启checkpoint(后续章节介绍)
    6. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    7. SingleOutputStreamOperator sensorDS = env
    8. .socketTextStream("hadoop102", 7777);
    9. /**
    10. * Kafka Sink:
    11. * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
    12. * 1、开启checkpoint(后续介绍)
    13. * 2、设置事务前缀
    14. * 3、设置事务超时时间: checkpoint间隔 < 事务超时时间 < max的15分钟
    15. */
    16. KafkaSink kafkaSink = KafkaSink.builder()
    17. // 指定 kafka 的地址和端口
    18. .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
    19. // 指定序列化器:指定Topic名称、具体的序列化
    20. .setRecordSerializer(
    21. KafkaRecordSerializationSchema.builder()
    22. .setTopic("ws")
    23. .setValueSerializationSchema(new SimpleStringSchema())
    24. .build()
    25. )
    26. // 写到kafka的一致性级别: 精准一次、至少一次
    27. .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    28. // 如果是精准一次,必须设置 事务的前缀
    29. .setTransactionalIdPrefix("atguigu-")
    30. // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
    31. .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
    32. .build();
    33. sensorDS.sinkTo(kafkaSink);
    34. env.execute();
    35. }
    36. }

    自定义序列化器,实现带key的record:

    1. public class SinkKafkaWithKey {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    6. env.setRestartStrategy(RestartStrategies.noRestart());
    7. SingleOutputStreamOperator sensorDS = env
    8. .socketTextStream("hadoop102", 7777);
    9. /**
    10. * 如果要指定写入kafka的key,可以自定义序列化器:
    11. * 1、实现 一个接口,重写 序列化 方法
    12. * 2、指定key,转成 字节数组
    13. * 3、指定value,转成 字节数组
    14. * 4、返回一个 ProducerRecord对象,把key、value放进去
    15. */
    16. KafkaSink kafkaSink = KafkaSink.builder()
    17. .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
    18. .setRecordSerializer(
    19. new KafkaRecordSerializationSchema() {
    20. @Nullable
    21. @Override
    22. public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
    23. String[] datas = element.split(",");
    24. byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
    25. byte[] value = element.getBytes(StandardCharsets.UTF_8);
    26. return new ProducerRecord<>("ws", key, value);
    27. }
    28. }
    29. )
    30. .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    31. .setTransactionalIdPrefix("atguigu-")
    32. .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
    33. .build();
    34. sensorDS.sinkTo(kafkaSink);
    35. env.execute();
    36. }
    37. }

    (4)运行代码,在Linux主机启动一个消费者,查看是否收到数据

    [atguigu@hadoop102 ~]$

    bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws

    5.4.4 输出到MySQLJDBC

    写入数据的MySQL的测试步骤如下。

    (1)添加依赖

    添加MySQL驱动:

    1. <dependency>
    2. <groupId>mysqlgroupId>
    3. <artifactId>mysql-connector-javaartifactId>
    4. <version>8.0.27version>
    5. dependency>

     官方 已经 提供flink-connector-jdbc的1.17.0的正式依赖

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-jdbcartifactId>
    4. <version>3.1.1-1.17version>
    5. dependency>

    (2)启动MySQL,在test库下建表ws

    mysql>    

    CREATE TABLE `ws` (

      `id` varchar(100) NOT NULL,

      `ts` bigint(20) DEFAULT NULL,

      `vc` int(11) DEFAULT NULL,

      PRIMARY KEY (`id`)

    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

    (3)编写输出到MySQL的示例代码

    1. public class SinkMySQL {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. SingleOutputStreamOperator sensorDS = env
    6. .socketTextStream("hadoop102", 7777)
    7. .map(new WaterSensorMapFunction());
    8. /**
    9. * TODO 写入mysql
    10. * 1、只能用老的sink写法: addsink
    11. * 2、JDBCSink的4个参数:
    12. * 第一个参数: 执行的sql,一般就是 insert into
    13. * 第二个参数: 预编译sql, 对占位符填充值
    14. * 第三个参数: 执行选项 ---》 攒批、重试
    15. * 第四个参数: 连接选项 ---》 url、用户名、密码
    16. */
    17. SinkFunction jdbcSink = JdbcSink.sink(
    18. "insert into ws values(?,?,?)",
    19. new JdbcStatementBuilder() {
    20. @Override
    21. public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
    22. //每收到一条WaterSensor,如何去填充占位符
    23. preparedStatement.setString(1, waterSensor.getId());
    24. preparedStatement.setLong(2, waterSensor.getTs());
    25. preparedStatement.setInt(3, waterSensor.getVc());
    26. }
    27. },
    28. JdbcExecutionOptions.builder()
    29. .withMaxRetries(3) // 重试次数
    30. .withBatchSize(100) // 批次的大小:条数
    31. .withBatchIntervalMs(3000) // 批次的时间
    32. .build(),
    33. new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    34. .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
    35. .withUsername("root")
    36. .withPassword("000000")
    37. .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
    38. .build()
    39. );
    40. sensorDS.addSink(jdbcSink);
    41. env.execute();
    42. }
    43. }
    1. 运行代码,用客户端连接MySQL,查看是否成功写入数据。

    5.4.5 自定义Sink输出

    如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

    stream.addSink(new MySinkFunction());

    在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

    这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

    1. package com.atguigu.sink;
    2. import org.apache.flink.configuration.Configuration;
    3. import org.apache.flink.streaming.api.CheckpointingMode;
    4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    6. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    7. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    8. import java.sql.Connection;
    9. /**
    10. * TODO
    11. *
    12. * @author cjp
    13. * @version 1.0
    14. */
    15. public class SinkCustom {
    16. public static void main(String[] args) throws Exception {
    17. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    18. env.setParallelism(1);
    19. SingleOutputStreamOperator sensorDS = env
    20. .socketTextStream("hadoop102", 7777);
    21. sensorDS.addSink(new MySink());
    22. env.execute();
    23. }
    24. public static class MySink extends RichSinkFunction {
    25. Connection conn = null;
    26. @Override
    27. public void open(Configuration parameters) throws Exception {
    28. super.open(parameters);
    29. // 在这里 创建连接
    30. // conn = new xxxx
    31. }
    32. @Override
    33. public void close() throws Exception {
    34. super.close();
    35. // 做一些清理、销毁连接
    36. }
    37. /**
    38. * sink的核心逻辑,写出的逻辑就写在这个方法里
    39. * @param value
    40. * @param context
    41. * @throws Exception
    42. */
    43. @Override
    44. public void invoke(String value, Context context) throws Exception {
    45. // 写出逻辑
    46. // 这个方法是 来一条数据,调用一次,所以不要在这里创建 连接对象
    47. }
    48. }
    49. }

  • 相关阅读:
    拿来就能用的前端酷炫登录注册模板
    K8S 二进制部署
    通过js来实现用身份证号来判断性别和出生年月
    【vue3页面展示代码】展示代码codemirror插件
    Java基础学习四---数组基础和排序算法
    http和https区别与上网过程
    NPI加速器在烽火科技SMT车间的应用:贴片机程序制作效率的革新
    把key-value键值对的数据转换成xml格式,并根据key来匹配元素
    Ubuntu 18.04安装最新版Visual Studio Code(VS Code)报依赖库版本过低错误
    无线Mesh自组网方案,CV5200无线模组应用,支持高清数据远距离传输
  • 原文地址:https://blog.csdn.net/yuzheh521/article/details/132962788