• yarn集群NodeManager日志聚合慢问题解决方案


    背景

               正常情况作业提交到 Yarn 集群时,作业完成或者失败后,每个 NM 节点都会对每个 app 作业进行日志聚合操作,存储到hdfs指定的目录下,但是最近发现越来越多的任务通过yarn logs命令无法查询,经过排查发现很多任务的日志聚合变慢了,需要半小时甚至更多时间才能聚合完成。通过阅读源码才发现需要调大yarn.nodemanager.logaggregation.threadpool-size-max这个参数,默认是100,如果节点任务超过100,超过的任务日志聚合就会进行排队,因此导致聚合缓慢,可以增加该参数到500左右。

    1. <property>
    2. <name>yarn.nodemanager.logaggregation.threadpool-size-max</name>
    3. <value>500</value>
    4. </property>

    Yarn日志聚合源码分析

                      为了彻底弄明白聚合日志如何工作的,就需要了解 Yarn 中处理聚合日志的服务在哪里创建的,根据 ApplicationMaster启动及资源申请源码分析 文章分析,了解到Yarn 的第一个 Container 启动是用于 AppAttmpt 角色,也就是我们通常在 Yarn UI 界面看到的 ApplicationMaster 服务。所以我们来看看一个作业的第一个 Container 是如何启动以及如何创建日志记录组件 LogHandler 的。ApplicationMaster 通过调用 RPC 函数ContainerManagementProtocol#startContainers() 开始启动 Container,即 startContainerInternal() 方法,这部分逻辑做了两件事:

    • 发送 ApplicationEventType.INIT_APPLICATION 事件,对应用程序资源的初始化,主要是初始化各类必需的服务组件(如日志记录组件 LogHandler、资源状态追踪组件 LocalResourcesTrackerImpl等),供后续 Container 启动,通常来自 ApplicationMaster 的第一个 Container 完成,这里的 if 逻辑针对一个 NM 节点上运行作业的所有 Containers 只调用一次,后续的 Container 跳过这段 Application 初始化过程。
    • 发送 ApplicationEventType.INIT_CONTAINER 事件,对 Container 进行初始化操作。(这部分事件留在 Container 启动环节介绍)
    1. //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
    2. private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
    3. ContainerTokenIdentifier containerTokenIdentifier,
    4. StartContainerRequest request) throws YarnException, IOException {
    5. // 省略Token认证及ContainerLaunchContext上下文初始化
    6. this.readLock.lock();
    7. try {
    8. if (!serviceStopped) {
    9. // Create the application
    10. Application application =
    11. new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
    12. // 应用程序的初始化,供后续Container使用,这个逻辑只调用一次,通常由来自ApplicationMaster的第一个Container完成
    13. if (null == context.getApplications().putIfAbsent(applicationID,
    14. application)) {
    15. LOG.info("Creating a new application reference for app " + applicationID);
    16. LogAggregationContext logAggregationContext =
    17. containerTokenIdentifier.getLogAggregationContext();
    18. Map<ApplicationAccessType, String> appAcls =
    19. container.getLaunchContext().getApplicationACLs();
    20. context.getNMStateStore().storeApplication(applicationID,
    21. buildAppProto(applicationID, user, credentials, appAcls,
    22. logAggregationContext));
    23. // 1.向 ApplicationImpl 发送 ApplicationEventType.INIT_APPLICATION 事件
    24. dispatcher.getEventHandler().handle(
    25. new ApplicationInitEvent(applicationID, appAcls,
    26. logAggregationContext));
    27. }
    28. // 2.向 ApplicationImpl 发送 ApplicationEventType.INIT_CONTAINER 事件
    29. this.context.getNMStateStore().storeContainer(containerId, request);
    30. dispatcher.getEventHandler().handle(
    31. new ApplicationContainerInitEvent(container));
    32. this.context.getContainerTokenSecretManager().startContainerSuccessful(
    33. containerTokenIdentifier);
    34. NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
    35. "ContainerManageImpl", applicationID, containerId);
    36. // TODO launchedContainer misplaced -> doesn't necessarily mean a container
    37. // launch. A finished Application will not launch containers.
    38. metrics.launchedContainer();
    39. metrics.allocateContainer(containerTokenIdentifier.getResource());
    40. } else {
    41. throw new YarnException(
    42. "Container start failed as the NodeManager is " +
    43. "in the process of shutting down");
    44. }
    45. } finally {
    46. this.readLock.unlock();
    47. }
    48. }

    这里主要看看第1件事情,即向 ApplicationImpl 发送 ApplicationEventType.INIT_APPLICATION 事件,事件对应的状态机为 AppInitTransition 状态机。

    1. //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
    2. // Transitions from NEW state
    3. .addTransition(ApplicationState.NEW, ApplicationState.INITING,
    4. ApplicationEventType.INIT_APPLICATION, new AppInitTransition())

    AppInitTransition 状态机会对日志聚合组件服务进行初始化,关键行动是向调度器发送 LogHandlerEventType.APPLICATION_STARTED 事件。

    1. //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
    2. /**
    3. * Notify services of new application.
    4. *
    5. * In particular, this initializes the {@link LogAggregationService}
    6. */
    7. @SuppressWarnings("unchecked")
    8. static class AppInitTransition implements
    9. SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    10. @Override
    11. public void transition(ApplicationImpl app, ApplicationEvent event) {
    12. ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
    13. app.applicationACLs = initEvent.getApplicationACLs();
    14. app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
    15. // 初始化日志聚合组件服务
    16. // Inform the logAggregator
    17. app.logAggregationContext = initEvent.getLogAggregationContext();
    18. // 向调度器发送 LogHandlerEventType.APPLICATION_STARTED 事件
    19. app.dispatcher.getEventHandler().handle(
    20. new LogHandlerAppStartedEvent(app.appId, app.user,
    21. app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
    22. app.applicationACLs, app.logAggregationContext));
    23. }
    24. }

    想要弄清楚 LogHandlerEventType.APPLICATION_STARTED 事件做了什么,就要知道 LogHandlerEventType 类注册的事件处理器是什么以及事件处理器做了什么事情。这里的 register 方法对 LogHandlerEventType 类进行了注册,对应的 logHandler 事件处理器为 LogAggregationService 服务。

    1. //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
    2. @Override
    3. public void serviceInit(Configuration conf) throws Exception {
    4. // 定义日志处理器
    5. LogHandler logHandler =
    6. createLogHandler(conf, this.context, this.deletionService);
    7. addIfService(logHandler);
    8. // 注册 LogHandlerEventType 事件,logHandler 为对应的处理器
    9. dispatcher.register(LogHandlerEventType.class, logHandler);
    10. waitForContainersOnShutdownMillis =
    11. conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
    12. YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
    13. conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
    14. YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
    15. SHUTDOWN_CLEANUP_SLOP_MS;
    16. super.serviceInit(conf);
    17. recover();
    18. }

    具体创建 logHandler 对象的调用,由于集群开启了日志聚合功能(由参数 yarn.log-aggregation-enable 控制),这里返回 LogAggregationService 服务。

    1. //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
    2. protected LogHandler createLogHandler(Configuration conf, Context context,
    3. DeletionService deletionService) {
    4. if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
    5. YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
    6. // 判断是否启用了日志聚合,由于集群开启了日志聚合,这里初始化 LogAggregationService 服务
    7. return new LogAggregationService(this.dispatcher, context,
    8. deletionService, dirsHandler);
    9. } else {
    10. return new NonAggregatingLogHandler(this.dispatcher, deletionService,
    11. dirsHandler,
    12. context.getNMStateStore());
    13. }
    14. }

    弄清楚了 LogHandlerEventType 类注册的服务是 LogAggregationService,我们就进入 LogAggregationService 类的 handle() 方法,看看上面的 LogHandlerEventType.APPLICATION_STARTED 事件做了什么事。

    1. //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
    2. @Override
    3. public void handle(LogHandlerEvent event) {
    4. switch (event.getType()) {
    5. // APPLICATION_STARTED 事件处理流程
    6. case APPLICATION_STARTED:
    7. LogHandlerAppStartedEvent appStartEvent =
    8. (LogHandlerAppStartedEvent) event;
    9. initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
    10. appStartEvent.getCredentials(),
    11. appStartEvent.getLogRetentionPolicy(),
    12. appStartEvent.getApplicationAcls(),
    13. appStartEvent.getLogAggregationContext());
    14. break;
    15. case CONTAINER_FINISHED:
    16. // 省略
    17. case APPLICATION_FINISHED:
    18. //省略
    19. default:
    20. ; // Ignore
    21. }
    22. }

    LogHandlerEventType.APPLICATION_STARTED 事件的关键逻辑在 initApp() 方法的调用。这段逻辑主要做了三件事:

    1. 判断 HDFS 上日志聚合的根目录是否存在,即 /tmp/logs/ 目录(具体为 hdfs://nameservice/tmp/logs),由参数 yarn.nodemanager.remote-app-log-dir 控制。(注意:这里的请求会阻塞读 HDFS)
    2. 创建作业日志聚合的 HDFS 目录,并初始化 app 日志聚合实例,采用线程池的方式启动日志聚合进程。(重点,这里会有请求阻塞写 HDFS,并且通过有限大小的线程池异步创建日志聚合线程去做日志的聚合)
    3. 根据构建的 ApplicationEvent 事件,向发送 ApplicationEventType.APPLICATION_LOG_HANDLING_INITED 事件,告知处理器日志聚合服务初始化完成。
    1. //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
    2. private void initApp(final ApplicationId appId, String user,
    3. Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
    4. Map<ApplicationAccessType, String> appAcls,
    5. LogAggregationContext logAggregationContext) {
    6. ApplicationEvent eventResponse;
    7. try {
    8. // 1、 判断 HDFS 上日志聚合的根目录是否存在,即 `/tmp/logs/` 目录(具体为 `hdfs://nameservice/tmp/logs`),由参数 `yarn.nodemanager.remote-app-log-dir` 控制
    9. verifyAndCreateRemoteLogDir(getConfig());
    10. // 重点:2、创建作业日志聚合的 HDFS 目录,并初始化 app 日志聚合实例,采用线程池的方式启动日志聚合进程
    11. initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
    12. logAggregationContext);
    13. // 构建 ApplicationEvent 事件
    14. eventResponse = new ApplicationEvent(appId,
    15. ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
    16. } catch (YarnRuntimeException e) {
    17. LOG.warn("Application failed to init aggregation", e);
    18. eventResponse = new ApplicationEvent(appId,
    19. ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
    20. }
    21. // 3、根据构建的 ApplicationEvent 事件,向发送 ApplicationEventType.APPLICATION_LOG_HANDLING_INITED 事件,告知处理器日志聚合服务初始化完成
    22. this.dispatcher.getEventHandler().handle(eventResponse);
    23. }

    第1件事比较简单,主要是是判断 HDFS 聚合日志的根目录是否存在,由于目录一般都存在,这一块主要是读 HDFS 请求。我们主要来看看 initApp() 方法做的第2件事,可以看到第3件事是发送 ApplicationEventType.APPLICATION_LOG_HANDLING_INITED 表示日志聚合服务初始化完成,包括创建作业在 HDFS 的日志聚合目录和启动日志聚合线程。所以基本可以知道第2件事的 initAppAggregator() 是会创建作业日志聚合目录,并启动日志聚合线程,具体的我们来看代码。

    这段代码其实主要做了两件事:

    1. 调用 createAppDir() 方法执行 HDFS 写请求为作业创建日志聚合的目录,即 hdfs://nameservice/tmp/logs//logs/ 目录,这里的写逻辑如果成功则只调用一次,一般是由第一个 Container 创建(即作业的 ApplicationMaster Container),其他 Container 只执行 HDFS 读请求判断该目录是否存在即可。
    2. 通过 threadPool 线程池创建每个作业在 NM 节点的日志聚合线程,异步处理本地日志的上传,该线程池大小由参数 yar、n.nodemanager.logaggregation.threadpool-size-max 控制,默认大小为 100.
    1. //位置:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
    2. protected void initAppAggregator(final ApplicationId appId, String user,
    3. Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
    4. Map appAcls,
    5. LogAggregationContext logAggregationContext) {
    6. // Get user's FileSystem credentials
    7. final UserGroupInformation userUgi =
    8. UserGroupInformation.createRemoteUser(user);
    9. if (credentials != null) {
    10. userUgi.addCredentials(credentials);
    11. }
    12. // New application
    13. final AppLogAggregator appLogAggregator =
    14. new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
    15. getConfig(), appId, userUgi, this.nodeId, dirsHandler,
    16. getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
    17. appAcls, logAggregationContext, this.context,
    18. getLocalFileContext(getConfig()));
    19. if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
    20. throw new YarnRuntimeException("Duplicate initApp for " + appId);
    21. }
    22. // wait until check for existing aggregator to create dirs
    23. YarnRuntimeException appDirException = null;
    24. try {
    25. // 创建作业日志聚合目录,即 hdfs://nameservice/tmp/logs//logs/ 目录
    26. // Create the app dir
    27. createAppDir(user, appId, userUgi);
    28. } catch (Exception e) {
    29. appLogAggregator.disableLogAggregation();
    30. if (!(e instanceof YarnRuntimeException)) {
    31. appDirException = new YarnRuntimeException(e);
    32. } else {
    33. appDirException = (YarnRuntimeException)e;
    34. }
    35. appLogAggregators.remove(appId);
    36. closeFileSystems(userUgi);
    37. throw appDirException;
    38. }
    39. // 创建作业的日志聚合线程,并通过线程池启动日志聚合线程,异步上传 NM 节点的日志
    40. // Schedule the aggregator.
    41. Runnable aggregatorWrapper = new Runnable() {
    42. public void run() {
    43. try {
    44. appLogAggregator.run();
    45. } finally {
    46. appLogAggregators.remove(appId);
    47. closeFileSystems(userUgi);
    48. }
    49. }
    50. };
    51. this.threadPool.execute(aggregatorWrapper);
    52. }

    至此,从日志聚合服务组件的创建,到为作业初始化 HDFS 聚合日志目录,到启动日志聚合线程,整个日志聚合的调用逻辑已介绍完毕

  • 相关阅读:
    在线汽车交易平台数字化转型案例—BI工具是关键!
    9.14--贪心算法列题
    进程与线程的区别
    【应用层协议】初始Http,fiddler的使用
    Go数据库线程池操作问题
    NodeJS 实战系列:个人开发者应该如何选购云服务
    买卖股票系列问题——DP
    部署Redis集群
    再学责任链和代理模式
    图论(算法竞赛、蓝桥杯)--Dijkstra算法最短路
  • 原文地址:https://blog.csdn.net/aA518189/article/details/127578624