• 【入门Flink】- 07Flink DataStream API【万字篇】


    DataStream API 是 Flink 的核心层 API。一个 Flink 程序,其实就是对DataStream的各种转换。

    代码基本上都由以下几部分构成:

    image-20231104230137211

    执行环境(Execution Environment)

    1)创建执行环境StreamExecutionEnvironment

    StreamExecutionEnvironment 类的对象,这是所有Flink程序的基础。调用这个类的静态方法,具体有以下三种

    (1)getExecutionEnvironment √

    最简单的方式,就是直接调用 getExecutionEnvironment 方法。这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
    
    • 1

    (2)createLocalEnvironment

    返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数。

    StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
    
    • 1

    (3)createRemoteEnvironment

    返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包。

    StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
                                "host", // JobManager 主机名
                                1234, // JobManager 进程端口号
                                "path/to/jarFile.jar" // 提交给 JobManager 的JAR包
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在获取到程序执行环境后,还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

    2)执行模式(Execution Mode)

    从 Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理。不建议使用 DataSet API。

    // 流处理环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    • 1
    • 2

    DataStream API 执行模式包括:流执行模式、批执行模式和自动模式

    • 流执行模式(Streaming)

      DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 Streaming 执行模式

    • 批执行模式(Batch)

      专门用于批处理的执行模式。

    • 自动模式(AutoMatic)

      在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

    (1)通过命令行配置

    bin/flink run -Dexecution.runtime-mode=BATCH ...
    
    • 1

    在提交作业时,增加 execution.runtime-mode 参数,指定值为BATCH。

    (2)通过代码配置

    image-20231104231309108

    实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。

    3)触发程序执行

    Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”“懒执行”

    同步执行 or 异步执行(了解)

    execute() or executeAsync()

    env.execute(); // 同步
    env.executeAsync(); // 异步
    
    • 1
    • 2

    exexute总结

    1. 默认 env.execute() 触发一个Flink job
      • 一个main方法可以调用多个execute,但是没有意义,指定一个就会阻塞住(同步)
    2. env.executeAsync(),异步,不阻塞
      • 一个main方法里 executeAsync()个数 = 生成Flink job数

    源算子(Source)

    Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source 是整个处理程序的输入端。

    在 Flink1.12 以前,旧的添加 source 的方式,是调用执行环境的addSource()方法:

    DataStream<String> stream = env.addSource(...);
    
    • 1

    方法传入的参数是一个“源函数”(source function),需要实现SourceFunction 接口。

    从 Flink1.12 开始,主要使用流批统一的新 Source 架构fromSource()方法):

    DataStreamSource<String> stream = env.fromSource();
    
    • 1

    Flink 直接提供了很多预实现的接口,此外还有很多外部连接工具也实现了对应的Source,通常情况下足以应对实际需求。

    1)从集合中读取数据

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env
                    .fromElements(1, 2, 3, 4, 5)  // 直接填写元素
    //                .fromCollection(Arrays.asList(1, 2, 3, 4, 5))  // 从集合读取元素
                    .print();
    
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2)从文件读取数据

    通常情况,会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

    读取文件,需要添加文件连接器依赖:

            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-connector-filesartifactId>
                <version>${flink.version}version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            FileSource<String> fileSource = FileSource.forRecordStreamFormat(
                    new TextLineInputFormat(),
                    // 文件路径,相对路径不行就用绝对路径
                    Path.fromLocalFile(new File("D:\\workspace\\IdeaProjects\\second-java\\day5-flink\\src\\main\\resources\\input\\words.txt"))
    
            ).build();
            env.
                    fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource")
                    .print();
    
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    说明

    • 参数可以是目录,也可以是文件;还可以从 HDFS 目录下读取,使用路径hdfs://…;
    • 路径可以是相对路径,也可以是绝对路径;
    • 相对路径是从系统属性 user.dir 获取路径:idea 下是project 的根目录,standalone模式下是集群节点根目录。

    3)从 Socket 读取数据

    读取 socket 文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

    DataStreamSource<String> lineStream = env.socketTextStream("localhost", 7777);
    
    • 1

    4)从 Kafka 读取数据

    Flink 官方提供了连接工具 flink-connector-kafka , 直接实现了一个消费者FlinkKafkaConsumer,它就是用来读取 Kafka 数据的 SourceFunction。

    引入Kafka 连接器的依赖:

            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-connector-kafkaartifactId>
                <version>${flink.version}version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    代码如下:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                    .setBootstrapServers("124.222.253.33:9092") // ip:port
                    .setTopics("topic_1") // topic
                    .setGroupId("group1") // 消费者组
                    // latest 将偏移初始化为最新偏移的OffsetInitializer
                    // earliest 偏移初始化为最早可用偏移的OffsetInitializer
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new SimpleStringSchema()).build(); // 仅Value反序列化
    
            env
                    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source")
                    .print();
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    5)从数据生成器读取数据

    Flink 从 1.11 开始提供了一个内置的 DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。

    需要导入依赖:

      		<dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-connector-datagenartifactId>
                <version>${flink.version}version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    代码如下:

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
            DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                    // 数据转换生成函数
                    new GeneratorFunction<Long, String>() {
                        @Override
                        public String map(Long value) throws Exception {
                            return "Number:" + value;
                        }
                    },
                    // 从0开始递增至这个数
                    Long.MAX_VALUE,
                    // 数据生产效率,每秒10个
                    RateLimiterStrategy.perSecond(10), Types.STRING
    
            );
            env
                    .fromSource(dataGeneratorSource,
                            WatermarkStrategy.noWatermarks(), "dataGenerator")
                    .print();
    
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    Flink 支持的数据类型

    1)Flink 的类型系统

    Flink 使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink 中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

    2)Flink 支持的数据类型

    对于常见的 Java 和 Scala 数据类型,Flink 都是支持的。Flink 在内部,Flink 对支持不同的类型进行了划分,这些类型可以在 Types 工具类中找到:

    (1)基本类型

    所有 Java 基本类型及其包装类,再加上 Void、String、Date、BigDecimal 和BigInteger。

    (2)数组类型

    包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。

    (3)复合数据类型(Tuple、ROW、POJO)

    • Java 元组类型(TUPLE):这是 Flink 内置的元组类型,是Java API 的一部分。最多25 个字段,也就是从 Tuple0~Tuple25,不支持空字段。
    • 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
    • POJO:Flink 自定义的类似于 Java bean 模式的类。

    Flink 对 POJO 类型的要求如下:

    • 类是公有(public)的
    • 有一个无参的构造方法
    • 所有属性都是公有(public)的
    • 所有属性的类型都是可以序列化的

    对象被Flink视为POJO对象,序列化使用的是 PojoSerializer【效率比较高】
    序列化的时候, 就只会序列化字段,方法不会被序列化
    反序列化的时候,会根据公有的无参构造方法, 去构造对象

    可以使用 TypeExtractor.createTypeInfo(xxx.class) 判断对象是不是Flink世界中的POJO对象:

    • 返回的是PojoType,对象会被Flink视为POJO对象
    • 返回的是GenericType,对象不会被Flink视为POJO对象

    (4)辅助类型

    Option、Either、List、Map 等。

    (5)泛型类型(GENERIC)

    Flink 会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由 Flink 本身序列化的,而是由 Kryo 序列化的

    3)类型提示(Type Hints)

    有时,需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

    .map(word -> Tuple2.of(word, 1L))
    .returns(Types.TUPLE(Types.STRING, Types.LONG));
    
    • 1
    • 2

    或者,使用TypeHint 类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。仍通过.returns()方法,明确地指定转换之后的DataStream 里元素的类型。

    returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
    
    • 1

    转换算子(Transformation)

    开始之前,准备 WaterSensor 作为数据模型。

    字段分别代表如下含义:

    字段名数据类型说明
    idString水位传感器类型
    tsLong传感器记录时间戳
    vcInteger水位记录
    public class WaterSensor {
    
        public String id;
        public Long ts;
        public Integer vc;
    
        public WaterSensor() {
        }
    
        public WaterSensor(String id, Long ts, Integer vc) {
            this.id = id;
            this.ts = ts;
            this.vc = vc;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public Long getTs() {
            return ts;
        }
    
        public void setTs(Long ts) {
            this.ts = ts;
        }
    
        public Integer getVc() {
            return vc;
        }
    
        public void setVc(Integer vc) {
            this.vc = vc;
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            WaterSensor that = (WaterSensor) o;
            return Objects.equals(id, that.id) && Objects.equals(ts, that.ts) && Objects.equals(vc, that.vc);
        }
    
        @Override
        public int hashCode() {
            return Objects.hash(id, ts, vc);
        }
    
        @Override
        public String toString() {
            return "WaterSensor{" +
                    "id='" + id + '\'' +
                    ", ts=" + ts +
                    ", vc=" + vc +
                    '}';
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    数据源读入数据之后,就可以使用各种转换算子,将一个或多个DataStream转换为新的 DataStream。

    转换算子一般有三种写法:

    1. 匿名类
    2. lambda表达式
    3. 实现函数接口

    1)基本转换算子(map/ filter/ flatMap)

    (1)映射(map)

    “一一映射”,消费一个元素就产出一个元素。

    需求:提取 WaterSensor 中的id 字段

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<WaterSensor> stream = env.fromElements(
                    new WaterSensor("sensor_1", 1L, 1),
                    new WaterSensor("sensor_2", 2L, 2)
            );
    
            stream.map(WaterSensor::getId)
                    .print();
    
            env.execute();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    (2)过滤(filter)

    需求:将数据流中传感器 id 为 sensor_1 的数据过滤出来

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<WaterSensor> stream = env.fromElements(
                    new WaterSensor("sensor_1", 1L, 1),
                    new WaterSensor("sensor_2", 2L, 2)
            );
    
            stream.filter(el -> "sensor_1".equals(el.getId()))
                    .print();
    
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    (3)扁平映射(flatMap)

    flatMap 一进多出

    需求:如果输入的数据是 sensor_1,只打印 vc;如果输入的数据是sensor_2,既打印 ts 又打印 vc。

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<WaterSensor> stream = env.fromElements(
                    new WaterSensor("sensor_1", 1L, 1),
                    new WaterSensor("sensor_2", 2L, 2)
            );
    
            stream.flatMap(
                    new FlatMapFunction<WaterSensor, String>() {
                        @Override
                        public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
                            if ("sensor_1".equals(value.getId())) {
                                out.collect(value.getVc().toString());
                            } else if ("sensor_2".equals(value.getId())) {
                                out.collect(value.getTs().toString());
                                out.collect(value.getVc().toString());
                            }
                        }
                    }
            ).print();
    
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    输出结果:

    image-20231105170456761

    map如何控制一进一出:

    ​ 使用return

    flatmap怎么控制一进多出

    ​ 通过Collector输出,调用几次就输出几条(向下游输送数据)

    2) 聚合算子(Aggregation)

    计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce 中的reduce操作。

    (1 )按键分区(keyBy)

    在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的。

    keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。

    通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 hashCode()方法。

    id 作为 key 做一个分区操作,代码实现如下:

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<WaterSensor> stream = env.fromElements(
                    new WaterSensor("sensor_1", 1L, 1),
                    new WaterSensor("sensor_1", 2L, 2),
                    new WaterSensor("sensor_2", 2L, 2),
                    new WaterSensor("sensor_3", 3L, 3)
            );
    
            KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(WaterSensor::getId);
            keyedStream.print();
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    运行结果如下:

    image-20231105174334517

    keyBy:

    1. 返回的是 KeyedStream ,键控流
    2. 不是转换算子,只是对数据进行重分区,不能设置并行度

    keyBy分组 和 分区 的关系:

    1. keyBy对数据分组(相同key的数据被分为一组),同时保证 相同的key的数据 在同一个分区
    2. 分区):一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)

    KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如 sum,reduce)。

    (2)简单聚合(sum/min/max/minBy/maxBy)

    有了按键分区的数据流 KeyedStream,就可以基于它进行聚合操作了。

    Flink内置实现了一些最基本、最简单的聚合 API,主要有以下几种:

    • sum():在输入流上,对指定的字段做叠加求和的操作。

    • min():在输入流上,对指定的字段求最小值。

    • max():在输入流上,对指定的字段求最大值。

    • minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。

    • maxBy():同上

    聚合方法调用时,也需要传入参数,聚合指定的字段。指定字段的方式有两种:指定位置,和指定名称。【指定位置索引,适用于 Tuple类型,POJP不行】

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setParallelism(1);
    
          DataStreamSource<WaterSensor> stream = env.fromElements(
                  new WaterSensor("sensor_1", 1L, 1),
                  new WaterSensor("sensor_1", 2L, 2),
                  new WaterSensor("sensor_2", 2L, 2),
                  new WaterSensor("sensor_3", 3L, 3)
          );
    
          KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(WaterSensor::getId);
          keyedStream
                  .maxBy("vc") // 指定字段名称
                  .print();
          env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个 DataStream。

    一个聚合算子,会为每一个 key 保存一个聚合的值,在Flink 中把它叫作“状态”(state)

    每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的。

    使用聚合算子,应该只用在含有有限个 key 的数据流上。

    (3)归约聚合(reduce)

    案例:使用 reduce 实现 max 和 maxBy 的功能。

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
    
            DataStreamSource<WaterSensor> stream = env.fromElements(
                    new WaterSensor("sensor_1", 1L, 1),
                    new WaterSensor("sensor_1", 2L, 2),
                    new WaterSensor("sensor_2", 2L, 2),
                    new WaterSensor("sensor_3", 3L, 3)
            );
    
            KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(WaterSensor::getId);
            keyedStream
                    .reduce(new ReduceFunction<WaterSensor>() {
                        // 同组元素规约处理
                        @Override
                        public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
    
                            System.out.println("value1: " + value1);
                            System.out.println("value2: " + value2);
                            int maxVc = Math.max(value1.getVc(), value2.getVc());
                            if (value1.getVc() > value2.getVc()) {
                                value1.setVc(maxVc);
                                return value1;
                            } else {
                                value2.setVc(maxVc);
                                return value2;
                            }
                        }
                    })
                    .print();
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    reduce 输入类型 = 输出类型,类型不变

    每个key的第一条数据来的时候,不会执行reduce方法,存起来,直接输出

    reduce方法中的两个参数

    ​ value1:之前计算结果,有状态

    value2:现在来的数据

    reduce 算子也应该作用在一个有限 key 的流上。

    用户自定义函数(UDF)

    用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。

    用户自定义函数分为:函数类、匿名函数、富函数类。

    1)函数类

    函数类可以传参数更加灵活

    public class FilterIdFunction implements FilterFunction<WaterSensor> {
        
        private final String id;
    
        public FilterIdFunction(String id) {
            this.id = id;
        }
    
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return id.equals(value.getId());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2)富函数类(Rich Function Classes)

    “富函数类”也是 DataStream API 提供的一个函数类的接口,所有的Flink 函数类都有其 Rich版本

    富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等。

    富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

    • open()方法,是 Rich Function 的初始化方法,每个子任务启动时,调用一次。
    • close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
      • 如果是Flink程序异常挂掉,不会调用close
      • 正常调用cancel命令,可以close

    代码实现:

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            env
                    .fromElements(1, 2, 3, 4)
                    .map(new RichMapFunction<Integer, Integer>() {
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
                            System.out.println(" 索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");
                        }
    
                        @Override
                        public Integer map(Integer integer)throws Exception {
                            return integer + 1;
                        }
    
                        @Override
                        public void close() throws Exception {
                            super.close();
                            System.out.println(" 索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");
                        }
                    })
                    .print();
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    物理分区算子(Physical Partitioning)

    常见的物理分区策略有:随机分配(Random)轮询分配(Round-Robin)重缩放(Rescale)广播(Broadcast)

    1)随机分区(shuffle)

    将数据随机地分配到下游算子的并行任务中去。可以打乱数据。

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
    
            DataStreamSource<String> stream = env.socketTextStream("124.222.253.33", 7777);
            stream.shuffle().print();
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2)轮询分区(Round-Robin)

    雨露均沾,下游算子一个一个来(所有算子)

    stream.rebalance()
    
    • 1

    3)重缩放分区(rescale)

    重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用Round-Robin 算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。(部分算子)

    stream.rescale()
    
    • 1

    4)广播(broadcast)

    将输入数据复制并发送到下游算子的所有并行任务中去。

    stream.broadcast()
    
    • 1

    5)全局分区(global)

    一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1。

    stream.global()
    
    • 1

    6)自定义分区(Custom)

    Flink 内置所有分区策略都不能满足用户的需求时,可以通过使用partitionCustom()方法来自定义分区策略。

    (1)自定义分区器

    public class MyPartitioner implements Partitioner<String> {
    
        // numPartitions: 子任务数量(分区数量)
        @Override
        public int partition(String key, int numPartitions) {
            return Integer.parseInt(key) % numPartitions;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    (2)使用自定义分区

    public class PartitionCustomDemo {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            DataStreamSource<String> socketDS = env.socketTextStream("124.222.253.33", 7777);
            DataStream<String> myDS = socketDS
                    .partitionCustom(
                            new MyPartitioner(),
                            value -> value);
            myDS.print();
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    八种分区器

    Flink提供了8种分区器,7种内置,1种自定义

    image-20231106001644153

    分流

    将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

    1)简单实现

    使用filter筛选多次,将原始数据流stream复制多份,然后对每一份分别做筛选;不够高效。

    2)侧输出流(process)

    process算子

    调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个**“输出标签”(OutputTag)**,指定了侧输出流的 id 和类型。

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("124.222.253.33", 7777)
                    .map(new WaterSensorMapFunction());
    		// 定义输出标签
            OutputTag<WaterSensor> s1 = new OutputTag<WaterSensor>("s1", Types.POJO(WaterSensor.class)) {
            };
            OutputTag<WaterSensor> s2 = new OutputTag<WaterSensor>("s2", Types.POJO(WaterSensor.class)) {
            };
    
            // 返回的都是主流
            SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>() {
                @Override
                public void processElement(WaterSensor value, ProcessFunction<WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception {
                    if ("s1".equals(value.getId())) {
                        ctx.output(s1, value);
                    } else if ("s2".equals(value.getId())) {
                        ctx.output(s2, value);
                    } else {
                        // 主流
                        out.collect(value);
                    }
                }
            });
    
            // 打印主流
            ds1.print("主流数据");
            // 通过主流获取侧边流
            SideOutputDataStream<WaterSensor> sideOutput1 = ds1.getSideOutput(s1);
            SideOutputDataStream<WaterSensor> sideOutput2 = ds1.getSideOutput(s2);
            sideOutput1.printToErr("测流s1");
            sideOutput2.printToErr("测流s2");
    
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    合流

    1)联合(Union)

    联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。(可以将多条流联合在一起)

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3);
            DataStreamSource<Integer> ds2 = env.fromElements(2, 2, 3);
            DataStreamSource<String> ds3 = env.fromElements("2", "2", "3");
            // ds3 类型不一致,不能联合
            ds1.union(ds2).print();
    
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    image-20231105224515470

    2)连接(Connect)

    (1)简单使用

    连接一次只能连接2条流,流的数据类型可以不一样

    代码实现:

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
    
            DataStreamSource<String> source2 = env.fromElements("a", "b", "c");
            ConnectedStreams<Integer, String> connect = source1.connect(source2);
            /*
              connect:
                  1、一次只能连接 2 条流
                  2、流的数据类型可以不一样
                  3、连接后可以调用 map、flatmap、process 来处理,但是各处理各的
             */
            connect.map(new CoMapFunction<Integer, String, String>() {
                        @Override
                        public String map1(Integer value) throws Exception {
                            return "来源于数字流:" + value.toString();
                        }
    
                        @Override
                        public String map2(String value) throws Exception {
                            return "来源于字母流:" + value;
                        }
                    })
                    .print();
    
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    (2)CoProcessFunction

    需求:连接两条流,输出能根据 id 匹配上的数据(类似inner join 效果)

    注意:connectedStreams.keyBy(keySelector1, keySelector2);

         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //        env.setParallelism(2);
            DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                    Tuple2.of(1, "a1"),
                    Tuple2.of(1, "a2"),
                    Tuple2.of(2, "b"),
                    Tuple2.of(3, "c")
            );
            DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
                    Tuple3.of(1, "aa1", 1),
                    Tuple3.of(1, "aa2", 2),
                    Tuple3.of(2, "bb", 1),
                    Tuple3.of(3, "cc", 1)
            );
    
            // 定义HashMap , 缓存来过的数据,key=id,value=list<数据>
            Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
            Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();
            ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);
            // 将合流元素,按key分到同一分区才能得到如下结果 ***
            ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKey = connect.keyBy(v -> v.f0, v1 -> v1.f0);
            connectKey.process(
                            new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
                                @Override
                                public void processElement1(Tuple2<Integer, String> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                                    Integer id = value.f0;
    
                                    if (!s1Cache.containsKey(id)) {
                                        List<Tuple2<Integer, String>> s1Values = new ArrayList<>();
                                        s1Values.add(value);
                                        s1Cache.put(id, s1Values);
                                    } else {
                                        s1Cache.get(id).add(value);
                                    }
    
                                    if (s2Cache.containsKey(id)) {
                                        for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
                                            out.collect("s1:" + value + "<--------->s2:" + s2Element);
                                        }
                                    }
    
                                }
    
                                @Override
                                public void processElement2
                                        (Tuple3<Integer, String, Integer> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.
                                                Context ctx, Collector<String> out) throws Exception {
                                    Integer id = value.f0;
                                    if (!s2Cache.containsKey(id)) {
                                        List<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();
                                        s2Values.add(value);
                                        s2Cache.put(id, s2Values);
                                    } else {
                                        s2Cache.get(id).add(value);
                                    }
    
                                    if (s1Cache.containsKey(id)) {
                                        for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
                                            out.collect("s1:" + s1Element + "<--------->s2:" + value);
                                        }
                                    }
    
                                }
                            }
                    ).
    
                    print();
    
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    输出算子(Sink)

    1)连接到外部系统

    Flink1.12 以前,Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

    Flink1.12 开始,同样重构了 Sink 架构,使用 .sinkTo()方法实现。

    2)输出到文件

    FileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用 FileSink 的静态方法:

    • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
    • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

    代码实现:

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 每个目录中,都有并行度个数的文件在写入
            env.setParallelism(2);
    
            // 【必须开启】 checkpoint,否则一直都是 .inprogress
            env.enableCheckpointing(2000,
                    CheckpointingMode.EXACTLY_ONCE);
    
            // 数据生成器
            DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                    new GeneratorFunction<Long, String>() {
                        @Override
                        public String map(Long value) throws Exception {
                            return "Number:" + value;
                        }
                    },
                    Long.MAX_VALUE,
                    RateLimiterStrategy.perSecond(1000),
                    Types.STRING
            );
    
            DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource,
                    WatermarkStrategy.noWatermarks(), "data-generator");
    
            // 输出到文件系统
            FileSink<String> fieSink = FileSink
                    // 输出行式存储的文件,指定路径、指定编码
                    .<String>forRowFormat(new Path("d:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                    // 输出文件的一些配置: 文件名的前缀、后缀
                    .withOutputFileConfig(
                            OutputFileConfig.builder()
                                    .withPartPrefix("li-").withPartSuffix(".log")
                                    .build()
                    )
                    // 按照目录分桶:如下,就是每个小时一个目录
                    .withBucketAssigner(new
                            DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                    // 文件滚动策略: 1 分钟 或 1m
                    .withRollingPolicy(
                            DefaultRollingPolicy.builder()
                                    .withRolloverInterval(Duration.ofMinutes(1))
                                    .withMaxPartSize(new
                                            MemorySize(1024 * 1024))
                                    .build()
                    )
                    .build();
    
            dataGen.sinkTo(fieSink);
    
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    3)输出到 Kafka

    添加 Kafka 连接器依赖。

    代码实现:

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 如果是【精准一次,必须开启】 checkpoint
            env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    
            SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("124.222.253.33", 7777);
    
            /*
              Kafka Sink:
               注意:如果要使用 【精准一次】 写入 Kafka,需要满足以下条件,缺一不可
              1、开启 checkpoint
              2、设置事务前缀
              3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的 15分钟
             */
            KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口
                    .setBootstrapServers("124.222.253.33:9092")
                    // 指定序列化器:指定 Topic 名称、具体的序列化
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.<String>builder()
                                    .setTopic("ws")
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build()
                    )
                    // 写到 kafka 的一致性级别: 精准一次、至少一次
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    // 如果是精准一次,必须设置 事务的前缀
                    .setTransactionalIdPrefix("li-")
                    // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15 分钟
                    .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                    .build();
            sensorDS.sinkTo(kafkaSink);
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    自定义序列化器,实现带 key 的 record:

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
            env.setRestartStrategy(RestartStrategies.noRestart());
            SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("124.222.253.33", 7777);
            
            /*
             如果要指定写入 kafka 的 key,可以自定义序列化器:
              1、实现 一个接口,重写 序列化 方法
              2、指定 key,转成 字节数组
              3、指定 value,转成 字节数组
              4、返回一个 ProducerRecord 对象,把 key、value 放进去
             */
            KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("124.222.253.33:9092")
                    .setRecordSerializer(
                            new KafkaRecordSerializationSchema<String>() {
                                @Override
                                public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                    String[] datas = element.split(",");
                                    byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                    byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                    return new ProducerRecord<>("ws", key, value);
                                }
                            }
                    )
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .setTransactionalIdPrefix("li-")
                    .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                    .build();
            sensorDS.sinkTo(kafkaSink);
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    4)输出到 MySQL(JDBC)

    (1)添加 MySQL 驱动

            <dependency>
                <groupId>mysqlgroupId>
                <artifactId>mysql-connector-javaartifactId>
                <version>8.0.27version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    (2)jdbc连接器

            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-connector-jdbcartifactId>
                <version>3.1.0-1.17version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    官网案例:

    public class JdbcSinkExample {
    
        static class Book {
            public Book(Long id, String title, String authors, Integer year) {
                this.id = id;
                this.title = title;
                this.authors = authors;
                this.year = year;
            }
    
            final Long id;
            final String title;
            final String authors;
            final Integer year;
        }
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            /*
                写入 mysql
                1、只能用老的 sink 写法: addSink
                2、JDBCSink 的 4 个参数:
                    第一个参数: 执行的 sql,一般就是 insert into
                    第二个参数: 预编译 sql, 对占位符填充值
                    第三个参数: 执行选项 ---》 攒批、重试
                    第四个参数: 连接选项 ---》 url、用户名、密码
             */
            env.fromElements(
                    new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
                    new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
                    new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
                    new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
            ).addSink(
                    JdbcSink.sink(
                            "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
                            (statement, book) -> {
                                statement.setLong(1, book.id);
                                statement.setString(2, book.title);
                                statement.setString(3, book.authors);
                                statement.setInt(4, book.year);
                            },
                            JdbcExecutionOptions.builder()
                                    .withBatchSize(1000) // 批次的大小:条数
                                    .withBatchIntervalMs(200) // 批次的时间
                                    .withMaxRetries(5) // 重试次数
                                    .build(),
                            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                    .withUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF8")
                                    .withUsername("root")
                                    .withPassword("root")
                                    .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                                    .build()
                    ));
    
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    5)自定义Sink输出

    实现RichSinkDunction 抽象类,自定义逻辑比较麻烦,不建议。

  • 相关阅读:
    【Unity细节】Json序列化时出现:An item with the same key has already been added. Key:
    du命令:显示文件或者目录的大小
    【Ruoyi管理后台】用户登录强制修改密码
    全球最受欢迎低代码平台排行榜出炉
    asp.net高校留学生信息管理系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio
    【CSP-J/S初赛知识点整理】
    flink-sql所有语法详解-1.15
    docker和Helm Chart的基本命令和操作
    C 语言宏 + 内联汇编实现 MIPS 系统调用
    单元测试的重要性
  • 原文地址:https://blog.csdn.net/qq_43417581/article/details/134257712