Flink任务提交流程分析

任务启动

启动流程见脚本 $FLINK_HOME/bin/flink,
从该脚本可以看到,任务的启动入口类为:org.apache.flink.client.cli.CliFrontend

CliFrontend 任务入口类

main 方法:
/**
 * Entry point of the command line client.
 *
 * <p>Logs the environment information, loads the global configuration and the custom
 * command lines, builds a {@code CliFrontend} and runs {@code parseParameters(args)} inside
 * the installed security context. The JVM exits with the code returned by that call, or
 * with code 31 if any {@link Throwable} escapes.
 *
 * @param args the raw command line arguments
 */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // Step 1: locate the configuration directory.
    final String confDir = getConfigurationDirectoryFromEnv();

    // Step 2: load the global configuration.
    // Per the article's notes: this reads flink-conf.yaml from the configuration directory,
    // after checking that the directory is valid and the file exists; the file is read via
    // loadYAMLResource and returned as a Configuration.
    final Configuration globalConf = GlobalConfiguration.loadConfiguration(confDir);

    // Step 3: load the custom command lines.
    // loadCustomCommandLines registers the two command lines used to resolve the target
    // cluster: the FlinkYarnSessionCli and the DefaultCLI.
    final List<CustomCommandLine<?>> commandLines = loadCustomCommandLines(globalConf, confDir);

    try {
        // Create the CliFrontend instance.
        final CliFrontend frontend = new CliFrontend(globalConf, commandLines);

        SecurityUtils.install(new SecurityConfiguration(frontend.configuration));

        // Handle the arguments (run, stop, cancel, ...) and execute the matching command.
        final int exitCode = SecurityUtils.getInstalledContext().runSecured(() -> frontend.parseParameters(args));
        System.exit(exitCode);
    } catch (Throwable t) {
        // Strip UndeclaredThrowableException wrappers before reporting the failure.
        final Throwable cause = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", cause);
        cause.printStackTrace();
        System.exit(31);
    }
}
loadCustomCommandLines 方法:添加两种用于解析集群模式的命令行(FlinkYarnSessionCli 和 DefaultCLI)
/**
 * Builds the ordered list of custom command lines.
 *
 * <p>The YARN session CLI is loaded by class name first, with its options prefixed by
 * {@code y} / {@code yarn}. If it cannot be loaded, a warning is logged and it is skipped.
 * The {@code DefaultCLI} is always appended last.
 *
 * @param configuration the loaded configuration handed to each command line
 * @param configurationDirectory the configuration directory handed to the YARN session CLI
 * @return the command lines, with {@code DefaultCLI} as the final element
 */
public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
    final List<CustomCommandLine<?>> result = new ArrayList<>(2);

    // Command line interface of the YARN session; it is initialized specially here so that
    // all of its options are prefixed with y/yarn.
    final String yarnSessionCliClass = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
    try {
        final CustomCommandLine<?> yarnSessionCli = loadCustomCommandLine(
            yarnSessionCliClass,
            configuration,
            configurationDirectory,
            "y",
            "yarn");
        result.add(yarnSessionCli);
    } catch (NoClassDefFoundError | Exception e) {
        // Best effort: the YARN CLI is optional, so only warn and continue.
        LOG.warn("Could not load CLI class {}.", yarnSessionCliClass, e);
    }

    // DefaultCLI must be added last: getActiveCustomCommandLine(..) picks the active
    // CustomCommandLine in list order, and DefaultCLI's isActive always returns true.
    result.add(new DefaultCLI(configuration));

    return result;
}
run 方法流程:
/**
 * Executes the {@code run} action.
 *
 * <p>Parses the arguments against the run options merged with the custom command line
 * options, prints the help text if requested, builds the program from the given JAR,
 * selects the active custom command line and runs the program. The program's extracted
 * libraries are deleted once the run attempt finishes, whether it succeeds or fails.
 *
 * @param args command line arguments of the run action
 * @throws CliArgsException if no JAR file was specified, or building the program fails
 *     with a {@link FileNotFoundException}
 * @throws Exception any other failure raised while parsing, building or running
 */
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    // Collect the options accepted by the run command, merge in the custom command line
    // options, and parse the supplied arguments into the run options.
    final Options mergedOptions = CliFrontendParser.mergeOptions(
        CliFrontendParser.getRunCommandOptions(),
        customCommandLineOptions);
    final CommandLine parsedArgs = CliFrontendParser.parse(mergedOptions, args, true);
    final RunOptions options = new RunOptions(parsedArgs);

    // Help flag: print the help text of the run command and stop.
    if (runOptions(options)) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    // A missing JAR path is a usage error.
    if (null == options.getJarFilePath()) {
        throw new CliArgsException("The program JAR file was not specified.");
    }

    // Build the executable program (a PackagedProgram) from the JAR and its arguments.
    final PackagedProgram packagedProgram;
    try {
        LOG.info("Building program from JAR file");
        packagedProgram = buildProgram(options);
    } catch (FileNotFoundException e) {
        throw new CliArgsException("Could not build the program from JAR file.", e);
    }

    // Decide between a YARN cluster and standalone mode. The YARN session CLI was
    // registered first, so it is checked first; otherwise the default CLI is used.
    final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(parsedArgs);

    // Run the program, then remove the dependency files that were extracted for it.
    try {
        runProgram(activeCommandLine, parsedArgs, options, packagedProgram);
    } finally {
        packagedProgram.deleteExtractedLibraries();
    }
}

/** Returns whether the given run options request the help text. */
private static boolean runOptions(RunOptions options) {
    return options.isPrintHelp();
}