Normally, when a job is submitted to a YARN cluster, each NM node aggregates the logs of every application after the job completes or fails and stores them in the configured HDFS directory. Recently, however, more and more jobs' logs could not be retrieved with the `yarn logs` command. Investigation showed that log aggregation for many jobs had slowed down, taking half an hour or more to finish. Only after reading the source code did we discover that the parameter `yarn.nodemanager.logaggregation.threadpool-size-max` needed to be increased: it defaults to 100, and once a node runs more than 100 applications, the extra applications' aggregation tasks queue up, which is what made aggregation slow. Raising the parameter to around 500 resolves this:
```xml
<property>
  <name>yarn.nodemanager.logaggregation.threadpool-size-max</name>
  <value>500</value>
</property>
```
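To see why exceeding the pool size stalls aggregation, here is a minimal sketch (plain JDK code, not YARN's implementation) of the same fixed-size-pool behavior: tasks submitted beyond the pool size simply wait in the queue, just as the 101st application's aggregation task would under the default setting.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        int poolSize = 2;                       // stand-in for the default of 100
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < 5; i++) {           // 5 "apps" > 2 threads: 3 must queue
            final int app = i;
            pool.execute(() -> {
                System.out.println("aggregating app-" + app);
                try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

Only two tasks run at a time; the rest sit in the queue until a thread frees up, which on a busy NM translates directly into delayed aggregation.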
To fully understand how log aggregation works, we need to know where YARN creates the service that handles aggregated logs. As analyzed in the earlier article ApplicationMaster启动及资源申请源码分析, the first container YARN launches hosts the AppAttempt, i.e. the ApplicationMaster service we normally see in the YARN UI. So let's look at how a job's first container is launched and how the log-handling component, LogHandler, is created. The ApplicationMaster starts containers by calling the RPC method ContainerManagementProtocol#startContainers(), which lands in the startContainerInternal() method; this logic does two things:
```java
// Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
    ContainerTokenIdentifier containerTokenIdentifier,
    StartContainerRequest request) throws YarnException, IOException {

  // Token validation and ContainerLaunchContext initialization omitted

  this.readLock.lock();
  try {
    if (!serviceStopped) {
      // Create the application
      Application application =
          new ApplicationImpl(dispatcher, user, applicationID, credentials, context);

      // One-time application init, shared by all subsequent containers; this branch
      // runs only once, usually for the first container from the ApplicationMaster
      if (null == context.getApplications().putIfAbsent(applicationID,
          application)) {
        LOG.info("Creating a new application reference for app " + applicationID);
        LogAggregationContext logAggregationContext =
            containerTokenIdentifier.getLogAggregationContext();
        Map<ApplicationAccessType, String> appAcls =
            container.getLaunchContext().getApplicationACLs();
        context.getNMStateStore().storeApplication(applicationID,
            buildAppProto(applicationID, user, credentials, appAcls,
                logAggregationContext));

        // 1. Send an ApplicationEventType.INIT_APPLICATION event to ApplicationImpl
        dispatcher.getEventHandler().handle(
            new ApplicationInitEvent(applicationID, appAcls,
                logAggregationContext));
      }

      // 2. Send an ApplicationEventType.INIT_CONTAINER event to ApplicationImpl
      this.context.getNMStateStore().storeContainer(containerId, request);
      dispatcher.getEventHandler().handle(
          new ApplicationContainerInitEvent(container));

      this.context.getContainerTokenSecretManager().startContainerSuccessful(
          containerTokenIdentifier);
      NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
          "ContainerManageImpl", applicationID, containerId);
      // TODO launchedContainer misplaced -> doesn't necessarily mean a container
      // launch. A finished Application will not launch containers.
      metrics.launchedContainer();
      metrics.allocateContainer(containerTokenIdentifier.getResource());
    } else {
      throw new YarnException(
          "Container start failed as the NodeManager is " +
          "in the process of shutting down");
    }
  } finally {
    this.readLock.unlock();
  }
}
```
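The "runs only once" guarantee above rests on ConcurrentMap.putIfAbsent(): only the first caller for a given application sees null and takes the init branch. A minimal sketch of the idiom (hypothetical names, not YARN code):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class OneTimeInitDemo {
    private static final ConcurrentMap<String, Object> apps = new ConcurrentHashMap<>();

    static void startContainer(String appId, String containerId) {
        // putIfAbsent returns null only for the very first insertion of appId
        if (apps.putIfAbsent(appId, new Object()) == null) {
            // Runs exactly once per appId, e.g. for the AM's first container
            System.out.println(containerId + " initialized app " + appId);
        } else {
            System.out.println(containerId + " found app " + appId + " already initialized");
        }
    }

    public static void main(String[] args) {
        startContainer("app_001", "container_01"); // triggers the one-time init
        startContainer("app_001", "container_02"); // skips it
    }
}
```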
Here we focus on the first task: sending the ApplicationEventType.INIT_APPLICATION event to ApplicationImpl. The state-machine transition registered for this event is AppInitTransition.
```java
// Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
// Transitions from NEW state
.addTransition(ApplicationState.NEW, ApplicationState.INITING,
    ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
```
AppInitTransition initializes the log-aggregation component; its key action is sending a LogHandlerEventType.APPLICATION_STARTED event to the dispatcher.
```java
// Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
/**
 * Notify services of new application.
 *
 * In particular, this initializes the {@link LogAggregationService}
 */
@SuppressWarnings("unchecked")
static class AppInitTransition implements
    SingleArcTransition<ApplicationImpl, ApplicationEvent> {
  @Override
  public void transition(ApplicationImpl app, ApplicationEvent event) {
    ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
    app.applicationACLs = initEvent.getApplicationACLs();
    app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);

    // Initialize the log-aggregation component
    // Inform the logAggregator
    app.logAggregationContext = initEvent.getLogAggregationContext();
    // Send a LogHandlerEventType.APPLICATION_STARTED event to the dispatcher
    app.dispatcher.getEventHandler().handle(
        new LogHandlerAppStartedEvent(app.appId, app.user,
            app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
            app.applicationACLs, app.logAggregationContext));
  }
}
```
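For readers unfamiliar with Hadoop's state-machine tables, here is a minimal sketch (not Hadoop's StateMachineFactory) of the single-arc idea: one (state, event) pair moves the application from NEW to INITING and runs a transition hook, which in the real code is where LogHandlerAppStartedEvent gets dispatched.

```java
public class TinyAppStateMachine {
    enum State { NEW, INITING }
    enum Event { INIT_APPLICATION }

    private State state = State.NEW;

    void handle(Event event) {
        // The single arc: valid only for (NEW, INIT_APPLICATION)
        if (state == State.NEW && event == Event.INIT_APPLICATION) {
            // Transition hook, the analogue of AppInitTransition.transition(),
            // which would dispatch LogHandlerEventType.APPLICATION_STARTED here
            System.out.println("NEW -> INITING on " + event);
            state = State.INITING;   // target state of the arc
        }
    }

    public static void main(String[] args) {
        new TinyAppStateMachine().handle(Event.INIT_APPLICATION);
    }
}
```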
To understand what the LogHandlerEventType.APPLICATION_STARTED event does, we need to know which event handler is registered for the LogHandlerEventType class and what that handler does. The register() call here registers the LogHandlerEventType class, and the corresponding logHandler event handler is the LogAggregationService.
```java
// Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@Override
public void serviceInit(Configuration conf) throws Exception {
  // Create the log handler
  LogHandler logHandler =
      createLogHandler(conf, this.context, this.deletionService);
  addIfService(logHandler);
  // Register the LogHandlerEventType class, with logHandler as its handler
  dispatcher.register(LogHandlerEventType.class, logHandler);

  waitForContainersOnShutdownMillis =
      conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
          YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
      conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
          YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
      SHUTDOWN_CLEANUP_SLOP_MS;

  super.serviceInit(conf);
  recover();
}
```
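The dispatcher pattern here maps an event-type class to exactly one handler, and every event of that type is routed to it. A simplified model of that register-then-route contract (an illustrative sketch, not YARN's AsyncDispatcher):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class TinyDispatcher {
    // One handler per event-type class, as in dispatcher.register(...)
    private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();

    void register(Class<?> eventType, Consumer<Object> handler) {
        handlers.put(eventType, handler);
    }

    void dispatch(Object event) {
        Consumer<Object> h = handlers.get(event.getClass());
        if (h != null) {
            h.accept(event);   // e.g. LogAggregationService.handle(event)
        }
    }

    public static void main(String[] args) {
        TinyDispatcher d = new TinyDispatcher();
        d.register(String.class, e -> System.out.println("handled: " + e));
        d.dispatch("APPLICATION_STARTED");
    }
}
```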
Here is the call that actually creates the logHandler object: since the cluster has log aggregation enabled (controlled by the parameter yarn.log-aggregation-enable), a LogAggregationService is returned.
```java
// Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
protected LogHandler createLogHandler(Configuration conf, Context context,
    DeletionService deletionService) {
  if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
      YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
    // Log aggregation is enabled on this cluster, so create a LogAggregationService
    return new LogAggregationService(this.dispatcher, context,
        deletionService, dirsHandler);
  } else {
    return new NonAggregatingLogHandler(this.dispatcher, deletionService,
        dirsHandler,
        context.getNMStateStore());
  }
}
```
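As a quick illustration, the branch above boils down to a single boolean read of yarn.log-aggregation-enable (which defaults to false) through Hadoop's Configuration API; a small sketch, with the key written out literally rather than via the YarnConfiguration constant:

```java
import org.apache.hadoop.conf.Configuration;

public class LogHandlerSwitchDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean("yarn.log-aggregation-enable", true);  // as set in yarn-site.xml
        boolean aggregate = conf.getBoolean("yarn.log-aggregation-enable", false);
        System.out.println(aggregate
            ? "would create LogAggregationService"
            : "would create NonAggregatingLogHandler");
    }
}
```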
Knowing that the service registered for LogHandlerEventType is LogAggregationService, we can step into its handle() method and see what the LogHandlerEventType.APPLICATION_STARTED event above does.
```java
// Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@Override
public void handle(LogHandlerEvent event) {
  switch (event.getType()) {
  // Handling of the APPLICATION_STARTED event
  case APPLICATION_STARTED:
    LogHandlerAppStartedEvent appStartEvent =
        (LogHandlerAppStartedEvent) event;
    initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
        appStartEvent.getCredentials(),
        appStartEvent.getLogRetentionPolicy(),
        appStartEvent.getApplicationAcls(),
        appStartEvent.getLogAggregationContext());
    break;
  case CONTAINER_FINISHED:
    // omitted
  case APPLICATION_FINISHED:
    // omitted
  default:
    ; // Ignore
  }
}
```
The key logic of the LogHandlerEventType.APPLICATION_STARTED event is the call to initApp(), which does three things:

1. Verify that the root directory for aggregated logs exists on HDFS, i.e. the `/tmp/logs/` directory (concretely `hdfs://nameservice/tmp/logs`), controlled by the `yarn.nodemanager.remote-app-log-dir` parameter. (Note: this request blocks on an HDFS read.)
2. Create the application's log-aggregation directory on HDFS and initialize the app's log-aggregator instance, launching the aggregation task through a thread pool.
3. Dispatch an ApplicationEventType.APPLICATION_LOG_HANDLING_INITED event to announce that the log-aggregation service is initialized.

```java
// Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
private void initApp(final ApplicationId appId, String user,
    Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
    Map<ApplicationAccessType, String> appAcls,
    LogAggregationContext logAggregationContext) {
  ApplicationEvent eventResponse;
  try {
    // 1. Verify that the root directory for aggregated logs exists on HDFS, i.e.
    //    /tmp/logs/ (concretely hdfs://nameservice/tmp/logs), controlled by
    //    yarn.nodemanager.remote-app-log-dir
    verifyAndCreateRemoteLogDir(getConfig());
    // Key step: 2. Create the app's log-aggregation directory on HDFS, initialize
    //    the app's aggregator instance, and start the aggregation task via a thread pool
    initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
        logAggregationContext);
    // Build the ApplicationEvent
    eventResponse = new ApplicationEvent(appId,
        ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
  } catch (YarnRuntimeException e) {
    LOG.warn("Application failed to init aggregation", e);
    eventResponse = new ApplicationEvent(appId,
        ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
  }
  // 3. Dispatch the ApplicationEvent built above (APPLICATION_LOG_HANDLING_INITED),
  //    telling the handler that the log-aggregation service is initialized
  this.dispatcher.getEventHandler().handle(eventResponse);
}
```
The first task is straightforward: it checks whether the root directory for aggregated logs exists on HDFS, and since that directory usually already exists, this amounts to an HDFS read. The third task sends ApplicationEventType.APPLICATION_LOG_HANDLING_INITED to announce that initialization of the log-aggregation service is complete. The interesting work is the second task: initAppAggregator() creates the application's log-aggregation directory on HDFS and starts the log-aggregation thread. Let's look at the code.
This code does two main things:

1. Create the application's log-aggregation directory, i.e. `hdfs://nameservice/tmp/logs/<user>/logs/<appId>`. On success this write happens only once, normally performed by the first container (the job's ApplicationMaster container); other containers only issue an HDFS read to check that the directory exists.
2. Create the application's log-aggregation task and schedule it on a thread pool, which asynchronously uploads the logs on the NM node. The pool size is controlled by `yarn.nodemanager.logaggregation.threadpool-size-max` and defaults to 100.

```java
// Location: org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
protected void initAppAggregator(final ApplicationId appId, String user,
    Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
    Map<ApplicationAccessType, String> appAcls,
    LogAggregationContext logAggregationContext) {

  // Get user's FileSystem credentials
  final UserGroupInformation userUgi =
      UserGroupInformation.createRemoteUser(user);
  if (credentials != null) {
    userUgi.addCredentials(credentials);
  }

  // New application
  final AppLogAggregator appLogAggregator =
      new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
          getConfig(), appId, userUgi, this.nodeId, dirsHandler,
          getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
          appAcls, logAggregationContext, this.context,
          getLocalFileContext(getConfig()));
  if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
    throw new YarnRuntimeException("Duplicate initApp for " + appId);
  }
  // wait until check for existing aggregator to create dirs
  YarnRuntimeException appDirException = null;
  try {
    // Create the app dir, i.e. hdfs://nameservice/tmp/logs/<user>/logs/<appId>
    createAppDir(user, appId, userUgi);
  } catch (Exception e) {
    appLogAggregator.disableLogAggregation();
    if (!(e instanceof YarnRuntimeException)) {
      appDirException = new YarnRuntimeException(e);
    } else {
      appDirException = (YarnRuntimeException)e;
    }
    appLogAggregators.remove(appId);
    closeFileSystems(userUgi);
    throw appDirException;
  }

  // Create the app's log-aggregation task and run it on the thread pool,
  // asynchronously uploading this NM node's logs
  // Schedule the aggregator.
  Runnable aggregatorWrapper = new Runnable() {
    public void run() {
      try {
        appLogAggregator.run();
      } finally {
        appLogAggregators.remove(appId);
        closeFileSystems(userUgi);
      }
    }
  };
  this.threadPool.execute(aggregatorWrapper);
}
```
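To make the directory layout concrete, here is a small sketch that computes where an application's aggregated logs land, assuming the default layout `<remote-app-log-dir>/<user>/<suffix>/<appId>` with suffix "logs"; this mirrors what getRemoteNodeLogFileForApp() computes, with one file per NM node. The user, application id, and node id below are hypothetical.

```java
import org.apache.hadoop.fs.Path;

public class RemoteLogDirDemo {
    public static void main(String[] args) {
        Path remoteRootLogDir = new Path("hdfs://nameservice/tmp/logs"); // yarn.nodemanager.remote-app-log-dir
        String user = "hadoop";                                          // hypothetical user
        String suffix = "logs";                                          // yarn.nodemanager.remote-app-log-dir-suffix default
        String appId = "application_1700000000000_0001";                 // hypothetical app id
        String nodeId = "nm-host-01_8041";                               // hypothetical NM node id

        // <remote-dir>/<user>/<suffix>/<appId>
        Path appLogDir = new Path(new Path(new Path(remoteRootLogDir, user), suffix), appId);
        System.out.println("app dir:   " + appLogDir);
        // One aggregated log file per NM node inside the app dir
        System.out.println("node file: " + new Path(appLogDir, nodeId));
    }
}
```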
That completes the walkthrough of the log-aggregation call path: from creating the log-aggregation service component, to initializing the application's aggregated-log directory on HDFS, to starting the log-aggregation thread.