• Hadoop Source Code Reading (3): HDFS Upload


    Notes:
    1. Hadoop version: 3.1.3
    2. Reading tool: IDEA 2023.1.2
    3. Source download: Index of /dist/hadoop/core/hadoop-3.1.3 (apache.org)
    4. Project import: downloading the source gives you the hadoop-3.1.3-src.tar.gz archive; open PowerShell in that directory and extract it with tar -zxvf, then open the hadoop-3.1.3-src folder in IDEA. Make sure the Maven or Gradle repository is configured properly, otherwise importing the jar dependencies will be slow.
    5. Reference course: www.bilibili.com/video/BV1Qp…

    HDFS Upload

    A simple upload example:

        public void test() throws IOException {
            // fs is a FileSystem instance obtained beforehand, e.g. via FileSystem.get(conf)
            FSDataOutputStream fos = fs.create(new Path("/input"));
            fos.write("hello world".getBytes());
            fos.close(); // flush remaining data and release the stream
        }

    As you can see, the code first creates an FSDataOutputStream and then writes data to it. The walkthrough below is therefore split into two parts: the create phase and the write (upload) phase.

    The create phase

    1. The client sends a create request to the NameNode

    First, step into the create method, which takes us to FileSystem.java:

    Find the create method and keep stepping in until you reach the static create method.

    Then go back to where that static method is called:

    Press Ctrl+Alt+B to find the implementation classes of the method:

    Enter DistributedFileSystem:

    Keep drilling down:

    You can see that an output stream object is created inside the doCall method.

    Step into the create method again, which takes us to DFSClient.java:

    Keep following the calls down until you reach the newStreamForCreate method:

    Enter newStreamForCreate in DFSOutputStream.java.

    Here the client sends the create request to the NameNode over RPC for processing, and then starts a thread.
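
    A condensed sketch of what newStreamForCreate does (paraphrased from the 3.1.3 source; the retry loop and some arguments are omitted):

        // Paraphrased sketch of DFSOutputStream.newStreamForCreate; not a verbatim copy.
        // 1. Ask the NameNode, via the ClientProtocol RPC proxy, to create the file metadata.
        HdfsFileStatus stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<>(flag), createParent, replication, blockSize,
            SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
        // 2. Wrap the returned file status in an output stream.
        final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes, true);
        // 3. Start the DataStreamer daemon thread that pushes packets to the DataNodes.
        out.start();
        return out;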

    2. The NameNode handles the client's create request

    From newStreamForCreate, step into the create call, which leads to ClientProtocol.java:

    Find its implementation class:

    Enter NameNodeRpcServer; its create method looks like this:

        @Override // ClientProtocol
        public HdfsFileStatus create(String src, FsPermission masked,
            String clientName, EnumSetWritable<CreateFlag> flag,
            boolean createParent, short replication, long blockSize,
            CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
            throws IOException {
          checkNNStartup(); // make sure the NameNode has started
          String clientMachine = getClientMachine();
          if (stateChangeLog.isDebugEnabled()) {
            stateChangeLog.debug("*DIR* NameNode.create: file "
                + src + " for " + clientName + " at " + clientMachine);
          }
          if (!checkPathLength(src)) { // validate the path length
            throw new IOException("create: Pathname too long.  Limit "
                + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
          }
          namesystem.checkOperation(OperationCategory.WRITE);
          CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
          if (cacheEntry != null && cacheEntry.isSuccess()) { // retry-cache check
            return (HdfsFileStatus) cacheEntry.getPayload();
          }
          HdfsFileStatus status = null;
          try {
            PermissionStatus perm = new PermissionStatus(getRemoteUser()
                .getShortUserName(), null, masked);
            // start the file (the important part)
            status = namesystem.startFile(src, perm, clientName, clientMachine,
                flag.get(), createParent, replication, blockSize, supportedVersions,
                ecPolicyName, cacheEntry != null);
          } finally {
            RetryCache.setState(cacheEntry, status != null, status);
          }
          metrics.incrFilesCreated();
          metrics.incrCreateFileOps();
          return status;
        }

    Next, step into the startFile method in FSNamesystem.java:

    Enter startFileInt.

    The src (the file path) is wrapped into an INodesInPath object.

    The Javadoc for INodesInPath reads: "Contains INodes information resolved from a given path."

    First we need to be clear about what an INode is:

    INode is an abstract class; the official description is as follows:

    In short, the base INode class is an in-memory representation of the file/block hierarchy, holding the fields common to file and directory inodes.

    So INode is the lowest-level class, carrying the attributes shared by files and directories, while INodesInPath holds the INode information resolved from a given path (roughly speaking, resolving /a/b/c yields the inode chain [root, a, b, c], with null entries for path components that do not exist yet).

    Next, locate startFile.

    Enter startFile.

    • First, it checks whether the file path already exists:

    Enter getLastINode.

    Enter getINode.

    You can see that for i = -1 it returns inodes[inodes.length - 1];

    in other words, it fetches the inode in the last position of the resolved path. If one exists, the file path already exists.
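
    A minimal sketch of that lookup, paraphrased from INodesInPath (negative indices count back from the end of the resolved inode array; bounds checks omitted):

        // Paraphrased from INodesInPath; not a verbatim copy.
        public INode getINode(int i) {
          // i = -1 returns the last resolved inode, i.e. the target of the path
          return inodes[(i < 0) ? inodes.length + i : i];
        }

        public INode getLastINode() {
          return getINode(-1);
        }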

    Next it checks whether overwriting is allowed:

    If overwriting is not allowed, an exception is thrown stating that the path already exists, so the file cannot be uploaded again.
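
    The check boils down to something like this (a paraphrased sketch of the logic in FSDirWriteFileOp, with the deletion details omitted):

        // Paraphrased sketch; not a verbatim copy of the 3.1.3 source.
        final INode lastINode = iip.getLastINode();
        if (lastINode != null) {            // the target path already exists
          if (overwrite) {
            // delete the existing file, then fall through and create a fresh one
          } else {
            throw new FileAlreadyExistsException(src + " already exists");
          }
        }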

    • Then it checks whether the parent directory exists:

    If the parent directory exists, the file's metadata is added to it (the addFile method).

    Enter the addFile method:

    Enter addINode.

    This writes the entry into the INode directory tree; at this point the file's directory entry has been created.

    3. DataStreamer startup

    Once the NameNode has finished, control returns to the client, which starts the corresponding thread.

    Open DFSOutputStream.java and find newStreamForCreate; after the NameNode has completed the create request, the output stream is constructed:

    Locate the DFSOutputStream constructor.

    It computes the chunk size. The storage hierarchy is: Directory => File => Block (128 MB) => packet (64 KB) => chunk (512 bytes of data + 4 bytes of checksum).
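
    A back-of-envelope sketch of those sizes (my own illustration, not Hadoop source; the real computePacketChunkSize also subtracts the packet header length, so slightly fewer chunks fit):

        // Illustration only; defaults taken from dfs.bytes-per-checksum,
        // dfs.client-write-packet-size and dfs.blocksize.
        int bytesPerChecksum = 512;                // data bytes per chunk
        int checksumSize = 4;                      // CRC checksum bytes per chunk
        int writePacketSize = 64 * 1024;           // 64 KB packet
        long blockSize = 128L * 1024 * 1024;       // 128 MB block

        int chunksPerPacket = writePacketSize / (bytesPerChecksum + checksumSize); // ~126
        long packetsPerBlock = blockSize / writePacketSize;                        // 2048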

    Return to newStreamForCreate and step into out.start().

    Keep stepping in:

    Continue into DataStreamer.

    Enter Daemon.

    You can see that out.start() launches a thread, so go back to DataStreamer and search for its run method:

    If there is no data in dataQueue, the code blocks;

    if dataQueue is not empty, a packet is taken from it.

    The write phase

    1. Writing data into the DataStreamer's queue

    The create phase started the DataStreamer; the write phase feeds data into it.

    Step into the write method, which lands in FilterOutputStream.java:

    Keep going until you reach the abstract write method.

    Press Ctrl+Alt+B to find its implementations:

    Enter FSOutputSummer.java and locate the write method:

    Step into flushBuffer, which, as the name suggests, flushes the buffer:

    Enter writeChecksumChunks:

    Enter writeChunk (which writes a chunk into the data queue):

    It is an abstract method, so look up its implementations:

    Enter DFSOutputStream.java to see the concrete implementation of writeChunk:

        @Override
        protected synchronized void writeChunk(byte[] b, int offset, int len,
            byte[] checksum, int ckoff, int cklen) throws IOException {
          writeChunkPrepare(len, ckoff, cklen);
          currentPacket.writeChecksum(checksum, ckoff, cklen); // write the chunk's 4-byte checksum into the packet
          currentPacket.writeData(b, offset, len); // write the 512-byte chunk into the packet
          // count the chunks written into this packet; once the count reaches
          // maxChunks (~126 with the default sizes) the packet is full
          currentPacket.incNumChunks();
          getStreamer().incBytesCurBlock(len);
          // if the packet is full (or the block boundary is reached), queue it for transmission
          if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
              getStreamer().getBytesCurBlock() == blockSize) {
            enqueueCurrentPacketFull();
          }
        }

    Enter enqueueCurrentPacketFull:

    Enter enqueueCurrentPacket:

    Enter waitAndQueuePacket:

        void waitAndQueuePacket(DFSPacket packet) throws IOException {
          synchronized (dataQueue) {
            try {
              // if the queue is full, wait
              boolean firstWait = true;
              try {
                while (!streamerClosed && dataQueue.size() + ackQueue.size() >
                    dfsClient.getConf().getWriteMaxPackets()) {
                  if (firstWait) {
                    Span span = Tracer.getCurrentSpan();
                    if (span != null) {
                      span.addTimelineAnnotation("dataQueue.wait");
                    }
                    firstWait = false;
                  }
                  try {
                    dataQueue.wait(); // wait until the queue has enough room
                  } catch (InterruptedException e) {
                    // If we get interrupted while waiting to queue data, we still need to get rid
                    // of the current packet. This is because we have an invariant that if
                    // currentPacket gets full, it will get queued before the next writeChunk.
                    //
                    // Rather than wait around for space in the queue, we should instead try to
                    // return to the caller as soon as possible, even though we slightly overrun
                    // the MAX_PACKETS length.
                    Thread.currentThread().interrupt();
                    break;
                  }
                }
              } finally {
                Span span = Tracer.getCurrentSpan();
                if ((span != null) && (!firstWait)) {
                  span.addTimelineAnnotation("end.wait");
                }
              }
              checkClosed();
              // if the queue is not full, enqueue the packet
              queuePacket(packet);
            } catch (ClosedChannelException ignored) {
            }
          }
        }

    Step into queuePacket (the logic that actually enqueues the packet) in DataStreamer.java:
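
    Its core is short; roughly (a paraphrased sketch of DataStreamer.queuePacket, trace bookkeeping simplified):

        // Paraphrased sketch of DataStreamer.queuePacket; not a verbatim copy.
        void queuePacket(DFSPacket packet) {
          synchronized (dataQueue) {
            if (packet == null) return;
            packet.addTraceParent(Tracer.getCurrentSpanId());
            dataQueue.addLast(packet);      // append the packet to the data queue
            lastQueuedSeqno = packet.getSeqno();
            dataQueue.notifyAll();          // wake the DataStreamer thread waiting on dataQueue
          }
        }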

    2. Building the pipeline

    2.1 Rack awareness (choosing where to store the block)

    Press Ctrl+N to find DataStreamer globally, then search for its run method:

        @Override
        public void run() {
          long lastPacket = Time.monotonicNow();
          TraceScope scope = null;
          while (!streamerClosed && dfsClient.clientRunning) {
            // if the Responder encountered an error, shutdown Responder
            if (errorState.hasError()) {
              closeResponder();
            }
            DFSPacket one;
            try {
              // process datanode IO errors if any
              boolean doSleep = processDatanodeOrExternalError();
              final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
              // Step 1: wait for a packet to send
              synchronized (dataQueue) {
                // wait for a packet to be sent.
                long now = Time.monotonicNow();
                while ((!shouldStop() && dataQueue.size() == 0 &&
                    (stage != BlockConstructionStage.DATA_STREAMING ||
                        now - lastPacket < halfSocketTimeout)) || doSleep) {
                  long timeout = halfSocketTimeout - (now-lastPacket);
                  timeout = timeout <= 0 ? 1000 : timeout;
                  timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                      timeout : 1000;
                  try {
                    // if dataQueue is empty, the code blocks here
                    dataQueue.wait(timeout);
                  } catch (InterruptedException e) {
                    LOG.warn("Caught exception", e);
                  }
                  doSleep = false;
                  now = Time.monotonicNow();
                }
                if (shouldStop()) {
                  continue;
                }
                // get the packet to be sent
                if (dataQueue.isEmpty()) {
                  one = createHeartbeatPacket();
                } else {
                  try {
                    backOffIfNecessary();
                  } catch (InterruptedException e) {
                    LOG.warn("Caught exception", e);
                  }
                  // if the data queue is not empty, take the first packet from it
                  one = dataQueue.getFirst();
                  SpanId[] parents = one.getTraceParents();
                  if (parents.length > 0) {
                    scope = dfsClient.getTracer().
                        newScope("dataStreamer", parents[0]);
                    scope.getSpan().setParents(parents);
                  }
                }
              }
              // Step 2: get a new block from the NameNode
              if (LOG.isDebugEnabled()) {
                LOG.debug("stage=" + stage + ", " + this);
              }
              if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
                LOG.debug("Allocating new block: {}", this);
                // ask the NameNode for a block and set up the data pipeline
                setPipeline(nextBlockOutputStream());
                // start the ResponseProcessor that listens for packet acks
                initDataStreaming();
              } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
                LOG.debug("Append to block {}", block);
                setupPipelineForAppendOrRecovery();
                if (streamerClosed) {
                  continue;
                }
                initDataStreaming();
              }
              long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
              if (lastByteOffsetInBlock > stat.getBlockSize()) {
                throw new IOException("BlockSize " + stat.getBlockSize() +
                    " < lastByteOffsetInBlock, " + this + ", " + one);
              }
              if (one.isLastPacketInBlock()) {
                // wait for all data packets have been successfully acked
                synchronized (dataQueue) {
                  while (!shouldStop() && ackQueue.size() != 0) {
                    try {
                      // wait for acks to arrive from datanodes
                      dataQueue.wait(1000);
                    } catch (InterruptedException e) {
                      LOG.warn("Caught exception", e);
                    }
                  }
                }
                if (shouldStop()) {
                  continue;
                }
                stage = BlockConstructionStage.PIPELINE_CLOSE;
              }
              // Step 3: send the packet
              SpanId spanId = SpanId.INVALID;
              synchronized (dataQueue) {
                // move packet from dataQueue to ackQueue
                if (!one.isHeartbeatPacket()) {
                  if (scope != null) {
                    spanId = scope.getSpanId();
                    scope.detach();
                    one.setTraceScope(scope);
                  }
                  scope = null;
                  dataQueue.removeFirst();  // remove the packet being sent from dataQueue
                  ackQueue.addLast(one);    // and add it to ackQueue
                  packetSendTime.put(one.getSeqno(), Time.monotonicNow());
                  dataQueue.notifyAll();
                }
              }
              LOG.debug("{} sending {}", this, one);
              // Step 4: write the data to the DataNode
              try (TraceScope ignored = dfsClient.getTracer().
                  newScope("DataStreamer#writeTo", spanId)) {
                one.writeTo(blockStream); // write out the data
                blockStream.flush();
              } catch (IOException e) {
                // HDFS-3398 treat primary DN is down since client is unable to
                // write to primary DN. If a failed or restarting node has already
                // been recorded by the responder, the following call will have no
                // effect. Pipeline recovery can handle only one node error at a
                // time. If the primary node fails again during the recovery, it
                // will be taken out then.
                errorState.markFirstNodeIfNotMarked();
                throw e;
              }
              lastPacket = Time.monotonicNow();
              // update bytesSent
              long tmpBytesSent = one.getLastByteOffsetBlock();
              if (bytesSent < tmpBytesSent) {
                bytesSent = tmpBytesSent;
              }
              if (shouldStop()) {
                continue;
              }
              // Is this block full?
              if (one.isLastPacketInBlock()) {
                // wait for the close packet has been acked
                synchronized (dataQueue) {
                  while (!shouldStop() && ackQueue.size() != 0) {
                    dataQueue.wait(1000); // wait for acks to arrive from datanodes
                  }
                }
                if (shouldStop()) {
                  continue;
                }
                endBlock();
              }
              if (progress != null) { progress.progress(); }
              // This is used by unit test to trigger race conditions.
              if (artificialSlowdown != 0 && dfsClient.clientRunning) {
                Thread.sleep(artificialSlowdown);
              }
            } catch (Throwable e) {
              // Log warning if there was a real error.
              if (!errorState.isRestartingNode()) {
                // Since their messages are descriptive enough, do not always
                // log a verbose stack-trace WARN for quota exceptions.
                if (e instanceof QuotaExceededException) {
                  LOG.debug("DataStreamer Quota Exception", e);
                } else {
                  LOG.warn("DataStreamer Exception", e);
                }
              }
              lastException.set(e);
              assert !(e instanceof NullPointerException);
              errorState.setInternalError();
              if (!errorState.isNodeMarked()) {
                // Not a datanode issue
                streamerClosed = true;
              }
            } finally {
              if (scope != null) {
                scope.close();
                scope = null;
              }
            }
          }
          closeInternal();
        }

    Step into nextBlockOutputStream (called from step 2 of run(), in setPipeline(nextBlockOutputStream())):

    Enter locateFollowingBlock.

    Enter addBlock.

    Step into addBlock again, arriving at the ClientProtocol interface:

    From this we can tell that the method is carried out through the client-side RPC proxy of the NameNode.
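
    For reference, the declaration in ClientProtocol looks roughly like this (paraphrased; annotations and Javadoc omitted):

        // Paraphrased from ClientProtocol; not a verbatim copy.
        LocatedBlock addBlock(String src, String clientName,
            ExtendedBlock previous, DatanodeInfo[] excludeNodes,
            long fileId, String[] favoredNodes,
            EnumSet<AddBlockFlag> addBlockFlags) throws IOException;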

    Look up its implementation:

    Enter NameNodeRpcServer and locate addBlock:

    Enter getAdditionalBlock.

    This is where the storage locations for the block are chosen.

    Enter chooseTargetForNewBlock.

    Enter chooseTarget4NewBlock.

    Enter chooseTarget.

    Continue into chooseTarget.

    It is declared on an abstract class, so look up the implementation:

    Enter BlockPlacementPolicyDefault.java:

    Enter chooseTarget.

    Step into chooseTarget once more.

    Enter chooseTargetInOrder, which is the rack-awareness logic:

        protected Node chooseTargetInOrder(int numOfReplicas,
            Node writer,
            final Set<Node> excludedNodes,
            final long blocksize,
            final int maxNodesPerRack,
            final List<DatanodeStorageInfo> results,
            final boolean avoidStaleNodes,
            final boolean newBlock,
            EnumMap<StorageType, Integer> storageTypes)
            throws NotEnoughReplicasException {
          final int numOfResults = results.size();
          if (numOfResults == 0) {
            // the first replica goes on the writer's local node (if possible)
            DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
                excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
                storageTypes, true);
            writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
                                           : null;
            if (--numOfReplicas == 0) {
              return writer;
            }
          }
          final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
          if (numOfResults <= 1) {
            // the second replica goes on a different rack
            chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
                results, avoidStaleNodes, storageTypes);
            if (--numOfReplicas == 0) {
              return writer;
            }
          }
          if (numOfResults <= 2) {
            final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
            if (clusterMap.isOnSameRack(dn0, dn1)) {
              // if the first two are on the same rack, put the third on another rack
              chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
                  results, avoidStaleNodes, storageTypes);
            } else if (newBlock){
              // for a new block, put the third replica on the same rack as the second
              chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
                  results, avoidStaleNodes, storageTypes);
            } else {
              // otherwise put it on the writer's local rack
              chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
                  results, avoidStaleNodes, storageTypes);
            }
            if (--numOfReplicas == 0) {
              return writer;
            }
          }
          // any remaining replicas go on random nodes
          chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
          return writer;
        }
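
    In short, the default placement policy is: the first replica on the writer's local node, the second on a node in a different rack, the third on the same rack as the second, and any remaining replicas on randomly chosen nodes.
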
    2.2 Sending over the socket

    Return to nextBlockOutputStream.

    Enter createBlockOutputStream.

    As its comment indicates, this method's main job is to establish the connection to the first DataNode in the pipeline:

        boolean createBlockOutputStream(DatanodeInfo[] nodes,
            StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
            long newGS, boolean recoveryFlag) {
          if (nodes.length == 0) {
            LOG.info("nodes are empty for write pipeline of " + block);
            return false;
          }
          String firstBadLink = "";
          boolean checkRestart = false;
          if (LOG.isDebugEnabled()) {
            LOG.debug("pipeline = " + Arrays.toString(nodes) + ", " + this);
          }
          // persist blocks on namenode on next flush
          persistBlocks.set(true);
          int refetchEncryptionKey = 1;
          while (true) {
            boolean result = false;
            DataOutputStream out = null;
            try {
              assert null == s : "Previous socket unclosed";
              assert null == blockReplyStream : "Previous blockReplyStream unclosed";
              // open a socket connection to the first DataNode
              s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
              long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
              long readTimeout = dfsClient.getDatanodeReadTimeout(nodes.length);
              // output stream, used to write data to the DataNode
              OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
              // input stream, used to read the result of writing to the DataNode
              InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
              IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
                  unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
              unbufOut = saslStreams.out;
              unbufIn = saslStreams.in;
              out = new DataOutputStream(new BufferedOutputStream(unbufOut,
                  DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
              blockReplyStream = new DataInputStream(unbufIn);
              //
              // Xmit header info to datanode
              //
              BlockConstructionStage bcs = recoveryFlag ?
                  stage.getRecoveryStage() : stage;
              // We cannot change the block length in 'block' as it counts the number
              // of bytes ack'ed.
              ExtendedBlock blockCopy = block.getCurrentBlock();
              blockCopy.setNumBytes(stat.getBlockSize());
              boolean[] targetPinnings = getPinnings(nodes);
              // send the write-block request
              new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
                  dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
                  nodes.length, block.getNumBytes(), bytesSent, newGS,
                  checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
                  (targetPinnings != null && targetPinnings[0]), targetPinnings,
                  nodeStorageIDs[0], nodeStorageIDs);
              // receive ack for connect
              BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
                  PBHelperClient.vintPrefixed(blockReplyStream));
              Status pipelineStatus = resp.getStatus();
              firstBadLink = resp.getFirstBadLink();
              // Got an restart OOB ack.
              // If a node is already restarting, this status is not likely from
              // the same node. If it is from a different node, it is not
              // from the local datanode. Thus it is safe to treat this as a
              // regular node error.
              if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
                  !errorState.isRestartingNode()) {
                checkRestart = true;
                throw new IOException("A datanode is restarting.");
              }
              String logInfo = "ack with firstBadLink as " + firstBadLink;
              DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
              assert null == blockStream : "Previous blockStream unclosed";
              blockStream = out;
              result = true; // success
              errorState.resetInternalError();
              lastException.clear();
              // remove all restarting nodes from failed nodes list
              failed.removeAll(restartingNodes);
              restartingNodes.clear();
            } catch (IOException ie) {
              if (!errorState.isRestartingNode()) {
                LOG.info("Exception in createBlockOutputStream " + this, ie);
              }
              if (ie instanceof InvalidEncryptionKeyException &&
                  refetchEncryptionKey > 0) {
                LOG.info("Will fetch a new encryption key and retry, "
                    + "encryption key was invalid when connecting to "
                    + nodes[0] + " : " + ie);
                // The encryption key used is invalid.
                refetchEncryptionKey--;
                dfsClient.clearDataEncryptionKey();
                // Don't close the socket/exclude this node just yet. Try again with
                // a new encryption key.
                continue;
              }
              // find the datanode that matches
              if (firstBadLink.length() != 0) {
                for (int i = 0; i < nodes.length; i++) {
                  // NB: Unconditionally using the xfer addr w/o hostname
                  if (firstBadLink.equals(nodes[i].getXferAddr())) {
                    errorState.setBadNodeIndex(i);
                    break;
                  }
                }
              } else {
                assert !checkRestart;
                errorState.setBadNodeIndex(0);
              }
              final int i = errorState.getBadNodeIndex();
              // Check whether there is a restart worth waiting for.
              if (checkRestart) {
                errorState.initRestartingNode(i,
                    "Datanode " + i + " is restarting: " + nodes[i],
                    shouldWaitForRestart(i));
              }
              errorState.setInternalError();
              lastException.set(ie);
              result = false; // error
            } finally {
              if (!result) {
                IOUtils.closeSocket(s);
                s = null;
                IOUtils.closeStream(out);
                IOUtils.closeStream(blockReplyStream);
                blockReplyStream = null;
              }
            }
            return result;
          }
        }

    Enter writeBlock.

    Enter send:

    The request is flushed out with flush().
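
    Sender.send is tiny; roughly (a paraphrased sketch of Sender in the data-transfer protocol code):

        // Paraphrased sketch of Sender.send; not a verbatim copy.
        private static void send(final DataOutputStream out, final Op opcode,
            final Message proto) throws IOException {
          op(out, opcode);              // writes the data-transfer version and the opcode
          proto.writeDelimitedTo(out);  // writes the protobuf request (e.g. OpWriteBlockProto)
          out.flush();                  // push the request to the DataNode
        }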

    2.3 Receiving over the socket

    Receiving the data is the DataNode's job, so open DataXceiverServer.java and locate its run method:

    It accepts socket connections;

    for every block the client sends, a DataXceiver is started to handle that block.
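
    The accept loop boils down to something like this (paraphrased; error handling and xceiver-count throttling omitted):

        // Paraphrased sketch of DataXceiverServer.run; not a verbatim copy.
        while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
          Peer peer = peerServer.accept();           // block until a client connects
          // hand the connection to a fresh DataXceiver daemon thread
          new Daemon(datanode.threadGroup,
              DataXceiver.create(peer, datanode, this)).start();
        }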

    Enter DataXceiver and locate its run method:

    It reads the operation type of the incoming request,

    then processes the data according to that operation type.

    Enter processOp.

    Here you can see the different operation types.

    Enter opWriteBlock (the write path):

    Press Ctrl+Alt+B to find the implementation of writeBlock and enter DataXceiver.java:

    It creates a BlockReceiver

    and sends the data on to the downstream socket in the pipeline.

    Next, enter getBlockReceiver.

    Enter the BlockReceiver constructor,

    which sets up the pipeline.

    Enter createRbw.

    Enter FsDatasetImpl.java:

    Step into createRbw.

    The file is created via createRbwFile (RBW stands for "replica being written": the incoming block is written to a file under the volume's rbw directory and moved to finalized once the block is complete).

    3. The client receives the DataNodes' acks

    Return to DataStreamer.java and locate run:

    The initDataStreaming method starts the ResponseProcessor, which listens for whether each packet was delivered successfully;

    it creates a ResponseProcessor and starts it as a thread.
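
    A condensed sketch of initDataStreaming (paraphrased from DataStreamer; logging omitted):

        // Paraphrased sketch of DataStreamer.initDataStreaming; not a verbatim copy.
        private void initDataStreaming() {
          this.setName("DataStreamer for file " + src + " block " + block);
          response = new ResponseProcessor(nodes);  // one responder per pipeline
          response.start();                         // daemon thread that reads acks
          stage = BlockConstructionStage.DATA_STREAMING;
        }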

    Enter ResponseProcessor and locate its run method:

        @Override
        public void run() {
          setName("ResponseProcessor for block " + block);
          PipelineAck ack = new PipelineAck();
          TraceScope scope = null;
          while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
            // process acks from the DataNodes
            try {
              // read one ack from the pipeline
              ack.readFields(blockReplyStream);
              if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
                Long begin = packetSendTime.get(ack.getSeqno());
                if (begin != null) {
                  long duration = Time.monotonicNow() - begin;
                  if (duration > dfsclientSlowLogThresholdMs) {
                    LOG.info("Slow ReadProcessor read fields for block " + block
                        + " took " + duration + "ms (threshold="
                        + dfsclientSlowLogThresholdMs + "ms); ack: " + ack
                        + ", targets: " + Arrays.asList(targets));
                  }
                }
              }
              if (LOG.isDebugEnabled()) {
                LOG.debug("DFSClient {}", ack);
              }
              long seqno = ack.getSeqno();
              // processes response status from datanodes.
              ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
              for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
                final Status reply = PipelineAck.getStatusFromHeader(ack
                    .getHeaderFlag(i));
                if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
                    PipelineAck.ECN.CONGESTED) {
                  congestedNodesFromAck.add(targets[i]);
                }
                // Restart will not be treated differently unless it is
                // the local node or the only one in the pipeline.
                if (PipelineAck.isRestartOOBStatus(reply)) {
                  final String message = "Datanode " + i + " is restarting: "
                      + targets[i];
                  errorState.initRestartingNode(i, message,
                      shouldWaitForRestart(i));
                  throw new IOException(message);
                }
                // node error
                if (reply != SUCCESS) {
                  errorState.setBadNodeIndex(i); // mark bad datanode
                  throw new IOException("Bad response " + reply +
                      " for " + block + " from datanode " + targets[i]);
                }
              }
              if (!congestedNodesFromAck.isEmpty()) {
                synchronized (congestedNodes) {
                  congestedNodes.clear();
                  congestedNodes.addAll(congestedNodesFromAck);
                }
              } else {
                synchronized (congestedNodes) {
                  congestedNodes.clear();
                  lastCongestionBackoffTime = 0;
                }
              }
              assert seqno != PipelineAck.UNKOWN_SEQNO :
                  "Ack for unknown seqno should be a failed ack: " + ack;
              if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
                continue;
              }
              // a successful-transfer ack: match it with the head of ackQueue
              DFSPacket one;
              synchronized (dataQueue) {
                one = ackQueue.getFirst();
              }
              if (one.getSeqno() != seqno) {
                throw new IOException("ResponseProcessor: Expecting seqno " +
                    " for block " + block +
                    one.getSeqno() + " but received " + seqno);
              }
              isLastPacketInBlock = one.isLastPacketInBlock();
              // Fail the packet write for testing in order to force a
              // pipeline recovery.
              if (DFSClientFaultInjector.get().failPacket() &&
                  isLastPacketInBlock) {
                failPacket = true;
                throw new IOException(
                    "Failing the last packet for testing.");
              }
              // update bytesAcked
              block.setNumBytes(one.getLastByteOffsetBlock());
              synchronized (dataQueue) {
                scope = one.getTraceScope();
                if (scope != null) {
                  scope.reattach();
                  one.setTraceScope(null);
                }
                lastAckedSeqno = seqno;
                pipelineRecoveryCount = 0;
                ackQueue.removeFirst();       // remove the packet from the ack queue
                packetSendTime.remove(seqno);
                dataQueue.notifyAll();        // notify dataQueue that the ack has been handled
                one.releaseBuffer(byteArrayManager);
              }
            } catch (Exception e) {
              if (!responderClosed) {
                lastException.set(e);
                errorState.setInternalError();
                errorState.markFirstNodeIfNotMarked();
                synchronized (dataQueue) {
                  dataQueue.notifyAll();
                }
                if (!errorState.isRestartingNode()) {
                  LOG.warn("Exception for " + block, e);
                }
                responderClosed = true;
              }
            } finally {
              if (scope != null) {
                scope.close();
              }
              scope = null;
            }
          }
        }

    With that, once the client has successfully received the acks from the DataNodes, the upload is complete.

  • Original article: https://blog.csdn.net/qq_51235856/article/details/132973642