• Decoding Hadoop Series: The NameNode Startup Flow


    The NameNode's main responsibility is managing file metadata and the file-to-block mapping. Accordingly, its startup flow centers on the worker threads that talk to clients and DataNodes, the file-metadata management machinery, and the block management machinery. Among these, the RpcServer handles communication with clients and DataNodes, while FSDirectory manages file metadata. The flow is as follows:

    1. Start the HDFS cluster from the command line

    start-dfs.sh
    

    This command starts HDFS's NameNode and DataNodes. The NameNode is started through the org.apache.hadoop.hdfs.server.namenode.NameNode class.

    2. The NameNode main startup flow

    public class NameNode extends ReconfigurableBase implements
        NameNodeStatusMXBean {
      // Initializes HDFS configuration defaults.
      static{
        // init() itself is empty, but it is not a no-op: calling it forces
        // HdfsConfiguration to be class-loaded, which runs its static
        // block -- see step 3.
        HdfsConfiguration.init();
      }
      // ... code omitted ...
      // main method
      public static void main(String argv[]) throws Exception {
        // If the arguments are a usage/help request, print the usage
        // message and exit.
        if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
          System.exit(0);
        }
    
        try {
          // Print a formatted startup message and register a shutdown hook
          // that logs the node's shutdown.
          StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
          // Create the NameNode -- see step 4.
          NameNode namenode = createNameNode(argv, null);
          if (namenode != null) {
            // Block until the NameNode's service threads terminate.
            namenode.join();
          }
        } catch (Throwable e) {
          // Startup failed.
          LOG.error("Failed to start namenode.", e);
          terminate(1, e);
        }
      }
    
    

    3. The HdfsConfiguration class: HDFS configuration parameters

    public class HdfsConfiguration extends Configuration {
      // Static block: registers deprecated keys and the default resources.
      static {
        addDeprecatedKeys();
    
        // adds the default resources
        Configuration.addDefaultResource("hdfs-default.xml");
        Configuration.addDefaultResource("hdfs-site.xml");
      }
    
      public HdfsConfiguration() {
        super();
      }
    
      public HdfsConfiguration(boolean loadDefaults) {
        super(loadDefaults);
      }
    
      public HdfsConfiguration(Configuration conf) {
        super(conf);
      }
    
      /**
       * This method is here so that when invoked, HdfsConfiguration is class-loaded
       * if it hasn't already been previously loaded.  Upon loading the class, the
       * static initializer block above will be executed to add the deprecated keys
       * and to add the default resources. It is safe for this method to be called
       * multiple times as the static initializer block will only get invoked once.
       *
       * This replaces the previously, dangerous practice of other classes calling
       * Configuration.addDefaultResource("hdfs-default.xml") directly without
       * loading this class first, thereby skipping the key deprecation.
       */
      public static void init() {
      }
    
      private static void addDeprecatedKeys() {
        Configuration.addDeprecations(new DeprecationDelta[]{
            new DeprecationDelta("dfs.backup.address",
                DeprecatedKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY),
            // ... many deprecation mappings omitted ...
            
            new DeprecationDelta("dfs.encryption.key.provider.uri",
                CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH),
        });
      }
    
      public static void main(String[] args) {
        init();
        Configuration.dumpDeprecatedKeys();
      }
    }
    
    
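    The Javadoc on init() above is worth internalizing: Java runs a static initializer exactly once, on first class load, no matter how many times init() is called. A minimal, self-contained sketch of that behavior (illustrative code, not Hadoop's; all names here are invented):

```java
import java.util.ArrayList;
import java.util.List;

public class StaticInitDemo {
    static final List<String> RESOURCES = new ArrayList<>();
    static int initRuns = 0;

    static {
        // Runs exactly once, when the class is first loaded.
        initRuns++;
        RESOURCES.add("hdfs-default.xml");
        RESOURCES.add("hdfs-site.xml");
    }

    // Empty on purpose: calling it merely triggers class loading,
    // just like HdfsConfiguration.init().
    public static void init() {
    }

    public static void main(String[] args) {
        StaticInitDemo.init();
        StaticInitDemo.init();
        StaticInitDemo.init();
        System.out.println(initRuns);   // the static block ran only once
        System.out.println(RESOURCES);
    }
}
```

    This is why other classes must call HdfsConfiguration.init() rather than adding "hdfs-default.xml" themselves: loading the class guarantees the deprecated-key table is registered first.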

    4. createNameNode(): creating the NameNode

    public static NameNode createNameNode(String argv[], Configuration conf)
          throws IOException {
        LOG.info("createNameNode " + Arrays.asList(argv));
        if (conf == null)
          conf = new HdfsConfiguration();
        // Parse out some generic args into Configuration.
        GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
        argv = hParser.getRemainingArgs();
        // Parse the rest: NN-specific args.
        StartupOption startOpt = parseArguments(argv);
        if (startOpt == null) {
          printUsage(System.err);
          return null;
        }
        setStartupOption(conf, startOpt);
    
        switch (startOpt) {
          case FORMAT: {
            boolean aborted = format(conf, startOpt.getForceFormat(),
                startOpt.getInteractiveFormat());
            terminate(aborted ? 1 : 0);
            return null; // avoid javac warning
          }
          case GENCLUSTERID: {
            System.err.println("Generating new cluster id:");
            System.out.println(NNStorage.newClusterID());
            terminate(0);
            return null;
          }
          // ... several case branches omitted ...
          case UPGRADEONLY: {
            DefaultMetricsSystem.initialize("NameNode");
            new NameNode(conf);
            terminate(0);
            return null;
          }
          // A normal start falls through to this branch.
          default: {
            // Initialize the metrics system.
            DefaultMetricsSystem.initialize("NameNode");
            // Construct and return the new NameNode -- see step 5.
            return new NameNode(conf);
          }
        }
      }
    
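    The shape of createNameNode() is parse-then-dispatch: turn the command-line flag into a startup option, then switch on it, with most branches doing their work and terminating. The following is a simplified, hypothetical sketch of that pattern; the enum values echo a few of Hadoop's StartupOption names, but the parse/dispatch helpers are invented for illustration:

```java
public class StartOptDemo {
    enum StartOpt { FORMAT, GENCLUSTERID, UPGRADEONLY, REGULAR }

    // Map a command-line flag to a startup option; no flag means a
    // regular start (the `default` branch of the real switch).
    static StartOpt parse(String arg) {
        if (arg == null) return StartOpt.REGULAR;
        switch (arg.toLowerCase()) {
            case "-format":       return StartOpt.FORMAT;
            case "-genclusterid": return StartOpt.GENCLUSTERID;
            case "-upgradeonly":  return StartOpt.UPGRADEONLY;
            default:              return StartOpt.REGULAR;
        }
    }

    // Describe what each branch of createNameNode() does.
    static String dispatch(StartOpt opt) {
        switch (opt) {
            case FORMAT:       return "format storage, then exit";
            case GENCLUSTERID: return "print new cluster id, then exit";
            case UPGRADEONLY:  return "upgrade metadata, then exit";
            default:           return "construct NameNode and run";
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(parse(args.length > 0 ? args[0] : null)));
    }
}
```

    Only the default branch returns a live NameNode; every administrative option calls terminate() so the process never runs as a server.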

    5. The NameNode constructor

    public NameNode(Configuration conf) throws IOException {
        this(conf, NamenodeRole.NAMENODE);
      }
    
      protected NameNode(Configuration conf, NamenodeRole role)
          throws IOException {
        super(conf);
        this.tracer = new Tracer.Builder("NameNode").
            conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).
            build();
        this.tracerConfigurationManager =
            new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);
        this.role = role;
        // Set NameNode#clientNamenodeAddress, e.g. 'hdfs://localhost:9000'.
        setClientNamenodeAddress(conf);
        String nsId = getNameServiceId(conf);
        String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    
        // HA-related setup.
        this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
        state = createHAState(getStartupOption(conf));
        this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
        this.haContext = createHAContext();
        try {
          initializeGenericKeys(conf, nsId, namenodeId);
          // Do the actual initialization work -- see step 6.
          initialize(getConf());
          // HA-related: even when HA is not enabled locally
          // (haEnabled == false), the NameNode still holds an HAState;
          // in that case its HAState is active.
          try {
            haContext.writeLock();
            state.prepareToEnterState(haContext);
            state.enterState(haContext);
          } finally {
            haContext.writeUnlock();
          }
        } catch (IOException e) {
          this.stopAtException(e);
          throw e;
        } catch (HadoopIllegalArgumentException e) {
          this.stopAtException(e);
          throw e;
        }
        this.started.set(true);
      }
    
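    The writeLock()/enterState()/writeUnlock() sequence above is a state pattern guarded by a read-write lock: the current HA state is an object, and transitions happen under the write lock so readers never observe a half-finished transition. A minimal sketch of the idea, with invented names (not Hadoop's actual HAState/HAContext classes):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class HaStateDemo {
    interface HaState {
        String name();
        void enterState(Context ctx);
    }

    static class ActiveState implements HaState {
        public String name() { return "active"; }
        public void enterState(Context ctx) { ctx.current = this; }
    }

    static class StandbyState implements HaState {
        public String name() { return "standby"; }
        public void enterState(Context ctx) { ctx.current = this; }
    }

    static class Context {
        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        HaState current;

        // Mirrors the writeLock() / enterState() / writeUnlock()
        // sequence in the constructor above.
        void transitionTo(HaState next) {
            lock.writeLock().lock();
            try {
                next.enterState(this);
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

    public static void main(String[] args) {
        Context ctx = new Context();
        // Even without HA, the node still holds a state -- here, active.
        ctx.transitionTo(new ActiveState());
        System.out.println(ctx.current.name());
    }
}
```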

    6. initialize(Configuration conf): the actual initialization work

     protected void initialize(Configuration conf) throws IOException {
        if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
          String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
          if (intervals != null) {
            conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
              intervals);
          }
        }
    
        UserGroupInformation.setConfiguration(conf);
        loginAsNameNodeUser(conf);
    
        // Initialize metrics.
        NameNode.initMetrics(conf, this.getRole());
        StartupProgressMetrics.register(startupProgress);
    
        // Start the JvmPauseMonitor, which watches the JVM itself for
        // long pauses (e.g., GC stalls).
        pauseMonitor = new JvmPauseMonitor();
        pauseMonitor.init(conf);
        pauseMonitor.start();
        metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
    
        // Start the HTTP server.
        if (NamenodeRole.NAMENODE == role) {
          startHttpServer(conf);
        }
    
        // Load the editlog and fsimage from the NameNode directories and
        // initialize FSNamesystem, FSDirectory, LeaseManager, etc.
        loadNamesystem(conf);
    
        // Create the RPC server. It wraps NameNodeRpcServer's client and
        // service RPC servers and speaks ClientNamenodeProtocol,
        // DatanodeProtocolPB, and other protocols.
        rpcServer = createRpcServer(conf);
    
        initReconfigurableBackoffKey();
    
        if (clientNamenodeAddress == null) {
          // This is expected for MiniDFSCluster. Set it now using 
          // the RPC server's bind address.
          clientNamenodeAddress = 
              NetUtils.getHostPortString(getNameNodeAddress());
          LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
              + " this namenode/service.");
        }
        if (NamenodeRole.NAMENODE == role) {
          httpServer.setNameNodeAddress(getNameNodeAddress());
          httpServer.setFSImage(getFSImage());
        }
    
        // Start several important worker threads -- see step 7.
        startCommonServices(conf);
        startMetricsLogger(conf);
      }
    
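    JvmPauseMonitor works by repeatedly sleeping for a short, fixed interval and checking how much longer the sleep actually took; a large overshoot implies the JVM was paused, e.g. by a stop-the-world GC. A self-contained sketch of that idea (the interval and threshold values here are made up, not Hadoop's defaults):

```java
public class PauseSketch {
    static final long SLEEP_MS = 50;
    static final long WARN_THRESHOLD_MS = 1000;

    // Sleep for SLEEP_MS and return how far past the requested interval
    // we actually woke up.
    static long measureDriftOnce() {
        long start = System.nanoTime();
        try {
            Thread.sleep(SLEEP_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        return Math.max(0, elapsedMs - SLEEP_MS);
    }

    public static void main(String[] args) {
        long drift = measureDriftOnce();
        if (drift > WARN_THRESHOLD_MS) {
            System.out.println("Detected pause of approximately " + drift + " ms");
        } else {
            System.out.println("No significant pause (drift " + drift + " ms)");
        }
    }
}
```

    The real monitor runs this loop in a daemon thread and also diffs GC counters across the sleep to attribute the pause, then logs a warning; the drift measurement above is the core trick.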

    7. startCommonServices(conf): starting the key worker threads

      private void startCommonServices(Configuration conf) throws IOException {
        // Create the NameNodeResourceChecker, activate the BlockManager,
        // etc. -- see step 8.
        namesystem.startCommonServices(conf, haContext);
        registerNNSMXBean();
    
        // Roles other than NamenodeRole.NAMENODE start their HTTP server here.
        if (NamenodeRole.NAMENODE != role) {
          startHttpServer(conf);
          httpServer.setNameNodeAddress(getNameNodeAddress());
          httpServer.setFSImage(getFSImage());
        }
        // Start the RPC server.
        rpcServer.start();
    
        // Load and start any configured service plugins.
        try {
          plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
              ServicePlugin.class);
        } catch (RuntimeException e) {
          String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
          LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
              pluginsValue, e);
          throw e;
        }
        for (ServicePlugin p: plugins) {
          try {
            p.start(this);
          } catch (Throwable t) {
            LOG.warn("ServicePlugin " + p + " could not be started", t);
          }
        }
        LOG.info(getRole() + " RPC up at: " + getNameNodeAddress());
        if (rpcServer.getServiceRpcAddress() != null) {
          LOG.info(getRole() + " service RPC up at: "
              + rpcServer.getServiceRpcAddress());
        }
      }
    
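    Note how the plugin loop above catches Throwable per plugin, so one misbehaving plugin cannot abort NameNode startup. A self-contained sketch of the same isolation pattern (the Plugin interface and demo classes here are hypothetical, not Hadoop's ServicePlugin):

```java
import java.util.ArrayList;
import java.util.List;

public class PluginLoaderDemo {
    public interface Plugin { void start(); }

    public static class GoodPlugin implements Plugin {
        public void start() { /* starts cleanly */ }
    }
    public static class BadPlugin implements Plugin {
        public void start() { throw new RuntimeException("boom"); }
    }

    // Mirrors the loop in startCommonServices(): a Throwable from one
    // plugin is logged (here, printed) and the rest still start.
    static int startAll(List<Plugin> plugins) {
        int started = 0;
        for (Plugin p : plugins) {
            try {
                p.start();
                started++;
            } catch (Throwable t) {
                System.out.println("Plugin " + p + " could not be started: " + t);
            }
        }
        return started;
    }

    public static void main(String[] args) {
        List<Plugin> plugins = new ArrayList<>();
        plugins.add(new GoodPlugin());
        plugins.add(new BadPlugin());
        plugins.add(new GoodPlugin());
        System.out.println(startAll(plugins));
    }
}
```

    In the real code the plugin classes are additionally instantiated reflectively from the dfs.namenode.plugins configuration key via conf.getInstances(), which is why a RuntimeException during loading (as opposed to starting) is fatal.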

    8. FSNamesystem.startCommonServices(conf, haContext)

     void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
        this.registerMBean(); // register the MBean for the FSNamesystemState
        writeLock();
        this.haContext = haContext;
        try {
      // Create the NameNodeResourceChecker and run a check immediately.
          nnResourceChecker = new NameNodeResourceChecker(conf);
          checkAvailableResources();
          assert !blockManager.isPopulatingReplQueues();
      // Record startup-progress information.
          StartupProgress prog = NameNode.getStartupProgress();
          prog.beginPhase(Phase.SAFEMODE);
    
      // Get the total number of complete blocks.
          long completeBlocksTotal = getCompleteBlocksTotal();
          prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
              completeBlocksTotal);
    
      // Activate the BlockManager -- see step 9.
          blockManager.activate(conf, completeBlocksTotal);
        } finally {
          writeUnlock("startCommonServices");
        }
        
        registerMXBean();
        DefaultMetricsSystem.instance().register(this);
        if (inodeAttributeProvider != null) {
          inodeAttributeProvider.start();
          dir.setINodeAttributeProvider(inodeAttributeProvider);
        }
        snapshotManager.registerMXBean();
        InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
        this.nameNodeHostName = (serviceAddress != null) ?
            serviceAddress.getHostName() : "";
      }
    
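    The completeBlocksTotal computed above feeds the safe-mode exit check: the NameNode stays in safe mode until DataNodes have reported at least totalBlocks * dfs.namenode.safemode.threshold-pct blocks (default 0.999). A simplified sketch of that arithmetic (not BlockManagerSafeMode itself; the helper names are invented):

```java
public class SafeModeMath {
    // The threshold is rounded up so every block counts toward the bar.
    static long blockThreshold(long completeBlocksTotal, double thresholdPct) {
        return (long) Math.ceil(completeBlocksTotal * thresholdPct);
    }

    static boolean canLeaveSafeMode(long reportedBlocks, long threshold) {
        return reportedBlocks >= threshold;
    }

    public static void main(String[] args) {
        long total = 10_000;
        long threshold = blockThreshold(total, 0.999);
        System.out.println(threshold);                           // 9990
        System.out.println(canLeaveSafeMode(9_989, threshold));  // false
        System.out.println(canLeaveSafeMode(9_990, threshold));  // true
    }
}
```

    This is why getCompleteBlocksTotal() is called under the write lock before blockManager.activate(): the safe-mode machinery needs a consistent block count to size the exit condition.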

    9. activate(conf, completeBlocksTotal): activating the BlockManager

    // BlockManager#activate() mainly starts the PendingReplicationMonitor,
    // DecommissionManager#Monitor, HeartbeatManager#Monitor, and
    // ReplicationMonitor threads.
      public void activate(Configuration conf, long blockTotal) {
        // Start the PendingReplicationMonitor.
        pendingReplications.start();
        // Activate the DatanodeManager: starts DecommissionManager#Monitor
        // and HeartbeatManager#Monitor.
        datanodeManager.activate(conf);
        this.replicationThread.setName("ReplicationMonitor");
        // Start the BlockManager's ReplicationMonitor thread.
        this.replicationThread.start();
        this.blockReportThread.start();
        mxBeanName = MBeans.register("NameNode", "BlockStats", this);
        bmSafeMode.activate(blockTotal);
      }
    
  • Original article: https://blog.csdn.net/manshiai/article/details/126287854