本文主要梳理一下Flink on Yarn集群启动流程,主要关注各组件之间调用关系的源码梳理,期望对大家阅读Flink源码时把握整体流程有一定的帮助。更细致的每个组件内部的构成及作用还仍待进一步的学习才能整理。
觉得文章有收获,欢迎关注公众号鼓励一下作者呀~
在学习的过程中,也搜集了一些量化、技术的视频及书籍资源,欢迎大家关注公众号【亚里随笔】获取
参考学习资料中绘制的动图,这里来梳理一下Flink on Yarn集群的启动流程。
1. 首先,用户通过脚本bin/flink run -t yarn-per-job -c xxx启动集群,并提交作业。
1.1 bin/flink脚本通过CliFrontend来运行
1.2 通过CliFrontendParser来解析命令参数
1.3 由命令行参数-t yarn-per-job 确定使用FlinkYarnSessionCli
1.4 executeProgram执行用户代码callMainMethod
1.5 StreamExecutionEnvironment.execute生成streamGraph
1.6 YarnJobClusterExecutor.execute生成jobGraph
1.7 & 1.8 YarnClusterDescripter.deployJobCluster将jobGraph部署到集群,startAppMaster
1.7 fileUploader上传jar包和配置至HDFS
1.8 封装Yarn ApplicationMaster的提交参数和命令
1.9 yarnClient.submitApplication(appContext) 向Yarn的Resource Manager提交任务信息
2. 启动yarn的ApplicationMaster,入口类YarnJobClusterEntrypoint
3. clusterEntrypoint.startCluster()启动集群
3.1 dispatcherRunnerFactory.createDispatcherRunner创建Dispatcher
3.2 Dispatcher.onStart()->startRecoveredJobs 创建并启动jobMaster (jobManagerRunner),JobMaster.startJobMasterServices启动slotPool
3.3 启动schedulerNG,生成executionGraph
3.4 resourceManagerService.start() 启动resourceManager
4. slotPool向slotManager注册、请求slot
5. slotManager向Yarn的ResourceManager申请新资源requestNewWorker
6. Yarn的ResourceManager启动新的TaskManager,入口类YarnTaskExecutorRunner
7. taskManagerRunner.start() 启动 taskExecutor
8. taskExecutor向slotManager注册slot
9. slotManager将slot分配给jobMaster
10. taskExecutor将slot提供给jobMaster的slotPool
11. submitTask() jobMaster提交任务至slot执行
上面我们整理了Flink on Yarn集群各模块的启动流程,这里我们从源码的调用关系的角度梳理各启动流程对应的源码调用。可以结合图1阅读对应的源码,应该能帮助大家快速地理清集群启动的流程。