Flink 1.15 Source Code Analysis -- Startup Scripts -- start-cluster.sh


    1. start-cluster.sh

    [root@chb1 bin]# cat start-cluster.sh 
    #!/usr/bin/env bash
    
    bin=`dirname "$0"`
    bin=`cd "$bin"; pwd`
    
    
    # Load config.sh: environment variables and helper functions
    . "$bin"/config.sh
    
    # Start the JobManager instance(s)
    shopt -s nocasematch
    if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
        # HA mode: readMasters is defined in config.sh
        readMasters
    
        echo "Starting HA cluster with ${#MASTERS[@]} masters."
        # Start the JobManagers one by one
        for ((i=0;i<${#MASTERS[@]};++i)); do
            master=${MASTERS[i]}
            webuiport=${WEBUIPORTS[i]}
            # If the master is local, start it directly
            if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
                "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
            # Otherwise start the remote JobManager over ssh
            else
                ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
            fi
        done
    
    else
        echo "Starting cluster."
    
        # Start single JobManager on this machine
        "$FLINK_BIN_DIR"/jobmanager.sh start
    fi
    shopt -u nocasematch
    
    # Start the TaskManager instance(s)
    TMWorkers start
    

    The overall flow:

    • 1. Load config.sh: environment variables plus the helper functions called later
    • 2. Start the JobManager(s)
      • In HA mode, call readMasters from config.sh to get the master list, then start a JobManager for each entry
        • If the master is local, call "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}" directly
        • Otherwise start the remote JobManager over ssh
    • 3. Start the TaskManagers
      • TMWorkers start
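    One detail worth noting: `shopt -s nocasematch` makes the `==` test case-insensitive, so the high-availability setting matches whether it is written zookeeper, ZooKeeper, or ZOOKEEPER. A minimal standalone demo of that toggle (the HIGH_AVAILABILITY value here is made up for illustration):

```shell
#!/usr/bin/env bash
# With nocasematch enabled, [[ ... == ... ]] compares case-insensitively.
HIGH_AVAILABILITY="ZooKeeper"    # hypothetical config value

shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    MODE="HA cluster"
else
    MODE="standalone cluster"
fi
shopt -u nocasematch    # restore the default so later matches are exact again

echo "$MODE"    # prints: HA cluster
```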

    1.1 config.sh

    This file is long; it defines the shared variables and helper functions (readMasters, readWorkers, TMWorkers, ...) that the other scripts call.
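    To get a feel for these helpers, here is a simplified sketch (not the real config.sh code) of what a readMasters-style function does: read a masters file line by line, split each host:webuiport entry into the MASTERS and WEBUIPORTS arrays, and record whether every master is local. The file contents and hostnames below are made up:

```shell
#!/usr/bin/env bash
# Simplified stand-in for config.sh's readMasters: parse "host:webuiport"
# lines into two parallel arrays and track whether all masters are local.
masters_file=$(mktemp)                       # stand-in for ${FLINK_CONF_DIR}/masters
printf 'localhost:8081\nchb2:8082\n' > "$masters_file"

MASTERS=()
WEBUIPORTS=()
MASTERS_ALL_LOCALHOST=true
while IFS= read -r line || [[ -n $line ]]; do
    host="${line%:*}"                        # text before the last ':'
    port="${line##*:}"                       # text after the last ':'
    MASTERS+=("$host")
    WEBUIPORTS+=("$port")
    if [[ $host != "localhost" && $host != "127.0.0.1" ]]; then
        MASTERS_ALL_LOCALHOST=false
    fi
done < "$masters_file"
rm -f "$masters_file"

echo "masters=${MASTERS[*]} ports=${WEBUIPORTS[*]} all_local=${MASTERS_ALL_LOCALHOST}"
```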

    1.2 jobmanager.sh

    [root@chb1 bin]# cat jobmanager.sh 
    #!/usr/bin/env bash
    
    # Start/stop a Flink JobManager.
    USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"
    
    STARTSTOP=$1
    HOST=$2 # optional when starting multiple instances
    WEBUIPORT=$3 # optional when starting multiple instances
    
    if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
      echo $USAGE
      exit 1
    fi
    
    bin=`dirname "$0"`
    bin=`cd "$bin"; pwd`
    
    . "$bin"/config.sh 
    
    # Note this value: when Flink is started via start-cluster.sh, the cluster runs in standalonesession mode
    ENTRYPOINT=standalonesession
    
    if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
        # Add JobManager-specific JVM options
        export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
        parseJmArgsAndExportLogs "${ARGS[@]}"
    
        args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
        if [ ! -z $HOST ]; then
            args+=("--host")
            args+=("${HOST}")
        fi
    
        if [ ! -z $WEBUIPORT ]; then
            args+=("--webui-port")
            args+=("${WEBUIPORT}")
        fi
    
        if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
            args+=(${DYNAMIC_PARAMETERS[@]})
        fi
    fi
    
    
    # Start in the foreground?
    if [[ $STARTSTOP == "start-foreground" ]]; then
        exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
    else
        # Otherwise start via flink-daemon.sh, passing ENTRYPOINT=standalonesession along
        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
    fi
    

    An example of the resulting command:

    # One line in reality; wrapped with backslash continuations here for readability
    /uardata1/soft/flink-1.15.2/bin/flink-daemon.sh start standalonesession \
        --configDir /uardata1/soft/flink-1.15.2/conf \
        --executionMode cluster \
        -D jobmanager.memory.off-heap.size=134217728b \
        -D jobmanager.memory.jvm-overhead.min=201326592b \
        -D jobmanager.memory.jvm-metaspace.size=268435456b \
        -D jobmanager.memory.heap.size=1073741824b \
        -D jobmanager.memory.jvm-overhead.max=201326592b
    
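    As a quick sanity check on the -D flags in that command, the four memory components add up to the JobManager's total process memory, here 1600 MiB (the individual values were computed by config.sh for this particular install):

```shell
#!/usr/bin/env bash
# Sum the four jobmanager.memory.* components from the example command.
heap=1073741824        # jobmanager.memory.heap.size          (1024 MiB)
off_heap=134217728     # jobmanager.memory.off-heap.size      ( 128 MiB)
metaspace=268435456    # jobmanager.memory.jvm-metaspace.size ( 256 MiB)
overhead=201326592     # jobmanager.memory.jvm-overhead.max   ( 192 MiB)

total=$(( heap + off_heap + metaspace + overhead ))
echo "total = $total bytes = $(( total / 1024 / 1024 )) MiB"   # 1600 MiB
```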

    The most important thing this script does is set the ENTRYPOINT=standalonesession variable, which determines the cluster's startup mode. From this we can see that starting Flink with start-cluster.sh gives us a standalonesession cluster.

    Continuing on: after setting ENTRYPOINT, the script calls flink-daemon.sh and passes that value in. We will look at flink-daemon.sh shortly; first, let's see what taskmanager.sh does.

    1.3 taskmanager.sh

    In start-cluster.sh, the TaskManagers are started via TMWorkers start:

    # Start the TaskManager instance(s)
    TMWorkers start
    

    The actual implementation is TMWorkers() in config.sh:

    # starts or stops TMs on all workers
    # TMWorkers start|stop
    TMWorkers() {
        CMD=$1
        # Read the workers file into the WORKERS array
        readWorkers
    
        # Are all workers local nodes?
        if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
            # all-local setup
            for worker in ${WORKERS[@]}; do
                "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
            done
        else
            # non-local setup
            # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
            command -v pdsh >/dev/null 2>&1
            if [[ $? -ne 0 ]]; then
                for worker in ${WORKERS[@]}; do
                    ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
                done
            else
                PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
                    "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
            fi
        fi
    }
    
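    The pdsh branch above joins the worker hosts into one comma-separated list with `$(IFS=, ; echo "${WORKERS[*]}")`. The idiom works because "${array[*]}" joins elements with the first character of IFS, and running it in a subshell keeps the IFS change from leaking out. A small demo (hostnames are made up):

```shell
#!/usr/bin/env bash
# Join array elements with commas, without permanently changing IFS.
WORKERS=(chb1 chb2 chb3)                  # hypothetical worker hostnames

joined=$(IFS=, ; echo "${WORKERS[*]}")    # subshell-local IFS change

echo "$joined"    # prints: chb1,chb2,chb3
```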

    taskmanager.sh is fairly long; the two key points are:

    1. It sets ENTRYPOINT=taskexecutor

    2. It calls flink-daemon.sh, passing ENTRYPOINT in

    #!/usr/bin/env bash
     
    # Start/stop a Flink TaskManager.
    USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
     
    STARTSTOP=$1
     
    ARGS=("${@:2}")
     
    if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
      echo $USAGE
      exit 1
    fi
     
    bin=`dirname "$0"`
    bin=`cd "$bin"; pwd`
     
    . "$bin"/config.sh
     
    # Note this value: TaskManagers run the taskexecutor entrypoint
    ENTRYPOINT=taskexecutor
     
    if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
     
        # if no other JVM options are set, set the GC to G1
        if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
            export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
        fi
     
        # Add TaskManager-specific JVM options
        export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
     
        # Startup parameters
     
        parseTmArgsAndExportLogs "${ARGS[@]}"
     
        if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
            ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
        fi
     
        ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")
    fi
     
    if [[ $STARTSTOP == "start-foreground" ]]; then
        exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
    else
        if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
            # Start a single TaskManager
            "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        else
            # Example output from `numactl --show` on an AWS c4.8xlarge:
            # policy: default
            # preferred node: current
            # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
            # cpubind: 0 1
            # nodebind: 0 1
            # membind: 0 1
            read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
            for NODE_ID in "${NODE_LIST[@]:1}"; do
                # Start a TaskManager for each NUMA node
            # Again calls flink-daemon.sh, passing ENTRYPOINT=taskexecutor along
                numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
            done
        fi
    fi
    

    Once again the script ends up calling flink-daemon.sh, so let's now see what flink-daemon.sh actually does.

    1.4 flink-daemon.sh

    This script is long, but it boils down to two parts:

    1.4.1 Mapping the ENTRYPOINT argument to an entry class

    Earlier, jobmanager.sh passed standalonesession into this script, and taskmanager.sh passed taskexecutor. The case statement below maps each of those names to the entry class that the master or worker JVM will run:

    case $DAEMON in
        (taskexecutor)
            CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
        ;;
    
        (zookeeper)
            CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
        ;;
    
        (historyserver)
            CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
        ;;
    
        (standalonesession)
            CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
        ;;
    
        (standalonejob)
            CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
        ;;
    
        (*)
            echo "Unknown daemon '${DAEMON}'. $USAGE."
            exit 1
        ;;
    esac
    

    1.4.2 Launching the JVM with the entry class

    case $STARTSTOP in
    
        (start)
    
            # Print a warning if daemons are already running on host
            if [ -f "$pid" ]; then
              active=()
              while IFS='' read -r p || [[ -n "$p" ]]; do
                kill -0 $p >/dev/null 2>&1
                if [ $? -eq 0 ]; then
                  active+=($p)
                fi
              done < "${pid}"
    
              count="${#active[@]}"
    
              if [ ${count} -gt 0 ]; then
                echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
              fi
            fi
    
            # Evaluate user options for local variable expansion
            FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
    
            echo "Starting $DAEMON daemon on host $HOSTNAME."
        # Launch the JVM with CLASS_TO_RUN as the main class, redirecting output to $out
            "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &
    
            mypid=$!
    
            # Add to pid file if successful start
            if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
                echo $mypid >> "$pid"
            else
                echo "Error starting $DAEMON daemon."
                exit 1
            fi
        ;;
    
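    A side note on the start branch above: `kill -0 $p` sends no signal at all; it only checks that the process exists and can be signaled, which is how the script counts already-running daemons from the pid file. A small standalone demo:

```shell
#!/usr/bin/env bash
# kill -0 succeeds while the process is alive and fails once it is gone.
sleep 30 &                          # throwaway background process
mypid=$!

if kill -0 "$mypid" 2>/dev/null; then alive=yes; else alive=no; fi

kill "$mypid"                       # terminate it...
wait "$mypid" 2>/dev/null || true   # ...and reap it

if kill -0 "$mypid" 2>/dev/null; then gone=no; else gone=yes; fi
echo "alive=$alive gone=$gone"
```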

    At this point we have found where the master and worker processes are actually launched, and identified:

    • the master entry class: org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    • the worker entry class: org.apache.flink.runtime.taskexecutor.TaskManagerRunner

    With these two classes as entry points, we can now dive into the Flink source code itself.

    Back to the Flink 1.15 Source Code Analysis index

    Original post: https://blog.csdn.net/wuxintdrh/article/details/127816141