• Flink from Beginner to Giving Up - Stream API - Common Operators (reduce)


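    Navigation

    If needed, see
    (10) Flink DataStream API Programming Guide, Operators 1: a basic introduction to transformation operators, physical partitioning, task chaining, resource groups, operators and jobs
    Flink from Beginner to Giving Up - Stream API - Common Operators (filter and keyBy)
    More code usage examples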

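    This chapter covers the operator

    • reduce

    KeyedStream → DataStream
    A "rolling" reduce over a keyed data stream: it combines the current element with the last reduced value for the same key and emits the new value.

    In fact, reduce comes in two flavors (leaving aside the rich-function variants):

    • Plain reduce
    • Window reduce

    reduce

    User code first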

    package com.stream.samples;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    /**
     * @author DeveloperZJQ
     * @since 2022/11/13
     */
    public class CustomReduceOperator {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.socketTextStream("192.168.112.147", 7777);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(
                    (MapFunction<String, Tuple2<String, Integer>>)
                            value -> Tuple2.of(value, value.length())).returns(Types.TUPLE(Types.STRING, Types.INT));
    
    
            // Plain (rolling) reduce: map -> keyBy -> reduce -> print; emits an updated total for every incoming element
            SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = map.keyBy(value -> value.f0)
                    .reduce((v1, v2) -> Tuple2.of(v1.f0, v1.f1 + v2.f1));
    
            // Window reduce: map -> keyBy -> window -> reduce -> print; emits once per key when each 10-second window fires
            SingleOutputStreamOperator<Tuple2<String, Integer>> reduceWindow = map.keyBy((in) -> in, Types.TUPLE(Types.STRING, Types.INT))
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                    .reduce((v1, v2) -> Tuple2.of(v1.f0, v1.f1 + v2.f1));
    
    
            reduce.print("reduce->");
            reduceWindow.print("reduceWindow->");
    
            env.execute(CustomReduceOperator.class.getSimpleName());
        }
    }
    
    

    Source code

    Plain reduce operator

    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
    // The name "Keyed Reduce" makes it clear why reduce is so often paired with keyBy: the reduce operator lives in the KeyedStream class
            ReduceTransformation<T, KEY> reduce = new ReduceTransformation("Keyed Reduce", this.environment.getParallelism(), this.transformation, (ReduceFunction)this.clean(reducer), this.keySelector, this.getKeyType());
            // The transformation is added directly to the environment's List<Transformation<?>>
            this.getExecutionEnvironment().addOperator(reduce);
            return new SingleOutputStreamOperator(this.getExecutionEnvironment(), reduce);
        }
    
    @Public
    @FunctionalInterface
    public interface ReduceFunction<T> extends Function, Serializable {
       // Input and output share the same generic type T, which suits aggregations and statistics
        T reduce(T value1, T value2) throws Exception;
    }
    

    That is essentially all there is to it: an operator built on KeyedStream.
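
To make the rolling behavior concrete, here is a minimal plain-Java sketch that mimics what map -> keyBy -> reduce does in the user code above. It does not use Flink at all: `RollingReduceSketch` and `rollingReduce` are hypothetical names, and the `HashMap` is only a stand-in for the per-key state that the real operator keeps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical illustration only; this is not Flink's implementation.
class RollingReduceSketch {

    /** Feeds the words one by one and returns every value that would be emitted. */
    static List<String> rollingReduce(List<String> words, BinaryOperator<Integer> reducer) {
        // Stand-in for keyed state: the last reduced value per key.
        Map<String, Integer> lastReduced = new HashMap<>();
        List<String> emitted = new ArrayList<>();

        for (String word : words) {
            int value = word.length();                  // same mapping as the user code: (word, length)
            Integer previous = lastReduced.get(word);   // keyed by the word itself
            // The first element of a key is emitted unchanged; later ones are merged with the previous result.
            int current = (previous == null) ? value : reducer.apply(previous, value);
            lastReduced.put(word, current);
            emitted.add(word + "=" + current);          // one output per input element
        }
        return emitted;
    }
}
```

Calling `RollingReduceSketch.rollingReduce(Arrays.asList("flink", "ab", "flink"), Integer::sum)` yields one output per input, with the second "flink" carrying the accumulated total.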

    Window reduce operator

    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
    // A RichFunction is not accepted here and is rejected with an exception
            if (function instanceof RichFunction) {
                throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. Please use reduce(ReduceFunction, WindowFunction) instead.");
            } else {
                function = (ReduceFunction)this.input.getExecutionEnvironment().clean(function);
                return this.reduce(function, (WindowFunction)(new PassThroughWindowFunction()));
            }
        }
    // The window function
    @Internal
    public class PassThroughWindowFunction<K, W extends Window, T> implements WindowFunction<T, T, K, W> {
        private static final long serialVersionUID = 1L;
    
        public PassThroughWindowFunction() {
        }
        // Receives the data of one window for one key
        public void apply(K k, W window, Iterable<T> input, Collector<T> out) throws Exception {
            // Iterate over the window's data and forward each element unchanged
            for (T in : input) {
                out.collect(in);
            }
        }
    }
    

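    The difference from the plain reduce operator above is easy to see.
    Here reduce is called on a WindowedStream. Every reduce call operates on the data of one key within one window, and the result is emitted only once the window has finished computing, that is, when it fires.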
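
The per-window, per-key behavior described above can be sketched in plain Java as follows. This is a simplified simulation and not Flink code: `TumblingWindowReduceSketch` and `windowReduce` are hypothetical names, timestamps are passed in explicitly (assumed non-negative) instead of coming from a processing-time clock, and all windows are "fired" in time order at the end rather than by timers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; this is not Flink's implementation.
class TumblingWindowReduceSketch {

    /** Sums word lengths per (window, word) and returns one output per key and window. */
    static List<String> windowReduce(long[] timestamps, String[] words, long windowSizeMillis) {
        // window start -> (key -> reduced value); TreeMap keeps the windows in time order.
        Map<Long, Map<String, Integer>> windows = new TreeMap<>();

        for (int i = 0; i < words.length; i++) {
            // A tumbling window with zero offset starts at the timestamp rounded down to the window size.
            long windowStart = timestamps[i] - (timestamps[i] % windowSizeMillis);
            windows.computeIfAbsent(windowStart, start -> new LinkedHashMap<>())
                   .merge(words[i], words[i].length(), Integer::sum); // reduce within the window only
        }

        // "Fire" each window: a single result per key, emitted when the window is complete.
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Integer>> window : windows.entrySet()) {
            long start = window.getKey();
            for (Map.Entry<String, Integer> perKey : window.getValue().entrySet()) {
                emitted.add("[" + start + "," + (start + windowSizeMillis) + ") "
                        + perKey.getKey() + "=" + perKey.getValue());
            }
        }
        return emitted;
    }
}
```

Unlike the plain reduce, the number of outputs here depends on how many (window, key) pairs exist, not on how many elements arrived, and totals do not carry over from one window to the next.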

    
     public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
            TypeInformation<T> inType = this.input.getType();
            // Determine the return type of the window function
            TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
    // Delegate to the next overload
            return this.reduce(reduceFunction, function, resultType);
        }
    
    public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
            function = (WindowFunction)this.input.getExecutionEnvironment().clean(function);
            reduceFunction = (ReduceFunction)this.input.getExecutionEnvironment().clean(reduceFunction);
            String opName = this.builder.generateOperatorName();
            String opDescription = this.builder.generateOperatorDescription(reduceFunction, function);
    // The reduce method of WindowOperatorBuilder
            OneInputStreamOperator<T, R> operator = this.builder.reduce(reduceFunction, function);
            // Wrap the operator in a transformation and return the resulting stream
            return this.input.transform(opName, resultType, operator).setDescription(opDescription);
        }
    
    
    public <R> WindowOperator<K, T, ?, R, W> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
    // Neither function may be null
            Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
            Preconditions.checkNotNull(function, "WindowFunction cannot be null");
            // A RichFunction is rejected with an exception
            if (reduceFunction instanceof RichFunction) {
                throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
            } else if (this.evictor != null) {
            // An evictor is set, so an evicting window operator is built
                return this.buildEvictingWindowOperator(new InternalIterableWindowFunction(new ReduceApplyWindowFunction(reduceFunction, function)));
            } else {
            // The operator state is visible here: a ReducingState named "window-contents"
                ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor("window-contents", reduceFunction, this.inputType.createSerializer(this.config));
                return this.buildWindowOperator(stateDesc, new InternalSingleValueWindowFunction(function));
            }
        }
        
        // This is where the WindowOperator class comes in: its constructor is called here, and processElement is the method to focus on
    private <ACC, R> WindowOperator<K, T, ACC, R, W> buildWindowOperator(StateDescriptor<? extends AppendingState<T, ACC>, ?> stateDesc, InternalWindowFunction<ACC, R, K, W> function) {
            return new WindowOperator(this.windowAssigner, this.windowAssigner.getWindowSerializer(this.config), this.keySelector, this.keyType.createSerializer(this.config), stateDesc, function, this.trigger, this.allowedLateness, this.lateDataOutputTag);
        }
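
The two branches of the builder differ in what is kept while a window is open. The following plain-Java sketch contrasts them under simplifying assumptions: `WindowContentsSketch`, `incremental` and `buffered` are hypothetical names, and the local variables stand in for Flink's state. Without an evictor, each element is folded into a single stored value as it arrives; with an evictor, the elements are buffered and only reduced when the window fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical illustration only; this is not Flink's implementation.
class WindowContentsSketch {

    /** No evictor: one stored value per key and window. Returns {result, stored element count}. */
    static int[] incremental(int[] values, BinaryOperator<Integer> reducer) {
        Integer state = null; // stand-in for the single reduced value kept in state
        for (int value : values) {
            state = (state == null) ? value : reducer.apply(state, value);
        }
        return new int[] {state == null ? 0 : state, state == null ? 0 : 1};
    }

    /** With an evictor: every element is kept until the window fires. Returns {result, stored element count}. */
    static int[] buffered(int[] values, BinaryOperator<Integer> reducer) {
        List<Integer> buffer = new ArrayList<>(); // stand-in for the buffered window contents
        for (int value : values) {
            buffer.add(value);
        }
        Integer result = null;
        for (Integer value : buffer) { // the reduce happens only at firing time
            result = (result == null) ? value : reducer.apply(result, value);
        }
        return new int[] {result == null ? 0 : result, buffer.size()};
    }
}
```

Both paths produce the same total. What changes is how much has to be stored per key and window, which is why the path without an evictor is the cheaper one.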
    

    Finally, the processing logic turns out to be the same as for the operators covered earlier.

    this.input.transform(opName, resultType, operator).setDescription(opDescription);
    
    @PublicEvolving
        public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
            return this.doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
        }
    
    protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) {
            this.transformation.getOutputType();
            OneInputTransformation<T, R> resultTransform = new OneInputTransformation(this.transformation, operatorName, operatorFactory, outTypeInfo, this.environment.getParallelism());
            SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(this.environment, resultTransform);
            this.getExecutionEnvironment().addOperator(resultTransform);
            return returnStream;
        }
    

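    Conclusion

    1. The keyBy() -> reduce combination computes a running total per key over all data seen so far, and emits a result for every incoming record.
    2. The window() -> reduce combination computes the total of the data inside each window (that is, statistics over a bounded period of time), and emits a result only when the window fires.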

  • Original article: https://blog.csdn.net/u010772882/article/details/127833599