本系列是针对 Flink 源码进行的一系列探究过程,旨在通过 Flink 源码全面地、详细地了解 Flink 原理
写过 Flink 程序的朋友都知道,Flink 程序的第一行代码就是创建可执行环境,如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
根据 getExecutionEnvironment 可以自动获取可执行环境,那么具体过程究竟是怎样的呢?
接下来我们从 StreamExecutionEnvironment 说起
StreamExecutionEnvironment.java
public static StreamExecutionEnvironment getExecutionEnvironment() {
return getExecutionEnvironment(new Configuration());
}
StreamExecutionEnvironment 方法最终还是调用 getExecutionEnvironment(Configuration configuration) 方法
其中,新建的类 Configuration 用于获取 Flink 的键值对配置
接下来我们来看 getExecutionEnvironment(Configuration configuration) 方法
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
// 检查当前上下文是否存在可用的 EnvironmentFactory
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
// 若当前上下文存在可用的 EnvironmentFactory,则基于该工厂类创建 ExecutionEnvironment
.map(factory -> factory.createExecutionEnvironment(configuration))
// 若工厂类未能创建 ExecutionEnvironment,则调用 createLocalEnvironment(configuration) 创建 LocalStreamEnvironment
.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
Utils.java
public static <T> Optional<T> resolveFactory(
ThreadLocal<T> threadLocalFactory, @Nullable T staticFactory) {
final T localFactory = threadLocalFactory.get(); // 从本地线程中取出 T 对象
final T factory = localFactory == null ? staticFactory : localFactory; // 若 T 对象为 null,则返回 staticFactory
return Optional.ofNullable(factory);
}
resolveFactory 先从本地线程中查找 StreamExecutionEnvironmentFactory 对象,若无法找到则返回静态工厂
接下来,我们再来仔细阅读 createLocalEnvironment 的实现过程
public static LocalStreamEnvironment createLocalEnvironment() {
return createLocalEnvironment(defaultLocalParallelism);
}
该方法调用的是 createLocalEnvironment(int parallelism),其中参数 defaultLocalParallelism 是本地环境中的默认并行度,如下所示,返回的是 JVM 可用的处理器个数,以此作为默认的本地并行度,实际就是本机 CPU 核心数。
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
接下来我们再看 createLocalEnvironment(int parallelism)
public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
return createLocalEnvironment(parallelism, new Configuration());
}
调用的是 createLocalEnvironment(int parallelism, Configuration configuration) 加入了配置,再继续看
public static LocalStreamEnvironment createLocalEnvironment(
int parallelism, Configuration configuration) {
Configuration copyOfConfiguration = new Configuration();
copyOfConfiguration.addAll(configuration);
copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
return createLocalEnvironment(copyOfConfiguration);
}
该函数首先对配置做了一次拷贝并加入了并行度的配置,然后调用 createLocalEnvironment(Configuration configuration)
public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) {
if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) {
return new LocalStreamEnvironment(configuration);
} else {
Configuration copyOfConfiguration = new Configuration();
copyOfConfiguration.addAll(configuration);
copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism);
return new LocalStreamEnvironment(copyOfConfiguration);
}
}
这里做了对配置中是否存在并行度配置的判断,若不存在则加入,确保创建本地运行环境时一定存在关于并行度的配置。最终通过 LocalStreamEnvironment 构造函数构建本地运行环境。
LocalStreamEnvironment 是 StreamExecutionEnvironment 的子类,关于该类的作用,源码中是这样解释的:
大意是 LocalStreamEnvironment 在实例化环境的 JVM 中本地多线程地运行程序,在后台生成一个嵌入式 Flink 集群,并在该集群上执行程序
我们可以发现,getExecutionEnvironment 最终调用的是 LocalStreamEnvironment 的构造方法,该方法如下:
public LocalStreamEnvironment(@Nonnull Configuration configuration) {
super(validateAndGetConfiguration(configuration));
}
该方法调用的父类的 validateAndGetConfiguration,而该方法不可见。