• 22.flink参数工具 ParameterTool


    摘要

    参数很重要,有时候参数被配置在配置文件中, 有时候我们可能会通过控制台传递参数,可以说参数很多地方都会用到,基于此,flink提供了一个工具可以很方便的读取参数,可以通过配置文件,也可以通过控制台,甚至我们可以把参数广播到全局,乃至于每个算子内部都可以访问到此参数。

    1.通过读取.properties配置文件

    在这里插入图片描述

     ParameterTool parameter = ParameterTool.fromPropertiesFile("..../my.properties");
     System.out.println(parameter.get("age"));
     输出结果为:20
    
    • 1
    • 2
    • 3

    2.从命令行获取

    –input 张三 --input 李四 --my 20

    ParameterTool parameter = ParameterTool.fromArgs(args);
    System.out.println(parameter.get("input"));//输出:李四
    System.out.println(parameter.get("my"));//输出:20
    
    • 1
    • 2
    • 3

    3.从系统属性中获取

    “ -Dkey=value ” 可以向java程序传递系统变量,我们可以通过flink的工具获取这部分参数。flink官网有这么一句话。When starting a JVM, you can pass system properties to it: -Dinput=hdfs:///mydata. You can also initialize the ParameterTool from these system properties:
    下面我们借助idea模拟下:
    在这里插入图片描述

     ParameterTool parameter2 = ParameterTool.fromSystemProperties();
     System.out.println(parameter2.get("hdfs"));
     结果为:128.196.234.45:9000/pg
    
    • 1
    • 2
    • 3

    4.在算子内部获取参数

    因为ParameterTool 是可序列化的,因此可以在算子内部任意传递的,每个算子都可以,比如下面的例子。我们通过自定义flatMap,在其构造函数内传递ParamterTool.
    在这里插入图片描述

    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    public class MainProperties {
        public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ParameterTool parameter = ParameterTool.fromArgs(args);
    DataStreamSource source = env.fromElements("name","age");
    source.flatMap(new MyFlatMap(parameter)).print();
    env.execute("test ParameterTool.");
        }
    }
    class MyFlatMap implements FlatMapFunction{
       public  ParameterTool parameters;
    
        public MyFlatMap(ParameterTool parameters) {
            this.parameters = parameters;
        }
    
        @Override
        public void flatMap(String value, Collector out) throws Exception {
            if(value.equals("name")){
                value = value+"="+parameters.get("name");
            }
            if (value.equals("age")){
                value = value+"="+parameters.get("age");
            }
            out.collect(value);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    结果为:
    name=张三
    age=33

    5.设置为全局参数,可在程序的任何地方访问

    第四节讲了将参数传递给算子,但是那种写法需要借助构造函数,不是很方便,所以还有一种更屌的,那就是全局设置, 一键访问。但是此方式只适用于rich函数。因为全局设置的只能用getRuntimeContext访问到,而普通函数无法访问getRuntimeContext,只有rich函数可以调用getRuntimeContext方法

    
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    public class GlobalMainProperties {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            final ParameterTool parameter = ParameterTool.fromArgs(args);
            env.getConfig().setGlobalJobParameters(parameter);
            DataStreamSource source = env.fromElements("name", "age");
            source.flatMap(new MyFlatMapFunction()).print();
            env.execute("test ParameterTool.");
    
    
        }
    }
    
    class MyFlatMapFunction extends RichFlatMapFunction {
        @Override
        public void flatMap(String value, Collector out)  {
            ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            if (value.equals("name")) {
                value = value + "=" + parameters.get("name");
            }
            if (value.equals("age")) {
                value = value + "=" + parameters.get("age");
            }
            out.collect(value);
    //    }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
  • 相关阅读:
    【App自动化测试】(三)使用Appium进行自动化用例录制
    Ubuntu 20.04.05安装ceres-1.14.0
    第8章 Spring(二)
    线程安全问题
    Umi 相关
    Strus2 系列漏洞
    java——注释与空行
    【网页设计】期末大作业html+css(音乐网站)
    2022年中总结关键词:裁员、年终奖、晋升、涨薪、疫情
    Unity 编辑器资源导入处理函数 OnPreprocessTexture:深入解析与实用案例
  • 原文地址:https://blog.csdn.net/qq_36066039/article/details/126431001