• Flink从入门到放弃—Stream API—常用算子(map和flatMap)


    导航

    观看本文章之前,请先看之前文章
    (十)Flink Datastream API 编程指南 算子-1 (转换算子、物理分区、任务链、资源组 、算子和作业)等基本介绍

    常用算子基本使用

    效果

    本次主要是阅读几个常用算子的源码。

    阅读算子列表

    • map
    • flatmap
    • keyby
    • filter
    • reduce
    • window
    • windowAll
    • 其它

    Map()

    首先说一下这个算子是one to one的,通俗的讲就是进一条数据 经过逻辑处理后 必出一条数据。

    用户代码

    package com.stream.samples;
    
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * @author DeveoperZJQ
     * @since 2022/11/12
     */
    public class MapOperator {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 消费socket数据源
            DataStreamSource<String> dataStream = env.socketTextStream("192.168.112.147", 7777);
    
    // map => String::length 是传入的虚函数map的逻辑
            SingleOutputStreamOperator<Integer> map = dataStream.map(String::length);
    
    // print输出
            map.print();
    
            env.execute(MapOperator.class.getSimpleName());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    上面的逻辑非常简单,目的只是为了追踪map的入口,你可以使用debug模式,并且在map上打上断点,可以一层一层的往下看。

    两种变形

    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    // 根据传入函数获取返回值类型
           TypeInformation<R> outType = TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);
           // 调用下面的map方法
           return this.map(mapper, outType);
       }
    
    // 带返回的输出类型参数,可以看到传入的operator默认名字就叫Map
       public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
           return this.transform("Map", outputType, (OneInputStreamOperator)(new StreamMap((MapFunction)this.clean(mapper))));
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述
    上面两个重载方法刚好对应上面截图中的两个方法。

     @PublicEvolving
        public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
        // 抽象工厂
            return this.doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    上面代码有一个抽象工厂的简单 算子工厂类,传入的是OneInputStreamOperator类型的算子,然后把用户代码的operator转换成了StreamOperatorFactory operatorFactory, 工厂类是真的多啊,如果设计模式关于工厂方法和抽象工厂没学牢固的同学,可以来这里细品。

    // protected 只能包能调用
    protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) {
    // 我理解这里调用这个就是为了抛出异常的返回类型,如果可用,typeUsed则标识为true
            this.transformation.getOutputType();
            // 构建OneInputTransformation
            OneInputTransformation<T, R> resultTransform = new OneInputTransformation(this.transformation, operatorName, operatorFactory, outTypeInfo, this.environment.getParallelism());
            SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(this.environment, resultTransform);
            // 这里讲用户逻辑封装到resultTransform,全部装填到List> 为构建流图StreamGraph做准备
            this.getExecutionEnvironment().addOperator(resultTransform);
            return returnStream;
        }
    
    // 添加算子到Transformations<>集合中
    @Internal
        public void addOperator(Transformation<?> transformation) {
            Preconditions.checkNotNull(transformation, "transformation must not be null.");
            this.transformations.add(transformation);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    env.execute(MapOperator.class.getSimpleName());这里开始执行,物理执行图部署成功,这个就开始了消费数据。

    当然通过上面说的,你可以尝试着自定义map算子或者其它算子,只要使用transform(String operatorName, TypeInformation outTypeInfo, OneInputStreamOperator operator)得当,就会实现你的功能,但是一般不建议入门选手这样做。

    这里面,咱们还看到很多使用clean() 方法的地方,后面章节会拿出一章说明这个。

    flatMap()

    可以看出,flatMap的源码和map非常类似。有一个地方值得咱们看一下,就是传入的函数和map是不同 ,flatmap可以一进多出,一进不出,但是map做不到,单纯从算子性能上说flatMap的开销是要比map大的。

    来看一下下面两者的区别:new StreamMap((MapFunction) vs new StreamFlatMap((FlatMapFunction)

    StreamMap

    public StreamMap(MapFunction<IN, OUT> mapper) {
            super(mapper);
            this.chainingStrategy = ChainingStrategy.ALWAYS;
        }
    
    • 1
    • 2
    • 3
    • 4

    单从这个各自的构造器上看不出什么,传入的函数不一样,可以点击进去

    @Public
    @FunctionalInterface
    public interface MapFunction<T, O> extends Function, Serializable {
        O map(T value) throws Exception;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    map方法是有返回值的。

    再来看下具体处理方法:

    public void processElement(StreamRecord<IN> element) throws Exception {
            this.output.collect(element.replace(((MapFunction)this.userFunction).map(element.getValue())));
        }
    
    • 1
    • 2
    • 3

    从上面可以看出map直接把数据通过collect回收了,等于没有对用户暴露,那么flatMatp呢?看下面的源码。

    StreamFlatMap

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
            super(flatMapper);
            this.chainingStrategy = ChainingStrategy.ALWAYS;
        }
    
    • 1
    • 2
    • 3
    • 4

    点击进去,是下面的接口类,里面有一个flatMap方法,继承的类也是一样的。

    @Public
    @FunctionalInterface
    public interface FlatMapFunction<T, O> extends Function, Serializable {
    
        void flatMap(T value, Collector<O> out) throws Exception;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    flatMap没有返回值,它是利用Collector out返回数据的。

    再来看下具体处理方法:

    public void open() throws Exception {
            super.open();
            this.collector = new TimestampedCollector(this.output);
        }
    
     public void processElement(StreamRecord<IN> element) throws Exception {
            this.collector.setTimestamp(element);
            ((FlatMapFunction)this.userFunction).flatMap(element.getValue(), this.collector);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    把this.collector作为参数封装到flatMap里了,用户可以通过collector进行操作。

    那么现在是不是明白了,为啥map是一对一,flatMap是一对多和一对0了吧。

    结语

    本文章是系列文章中的一篇,如果有错误的地方,欢迎批评指正!欢迎同行交流!

  • 相关阅读:
    APMCM历年题目汇总
    基于粒子群优化算法的微型燃气轮机冷热电联供系统优化调度(Matlab代码实现)
    C#中的数组探究与学习
    基于matlab创建基于物理统计的雷达模型(附源码)
    X DevAPI--C++ mysql数据库连接池
    网络编程开发及实战(上)
    如何在本地搭建Oracle数据库实现公网环境下通过PLSQL工具进行远程访问
    华为OD 整数最小和(100分)【java】A卷+B卷
    JSD-2204-创建Spring项目-Day19
    docker之dockerFile
  • 原文地址:https://blog.csdn.net/u010772882/article/details/127822520