• 【Flink源码】从StreamExecutionEnvironment说起


    本系列是针对 Flink 源码进行的一系列探究过程,旨在通过 Flink 源码全面地、详细地了解 Flink 原理

    写过 Flink 程序的朋友都知道,Flink 程序的第一行代码就是创建可执行环境,如下:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    • 1

    根据 getExecutionEnvironment 可以自动获取可执行环境,那么具体过程究竟是怎样的呢?
    接下来我们从 StreamExecutionEnvironment 说起


    以 getExecutionEnvironment 开篇

    StreamExecutionEnvironment.java

    public static StreamExecutionEnvironment getExecutionEnvironment() {
        return getExecutionEnvironment(new Configuration());
    }
    
    • 1
    • 2
    • 3

    StreamExecutionEnvironment 方法最终还是调用 getExecutionEnvironment(Configuration configuration) 方法
    其中,新建的类 Configuration 用于获取 Flink 的键值对配置

    接下来我们来看 getExecutionEnvironment(Configuration configuration) 方法

    public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
                // 检查当前上下文是否存在可用的 EnvironmentFactory
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
                // 若当前上下文存在可用的 EnvironmentFactory,则基于该工厂类创建 ExecutionEnvironment
                .map(factory -> factory.createExecutionEnvironment(configuration))
                // 若工厂类未能创建 ExecutionEnvironment,则调用 createLocalEnvironment(configuration) 创建 LocalStreamEnvironment
                .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 首先,使用配置类 Utils 的 resolveFactory 方法实现对当前上下文中是否存在可用的 EnvironmentFactory 的检查,返回一个 StreamExecutionEnvironmentFactory 对象

    Utils.java

    public static <T> Optional<T> resolveFactory(
                ThreadLocal<T> threadLocalFactory, @Nullable T staticFactory) {
        final T localFactory = threadLocalFactory.get();     // 从本地线程中取出 T 对象
        final T factory = localFactory == null ? staticFactory : localFactory;     // 若 T 对象为 null,则返回 staticFactory
    
        return Optional.ofNullable(factory);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    resolveFactory 先从本地线程中查找 StreamExecutionEnvironmentFactory 对象,若无法找到则返回静态工厂

    • 接下来第二步使用 StreamExecutionEnvironmentFactory 工厂类方法 createExecutionEnvironment 使用 Configuration 配置创建 StreamExecutionEnvironment
    • 若未能创建 StreamExecutionEnvironment,则调用本类的另一个方法 createLocalEnvironment 创建本地执行环境

    接下来,我们再来仔细阅读 createLocalEnvironment 的实现过程

    public static LocalStreamEnvironment createLocalEnvironment() {
        return createLocalEnvironment(defaultLocalParallelism);
    }
    
    • 1
    • 2
    • 3

    该方法调用的是 createLocalEnvironment(int parallelism),其中参数 defaultLocalParallelism 是本地环境中的默认并行度,如下所示,返回的是 JVM 可用的处理器个数,以此作为默认的本地并行度,实际就是本机 CPU 核心数。

    private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
    
    • 1

    接下来我们再看 createLocalEnvironment(int parallelism)

    public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
        return createLocalEnvironment(parallelism, new Configuration());
    }
    
    • 1
    • 2
    • 3

    调用的是 createLocalEnvironment(int parallelism, Configuration configuration) 加入了配置,再继续看

    public static LocalStreamEnvironment createLocalEnvironment(
            int parallelism, Configuration configuration) {
        Configuration copyOfConfiguration = new Configuration();
        copyOfConfiguration.addAll(configuration);
        copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
        return createLocalEnvironment(copyOfConfiguration);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    该函数首先对配置做了一次拷贝并加入了并行度的配置,然后调用 createLocalEnvironment(Configuration configuration)

    public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) {
        if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) {
            return new LocalStreamEnvironment(configuration);
        } else {
            Configuration copyOfConfiguration = new Configuration();
            copyOfConfiguration.addAll(configuration);
            copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism);
            return new LocalStreamEnvironment(copyOfConfiguration);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这里做了对配置中是否存在并行度配置的判断,若不存在则加入,确保创建本地运行环境时一定存在关于并行度的配置。最终通过 LocalStreamEnvironment 构造函数构建本地运行环境。

    LocalStreamEnvironment 是 StreamExecutionEnvironment 的子类,关于该类的作用,源码中是这样解释的:

    1662347043111

    大意是 LocalStreamEnvironment 在实例化环境的 JVM 中本地多线程地运行程序,在后台生成一个嵌入式 Flink 集群,并在该集群上执行程序
    我们可以发现,getExecutionEnvironment 最终调用的是 LocalStreamEnvironment 的构造方法,该方法如下:

    public LocalStreamEnvironment(@Nonnull Configuration configuration) {
        super(validateAndGetConfiguration(configuration));
    }
    
    • 1
    • 2
    • 3

    该方法调用的父类的 validateAndGetConfiguration,而该方法不可见。

    • 总结:
    • getExecutionEnvironment 首先会读取 Flink 配置
    • 检查上下文中是否存在可用的环境工厂,若有则直接根据工厂生产执行环境
    • 若没有则在调用 LocalStreamEnvironment 创建本地执行环境,其中,需要在配置中指定并行度,默认为本地 CPU 核数
  • 相关阅读:
    Luffy项目:2、项目需求(2),项目库的创建,软件开发目录,Django配置文件介绍
    【开源】基于Vue.js的森林火灾预警系统的设计和实现
    内衣洗衣机有必要买吗?口碑好的小型洗衣机测评
    了解这几点,让你轻松掌握滑台模组的选型方法!
    Vue ElementUI el-tooltip 全局样式修改
    小望电商通:无代码开发,轻松实现电商平台、客服系统和用户运营的集成
    分享团队在软件开发中用到的神仙工具
    Android Lottie动画
    麦子-linux字符设备驱动初探
    使用Python实现一个简单的密码管理器
  • 原文地址:https://blog.csdn.net/wwb44444/article/details/127722659