DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成:

- package com.atguigu.env;
-
- import org.apache.flink.api.common.JobExecutionResult;
- import org.apache.flink.api.common.RuntimeExecutionMode;
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.configuration.RestOptions;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class EnvDemo {
- public static void main(String[] args) throws Exception {
-
- Configuration conf = new Configuration();
- conf.set(RestOptions.BIND_PORT, "8082");
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- // .getExecutionEnvironment(); // 自动识别是 远程集群 ,还是idea本地环境
- .getExecutionEnvironment(conf); // conf对象可以去修改一些参数
-
- // .createLocalEnvironment()
- // .createRemoteEnvironment("hadoop102", 8081,"/xxx")
-
- // 流批一体:代码api是同一套,可以指定为 批,也可以指定为 流
- // 默认 STREAMING
- // 一般不在代码写死,提交时 参数指定:-Dexecution.runtime-mode=BATCH
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-
-
- env
- // .socketTextStream("hadoop102", 7777)
- .readTextFile("input/word.txt")
- .flatMap(
- (String value, Collector
> out) -> { - String[] words = value.split(" ");
- for (String word : words) {
- out.collect(Tuple2.of(word, 1));
- }
- }
- )
- .returns(Types.TUPLE(Types.STRING, Types.INT))
- .keyBy(value -> value.f0)
- .sum(1)
- .print();
-
- env.execute();
- /** TODO 关于execute总结(了解)
- * 1、默认 env.execute()触发一个flink job:
- * 一个main方法可以调用多个execute,但是没意义,指定到第一个就会阻塞住
- * 2、env.executeAsync(),异步触发,不阻塞
- * => 一个main方法里 executeAsync()个数 = 生成的flink job数
- * 3、思考:
- * yarn-application 集群,提交一次,集群里会有几个flink job?
- * =》 取决于 调用了n个 executeAsync()
- * =》 对应 application集群里,会有n个job
- * =》 对应 Jobmanager当中,会有 n个 JobMaster
- */
- // env.executeAsync();
- // ……
- // env.executeAsync();
-
-
- }
- }
Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。
不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前Flink的运行环境,从而建立起与Flink框架之间的联系。
我们要获取的执行环境,是StreamExecutionEnvironment类的对象,这是所有Flink程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。
1)getExecutionEnvironment
最简单的方式,就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
这种方式,用起来简单高效,是最常用的一种创建执行环境的方式。
2)createLocalEnvironment
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
3)createRemoteEnvironment
这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
"host", // JobManager主机名
1234, // JobManager进程端口号
"path/to/jarFile.jar" // 提交给JobManager的JAR包
);
在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。
从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API。
// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream API执行模式包括:流执行模式、批执行模式和自动模式。
这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。
专门用于批处理的执行模式。
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
批执行模式的使用。主要有两种方式:
(1)通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。
(2)通过代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。
实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。
需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。
所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。
env.execute();
Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。

在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。
为了方便练习,这里使用WaterSensor作为数据模型。
| 字段名 | 数据类型 | 说明 |
| id | String | 水位传感器类型 |
| ts | Long | 传感器记录时间戳 |
| vc | Integer | 水位记录 |
具体代码如下:
- public class WaterSensor {
- public String id;
- public Long ts;
- public Integer vc;
-
- public WaterSensor() {
- }
-
- public WaterSensor(String id, Long ts, Integer vc) {
- this.id = id;
- this.ts = ts;
- this.vc = vc;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public Long getTs() {
- return ts;
- }
-
- public void setTs(Long ts) {
- this.ts = ts;
- }
-
- public Integer getVc() {
- return vc;
- }
-
- public void setVc(Integer vc) {
- this.vc = vc;
- }
-
- @Override
- public String toString() {
- return "WaterSensor{" +
- "id='" + id + '\'' +
- ", ts=" + ts +
- ", vc=" + vc +
- '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- WaterSensor that = (WaterSensor) o;
- return Objects.equals(id, that.id) &&
- Objects.equals(ts, that.ts) &&
- Objects.equals(vc, that.vc);
- }
-
- @Override
- public int hashCode() {
-
- return Objects.hash(id, ts, vc);
- }
- }
这里需要注意,我们定义的WaterSensor,有这样几个特点:
Flink会把这样的类作为一种特殊的POJO(Plain Ordinary Java Object简单的Java对象,实际就是普通JavaBeans)数据类型来对待,方便数据的解析和序列化。另外我们在类中还重写了toString方法,主要是为了测试输出显示更清晰。
我们这里自定义的POJO类会在后面的代码中频繁使用,所以在后面的代码中碰到,把这里的POJO类导入就好了。
最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。
- package com.atguigu.source;
-
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- import java.util.Arrays;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class CollectionDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // TODO 从集合读取数据
- DataStreamSource
source = env - .fromElements(1,2,33); // 从元素读
- // .fromCollection(Arrays.asList(1, 22, 3)); // 从集合读
- source.print();
- env.execute();
-
- }
- }
真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。
读取文件,需要添加文件连接器依赖:
- <dependency>
- <groupId>org.apache.flinkgroupId>
- <artifactId>flink-connector-filesartifactId>
- <version>${flink.version}version>
- dependency>
示例如下:
- package com.atguigu.source;
-
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.connector.file.src.FileSource;
- import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
- import org.apache.flink.core.fs.Path;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class FileSourceDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // TODO 从文件读: 新Source架构
- FileSource
fileSource = FileSource - .forRecordStreamFormat(
- new TextLineInputFormat(),
- new Path("input/word.txt")
- )
- .build();
- // filesource 随意取名
- env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource")
- .print();
-
- env.execute();
- }
- }
- /**
- * 新的Source写法:
- * env.fromSource(Source的实现类,Watermark,名字)
- */
说明:
不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。
我们之前用到的读取socket文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。
DataStream
Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。
所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。这里我们需要导入的依赖如下。
- <dependency>
- <groupId>org.apache.flinkgroupId>
- <artifactId>flink-connector-kafkaartifactId>
- <version>${flink.version}version>
- dependency>
代码如下:
- package com.atguigu.source;
-
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.serialization.SimpleStringSchema;
- import org.apache.flink.connector.file.src.FileSource;
- import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
- import org.apache.flink.connector.kafka.source.KafkaSource;
- import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
- import org.apache.flink.core.fs.Path;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- import java.time.Duration;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class KafkaSourceDemo {
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // TODO 从Kafka读: 新Source架构
- KafkaSource
kafkaSource = KafkaSource.builder() - .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 指定kafka节点的地址和端口
- .setGroupId("atguigu") // 指定消费者组的id
- .setTopics("topic_1") // 指定消费的 Topic
- .setValueOnlyDeserializer(new SimpleStringSchema()) // 指定 反序列化器,这个是反序列化value
- .setStartingOffsets(OffsetsInitializer.latest()) // flink消费kafka的策略
- .build();
- env
- // .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource")
- .fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource")
- .print();
- env.execute();
- }
- }
- /**
- * kafka消费者的参数:
- * auto.reset.offsets
- * earliest: 如果有offset,从offset继续消费; 如果没有offset,从 最早 消费
- * latest : 如果有offset,从offset继续消费; 如果没有offset,从 最新 消费
- *
- * flink的kafkasource,offset消费策略:OffsetsInitializer,默认是 earliest
- * earliest: 一定从 最早 消费
- * latest : 一定从 最新 消费
- *
- *
- *
- */
Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。1.17提供了新的Source写法,需要导入依赖:
- <dependency>
- <groupId>org.apache.flinkgroupId>
- <artifactId>flink-connector-datagenartifactId>
- <version>${flink.version}version>
- dependency>
代码如下:
- package com.atguigu.source;
-
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
- import org.apache.flink.connector.datagen.source.DataGeneratorSource;
- import org.apache.flink.connector.datagen.source.GeneratorFunction;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class DataGeneratorDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 如果有n个并行度, 最大值设为a
- // 将数值 均分成 n份, a/n ,比如,最大100,并行度2,每个并行度生成50个
- // 其中一个是 0-49,另一个50-99
- env.setParallelism(2);
- /**
- * 数据生成器Source,四个参数:
- * 第一个: GeneratorFunction接口,需要实现, 重写map方法, 输入类型固定是Long
- * 第二个: long类型, 自动生成的数字序列(从0自增)的最大值(小于),达到这个值就停止了
- * 第三个: 限速策略, 比如 每秒生成几条数据
- * 第四个: 返回的类型
- */
- DataGeneratorSource
dataGeneratorSource = new DataGeneratorSource<>( - new GeneratorFunction
() { - @Override
- public String map(Long value) throws Exception {
- return "Number:" + value;
- }
- },
- 100,
- RateLimiterStrategy.perSecond(1),
- Types.STRING
- );
- env
- .fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator")
- .print();
- env.execute();
- }
- }
1)Flink的类型系统
Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
2)Flink支持的数据类型
对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:
(1)基本类型
所有Java基本类型及其包装类,再加上Void、String、Date、BigDecimal和BigInteger。
(2)数组类型
包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。
(3)复合数据类型
(4)辅助类型
Option、Either、List、Map等。
(5)泛型类型(GENERIC)
Flink支持所有的Java类和Scala类。不过如果没有按照上面POJO类型的要求来定义,就会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由Flink本身序列化的,而是由Kryo序列化的。
在这些类型中,元组类型和POJO类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。所以,在项目实践中,往往会将流处理程序中的元素类型定为Flink的POJO类型。
Flink对POJO类型的要求如下:
3)类型提示(Type Hints)
Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
为了解决这类问题,Java API提供了专门的“类型提示”(type hints)。
回忆一下之前的word count流处理程序,我们在将String类型的每个词转换成(word, count)二元组后,就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
Flink还专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明确地指定转换之后的DataStream里元素的类型。
returns(new TypeHint
数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。

我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改变。
下面的代码用不同的方式,实现了提取WaterSensor中的id字段的功能。
- public class TransMap {
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource
stream = env.fromElements( - new WaterSensor("sensor_1", 1, 1),
- new WaterSensor("sensor_2", 2, 2)
- );
-
- // 方式一:传入匿名类,实现MapFunction
- stream.map(new MapFunction
() { - @Override
- public String map(WaterSensor e) throws Exception {
- return e.id;
- }
- }).print();
-
- // 方式二:传入MapFunction的实现类
- // stream.map(new UserMap()).print();
-
- env.execute();
- }
-
- public static class UserMap implements MapFunction
{ - @Override
- public String map(WaterSensor e) throws Exception {
- return e.id;
- }
- }
- }
上面代码中,MapFunction实现类的泛型类型,与输入数据类型和输出数据的类型有关。在实现MapFunction接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还需要重写一个map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。

进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。
案例需求:下面的代码会将数据流中传感器id为sensor_1的数据过滤出来。
- public class TransFilter {
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource
stream = env.fromElements( -
- new WaterSensor("sensor_1", 1, 1),
- new WaterSensor("sensor_1", 2, 2),
- new WaterSensor("sensor_2", 2, 2),
- new WaterSensor("sensor_3", 3, 3)
- );
-
- // 方式一:传入匿名类实现FilterFunction
- stream.filter(new FilterFunction
() { - @Override
- public boolean filter(WaterSensor e) throws Exception {
- return e.id.equals("sensor_1");
- }
- }).print();
-
- // 方式二:传入FilterFunction实现类
- // stream.filter(new UserFilter()).print();
-
- env.execute();
- }
- public static class UserFilter implements FilterFunction
{ - @Override
- public boolean filter(WaterSensor e) throws Exception {
- return e.id.equals("sensor_1");
- }
- }
- }
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

同map一样,flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。
案例需求:如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc。
实现代码如下:
- package com.atguigu.transfrom;
-
- import com.atguigu.bean.WaterSensor;
- import org.apache.flink.api.common.functions.FilterFunction;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
-
- /**
- * TODO 如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc
- *
- * @author cjp
- * @version 1.0
- */
- public class FlatmapDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStreamSource
sensorDS = env.fromElements( - new WaterSensor("s1", 1L, 1),
- new WaterSensor("s1", 11L, 11),
- new WaterSensor("s2", 2L, 2),
- new WaterSensor("s3", 3L, 3)
- );
- /**
- * TODO flatmap: 一进多出(包含0出)
- * 对于s1的数据,一进一出
- * 对于s2的数据,一进2出
- * 对于s3的数据,一进0出(类似于过滤的效果)
- *
- * map怎么控制一进一出:
- * =》 使用 return
- *
- * flatmap怎么控制的一进多出
- * =》 通过 Collector来输出, 调用几次就输出几条
- *
- *
- */
- SingleOutputStreamOperator
flatmap = sensorDS.flatMap(new FlatMapFunction() { - @Override
- public void flatMap(WaterSensor value, Collector
out) throws Exception { - if ("s1".equals(value.getId())) {
- // 如果是 s1,输出 vc
- out.collect(value.getVc().toString());
- } else if ("s2".equals(value.getId())) {
- // 如果是 s2,分别输出ts和vc
- out.collect(value.getTs().toString());
- out.collect(value.getVc().toString());
- }
- }
- });
- flatmap.print();
- env.execute();
- }
-
-
- }
计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操作。
对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。
基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。

在内部,是通过计算key的哈希值(hash code),对分区数进行取模运算来实现的。所以这里key如果是POJO的话,必须要重写hashCode()方法。
keyBy()方法需要传入一个参数,这个参数指定了一个或一组key。有很多不同的方法来指定key:比如对于Tuple数据类型,可以指定字段的位置或者多个位置的组合;对于POJO类型,可以指定字段的名称(String);另外,还可以传入Lambda表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取key的逻辑。
我们可以以id作为key做一个分区操作,代码实现如下:
- package com.atguigu.aggreagte;
-
- import com.atguigu.bean.WaterSensor;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class KeybyDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- DataStreamSource
sensorDS = env.fromElements( - new WaterSensor("s1", 1L, 1),
- new WaterSensor("s1", 11L, 11),
- new WaterSensor("s2", 2L, 2),
- new WaterSensor("s3", 3L, 3)
- );
- // 按照 id 分组
- /**
- * TODO keyby: 按照id分组
- * 要点:
- * 1、返回的是 一个 KeyedStream,键控流
- * 2、keyby不是 转换算子, 只是对数据进行重分区, 不能设置并行度
- * 3、分组 与 分区 的关系:
- * 1) keyby是对数据分组,保证 相同key的数据 在同一个分区(子任务)
- * 2) 分区: 一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
- */
- KeyedStream
sensorKS = sensorDS - .keyBy(new KeySelector
() { - @Override
- public String getKey(WaterSensor value) throws Exception {
- return value.getId();
- }
- });
- sensorKS.print();
- env.execute();
- }
-
-
- }
需要注意的是,keyBy得到的结果将不再是DataStream,而是会将DataStream转换为KeyedStream。KeyedStream可以认为是“分区流”或者“键控流”,它是对DataStream按照key的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定key的类型。
KeyedStream也继承自DataStream,所以基于它的操作也都归属于DataStream API。但它跟之前的转换操作得到的SingleOutputStreamOperator不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如sum,reduce)。
有了按键分区的数据流KeyedStream,我们就可以基于它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:
简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。
对于元组类型的数据,可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以f0、f1、f2、…来命名的。
如果数据流的类型是POJO类,那么就只能通过字段名称来指定,不能通过位置来指定了。
- package com.atguigu.aggreagte;
-
- import com.atguigu.bean.WaterSensor;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class SimpleAggregateDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- DataStreamSource
sensorDS = env.fromElements( - new WaterSensor("s1", 1L, 1),
- new WaterSensor("s1", 11L, 11),
- new WaterSensor("s2", 2L, 2),
- new WaterSensor("s3", 3L, 3)
- );
-
- KeyedStream
sensorKS = sensorDS - .keyBy(new KeySelector
() { - @Override
- public String getKey(WaterSensor value) throws Exception {
- return value.getId();
- }
- });
- /**
- * TODO 简单聚合算子
- * 1、 keyby之后才能调用
- * 2、 分组内的聚合:对同一个key的数据进行聚合
- */
- // 传位置索引的,适用于 Tuple类型,POJO不行
- // SingleOutputStreamOperator
result = sensorKS.sum(2); - // SingleOutputStreamOperator
result = sensorKS.sum("vc"); -
- /**
- * max\maxby的区别: 同min
- * max:只会取比较字段的最大值,非比较字段保留第一次的值
- * maxby:取比较字段的最大值,同时非比较字段 取 最大值这条数据的值
- */
- // SingleOutputStreamOperator
result = sensorKS.max("vc"); - // SingleOutputStreamOperator
result = sensorKS.min("vc"); - SingleOutputStreamOperator
result = sensorKS.maxBy("vc"); - // SingleOutputStreamOperator
result = sensorKS.minby("vc"); -
- result.print();
-
-
- env.execute();
- }
-
-
- }

简单聚合算子返回的,同样是一个SingleOutputStreamOperator,也就是从KeyedStream又转换成了常规的DataStream。所以可以这样理解:keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个DataStream。而且经过简单聚合之后的数据流,元素的数据类型保持不变。
一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,应该只用在含有有限个key的数据流上。
reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
- package com.atguigu.aggreagte;
-
- import com.atguigu.bean.WaterSensor;
- import org.apache.flink.api.common.functions.ReduceFunction;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class ReduceDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- DataStreamSource
sensorDS = env.fromElements( - new WaterSensor("s1", 1L, 1),
- new WaterSensor("s1", 11L, 11),
- new WaterSensor("s1", 21L, 21),
- new WaterSensor("s2", 2L, 2),
- new WaterSensor("s3", 3L, 3)
- );
- KeyedStream
sensorKS = sensorDS - .keyBy(new KeySelector
() { - @Override
- public String getKey(WaterSensor value) throws Exception {
- return value.getId();
- }
- });
-
- /**
- * TODO reduce:
- * 1、keyby之后调用
- * 2、输入类型 = 输出类型,类型不能变
- * 3、每个key的第一条数据来的时候,不会执行reduce方法,存起来,直接输出
- * 4、reduce方法中的两个参数
- * value1: 之前的计算结果,存状态
- * value2: 现在来的数据
- */
- SingleOutputStreamOperator
reduce = sensorKS.reduce(new ReduceFunction() { - @Override
- public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
- System.out.println("value1=" + value1);
- System.out.println("value2=" + value2);
- return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);
- }
- });
- reduce.print();
- env.execute();
- }
-
- }

调用KeyedStream的reduce方法时,需要传入一个参数,实现ReduceFunction接口。接口在源码中的定义如下:
- public interface ReduceFunction
extends Function, Serializable { - T reduce(T value1, T value2) throws Exception;
- }
ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
我们可以单独定义一个函数类实现ReduceFunction接口,也可以直接传入一个匿名类。当然,同样也可以通过传入Lambda表达式实现类似的功能。
为了方便后续使用,定义一个WaterSensorMapFunction:
- public class WaterSensorMapFunction implements MapFunction
{ - @Override
- public WaterSensor map(String value) throws Exception {
- String[] datas = value.split(",");
- return new WaterSensor(datas[0],Long.valueOf(datas[1]) ,Integer.valueOf(datas[2]) );
- }
- }
案例:使用reduce实现max和maxBy的功能。
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- env
- .socketTextStream("hadoop102", 7777)
- .map(new WaterSensorMapFunction())
- .keyBy(WaterSensor::getId)
- .reduce(new ReduceFunction
() - {
- @Override
- public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
- System.out.println("Demo7_Reduce.reduce");
-
- int maxVc = Math.max(value1.getVc(), value2.getVc());
- //实现max(vc)的效果 取最大值,其他字段以当前组的第一个为主
- //value1.setVc(maxVc);
- //实现maxBy(vc)的效果 取当前最大值的所有字段
- if (value1.getVc() > value2.getVc()){
- value1.setVc(maxVc);
- return value1;
- }else {
- value2.setVc(maxVc);
- return value2;
- }
- }
- })
- .print();
- env.execute();
reduce同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以我们需要将reduce算子作用在一个有限key的流上。
用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类。
Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:
方式一:实现FilterFunction接口
- public class TransFunctionUDF {
-
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource
stream = env.fromElements( -
- new WaterSensor("sensor_1", 1, 1),
- new WaterSensor("sensor_1", 2, 2),
- new WaterSensor("sensor_2", 2, 2),
- new WaterSensor("sensor_3", 3, 3)
- );
-
- DataStream
filter = stream.filter(new UserFilter()); -
- filter.print();
- env.execute();
- }
-
- public static class UserFilter implements FilterFunction
{ - @Override
- public boolean filter(WaterSensor e) throws Exception {
- return e.id.equals("sensor_1");
- }
- }
- }
方式二:通过匿名类来实现FilterFunction接口:
- DataStream
stream = stream.filter(new FilterFunction< WaterSensor>() { - @Override
- public boolean filter(WaterSensor e) throws Exception {
- return e.id.equals("sensor_1");
- }
- });
方式二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去。
- DataStreamSource
stream = env.fromElements( - new WaterSensor("sensor_1", 1, 1),
- new WaterSensor("sensor_1", 2, 2),
- new WaterSensor("sensor_2", 2, 2),
- new WaterSensor("sensor_3", 3, 3)
- );
-
- DataStream
stream = stream.filter(new FilterFunctionImpl("sensor_1")); -
- public static class FilterFunctionImpl implements FilterFunction
{ - private String id;
-
- FilterFunctionImpl(String id) { this.id=id; }
-
- @Override
- public boolean filter(WaterSensor value) throws Exception {
- return thid.id.equals(value.id);
- }
- }
方式三:采用匿名函数(Lambda)
- public class TransFunctionUDF {
-
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource
stream = env.fromElements( -
- new WaterSensor("sensor_1", 1, 1),
- new WaterSensor("sensor_1", 2, 2),
- new WaterSensor("sensor_2", 2, 2),
- new WaterSensor("sensor_3", 3, 3)
- );
-
- //map函数使用Lambda表达式,不需要进行类型声明
- SingleOutputStreamOperator
filter = stream.filter(sensor -> "sensor_1".equals(sensor.id)); -
- filter.print();
-
- env.execute();
- }
- }
“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。
来看一个例子说明:
- package com.atguigu.transfrom;
-
- import org.apache.flink.api.common.functions.RichMapFunction;
- import org.apache.flink.api.common.functions.RuntimeContext;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class RichFunctionDemo {
- public static void main(String[] args) throws Exception {
- // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
- env.setParallelism(2);
-
- DataStreamSource
source = env.socketTextStream("hadoop102", 7777); - SingleOutputStreamOperator
map = source.map(new RichMapFunction() { -
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- System.out.println(
- "子任务编号=" + getRuntimeContext().getIndexOfThisSubtask()
- + ",子任务名称=" + getRuntimeContext().getTaskNameWithSubtasks()
- + ",调用open()");
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- System.out.println(
- "子任务编号=" + getRuntimeContext().getIndexOfThisSubtask()
- + ",子任务名称=" + getRuntimeContext().getTaskNameWithSubtasks()
- + ",调用close()");
- }
-
- @Override
- public Integer map(String value) throws Exception {
- return Integer.parseInt(value) + 1;
- }
- });
-
-
- /**
- * TODO RichXXXFunction: 富函数
- * 1、多了生命周期管理方法:
- * open(): 每个子任务,在启动时,调用一次
- * close():每个子任务,在结束时,调用一次
- * => 如果是flink程序异常挂掉,不会调用close
- * => 如果是正常调用 cancel命令,可以close
- * 2、多了一个 运行时上下文
- * 可以获取一些运行时的环境信息,比如 子任务编号、名称、其他的.....
- */
- // DataStreamSource
source = env.fromElements(1, 2, 3, 4); - // SingleOutputStreamOperator
map = source.map(new RichMapFunction() { - //
- // @Override
- // public void open(Configuration parameters) throws Exception {
- // super.open(parameters);
- // System.out.println(
- // "子任务编号="+getRuntimeContext().getIndexOfThisSubtask()
- // +",子任务名称="+getRuntimeContext().getTaskNameWithSubtasks()
- // +",调用open()");
- // }
- //
- // @Override
- // public void close() throws Exception {
- // super.close();
- // System.out.println(
- // "子任务编号="+getRuntimeContext().getIndexOfThisSubtask()
- // +",子任务名称="+getRuntimeContext().getTaskNameWithSubtasks()
- // +",调用close()");
- // }
- //
- // @Override
- // public Integer map(Integer value) throws Exception {
- // return value + 1;
- // }
- // });
-
-
- map.print();
-
- env.execute();
- }
- }
常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)。
最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。
随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。

经过随机分区之后,得到的依然是一个DataStream。
我们可以做个简单测试:将数据读入之后直接打印到控制台,将输出的并行度设置为2,中间经历一次shuffle。执行多次,观察结果是否相同。
- public class ShuffleExample {
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- env.setParallelism(2);
-
- DataStreamSource
stream = env.socketTextStream("hadoop102", 7777);; -
- stream.shuffle().print()
-
- env.execute();
- }
- }
轮询,简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

stream.rebalance()
重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。

stream.rescale()
这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
stream.broadcast()
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。
stream.global()
当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。
1)自定义分区器
- public class MyPartitioner implements Partitioner
{ -
- @Override
- public int partition(String key, int numPartitions) {
- return Integer.parseInt(key) % numPartitions;
- }
- }
2)使用自定义分区
- public class PartitionCustomDemo {
- public static void main(String[] args) throws Exception {
- // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
-
- env.setParallelism(2);
-
- DataStreamSource
socketDS = env.socketTextStream("hadoop102", 7777); -
- DataStream
myDS = socketDS - .partitionCustom(
- new MyPartitioner(),
- value -> value);
-
-
- myDS.print();
-
- env.execute();
- }
- }
所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。
代码实现:
- public class SplitStreamByFilter {
-
- public static void main(String[] args) throws Exception {
-
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- SingleOutputStreamOperator
ds = env.socketTextStream("hadoop102", 7777) - .map(Integer::valueOf);
- //将ds 分为两个流 ,一个是奇数流,一个是偶数流
- //使用filter 过滤两次
- SingleOutputStreamOperator
ds1 = ds.filter(x -> x % 2 == 0); - SingleOutputStreamOperator
ds2 = ds.filter(x -> x % 2 == 1); -
- ds1.print("偶数");
- ds2.print("奇数");
-
- env.execute();
- }
- }
这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流stream复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?
关于处理函数中侧输出流的用法,我们已经在7.5节做了详细介绍。简单来说,只需要调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的id和类型。
代码实现:将WaterSensor按照Id类型进行分流。
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class WaterSensorMapFunction implements MapFunction
{ - @Override
- public WaterSensor map(String value) throws Exception {
- String[] datas = value.split(",");
- return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
- }
- }
- package com.atguigu.split;
-
- import com.atguigu.bean.WaterSensor;
- import com.atguigu.functions.WaterSensorMapFunction;
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.ProcessFunction;
- import org.apache.flink.util.Collector;
- import org.apache.flink.util.OutputTag;
-
- /**
- * TODO 分流: 奇数、偶数拆分成不同流
- *
- * @author cjp
- * @version 1.0
- */
- public class SideOutputDemo {
- public static void main(String[] args) throws Exception {
- // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
- env.setParallelism(1);
- SingleOutputStreamOperator
sensorDS = env - .socketTextStream("hadoop102", 7777)
- .map(new WaterSensorMapFunction());
-
- /**
- * TODO 使用侧输出流 实现分流
- * 需求: watersensor的数据,s1、s2的数据分别分开
- *
- * TODO 总结步骤:
- * 1、使用 process算子
- * 2、定义 OutputTag对象
- * 3、调用 ctx.output
- * 4、通过主流 获取 测流
- */
- /**
- * 创建OutputTag对象
- * 第一个参数: 标签名
- * 第二个参数: 放入侧输出流中的 数据的 类型,Typeinformation
- */
- OutputTag
s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class)); - OutputTag
s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class)); - SingleOutputStreamOperator
process = sensorDS - .process(
- new ProcessFunction
() { - @Override
- public void processElement(WaterSensor value, Context ctx, Collector
out) throws Exception { - String id = value.getId();
- if ("s1".equals(id)) {
- // 如果是 s1,放到侧输出流s1中
- /**
- * 上下文ctx 调用ouput,将数据放入侧输出流
- * 第一个参数: Tag对象
- * 第二个参数: 放入侧输出流中的 数据
- */
- ctx.output(s1Tag, value);
- } else if ("s2".equals(id)) {
- // 如果是 s2,放到侧输出流s2中
- ctx.output(s2Tag, value);
- } else {
- // 非s1、s2的数据,放到主流中
- out.collect(value);
- }
- }
- }
- );
- // 从主流中,根据标签 获取 侧输出流
- SideOutputDataStream
s1 = process.getSideOutput(s1Tag); - SideOutputDataStream
s2 = process.getSideOutput(s2Tag); - // 打印主流
- process.print("主流-非s1、s2");
- //打印 侧输出流
- s1.printToErr("s1");
- s2.printToErr("s2");
- env.execute();
- }
- }
-
- /**
- */
在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

在代码中,我们只要基于DataStream直接调用.union()方法,传入其他DataStream作为参数,就可以实现流的联合了;得到的依然是一个DataStream:
stream1.union(stream2, stream3, ...)
注意:union()的参数可以是多个DataStream,所以联合操作可以实现多条流的合并。
代码实现:我们可以用下面的代码做一个简单测试:
- package com.atguigu.combine;
-
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class UnionDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- DataStreamSource
source1 = env.fromElements(1, 2, 3); - DataStreamSource
source2 = env.fromElements(11, 22, 33); - DataStreamSource
source3 = env.fromElements("111", "222", "333"); - /**
- * TODO union:合并数据流
- * 1、 流的数据类型必须一致
- * 2、 一次可以合并多条流
- */
- // DataStream
union = source1.union(source2).union(source3.map(r -> Integer.valueOf(r))); - DataStream
union = source1.union(source2, source3.map(r -> Integer.valueOf(r))); - union.print();
- env.execute();
- }
- }
流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink还提供了另外一种方便的合流操作——连接(connect)。
1)连接流(ConnectedStreams)\

代码实现:需要分为两步:首先基于一条DataStream调用.connect()方法,传入另外一条DataStream作为参数,将两条流连接起来,得到一个ConnectedStreams;然后再调用同处理方法得到DataStream。这里可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法。
- public class ConnectDemo {
-
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- // DataStreamSource
source1 = env.fromElements(1, 2, 3); - // DataStreamSource
source2 = env.fromElements("a", "b", "c"); -
- SingleOutputStreamOperator
source1 = env - .socketTextStream("hadoop102", 7777)
- .map(i -> Integer.parseInt(i));
-
- DataStreamSource
source2 = env.socketTextStream("hadoop102", 8888); -
- /**
- * TODO 使用 connect 合流
- * 1、一次只能连接 2条流
- * 2、流的数据类型可以不一样
- * 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的
- */
- ConnectedStreams
connect = source1.connect(source2); -
- SingleOutputStreamOperator
result = connect.map(new CoMapFunction() { - @Override
- public String map1(Integer value) throws Exception {
- return "来源于数字流:" + value.toString();
- }
-
- @Override
- public String map2(String value) throws Exception {
- return "来源于字母流:" + value;
- }
- });
-
- result.print();
-
- env.execute(); }
- }
上面的代码中,ConnectedStreams有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用.map()方法时传入的不再是一个简单的MapFunction,而是一个CoMapFunction,表示分别对两条流中的数据执行map操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1()就是对第一条流中数据的map操作,.map2()则是针对第二条流。
2)CoProcessFunction
与CoMapFunction类似,如果是调用.map()就需要传入一个CoMapFunction,需要实现map1()、map2()两个方法;而调用.process()时,传入的则是一个CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。
值得一提的是,ConnectedStreams也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个ConnectedStreams:
connectedStreams.keyBy(keySelector1, keySelector2);
这里传入两个参数keySelector1和keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的keyBy用法完全一致。ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。
案例需求:连接两条流,输出能根据id匹配上的数据(类似inner join效果)
- package com.atguigu.combine;
-
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.api.java.tuple.Tuple3;
- import org.apache.flink.streaming.api.datastream.ConnectedStreams;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
- import org.apache.flink.util.Collector;
-
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-
- /**
- * TODO 连接两条流,输出能根据id匹配上的数据(类似inner join效果)
- *
- * @author cjp
- * @version 1.0
- */
- public class ConnectKeybyDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
-
- DataStreamSource
> source1 = env.fromElements( - Tuple2.of(1, "a1"),
- Tuple2.of(1, "a2"),
- Tuple2.of(2, "b"),
- Tuple2.of(3, "c")
- );
- DataStreamSource
> source2 = env.fromElements( - Tuple3.of(1, "aa1", 1),
- Tuple3.of(1, "aa2", 2),
- Tuple3.of(2, "bb", 1),
- Tuple3.of(3, "cc", 1)
- );
-
- ConnectedStreams
, Tuple3> connect = source1.connect(source2); -
- // 多并行度下,需要根据 关联条件进行 keyby,才能保证 key相同的数据到一起去,才能匹配上
- ConnectedStreams
, Tuple3> connectKeyby = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0); -
- /**
- * 实现互相匹配的效果: 两条流,,不一定谁的数据先来
- * 1、每条流,有数据来,存到一个变量中
- * hashmap
- * =》key=id,第一个字段值
- * =》value=List<数据>
- * 2、每条流有数据来的时候,除了存变量中, 不知道对方是否有匹配的数据,要去另一条流存的变量中 查找是否有匹配上的
- */
- SingleOutputStreamOperator
process = connectKeyby.process( - new CoProcessFunction
, Tuple3, String>() { - // 每条流定义一个hashmap,用来存数据
- Map
>> s1Cache = new HashMap<>(); - Map
>> s2Cache = new HashMap<>(); -
-
- /**
- * 第一条流的处理逻辑
- * @param value 第一条流的数据
- * @param ctx 上下文
- * @param out 采集器
- * @throws Exception
- */
- @Override
- public void processElement1(Tuple2
value, Context ctx, Collector out) throws Exception { - Integer id = value.f0;
- // TODO 1. s1的数据来了,就存到变量中
- if (!s1Cache.containsKey(id)) {
- // 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
- List
> s1Values = new ArrayList<>(); - s1Values.add(value);
- s1Cache.put(id, s1Values);
- } else {
- // 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
- s1Cache.get(id).add(value);
- }
-
- // TODO 2.去 s2Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
- if (s2Cache.containsKey(id)) {
- for (Tuple3
s2Element : s2Cache.get(id)) { - out.collect("s1:" + value + "<========>" + "s2:" + s2Element);
- }
- }
-
- }
-
- /**
- * 第二条流的处理逻辑
- * @param value 第二条流的数据
- * @param ctx 上下文
- * @param out 采集器
- * @throws Exception
- */
- @Override
- public void processElement2(Tuple3
value, Context ctx, Collector out) throws Exception { - Integer id = value.f0;
- // TODO 1. s2的数据来了,就存到变量中
- if (!s2Cache.containsKey(id)) {
- // 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
- List
> s2Values = new ArrayList<>(); - s2Values.add(value);
- s2Cache.put(id, s2Values);
- } else {
- // 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
- s2Cache.get(id).add(value);
- }
-
- // TODO 2.去 s1Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
- if (s1Cache.containsKey(id)) {
- for (Tuple2
s1Element : s1Cache.get(id)) { - out.collect("s1:" + s1Element + "<========>" + "s2:" + value);
- }
- }
- }
- }
- );
-
- process.print();
-
-
- env.execute();
- }
- }
Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,
stream.sinkTo(…)
当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:
示例:
- package com.atguigu.sink;
-
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.serialization.SimpleStringEncoder;
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
- import org.apache.flink.configuration.MemorySize;
- import org.apache.flink.connector.datagen.source.DataGeneratorSource;
- import org.apache.flink.connector.datagen.source.GeneratorFunction;
- import org.apache.flink.connector.file.sink.FileSink;
- import org.apache.flink.core.fs.Path;
- import org.apache.flink.streaming.api.CheckpointingMode;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
- import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
- import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
-
- import java.time.Duration;
- import java.time.ZoneId;
- import java.util.TimeZone;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class SinkFile {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // TODO 每个目录中,都有 并行度个数的 文件在写入
- env.setParallelism(2);
- // 必须开启checkpoint,否则一直都是 .inprogress
- env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
- DataGeneratorSource
dataGeneratorSource = new DataGeneratorSource<>( - new GeneratorFunction
() { - @Override
- public String map(Long value) throws Exception {
- return "Number:" + value;
- }
- },
- Long.MAX_VALUE,
- RateLimiterStrategy.perSecond(1000),
- Types.STRING
- );
- DataStreamSource
dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator"); -
- // TODO 输出到文件系统
- FileSink
fieSink = FileSink - // 输出行式存储的文件,指定路径、指定编码
- .
forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8")) - // 输出文件的一些配置: 文件名的前缀、后缀
- .withOutputFileConfig(
- OutputFileConfig.builder()
- .withPartPrefix("atguigu-")
- .withPartSuffix(".log")
- .build()
- )
- // 按照目录分桶:如下,就是每个小时一个目录
- .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
- // 文件滚动策略: 1分钟 或 1m
- .withRollingPolicy(
- DefaultRollingPolicy.builder()
- .withRolloverInterval(Duration.ofMinutes(1))
- .withMaxPartSize(new MemorySize(1024*1024))
- .build()
- )
- .build();
- dataGen.sinkTo(fieSink);
- env.execute();
- }
- }
(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码
输出无key的record:
- public class SinkKafka {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- // 如果是精准一次,必须开启checkpoint(后续章节介绍)
- env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
-
-
- SingleOutputStreamOperator
sensorDS = env - .socketTextStream("hadoop102", 7777);
-
- /**
- * Kafka Sink:
- * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
- * 1、开启checkpoint(后续介绍)
- * 2、设置事务前缀
- * 3、设置事务超时时间: checkpoint间隔 < 事务超时时间 < max的15分钟
- */
- KafkaSink
kafkaSink = KafkaSink.builder() - // 指定 kafka 的地址和端口
- .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
- // 指定序列化器:指定Topic名称、具体的序列化
- .setRecordSerializer(
- KafkaRecordSerializationSchema.
builder() - .setTopic("ws")
- .setValueSerializationSchema(new SimpleStringSchema())
- .build()
- )
- // 写到kafka的一致性级别: 精准一次、至少一次
- .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
- // 如果是精准一次,必须设置 事务的前缀
- .setTransactionalIdPrefix("atguigu-")
- // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
- .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
- .build();
-
-
- sensorDS.sinkTo(kafkaSink);
-
-
- env.execute();
- }
- }
自定义序列化器,实现带key的record:
- public class SinkKafkaWithKey {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
-
- SingleOutputStreamOperator
sensorDS = env - .socketTextStream("hadoop102", 7777);
-
-
- /**
- * 如果要指定写入kafka的key,可以自定义序列化器:
- * 1、实现 一个接口,重写 序列化 方法
- * 2、指定key,转成 字节数组
- * 3、指定value,转成 字节数组
- * 4、返回一个 ProducerRecord对象,把key、value放进去
- */
- KafkaSink
kafkaSink = KafkaSink.builder() - .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
- .setRecordSerializer(
- new KafkaRecordSerializationSchema
() { -
- @Nullable
- @Override
- public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
- String[] datas = element.split(",");
- byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
- byte[] value = element.getBytes(StandardCharsets.UTF_8);
- return new ProducerRecord<>("ws", key, value);
- }
- }
- )
- .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
- .setTransactionalIdPrefix("atguigu-")
- .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
- .build();
-
-
- sensorDS.sinkTo(kafkaSink);
-
-
- env.execute();
- }
- }
(4)运行代码,在Linux主机启动一个消费者,查看是否收到数据
[atguigu@hadoop102 ~]$
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws
写入数据的MySQL的测试步骤如下。
(1)添加依赖
添加MySQL驱动:
- <dependency>
- <groupId>mysqlgroupId>
- <artifactId>mysql-connector-javaartifactId>
- <version>8.0.27version>
- dependency>
官方 已经 提供flink-connector-jdbc的1.17.0的正式依赖
- <dependency>
- <groupId>org.apache.flinkgroupId>
- <artifactId>flink-connector-jdbcartifactId>
- <version>3.1.1-1.17version>
- dependency>
(2)启动MySQL,在test库下建表ws
mysql>
CREATE TABLE `ws` (
`id` varchar(100) NOT NULL,
`ts` bigint(20) DEFAULT NULL,
`vc` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
(3)编写输出到MySQL的示例代码
- public class SinkMySQL {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
-
- SingleOutputStreamOperator
sensorDS = env - .socketTextStream("hadoop102", 7777)
- .map(new WaterSensorMapFunction());
-
-
- /**
- * TODO 写入mysql
- * 1、只能用老的sink写法: addsink
- * 2、JDBCSink的4个参数:
- * 第一个参数: 执行的sql,一般就是 insert into
- * 第二个参数: 预编译sql, 对占位符填充值
- * 第三个参数: 执行选项 ---》 攒批、重试
- * 第四个参数: 连接选项 ---》 url、用户名、密码
- */
- SinkFunction
jdbcSink = JdbcSink.sink( - "insert into ws values(?,?,?)",
- new JdbcStatementBuilder
() { - @Override
- public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
- //每收到一条WaterSensor,如何去填充占位符
- preparedStatement.setString(1, waterSensor.getId());
- preparedStatement.setLong(2, waterSensor.getTs());
- preparedStatement.setInt(3, waterSensor.getVc());
- }
- },
- JdbcExecutionOptions.builder()
- .withMaxRetries(3) // 重试次数
- .withBatchSize(100) // 批次的大小:条数
- .withBatchIntervalMs(3000) // 批次的时间
- .build(),
- new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
- .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
- .withUsername("root")
- .withPassword("000000")
- .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
- .build()
- );
-
-
- sensorDS.addSink(jdbcSink);
-
-
- env.execute();
- }
- }
如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction
在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。
- package com.atguigu.sink;
-
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.CheckpointingMode;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-
- import java.sql.Connection;
-
- /**
- * TODO
- *
- * @author cjp
- * @version 1.0
- */
- public class SinkCustom {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- SingleOutputStreamOperator
sensorDS = env - .socketTextStream("hadoop102", 7777);
- sensorDS.addSink(new MySink());
- env.execute();
- }
-
- public static class MySink extends RichSinkFunction
{ - Connection conn = null;
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- // 在这里 创建连接
- // conn = new xxxx
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- // 做一些清理、销毁连接
- }
-
- /**
- * sink的核心逻辑,写出的逻辑就写在这个方法里
- * @param value
- * @param context
- * @throws Exception
- */
- @Override
- public void invoke(String value, Context context) throws Exception {
- // 写出逻辑
- // 这个方法是 来一条数据,调用一次,所以不要在这里创建 连接对象
-
- }
- }
- }