• 52、Flink 使用 Parametertool 获取应用参数代码示例


    1、获取配置参数-1

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.IOException;
    import java.util.Map;
    
    public class _01_ParameterToolReadArgs {
        public static void main(String[] args) throws IOException {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 配置值来自 .properties 文件
    //        String propertiesFilePath = "*";
    //        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(propertiesFilePath);
    
    //        File file = new File("*");
    //        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(file);
    
    //        FileInputStream fileInputStream = new FileInputStream(new File("*"));
    //        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fileInputStream);
    
            // 配置值来自命令行
            // 输入
            // --input hdfs:///mydata
            // --elements 42
            // 输出
            // input=hdfs:///mydata
            // elements=42
    //        ParameterTool parameterTool = ParameterTool.fromArgs(args);
    
            // 配置值来自系统属性,VmOptions
            // 输入
            // -Dinput=hdfs:///mydata
            // 输出
            // input=hdfs:///mydata
            ParameterTool parameterTool = ParameterTool.fromSystemProperties();
    
            for (Map.Entry<String, String> entry : parameterTool.toMap().entrySet()) {
                if ("input".equals(entry.getKey())) {
                    System.out.println(entry.getKey() + "=" + entry.getValue());
                }
            }
        }
    }
    
    

    2、获取配置参数-2

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.Map;
    
    /**
     * 输入 --input myGlobalParamsInput
     * 输出 myGlobalParamsInput
     */
    public class _02_ParameterToolGlobalParams {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            ParameterTool parameters = ParameterTool.fromArgs(args);
            env.getConfig().setGlobalJobParameters(parameters);
    
            DataStreamSource<Integer> source = env.fromData(1, 2, 3);
            source.map(new MyRichMapFunc()).print();
    
            env.execute();
        }
    }
    
    class MyRichMapFunc extends RichMapFunction<Integer, String> {
    
        @Override
        public String map(Integer value) throws Exception {
            Map<String, String> globalJobParameters = getRuntimeContext().getGlobalJobParameters();
            return globalJobParameters.get("input");
        }
    }
    
  • 相关阅读:
    利用Redis来实现分布式锁
    k8s之Deployment
    图解 LeetCode 算法汇总——二分查找
    3i平台体验性能加持,13600KF+B760M+撼与科技A770 TITAN装机体验
    如何进行自动化测试,提高测试效率?
    光源基础(1)——常见光源性能比对和好图像评价指标
    Win:使用 netsh 命令配置 Port Forwarding
    提升B端图表设计技能:教程分享
    SparkSQL项目实战
    PIE:1979-2018年中国气温数据产品(空间分辨率为0.1º)
  • 原文地址:https://blog.csdn.net/m0_50186249/article/details/140058323