1、获取配置参数-1
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.IOException;
import java.util.Map;
/**
 * Shows the different sources a Flink {@code ParameterTool} can be built from and
 * prints the {@code input} parameter when it is present.
 *
 * <p>The active variant reads JVM system properties, so launch with a VM option such as
 * {@code -Dinput=hdfs:///mydata}; the program then prints {@code input=hdfs:///mydata}.
 * When no {@code input} property is set, nothing is printed.
 */
public class _01_ParameterToolReadArgs {
    /**
     * Builds a {@code ParameterTool} and prints the {@code input} entry as {@code key=value}.
     *
     * @param args command-line arguments; only consumed by the commented-out
     *     {@code fromArgs} variant below
     * @throws IOException retained from the original signature; the commented-out
     *     file-based variants perform file I/O
     */
    public static void main(String[] args) throws IOException {
        // Variant 1: configuration values come from a .properties file.
        // String propertiesFilePath = "*";
        // ParameterTool parameterTool = ParameterTool.fromPropertiesFile(propertiesFilePath);
        // File file = new File("*");
        // ParameterTool parameterTool = ParameterTool.fromPropertiesFile(file);
        // FileInputStream fileInputStream = new FileInputStream(new File("*"));
        // ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fileInputStream);

        // Variant 2: configuration values come from the command line.
        // Input:
        //   --input hdfs:///mydata
        //   --elements 42
        // Output:
        //   input=hdfs:///mydata
        //   elements=42
        // ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // Variant 3 (active): configuration values come from system properties (VM options).
        // Input:
        //   -Dinput=hdfs:///mydata
        // Output:
        //   input=hdfs:///mydata
        ParameterTool parameterTool = ParameterTool.fromSystemProperties();

        // Look the key up directly instead of scanning every JVM system property.
        String input = parameterTool.get("input");
        if (input != null) {
            System.out.println("input=" + input);
        }
    }
}
2、获取配置参数-2
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Map;
/**
 * Registers the parsed command-line arguments as global job parameters and runs a
 * small job whose map function reads them back.
 *
 * <p>Input: {@code --input myGlobalParamsInput}
 * <br>Output: {@code myGlobalParamsInput} (emitted once per source element)
 */
public class _02_ParameterToolGlobalParams {
    /**
     * Builds and executes the demo job.
     *
     * @param args command-line arguments parsed by {@code ParameterTool.fromArgs}
     * @throws Exception propagated from job construction or execution
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Publish the parsed arguments on the execution config as global job parameters.
        environment.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));

        // Three dummy elements drive the map function, whose results are printed.
        environment
                .fromData(1, 2, 3)
                .map(new MyRichMapFunc())
                .print();

        environment.execute();
    }
}
/**
 * Map function that disregards the incoming element and emits the value of the
 * {@code input} global job parameter instead.
 */
class MyRichMapFunc extends RichMapFunction<Integer, String> {
    /**
     * Looks up the {@code input} entry in the global job parameters exposed by the
     * runtime context.
     *
     * @param value the incoming element; not used
     * @return the value stored under the key {@code "input"}, or whatever the
     *     parameter map yields when that key is absent
     * @throws Exception declared by the overridden method
     */
    @Override
    public String map(Integer value) throws Exception {
        return getRuntimeContext().getGlobalJobParameters().get("input");
    }
}