参数很重要,有时候参数被配置在配置文件中, 有时候我们可能会通过控制台传递参数,可以说参数很多地方都会用到,基于此,flink提供了一个工具可以很方便的读取参数,可以通过配置文件,也可以通过控制台,甚至我们可以把参数广播到全局,乃至于每个算子内部都可以访问到此参数。
ParameterTool parameter = ParameterTool.fromPropertiesFile("..../my.properties");
System.out.println(parameter.get("age"));
输出结果为:20
–input 张三 --input 李四 --my 20
ParameterTool parameter = ParameterTool.fromArgs(args);
System.out.println(parameter.get("input"));//输出:李四
System.out.println(parameter.get("my"));//输出:20
“ -Dkey=value ” 可以向java程序传递系统变量,我们可以通过flink的工具获取这部分参数。flink官网有这么一句话。When starting a JVM, you can pass system properties to it: -Dinput=hdfs:///mydata. You can also initialize the ParameterTool from these system properties:
下面我们借助idea模拟下:
ParameterTool parameter2 = ParameterTool.fromSystemProperties();
System.out.println(parameter2.get("hdfs"));
结果为:128.196.234.45:9000/pg
因为ParameterTool 是可序列化的,因此可以在算子内部任意传递的,每个算子都可以,比如下面的例子。我们通过自定义flatMap,在其构造函数内传递ParamterTool.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class MainProperties {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameter = ParameterTool.fromArgs(args);
DataStreamSource source = env.fromElements("name","age");
source.flatMap(new MyFlatMap(parameter)).print();
env.execute("test ParameterTool.");
}
}
class MyFlatMap implements FlatMapFunction{
public ParameterTool parameters;
public MyFlatMap(ParameterTool parameters) {
this.parameters = parameters;
}
@Override
public void flatMap(String value, Collector out) throws Exception {
if(value.equals("name")){
value = value+"="+parameters.get("name");
}
if (value.equals("age")){
value = value+"="+parameters.get("age");
}
out.collect(value);
}
}
结果为:
name=张三
age=33
第四节讲了将参数传递给算子,但是那种写法需要借助构造函数,不是很方便,所以还有一种更屌的,那就是全局设置, 一键访问。但是此方式只适用于rich函数。因为全局设置的只能用getRuntimeContext访问到,而普通函数无法访问getRuntimeContext,只有rich函数可以调用getRuntimeContext方法
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class GlobalMainProperties {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final ParameterTool parameter = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(parameter);
DataStreamSource source = env.fromElements("name", "age");
source.flatMap(new MyFlatMapFunction()).print();
env.execute("test ParameterTool.");
}
}
class MyFlatMapFunction extends RichFlatMapFunction {
@Override
public void flatMap(String value, Collector out) {
ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
if (value.equals("name")) {
value = value + "=" + parameters.get("name");
}
if (value.equals("age")) {
value = value + "=" + parameters.get("age");
}
out.collect(value);
// }
}