• Flink Source Code [2]: The Flink Submission Flow, Walking Through the Launch Script and Debugging Locally in IDEA


    1. How the flink script runs

    When we submit a Flink application with the command /root/flink-1.15.0/bin/flink run /root/flink-1.15.0/examples/batch/WordCount.jar, what does the flink script actually do? Let's walk through it.

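    Below is the full source of the flink script. The comments give the concrete value of each variable in this example, along with an explanation of each step.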

    #!/usr/bin/env bash
    ################################################################################
    #  Licensed to the Apache Software Foundation (ASF) under one
    #  or more contributor license agreements.  See the NOTICE file
    #  distributed with this work for additional information
    #  regarding copyright ownership.  The ASF licenses this file
    #  to you under the Apache License, Version 2.0 (the
    #  "License"); you may not use this file except in compliance
    #  with the License.  You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    # limitations under the License.
    ################################################################################
    
    # $0 is the path of the script being executed, i.e. the flink script itself
    # target=/root/flink-1.15.0/bin/flink
    target="$0"
    # For the case, the executable has been directly symlinked, figure out
    # the correct bin path by following its symlink up to an upper bound.
    # Note: we can't use the readlink utility here if we want to be POSIX
    # compatible.
    iteration=0
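    # If target is a symlink, list it with ls -ld and extract the path it points to with a regular expression
    # If that path is itself a symlink, keep resolving it one level at a time
    # After more than 100 iterations the loop gives up (it assumes a cyclic link). The later sourcing of config.sh may then fail with a file-not-found error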
    while [ -L "$target" ]; do
        if [ "$iteration" -gt 100 ]; then
            echo "Cannot resolve path: You have a cyclic symlink in $target."
            break
        fi
        ls=`ls -ld -- "$target"`
        target=`expr "$ls" : '.* -> \(.*\)$'`
        iteration=$((iteration + 1))
    done
    
    # Get the path of the bin directory
    # bin=/root/flink-1.15.0/bin
    # Convert relative path to absolute path
    bin=`dirname "$target"`
    
    # Source config.sh into the current shell. It sets many variables and defines many functions that this script can use afterwards
    # get flink config
    . "$bin"/config.sh
    
    # Identify the Linux user running the flink script (falls back to $USER if FLINK_IDENT_STRING is empty)
    # FLINK_IDENT_STRING=root
    if [ "$FLINK_IDENT_STRING" = "" ]; then
            FLINK_IDENT_STRING="$USER"
    fi
    
    # Call constructFlinkClassPath, a function defined in config.sh, to build Flink's classpath
    # CC_CLASSPATH=/root/flink-1.15.0/lib/flink-cep-1.15.0.jar:/root/flink-1.15.0/lib/flink-connector-files-1.15.0.jar:......(omitted)......:/root/flink-1.15.0/lib/flink-dist-1.15.0.jar
    CC_CLASSPATH=`constructFlinkClassPath`
    
    # Define the client log file and the logging-related JVM system properties
    log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
    log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
    
    # Extra JVM options for the Flink client; empty in this example
    # Add Client-specific JVM options
    FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"
    
    # -classpath combines Flink's classpath with the Hadoop classpath
    # The Java main class is org.apache.flink.client.cli.CliFrontend
    # "$@" expands to all arguments passed to the script, i.e. everything after $0
    # $@=run /root/flink-1.15.0/examples/batch/WordCount.jar
    # Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
    exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
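The symlink loop at the top of the script can be illustrated with a small Java sketch. It is written for illustration only and is not part of Flink. SymlinkResolver and resolveBinDir are hypothetical names.

The sketch keeps the script's upper bound of 100 and its error message. It makes one deliberate difference: relative link text is resolved against the link's own directory. The shell loop instead reuses the raw text extracted from ls -ld.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Illustrative Java version of the bin/flink symlink loop (hypothetical class, not Flink code). */
final class SymlinkResolver {
    // The shell loop gives up once "iteration" exceeds 100.
    static final int MAX_ITERATIONS = 100;

    /** Follows a chain of symlinks starting at target, then returns the directory containing the result. */
    static Path resolveBinDir(Path target) throws IOException {
        int iteration = 0;
        while (Files.isSymbolicLink(target)) {
            if (iteration > MAX_ITERATIONS) {
                System.err.println("Cannot resolve path: You have a cyclic symlink in " + target + ".");
                break;
            }
            // Counterpart of extracting the "-> ..." part of `ls -ld`; relative link text is
            // resolved against the link's own directory (differs from the shell script).
            target = target.resolveSibling(Files.readSymbolicLink(target));
            iteration++;
        }
        // Like `dirname`: a bare file name has no parent, so fall back to ".".
        Path parent = target.getParent();
        return parent != null ? parent : Paths.get(".");
    }
}
```

As in the script, a cyclic link does not hang the loop. It stops after the bound and returns whatever directory it reached.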
    
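To make the final exec line concrete, here is a sketch that assembles the same argument vector in Java. FlinkCommandSketch, constructClassPath and buildCommand are hypothetical names.

- The classpath order (lib jars sorted, flink-dist last) is inferred from the CC_CLASSPATH value in the comments above, not copied from config.sh.
- JVM_ARGS and FLINK_ENV_JAVA_OPTS are left out because they are empty in this example.
- The Hadoop classpath and the manglePathList step are also left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative model of how bin/flink assembles its java command line (hypothetical, not Flink code). */
final class FlinkCommandSketch {
    /** Sorts the lib jars and moves flink-dist to the end, matching the CC_CLASSPATH order shown above. */
    static String constructClassPath(List<String> libJars) {
        List<String> sorted = new ArrayList<>(libJars);
        Collections.sort(sorted);
        List<String> ordered = new ArrayList<>();
        List<String> dist = new ArrayList<>();
        for (String jar : sorted) {
            String fileName = jar.substring(jar.lastIndexOf('/') + 1);
            if (fileName.startsWith("flink-dist")) {
                dist.add(jar);
            } else {
                ordered.add(jar);
            }
        }
        ordered.addAll(dist);
        return String.join(":", ordered);
    }

    /** Builds the argument vector passed to the JVM; JVM_ARGS and FLINK_ENV_JAVA_OPTS are omitted (empty here). */
    static List<String> buildCommand(String javaRun, String classPath, String logFile,
                                     String confDir, String... cliArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add(javaRun);
        // log_setting
        cmd.add("-Dlog.file=" + logFile);
        cmd.add("-Dlog4j.configuration=file:" + confDir + "/log4j-cli.properties");
        cmd.add("-Dlog4j.configurationFile=file:" + confDir + "/log4j-cli.properties");
        cmd.add("-Dlogback.configurationFile=file:" + confDir + "/logback.xml");
        cmd.add("-classpath");
        cmd.add(classPath);
        cmd.add("org.apache.flink.client.cli.CliFrontend");
        // "$@": everything the user typed after bin/flink
        cmd.addAll(Arrays.asList(cliArgs));
        return cmd;
    }
}
```

You could pass this list to new ProcessBuilder(cmd) to start the same JVM. Unlike exec, however, that creates a child process rather than replacing the current one.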

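    So what the script ultimately runs (via exec, which replaces the shell process with the JVM) is the Java class org.apache.flink.client.cli.CliFrontend. Let's look at this class next.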

    2. A look at the client entry class CliFrontend

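    In IDEA, press Ctrl + Shift + N to open the file search dialog, type CliFrontend, and click CliFrontend.java in the results to open it.
    Press Ctrl + F12 to show the structure of the current class CliFrontend, then click main to jump to the main method.

    You can also press Ctrl + H to open the type hierarchy, which helps you find the concrete implementations of an abstract class or interface.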

    The main method looks like this:

        /** Submits the job based on the arguments. */
        public static void main(final String[] args) {
            EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
    
            // 1. find the configuration directory
            final String configurationDirectory = getConfigurationDirectoryFromEnv();
    
            // 2. load the global configuration
            final Configuration configuration =
                    GlobalConfiguration.loadConfiguration(configurationDirectory);
    
            // 3. load the custom command lines
            final List<CustomCommandLine> customCommandLines =
                    loadCustomCommandLines(configuration, configurationDirectory);
    
            int retCode = 31;
            try {
                final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
    
                SecurityUtils.install(new SecurityConfiguration(cli.configuration));
                retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
            } catch (Throwable t) {
                final Throwable strippedThrowable =
                        ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
                LOG.error("Fatal error while running command line interface.", strippedThrowable);
                strippedThrowable.printStackTrace();
            } finally {
                System.exit(retCode);
            }
        }
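Steps 1 and 2 of main are what make local debugging depend on environment variables. The sketch below illustrates them, with these assumptions:

- It is modeled on my understanding of CliFrontend.getConfigurationDirectoryFromEnv and GlobalConfiguration in 1.15, not copied from them.
- ConfigSketch, findConfDir and loadConf are hypothetical names.
- Passing the environment in as a Map, rather than calling System.getenv, is only for testability.
- The fallback directories "../conf" and "conf" are recalled from the Flink source; verify them there.
- The parser handles only flat "key: value" lines.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of "find the configuration directory" and "load the global configuration". */
final class ConfigSketch {
    /** Step 1: FLINK_CONF_DIR wins; otherwise try the (assumed) fallbacks ../conf and conf. */
    static String findConfDir(Map<String, String> env) {
        String location = env.get("FLINK_CONF_DIR");
        if (location != null) {
            if (new File(location).exists()) {
                return location;
            }
            throw new IllegalStateException("Configuration directory '" + location + "' does not exist.");
        }
        for (String fallback : new String[] {"../conf", "conf"}) {
            if (new File(fallback).exists()) {
                return fallback;
            }
        }
        throw new IllegalStateException("No configuration directory specified via FLINK_CONF_DIR.");
    }

    /** Step 2 (simplified): read flat "key: value" lines from flink-conf.yaml, dropping # comments. */
    static Map<String, String> loadConf(Path confDir) throws IOException {
        Map<String, String> conf = new LinkedHashMap<>();
        List<String> lines = Files.readAllLines(confDir.resolve("flink-conf.yaml"), StandardCharsets.UTF_8);
        for (String line : lines) {
            String content = line.split("#", 2)[0];      // strip trailing comment
            String[] kv = content.split(": ", 2);         // key and value separated by ": "
            if (kv.length == 2) {
                String key = kv[0].trim();
                String value = kv[1].trim();
                if (!key.isEmpty() && !value.isEmpty()) {
                    conf.put(key, value);
                }
            }
        }
        return conf;
    }
}
```

This is why, when CliFrontend runs inside IDEA rather than through bin/flink, the run configuration's environment variables need to point FLINK_CONF_DIR at a real conf directory.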
    
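In main, the return value of cli.parseAndRun(args) becomes retCode and is handed to System.exit. The initial value 31 survives only if an exception escapes. The sketch below shows the dispatch idea behind parseAndRun: args[0] selects the action and the remaining arguments go to that action.

This is a simplified, hypothetical model, not Flink's code:

- The real method supports many more actions and handles errors differently.
- ActionDispatchSketch and its run stand-in are invented names.
- The printed messages are placeholders, not Flink's actual output.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of the action dispatch in CliFrontend.parseAndRun. */
final class ActionDispatchSketch {
    static int parseAndRun(String[] args) {
        if (args.length < 1) {
            System.out.println("placeholder: no action given");
            return 1;
        }
        // e.g. args = [run, /root/flink-1.15.0/examples/batch/WordCount.jar]
        String action = args[0];
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        switch (action) {
            case "run":
                return run(params);
            default:
                System.out.println("placeholder: unknown action " + action);
                return 1;
        }
    }

    /** Stand-in for the real "run" action: only checks that a jar path was supplied. */
    static int run(String[] params) {
        if (params.length == 0) {
            System.out.println("placeholder: no jar file given");
            return 1;
        }
        System.out.println("placeholder: would submit " + params[0]);
        return 0;
    }
}
```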

    3. Running CliFrontend locally in IDEA

    For convenience, we debug locally in IDEA. Running CliFrontend there needs a little configuration.

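    In the program's run Configuration, set the custom arguments for the flink command and the environment variables, and that is all. Here we run in local mode, which starts a MiniCluster on the local machine. Once the application finishes, the cluster is shut down.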
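The local-mode lifecycle described above follows a familiar resource pattern: start a MiniCluster, run the application, then shut the cluster down. The sketch below is a toy model only. ToyCluster and runLocally are hypothetical names, not the API of Flink's real MiniCluster class in flink-runtime.

The point it shows is that try-with-resources guarantees the shutdown step runs even if the job fails.

```java
import java.util.List;
import java.util.function.Supplier;

/** Toy model of "start cluster, run one application, shut down" (not Flink's MiniCluster API). */
final class LocalRunSketch {
    /** Hypothetical stand-in for a local cluster; it only records lifecycle events. */
    static final class ToyCluster implements AutoCloseable {
        private final List<String> events;

        ToyCluster(List<String> events) {
            this.events = events;
            events.add("cluster started");
        }

        <T> T execute(Supplier<T> job) {
            events.add("job submitted");
            T result = job.get();
            events.add("job finished");
            return result;
        }

        @Override
        public void close() {
            events.add("cluster stopped");
        }
    }

    /** Starts a cluster, runs the job, and always closes the cluster, even if the job throws. */
    static <T> T runLocally(Supplier<T> job, List<String> events) {
        try (ToyCluster cluster = new ToyCluster(events)) {
            return cluster.execute(job);
        }
    }
}
```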

  • Original article: https://blog.csdn.net/yy8623977/article/details/125659249