• Hadoop-Yarn-NodeManager是如何监控容器的


    一、源码下载

    下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧

    Index of /dist/hadoop/core(https://archive.apache.org/dist/hadoop/core/)

    二、上下文

    在我的博客《Hadoop-Yarn-NodeManager是如何启动容器的》中提到,ContainerLaunch的prepareForLaunch()会触发ContainerEventType.CONTAINER_LAUNCHED事件。ContainerImpl会处理该事件:开始监控该容器的资源使用情况,并进行后续处理。下面让我们把源码捋一捋吧。

    三、开始捋源码

    1、ContainerImpl

    1. public class ContainerImpl implements Container {
    2. private static StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>
    3. stateMachineFactory =
    4. new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW).
    5. // ...... other transitions omitted ......
    6. addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
    7. ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
    8. // ...... other transitions omitted ......
    9. .installTopology();
    10. static class LaunchTransition extends ContainerTransition {
    11. @SuppressWarnings("unchecked")
    12. @Override
    13. public void transition(ContainerImpl container, ContainerEvent event) {
    14. // Send the start-monitoring event so the container's resource usage starts being monitored
    15. container.sendContainerMonitorStartEvent();
    16. container.metrics.runningContainer();
    17. container.wasLaunched = true;
    18. if (container.isReInitializing()) {
    19. NMAuditLogger.logSuccess(container.user,
    20. AuditConstants.FINISH_CONTAINER_REINIT, "ContainerImpl",
    21. container.containerId.getApplicationAttemptId().getApplicationId(),
    22. container.containerId);
    23. }
    24. container.setIsReInitializing(false);
    25. // Check if this launch was due to a re-initialization.
    26. // If autocommit == true, then wipe the re-init context. This ensures
    27. // that any subsequent failures do not trigger a rollback.
    28. if (container.reInitContext != null
    29. && !container.reInitContext.canRollback()) {
    30. container.reInitContext = null;
    31. }
    32. if (container.recoveredAsKilled) {
    33. LOG.info("Killing " + container.containerId
    34. + " due to recovered as killed");
    35. container.addDiagnostics("Container recovered as killed.\n");
    36. container.dispatcher.getEventHandler().handle(
    37. new ContainersLauncherEvent(container,
    38. ContainersLauncherEventType.CLEANUP_CONTAINER));
    39. }
    40. }
    41. }
    42. private void sendContainerMonitorStartEvent() {
    43. long launchDuration = clock.getTime() - containerLaunchStartTime;
    44. metrics.addContainerLaunchDuration(launchDuration);
    45. long pmemBytes = getResource().getMemorySize() * 1024 * 1024L; // requested memory (MB) -> bytes
    46. float pmemRatio = daemonConf.getFloat(
    47. YarnConfiguration.NM_VMEM_PMEM_RATIO,
    48. YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
    49. long vmemBytes = (long) (pmemRatio * pmemBytes); // virtual-memory limit = vmem/pmem ratio * physical limit
    50. int cpuVcores = getResource().getVirtualCores();
    51. long localizationDuration = containerLaunchStartTime -
    52. containerLocalizationStartTime;
    53. // This fires ContainersMonitorEventType.START_MONITORING_CONTAINER;
    54. // the event is handled by ContainersMonitorImpl
    55. dispatcher.getEventHandler().handle(
    56. new ContainerStartMonitoringEvent(containerId,
    57. vmemBytes, pmemBytes, cpuVcores, launchDuration,
    58. localizationDuration));
    59. }
    60. }

    2、ContainersMonitorImpl

    监视容器并收集其资源使用情况,在容器超出资源限制时抢占(杀掉)该容器

    1. public class ContainersMonitorImpl extends AbstractService implements
    2. ContainersMonitor {
    3. private final static Logger LOG =
    4. LoggerFactory.getLogger(ContainersMonitorImpl.class);
    5. private final static Logger AUDITLOG =
    6. LoggerFactory.getLogger(ContainersMonitorImpl.class.getName()+".audit");
    7. private long monitoringInterval;
    8. private MonitoringThread monitoringThread;
    9. private int logCheckInterval;
    10. private LogMonitorThread logMonitorThread;
    11. private long logDirSizeLimit;
    12. private long logTotalSizeLimit;
    13. private CGroupElasticMemoryController oomListenerThread;
    14. private boolean containerMetricsEnabled;
    15. private long containerMetricsPeriodMs;
    16. private long containerMetricsUnregisterDelayMs;
    17. @VisibleForTesting
    18. final Map<ContainerId, ProcessTreeInfo> trackingContainers =
    19. new ConcurrentHashMap<>();
    20. private final ContainerExecutor containerExecutor;
    21. private final Dispatcher eventDispatcher;
    22. private final Context context;
    23. private ResourceCalculatorPlugin resourceCalculatorPlugin;
    24. private Configuration conf;
    25. private static float vmemRatio;
    26. // Interface class for obtaining the resource usage of a process tree.
    27. // NOTE: not meant for external users; only for external developers who extend it with their own process-tree implementations, especially for platforms other than Linux and Windows.
    28. private Class<? extends ResourceCalculatorProcessTree> processTreeClass;
    29. private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
    30. private long maxPmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
    31. private boolean pmemCheckEnabled;
    32. private boolean vmemCheckEnabled;
    33. private boolean elasticMemoryEnforcement;
    34. private boolean strictMemoryEnforcement;
    35. private boolean containersMonitorEnabled;
    36. private boolean logMonitorEnabled;
    37. private long maxVCoresAllottedForContainers;
    38. private static final long UNKNOWN_MEMORY_LIMIT = -1L;
    39. private int nodeCpuPercentageForYARN;
    40. /**
    41. * Type of container metric.
    42. */
    43. @Private
    44. public enum ContainerMetric {
    45. CPU, MEMORY
    46. }
    47. // ResourceUtilization models the utilization of a set of computer resources in the cluster
    48. private ResourceUtilization containersUtilization;
    49. private volatile boolean stopped = false;
    50. public ContainersMonitorImpl(ContainerExecutor exec,
    51. AsyncDispatcher dispatcher, Context context) {
    52. super("containers-monitor");
    53. this.containerExecutor = exec;
    54. this.eventDispatcher = dispatcher;
    55. this.context = context;
    56. this.monitoringThread = new MonitoringThread();
    57. this.logMonitorThread = new LogMonitorThread();
    58. // ResourceUtilization.newInstance(physical memory, virtual memory, CPU utilization)
    59. this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
    60. }
    61. @Override
    62. protected void serviceInit(Configuration myConf) throws Exception {
    63. this.conf = myConf;
    64. // How often to monitor containers:
    65. // reads yarn.nodemanager.container-monitor.interval-ms;
    66. // if it is unset, yarn.nodemanager.resource-monitor.interval-ms is used instead. A value of 0 or less disables container monitoring.
    67. // (yarn.nodemanager.resource-monitor.interval-ms is how often the node and its containers are monitored;
    68. // it defaults to 3000 ms, i.e. 3 s, and 0 or a negative value disables monitoring.)
    69. this.monitoringInterval =
    70. this.conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS,
    71. this.conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,
    72. YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS));
    73. // How often (in ms) to check the disk usage of container log directories.
    74. // Reads yarn.nodemanager.container-log-monitor.interval-ms, default 60000 ms, i.e. 1 min
    75. this.logCheckInterval =
    76. conf.getInt(YarnConfiguration.NM_CONTAINER_LOG_MON_INTERVAL_MS,
    77. YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_MON_INTERVAL_MS);
    78. // Disk space limit (in bytes) for a single container log directory. (1 GiB = 1024 MiB = 1024*1024 KiB = 1024*1024*1024 bytes)
    79. // Reads yarn.nodemanager.container-log-monitor.dir-size-limit-bytes, default 1000000000L, about 1 GB
    80. this.logDirSizeLimit =
    81. conf.getLong(YarnConfiguration.NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES,
    82. YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES);
    83. // Disk space limit (in bytes) for all of a container's logs combined.
    84. // Reads yarn.nodemanager.container-log-monitor.total-size-limit-bytes, default 10000000000L, i.e. about 10 GB
    85. this.logTotalSizeLimit =
    86. conf.getLong(YarnConfiguration.NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES,
    87. YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES);
    88. // Plugin that computes resource information on this system; if no plugin is configured, it tries to return a memory-calculator plugin usable on this system.
    89. // First reads yarn.nodemanager.container-monitor.resource-calculator.class (class that computes current resource utilization), default empty,
    90. // then yarn.nodemanager.resource-calculator.class (same purpose), default empty.
    91. // If both are empty, the OS decides: SysInfoLinux on LINUX, SysInfoWindows on WINDOWS.
    92. this.resourceCalculatorPlugin =
    93. ResourceCalculatorPlugin.getContainersMonitorPlugin(this.conf);
    94. LOG.info(" Using ResourceCalculatorPlugin : "
    95. + this.resourceCalculatorPlugin);
    96. // Reads yarn.nodemanager.container-monitor.process-tree.class (computes process-tree resource utilization), default empty
    97. processTreeClass = this.conf.getClass(
    98. YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, null,
    99. ResourceCalculatorProcessTree.class);
    100. LOG.info(" Using ResourceCalculatorProcessTree : "
    101. + this.processTreeClass);
    102. // Flag that enables container metrics.
    103. // Reads yarn.nodemanager.container-metrics.enable, default true
    104. this.containerMetricsEnabled =
    105. this.conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
    106. YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE);
    107. // Container metrics flush period in ms; -1 means flush on completion.
    108. // Reads yarn.nodemanager.container-metrics.period-ms, default -1
    109. this.containerMetricsPeriodMs =
    110. this.conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
    111. YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
    112. // Delay (ms) before a finished container's metrics are unregistered.
    113. // Reads yarn.nodemanager.container-metrics.unregister-delay-ms, default 10000 ms, i.e. 10 s
    114. this.containerMetricsUnregisterDelayMs = this.conf.getLong(
    115. YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS,
    116. YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
    117. // NodeManagerHardwareUtils: determines hardware-related characteristics such as the number of processors and the amount of memory on the node.
    118. // getContainerMemoryMB returns how much memory to set aside for YARN containers. If a number is specified in the configuration, it is returned. If nothing is specified:
    119. // 1. if the OS is an "unknown" OS (one without a ResourceCalculatorPlugin implementation), the default NodeManager physical memory is returned;
    120. // 2. if the OS implements ResourceCalculatorPlugin, it is computed as 0.8 * (RAM - 2 * JVM memory), i.e. 80% of the memory after accounting for the DataNode and the NodeManager.
    121. // If the number is less than 1 GB, a warning is logged.
    122. // Reads yarn.nodemanager.resource.detect-hardware-capabilities (enable auto-detection of node capabilities such as memory and CPU), default false.
    123. // When false (the default), the configured yarn.nodemanager.resource.memory-mb (memory, in MB, that can be allocated to containers) is used.
    124. // Author's note: source and official docs differ here -- the docs give a default of -1, the source default is 8 * 1024 MB (8 GB); per the author, -1 is still turned into 8 GB by the source, while other values are honored.
    125. // The returned value is 8*1024 (MB); multiplying by 1024 * 1024L converts those 8 GB to bytes.
    126. long configuredPMemForContainers =
    127. NodeManagerHardwareUtils.getContainerMemoryMB(
    128. this.resourceCalculatorPlugin, this.conf) * 1024 * 1024L;
    129. // getVCores returns the number of vcores on the system usable by YARN containers. If a number is specified in the configuration, it is returned. If nothing is specified:
    130. // 1. if the OS is an "unknown" OS (no ResourceCalculatorPlugin implementation), the default NodeManager cores are returned;
    131. // 2. if logical processors are configured to count as cores, the logical processor count (hyperthreads counted as cores) is returned, otherwise the physical core count.
    132. // Reads yarn.nodemanager.resource.cpu-vcores (number of virtual CPU cores that can be allocated to containers).
    133. // This is used by the RM scheduler when allocating resources for containers; it does not limit the number of CPUs YARN containers use. If it is set to -1
    134. // and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is determined automatically from the hardware on Windows and Linux.
    135. // Otherwise the number of vcores defaults to 8.
    136. long configuredVCoresForContainers =
    137. NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin,
    138. this.conf);
    139. // Set these regardless of whether the checks are enabled; the UI needs them.
    140. // / Physical memory configuration //
    141. // maxPmemAllottedForContainers = 8 GB by default
    142. // maxVCoresAllottedForContainers = 8 vcores by default
    143. // i.e. by default all containers on this NodeManager together can be allotted at most 8 vcores and 8 GB (a node-wide total, not a per-container limit)
    144. this.maxPmemAllottedForContainers = configuredPMemForContainers;
    145. this.maxVCoresAllottedForContainers = configuredVCoresForContainers;
    146. // / Virtual memory configuration //
    147. // Reads yarn.nodemanager.vmem-pmem-ratio, default 2.1
    148. // Ratio between virtual and physical memory when setting container memory limits. Allocations are expressed in physical memory; virtual memory usage may exceed the allocation by this ratio.
    149. vmemRatio = this.conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
    150. YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
    151. // Validate the ratio: it must be greater than 0.99, i.e. at least 1.0
    152. Preconditions.checkArgument(vmemRatio > 0.99f,
    153. YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
    154. // Maximum virtual memory allotted to containers, by default 2.1 * 8 GB = 16.8 GB
    155. this.maxVmemAllottedForContainers =
    156. (long) (vmemRatio * configuredPMemForContainers);
    157. // Whether physical memory limits are enforced for containers
    158. // Reads yarn.nodemanager.pmem-check-enabled, default true
    159. pmemCheckEnabled = this.conf.getBoolean(
    160. YarnConfiguration.NM_PMEM_CHECK_ENABLED,
    161. YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED);
    162. // Whether virtual memory limits are enforced for containers
    163. // Reads yarn.nodemanager.vmem-check-enabled, default true
    164. vmemCheckEnabled = this.conf.getBoolean(
    165. YarnConfiguration.NM_VMEM_CHECK_ENABLED,
    166. YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
    167. // Enables elastic memory control (Linux only). When enabled, the NodeManager adds a listener that receives an event if all containers together exceed a limit.
    168. // The limit is given by yarn.nodemanager.resource.memory-mb; if that is not set, the limit is derived from the node's capabilities.
    169. // See yarn.nodemanager.resource.detect-hardware-capabilities for details. The limit applies to physical or virtual (rss+swap) memory,
    170. // depending on whether yarn.nodemanager.pmem-check-enabled or yarn.nodemanager.vmem-check-enabled is set.
    171. // Reads yarn.nodemanager.elastic-memory-control.enabled, default false
    172. elasticMemoryEnforcement = this.conf.getBoolean(
    173. YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED,
    174. YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED);
    175. // Whether YARN CGroups strict memory enforcement is on; as the name suggests, a container is killed as soon as it exceeds its limit
    176. // Reads yarn.nodemanager.resource.memory.enforced, default true
    177. strictMemoryEnforcement = conf.getBoolean(
    178. YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED,
    179. YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENFORCED);
    180. LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
    181. LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);
    182. LOG.info("Elastic memory control enabled: " + elasticMemoryEnforcement);
    183. LOG.info("Strict memory control enabled: " + strictMemoryEnforcement);
    184. // Elastic memory control is off by default, so this branch is normally skipped
    185. if (elasticMemoryEnforcement) {
    186. if (!CGroupElasticMemoryController.isAvailable()) {
    187. // Test for availability outside the constructor
    188. // to be able to write non-Linux unit tests for
    189. // CGroupElasticMemoryController
    190. throw new YarnException(
    191. "CGroup Elastic Memory controller enabled but " +
    192. "it is not available. Exiting.");
    193. } else {
    194. this.oomListenerThread = new CGroupElasticMemoryController(
    195. conf,
    196. context,
    197. ResourceHandlerModule.getCGroupsHandler(),
    198. pmemCheckEnabled,
    199. vmemCheckEnabled,
    200. pmemCheckEnabled ?
    201. maxPmemAllottedForContainers : maxVmemAllottedForContainers
    202. );
    203. }
    204. }
    205. // isContainerMonitorEnabled() returns true by default
    206. // monitoringInterval defaults to 3000 ms, i.e. 3 s
    207. // so containersMonitorEnabled is true by default: container monitoring is on
    208. containersMonitorEnabled =
    209. isContainerMonitorEnabled() && monitoringInterval > 0;
    210. LOG.info("ContainersMonitor enabled: " + containersMonitorEnabled);
    211. // Flag enabling the container log monitor, which enforces the container log directory size limits
    212. // Reads yarn.nodemanager.container-log-monitor.enable, default false
    213. logMonitorEnabled =
    214. conf.getBoolean(YarnConfiguration.NM_CONTAINER_LOG_MONITOR_ENABLED,
    215. YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_MONITOR_ENABLED);
    216. LOG.info("Container Log Monitor Enabled: "+ logMonitorEnabled);
    217. // Percentage of physical CPU configured for YARN containers; the value is between 0 and 100.
    218. // It lets users limit how much CPU YARN containers use. Currently only functional on Linux with cgroups. The default is 100% of the CPU.
    219. // Reads yarn.nodemanager.resource.percentage-physical-cpu-limit, default 100
    220. // so nodeCpuPercentageForYARN defaults to 100
    221. nodeCpuPercentageForYARN =
    222. NodeManagerHardwareUtils.getNodeCpuPercentage(this.conf);
    223. // true by default: physical memory limits are enforced for containers
    224. if (pmemCheckEnabled) {
    225. // Log a warning if the actual physical memory cannot be determined
    226. long totalPhysicalMemoryOnNM = UNKNOWN_MEMORY_LIMIT;
    227. // On LINUX, resourceCalculatorPlugin is SysInfoLinux by default
    228. if (this.resourceCalculatorPlugin != null) {
    229. // SysInfoLinux reads, parses and computes /proc/meminfo only once, assigning ramSize, hardwareCorruptSize, hugePagesTotal and hugePageSize
    230. // totalPhysicalMemoryOnNM = (ramSize - hardwareCorruptSize - (hugePagesTotal * hugePageSize)) * 1024
    231. // i.e. (total RAM - corrupted RAM - (reserved huge pages * size of each huge page)) * 1024, converting kB to bytes
    232. // see the author's other blog post for details
    233. // ramSize: total RAM (MemTotal)
    234. // hardwareCorruptSize: size of RAM that is corrupted and unusable
    235. // hugePagesTotal: number of reserved huge pages
    236. // hugePageSize: size of each huge page
    237. totalPhysicalMemoryOnNM = this.resourceCalculatorPlugin
    238. .getPhysicalMemorySize();
    239. if (totalPhysicalMemoryOnNM <= 0) {
    240. LOG.warn("NodeManager's totalPmem could not be calculated. "
    241. + "Setting it to " + UNKNOWN_MEMORY_LIMIT);
    242. totalPhysicalMemoryOnNM = UNKNOWN_MEMORY_LIMIT;
    243. }
    244. }
    245. // Allotting containers more than 80% of the total physical memory may cause thrashing
    246. if (totalPhysicalMemoryOnNM != UNKNOWN_MEMORY_LIMIT &&
    247. this.maxPmemAllottedForContainers > totalPhysicalMemoryOnNM * 0.80f) {
    248. LOG.warn("NodeManager configured with "
    249. + TraditionalBinaryPrefix.long2String(maxPmemAllottedForContainers,
    250. "", 1)
    251. + " physical memory allocated to containers, which is more than "
    252. + "80% of the total physical memory available ("
    253. + TraditionalBinaryPrefix.long2String(totalPhysicalMemoryOnNM, "",
    254. 1) + "). Thrashing might happen.");
    255. }
    256. }
    257. super.serviceInit(this.conf);
    258. }
    259. // Whether the container monitor is enabled
    260. // Reads yarn.nodemanager.container-monitor.enabled, default true
    261. private boolean isContainerMonitorEnabled() {
    262. return conf.getBoolean(YarnConfiguration.NM_CONTAINER_MONITOR_ENABLED,
    263. YarnConfiguration.DEFAULT_NM_CONTAINER_MONITOR_ENABLED);
    264. }
    265. /**
    266. * Get the best process tree calculator.
    267. * @param pId container process id
    268. * @return process tree calculator
    269. */
    270. private ResourceCalculatorProcessTree
    271. getResourceCalculatorProcessTree(String pId) {
    272. return ResourceCalculatorProcessTree.
    273. getResourceCalculatorProcessTree(
    274. pId, processTreeClass, conf);
    275. }
    276. private boolean isResourceCalculatorAvailable() {
    277. if (resourceCalculatorPlugin == null) {
    278. LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this
    279. .getClass().getName() + " is disabled.");
    280. return false;
    281. }
    282. if (getResourceCalculatorProcessTree("0") == null) {
    283. LOG.info("ResourceCalculatorProcessTree is unavailable on this system. "
    284. + this.getClass().getName() + " is disabled.");
    285. return false;
    286. }
    287. return true;
    288. }
    289. @Override
    290. protected void serviceStart() throws Exception {
    291. // containersMonitorEnabled is true by default: container monitoring is on
    292. if (containersMonitorEnabled) {
    293. // Start the thread that monitors the containers
    294. this.monitoringThread.start();
    295. }
    296. // Elastic memory control is off by default
    297. if (oomListenerThread != null) {
    298. // With cgroups-based elastic memory control, individual containers may use more than their allotment as long as the overall threshold is not exceeded,
    299. // so oomListenerThread is started to watch whether the overall threshold is exceeded
    300. oomListenerThread.start();
    301. }
    302. // The container log monitor is off by default
    303. if (logMonitorEnabled) {
    304. this.logMonitorThread.start();
    305. }
    306. super.serviceStart();
    307. }
    308. private class MonitoringThread extends Thread {
    309. MonitoringThread() {
    310. super("Container Monitor");
    311. }
    312. @Override
    313. public void run() {
    314. while (!stopped && !Thread.currentThread().isInterrupted()) {
    315. // Print the process trees for debugging
    316. if (LOG.isDebugEnabled()) {
    317. StringBuilder tmp = new StringBuilder("[ ");
    318. for (ProcessTreeInfo p : trackingContainers.values()) {
    319. tmp.append(p.getPID());
    320. tmp.append(" ");
    321. }
    322. LOG.debug("Current ProcessTree list : "
    323. + tmp.substring(0, tmp.length()) + "]");
    324. }
    325. // Temporary structure to calculate the total resource utilization of the containers
    326. ResourceUtilization trackedContainersUtilization =
    327. ResourceUtilization.newInstance(0, 0, 0.0f);
    328. // Now do the monitoring for trackingContainers: check memory usage and kill any overflowing containers.
    329. // Each container puts its info into trackingContainers when it starts; see onStartMonitoringContainer()
    330. long vmemUsageByAllContainers = 0;
    331. long pmemByAllContainers = 0;
    332. long cpuUsagePercentPerCoreByAllContainers = 0;
    333. for (Entry<ContainerId, ProcessTreeInfo> entry : trackingContainers
    334. .entrySet()) {
    335. ContainerId containerId = entry.getKey();
    336. ProcessTreeInfo ptInfo = entry.getValue();
    337. try {
    338. // Initialize process trees that are not yet initialized
    339. initializeProcessTrees(entry);
    340. String pId = ptInfo.getPID();
    341. if (pId == null || !isResourceCalculatorAvailable()) {
    342. continue; // this process tree cannot be tracked
    343. }
    344. if (LOG.isDebugEnabled()) {
    345. LOG.debug("Constructing ProcessTree for : PID = " + pId
    346. + " ContainerId = " + containerId);
    347. }
    348. ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree();
    349. pTree.updateProcessTree(); // update the process tree
    350. // Virtual memory used by all processes in the process tree
    351. long currentVmemUsage = pTree.getVirtualMemorySize();
    352. // Resident set size (RSS) memory used by all processes in the process tree;
    353. // RSS is the physical memory a process currently occupies (including memory used by shared libraries)
    354. long currentPmemUsage = pTree.getRssMemorySize();
    355. if (currentVmemUsage < 0 || currentPmemUsage < 0) {
    356. // YARN-6862/YARN-5021 If the container just exited or for
    357. // another reason the physical/virtual memory is UNAVAILABLE (-1)
    358. // the values shouldn't be aggregated.
    359. LOG.info("Skipping monitoring container {} because "
    360. + "memory usage is not available.", containerId);
    361. continue;
    362. }
    363. // if machine has 6 cores and 3 are used,
    364. // cpuUsagePercentPerCore should be 300%
    365. // CPU usage of all processes in the tree, averaged between samples, as a ratio of total CPU cycles similar to top; so if 2 of 4 cores are used, it returns 200.0.
    366. // NOTE: UNAVAILABLE is returned when CPU usage is not available; returning any other error code is not recommended.
    367. float cpuUsagePercentPerCore = pTree.getCpuUsagePercent();
    368. if (cpuUsagePercentPerCore < 0) {
    369. // CPU usage is not available likely because the container just
    370. // started. Let us skip this turn and consider this container
    371. // in the next iteration.
    372. LOG.info("Skipping monitoring container " + containerId
    373. + " since CPU usage is not yet available.");
    374. continue;
    375. }
    376. // Record usage metrics
    377. recordUsage(containerId, pId, pTree, ptInfo, currentVmemUsage,
    378. currentPmemUsage, trackedContainersUtilization);
    379. // Check resource limits and take action if they are exceeded
    380. checkLimit(containerId, pId, pTree, ptInfo,
    381. currentVmemUsage, currentPmemUsage);
    382. // Accumulate total memory usage across all containers
    383. vmemUsageByAllContainers += currentVmemUsage;
    384. pmemByAllContainers += currentPmemUsage;
    385. // Accumulate total CPU usage across all containers
    386. cpuUsagePercentPerCoreByAllContainers += cpuUsagePercentPerCore;
    387. // Report usage metrics to the timeline service
    388. reportResourceUsage(containerId, currentPmemUsage,
    389. cpuUsagePercentPerCore);
    390. } catch (Exception e) {
    391. // Log the exception and proceed to the next container.
    392. LOG.warn("Uncaught exception in ContainersMonitorImpl "
    393. + "while monitoring resource of {}", containerId, e);
    394. }
    395. }
    396. if (LOG.isDebugEnabled()) {
    397. LOG.debug("Total Resource Usage stats in NM by all containers : "
    398. + "Virtual Memory= " + vmemUsageByAllContainers
    399. + ", Physical Memory= " + pmemByAllContainers
    400. + ", Total CPU usage(% per core)= "
    401. + cpuUsagePercentPerCoreByAllContainers);
    402. }
    403. // Save the aggregated utilization of the containers
    404. setContainersUtilization(trackedContainersUtilization);
    405. // Publish the container utilization metrics to the NodeManager metrics system
    406. NodeManagerMetrics nmMetrics = context.getNodeManagerMetrics();
    407. if (nmMetrics != null) {
    408. nmMetrics.setContainerUsedMemGB(
    409. trackedContainersUtilization.getPhysicalMemory());
    410. nmMetrics.setContainerUsedVMemGB(
    411. trackedContainersUtilization.getVirtualMemory());
    412. nmMetrics.setContainerCpuUtilization(
    413. trackedContainersUtilization.getCPU());
    414. }
    415. try {
    416. // How often containers are monitored, 3 s by default
    417. Thread.sleep(monitoringInterval);
    418. } catch (InterruptedException e) {
    419. LOG.warn(ContainersMonitorImpl.class.getName()
    420. + " is interrupted. Exiting.");
    421. break;
    422. }
    423. }
    424. }
    425. private void recordUsage(ContainerId containerId, String pId,
    426. ResourceCalculatorProcessTree pTree,
    427. ProcessTreeInfo ptInfo,
    428. long currentVmemUsage, long currentPmemUsage,
    429. ResourceUtilization trackedContainersUtilization) {
    430. // if machine has 6 cores and 3 are used,
    431. // cpuUsagePercentPerCore should be 300% and
    432. // cpuUsageTotalCoresPercentage should be 50%
    433. float cpuUsagePercentPerCore = pTree.getCpuUsagePercent();
    434. float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore /
    435. resourceCalculatorPlugin.getNumProcessors();
    436. // Multiply by 1000 to avoid losing data when converting to int.
    437. // With the defaults: percentage of all cores used * 1000 * 8 (vcores) / 100 (nodeCpuPercentageForYARN)
    438. // e.g. 50 (%) * 1000 * 8 / 100 = 4000 milli-vcores, i.e. 4 vcores
    439. int milliVcoresUsed = (int) (cpuUsageTotalCoresPercentage * 1000
    440. * maxVCoresAllottedForContainers /nodeCpuPercentageForYARN);
    441. // Virtual memory limit of the process tree (bytes)
    442. long vmemLimit = ptInfo.getVmemLimit();
    443. // Physical memory limit of the process tree (bytes)
    444. long pmemLimit = ptInfo.getPmemLimit();
    445. if (AUDITLOG.isDebugEnabled()) {
    446. int vcoreLimit = ptInfo.getCpuVcores();
    447. long cumulativeCpuTime = pTree.getCumulativeCpuTime();
    448. AUDITLOG.debug(String.format(
    449. "Resource usage of ProcessTree %s for container-id %s:" +
    450. " %s %%CPU: %f %%CPU-cores: %f" +
    451. " vCores-used: %d of %d Cumulative-CPU-ms: %d",
    452. pId, containerId.toString(),
    453. formatUsageString(
    454. currentVmemUsage, vmemLimit,
    455. currentPmemUsage, pmemLimit),
    456. cpuUsagePercentPerCore,
    457. cpuUsageTotalCoresPercentage,
    458. milliVcoresUsed / 1000, vcoreLimit,
    459. cumulativeCpuTime));
    460. }
    461. // Add this container's resource utilization (memory in MB via >> 20)
    462. trackedContainersUtilization.addTo(
    463. (int) (currentPmemUsage >> 20),
    464. (int) (currentVmemUsage >> 20),
    465. milliVcoresUsed / 1000.0f);
    466. // Add the usage to the container metrics
    467. if (containerMetricsEnabled) {
    468. ContainerMetrics.forContainer(
    469. containerId, containerMetricsPeriodMs,
    470. containerMetricsUnregisterDelayMs).recordMemoryUsage(
    471. (int) (currentPmemUsage >> 20));
    472. ContainerMetrics.forContainer(
    473. containerId, containerMetricsPeriodMs,
    474. containerMetricsUnregisterDelayMs).recordCpuUsage((int)
    475. cpuUsagePercentPerCore, milliVcoresUsed);
    476. }
    477. }
    478. private void checkLimit(ContainerId containerId, String pId,
    479. ResourceCalculatorProcessTree pTree,
    480. ProcessTreeInfo ptInfo,
    481. long currentVmemUsage,
    482. long currentPmemUsage) {
    483. Optional<Boolean> isMemoryOverLimit = Optional.empty();
    484. String msg = "";
    485. int containerExitStatus = ContainerExitStatus.INVALID;
    486. // Per the author, strictMemoryEnforcement defaults to true and elasticMemoryEnforcement to false, so this branch only runs when elastic control is also on.
    487. // NOTE(review): with exactly those values the else-if below sets Optional.of(false) and skips the procfs checks -- confirm the effective defaults in the full Hadoop source.
    488. if (strictMemoryEnforcement && elasticMemoryEnforcement) {
    489. // Both elastic and strict memory control use cgroups. A container that exceeds its request is frozen by elastic memory control, so we check for that here and kill it.
    490. // Otherwise, if the node never exceeds its limit and procfs-based memory accounting differs from cgroup-based accounting, the container would not be killed.
    491. // The handler, when present, is CGroupsMemoryResourceHandlerImpl (per the author); it may be null, hence the check below.
    492. // (Handler class for the memory controller. YARN already ships a physical-memory monitor in Java, but it is not as good as CGroups.
    493. // This handler sets the soft and hard memory limits; the soft limit is 90% of the hard limit.)
    494. MemoryResourceHandler handler =
    495. ResourceHandlerModule.getMemoryResourceHandler();
    496. if (handler != null) {
    497. // Check whether the container is in an OOM state
    498. isMemoryOverLimit = handler.isUnderOOM(containerId);
    499. containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM;
    500. msg = containerId + " is under oom because it exceeded its" +
    501. " physical memory limit";
    502. }
    503. } else if (strictMemoryEnforcement || elasticMemoryEnforcement) {
    504. // if cgroup-based memory control is enabled
    505. isMemoryOverLimit = Optional.of(false);
    506. }
    507. if (!isMemoryOverLimit.isPresent()) {
    508. long vmemLimit = ptInfo.getVmemLimit();
    509. long pmemLimit = ptInfo.getPmemLimit();
    510. // As processes begin with an age of 1, we want to see whether any process is more than 1 iteration old.
    511. long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1);
    512. long curRssMemUsageOfAgedProcesses = pTree.getRssMemorySize(1);
    513. // true by default: virtual memory limits are enforced for containers
    514. if (isVmemCheckEnabled()
    515. && isProcessTreeOverLimit(containerId.toString(),
    516. currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
    517. // The current usage (age=0) is always higher than the aged usage. The aged size is not shown in the message; the delta is based on the current usage.
    518. long delta = currentVmemUsage - vmemLimit;
    519. // Container (root process) is still alive and overflowing memory.
    520. // Dump the process tree and then clean up.
    521. msg = formatErrorMessage("virtual",
    522. formatUsageString(currentVmemUsage, vmemLimit,
    523. currentPmemUsage, pmemLimit),
    524. pId, containerId, pTree, delta);
    525. isMemoryOverLimit = Optional.of(true);
    526. containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM;
    527. // true by default: physical memory limits are enforced for containers
    528. // isProcessTreeOverLimit():
    529. // checks whether the current memory usage of the container's process tree is over the limit.
    530. // When a java process exec's a program, it can momentarily account for twice its memory size, because the JVM does fork()+exec() and fork copies the parent's memory.
    531. // If the monitoring thread measures the container tree at that instant, it could consider it over limit and kill the tree through no fault of the process itself.
    532. // This is countered with a heuristic: a process tree using more than twice the memory limit is killed immediately; if the processes older than the monitoring interval
    533. // alone exceed the memory limit, the tree is killed too. Otherwise it gets the benefit of the doubt for one more iteration.
    534. } else if (isPmemCheckEnabled()
    535. && isProcessTreeOverLimit(containerId.toString(),
    536. currentPmemUsage, curRssMemUsageOfAgedProcesses,
    537. pmemLimit)) {
    538. // The current usage (age=0) is always higher than the aged usage. The aged size is not shown in the message; the delta is based on the current usage.
    539. long delta = currentPmemUsage - pmemLimit;
    540. // Container (root process) is still alive and overflowing memory.
    541. // Dump the process tree and then clean up.
    542. msg = formatErrorMessage("physical",
    543. formatUsageString(currentVmemUsage, vmemLimit,
    544. currentPmemUsage, pmemLimit),
    545. pId, containerId, pTree, delta);
    546. isMemoryOverLimit = Optional.of(true);
    547. containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM;
    548. }
    549. }
    550. if (isMemoryOverLimit.isPresent() && isMemoryOverLimit.get()
    551. && trackingContainers.remove(containerId) != null) {
    552. // Virtual or physical memory is over limit: fail the container and remove its process tree
    553. LOG.warn(msg);
    554. // warn if the root process is not a process-group leader
    555. if (!pTree.checkPidPgrpidForMatch()) {
    556. LOG.error("Killed container process with PID " + pId
    557. + " but it is not a process group leader.");
    558. }
    559. // kill the container
    560. eventDispatcher.getEventHandler().handle(
    561. new ContainerKillEvent(containerId,
    562. containerExitStatus, msg));
    563. LOG.info("Removed ProcessTree with root " + pId);
    564. }
    565. }
    566. private void onStopMonitoringContainer(
    567. ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
    568. LOG.info("Stopping resource-monitoring for " + containerId);
    569. updateContainerMetrics(monitoringEvent);
    570. trackingContainers.remove(containerId);
    571. }
    572. private void onStartMonitoringContainer(
    573. ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
    574. ContainerStartMonitoringEvent startEvent =
    575. (ContainerStartMonitoringEvent) monitoringEvent;
    576. LOG.info("Starting resource-monitoring for " + containerId);
    577. updateContainerMetrics(monitoringEvent);
    578. trackingContainers.put(containerId,
    579. new ProcessTreeInfo(containerId, null, null,
    580. startEvent.getVmemLimit(), startEvent.getPmemLimit(),
    581. startEvent.getCpuVcores()));
    582. }
    583. }

    四、总结

    1、启动容器触发ContainerEventType.CONTAINER_LAUNCHED事件

    2、ContainerImpl通过LaunchTransition处理第1步中的事件,将容器状态由SCHEDULED转为RUNNING,同时发送容器监控事件ContainerStartMonitoringEvent(事件类型为ContainersMonitorEventType.START_MONITORING_CONTAINER)

    3、ContainersMonitorImpl通过onStartMonitoringContainer()处理第2步中的事件

    4、将启动的容器id、虚拟内存限制、物理内存限制、cpu核数限制封装成ProcessTreeInfo,并放到跟踪所有容器的trackingContainers中

    5、ContainersMonitorImpl初始化时会读取:监控容器的频率(默认3s一次)、检查容器日志目录大小的频率(默认1min一次)、单个容器日志目录的大小限制(默认约1G)、单个容器全部日志的总大小限制(默认约10G)、系统资源计算插件(可以自己实现,默认LINUX使用SysInfoLinux,WINDOWS使用SysInfoWindows)、计算processTree资源利用率的类、可分配给YARN容器的内存大小、YARN容器可用的vcore数、虚拟内存与物理内存的比率、内存控制策略等

    6、ContainersMonitorImpl启动时会启动一个线程(monitoringThread)对容器的资源使用进行监控,如果超过限制就杀掉容器。默认只开启这一个线程,oomListenerThread和logMonitorThread默认不开启

  • 相关阅读:
    rabbitmq队列卡住的一种情况(webservice接口超时)
    Spring学习(2) Spring的IOC底层实现
    apt & apt-get命令
    【python】(六)python的封装、继承和多态
    Web 3.0 是泡沫还是金矿?
    【ManageEngine】加强企业特权访问安全性的7个方法
    常用的无线充发射IC芯片
    硬件顶配、数字先行,路特斯重塑「智能座舱」
    微信小程序:独家全新娱乐性超高的喝酒神器
    PPT架构师架构技能图
  • 原文地址:https://blog.csdn.net/lu070828/article/details/136230442