• HDFS文件上传之create创建过程-尚硅谷大数据培训


    HDFS的写数据流程,如下图所示:

    HDFS上传源码解析如下图所示:

    0)在pom.xml中增加如下依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.hadoop</groupId>
    4. <artifactId>hadoop-client</artifactId>
    5. <version>3.1.3</version>
    6. </dependency>
    7. <dependency>
    8. <groupId>org.apache.hadoop</groupId>
    9. <artifactId>hadoop-hdfs</artifactId>
    10. <version>3.1.3</version>
    11. </dependency>
    12. <dependency>
    13. <groupId>org.apache.hadoop</groupId>
    14. <artifactId>hadoop-hdfs-client</artifactId>
    15. <version>3.1.3</version>
    16. <scope>provided</scope>
    17. </dependency>
    18. <dependency>
    19. <groupId>junit</groupId>
    20. <artifactId>junit</artifactId>
    21. <version>4.12</version>
    22. </dependency>
    23. <dependency>
    24. <groupId>org.slf4j</groupId>
    25. <artifactId>slf4j-log4j12</artifactId>
    26. <version>1.7.30</version>
    27. </dependency>
    28. </dependencies>

    一、客户端(Client)向NN发起创建请求

    用户自己写的代码

    1. @Test
    2. public void testPut2() throws IOException {
    3. FSDataOutputStream fos = fs.create(new Path("/input"));
    4. fos.write("helloworld".getBytes());
    5. }

    FileSystem.java

    1. public FSDataOutputStream create(Path f) throws IOException {
    2. return create(f, true);
    3. }
    4. public FSDataOutputStream create(Path f, boolean overwrite)
    5. throws IOException {
    6. return create(f, overwrite,
    7. getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
    8. IO_FILE_BUFFER_SIZE_DEFAULT),
    9. getDefaultReplication(f),
    10. getDefaultBlockSize(f));
    11. }
    12. public FSDataOutputStream create(Path f,
    13. boolean overwrite,
    14. int bufferSize,
    15. short replication,
    16. long blockSize) throws IOException {
    17. return create(f, overwrite, bufferSize, replication, blockSize,null);
    18. }
    19. public FSDataOutputStream create(Path f,
    20. boolean overwrite,
    21. int bufferSize,
    22. short replication,
    23. long blockSize,
    24. Progressable progress
    25. ) throws IOException {
    26. return this.create(f, FsCreateModes.applyUMask(
    27. FsPermission.getFileDefault(),FsPermission.getUMask(getConf())),
    28. overwrite, bufferSize, replication,blockSize, progress);
    29. }
    30. public abstract FSDataOutputStream create(Path f,
    31. FsPermission permission,
    32. boolean overwrite,
    33. int bufferSize,
    34. short replication,
    35. long blockSize,
    36. Progressable progress) throws IOException;

    选中create,点击ctrl+h,找到实现类
    DistributedFileSystem.java,查找create方法。

    DistributedFileSystem.java

    1. @Override
    2. public FSDataOutputStream create(Path f,FsPermission permission,
    3. boolean overwrite, int bufferSize, shortreplication, long blockSize,
    4. Progressable progress) throws IOException {
    5. return this.create(f, permission,
    6. overwrite ? EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE)
    7. : EnumSet.of(CreateFlag.CREATE),bufferSize, replication,
    8. blockSize, progress, null);
    9. }
    10. @Override
    11. public FSDataOutputStream create(final Path f, final FsPermission permission,
    12. final EnumSet<CreateFlag> cflags, final int bufferSize,
    13. final short replication, final longblockSize,
    14. final Progressable progress, finalChecksumOpt checksumOpt)
    15. throws IOException {
    16. statistics.incrementWriteOps(1);
    17. storageStatistics.incrementOpCounter(OpType.CREATE);
    18. Path absF = fixRelativePart(f);
    19. return new FileSystemLinkResolver<FSDataOutputStream>() {
    20. @Override
    21. public FSDataOutputStream doCall(final Path p) throws IOException {
    22. // 创建获取了一个输出流对象
    23. final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
    24. cflags, replication,blockSize, progress, bufferSize,
    25. checksumOpt);
    26. // 这里将上面创建的dfsos进行包装并返回
    27. return dfs.createWrappedOutputStream(dfsos,statistics);
    28. }
    29. @Override
    30. public FSDataOutputStream next(final FileSystem fs, final Path p)
    31. throws IOException {
    32. return fs.create(p, permission,cflags, bufferSize,
    33. replication, blockSize,progress, checksumOpt);
    34. }
    35. }.resolve(this, absF);
    36. }

    点击create,进入DFSClient.java

    1. public DFSOutputStream create(String src,FsPermission permission,
    2. EnumSet<CreateFlag> flag, short replication, long blockSize,
    3. Progressable progress, int buffersize, ChecksumOpt checksumOpt)
    4. throws IOException {
    5. return create(src, permission, flag, true,
    6. replication, blockSize, progress,buffersize, checksumOpt, null);
    7. }
    8. public DFSOutputStream create(String src,FsPermission permission,
    9. EnumSet flag, boolean createParent, short replication,
    10. long blockSize, Progressable progress, int buffersize,
    11. ChecksumOpt checksumOpt, InetSocketAddress[]favoredNodes)
    12. throws IOException {
    13. return create(src, permission, flag, createParent, replication,blockSize,
    14. progress, buffersize, checksumOpt,favoredNodes, null);
    15. }
    16. public DFSOutputStream create(String src,FsPermission permission,
    17. EnumSet flag, boolean createParent, short replication,
    18. long blockSize, Progressable progress, int buffersize,
    19. ChecksumOpt checksumOpt, InetSocketAddress[]favoredNodes,
    20. String ecPolicyName) throws IOException {
    21. checkOpen();
    22. final FsPermission masked =applyUMask(permission);
    23. LOG.debug("{}: masked={}", src,masked);
    24. final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
    25. src, masked, flag, createParent,replication, blockSize, progress,
    26. dfsClientConf.createChecksum(checksumOpt),
    27. getFavoredNodesStr(favoredNodes),ecPolicyName);
    28. beginFileLease(result.getFileId(),result);
    29. return result;
    30. }

    点击newStreamForCreate,进入DFSOutputStream.java

    1. static DFSOutputStream newStreamForCreate(DFSClientdfsClient, String src,
    2. FsPermission masked,EnumSet flag, boolean createParent,
    3. short replication, long blockSize,Progressable progress,
    4. DataChecksum checksum, String[] favoredNodes,String ecPolicyName)
    5. throws IOException {
    6. try (TraceScope ignored =
    7. dfsClient.newPathTraceScope("newStreamForCreate",src)) {
    8. HdfsFileStatus stat = null;
    9. // Retry the create if we get a RetryStartFileException up to a maximum
    10. // number of times
    11. boolean shouldRetry = true;
    12. int retryCount = CREATE_RETRY_COUNT;
    13. while (shouldRetry) {
    14. shouldRetry = false;
    15. try {
    16. // 客户端将创建请求通过RPC发送给NN
    17. stat = dfsClient.namenode.create(src, masked,dfsClient.clientName,
    18. new EnumSetWritable<>(flag),createParent, replication,
    19. blockSize, SUPPORTED_CRYPTO_VERSIONS,ecPolicyName);
    20. break;
    21. } catch (RemoteException re) {
    22. … ….
    23. }
    24. }
    25. Preconditions.checkNotNull(stat, "HdfsFileStatus should not benull!");
    26. final DFSOutputStream out;
    27. if (stat.getErasureCodingPolicy() != null) {
    28. out = new DFSStripedOutputStream(dfsClient, src, stat,
    29. flag, progress, checksum, favoredNodes);
    30. } else {
    31. out = new DFSOutputStream(dfsClient, src, stat,
    32. flag, progress, checksum, favoredNodes, true);
    33. }
    34. //开启线程run,DataStreamer extends Daemon extends Thread
    35. out.start();
    36. return out;
    37. }
    38. }

    二、NN处理客户端的创建请求

    1)点击create

    ClientProtocol.java

    1. HdfsFileStatus create(String src, FsPermission masked,
    2. String clientName, EnumSetWritable<CreateFlag> flag,
    3. boolean createParent, shortreplication, long blockSize,
    4. CryptoProtocolVersion[]supportedVersions, String ecPolicyName)
    5. throws IOException;

    2)Ctrl + h查找create实现类,点击NameNodeRpcServer,在NameNodeRpcServer.java中搜索create

    NameNodeRpcServer.java

    1. public HdfsFileStatus create(String src, FsPermission masked,
    2. String clientName, EnumSetWritable<CreateFlag> flag,
    3. boolean createParent, short replication,long blockSize,
    4. CryptoProtocolVersion[] supportedVersions,String ecPolicyName)
    5. throws IOException {
    6. // 检查NN启动
    7. checkNNStartup();
    8. ... ...
    9. HdfsFileStatus status = null;
    10. try {
    11. PermissionStatus perm = new PermissionStatus(getRemoteUser()
    12. .getShortUserName(), null, masked);
    13. // 重要
    14. status = namesystem.startFile(src, perm, clientName,clientMachine,
    15. flag.get(), createParent, replication,blockSize, supportedVersions,
    16. ecPolicyName, cacheEntry != null);
    17. } finally {
    18. RetryCache.setState(cacheEntry, status !=null, status);
    19. }
    20. metrics.incrFilesCreated();
    21. metrics.incrCreateFileOps();
    22. return status;
    23. }

    FSNamesystem.java

    1. HdfsFileStatus startFile(String src, PermissionStatuspermissions,
    2. String holder, String clientMachine,EnumSet flag,
    3. boolean createParent, short replication,long blockSize,
    4. CryptoProtocolVersion[] supportedVersions,String ecPolicyName,
    5. boolean logRetryCache) throws IOException {
    6. HdfsFileStatus status;
    7. try {
    8. status = startFileInt(src, permissions, holder, clientMachine, flag,
    9. createParent, replication, blockSize,supportedVersions, ecPolicyName,
    10. logRetryCache);
    11. } catch (AccessControlException e) {
    12. logAuditEvent(false, "create",src);
    13. throw e;
    14. }
    15. logAuditEvent(true, "create", src,status);
    16. return status;
    17. }
    18. private HdfsFileStatus startFileInt(String src,
    19. PermissionStatus permissions, Stringholder, String clientMachine,
    20. EnumSet flag, booleancreateParent, short replication,
    21. long blockSize, CryptoProtocolVersion[]supportedVersions,
    22. String ecPolicyName, boolean logRetryCache)throws IOException {
    23. ... ...
    24. stat = FSDirWriteFileOp.startFile(this, iip, permissions,holder,
    25. clientMachine, flag, createParent,replication, blockSize, feInfo,
    26. toRemoveBlocks, shouldReplicate,ecPolicyName, logRetryCache);
    27. ... ...
    28. }
    29. static HdfsFileStatus startFile(
    30. ... ...)
    31. throws IOException {
    32. ... ...
    33. FSDirectory fsd = fsn.getFSDirectory();
    34. // 文件路径是否存在校验
    35. if (iip.getLastINode() != null) {
    36. if (overwrite) {
    37. List<INode> toRemoveINodes = new ChunkedArrayList<>();
    38. List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
    39. long ret = FSDirDeleteOp.delete(fsd, iip,toRemoveBlocks,
    40. toRemoveINodes, toRemoveUCFiles, now());
    41. if (ret >= 0) {
    42. iip = INodesInPath.replace(iip,iip.length() - 1, null);
    43. FSDirDeleteOp.incrDeletedFileCount(ret);
    44. fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
    45. }
    46. } else {
    47. // If lease soft limit time is expired,recover the lease
    48. fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
    49. src, holder,clientMachine, false);
    50. throw new FileAlreadyExistsException(src + " forclient " +
    51. clientMachine + " already exists");
    52. }
    53. }
    54. fsn.checkFsObjectLimit();
    55. INodeFile newNode = null;
    56. INodesInPath parent = FSDirMkdirOp.createAncestorDirectories(fsd,iip, permissions);
    57. if (parent != null) {
    58. // 添加文件元数据信息
    59. iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
    60. replication, blockSize, holder,clientMachine, shouldReplicate,
    61. ecPolicyName);
    62. newNode = iip != null ?iip.getLastINode().asFile() : null;
    63. }
    64. ... ...
    65. setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
    66. fsd.getEditLog().logOpenFile(src, newNode,overwrite, logRetryEntry);
    67. if (NameNode.stateChangeLog.isDebugEnabled()){
    68. NameNode.stateChangeLog.debug("DIR*NameSystem.startFile: added " +
    69. src + " inode " +newNode.getId() + " " + holder);
    70. }
    71. return FSDirStatAndListingOp.getFileInfo(fsd,iip, false, false);
    72. }
    73. private static INodesInPath addFile(
    74. FSDirectory fsd, INodesInPath existing,byte[] localName,
    75. PermissionStatus permissions, shortreplication, long preferredBlockSize,
    76. String clientName, String clientMachine,boolean shouldReplicate,
    77. String ecPolicyName) throws IOException {
    78. Preconditions.checkNotNull(existing);
    79. long modTime = now();
    80. INodesInPath newiip;
    81. fsd.writeLock();
    82. try {
    83. … …
    84. newiip = fsd.addINode(existing, newNode, permissions.getPermission());
    85. } finally {
    86. fsd.writeUnlock();
    87. }
    88. ... ...
    89. return newiip;
    90. }
    91. INodesInPath addINode(INodesInPath existing, INodechild,
    92. FsPermission modes)
    93. throws QuotaExceededException,UnresolvedLinkException {
    94. cacheName(child);
    95. writeLock();
    96. try {
    97. // 将数据写入到INode的目录树中
    98. return addLastINode(existing, child, modes, true);
    99. } finally {
    100. writeUnlock();
    101. }
    102. }

    三、DataStreamer启动流程

    NN处理完客户端的创建请求后,流程再次回到客户端,启动对应的线程

    DFSOutputStream.java

    1. static DFSOutputStream newStreamForCreate(DFSClientdfsClient, String src,
    2. FsPermission masked,EnumSet flag, boolean createParent,
    3. short replication, long blockSize,Progressable progress,
    4. DataChecksum checksum, String[] favoredNodes,String ecPolicyName)
    5. throws IOException {
    6. ... ...
    7. // 客户端将创建请求通过RPC发送给NN
    8. stat = dfsClient.namenode.create(src,masked, dfsClient.clientName,
    9. new EnumSetWritable<>(flag), createParent, replication,
    10. blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
    11. ... ...
    12. // 创建输出流
    13. out = new DFSOutputStream(dfsClient, src, stat,
    14. flag, progress, checksum,favoredNodes, true);
    15. // 开启线程run,DataStreamer extends Daemon extends Thread
    16. out.start();
    17. return out;
    18. }

    点击DFSOutputStream

    1. protected DFSOutputStream(DFSClient dfsClient, String src,
    2. HdfsFileStatus stat,EnumSet<CreateFlag> flag, Progressable progress,
    3. DataChecksum checksum, String[]favoredNodes, boolean createStreamer) {
    4. this(dfsClient, src, flag, progress, stat,checksum);
    5. this.shouldSyncBlock =flag.contains(CreateFlag.SYNC_BLOCK);
    6. // Directory => File => Block(128M)=> packet(64K) => chunk(chunk 512byte +chunksum 4byte)
    7. computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
    8. bytesPerChecksum);
    9. if (createStreamer) {
    10. streamer = new DataStreamer(stat, null,dfsClient, src, progress,
    11. checksum, cachingStrategy, byteArrayManager,favoredNodes,
    12. addBlockFlags);
    13. }
    14. }

    点击newStreamForCreate方法中的out.start(),进入DFSOutputStream.java

    1. protected synchronized void start() {
    2. getStreamer().start();
    3. }
    4. protected DataStreamer getStreamer() {
    5. return streamer;
    6. }

    点击DataStreamer,进入DataStreamer.java

    1. class DataStreamer extends Daemon {
    2. 。。。。。。
    3. }

    点击Daemon,进入Daemon.java

    1. public class Daemon extends Thread {
    2. 。。。。。。
    3. }

    说明:out.start();实际是开启线程,点击DataStreamer,搜索run方法

    DataStreamer.java

    1. @Override
    2. public void run(){
    3. long lastPacket = Time.monotonicNow();
    4. TraceScope scope = null;
    5. while (!streamerClosed &&dfsClient.clientRunning) {
    6. // if the Responder encountered an error, shutdown Responder
    7. if (errorState.hasError()) {
    8. closeResponder();
    9. }
    10. DFSPacket one;
    11. try {
    12. // process datanode IO errors ifany
    13. boolean doSleep =processDatanodeOrExternalError();
    14. final int halfSocketTimeout =dfsClient.getConf().getSocketTimeout()/2;
    15. synchronized (dataQueue) {
    16. // wait for a packet to be sent.
    17. … …
    18. try {
    19. // 如果dataQueue里面没有数据,代码会阻塞在这儿
    20. dataQueue.wait(timeout);
    21. } catch(InterruptedException e) {
    22. LOG.warn("Caught exception", e);
    23. }
    24. doSleep = false;
    25. now = Time.monotonicNow();
    26. }
    27. … …
    28. // 队列不为空,从队列中取出packet
    29. one = dataQueue.getFirst();// regular data packet
    30. SpanId[] parents =one.getTraceParents();
    31. if (parents.length > 0){
    32. scope = dfsClient.getTracer().
    33. newScope("dataStreamer",parents[0]);
    34. scope.getSpan().setParents(parents);
    35. }
    36. }
    37. }
    38. … …
    39. }
  • 相关阅读:
    【FPGA零基础学习之旅#16】嵌入式块RAM-双口ram的使用
    【Javascript】创建对象的几种方式
    C++ 函数返回多个值的方法总结
    Axure药企内部管理平台+企业内部管理系统平台
    mysql创建数据库sql语句
    读取excel数据的方式整理
    MYSQL--事务
    windows11安装微软商店的ubuntu报错,已解决
    向openssl中添加一个最简单的算法
    Java使用Scanner类实现用户输入与交互
  • 原文地址:https://blog.csdn.net/zjjcchina/article/details/125991073