YARN部分
介绍下YARN
可回答:1)YARN的RM和NM;2)YARN有哪些组件,如何分配资源;3)YARN的AM和RM的作用; 回答技巧:YARN的架构,执行流程等
问过的一些公司:字节,字节(2021.08),网易云音乐×2,蘑菇街x2,美团,小鹏汽车,一点资讯,头 条,央视网,海康x2,海康(2021.09),恒生(2021.09)
参考答案:
介绍YARN,可以先考虑下面两个问题
YARN基础架构
YARN工作机制
可回答:1)YARN的任务提交流程;2)YARN的通信流程或工作流程;3)YARN执行一个application的流 程;4)Hadoop任务的YARN调度过程;5)YARN上提交资源(程序)的流程;6)YARN的资源调度流
程;7)YARN的任务提交流程
问过的一些公司:字节x2,字节(2021.08)-(2022.03),网易云音乐,快手,海康x2,转转,滴滴,作业 帮,虎牙(2021.09),四方伟业(2021.08),蔚来(2021.09)
参考答案:
1)MapReduce程序提交到客户端所在的节点。2)YarnRunner向ResourceManager申请一个Application。
YARN有什么优势,能解决什么问题?
问过的一些公司:祖龙娱乐参考答案:
YARN的优点
解决了单点故障问题,由于每一个任务由一个AppMaster进行调度,且可进行AppMaster出错重试, 从而使单点故障影响到多个任务进行问题不存在。
解决了单点压力过大问题,每一个任务由一个AppMaster进行调度,而每一个AppMaster都是由集群 中资源较为充足的结点进行启动,调度任务,起到一个负载均衡的作用。
完成了资源管理和任务调度的解耦,Yarn只负责对集群资源的管理,各个计算框架只要继承了
AppMaster,就可以共同使用Yarn资源管理,更加充分地利用集群资源。
解决的问题
在Hadoop 1.x版本时,JobTracker和TaskTracker是常服务,资源管理和任务调度的耦合,而在Hadoop 2.x 版本之后,Yarn将二者分离,只有资源管理成为了常服务,而任务调度则变成只有任务在调度时,才启 用的临时服务。
YARN容错机制
问过的一些公司:美团参考答案:
在现实情况中,用户代码错误不断,进程奔溃,机器故障等等。使用hadoop的好处之一就是可以它能处 理这类故障并成功完成任务。需要考虑的实体失败任务为:任务(job),Application Master, NodeManager和ResourceManager。
任务失败
任务失败可能存在以下几种情况:
MapTask或者ReduceTask中由于代码原因抛出异常,jvm在关闭之前,会通知mrAppMaster这个task任务 失败,在mrAppMaster中,错误报告被写入到用户日志并且任务标记为失败,并释放jvm资源,供其他任 务使用。对于streaming任务,如果streaming进程以非0退出代码退出,则被标记为失败。这种行为由stream.non.zero.is.failure属性(默认值为true)控制。
jvm突然退出,可能是由于jvm缺陷而导致mr用户代码由于某种特殊原因造成jvm退出。nodeManage会将 这消息通知到mrAppMaster,标记此次任务失败。
任务挂起(可能是由于资源不足造成):一旦mrAppMaster一段时间没有接收到进度的更新,则将任务 标记为失败,nodeManager会将该jvm进程杀死。任务失败时长可以由mapreduce.task.timeout来设置。 如果为0 ,则表示关闭。如果关闭这个属性,那么可能会造成长时间运行的任务不会被标记为失败,被挂起的任务就会一直不被释放资源,长时间会造成集群效率降低,因此尽量避免这个设置。同时充分保 证每个任务定期更新进度。
处理阶段:
当mrAppMaster被告知,一个任务失败的时候,会重新调度该任务。mrAppMaster会尝试避免在以前失败 过的nodeManager重新调度该任务。此外,一个任务失败的次数超过4次,将不会再重新调度。这个数值 由mapreduce.map.maxattempts控制。如果一个任务失败次数大于该属性设置的,则整个作业都会失
败。对于一些应用程序中,不希望少部分任务失败,而导致整个作业失败,因为即使一些任务失败,作 业的输出结果也是可用的,我们可用通过运行任务失败的最大比例:maptask由mapreduce.map.failures.maxpercent,reducetask由mapreduce.reduce.failures.maxpercent来设置。任务尝试也是可以用来中止(killed),因为它是一个推测副本(如果一个任务执行时间比预期的慢的时候, 会启动另外一个相同的任务作为备份,这个任务为推测执行)或者它所在的nodeManager失败,导致该nodeManager所执行的任务被标记为killed,被中止的任务是不会被记录到任务运行尝试次数。
ApplicationMaster运行失败
在YARN中,ApplicationMaster有几次尝试次数,最多尝试次数由:mapreduce.am.max-attempts和
yarn.resourcemanager.am.max-attempts确定,默认为2。
mapreduce.am.max-attempts:表示mrAppMaster失败最大次数
yarn.resourcemanager.am.max-attempts:表示在YARN中运行的应用程序失败最大次数。 所以如果要设置mrAppMaster最大失败次数,这两个都需要设置。
在ApplicationMaster向resourceManager定期发送心跳,当ResourceManager检查到ApplicationMaster失败 的时候,ResourceManager会在新的NodeManager开启新的ApplicationMaster实例。如果是mrAppMaster,则会使用作业历史来恢复作业的运行状态,不必重新运行,由yarn.app.mapreduce.am.job.recovery.enable来控制该功能。
MapReduce客户端向mrAppMaster来轮询进度报告,如果mrAppMaster失败了,则客户端通过询问ResourceManager会定位新的mrAppMaster实例。在整个MapReduce任务作业初始化的时候,客户端会向ResourceManager询问并缓存mrAppMaster地址。
NodeManager运行失败
当NodeManager由于奔溃或者非常缓慢运行而失败,会停止向ResourceManager发送心跳信息。则如果10 分钟内(由yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms来设置,以ms为单位), ResourceManager会停止通知发送的NodeManager,并将起从自己的节点池中移除。
在失败的NodeManager上的任务或者ApplicationMaster将由上面的机制恢复。对于曾经在失败的NodeManager运行并且成功的Map Task,如果属于为完成的作业,则ApplicationMaster则会重新分配资源重新运行,因为输出结果在失败的NodeManager的本地文件系统中,Reduce任务可能无法访问到。
如果在一个NodeManager中,任务失败次数过多,即使自己并没有失败过,则ApplicationMaster则会尽 量将任务调度到其他的NodeManager上。失败次数由mapreduce.job.maxtaskfailures.per.tracker设置。
ResourceManager运行失败
在YARN中,ResourceManager失败是个致命的问题,如果失败,任何任务和作业都无法启动。在默认配 置中,ResourceManager是单点故障。为了获得高可用(HA),我们需要配置一对ResourceManager,在 主ResourceManager失败后,备份ResourceManager可以继续运行。
将所有的ApplicationMaster的运行信息保存到一个高可用的状态存储中(由ZooKeeper或者HDFS备
份),这样备份ResourceManager就可以恢复出失败的ResourceManager状态。当新的ResourceManager 从存储区读取ApplicationMaster,然后在集群中重启所有ApplicationMaster。这个行为不会计入到ApplicationMaster尝试。
ResourceManager在主备切换由故障转移器(failover controller)处理。默认情况,failover controller自动工作,由ZooKeeper的Leader选举,保证同一时刻只有一个主ResourceManager。不同于HDFS的HA, 该failover controller不必是单独的进程,而是嵌入ResourceManager中。
YARN高可用
问过的一些公司:蘑菇街参考答案:
ResourceManager(RM)负责资源管理,并调度应用作业。在Hadop2.4之前RsourceManage是单点的,容易 产生单点故障。HA 提供活动和备用的RM,解决了一个RM单点故障问题。
ResourceManager存在单点故障,基于Zookeeper实现HA,通常任务失败后,RM将失败的任务告诉AM, RM负责任务的重启,AM来决定如何处理失败的任务。RMAppMaster会保存已经运行完成的Task,启后无 需重新运行。
HA架构图
集群概述
RM:ResourceManage:r一个集群只有一个active状态的,负责整个集群的管理和调度 处理客户端请求
启动监控ApplicationMaster(AM,一个作业对应一个)
监控NM
系统资源的分配和调度
NM:负责单个节点的资源管理和使用以及task运行
定期想RM汇报本节点的资源使用情况和container运行情况接收处理RM对container的启停各种命令
单节点资源管理和任务管理
ZK
在ZooKeeper上会有一个/yarn-leader-election/yarn1的锁节点,所有的ResourceManager在启动的时候, 都会去竞争写一个Lock子节点:/yarn-leader-election/yarn1/ActiveBreadCrumb,该节点是临时节点。ZooKeepr能够为我们保证最终只有一个ResourceManager能够创建成功。创建成功的那个ResourceManager就切换为Active状态,没有成功的那些ResourceManager则切换为Standby状态。
ZKFC
是RM里面的一个线程,在HDFS HA中,zkfc是一个独立的进程。作用 是监控RM的健康状态,并执行选举作用。
RMStateStore
RM会把job的信息存放在zookeeper的/rmstore目录下,active RM会向这个目录写app的信息。当active RM挂掉之后,standby RM会通过zkfc切换为active状态,然后从zookeeper的/rmstore目录下读取相应的作业信息。重新构建作业的内存信息,启动内部服务,开始接受NM的心跳信息,构建集群的资源信 息,并且接受客户端的作业提交请求
YARN调度器
可回答:1)YARN有哪些调度策略?2)Hadoop中有哪些调度器;3)了解YARN哪些调度
问过的一些公司:转转,滴滴,作业帮,大华(2021.07), soul(2021.09),陌陌(2021.10)
参考答案:
目前,Hadoop 作业调度器主要有三种:FIFO、容量(Capacity Scheduler)和公平(Fair Scheduler)。Apache Hadoop3.1.3 默认的资源调度器是 Capacity Scheduler。
CDH 框架默认调度器是 Fair Scheduler。具体设置详见:yarn-default.xml 文件
多队列:每个队列可配置一定的资源量,每个队列采用FIFO调度策略容量保证:管理员可为每个队列设置资源最低保证和资源使用上限
灵活性:如果一个队列中的资源有剩余,可以暂时共享给那些需要资源的队列,而一旦该队列有新的应 用程序提交,则其它队列借调的资源会归还给该队列
多租户:
支持多用户共享集群和多应用程序同事运行
为了防止同一用户的作业独占队列中的资源,该调度器会对同一用户提交的作业所占资源进行限定
3、公平调度器(Fair Scheduler)
与容量调度器相同点
多队列:支持多队列多作业
容量保证:管理员可为每个队列设置资源最低保证和资源使用上线
灵活性:如果一个队列中的资源有剩余,可以暂时共享给那些需要资源的队列,而一旦该队列有新 的应用程序中提交,则其它队列借调的资源会归还给该队列
多用户:支持多用户共享集群和多应用程序同事运行;为了防止同一用户的作业独占队列总的资 源,该调度器会对同一用户提交的作业所占资源进行限定
与容量调度器不同点
核心调度策略不同
容量调度器:优先选择资源利用率低的队列公平调度器:有限选择对资源的缺额比例大的
每个队列可以单独设置资源分配方式
容量调度器:FIFO、DRF
公平调度器:FIFO、FAIR、DRF
YARN中Container是如何启动的?
问过的一些公司:海康威视参考答案:
1、ApplicationMaster的主要逻辑
AM与NM通信
AM与NM们通过NMClientAsync通信,后者需要调用方提供一个回调类,NM会在合适的时机调用回调类中 的方法来通知AM。回调类被AM实现为NMCallbackHandler,其中最重要的两个函数是:
onContainerStarted(),当NM新启动了Containers时,会调用改方法,把Container列表传给它。onContainerStopped(),当NM停止了一些Containers时,会调用改方法,把Container 列表传给它。
AM与RM通信
AM与RM通过AMRMClientAsync通信。
首先,通过 AMRMClientAsync.registerApplicationMaster() 向 RM 注册自己。
然后AM开始提交对Container的需求,在申请到需要数量的Container之前,先调用setupContainerAskForRM()设置对Container的具体需求(优先级、资源等),然后调用AMRMClientAsync.addContainerRequest()把需求提交给RM,最终该方法会把需求存到一个集合(AMRMClient.ask)里面。
AMRMClientAsync同样需要调用方提供一个回调类,AM实现为RMCallbackHandler。这个回调类主要实现 了两个方法:
onContainersAllocated(),获得新申请的Container,创建一个新线程,设置ContainerLaunchContext,最终调用NMClientAsync.startContainerAsync() 来启动Container。onContainersCompleted(),检查已完成的Container的数量是否达到了需求,没有的话,继续添加需 求。
AM的三个主流程
总结上面说的,AM有三个主要流程与Container的创建密切相关: 提交需求,通过心跳,把需求发送给RM;
获取Container,通过心跳,拿到申请好的Container;
每申请到一个Container ,与NM 通信,启动这个Container;
分析清楚了这三个主流程,也就清楚了 YARN Container 的启动逻辑。
Application 与 ResourceManager 的心跳
再看RM这边,在AM向RM注册时,RM最终会生成一个代表这个APP的实例,我们先不分析注册的具体过程,只要知道在我们的情景下,最终是生成了一个FicaSchedulerApp。
AM与RM进行心跳,发送的信息中含有:
AM告诉RM两个信息: a) 自己对Container的要求,b) 已经用完的待回收的Container列表。
RM给AM的回应:a) 新申请的Container,b) 已经完成的Container 的状态。
ApplicationMasterService是RM的一个组成部分。RM启动时,会初始化这个服务,并根据配置,把相应的 调度器YarnScheduler传进来。它实现了ApplicationMasterProtocol接口,负责对来自AM的 RPC 请求进行回应。在我们的情景中, ApplicationMasterService.allocate() 方法会被调用,核心逻辑是:
触发RMappAttemptStatusupdateEvent 事件。
调用YarnScheduler.allocate() 方法,把执行的结果封装起来返回。YarnScheduler 是与调度器通信的接口。所以,最后调用的是具体调度器的allocate() 方法。
我们使用的是 FIFO 调度器,FifoScheduler.allocate() 方法的主要做两件事情:
调用FicaSchedulerApp.updateResourceRequests()更新APP(指从调度器角度看的APP) 的资源需求。通过FicaSchedulerApp.pullNewlyAllocatedContainersAndNMTokens()把FicaSchedulerApp.newlyAllocatedContainers这个List 中的Container取出来,封装后返回。
FicaSchedulerApp.newlyAllocatedContainers 这个数据结构中存放的,正是最近申请到的 Container 。那么,这个 List 中的元素是怎么来的呢,这要从 NM 的心跳说起。
NodeManager与ResourceManager的心跳
NM 需要和 RM 进行心跳,让 RM 更新自己的信息。心跳的信息包含:
Request(NM->RM) : NM 上所有Container 的状态;
Response(RM->NM) : 已待删除和待清理的Container 列表
NM 启动时会向RM注册自己,RM生成对应的RMNode结构,代表这个NM ,存放了这个NM的资源信息以及其他一些统计信息。
负责具体心跳的,在NM这边是NodeStatusUpdater服务,在RM那边则是ResourceTrackerService服务。心 跳的信息包括这个NM的状态,其中所有Container的状态等。
心跳最终通过RPC调用到了ResourceTrackerService.nodeHeartbeat() 。其核心逻辑就是触发一个RMNodeStatusEvent(RMNodeEventType.STATUS_UPDATE) 事件,这个事件由 NM 注册时生成的RMNode处理。
RMNode接收RMNodeStatusEvent(RMNodeEventType.STATUS_UPDATE) 消息,更新自己的状态机,然后调用 StatusUpdateWhenHealthyTransition.transition ,该方法从参数中获得这个NM所有的Container的信
息,根据其状态分成两组:a) 刚申请到还未使用的,b) 运行完毕需要回收的,这两组 Container 的信息存 放 在 RMNode 的 一 个 队 列 中 。 接 着 , 发 出 一 个 消 息 : NodeUpdateSchedulerEvent(SchedulerEventType.NODE_UPDATE) 。这个消息,由调度器处理。
ResourceManager处理 NODE_UPDATE 消息
RM 接收到 NM 的心跳后,会发出一个 SchedulerEventType.NODE_UPDATE 的消息,改消息由调度器处理。FifoScheduler 接收到这个消息后,调用了 FifoScheduler.nodeUpdate() 方法。与 Container 申请相关的主要逻辑如下:
获取已申请到的
从 RMNode 中获取出那些「刚申请还未使用」的 Container (NM 与 RM 心跳是获得),发出消息:
RMContainerEventType.LAUNCHED,该消息由 RMContainer 处理;
回收已完成的
从 RMNode 中获取出那些「已经使用完待回收」的 Container,进行回收(具体回收过程略);
申请新的
在这个 NM 上申请新的 Container:
通过 FicaSchedulerApp.getResourceRequest() 拿到资源请求(ResourceRequest)
计算可申请的资源,调用 FicaSchedulerApp.allocate(),根据传进来的参数,封装出一个 RMContainer 添加到 newlyAllocatedContainers 中。然后触发事件 RMContainerEventType.START。该事件之后会由RMContainer 处理。
调用 FicaSchedulerNode.allocateContainer()和RMContainer对RMContainerEventType事件进行处理处理:
RMContainerEventType.START : 状态从 NEW 变为 ALLOCATED,最终触发事件RMAppAttemptEvent(type=CONTAINER_ALLOCATED), 改事件由 RMAppAttemptImpl 处理。RMContainerEventType.LAUNCHED : 状态从 ACQUIED 变为 RUNNING 。
RMAppAttemptImpl对RMAppAttemptEvent事件进行处理,该事件告诉就是告诉AppAttempt ,你这个APP 有Container申请好了,AppAttempt 检查自己的状态,如果当前还没有运行AM ,就把这个Container拿来运行AM。
到此,我们已经理清楚了FicaSchedulerApp.newlyAllocatedContainers中元素的来源,也就理清楚了,AM 与 RM 心跳中获得的那些「新申请」的 Container 的来源。
ApplicationMaster 与 NodeManager 通信启动 Container
关于“AM的三个主流程”,上面已经讲过了。
基于上面的分析,第1,2两个流程已经清楚。下面我们来具体看看 NM 具体是怎么启动一个 Container
的。
AM 设置好 ContainerLaunchContext , 调用 NMClientAsync.startContainerAsync() 启动Container。
NMClientAsync 中有一个名叫 events 的事件队列,同时,NMClientAsync 还启动这一个线程,不断地从
events 中取出事件进行处理。
startContainerAsync() 方法被调用时,会生成一个 ContainerEvent(type=START_CONTAINER) 事件放入events 队列。对于这个事件,处理逻辑是调用 NMClient.startContainer() 同步地启动 Container ,然后调用回调类中的 onContainerStarted() 方法。
NMClient 最终会调用 ContainerManagementProtocol.startContainers() ,以 Google Protocol Buffer 格式, 通过 RPC 调用 NM 的对应方法。NM 处理后会返回成功启动的 Container 列表。
NodeManager 中启动 Container ContainerManagerImpl
NM 中负责响应来自 AM 的 RPC 请求的是 ContainerManagerImpl ,它是 NodeManager 的一部分,负责Container 的管理,在 Nodemanager 启动时,该服务被初始化。该类实现了接口ContainerManagementProtocol ,接到 RPC 请求后,会调用 ContainerManagerImpl.startContainers() 。改函数的基本逻辑是:
首先进行APP 的初始化(如果还没有的话),生成一个ApplicationImpl 实例,然后根据请求,生成一堆ContainerImpl 实例
触发一个新事件:ApplicationContainerInitEvent ,之前生成的ApplicationImpl 收到改事件,又出发一个ContainerEvent(type=INIT_CONTAINER) 事件,这个事件由ContainerImpl 处理
ContainerImpl 收到事件, 更新状态机,启动辅助服务,然后触发一个新事件
ContainersLaucherEvent(type=LAUNCH_CONTAINER) ,处理这个事件的是ContainersLauncher 。
ContainerLauncher 是 ContainerManager 的一个子服务,收到ContainersLaucherEvent(type=LAUNCH_CONTAINER) 事件后,组装出一个 ContainerLaunch 类并使用ExecutorService 执行。
ContainerLaunch 类负责一个 Container 具体的 Lanuch 。基本逻辑如下:
设置运行环境,包括生成运行脚本,Local Resource ,环境变量,工作目录,输出目录等触发新事件ContainerEvent(type=CONTAINER_LAUNCHED),该事件由ContainerImpl 处理。调用ContainerExecutor.launchContainer() 执行Container 的工作,这是一个阻塞方法。
执行结束后,根据执行的结果设置Container 的状态。
ContainerExecutor
ContainerExecutor 是 NodeManager 的一部分,负责 Container 中具体工作的执行。该类是抽象类,可以有不同的实现,如 DefaultContainerExecutor ,DockerContainerExecutor ,LinuxContainerExecutor 等。根据 YARN 的配置,NodeManager 启动时,会初始化具体的 ContainerExecutor 。
ContainerExecutor 最主要的方法是 launchContainer() ,该方法阻塞,直到执行的命令结束。
DefaultContainerExecutor 是默认的 ContainerExecutor ,支持 Windows 和 Linux 。它的 launchContainer()
的逻
创建Container 需要的目录
拷贝Token、运行脚本到工作目录
做一些脚本的封装,然后执行脚本,返回状态码
至此,Container 在 NM 中已经启动,AM 中 NMCallback 回调类中的 onContainerStarted() 方法被调用。
YARN的改进之处,Hadoop 3.x相对于Hadoop 2.x?
问过的一些公司:字节参考答案:
YARN Timeline Service版本更新到v.2
本版本引入了Yarn时间抽服务v.2,主要用于解决2大挑战:改善时间轴服务的可伸缩性和可靠性,通过 引入流和聚合增强可用性。
YARN Timeline Service v.2 alpha 1可以让用户和开发者测试以及反馈,以便使得它可以替换现在的Timeline Service v.1.x。
YARN监控
问过的一些公司:美团参考答案:
配置 yarn-site.xml 开启日志聚合
日志聚集是YARN提供的日志中央化管理功能,它能将运行完成的Container/任务日志上传到HDFS上,从 而减轻NodeManager负载,且提供一个中央化存储和分析机制。默认情况下,Container/任务日志存在在 各个NodeManager上
重启YARN
开启日志监控服务进程在nodenode机器上执行
sbin/mr-jobhistory-daemon.sh start historyserver
后使用jps命令查看是否启动成功,若启动成功则会显示出JobHistoryServer服务。
命令,执行完成
最好将 yarn-site.xml 的 yarn.log.server.url 也配置上