• Hadoop-Yarn-ResourceManagerHA


    在这里先给屏幕面前的你送上祝福,祝你在未来一年:技术步步高升、薪资节节攀升,身体健健康康,家庭和和美美。

    一、介绍

    在Hadoop2.4之前,ResourceManager是YARN集群中的单点故障

    ResourceManager HA是通过 Active/Standby 体系结构实现的,在任何时候其中一个RM都是活动的,并且一个或多个RM处于备用模式,等待在活动发生任何事情时接管。

    二、架构

    官网的架构图如下:

    1、Active 状态的 ResourceManager 将自己的状态写入ZooKeeper

    2、如果 Active 状态的 ResourceManager状态发生改变,可以通过自动或手动方式完成故障转移

    三、故障转移

    1、手动转换

            如果未启用自动故障切换,管理员必须手动将其中一个ResourceManager转换为活动。要从一个ResourceManager故障切换到另一个ResourceManager,他们应该首先将活动ResourceManager转换为备用ResourceManager,然后将备用ResourceManager转换为活动ResourceManager。相关命令如下:

            获取所有RM节点的状态
                    yarn rmadmin -getAllServiceState
            获取 rm1 节点的状态
                    yarn rmadmin -getServiceState rm1
            手动将 rm1 的状态切换到STANDBY
                    yarn rmadmin -transitionToStandby rm1
                    或
                    yarn rmadmin -transitionToStandby -forcemanual rm1
            手动将 rm1 的状态切换到ACTIVE
                    yarn rmadmin -transitionToActive rm1
                    或 
                    yarn rmadmin -transitionToActive -forcemanual rm1

    2、自动切换

            ResourceManager可以选择嵌入基于Zookeeper的ActiveStandbyElector来决定哪个ResourceManager应该是Active。当Active宕机或无响应时,会自动选择另一个ResourceManager作为Active,然后由它接管。需要注意的是Yarn不需要像HDFS那样运行单独的ZKFC守护进程,因为嵌入在ResourceManager中的ActiveStandbyElector充当故障检测器和领导者选举人。

            配置示例如下:

    1. <property>
    2. <name>yarn.resourcemanager.ha.enabledname>
    3. <value>truevalue>
    4. <description>开启resourcemanager的HAdescription>
    5. property>
    6. <property>
    7. <name>yarn.resourcemanager.cluster-idname>
    8. <value>cluster1value>
    9. <description>标识群集。由选举人使用,以确保RM不会作为“活动”接管另一个群集。description>
    10. property>
    11. <property>
    12. <name>yarn.resourcemanager.ha.rm-idsname>
    13. <value>rm1,rm2value>
    14. <description>RM的逻辑ID列表description>
    15. property>
    16. <property>
    17. <name>yarn.resourcemanager.hostname.rm1name>
    18. <value>master1value>
    19. <description>对于每个rm-ids,指定rm对应的主机名。或者,可以设置rm的每个服务地址description>
    20. property>
    21. <property>
    22. <name>yarn.resourcemanager.hostname.rm2name>
    23. <value>master2value>
    24. <description>对于每个rm-ids,指定rm对应的主机名。或者,可以设置rm的每个服务地址description>
    25. property>
    26. <property>
    27. <name>yarn.resourcemanager.webapp.address.rm1name>
    28. <value>master1:8088value>
    29. <description>对于每个rm-ids,指定与之对应的rm web应用程序的host:portdescription>
    30. property>
    31. <property>
    32. <name>yarn.resourcemanager.webapp.address.rm2name>
    33. <value>master2:8088value>
    34. <description>对于每个rm-ids,指定与之对应的rm web应用程序的host:portdescription>
    35. property>
    36. <property>
    37. <name>hadoop.zk.addressname>
    38. <value>zk1:2181,zk2:2181,zk3:2181value>
    39. <description>ZK法定人数的地址。用于两者状态和领导人选举description>
    40. property>

    四、源码分析

    在我的上一篇<Hadoop-Yarn-启动篇>博客中有ResourceManager的启动源码,现在我们只将关于HA的部分拿处理分析下

    1、设置HA配置

    1. //登录前应设置HA配置
    2. this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
    3. if (this.rmContext.isHAEnabled()) {
    4. HAUtil.verifyAndSetConfiguration(this.conf);
    5. }
    6. public static boolean isHAEnabled(Configuration conf) {
    7. //即获取yarn.resourcemanager.ha.enabled的值
    8. return conf.getBoolean(YarnConfiguration.RM_HA_ENABLED,
    9. YarnConfiguration.DEFAULT_RM_HA_ENABLED);
    10. }
    11. public static void verifyAndSetConfiguration(Configuration conf)
    12. throws YarnRuntimeException {
    13. //验证配置是否至少有两个RM id,并且为每个RM-id指定了RPC地址。然后设置RM id。
    14. //即 配置文件中的 yarn.resourcemanager.ha.rm-ids 对应配置的多个 RM 节点的RPC地址
    15. verifyAndSetRMHAIdsList(conf);
    16. //设置 yarn.resourcemanager.ha.id 的值,如果没有配置则通过匹配yarn.reresourcemanager.address来计算
    17. verifyAndSetCurrentRMHAId(conf);
    18. //验证 Leader 选举服务是否已启用。YARN允许在配置中禁用领导层选举,从而中断自动故障切换
    19. verifyLeaderElection(conf);
    20. //验证所有服务的地址
    21. // RM_ADDRESS 即 yarn.resourcemanager.address
    22. // RM_SCHEDULER_ADDRESS 即 yarn.resourcemanager.scheduler.address
    23. // RM_ADMIN_ADDRESS 即 yarn.resourcemanager.admin.address
    24. // RM_RESOURCE_TRACKER_ADDRESS 即 yarn.resourcemanager.resource-tracker.address
    25. // RM_WEBAPP_ADDRESS 即 yarn.resourcemanager.webapp.address
    26. verifyAndSetAllServiceAddresses(conf);
    27. }

    2、添加选举人

    1. //必须在管理员服务后添加选举人
    2. if (this.rmContext.isHAEnabled()) {
    3. //获取配置文件中yarn.resourcemanager.ha.automatic-failover.enabled的值,默认true
    4. // 启用自动故障切换;默认情况下,只有在启用HA时才会启用它。
    5. //获取配置文件中yarn.resourcemanager.ha.automatic-failover.embedded的值,默认true
    6. // 启用嵌入式自动故障切换。默认情况下,只有在启用HA时才会启用它。
    7. // 嵌入式elector依赖于RM状态存储来处理围栏,主要用于与ZKRMStateStore结合使用。
    8. if (HAUtil.isAutomaticFailoverEnabled(conf)
    9. && HAUtil.isAutomaticFailoverEmbedded(conf)) {
    10. EmbeddedElector elector = createEmbeddedElector();
    11. //添加Curator的领导人选举服务
    12. addIfService(elector);
    13. rmContext.setLeaderElectorService(elector);
    14. }
    15. }
    16. protected EmbeddedElector createEmbeddedElector() throws IOException {
    17. EmbeddedElector elector;
    18. //获取配置文件中 yarn.resourcemanager.ha.curator-leader-elector.enabled 的值,默认true
    19. /是否使用Curator-based的选举人进行领导人选举
    20. curatorEnabled =
    21. conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
    22. YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
    23. if (curatorEnabled) {
    24. //获取ZooKeeper Curator管理器,创建并启动(如果不存在)
    25. this.zkManager = createAndStartZKManager(conf);
    26. /使用Curator的领导人选举实施
    27. elector = new CuratorBasedElectorService(this);
    28. } else {
    29. elector = new ActiveStandbyElectorBasedElectorService(this);
    30. }
    31. return elector;
    32. }

    3、创建并启动ZooKeeper Curator管理器

    Curator是Netflix公司在原生zookeeper客户端基础上开源的第三方Java客户端,使用它可以去操作zookeeper创建、删除、查询、修改znode节点

    1. public ZKCuratorManager createAndStartZKManager(Configuration
    2. config) throws IOException {
    3. //提供特定于ZK操作的实用程序方法的Helper类
    4. ZKCuratorManager manager = new ZKCuratorManager(config);
    5. //获取身份验证
    6. List authInfos = new ArrayList<>();
    7. //获取 yarn.resourcemanager.ha.enabled 值,默认false
    8. //获取 yarn.resourcemanager.zk-state-store.root-node.acl + yarn.resourcemanager.ha.id 的值
    9. //yarn.resourcemanager.ha.id官方解释:(在第1步已经设置过这个值了)
    10. //当前RM的id(字符串)。启用HA时,这是一个可选配置。当前RM的id可以通过显式指定yarn.resourcemanager.ha.id来设置,也可以通过匹配yarn.reresourcemanager.address来计算。具有本地地址的{id}请参阅yarn.resourcemanager.ha.enabled的描述,了解如何使用它的完整详细信息。
    11. //yarn.resourcemanager.zk-state-store.root-node.acl官方解释:
    12. //在HA场景中使用ZKRMStateStore进行围栏时,用于根znode的ACL。ZKRMStateStore支持隐式围栏,允许单个ResourceManager对存储进行写访问。对于围栏,群集中的ResourceManager在根节点上共享读写管理权限,但Active ResourceManager声明具有独占的创建-删除权限。默认情况下,当未设置此属性时,我们使用来自yarn.resourcemanager.zk-cl的acl进行共享管理访问,并使用rm address:random number进行基于用户名的独占创建-删除访问。此属性允许用户设置自己选择的ACL,而不是使用默认机制。为了使围栏发挥作用,应在每个ResourceManager上小心地以不同的方式设置ACL,以便所有ResourceManager都具有共享的管理访问权限,而Active ResourceManager(仅)接管创建-删除访问权限。
    13. if (HAUtil.isHAEnabled(config) && HAUtil.getConfValueForRMInstance(
    14. YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, config) == null) {
    15. String zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
    16. YarnConfiguration.RM_ADDRESS,
    17. YarnConfiguration.DEFAULT_RM_ADDRESS, config);
    18. // private final String zkRootNodePassword =
    19. Long.toString(new SecureRandom().nextLong());
    20. //由此可见 zkRootNodePassword 是一个随机数
    21. String defaultFencingAuth =
    22. zkRootNodeUsername + ":" + zkRootNodePassword;
    23. //RM地址和一个随机数构建了一个字节数组
    24. byte[] defaultFencingAuthData =
    25. defaultFencingAuth.getBytes(Charset.forName("UTF-8"));
    26. //构建身份验证摘要
    27. String scheme = new DigestAuthenticationProvider().getScheme();
    28. AuthInfo authInfo = new AuthInfo(scheme, defaultFencingAuthData);
    29. authInfos.add(authInfo);
    30. }
    31. //开始连接到ZooKeeper集合
    32. manager.start(authInfos);
    33. return manager;
    34. }

    4、连接ZooKeeper集合

    1. public void start(List authInfos) throws IOException {
    2. //获取ZooKeeper团队地址 即 hadoop.zk.address
    3. //
    4. // hadoop.zk.address
    5. // zk1:2181,zk2:2181,zk3:2181
    6. // ZK法定人数的地址。用于两者状态和领导人选举
    7. //
    8. //
    9. String zkHostPort = conf.get(CommonConfigurationKeys.ZK_ADDRESS);
    10. if (zkHostPort == null) {
    11. throw new IOException(
    12. CommonConfigurationKeys.ZK_ADDRESS + " is not configured.");
    13. }
    14. //获取 hadoop.zk.num-retries 的值 默认值 1000
    15. //ZooKeeper操作的最大重试次数
    16. int numRetries = conf.getInt(CommonConfigurationKeys.ZK_NUM_RETRIES,
    17. CommonConfigurationKeys.ZK_NUM_RETRIES_DEFAULT);
    18. //获取 hadoop.zk.timeout-ms 的值 默认值 10000
    19. //ZooKeepers操作超时(以毫秒为单位)
    20. int zkSessionTimeout = conf.getInt(CommonConfigurationKeys.ZK_TIMEOUT_MS,
    21. CommonConfigurationKeys.ZK_TIMEOUT_MS_DEFAULT);
    22. //获取 hadoop.zk.retry-interval-ms 的值 默认值 1000
    23. //以毫秒为单位重试ZooKeeper操作的频率
    24. int zkRetryInterval = conf.getInt(
    25. CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS,
    26. CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS_DEFAULT);
    27. RetryNTimes retryPolicy = new RetryNTimes(numRetries, zkRetryInterval);
    28. //设置ZooKeeper身份验证
    29. List zkAuths = getZKAuths(conf);
    30. if (authInfos == null) {
    31. authInfos = new ArrayList<>();
    32. }
    33. for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
    34. authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
    35. }
    36. //获取客户端框架
    37. CuratorFramework client = CuratorFrameworkFactory.builder()
    38. .connectString(zkHostPort)
    39. .sessionTimeoutMs(zkSessionTimeout)
    40. .retryPolicy(retryPolicy)
    41. .authorization(authInfos)
    42. .build();
    43. //启动
    44. client.start();
    45. this.curator = client;
    46. }

    5、启动Curator的领导人选举服务

    1. protected void serviceInit(Configuration conf) throws Exception {
    2. rmId = HAUtil.getRMHAId(conf);
    3. String clusterId = YarnConfiguration.getClusterId(conf);
    4. //获取 yarn.resourcemanager.ha.automatic-failover.zk-base-path 的值 默认值 /yarn-leader-election
    5. //官网解释:使用基于ZooKeeper的领导人选举时,用于存储领导人信息的基本znode路径。
    6. String zkBasePath = conf.get(
    7. YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
    8. YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
    9. latchPath = zkBasePath + "/" + clusterId;
    10. //第3步已经设置过了,这里直接取
    11. curator = rm.getCurator();
    12. //初始化并启动LeaderLatch
    13. initAndStartLeaderLatch();
    14. super.serviceInit(conf);
    15. }

    五、总结

    1、判断配置文件中是否配置了HA开启

    2、如果开启了HA,开始配置并设置启动必要参数

    3、根据配置文件添加选举人

    4、获取ZooKeeper Curator管理器,创建并启动

    5、连接到ZooKeeper集合

    6、获取客户端框架并启动

  • 相关阅读:
    深度学习计算机视觉中, 多尺度特征和上下文特征的区别是?
    flutter产物以aar形式嵌入android原生工程
    C语言趣味代码(三)
    浅谈数据结构之队列
    ubuntun系统更换清华源
    开源.NET8.0小项目伪微服务框架(分布式、EFCore、Redis、RabbitMQ、Mysql等)
    hbase最新版本配置属性
    最快的包管理器--pnpm创建vue项目完整步骤
    序列超图的下一项推荐 笔记
    外汇天眼:一平台产品1天收益率20%,7天1倍!FCA已发出警告!
  • 原文地址:https://blog.csdn.net/lu070828/article/details/136070966