Flink集群启动脚本分析

news/2024/4/29 23:29:13/文章来源:https://blog.csdn.net/weixin_44852067/article/details/137117924

1.概述

  Flink 集群的启动脚本在:flink-dist 子项目中,位于 flink-bin 下的 bin 目录:启动脚本为:start-cluster.sh。flink-bin 目录管理的是集群运维需要的shell脚本。

2.启动脚本

2.1 start-cluster.sh

    1. 调用 config.sh 来获取 masters 和 workers 的信息(分别从 conf/masters 和 conf/workers 配置文件中获取)

    2. 通过 jobmanager.sh 来启动 JobManager

    3. 通过 taskmanager.sh 来启动 TaskManager
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# Run config.sh first: it loads configuration values and the helper
# functions (readMasters, TMWorkers, ...) used below.
. "$bin"/config.sh

# Start the JobManager instance(s) -- there may be several in HA mode.
# Start the JobManager instance(s)
shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    # HA mode: read the master list from conf/masters.
    readMasters

    echo "Starting HA cluster with ${#MASTERS[@]} masters."

    for ((i=0;i<${#MASTERS[@]};++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        else
            # HA mode with a remote master: run "jobmanager.sh start" over SSH.
            ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done
else
    echo "Starting cluster."

    # Non-HA: start a single JobManager on this machine.
    # Start single JobManager on this machine
    "$FLINK_BIN_DIR"/jobmanager.sh start
fi
shopt -u nocasematch

# Start TaskManager instance(s); TMWorkers is defined in config.sh.
# Start TaskManager instance(s)
TMWorkers start

2.2 config.sh

# Builds the Flink classpath from every jar in FLINK_LIB_DIR, always placing
# the flink-dist jar last. The classpath is written to stdout; exits with 1
# when the flink-dist jar is missing.
constructFlinkClassPath() {
    local FLINK_DIST
    local FLINK_CLASSPATH

    while read -d '' -r jarfile ; do
        if [[ "$jarfile" =~ .*/flink-dist[^/]*.jar$ ]]; then
            FLINK_DIST="$FLINK_DIST":"$jarfile"
        elif [[ "$FLINK_CLASSPATH" == "" ]]; then
            FLINK_CLASSPATH="$jarfile";
        else
            FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
        fi
    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)

    if [[ "$FLINK_DIST" == "" ]]; then
        # write error message to stderr since stdout is stored as the classpath
        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")

        # exit function with empty classpath to force process failure
        exit 1
    fi

    echo "$FLINK_CLASSPATH""$FLINK_DIST"
}

# Locates the flink-dist jar in FLINK_LIB_DIR and prints its path to stdout;
# exits with 1 when it cannot be found.
findFlinkDistJar() {
    local FLINK_DIST="`find "$FLINK_LIB_DIR" -name 'flink-dist*.jar'`"

    if [[ "$FLINK_DIST" == "" ]]; then
        # write error message to stderr since stdout is stored as the classpath
        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")

        # exit function with empty classpath to force process failure
        exit 1
    fi

    echo "$FLINK_DIST"
}

# These are used to mangle paths that are passed to java when using
# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
# "cygpath" can do the conversion.
manglePath() {
    UNAME=$(uname -s)
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        echo `cygpath -w "$1"`
    else
        echo $1
    fi
}

manglePathList() {
    UNAME=$(uname -s)
    # a path list, for example a java classpath
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        echo `cygpath -wp "$1"`
    else
        echo $1
    fi
}

# Looks up a config value by key from a simple YAML-style key-value map.
# $1: key to look up
# $2: default value to return if key does not exist
# $3: config file to read from
readFromConfig() {
    local key=$1
    local defaultValue=$2
    local configFile=$3

    # first extract the value with the given key (1st sed), then trim the result (2nd sed)
    # if a key exists multiple times, take the "last" one (tail)
    local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1`

    [ -z "$value" ] && echo "$defaultValue" || echo "$value"
}

########################################################################################################################
# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml
# -or- the respective environment variables are not set.
########################################################################################################################

# WARNING !!! , these values are only used if there is nothing else is specified in
# conf/flink-conf.yaml

DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5                               # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_JAVA_OPTS_CLI=""                        # Optional JVM args (Client)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary

########################################################################################################################
# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
########################################################################################################################

KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"

KEY_ENV_PID_DIR="env.pid.dir"
KEY_ENV_LOG_DIR="env.log.dir"
KEY_ENV_LOG_MAX="env.log.max"
KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
KEY_ENV_JAVA_HOME="env.java.home"
KEY_ENV_JAVA_OPTS="env.java.opts"
KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver"
KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client"
KEY_ENV_SSH_OPTS="env.ssh.opts"
KEY_HIGH_AVAILABILITY="high-availability"
KEY_ZK_HEAP_MB="zookeeper.heap.mb"

########################################################################################################################
# PATHS AND CONFIG
########################################################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path and resolve directory symlinks
bin=`dirname "$target"`
SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`

# Define the main directory of the flink installation
# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), then do not need to set the FLINK_HOME here.
if [ -z "$_FLINK_HOME_DETERMINED" ]; then
    FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
fi

FLINK_LIB_DIR=$FLINK_HOME/lib
FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
FLINK_OPT_DIR=$FLINK_HOME/opt

# These need to be mangled because they are directly passed to java.
# The above lib path is used by the shell script to retrieve jars in a
# directory, so it needs to be unmangled.
FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"`
if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi
FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin
DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log
FLINK_CONF_FILE="flink-conf.yaml"
YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}

### Exported environment variables ###
export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_PLUGINS_DIR
# export /lib dir to access it during deployment of the Yarn staging files
export FLINK_LIB_DIR
# export /opt dir to access it for the SQL client
export FLINK_OPT_DIR

########################################################################################################################
# ENVIRONMENT VARIABLES
########################################################################################################################

# read JAVA_HOME from config with no default value
MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}")
# check if config specified JAVA_HOME
if [ -z "${MY_JAVA_HOME}" ]; then
    # config did not specify JAVA_HOME. Use system JAVA_HOME
    MY_JAVA_HOME=${JAVA_HOME}
fi
# check if we have a valid JAVA_HOME and if java is not available
if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
    echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME."
    exit 1
else
    JAVA_HOME=${MY_JAVA_HOME}
fi

UNAME=$(uname -s)
if [ "${UNAME:0:6}" == "CYGWIN" ]; then
    JAVA_RUN=java
else
    if [[ -d $JAVA_HOME ]]; then
        JAVA_RUN=$JAVA_HOME/bin/java
    else
        JAVA_RUN=java
    fi
fi

# Define HOSTNAME if it is not already set
if [ -z "${HOSTNAME}" ]; then
    HOSTNAME=`hostname`
fi

IS_NUMBER="^[0-9]+$"

# Verify that NUMA tooling is available
command -v numactl >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
    FLINK_TM_COMPUTE_NUMA="false"
else
    # Define FLINK_TM_COMPUTE_NUMA if it is not already set
    if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
        FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")
    fi
fi

if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
    MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
fi

if [ -z "${FLINK_LOG_DIR}" ]; then
    FLINK_LOG_DIR=$(readFromConfig ${KEY_ENV_LOG_DIR} "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}")
fi

if [ -z "${YARN_CONF_DIR}" ]; then
    YARN_CONF_DIR=$(readFromConfig ${KEY_ENV_YARN_CONF_DIR} "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${HADOOP_CONF_DIR}" ]; then
    HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${FLINK_PID_DIR}" ]; then
    FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")
fi

if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
    FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")

    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
    FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
    FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then
    FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig ${KEY_ENV_JAVA_OPTS_HS} "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_HS="$( echo "${FLINK_ENV_JAVA_OPTS_HS}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_CLI}" ]; then
    FLINK_ENV_JAVA_OPTS_CLI=$(readFromConfig ${KEY_ENV_JAVA_OPTS_CLI} "${DEFAULT_ENV_JAVA_OPTS_CLI}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_CLI="$( echo "${FLINK_ENV_JAVA_OPTS_CLI}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_SSH_OPTS}" ]; then
    FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
fi

# Define ZK_HEAP if it is not already set
if [ -z "${ZK_HEAP}" ]; then
    ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
fi

# High availability
if [ -z "${HIGH_AVAILABILITY}" ]; then
     HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
     if [ -z "${HIGH_AVAILABILITY}" ]; then
        # Try deprecated value
        DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
        if [ -z "${DEPRECATED_HA}" ]; then
            HIGH_AVAILABILITY="none"
        elif [ ${DEPRECATED_HA} == "standalone" ]; then
            # Standalone is now 'none'
            HIGH_AVAILABILITY="none"
        else
            HIGH_AVAILABILITY=${DEPRECATED_HA}
        fi
     fi
fi

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi

# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
if [ -z "$HADOOP_CONF_DIR" ]; then
    if [ -n "$HADOOP_HOME" ]; then
        # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
        if [ -d "$HADOOP_HOME/conf" ]; then
            # It's Hadoop 1.x
            HADOOP_CONF_DIR="$HADOOP_HOME/conf"
        fi
        if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
            # It's Hadoop 2.2+
            HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
        fi
    fi
fi

# if neither HADOOP_CONF_DIR nor HADOOP_CLASSPATH are set, use some common default (if available)
if [ -z "$HADOOP_CONF_DIR" ] && [ -z "$HADOOP_CLASSPATH" ]; then
    if [ -d "/etc/hadoop/conf" ]; then
        echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set."
        HADOOP_CONF_DIR="/etc/hadoop/conf"
    fi
fi

INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"

if [ -n "${HBASE_CONF_DIR}" ]; then
    INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
fi

# Auxiliary function which extracts the name of host from a line which
# also potentially includes topology information and the taskManager type
extractHostName() {
    # handle comments: extract first part of string (before first # character)
    WORKER=`echo $1 | cut -d'#' -f 1`

    # Extract the hostname from the network hierarchy
    if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
        WORKER=${BASH_REMATCH[1]}
    fi

    echo $WORKER
}

# Auxiliary functions for log file rotation
rotateLogFilesWithPrefix() {
    dir=$1
    prefix=$2
    while read -r log ; do
        rotateLogFile "$log"
    # find distinct set of log file names, ignoring the rotation number (trailing dot and digit)
    done < <(find "$dir" ! -type d -path "${prefix}*" | sed s/\.[0-9][0-9]*$// | sort | uniq)
}

rotateLogFile() {
    log=$1;
    num=$MAX_LOG_FILE_NUMBER
    if [ -f "$log" -a "$num" -gt 0 ]; then
        while [ $num -gt 1 ]; do
            prev=`expr $num - 1`
            [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
            num=$prev
        done
        mv "$log" "$log.$num";
    fi
}

# Reads conf/masters into MASTERS / WEBUIPORTS (port 0 when unspecified) and
# sets MASTERS_ALL_LOCALHOST accordingly.
readMasters() {
    MASTERS_FILE="${FLINK_CONF_DIR}/masters"

    if [[ ! -f "${MASTERS_FILE}" ]]; then
        echo "No masters file. Please specify masters in 'conf/masters'."
        exit 1
    fi

    MASTERS=()
    WEBUIPORTS=()

    MASTERS_ALL_LOCALHOST=true
    GOON=true
    while $GOON; do
        read line || GOON=false
        HOSTWEBUIPORT=$( extractHostName $line)

        if [ -n "$HOSTWEBUIPORT" ]; then
            HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
            WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:)
            MASTERS+=(${HOST})

            if [ -z "$WEBUIPORT" ]; then
                WEBUIPORTS+=(0)
            else
                WEBUIPORTS+=(${WEBUIPORT})
            fi

            if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
                MASTERS_ALL_LOCALHOST=false
            fi
        fi
    done < "$MASTERS_FILE"
}

# Reads conf/workers into WORKERS and sets WORKERS_ALL_LOCALHOST accordingly.
readWorkers() {
    WORKERS_FILE="${FLINK_CONF_DIR}/workers"

    if [[ ! -f "$WORKERS_FILE" ]]; then
        echo "No workers file. Please specify workers in 'conf/workers'."
        exit 1
    fi

    WORKERS=()

    WORKERS_ALL_LOCALHOST=true
    GOON=true
    while $GOON; do
        read line || GOON=false
        HOST=$( extractHostName $line)
        if [ -n "$HOST" ] ; then
            WORKERS+=(${HOST})
            if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
                WORKERS_ALL_LOCALHOST=false
            fi
        fi
    done < "$WORKERS_FILE"
}
# starts or stops TMs on all workers
# TMWorkers start|stop
TMWorkers() {
    CMD=$1

    # read the list of TaskManager nodes from conf/workers
    readWorkers

    if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
        # all-local setup
        for worker in ${WORKERS[@]}; do
            "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
        done
    else
        # non-local setup
        # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
        command -v pdsh >/dev/null 2>&1
        if [[ $? -ne 0 ]]; then
            for worker in ${WORKERS[@]}; do
                # launch the TaskManager on the remote node via ssh
                ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
            done
        else
            PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
                "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
        fi
    fi
}

# Runs BashJavaUtils with the given command ($1), config dir ($2),
# classpath ($3) and any extra dynamic args; prints its (tail-limited) output.
runBashJavaUtilsCmd() {
    local cmd=$1
    local conf_dir=$2
    local class_path=$3
    local dynamic_args=${@:4}
    class_path=`manglePathList "${class_path}"`

    local output=`${JAVA_RUN} -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
    if [[ $? -ne 0 ]]; then
        echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
        # Print the output in case the user redirect the log to console.
        echo "$output" 1>&2
        exit 1
    fi

    echo "$output"
}

# Extracts the result lines (prefixed with BASH_JAVA_UTILS_EXEC_RESULT:)
# from BashJavaUtils output ($1), verifying the expected line count ($2).
extractExecutionResults() {
    local output="$1"
    local expected_lines="$2"
    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
    local execution_results
    local num_lines

    execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX})
    num_lines=$(echo "${execution_results}" | wc -l)
    # explicit check for empty result, becuase if execution_results is empty, then wc returns 1
    if [[ -z ${execution_results} ]]; then
        echo "[ERROR] The execution result is empty." 1>&2
        exit 1
    fi
    if [[ ${num_lines} -ne ${expected_lines} ]]; then
        echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2
        echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2
        echo "$output" 1>&2
        exit 1
    fi

    echo "${execution_results//${EXECUTION_PREFIX}/}"
}
# Filters out the machine-readable result lines from BashJavaUtils output,
# leaving only the human-readable logging output.
extractLoggingOutputs() {
    local output="$1"
    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"

    echo "${output}" | grep -v ${EXECUTION_PREFIX}
}

# Computes JobManager JVM parameters via BashJavaUtils, appends them to
# JVM_ARGS and exports the extraction logs in FLINK_INHERITED_LOGS.
parseJmJvmArgsAndExportLogs() {
    java_utils_output=$(runBashJavaUtilsCmd GET_JM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "$@")
    logging_output=$(extractLoggingOutputs "${java_utils_output}")
    jvm_params=$(extractExecutionResults "${java_utils_output}" 1)

    if [[ $? -ne 0 ]]; then
        echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
        echo "[ERROR] Raw output from BashJavaUtils:"
        echo "$java_utils_output"
        exit 1
    fi

    export JVM_ARGS="${JVM_ARGS} ${jvm_params}"

    export FLINK_INHERITED_LOGS="
$FLINK_INHERITED_LOGS

JM_RESOURCE_PARAMS extraction logs:
jvm_params: $jvm_params
logs: $logging_output
"
}

2.3 jobmanager.sh

JobManager的启动通过 flink-daemon.sh 脚本来启动 JVM 进程,分析 flink-daemon.sh 脚本发现:

JobManager 的启动代号:standalonesession,实现类是:StandaloneSessionClusterEntrypoint

# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"

STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instances

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
    echo $USAGE
    exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# Entry-point code name of the JobManager; flink-daemon.sh maps it to
# StandaloneSessionClusterEntrypoint.
ENTRYPOINT=standalonesession

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    # Add JobManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
    parseJmJvmArgsAndExportLogs "${ARGS[@]}"

    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
    if [ ! -z $HOST ]; then
        args+=("--host")
        args+=("${HOST}")
    fi

    if [ ! -z $WEBUIPORT ]; then
        args+=("--webui-port")
        args+=("${WEBUIPORT}")
    fi
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    # Start the JobManager (StandaloneSessionClusterEntrypoint) as a daemon:
    # effectively "flink-daemon.sh start standalonesession"
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi

2.4 taskmanager.sh

TaskManager的启动通过 flink-daemon.sh 脚本来启动 JVM 进程,分析 flink-daemon.sh 脚本发现:TaskManager 的启动代号:taskexecutor,实现类是:TaskManagerRunner。

# Start/stop a Flink TaskManager.
USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

STARTSTOP=$1

ARGS=("${@:2}")

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
    echo $USAGE
    exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# Entry-point code name of the TaskManager; flink-daemon.sh maps it to
# TaskManagerRunner.
ENTRYPOINT=taskexecutor

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    # if no other JVM options are set, set the GC to G1
    if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    # Add TaskManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

    # Startup parameters
    java_utils_output=$(runBashJavaUtilsCmd GET_TM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "$FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar)" "${ARGS[@]}")

    logging_output=$(extractLoggingOutputs "${java_utils_output}")
    params_output=$(extractExecutionResults "${java_utils_output}" 2)

    if [[ $? -ne 0 ]]; then
        echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
        echo "[ERROR] Raw output from BashJavaUtils:"
        echo "$java_utils_output"
        exit 1
    fi

    jvm_params=$(echo "${params_output}" | head -n 1)
    export JVM_ARGS="${JVM_ARGS} ${jvm_params}"

    IFS=$" " dynamic_configs=$(echo "${params_output}" | tail -n 1)
    ARGS+=("--configDir" "${FLINK_CONF_DIR}" ${dynamic_configs[@]})

    export FLINK_INHERITED_LOGS="
$FLINK_INHERITED_LOGS

TM_RESOURCE_PARAMS extraction logs:
jvm_params: $jvm_params
dynamic_configs: $dynamic_configs
logs: $logging_output
"
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
    if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
        # Start a single TaskManager
        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
    else
        # Example output from `numactl --show` on an AWS c4.8xlarge:
        # policy: default
        # preferred node: current
        # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
        # cpubind: 0 1
        # nodebind: 0 1
        # membind: 0 1
        read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
        for NODE_ID in "${NODE_LIST[@]:1}"; do
            # Start a TaskManager for each NUMA node via
            # "flink-daemon.sh start taskexecutor"
            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        done
    fi
fi

2.5 flink-daemon.sh

# Start/stop a Flink daemon.
USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"

# Command to execute: start, stop, stop-all
STARTSTOP=$1
# Daemon role to run: standalonesession, taskexecutor, ...
DAEMON=$2
ARGS=("${@:3}") # get remaining arguments as array

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

case $DAEMON in
    (taskexecutor)
        # Main class of the TaskManager
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (standalonesession)
        # Main class of the JobManager
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
    ;;

    (*)
        echo "Unknown daemon '${DAEMON}'. $USAGE."
        exit 1
    ;;
esac

if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

FLINK_TM_CLASSPATH=`constructFlinkClassPath`

pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid

mkdir -p "$FLINK_PID_DIR"

# Log files for daemons are indexed from the process ID's position in the PID
# file. The following lock prevents a race condition during daemon startup
# when multiple daemons read, index, and write to the PID file concurrently.
# The lock is created on the PID directory since a lock file cannot be safely
# removed. The daemon is started with the lock closed and the lock remains
# active in this script until the script exits.
command -v flock >/dev/null 2>&1
if [[ $? -eq 0 ]]; then
    exec 200<"$FLINK_PID_DIR"
    flock 200
fi

# Ascending ID depending on number of lines in pid file.
# This allows us to start multiple daemon of each type.
id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")

FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-${HOSTNAME}"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"

log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")

JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
    if [ "$JAVA_VERSION" -lt 18 ]; then
        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
    fi
fi

case $STARTSTOP in

    (start)
        # Rotate log files
        rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"

        # Print a warning if daemons are already running on host
        if [ -f "$pid" ]; then
          active=()
          while IFS='' read -r p || [[ -n "$p" ]]; do
            kill -0 $p >/dev/null 2>&1
            if [ $? -eq 0 ]; then
              active+=($p)
            fi
          done < "${pid}"

          count="${#active[@]}"

          if [ ${count} -gt 0 ]; then
            echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
          fi
        fi

        # Evaluate user options for local variable expansion
        FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

        echo "Starting $DAEMON daemon on host $HOSTNAME."
        # $JAVA_RUN is $JAVA_HOME/bin/java (or plain "java"), set in config.sh
        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &

        mypid=$!

        # Add to pid file if successful start
        if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
            echo $mypid >> "$pid"
        else
            echo "Error starting $DAEMON daemon."
            exit 1
        fi
    ;;

    (stop)
        if [ -f "$pid" ]; then
            # Remove last in pid file
            to_stop=$(tail -n 1 "$pid")

            if [ -z $to_stop ]; then
                rm "$pid" # If all stopped, clean up pid file
                echo "No $DAEMON daemon to stop on host $HOSTNAME."
            else
                sed \$d "$pid" > "$pid.tmp" # all but last line

                # If all stopped, clean up pid file
                [ $(wc -l < "$pid.tmp") -eq 0 ] && rm "$pid" "$pid.tmp" || mv "$pid.tmp" "$pid"

                if kill -0 $to_stop > /dev/null 2>&1; then
                    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
                    kill $to_stop
                else
                    echo "No $DAEMON daemon (pid: $to_stop) is running anymore on $HOSTNAME."
                fi
            fi
        else
            echo "No $DAEMON daemon to stop on host $HOSTNAME."
        fi
    ;;

    (stop-all)
        if [ -f "$pid" ]; then
            mv "$pid" "${pid}.tmp"

            while read to_stop; do
                if kill -0 $to_stop > /dev/null 2>&1; then
                    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
                    kill $to_stop
                else
                    echo "Skipping $DAEMON daemon (pid: $to_stop), because it is not running anymore on $HOSTNAME."
                fi
            done < "${pid}.tmp"
            rm "${pid}.tmp"
        fi
    ;;

    (*)
        echo "Unexpected argument '$STARTSTOP'. $USAGE."
        exit 1
    ;;

esac

3.总结

  • JobManager 和 TaskManager 的启动都是通过 flink-daemon.sh 脚本运行启动

  • JobManager 的启动代号:standalonesession,实现类是StandaloneSessionClusterEntrypoint

  • TaskManager 的启动代号:taskexecutor,实现类是:TaskManagerRunner

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1027637.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

单例模式如何保证实例的唯一性

前言 什么是单例模式 指一个类只有一个实例&#xff0c;且该类能自行创建这个实例的一种创建型设计模式。使用目的&#xff1a;确保在整个系统中只能出现类的一个实例&#xff0c;即一个类只有一个对象。对于频繁使用的对象&#xff0c;“忽略”创建时的开销。特点&#xff1a…

快速上手Spring Cloud 七:事件驱动架构与Spring Cloud

快速上手Spring Cloud 一&#xff1a;Spring Cloud 简介 快速上手Spring Cloud 二&#xff1a;核心组件解析 快速上手Spring Cloud 三&#xff1a;API网关深入探索与实战应用 快速上手Spring Cloud 四&#xff1a;微服务治理与安全 快速上手Spring Cloud 五&#xff1a;Spring …

基于PaddleNLP的深度学习对文本自动添加标点符号(二)

前言 基于PaddleNLP的深度学习对文本自动添加标点符号的源码版来了&#xff0c;本篇文章主要讲解如何文本自动添加标点符号的原理和相关训练方法&#xff0c;前一篇文章讲解的是使用paddlepaddle已经训练好的一些模型&#xff0c;在一些简单场景下可以通过这些模型进行预测&…

【Unity】调整Player Settings的Resolution设置无效

【背景】 Build时修改了Player Settings下的Resolution设置&#xff0c;但是再次Building时仍然不生效。 【分析】 明显是沿用了之前的分辨率设定&#xff0c;所以盲猜解决办法是Build相关的缓存文件&#xff0c;或者修改打包名称。 【解决】 实测修改版本号无效&#xf…

指针数组的有趣程序【C语言】

文章目录 指针数组的有趣程序指针数组是什么&#xff1f;指针数组的魅力指针数组的应用示例&#xff1a;命令行计算器有趣的颜色打印 结语 指针数组的有趣程序 在C语言的世界里&#xff0c;指针是一种强大的工具&#xff0c;它不仅能够指向变量&#xff0c;还能指向数组&#…

[机缘参悟-162/管理者与领导者-151] :受害者心态与受害者思维模式,如何克服受害者思维模式,管理者如何管理这种思维模式的人?

目录 一、受害者心态概述 1.1 什么是受害者心态 1.2 受害者心态的表现形式 1.3 受害者心态在职场上的表现 1.4 受害者思维模式 1.5 受害者心态的危害 二、受害者心态的成因 2.1 概述 2.2 神经网络与受害者心态 三、如何克服受害者心态 3.1 概述 3.2 职场 3.3 家庭…

verilog 从入门到看得懂---verilog 的基本语法各种语句

本篇文章主要介绍verilog里面常用的语句&#xff0c; 包括条件语句、循环语句块语句和生成语句。出了块语句和生成语句&#xff0c;其他的基本和c语言或者m语言一致。 1&#xff0c;if 语句&#xff0c;在需要判断逻辑的时候可以使用if语句&#xff0c;如 从输入a&#xff0c;…

《QT实用小工具·二》图片文字转base64编码

1、概述 源码放在文章末尾 base64编码转换类 图片转base64字符串。base64字符串转图片。字符转base64字符串。base64字符串转字符。后期增加数据压缩。Qt6对base64编码转换进行了重写效率提升至少200%。 下面是demo演示&#xff1a; 项目部分代码如下所示&#xff1a; #ifn…

解决npm init vue@latest证书过期问题:npm ERR! code CERT_HAS_EXPIRED

目录 一. 问题背景 二. 错误信息 三. 解决方案 3.1 临时解决办法 3.2 安全性考量 一. 问题背景 我在试图创建一个新的Vue.js项目时遇到了一个问题&#xff1a;npm init vuelatest命令出现了证书过期的错误。不过这是一个常见的问题&#xff0c;解决起来也简单。 二. 错误…

【aws】架构图工具推荐

碎碎念 以前以为日本冰箱论是个梗&#xff0c;结果居然是真的。用光盘传真其实还能理解&#xff08;毕竟我也喜欢电子古董2333&#xff09;&#xff0c;但是画架构图居然用的是excel&#xff0b;截图&#xff01;啊苍天呐&#xff0c;然后看到隔壁工位用excel画web原型又感觉释…

Python 从0开始 一步步基于Django创建项目(13)将数据关联到用户

在city_infos应用程序中&#xff0c;每个城市信息条目是关联到城市的&#xff0c;所以只需要将城市条目关联到用户即可。 将数据关联到用户&#xff0c;就是把‘顶层’数据关联到用户。 设计思路&#xff1a; 1、修改顶层数据模型&#xff0c;向其中添加‘用户’属性 2、根…

kubernetes K8s的监控系统Prometheus升级Grafana,来一个酷炫的Node监控界面(二)

上一篇文章《kubernetes K8s的监控系统Prometheus安装使用(一)》中使用的监控界面总感觉监控的节点数据太少&#xff0c;不能快算精准的判断出数据节点运行的状况。 今天我找一款非常酷炫的多维度数据监控界面&#xff0c;能够非常有把握的了解到各节点的数据&#xff0c;以及运…

「DevExpress中文教程」如何将DevExtreme JS HTML编辑器集成到WinForms应用

在本文中我们将演示一个混合实现&#xff1a;如何将web UI工具集成到WinForms桌面应用程序中。具体来说&#xff0c;我们将把DevExtreme JavaScript WYSIWYG HTML编辑器(作为DevExtreme UI组件套件的一部分发布的组件)集成到Windows Forms应用程序中。 获取DevExtreme v23.2正式…

数据分析之Power BI

POWER QUERY 获取清洗 POWER PIVOT建模分析 如何加载power pivot 文件-选项-加载项-com加载项-转到 POWER VIEW 可视呈现 如何加载power view 文件-选项-自定义功能区-不在功能区中的命令-新建组-power view-添加-确定 POWER MAP可视地图

2.4 比较检验 机器学习

目录 常见比较检验方法 总述 2.4.1 假设检验 2.4.2 交叉验证T检验 2.4.3 McNemar 检验 接我们的上一篇《性能度量》&#xff0c;那么我们在某种度量下取得评估结果后&#xff0c;是否可以直接比较以评判优劣呢&#xff1f;实际上是不可以的。因为我们第一&#xff0c;测试…

uniapp h5 touch事件踩坑记录

场景&#xff1a;悬浮球功能 当我给悬浮球设置了 position: fixed; 然后监听悬浮球的touch事件&#xff0c;从事件对象中拿到clientY和clientX赋值给悬浮球的left和top属性。当直接赋值后效果应该是这样子&#xff1a; 注意鼠标相对悬浮球的位置&#xff0c;应该就是左上角&a…

在 Windows 11 上安装 MongoDB

MongoDB 是一个流行的 NoSQL 数据库&#xff0c;它提供了灵活的数据存储方案&#xff0c;而 MongoDB Compass 则是一个可视化管理工具&#xff0c;可以更轻松地与 MongoDB 数据库交互和管理。在本文中&#xff0c;我们将介绍如何在 Windows 11 上安装 MongoDB&#xff0c;并配置…

Radio Silence for mac 好用的防火墙软件

Radio Silence for Mac是一款功能强大的网络防火墙软件&#xff0c;专为Mac用户设计&#xff0c;旨在保护用户的隐私和网络安全。它具备实时网络监视和控制功能&#xff0c;可以精确显示每个网络连接的状态&#xff0c;让用户轻松掌握网络活动情况。 软件下载&#xff1a;Radio…

B2902A是德科技B2902A精密型电源

181/2461/8938产品概述&#xff1a; Agilent B2902A 精密源/测量单元 (SMU) 是一款 2 通道、紧凑且经济高效的台式 SMU&#xff0c;能够源和测量电压和电流。它用途广泛&#xff0c;可以轻松、高精度地执行 I/V&#xff08;电流与电压&#xff09;测量。4 象限源和测量功能的集…

基于SpringCloud+Hadoop+Vue实现的企业级网盘系统实现

编程语言&#xff1a;Java、Mybatis、Spring、SpringBoot、SpringCloud、Node、Vue 开发环境&#xff1a;Windows 10 Mysql 开发工具&#xff1a;WebStorm、IDEA编译器、Git、Maven 应用部署服务器&#xff1a;SpringBoot内置Tomcat插件 Node服务器&#xff1a;Node v10.1…