• Hadoop Notes 06 - Hadoop Source Code Analysis


    RPC Communication Principles

    The following simulates the workflow of an RPC client, server, and communication protocol.
    RPCProtocol.java

    package com.demo.rpc;
    
    public interface RPCProtocol {
        long versionID = 1;
    
        void mkdirs(String path);
    }
    

    NNServer.java

    package com.demo.rpc;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.ipc.Server;
    
    import java.io.IOException;
    
    public class NNServer implements RPCProtocol {
        @Override
        public void mkdirs(String path) {
            System.out.println("Server creating path: " + path);
        }
    
        public static void main(String[] args) throws IOException {
            Server server = new RPC.Builder(new Configuration())
                    .setBindAddress("localhost")
                    .setPort(8888)
                    .setProtocol(RPCProtocol.class)
                    .setInstance(new NNServer())
                    .build();
            System.out.println("Server is running");
            server.start();
        }
    }
    

    HDFSClient.java

    package com.demo.rpc;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    
    public class HDFSClient {
        public static void main(String[] args) throws IOException {
            RPCProtocol client = RPC.getProxy(
                    RPCProtocol.class,
                    RPCProtocol.versionID,
                    new InetSocketAddress("localhost", 8888),
                    new Configuration()
            );
            System.out.println("Client sending RPC request");
            client.mkdirs("/testPath");
        }
    }
    

    Start NNServer first; the server then listens on port 8888. Next start HDFSClient; the client sends an RPC request to port 8888, and the server performs the operation.
    Once the server is running, the NNServer process can be seen with the jps command.
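    Running the demo produces output determined by the code above: the server prints "Server is running", the client prints "Client sending RPC request", and the server prints "Server creating path: /testPath". As a small aside (a sketch, not part of the original demo), the client proxy can also be released explicitly with RPC.stopProxy once the call returns:

    // Hypothetical cleanup variant of HDFSClient.main(): same call, but the proxy is released afterwards.
    RPCProtocol client = RPC.getProxy(
            RPCProtocol.class,
            RPCProtocol.versionID,
            new InetSocketAddress("localhost", 8888),
            new Configuration()
    );
    try {
        System.out.println("Client sending RPC request");
        client.mkdirs("/testPath");
    } finally {
        RPC.stopProxy(client); // closes the underlying RPC connection
    }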

    NameNode Startup Source Code Analysis

    To browse the source code, add the following dependencies to pom.xml.

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>3.1.3</version>
        <scope>provided</scope>
    </dependency>
    

    Find the main() method in NameNode.java.

    public static void main(String[] argv) throws Exception {
        if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) {
            System.exit(0);
        }
        try {
            StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
        // Create the NameNode
            NameNode namenode = createNameNode(argv, (Configuration)null);
            if (namenode != null) {
                namenode.join();
            }
        } catch (Throwable var2) {
            LOG.error("Failed to start namenode.", var2);
            ExitUtil.terminate(1, var2);
        }
    }
    

    Click into the createNameNode method and, at the very bottom, find return new NameNode((Configuration)conf);. Click into the NameNode constructor, then into this(conf, NamenodeRole.NAMENODE);, then into the this.initialize(this.getConf()); method; at this point you can see the startup flow analyzed below.

    protected void initialize(Configuration conf) throws IOException {
        if (conf.get("hadoop.user.group.metrics.percentiles.intervals") == null) {
            String intervals = conf.get("dfs.metrics.percentiles.intervals");
            if (intervals != null) {
                conf.set("hadoop.user.group.metrics.percentiles.intervals", intervals);
            }
        }
    
        UserGroupInformation.setConfiguration(conf);
        this.loginAsNameNodeUser(conf);
        initMetrics(conf, this.getRole());
        StartupProgressMetrics.register(startupProgress);
        this.pauseMonitor = new JvmPauseMonitor();
        this.pauseMonitor.init(conf);
        this.pauseMonitor.start();
        metrics.getJvmMetrics().setPauseMonitor(this.pauseMonitor);
        if (NamenodeRole.NAMENODE == this.role) {
        // Start the HTTP server on port 9870
            this.startHttpServer(conf);
        }
        // Load the fsimage and edit log into memory
        this.loadNamesystem(conf);
        this.startAliasMapServerIfNecessary(conf);
        // Create the NameNode's RPC server
        this.rpcServer = this.createRpcServer(conf);
        this.initReconfigurableBackoffKey();
        if (this.clientNamenodeAddress == null) {
            this.clientNamenodeAddress = NetUtils.getHostPortString(this.getNameNodeAddress());
            LOG.info("Clients are to use " + this.clientNamenodeAddress + " to access this namenode/service.");
        }
    
        if (NamenodeRole.NAMENODE == this.role) {
            this.httpServer.setNameNodeAddress(this.getNameNodeAddress());
            this.httpServer.setFSImage(this.getFSImage());
        }
        // NameNode startup resource check
        this.startCommonServices(conf);
        this.startMetricsLogger(conf);
    }
    

    Starting the HTTP Service on Port 9870

    Click through the following methods in order and you will find the port 9870 service.

    this.startHttpServer(conf);
    this.getHttpServerBindAddress(conf);
    this.getHttpServerAddress(conf);
    getHttpAddress(conf);
    NetUtils.createSocketAddr(conf.getTrimmed("dfs.namenode.http-address", "0.0.0.0:9870"));
    
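    The "0.0.0.0:9870" literal above is only the fallback; the address actually used comes from dfs.namenode.http-address. A minimal standalone sketch (not NameNode code) of how that default resolves:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetUtils;
    import java.net.InetSocketAddress;

    public class HttpAddressDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // With no dfs.namenode.http-address set, the NameNode falls back to 0.0.0.0:9870.
            InetSocketAddress addr = NetUtils.createSocketAddr(
                    conf.getTrimmed("dfs.namenode.http-address", "0.0.0.0:9870"));
            System.out.println(addr);
        }
    }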

    Inside startHttpServer, look at the start() method.

    ...
    this.httpServer = builder.build();
    ...
    // Set up the servlets
    setupServlets(this.httpServer, this.conf);
    

    Look at the setupServlets method.

    private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
        httpServer.addInternalServlet("startupProgress", "/startupProgress", StartupProgressServlet.class);
        httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true);
        httpServer.addInternalServlet("imagetransfer", "/imagetransfer", ImageServlet.class, true);
    }
    

    Loading the FSImage and Edit Log

    Back in the initialize() method, click through these methods in order.

    this.loadNamesystem(conf);
    FSNamesystem.loadFromDisk(conf);
    
    static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
        checkConfiguration(conf);
        // FSImage takes the fsimage directories and the edit log directories as parameters
        FSImage fsImage = new FSImage(conf, getNamespaceDirs(conf), getNamespaceEditsDirs(conf));
        FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
        StartupOption startOpt = NameNode.getStartupOption(conf);
        if (startOpt == StartupOption.RECOVER) {
            namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
        }
    
        long loadStart = Time.monotonicNow();
    
        try {
            // Load the FSImage
            namesystem.loadFSImage(startOpt);
        } catch (IOException var9) {
            LOG.warn("Encountered exception loading fsimage", var9);
            fsImage.close();
            throw var9;
        }
    
        long timeTakenToLoadFSImage = Time.monotonicNow() - loadStart;
        LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
        NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
        if (nnMetrics != null) {
            nnMetrics.setFsImageLoadTime((long)((int)timeTakenToLoadFSImage));
        }
    
        namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
        return namesystem;
    }
    

    Initializing the NameNode RPC Server

    Back in the initialize() method, click through these methods in order.

    this.createRpcServer(conf);
    new NameNodeRpcServer(conf, this);
    this.serviceRpcServer = (new Builder(conf)).setProtocol(ClientNamenodeProtocolPB.class).setInstance(clientNNPbService).setBindAddress(bindHost).setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount).setVerbose(false).setSecretManager(this.namesystem.getDelegationTokenSecretManager()).build();
    

    NameNode Startup Resource Check

    Back in the initialize() method, click through these methods in order.

    this.startCommonServices(conf);
    this.namesystem.startCommonServices(conf, this.haContext);
    this.checkAvailableResources();
    this.nnResourceChecker.hasAvailableDiskSpace();
    NameNodeResourcePolicy.areResourcesAvailable(this.volumes.values(), this.minimumRedundantVolumes);
    
    void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
        this.registerMBean();
        this.writeLock();
        this.haContext = haContext;
    
        try {
            // Create a checker; what exactly gets checked lives in NameNodeResourceChecker
            this.nnResourceChecker = new NameNodeResourceChecker(conf);
            // Check whether there is enough disk space for the metadata (fsimage and editLog each need 100 MB by default)
            this.checkAvailableResources();
    
            assert !this.blockManager.isPopulatingReplQueues();
    
            StartupProgress prog = NameNode.getStartupProgress();
            // Begin the safe mode phase
            prog.beginPhase(Phase.SAFEMODE);
            // Count the complete blocks
            long completeBlocksTotal = this.getCompleteBlocksTotal();
            // Set the safe mode totals
            prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS, completeBlocksTotal);
            // Activate the block manager
            this.blockManager.activate(conf, completeBlocksTotal);
        } finally {
            this.writeUnlock("startCommonServices");
        }
    
        this.registerMXBean();
        DefaultMetricsSystem.instance().register(this);
        if (this.inodeAttributeProvider != null) {
            this.inodeAttributeProvider.start();
            this.dir.setINodeAttributeProvider(this.inodeAttributeProvider);
        }
    
        this.snapshotManager.registerMXBean();
        InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
        this.nameNodeHostName = serviceAddress != null ? serviceAddress.getHostName() : "";
    }
    

    NameNode Heartbeat Timeout Check

    Back in the initialize() method, click through these methods in order.

    this.startCommonServices(conf);
    this.namesystem.startCommonServices(conf, this.haContext);
    this.blockManager.activate(conf, completeBlocksTotal);
    this.datanodeManager.activate(conf);
    this.heartbeatManager.activate();
    

    Then find the run method of HeartbeatManager.Monitor.

    HeartbeatManager.this.heartbeatCheck();
    dm.isDatanodeDead(d); // Check whether the node is dead
    

    The value of heartbeatExpireInterval is 10 minutes + 30 seconds.
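    For reference, that 10 min 30 s figure follows from the defaults. A sketch of the computation (assuming the default dfs.namenode.heartbeat.recheck-interval of 5 minutes and dfs.heartbeat.interval of 3 seconds; the formula mirrors how DatanodeManager derives the expiry interval):

    // Assumed default values; the formula mirrors DatanodeManager's heartbeatExpireInterval.
    long heartbeatRecheckInterval = 5 * 60 * 1000; // dfs.namenode.heartbeat.recheck-interval (ms)
    long heartbeatIntervalSeconds = 3;             // dfs.heartbeat.interval (s)
    long heartbeatExpireInterval =
            2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds;
    // = 2 * 300000 + 10 * 1000 * 3 = 630000 ms = 10 minutes 30 seconds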

    Safe Mode

    Back in the initialize() method, click through these methods in order.

    this.startCommonServices(conf);
    this.namesystem.startCommonServices(conf, this.haContext);
    this.blockManager.activate(conf, completeBlocksTotal);
    this.bmSafeMode.activate(blockTotal);
    
    void activate(long total) {
        assert this.namesystem.hasWriteLock();
        assert this.status == BlockManagerSafeMode.BMSafeModeStatus.OFF;
        this.startTime = Time.monotonicNow();
        // Compute the block-count threshold
        this.setBlockTotal(total);
        // Check whether the DataNode and block counts meet the criteria for leaving safe mode
        if (this.areThresholdsMet()) {
            boolean exitResult = this.leaveSafeMode(false);
            Preconditions.checkState(exitResult, "Failed to leave safe mode.");
        } else {
            this.status = BlockManagerSafeMode.BMSafeModeStatus.PENDING_THRESHOLD;
            this.initializeReplQueuesIfNecessary();
            this.reportStatus("STATE* Safe mode ON.", true);
            this.lastStatusReport = Time.monotonicNow();
        }
    }
    
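    To make the threshold check concrete, a small illustration (numbers are made up; the default dfs.namenode.safemode.threshold-pct is 0.999f, and the exact rounding follows setBlockTotal in the real code):

    // Illustrative only: with 1000 complete blocks known to the NameNode,
    // roughly 999 of them must be reported before safe mode can be left.
    long blockTotal = 1000;
    float thresholdPct = 0.999f; // dfs.namenode.safemode.threshold-pct (default)
    long blockThreshold = (long) (blockTotal * thresholdPct);
    System.out.println("Reported blocks needed to leave safe mode: " + blockThreshold); // 999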

    DataNode Startup Source Code Analysis

    Find the main() method in DataNode.java and click through these methods in order.

    secureMain(args, (SecureResources)null);
    DataNode datanode = createDataNode(args, (Configuration)null, resources);
    // Initialize the DataNode instance
    DataNode dn = instantiateDataNode(args, conf, resources);
    // Create the DataNode instance
    makeInstance(dataLocations, (Configuration)conf, resources);
    // Construct the DataNode
    new DataNode(conf, locations, storageLocationChecker, resources);
    this.startDataNode(dataDirs, resources);
    // Initialize the DataXceiver server
    this.initDataXceiver();
    // Start the HTTP server
    this.startInfoServer();
    // Initialize the RPC server
    this.initIpcServer();
    // Create the heartbeat (block pool) manager
    this.blockPoolManager = new BlockPoolManager(this);
    // Refresh the NameNode list
    this.blockPoolManager.refreshNamenodes(this.getConf());
    // Create the DataNode
    createDataNode(args, conf, (SecureResources)null);
    // Start the DataNode daemon
    dn.runDatanodeDaemon();
    

    Initializing DataXceiverServer

    Find this.initDataXceiver(); inside org.apache.hadoop.hdfs.server.datanode.DataNode#startDataNode.

    // DataXceiverServer is the service the DataNode uses to receive data from clients and other DataNodes; here it is wrapped in a daemon thread
    this.dataXceiverServer = new Daemon(this.threadGroup, this.xserver);
    

    Initializing the HTTP Service

    Find this.startInfoServer(); inside org.apache.hadoop.hdfs.server.datanode.DataNode#startDataNode.

    new DatanodeHttpServer(this.getConf(), this, httpServerChannel);
    Builder builder = (new Builder()).setName("datanode").setConf(confForInfoServer).setACL(new AccessControlList(conf.get("dfs.cluster.administrators", " "))).hostName(getHostnameForSpnegoPrincipal(confForInfoServer)).addEndpoint(URI.create("http://localhost:" + proxyPort)).setFindPort(true);
    this.infoServer = builder.build();
    

    Initializing the DataNode RPC Server

    Find this.initIpcServer(); inside org.apache.hadoop.hdfs.server.datanode.DataNode#startDataNode.

    this.ipcServer = (new org.apache.hadoop.ipc.RPC.Builder(this.getConf())).setProtocol(ClientDatanodeProtocolPB.class).setInstance(service).setBindAddress(ipcAddr.getHostName()).setPort(ipcAddr.getPort()).setNumHandlers(this.getConf().getInt("dfs.datanode.handler.count", 10)).setVerbose(false).setSecretManager(this.blockPoolTokenSecretManager).build();
    

    DataNode Registration with the NameNode

    Find this.blockPoolManager.refreshNamenodes(this.getConf()); inside org.apache.hadoop.hdfs.server.datanode.DataNode#startDataNode.

    this.doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
    // Create one BPOfferService (BPOS) per NameNode
    BPOfferService bpos = this.createBPOS(nsToAdd, addrs, lifelineAddrs);
    // Then start them all together
    this.startAll();
    // Register with the NameNode
    this.connectToNNAndHandshake();
    // Obtain an RPC client proxy for the NameNode
    this.bpNamenode = this.dn.connectToNN(this.nnAddr);
    new DatanodeProtocolClientSideTranslatorPB(nnAddr, this.getConf());
    this.rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
    (DatanodeProtocolPB)RPC.getProxy(DatanodeProtocolPB.class, RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi, conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class));
    

    Sending Heartbeats to the NameNode

    Find the this.offerService(); call inside org.apache.hadoop.hdfs.server.datanode.BPServiceActor#run.

    // Send the heartbeat
    resp = this.sendHeartBeat(requestBlockReportLease);
    // Send it to the NameNode via the NameNode RPC client proxy
    HeartbeatResponse response = this.bpNamenode.sendHeartbeat(this.bpRegistration, reports, this.dn.getFSDataset().getCacheCapacity(), this.dn.getFSDataset().getCacheUsed(), this.dn.getXmitsInProgress(), this.dn.getXceiverCount(), numFailedVolumes, volumeFailureSummary, requestBlockReportLease, slowPeers, slowDisks);
    

    In NameNodeRpcServer.java there is a sendHeartbeat() method; it delegates to the namesystem's handleHeartbeat(), shown below.

    HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg, StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress, int failedVolumes, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, @Nonnull SlowDiskReports slowDisks) throws IOException {
        this.readLock();
    
        HeartbeatResponse var19;
        try {
            int maxTransfer = this.blockManager.getMaxReplicationStreams() - xmitsInProgress;
            // Handle the heartbeat request sent by the DataNode
            DatanodeCommand[] cmds = this.blockManager.getDatanodeManager().handleHeartbeat(nodeReg, reports, this.getBlockPoolId(), cacheCapacity, cacheUsed, xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary, slowPeers, slowDisks);
            long blockReportLeaseId = 0L;
            if (requestFullBlockReportLease) {
                blockReportLeaseId = this.blockManager.requestBlockReportLeaseId(nodeReg);
            }
    
            NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(this.haContext.getState().getServiceState(), this.getFSImage().getCorrectLastAppliedOrWrittenTxId());
            // Build the response to the DataNode's heartbeat request
            var19 = new HeartbeatResponse(cmds, haState, this.rollingUpgradeInfo, blockReportLeaseId);
        } finally {
            this.readUnlock("handleHeartbeat");
        }
    
        return var19;
    }
    

    HDFS Upload Source Code Analysis


    create Phase - The Client Sends a Create Request to the NameNode

    The user issues a request to create a path: FSDataOutputStream fos = fs.create(new Path("/input"));
    Keep clicking into the create method until you hit an abstract method, then step back and look for the implementation class; the create method is found in the DistributedFileSystem class.

    public FSDataOutputStream create(Path f, final FsPermission permission, final EnumSet<CreateFlag> cflags, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException {
        this.statistics.incrementWriteOps(1);
        this.storageStatistics.incrementOpCounter(OpType.CREATE);
        Path absF = this.fixRelativePart(f);
        return (FSDataOutputStream)(new FileSystemLinkResolver<FSDataOutputStream>() {
            public FSDataOutputStream doCall(Path p) throws IOException {
                // Create the output stream object
                DFSOutputStream dfsos = DistributedFileSystem.this.dfs.create(DistributedFileSystem.this.getPathName(p), permission, cflags, replication, blockSize, progress, bufferSize, checksumOpt);
                // Wrap the dfsos created above and return it
                return DistributedFileSystem.this.dfs.createWrappedOutputStream(dfsos, DistributedFileSystem.this.statistics);
            }
    
            public FSDataOutputStream next(FileSystem fs, Path p) throws IOException {
                return fs.create(p, permission, cflags, bufferSize, replication, blockSize, progress, checksumOpt);
            }
        }).resolve(this, absF);
    }
    

    Go into the create() method of DFSClient.java:

    public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes, String ecPolicyName) throws IOException {
        this.checkOpen();
        FsPermission masked = this.applyUMask(permission);
        LOG.debug("{}: masked={}", src, masked);
        // Create an output stream
        DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, this.dfsClientConf.createChecksum(checksumOpt), this.getFavoredNodesStr(favoredNodes), ecPolicyName);
        this.beginFileLease(result.getFileId(), result);
        return result;
    }
    

    Go into newStreamForCreate:

    try {
        // The client sends the create request to the NameNode (over RPC)
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable(flag), createParent, replication, blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
    } catch (RemoteException var28) {
        IOException e = var28.unwrapRemoteException(new Class[]{AccessControlException.class, DSQuotaExceededException.class, QuotaByStorageTypeExceededException.class, FileAlreadyExistsException.class, FileNotFoundException.class, ParentNotDirectoryException.class, NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class, UnresolvedPathException.class, SnapshotAccessControlException.class, UnknownCryptoProtocolVersionException.class});
        if (e instanceof RetryStartFileException) {
            if (retryCount <= 0) {
                throw new IOException("Too many retries because of encryption zone operations", e);
            }
    
            shouldRetry = true;
            --retryCount;
            continue;
        }
    
        throw e;
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    Object out;
    // Create the output stream
    if (stat.getErasureCodingPolicy() != null) {
        out = new DFSStripedOutputStream(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
    } else {
        out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum, favoredNodes, true);
    }
    // Start the streamer thread
    ((DFSOutputStream)out).start();
    Object var31 = out;
    return (DFSOutputStream)var31;
    

    create Phase - The NameNode Handles the Create Request

    Click into the dfsClient.namenode.create call above and find the NameNodeRpcServer implementation class.

    
    public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions, String ecPolicyName) throws IOException {
        // Check that the NameNode has started
        this.checkNNStartup();
        String clientMachine = getClientMachine();
        if (stateChangeLog.isDebugEnabled()) {
            stateChangeLog.debug("*DIR* NameNode.create: file " + src + " for " + clientName + " at " + clientMachine);
        }
    
        if (!this.checkPathLength(src)) {
            throw new IOException("create: Pathname too long.  Limit 8000 characters, 1000 levels.");
        } else {
            this.namesystem.checkOperation(OperationCategory.WRITE);
            CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(this.retryCache, (Object)null);
            if (cacheEntry != null && cacheEntry.isSuccess()) {
                return (HdfsFileStatus)cacheEntry.getPayload();
            } else {
                HdfsFileStatus status = null;
    
                try {
                    PermissionStatus perm = new PermissionStatus(getRemoteUser().getShortUserName(), (String)null, masked);
                    // Actually start creating the file
                    status = this.namesystem.startFile(src, perm, clientName, clientMachine, flag.get(), createParent, replication, blockSize, supportedVersions, ecPolicyName, cacheEntry != null);
                } finally {
                    RetryCache.setState(cacheEntry, status != null, status);
                }
    
                this.metrics.incrFilesCreated();
                this.metrics.incrCreateFileOps();
                return status;
            }
        }
    }
    

    Click through startFile, startFileInt, FSDirWriteFileOp.startFile, addFile, fsd.addINode, and addLastINode in order; this writes the file entry into the INode directory tree.

    create Phase - DataStreamer Startup Flow

    Back in newStreamForCreate, click into the start() method and find the run method of the DataStreamer class; if there is no data in dataQueue, the code blocks.

    try {
        this.dataQueue.wait(tmpBytesSent);
    } catch (InterruptedException var52) {
        LOG.warn("Caught exception", var52);
    }
    

    write Phase - Writing Data into the DataStreamer Queue

    The user issues a write request: fos.write("hello world".getBytes());. Keep clicking into the write method until you hit an abstract method, then find the implementation class FSOutputSummer and click into flushBuffer.

    protected synchronized int flushBuffer(boolean keep, boolean flushPartial) throws IOException {
        int bufLen= this.count;
        int partialLen = bufLen % this.sum.getBytesPerChecksum();
        int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
        if (lenToFlush != 0) {
            // Write data toward the queue
            this.writeChecksumChunks(this.buf, 0, lenToFlush);
            if (flushPartial && !keep) {
                this.count = 0;
            } else {
                this.count = partialLen;
                System.arraycopy(this.buf, bufLen - this.count, this.buf, 0, this.count);
            }
        }
    
        return this.count - (bufLen - lenToFlush);
    }
    

    Keep clicking into writeChecksumChunks, find the implementations of writeChunk, and choose the one in DFSOutputStream.

    protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum, int ckoff, int cklen) throws IOException {
        this.writeChunkPrepare(len, ckoff, cklen);
        // Write the chunk's 4-byte checksum into the packet
        this.currentPacket.writeChecksum(checksum, ckoff, cklen);
        // Write one 512-byte chunk of data into the packet
        this.currentPacket.writeData(b, offset, len);
        // Track how many chunks are in the packet; once 127 chunks accumulate, the packet is full
        this.currentPacket.incNumChunks();
        this.getStreamer().incBytesCurBlock((long)len);
        if (this.currentPacket.getNumChunks() == this.currentPacket.getMaxChunks() || this.getStreamer().getBytesCurBlock() == this.blockSize) {
            this.enqueueCurrentPacketFull();
        }
    
    }
    

    Click through enqueueCurrentPacketFull(), enqueueCurrentPacket(), and waitAndQueuePacket(); there is a this.dataQueue.wait(); that waits, and afterwards this.queuePacket(packet); adds data to the queue if the queue is not full.

    void queuePacket(DFSPacket packet) {
        synchronized(this.dataQueue) {
            if (packet != null) {
                packet.addTraceParent(Tracer.getCurrentSpanId());
                // Append the packet to the tail of the queue
                this.dataQueue.addLast(packet);
                this.lastQueuedSeqno = packet.getSeqno();
                LOG.debug("Queued {}, {}", packet, this);
                // Notify waiters that data has been added to the queue
                this.dataQueue.notifyAll();
            }
        }
    }
    

    write Phase - Building the Pipeline: Rack Awareness (Block Placement)

    Go to the run method of DataStreamer; the method is fairly long.

    synchronized(this.dataQueue) {
        for(long now = Time.monotonicNow(); !this.shouldStop() && this.dataQueue.size() == 0 && (this.stage != BlockConstructionStage.DATA_STREAMING || now - lastPacket < (long)halfSocketTimeout) || doSleep; now = Time.monotonicNow()) {
            tmpBytesSent = (long)halfSocketTimeout - (now - lastPacket);
            tmpBytesSent = tmpBytesSent <= 0L ? 1000L : tmpBytesSent;
            tmpBytesSent = this.stage == BlockConstructionStage.DATA_STREAMING ? tmpBytesSent : 1000L;
    
            try {
                // If dataQueue is empty, the code blocks here waiting for data to upload
                this.dataQueue.wait(tmpBytesSent);
            } catch (InterruptedException var52) {
                LOG.warn("Caught exception", var52);
            }
    
            doSleep = false;
        }
    
        if (this.shouldStop()) {
            continue;
        }
    
        if (this.dataQueue.isEmpty()) {
            one = this.createHeartbeatPacket();
        } else {
            try {
                this.backOffIfNecessary();
            } catch (InterruptedException var51) {
                LOG.warn("Caught exception", var51);
            }
            // If the queue is not empty, take the first packet
            one = (DFSPacket)this.dataQueue.getFirst();
            SpanId[] parents = one.getTraceParents();
            if (parents.length > 0) {
                scope = this.dfsClient.getTracer().newScope("dataStreamer", parents[0]);
                scope.getSpan().setParents(parents);
            }
        }
    }
    ...
    if (this.stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        LOG.debug("Allocating new block: {}", this);
        // Step 1: ask the NameNode for a block and set up the data pipeline
        this.setPipeline(this.nextBlockOutputStream());
        // Step 2: start the ResponseProcessor to track whether packets are sent successfully
        this.initDataStreaming();
    } else if (this.stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        LOG.debug("Append to block {}", this.block);
        this.setupPipelineForAppendOrRecovery();
        if (this.streamerClosed) {
            continue;
        }
        this.initDataStreaming();
    }
    ...
    SpanId spanId = SpanId.INVALID;
    synchronized(this.dataQueue) {
        if (!one.isHeartbeatPacket()) {
            if (scope != null) {
                spanId = scope.getSpanId();
                scope.detach();
                one.setTraceScope(scope);
            }
            scope = null;
            // Step 3: remove the packet to be sent from dataQueue
            this.dataQueue.removeFirst();
            // Step 4: add this packet to ackQueue
            this.ackQueue.addLast(one);
            this.packetSendTime.put(one.getSeqno(), Time.monotonicNow());
            this.dataQueue.notifyAll();
        }
    }
    ...
    try {
        // Write the data out
        one.writeTo(this.blockStream);
        this.blockStream.flush();
    } catch (Throwable var49) {
        var11 = var49;
        throw var49;
    }
    ...
    

    Click into the nextBlockOutputStream() method.

    ...
    // Ask the NameNode which DataNodes to write the block to
    lb = this.locateFollowingBlock(excluded.length > 0 ? excluded : null, oldBlock);
    ...
    // Create the pipeline
    success = this.createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs, 0L, false);
    

    Click into the locateFollowingBlock() method and then the addBlock() method; you will see return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock, excludedNodes, fileId, favoredNodes, allocFlags);, which issues an RPC request. Go into the NameNodeRpcServer implementation class and click getAdditionalBlock(); you will see this line: DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(this.blockManager, src, excludedNodes, favoredNodes, flags, r);, which is where the block storage locations are chosen. Keep clicking inward: chooseTarget4NewBlock(), blockplacement.chooseTarget(), chooseTarget().

    private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, Node writer, List<DatanodeStorageInfo> chosenStorage, boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize, BlockStoragePolicy storagePolicy, EnumSet<AddBlockFlag> addBlockFlags, EnumMap<StorageType, Integer> sTypes) {
        if (numOfReplicas != 0 && this.clusterMap.getNumOfLeaves() != 0) {
            if (excludedNodes == null) {
                excludedNodes = new HashSet();
            }
    
            int[] result = this.getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
            numOfReplicas = result[0];
            int maxNodesPerRack = result[1];
            Iterator var13 = chosenStorage.iterator();
    
            while(var13.hasNext()) {
                DatanodeStorageInfo storage = (DatanodeStorageInfo)var13.next();
                // Collect the unavailable DataNodes and add them to excludedNodes
                this.addToExcludedNodes(storage.getDatanodeDescriptor(), (Set)excludedNodes);
            }
    
            List<DatanodeStorageInfo> results = null;
            Node localNode = null;
            boolean avoidStaleNodes = this.stats != null && this.stats.isAvoidingStaleDataNodesForWrite();
            boolean avoidLocalNode = addBlockFlags != null && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE) && writer != null && !((Set)excludedNodes).contains(writer);
            // Avoid writing to the local node while it is already being written to (NO_LOCAL_WRITE)
            if (avoidLocalNode) {
                results = new ArrayList(chosenStorage);
                Set<Node> excludedNodeCopy = new HashSet((Collection)excludedNodes);
                if (writer != null) {
                    excludedNodeCopy.add(writer);
                }
                // Actually choose the DataNodes
                localNode = this.chooseTarget(numOfReplicas, writer, excludedNodeCopy, blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(), sTypes);
                if (results.size() < numOfReplicas) {
                    results = null;
                }
            }
    
            if (results == null) {
                results = new ArrayList(chosenStorage);
                localNode = this.chooseTarget(numOfReplicas, writer, (Set)excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(), sTypes);
            }
    
            if (!returnChosenNodes) {
                results.removeAll(chosenStorage);
            }
    
            return this.getPipeline(writer != null && writer instanceof DatanodeDescriptor ? writer : localNode, (DatanodeStorageInfo[])results.toArray(new DatanodeStorageInfo[results.size()]));
        } else {
            return DatanodeStorageInfo.EMPTY_ARRAY;
        }
    }
    

    Keep clicking inside chooseTarget() until you reach chooseTargetInOrder().

    protected Node chooseTargetInOrder(int numOfReplicas, Node writer, Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeStorageInfo> results, boolean avoidStaleNodes, boolean newBlock, EnumMap<StorageType, Integer> storageTypes) throws NotEnoughReplicasException {
        int numOfResults = results.size();
        if (numOfResults == 0) {
            // The first replica is stored on the local (writer) node
            DatanodeStorageInfo storageInfo = this.chooseLocalStorage((Node)writer, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes, true);
            writer = storageInfo != null ? storageInfo.getDatanodeDescriptor() : null;
            --numOfReplicas;
            if (numOfReplicas == 0) {
                return (Node)writer;
            }
        }
    
        DatanodeDescriptor dn0 = ((DatanodeStorageInfo)results.get(0)).getDatanodeDescriptor();
        // The second replica is stored on a different rack
        if (numOfResults <= 1) {
            this.chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
            --numOfReplicas;
            if (numOfReplicas == 0) {
                return (Node)writer;
            }
        }
    
        if (numOfResults <= 2) {
            DatanodeDescriptor dn1 = ((DatanodeStorageInfo)results.get(1)).getDatanodeDescriptor();
            // If the first and second replicas are on the same rack, put the third on another rack
            if (this.clusterMap.isOnSameRack(dn0, dn1)) {
                this.chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
            } else if (newBlock) {
                // If this is a new block, store it on the same rack as the second replica
                this.chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
            } else {
                // Otherwise, store it on the writer's rack
                this.chooseLocalRack((Node)writer, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
            }
    
            --numOfReplicas;
            if (numOfReplicas == 0) {
                return (Node)writer;
            }
        }
    
        this.chooseRandom(numOfReplicas, "", excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
        return (Node)writer;
    }
    

    write Phase - Building the Pipeline: Sending over the Socket

    Back in the nextBlockOutputStream() method, click into createBlockOutputStream(). Here an output stream and an input stream are created: the output stream is used to write data to the DataNode, and the input stream reads back the result of that write. The writeBlock() method is used to send the data, which ultimately goes through the send() method.

    write Phase - Building the Pipeline: Receiving over the Socket

    Find the run() method of the DataXceiverServer class. It first accepts socket requests; for each block the client sends, a DataXceiver thread is started to handle that block. Then go to DataXceiver's run() method.

    ...
    // Read the operation type of this request
    op = this.readOp();
    ...
    // Process the data according to the operation type
    this.processOp(op);
    

    Go to processOp(), find the WRITE_BLOCK case, and find the implementation of the writeBlock() method.

    ...
    // Create a BlockReceiver
    this.setCurrentBlockReceiver(this.getBlockReceiver(block, storageType, this.in, this.peer.getRemoteAddressString(), this.peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, this.datanode, requestedChecksum, cachingStrategy, allowLazyPersist, pinning, storageId));
    ...
    // Continue connecting to the downstream node
    if (targets.length > 0) {
    	...
        // Open a socket to the next replica
    	mirrorSock = this.datanode.newSocket();
    	...
        if (targetPinnings != null && targetPinnings.length > 0) {
            (new Sender(mirrorOut)).writeBlock(originalBlock, targetStorageTypes[0], blockToken, clientname, targets, targetStorageTypes, srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum, cachingStrategy, allowLazyPersist, targetPinnings[0], targetPinnings, targetStorageId, targetStorageIds);
        } else {
            (new Sender(mirrorOut)).writeBlock(originalBlock, targetStorageTypes[0], blockToken, clientname, targets, targetStorageTypes, srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum, cachingStrategy, allowLazyPersist, false, targetPinnings, targetStorageId, targetStorageIds);
        }
    	...
    }
    

    write Phase - The Client Receives the DataNode Write Response

    Search for DataStreamer and find its run() method.

    ...
    // If the queue is empty, the code blocks here waiting
    try {
        this.dataQueue.wait(tmpBytesSent);
    } catch (InterruptedException var52) {
        LOG.warn("Caught exception", var52);
    }
    ...
    // The queue is not empty, take the first packet
    one = (DFSPacket)this.dataQueue.getFirst();
    ...
    // Step 1: ask the NameNode for a block and set up the data pipeline
    this.setPipeline(this.nextBlockOutputStream());
    // Step 2: start the ResponseProcessor to track whether packets are sent successfully
    this.initDataStreaming();
    ...
    // Step 3: remove the packet to be sent from dataQueue
    this.dataQueue.removeFirst();
    // Step 4: add this packet to ackQueue
    this.ackQueue.addLast(one);
    ...
    // Write the data out
    one.writeTo(this.blockStream);
    

    Yarn Source Code Analysis


    The Yarn Client Submits a Job to the ResourceManager

    In WordCountDriver.java, click job.waitForCompletion(true), then click through submit(), submitter.submitJobInternal(Job.this, cluster), and status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()); then click into the YARNRunner implementation class.

    public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException {
        this.addHistoryToken(ts);
        // Build the submission context
        ApplicationSubmissionContext appContext = this.createApplicationSubmissionContext(this.conf, jobSubmitDir, ts);
    
        try {
            // Submit the application to the RM; appContext encapsulates the commands for launching MRAppMaster and running the container
            ApplicationId applicationId = this.resMgrDelegate.submitApplication(appContext);
            // Fetch the submission response (application report)
            ApplicationReport appMaster = this.resMgrDelegate.getApplicationReport(applicationId);
            String diagnostics = appMaster == null ? "application report is null" : appMaster.getDiagnostics();
            if (appMaster != null && appMaster.getYarnApplicationState() != YarnApplicationState.FAILED && appMaster.getYarnApplicationState() != YarnApplicationState.KILLED) {
                return this.clientCache.getClient(jobId).getJobStatus(jobId);
            } else {
                throw new IOException("Failed to run job : " + diagnostics);
            }
        } catch (YarnException var8) {
            throw new IOException(var8);
        }
    }
    
    public ApplicationSubmissionContext createApplicationSubmissionContext(Configuration jobConf, String jobSubmitDir, Credentials ts) throws IOException {
        ApplicationId applicationId = this.resMgrDelegate.getApplicationId();
        // Set up the local resource paths
        Map<String, LocalResource> localResources = this.setupLocalResources(jobConf, jobSubmitDir);
        DataOutputBuffer dob = new DataOutputBuffer();
        ts.writeTokenStorageToStream(dob);
        ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        // Build the command for launching MRAppMaster and running the container
        List<String> vargs = this.setupAMCommand(jobConf);
        ContainerLaunchContext amContainer = this.setupContainerLaunchContextForAM(jobConf, localResources, securityTokens, vargs);
        String regex = this.conf.get("mapreduce.job.send-token-conf");
        if (regex != null && !regex.isEmpty()) {
            this.setTokenRenewerConf(amContainer, this.conf, regex);
        }
    
        Collection<String> tagsFromConf = jobConf.getTrimmedStringCollection("mapreduce.job.tags");
        ApplicationSubmissionContext appContext = (ApplicationSubmissionContext)recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
        appContext.setApplicationId(applicationId);
        appContext.setQueue(jobConf.get("mapreduce.job.queuename", "default"));
        ReservationId reservationID = null;
    
        String amNodelabelExpression;
        try {
            reservationID = ReservationId.parseReservationId(jobConf.get("mapreduce.job.reservation.id"));
        } catch (NumberFormatException var20) {
            amNodelabelExpression = "Invalid reservationId: " + jobConf.get("mapreduce.job.reservation.id") + " specified for the app: " + applicationId;
            LOG.warn(amNodelabelExpression);
            throw new IOException(amNodelabelExpression);
        }
    
        if (reservationID != null) {
            appContext.setReservationID(reservationID);
            LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId + " to queue:" + appContext.getQueue() + " with reservationId:" + appContext.getReservationID());
        }
    
        appContext.setApplicationName(jobConf.get("mapreduce.job.name", "N/A"));
        appContext.setCancelTokensWhenComplete(this.conf.getBoolean("mapreduce.job.complete.cancel.delegation.tokens", true));
        appContext.setAMContainerSpec(amContainer);
        appContext.setMaxAppAttempts(this.conf.getInt("mapreduce.am.max-attempts", 2));
        List<ResourceRequest> amResourceRequests = this.generateResourceRequests();
        appContext.setAMContainerResourceRequests(amResourceRequests);
        amNodelabelExpression = this.conf.get("mapreduce.job.am.node-label-expression");
        if (null != amNodelabelExpression && amNodelabelExpression.trim().length() != 0) {
            Iterator var16 = amResourceRequests.iterator();
    
            while(var16.hasNext()) {
                ResourceRequest amResourceRequest = (ResourceRequest)var16.next();
                amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
            }
        }
    
        appContext.setNodeLabelExpression(jobConf.get("mapreduce.job.node-label-expression"));
        appContext.setApplicationType("MAPREDUCE");
        if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
            appContext.setApplicationTags(new HashSet(tagsFromConf));
        }
    
        String jobPriority = jobConf.get("mapreduce.job.priority");
        if (jobPriority != null) {
            int iPriority;
            try {
                iPriority = TypeConverter.toYarnApplicationPriority(jobPriority);
            } catch (IllegalArgumentException var19) {
                iPriority = Integer.parseInt(jobPriority);
            }
    
            appContext.setPriority(Priority.newInstance(iPriority));
        }
    
        return appContext;
    }
    

    Back in the submitJob() method, click submitApplication(), find the YarnClientImpl implementation class, and click this.rmClient.submitApplication(request).
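    For orientation, the path the client walks here is also exposed through the public YarnClient API. A minimal sketch (a reachable ResourceManager is assumed; a real submission additionally needs the AM ContainerLaunchContext and resource requests that createApplicationSubmissionContext above builds):

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.client.api.YarnClientApplication;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class SubmitSketch {
        public static void main(String[] args) throws Exception {
            YarnClient yarnClient = YarnClient.createYarnClient();
            yarnClient.init(new YarnConfiguration());
            yarnClient.start();

            // Ask the RM for a new application id, then fill in and submit the context.
            YarnClientApplication app = yarnClient.createApplication();
            ApplicationSubmissionContext ctx = app.getApplicationSubmissionContext();
            ctx.setApplicationName("demo"); // illustrative; AM container spec and resources omitted
            ApplicationId appId = yarnClient.submitApplication(ctx);
            System.out.println("Submitted " + appId);

            yarnClient.stop();
        }
    }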

    The RM Launches MRAppMaster

    Add the following dependency to pom.xml and find the main() method of MRAppMaster.

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-app</artifactId>
        <version>3.1.3</version>
    </dependency>
    
    public static void main(String[] args) {
        try {
            mainStarted = true;
            Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
            String containerIdStr = System.getenv(Environment.CONTAINER_ID.name());
            String nodeHostString = System.getenv(Environment.NM_HOST.name());
            String nodePortString = System.getenv(Environment.NM_PORT.name());
            String nodeHttpPortString = System.getenv(Environment.NM_HTTP_PORT.name());
            String appSubmitTimeStr = System.getenv("APP_SUBMIT_TIME_ENV");
            validateInputParam(containerIdStr, Environment.CONTAINER_ID.name());
            validateInputParam(nodeHostString, Environment.NM_HOST.name());
            validateInputParam(nodePortString, Environment.NM_PORT.name());
            validateInputParam(nodeHttpPortString, Environment.NM_HTTP_PORT.name());
            validateInputParam(appSubmitTimeStr, "APP_SUBMIT_TIME_ENV");
            // Initialize a container id from the environment
            ContainerId containerId = ContainerId.fromString(containerIdStr);
            ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
            if (applicationAttemptId != null) {
                CallerContext.setCurrent((new Builder("mr_appmaster_" + applicationAttemptId.toString())).build());
            }
    
            long appSubmitTime = Long.parseLong(appSubmitTimeStr);
            // Create the MRAppMaster object
            MRAppMaster appMaster = new MRAppMaster(applicationAttemptId, containerId, nodeHostString, Integer.parseInt(nodePortString), Integer.parseInt(nodeHttpPortString), appSubmitTime);
            ShutdownHookManager.get().addShutdownHook(new MRAppMaster.MRAppMasterShutdownHook(appMaster), 30);
            JobConf conf = new JobConf(new YarnConfiguration());
            conf.addResource(new Path("job.xml"));
            MRWebAppUtil.initialize(conf);
            String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);
            if (systemPropsToLog != null) {
                LOG.info(systemPropsToLog);
            }
    
            String jobUserName = System.getenv(Environment.USER.name());
            conf.set("mapreduce.job.user.name", jobUserName);
            // Initialize and start the AppMaster
            initAndStartAppMaster(appMaster, conf, jobUserName);
        } catch (Throwable var14) {
            LOG.error("Error starting MRAppMaster", var14);
            ExitUtil.terminate(1, var14);
        }
    
    }
    

    initAndStartAppMaster()方法里,找到appMaster.init(conf)appMaster.start()方法。

    public void init(Configuration conf) {
        if (conf == null) {
            throw new ServiceStateException("Cannot initialize service " + this.getName() + ": null configuration");
        } else if (!this.isInState(STATE.INITED)) {
            synchronized(this.stateChangeLock) {
                if (this.enterState(STATE.INITED) != STATE.INITED) {
                    this.setConfig(conf);
    
                    try {
                        // Calls serviceInit() in MRAppMaster
                        this.serviceInit(this.config);
                        if (this.isInState(STATE.INITED)) {
                            // Initialization done, notify the listeners
                            this.notifyListeners();
                        }
                    } catch (Exception var5) {
                        this.noteFailure(var5);
                        ServiceOperations.stopQuietly(LOG, this);
                        throw ServiceStateException.convert(var5);
                    }
                }
    
            }
        }
    }
    
    public void start() {
        if (!this.isInState(STATE.STARTED)) {
            synchronized(this.stateChangeLock) {
                if (this.stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
                    try {
                        this.startTime = System.currentTimeMillis();
                        // Calls serviceStart() in MRAppMaster
                        this.serviceStart();
                        if (this.isInState(STATE.STARTED)) {
                            LOG.debug("Service {} is started", this.getName());
                            this.notifyListeners();
                        }
                    } catch (Exception var4) {
                        this.noteFailure(var4);
                        ServiceOperations.stopQuietly(LOG, this);
                        throw ServiceStateException.convert(var4);
                    }
                }
    
            }
        }
    }
    

    After initialization completes, the job is submitted to the queue: startJobs(); click the handle() method, find handle() in the implementation class GenericEventHandler, and eventQueue.put(event) stores the job event in the Yarn event queue.
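    The handle()/eventQueue.put() pair is essentially a producer/consumer event loop. A simplified, self-contained sketch of the idea (this is not Hadoop's AsyncDispatcher; class and method names are illustrative):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // handle() enqueues events; a background thread dequeues and dispatches them.
    public class MiniDispatcher {
        private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();

        public void handle(String event) throws InterruptedException {
            eventQueue.put(event); // corresponds to eventQueue.put(event) above
        }

        public void start() {
            Thread t = new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        String event = eventQueue.take();
                        System.out.println("Dispatching " + event);
                    }
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);
            t.start();
        }
    }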

    MapReduce Source Code Analysis

    Scheduled Task Execution (YarnChild)

    Find the YarnChild class and search for its main method.

    childUGI.doAs(new PrivilegedExceptionAction<Object>() {
        public Object run() throws Exception {
            YarnChild.setEncryptedSpillKeyIfRequired(task);
            FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
            task.run(job, umbilical);
            return null;
        }
    });
    

    Click into the task.run method and find the MapTask implementation class.

    public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
      throws IOException, ClassNotFoundException, InterruptedException {
      this.umbilical = umbilical;
      if (isMapTask()) {
        // If there are no reducers then there won't be any sort. Hence the map 
        // phase will govern the entire attempt's progress.
        // If the number of ReduceTasks is 0, the map phase accounts for 100% of the task's progress;
        // otherwise the map phase accounts for 66.7% and the sort phase for 33.3%
        if (conf.getNumReduceTasks() == 0) {
          mapPhase = getProgress().addPhase("map", 1.0f);
        } else {
          // If there are reducers then the entire attempt's progress will be 
          // split between the map phase (67%) and the sort phase (33%).
          mapPhase = getProgress().addPhase("map", 0.667f);
          sortPhase  = getProgress().addPhase("sort", 0.333f);
        }
      }
      TaskReporter reporter = startReporter(umbilical);
      boolean useNewApi = job.getUseNewMapper();
      initialize(job, getJobID(), reporter, useNewApi);
      // check if it is a cleanupJobTask
      if (jobCleanup) {
        runJobCleanupTask(umbilical, reporter);
        return;
      }
      if (jobSetup) {
        runJobSetupTask(umbilical, reporter);
        return;
      }
      if (taskCleanup) {
        runTaskCleanupTask(umbilical, reporter);
        return;
      }
      // Dispatch to the new or old API
      if (useNewApi) {
        runNewMapper(job, splitMetaInfo, umbilical, reporter);
      } else {
        runOldMapper(job, splitMetaInfo, umbilical, reporter);
      }
      done(umbilical, reporter);
    }
    

    runNewMapper里,可以看到mapper.run(mapperContext);,这就在运行Map阶段了。

    public void run(Context context) throws IOException, InterruptedException {
      setup(context);
      try {
        while (context.nextKeyValue()) {
          map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
      } finally {
        cleanup(context);
      }
    }
    
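    For context, a minimal user Mapper (an illustrative WordCount-style example, not from the original notes) whose map() would be called once per input record by the run() loop above:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit (word, 1) for every whitespace-separated token in the line.
            for (String token : value.toString().split("\\s+")) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }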

    Back in the earlier task.run method, find the ReduceTask implementation class. Likewise, click into the runNewReducer method and you can see the call to reducer.run(reducerContext);.

    public void run(Context context) throws IOException, InterruptedException {
      setup(context);
      try {
        while (context.nextKey()) {
          reduce(context.getCurrentKey(), context.getValues(), context);
          // If a back up store is used, reset it
          Iterator<VALUEIN> iter = context.getValues().iterator();
          if(iter instanceof ReduceContext.ValueIterator) {
            ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
          }
        }
      } finally {
        cleanup(context);
      }
    }
    

    Building Hadoop from Source

    This is fairly involved and was not recorded here.

  • Original article: https://blog.csdn.net/qq_36059561/article/details/125286049