• 大数据(9f)Flink富函数RichFunction


    1、概述

    Rich Function,译名富函数,和普通函数相比,多了:
    生命周期( openclose方法)
    获取函数的运行时上下文( getRuntimeContext方法)
    本文版本
    Flink:1.14.6
    Java:1.8
    Scala:2.12

    2、示例

    2.1、普通函数

    MapFunction接口 继承了 Function接口

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class H1 {
        public static void main(String[] args) throws Exception {
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置并行度
            env.setParallelism(1);
            //获取数据源
            DataStreamSource<Integer> dss = env.fromElements(1, 2, 3);
            //普通函数
            dss.map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer i) {
                    return i * i;
                }
            }).print();
            //执行
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    测试结果

    2.2、富函数

    RichMapFunction抽象类 继承了 AbstractRichFunction抽象类
    AbstractRichFunction抽象类 实现了 RichFunction接口

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class H1 {
        public static void main(String[] args) throws Exception {
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置并行度
            env.setParallelism(1);
            //获取数据源
            DataStreamSource<Integer> dss = env.fromElements(1, 2, 3);
            //普通函数
            dss.map(new RichMapFunction<Integer, Integer>() {
                @Override
                public void open(Configuration parameters) {
                    System.out.println("生命周期开始");
                }
    
                @Override
                public void close() {
                    System.out.println("生命周期结束");
                }
    
                @Override
                public Integer map(Integer i) {
                    return i * i;
                }
            }).print();
            //执行
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    测试结果

    2.2.1、获取富函数的运行时上下文

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class H1 {
        public static void main(String[] args) throws Exception {
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置并行度
            env.setParallelism(2);
            //获取数据源
            DataStreamSource<Integer> dss = env.fromElements(1, 2, 3);
            //普通函数
            dss.map(new RichMapFunction<Integer, Integer>() {
                @Override
                public void open(Configuration parameters) {
                    System.out.println("生命周期开始");
                    //获取运行时上下文
                    RuntimeContext context = getRuntimeContext();
                    System.out.println("子任务索引:" + context.getIndexOfThisSubtask());
                }
    
                @Override
                public void close() {
                    System.out.println("生命周期结束");
                }
    
                @Override
                public Integer map(Integer i) {
                    return i * i;
                }
            }).print();
            //执行
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    并行度设置为2,测试结果

    3、源码截取

    3.1、RichFunction

    package org.apache.flink.api.common.functions;
    
    import org.apache.flink.annotation.Public;
    import org.apache.flink.configuration.Configuration;
    
    @Public
    public interface RichFunction extends Function {
        /** 函数的生命周期 */
        void open(Configuration parameters) throws Exception;
    
        void close() throws Exception;
    
        /** 获取函数运行时上下文对象,对象信息包含:并行度、作业ID、任务名、子任务索引… */
        RuntimeContext getRuntimeContext();
    
        /** 设置函数的运行时上下文。在创建函数的并行实例时,此方法被框架调用 */
        void setRuntimeContext(RuntimeContext t);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    3.2、RuntimeContext

    /**
     * RuntimeContext 包含 函数的运行时上下文信息
     * 函数的每个并行实例都有1个context对象,通过访问对象,可获取 静态信息、累加器、广播变量、状态
     */
    @Public
    public interface RuntimeContext {
    
        JobID getJobId();
    
        String getTaskName();
    
        int getIndexOfThisSubtask();
    
        int getAttemptNumber();
    
        String getTaskNameWithSubtasks();
    
        // ------------------------------------ 累加器 -------------------------------------------
    
        <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator);
    
        <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name);
    
        @PublicEvolving
        IntCounter getIntCounter(String name);
    
        @PublicEvolving
        LongCounter getLongCounter(String name);
    
        @PublicEvolving
        DoubleCounter getDoubleCounter(String name);
    
        @PublicEvolving
        Histogram getHistogram(String name);
    
        // ---------------------------------- 广播变量 -------------------------------------------
    
        @PublicEvolving
        boolean hasBroadcastVariable(String name);
    
        <RT> List<RT> getBroadcastVariable(String name);
    
        <T, C> C getBroadcastVariableWithInitializer(
                String name, BroadcastVariableInitializer<T, C> initializer);
    
        // -------------------------- 访问【状态】的方法 --------------------------------
    
        @PublicEvolving
        <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);
    
        @PublicEvolving
        <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);
    
        @PublicEvolving
        <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
    
        @PublicEvolving
        <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
                AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);
    
        @PublicEvolving
        <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
  • 相关阅读:
    arthars在线诊断
    javaweb JavaScript快速入门 对象 BOM DOM 事件监听
    RocketMQ特性--Broker是如何存储事务消息的?
    对卡巴斯基发现的一个将shellcode写入evenlog的植入物的复现
    【RuoYi-Vue-Plus】登陆逻辑的实现
    缓解缓存击穿的大杀器之---singleflight深入浅出
    【mcuclub】温度传感器DS18B20
    上新啦!请查收云原生虚拟数仓 PieCloudDB 十月动态
    酒水商城|基于Springboot实现酒水商城系统
    让你的对象变得拗口:JSON.stringify(),我把对象夹进了 JSON 魔法帽!
  • 原文地址:https://blog.csdn.net/Yellow_python/article/details/127922784