HDFS的写数据流程,如下图所示:

HDFS上传源码解析如下图所示:

0)在pom.xml中增加如下依赖
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>3.1.3</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>3.1.3</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs-client</artifactId>
- <version>3.1.3</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.12</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.30</version>
- </dependency>
- </dependencies>
一、客户端向NN发起创建请求
用户自己写的代码
- @Test
- public void testPut2() throws IOException {
- FSDataOutputStream fos = fs.create(new Path("/input"));
-
- fos.write("helloworld".getBytes());
- }
FileSystem.java
- public FSDataOutputStream create(Path f) throws IOException {
- return create(f, true);
- }
-
- public FSDataOutputStream create(Path f, boolean overwrite)
- throws IOException {
-
- return create(f, overwrite,
- getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
- IO_FILE_BUFFER_SIZE_DEFAULT),
- getDefaultReplication(f),
- getDefaultBlockSize(f));
- }
-
- public FSDataOutputStream create(Path f,
- boolean overwrite,
- int bufferSize,
- short replication,
- long blockSize) throws IOException {
-
- return create(f, overwrite, bufferSize, replication, blockSize,null);
- }
-
- public FSDataOutputStream create(Path f,
- boolean overwrite,
- int bufferSize,
- short replication,
- long blockSize,
- Progressable progress
- ) throws IOException {
-
- return this.create(f, FsCreateModes.applyUMask(
- FsPermission.getFileDefault(),FsPermission.getUMask(getConf())),
- overwrite, bufferSize, replication,blockSize, progress);
- }
-
- public abstract FSDataOutputStream create(Path f,
- FsPermission permission,
- boolean overwrite,
- int bufferSize,
- short replication,
- long blockSize,
- Progressable progress) throws IOException;
选中create,按Ctrl + H,找到实现类DistributedFileSystem.java,并在其中查找create方法。
DistributedFileSystem.java
- @Override
- public FSDataOutputStream create(Path f, FsPermission permission,
- boolean overwrite, int bufferSize, short replication, long blockSize,
- Progressable progress) throws IOException {
-
- return this.create(f, permission,
- overwrite ? EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE)
- : EnumSet.of(CreateFlag.CREATE),bufferSize, replication,
- blockSize, progress, null);
- }
-
- @Override
- public FSDataOutputStream create(final Path f, final FsPermission permission,
- final EnumSet<CreateFlag> cflags, final int bufferSize,
- final short replication, final long blockSize,
- final Progressable progress, final ChecksumOpt checksumOpt)
- throws IOException {
-
- statistics.incrementWriteOps(1);
- storageStatistics.incrementOpCounter(OpType.CREATE);
- Path absF = fixRelativePart(f);
-
- return new FileSystemLinkResolver<FSDataOutputStream>() {
-
- @Override
- public FSDataOutputStream doCall(final Path p) throws IOException {
-
- // 创建获取了一个输出流对象
- final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
- cflags, replication,blockSize, progress, bufferSize,
- checksumOpt);
- // 这里将上面创建的dfsos进行包装并返回
- return dfs.createWrappedOutputStream(dfsos,statistics);
- }
-
- @Override
- public FSDataOutputStream next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.create(p, permission,cflags, bufferSize,
- replication, blockSize,progress, checksumOpt);
- }
- }.resolve(this, absF);
- }
点击create,进入DFSClient.java
- public DFSOutputStream create(String src, FsPermission permission,
- EnumSet<CreateFlag> flag, short replication, long blockSize,
- Progressable progress, int buffersize, ChecksumOpt checksumOpt)
- throws IOException {
-
- return create(src, permission, flag, true,
- replication, blockSize, progress,buffersize, checksumOpt, null);
- }
-
- public DFSOutputStream create(String src, FsPermission permission,
- EnumSet<CreateFlag> flag, boolean createParent, short replication,
- long blockSize, Progressable progress, int buffersize,
- ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
- throws IOException {
-
- return create(src, permission, flag, createParent, replication,blockSize,
- progress, buffersize, checksumOpt,favoredNodes, null);
- }
-
- public DFSOutputStream create(String src, FsPermission permission,
- EnumSet<CreateFlag> flag, boolean createParent, short replication,
- long blockSize, Progressable progress, int buffersize,
- ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
- String ecPolicyName) throws IOException {
-
- checkOpen();
-
- final FsPermission masked =applyUMask(permission);
- LOG.debug("{}: masked={}", src,masked);
-
- final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
- src, masked, flag, createParent,replication, blockSize, progress,
- dfsClientConf.createChecksum(checksumOpt),
- getFavoredNodesStr(favoredNodes),ecPolicyName);
-
- beginFileLease(result.getFileId(),result);
-
- return result;
- }
点击newStreamForCreate,进入DFSOutputStream.java
- static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
- FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
- short replication, long blockSize, Progressable progress,
- DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
- throws IOException {
-
- try (TraceScope ignored =
- dfsClient.newPathTraceScope("newStreamForCreate",src)) {
- HdfsFileStatus stat = null;
-
- // Retry the create if we get a RetryStartFileException up to a maximum
- // number of times
- boolean shouldRetry = true;
- int retryCount = CREATE_RETRY_COUNT;
-
- while (shouldRetry) {
- shouldRetry = false;
- try {
- // 客户端将创建请求发送给NN(RPC)
- stat = dfsClient.namenode.create(src, masked,dfsClient.clientName,
- new EnumSetWritable<>(flag),createParent, replication,
- blockSize, SUPPORTED_CRYPTO_VERSIONS,ecPolicyName);
- break;
- } catch (RemoteException re) {
- … ….
- }
- }
- Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
- final DFSOutputStream out;
-
- if (stat.getErasureCodingPolicy() != null) {
- out = new DFSStripedOutputStream(dfsClient, src, stat,
- flag, progress, checksum, favoredNodes);
- } else {
- out = new DFSOutputStream(dfsClient, src, stat,
- flag, progress, checksum, favoredNodes, true);
- }
-
- //开启线程run,DataStreamer extends Daemon extends Thread
- out.start();
-
- return out;
- }
- }
二、NN处理客户端的创建请求
1)点击create
ClientProtocol.java
- HdfsFileStatus create(String src, FsPermission masked,
- String clientName, EnumSetWritable<CreateFlag> flag,
- boolean createParent, short replication, long blockSize,
- CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
- throws IOException;
2)Ctrl + h查找create实现类,点击NameNodeRpcServer,在NameNodeRpcServer.java中搜索create
NameNodeRpcServer.java
- public HdfsFileStatus create(String src, FsPermission masked,
- String clientName, EnumSetWritable<CreateFlag> flag,
- boolean createParent, short replication, long blockSize,
- CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
- throws IOException {
- // 检查NN启动
- checkNNStartup();
- ... ...
-
- HdfsFileStatus status = null;
- try {
- PermissionStatus perm = new PermissionStatus(getRemoteUser()
- .getShortUserName(), null, masked);
- // 重要
- status = namesystem.startFile(src, perm, clientName,clientMachine,
- flag.get(), createParent, replication,blockSize, supportedVersions,
- ecPolicyName, cacheEntry != null);
- } finally {
- RetryCache.setState(cacheEntry, status !=null, status);
- }
-
- metrics.incrFilesCreated();
- metrics.incrCreateFileOps();
- return status;
- }
FSNamesystem.java
- HdfsFileStatus startFile(String src, PermissionStatus permissions,
- String holder, String clientMachine, EnumSet<CreateFlag> flag,
- boolean createParent, short replication, long blockSize,
- CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
- boolean logRetryCache) throws IOException {
-
- HdfsFileStatus status;
- try {
- status = startFileInt(src, permissions, holder, clientMachine, flag,
- createParent, replication, blockSize,supportedVersions, ecPolicyName,
- logRetryCache);
- } catch (AccessControlException e) {
- logAuditEvent(false, "create",src);
- throw e;
- }
- logAuditEvent(true, "create", src,status);
- return status;
- }
-
- private HdfsFileStatus startFileInt(String src,
- PermissionStatus permissions, String holder, String clientMachine,
- EnumSet<CreateFlag> flag, boolean createParent, short replication,
- long blockSize, CryptoProtocolVersion[] supportedVersions,
- String ecPolicyName, boolean logRetryCache) throws IOException {
- ... ...
- stat = FSDirWriteFileOp.startFile(this, iip, permissions,holder,
- clientMachine, flag, createParent,replication, blockSize, feInfo,
- toRemoveBlocks, shouldReplicate,ecPolicyName, logRetryCache);
- ... ...
- }
-
- static HdfsFileStatus startFile(
- ... ...)
- throws IOException {
-
- ... ...
- FSDirectory fsd = fsn.getFSDirectory();
-
- // 文件路径是否存在校验
- if (iip.getLastINode() != null) {
- if (overwrite) {
- List<INode> toRemoveINodes = new ChunkedArrayList<>();
- List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
- long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
- toRemoveINodes, toRemoveUCFiles, now());
- if (ret >= 0) {
- iip = INodesInPath.replace(iip,iip.length() - 1, null);
- FSDirDeleteOp.incrDeletedFileCount(ret);
- fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
- }
- } else {
- // If lease soft limit time is expired,recover the lease
- fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
- src, holder,clientMachine, false);
- throw new FileAlreadyExistsException(src + " for client " +
- clientMachine + " already exists");
- }
- }
- fsn.checkFsObjectLimit();
- INodeFile newNode = null;
- INodesInPath parent = FSDirMkdirOp.createAncestorDirectories(fsd,iip, permissions);
- if (parent != null) {
- // 添加文件元数据信息
- iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
- replication, blockSize, holder,clientMachine, shouldReplicate,
- ecPolicyName);
- newNode = iip != null ?iip.getLastINode().asFile() : null;
- }
- ... ...
- setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
- fsd.getEditLog().logOpenFile(src, newNode,overwrite, logRetryEntry);
- if (NameNode.stateChangeLog.isDebugEnabled()){
- NameNode.stateChangeLog.debug("DIR*NameSystem.startFile: added " +
- src + " inode " +newNode.getId() + " " + holder);
- }
- return FSDirStatAndListingOp.getFileInfo(fsd,iip, false, false);
- }
-
- private static INodesInPath addFile(
- FSDirectory fsd, INodesInPath existing,byte[] localName,
- PermissionStatus permissions, shortreplication, long preferredBlockSize,
- String clientName, String clientMachine,boolean shouldReplicate,
- String ecPolicyName) throws IOException {
-
- Preconditions.checkNotNull(existing);
- long modTime = now();
- INodesInPath newiip;
- fsd.writeLock();
- try {
- … …
-
- newiip = fsd.addINode(existing, newNode, permissions.getPermission());
- } finally {
- fsd.writeUnlock();
- }
- ... ...
- return newiip;
- }
-
- INodesInPath addINode(INodesInPath existing, INode child,
- FsPermission modes)
- throws QuotaExceededException,UnresolvedLinkException {
- cacheName(child);
- writeLock();
- try {
- // 将数据写入到INode的目录树中
- return addLastINode(existing, child, modes, true);
- } finally {
- writeUnlock();
- }
- }
三、DataStreamer启动流程
NN处理完客户端的创建请求后,再次回到客户端,启动对应的线程
DFSOutputStream.java
- static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
- FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
- short replication, long blockSize, Progressable progress,
- DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
- throws IOException {
- ... ...
- // 客户端将创建请求发送给NN(RPC)
- stat = dfsClient.namenode.create(src,masked, dfsClient.clientName,
- new EnumSetWritable<>(flag), createParent, replication,
- blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
- ... ...
-
- // 创建输出流
- out = new DFSOutputStream(dfsClient, src, stat,
- flag, progress, checksum,favoredNodes, true);
- // 开启线程run,DataStreamer extends Daemon extends Thread
- out.start();
-
- return out;
- }
点击DFSOutputStream
- protected DFSOutputStream(DFSClient dfsClient, String src,
- HdfsFileStatus stat,EnumSet<CreateFlag> flag, Progressable progress,
- DataChecksum checksum, String[]favoredNodes, boolean createStreamer) {
- this(dfsClient, src, flag, progress, stat,checksum);
- this.shouldSyncBlock =flag.contains(CreateFlag.SYNC_BLOCK);
-
- // Directory => File => Block(128M) => packet(64K) => chunk(chunk 512byte + checksum 4byte)
- computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
- bytesPerChecksum);
-
- if (createStreamer) {
- streamer = new DataStreamer(stat, null,dfsClient, src, progress,
- checksum, cachingStrategy, byteArrayManager,favoredNodes,
- addBlockFlags);
- }
- }
点击newStreamForCreate方法中的out.start(),进入DFSOutputStream.java
- protected synchronized void start() {
- getStreamer().start();
- }
-
- protected DataStreamer getStreamer() {
- return streamer;
- }
点击DataStreamer,进入DataStreamer.java
- class DataStreamer extends Daemon {
- 。。。。。。
- }
点击Daemon,进入Daemon.java
- public class Daemon extends Thread {
- 。。。。。。
- }
说明:out.start();实际是开启线程,点击DataStreamer,搜索run方法
DataStreamer.java
- @Override
- public void run(){
-
- long lastPacket = Time.monotonicNow();
- TraceScope scope = null;
- while (!streamerClosed &&dfsClient.clientRunning) {
- // if the Responder encountered an error, shutdown Responder
- if (errorState.hasError()) {
- closeResponder();
- }
-
- DFSPacket one;
- try {
- // process datanode IO errors if any
- boolean doSleep =processDatanodeOrExternalError();
-
- final int halfSocketTimeout =dfsClient.getConf().getSocketTimeout()/2;
- synchronized (dataQueue) {
- // wait for a packet to be sent.
- … …
- try {
- // 如果dataQueue里面没有数据,代码会阻塞在这儿
- dataQueue.wait(timeout);
- } catch(InterruptedException e) {
- LOG.warn("Caught exception", e);
- }
- doSleep = false;
- now = Time.monotonicNow();
- }
- … …
- // 队列不为空,从队列中取出packet
- one = dataQueue.getFirst();// regular data packet
- SpanId[] parents =one.getTraceParents();
- if (parents.length > 0){
- scope = dfsClient.getTracer().
- newScope("dataStreamer",parents[0]);
- scope.getSpan().setParents(parents);
- }
- }
- }
- … …
- }