The HDFS write data flow is shown in the figure below:

The HDFS upload source-code analysis is shown in the figure below:

0) Add the following dependencies to pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>3.1.3</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>
I. Writing data into the DataStreamer queue

The user's own code:
@Test
public void testPut2() throws IOException {
  FSDataOutputStream fos = fs.create(new Path("/input"));
  fos.write("hello world".getBytes());
  fos.close();
}
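The fs handle above comes from a JUnit fixture that is not shown here. A minimal sketch of what it assumes follows; the NameNode URI and user name are placeholders, not values from the source:

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.After;
import org.junit.Before;

public class HdfsClientTest {
  private FileSystem fs;

  @Before
  public void init() throws IOException, InterruptedException, URISyntaxException {
    // "hdfs://hadoop102:8020" and "tom" are assumed values; substitute your cluster's.
    fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), new Configuration(), "tom");
  }

  @After
  public void close() throws IOException {
    fs.close();
  }
}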
Step into write.
FilterOutputStream.java
public void write(byte b[]) throws IOException {
  write(b, 0, b.length);
}

public void write(byte b[], int off, int len) throws IOException {
  if ((off | len | (b.length - (len + off)) | (off + len)) < 0)
    throw new IndexOutOfBoundsException();
  for (int i = 0 ; i < len ; i++) {
    write(b[off + i]);
  }
}

public void write(int b) throws IOException {
  out.write(b);
}
Step into write.
OutputStream.java
public abstract void write(int b) throws IOException;
Ctrl + H to find the implementations of write; choose FSOutputSummer.java and search for write in that class.
FSOutputSummer.java
public synchronized void write(int b) throws IOException {
  buf[count++] = (byte)b;
  if (count == buf.length) {
    flushBuffer();
  }
}

protected synchronized void flushBuffer() throws IOException {
  flushBuffer(false, true);
}

protected synchronized int flushBuffer(boolean keep,
    boolean flushPartial) throws IOException {
  int bufLen = count;
  int partialLen = bufLen % sum.getBytesPerChecksum();
  int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
  if (lenToFlush != 0) {
    // Write data to the queue.
    // Directory => File => Block (128 MB) => packet (64 KB) => chunk (512-byte data + 4-byte checksum)
    writeChecksumChunks(buf, 0, lenToFlush);
    if (!flushPartial || keep) {
      count = partialLen;
      System.arraycopy(buf, bufLen - count, buf, 0, count);
    } else {
      count = 0;
    }
  }
  // total bytes left minus unflushed bytes left
  return count - (bufLen - lenToFlush);
}
private void writeChecksumChunks(byte b[], int off, int len)
    throws IOException {
  // Compute the checksums for the chunks.
  sum.calculateChunkedSums(b, off, len, checksum, 0);
  TraceScope scope = createWriteTraceScope();
  // Iterate over the data one chunk at a time.
  try {
    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
      // Write the data into the queue chunk by chunk.
      writeChunk(b, off + i, chunkLen, checksum, ckOffset,
          getChecksumSize());
    }
  } finally {
    if (scope != null) {
      scope.close();
    }
  }
}

protected abstract void writeChunk(byte[] b, int bOffset, int bLen,
    byte[] checksum, int checksumOffset, int checksumLen) throws IOException;
Ctrl + H to find the writeChunk implementation in DFSOutputStream.java.
protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  writeChunkPrepare(len, ckoff, cklen);
  // Write the chunk's checksum (4 bytes) into the packet.
  currentPacket.writeChecksum(checksum, ckoff, cklen);
  // Write one chunk (512 bytes) into the packet.
  currentPacket.writeData(b, offset, len);
  // Count the chunks written into the packet; once 127 chunks accumulate, the packet is full.
  currentPacket.incNumChunks();
  getStreamer().incBytesCurBlock(len);
  // If packet is full, enqueue it for transmission.
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
      getStreamer().getBytesCurBlock() == blockSize) {
    enqueueCurrentPacketFull();
  }
}
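As a quick sanity check on the sizes mentioned in these comments (a standalone sketch, not HDFS code): each chunk travels as 512 data bytes plus a 4-byte checksum, so a 64 KB packet holds 65536 / 516 = 127 chunks, which matches the getMaxChunks() limit referenced above.

public class PacketMath {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;   // chunk payload size
    int checksumSize = 4;         // checksum bytes per chunk
    int packetSize = 64 * 1024;   // default packet size (64 KB)
    // Integer division: how many whole chunks fit in one packet.
    System.out.println(packetSize / (bytesPerChecksum + checksumSize)); // 127
  }
}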
synchronized void enqueueCurrentPacketFull() throws IOException {
  LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
      + " appendChunk={}, {}", currentPacket, src, getStreamer()
      .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
      getStreamer());
  enqueueCurrentPacket();
  adjustChunkBoundary();
  endBlock();
}

void enqueueCurrentPacket() throws IOException {
  getStreamer().waitAndQueuePacket(currentPacket);
  currentPacket = null;
}
void waitAndQueuePacket(DFSPacket packet) throws IOException {
  synchronized (dataQueue) {
    try {
      // If the queue is full, then wait till we have enough space.
      boolean firstWait = true;
      try {
        while (!streamerClosed && dataQueue.size() + ackQueue.size() >
            dfsClient.getConf().getWriteMaxPackets()) {
          if (firstWait) {
            Span span = Tracer.getCurrentSpan();
            if (span != null) {
              span.addTimelineAnnotation("dataQueue.wait");
            }
            firstWait = false;
          }
          try {
            dataQueue.wait();
          } catch (InterruptedException e) {
            ... ...
          }
        }
      } finally {
        Span span = Tracer.getCurrentSpan();
        if ((span != null) && (!firstWait)) {
          span.addTimelineAnnotation("end.wait");
        }
      }
      checkClosed();
      // The queue has room: add the packet to it.
      queuePacket(packet);
    } catch (ClosedChannelException ignored) {
    }
  }
}
DataStreamer.java
void queuePacket(DFSPacket packet) {
  synchronized (dataQueue) {
    if (packet == null) return;
    packet.addTraceParent(Tracer.getCurrentSpanId());
    // Add the packet to the queue.
    dataQueue.addLast(packet);
    lastQueuedSeqno = packet.getSeqno();
    LOG.debug("Queued {}, {}", packet, this);
    // Notify waiters that the packet has been queued.
    dataQueue.notifyAll();
  }
}
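Taken together, waitAndQueuePacket and queuePacket implement a bounded producer-consumer hand-off on the dataQueue monitor: the writing thread blocks while dataQueue.size() + ackQueue.size() exceeds getWriteMaxPackets() (the dfs.client.write.max-packets-in-flight setting, 80 by default), and the DataStreamer consumer side, shown next, waits on the same monitor. A minimal standalone sketch of the same pattern (class and method names are ours, not HDFS's):

import java.util.ArrayDeque;
import java.util.Deque;

class BoundedPacketQueue<T> {
  private final Deque<T> queue = new ArrayDeque<>();
  private final int maxPackets;

  BoundedPacketQueue(int maxPackets) { this.maxPackets = maxPackets; }

  // Producer side: mirrors waitAndQueuePacket + queuePacket.
  synchronized void put(T packet) throws InterruptedException {
    while (queue.size() >= maxPackets) {
      wait();            // queue full: block the writer thread
    }
    queue.addLast(packet);
    notifyAll();         // wake the streamer thread
  }

  // Consumer side: mirrors the wait loop in DataStreamer.run().
  synchronized T take() throws InterruptedException {
    while (queue.isEmpty()) {
      wait();            // nothing to send yet
    }
    T packet = queue.removeFirst();
    notifyAll();         // wake a blocked writer
    return packet;
  }
}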
II. Pipeline setup: rack awareness (choosing block storage locations)

1) Step into create.
Ctrl + N to globally search for DataStreamer, then locate its run method.
DataStreamer.java
@Override
public void run() {
  long lastPacket = Time.monotonicNow();
  TraceScope scope = null;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    if (errorState.hasError()) {
      closeResponder();
    }
    DFSPacket one;
    try {
      // process datanode IO errors if any
      boolean doSleep = processDatanodeOrExternalError();
      final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
      synchronized (dataQueue) {
        // wait for a packet to be sent.
        long now = Time.monotonicNow();
        while ((!shouldStop() && dataQueue.size() == 0 &&
            (stage != BlockConstructionStage.DATA_STREAMING ||
            now - lastPacket < halfSocketTimeout)) || doSleep) {
          long timeout = halfSocketTimeout - (now - lastPacket);
          timeout = timeout <= 0 ? 1000 : timeout;
          timeout = (stage == BlockConstructionStage.DATA_STREAMING) ?
              timeout : 1000;
          try {
            // If dataQueue is empty, the code blocks here
            dataQueue.wait(timeout); // until a notify arrives
          } catch (InterruptedException e) {
            LOG.warn("Caught exception", e);
          }
          doSleep = false;
          now = Time.monotonicNow();
        }
        if (shouldStop()) {
          continue;
        }
        // get packet to be sent.
        if (dataQueue.isEmpty()) {
          one = createHeartbeatPacket();
        } else {
          try {
            backOffIfNecessary();
          } catch (InterruptedException e) {
            LOG.warn("Caught exception", e);
          }
          // The queue is not empty: take the first packet.
          one = dataQueue.getFirst(); // regular data packet
          SpanId[] parents = one.getTraceParents();
          if (parents.length > 0) {
            scope = dfsClient.getTracer().
                newScope("dataStreamer", parents[0]);
            scope.getSpan().setParents(parents);
          }
        }
      }
      // get new block from namenode.
      if (LOG.isDebugEnabled()) {
        LOG.debug("stage=" + stage + ", " + this);
      }
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        LOG.debug("Allocating new block: {}", this);
        // Step 1: ask the NameNode for a block and set up the data pipeline.
        setPipeline(nextBlockOutputStream());
        // Step 2: start the ResponseProcessor to listen for packet acks.
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        setupPipelineForAppendOrRecovery();
        if (streamerClosed) {
          continue;
        }
        initDataStreaming();
      }
      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      if (lastByteOffsetInBlock > stat.getBlockSize()) {
        throw new IOException("BlockSize " + stat.getBlockSize() +
            " < lastByteOffsetInBlock, " + this + ", " + one);
      }
      … …
      // send the packet
      SpanId spanId = SpanId.INVALID;
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          if (scope != null) {
            spanId = scope.getSpanId();
            scope.detach();
            one.setTraceScope(scope);
          }
          scope = null;
          // Step 3: remove the packet being sent from dataQueue.
          dataQueue.removeFirst();
          // Step 4: add that packet to ackQueue.
          ackQueue.addLast(one);
          packetSendTime.put(one.getSeqno(), Time.monotonicNow());
          dataQueue.notifyAll();
        }
      }
      LOG.debug("{} sending {}", this, one);
      // write out data to remote datanode
      try (TraceScope ignored = dfsClient.getTracer().
          newScope("DataStreamer#writeTo", spanId)) {
        // Write the packet out to the socket.
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        errorState.markFirstNodeIfNotMarked();
        throw e;
      }
      … …
}
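To summarize the loop: if dataQueue is empty the streamer sleeps (sending at most a heartbeat packet every socketTimeout/2); otherwise it takes the head packet, opens a new block and pipeline when the stage calls for it, moves the packet from dataQueue to ackQueue, and writes it to the socket. The next step traces how the block and its pipeline are obtained.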
Step into nextBlockOutputStream.
protected LocatedBlock nextBlockOutputStream() throws IOException {
  LocatedBlock lb;
  DatanodeInfo[] nodes;
  StorageType[] nextStorageTypes;
  String[] nextStorageIDs;
  int count = dfsClient.getConf().getNumBlockWriteRetry();
  boolean success;
  final ExtendedBlock oldBlock = block.getCurrentBlock();
  do {
    errorState.resetInternalError();
    lastException.clear();
    DatanodeInfo[] excluded = getExcludedNodes();
    // Ask the NN which DNs this block should be written to.
    lb = locateFollowingBlock(
        excluded.length > 0 ? excluded : null, oldBlock);
    // Set up the pipeline.
    success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
        0L, false);
    … …
  } while (!success && --count >= 0);
  if (!success) {
    throw new IOException("Unable to create new block.");
  }
  return lb;
}
private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
    ExtendedBlock oldBlock) throws IOException {
  return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
      stat.getFileId(), favoredNodes, addBlockFlags);
}

static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
    DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
    String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags)
    throws IOException {
  ... ...
  // Ask the NN which DNs to write to.
  return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
      excludedNodes, fileId, favoredNodes, allocFlags);
  ... ...
}

LocatedBlock addBlock(String src, String clientName,
    ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
    String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
    throws IOException;
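Note that dfsClient.namenode is a ClientProtocol proxy, so this addBlock call is a Hadoop RPC: it crosses the network and lands in the NameNode's NameNodeRpcServer, which is where the walkthrough continues.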
Ctrl + H, choose NameNodeRpcServer, and search for addBlock in that class.
NameNodeRpcServer.java
public LocatedBlock addBlock(String src, String clientName,
    ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
    String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
    throws IOException {
  checkNNStartup();
  LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
      clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
  if (locatedBlock != null) {
    metrics.incrAddBlockOps();
  }
  return locatedBlock;
}
FSNamesystem.java
LocatedBlock getAdditionalBlock(
    String src, long fileId, String clientName, ExtendedBlock previous,
    DatanodeInfo[] excludedNodes, String[] favoredNodes,
    EnumSet<AddBlockFlag> flags) throws IOException {
  final String operationName = "getAdditionalBlock";
  NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" +
      " for {}", src, fileId, clientName);
  ... ...
  // Choose where to store the block.
  DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
      blockManager, src, excludedNodes, favoredNodes, flags, r);
  ... ...
  return lb;
}
static DatanodeStorageInfo[] chooseTargetForNewBlock(
    BlockManager bm, String src, DatanodeInfo[] excludedNodes,
    String[] favoredNodes, EnumSet<AddBlockFlag> flags,
    ValidateAddBlockResult r) throws IOException {
  ... ...
  return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
      excludedNodesSet, r.blockSize,
      favoredNodesList, r.storagePolicyID,
      r.blockType, r.ecPolicy, flags);
}
public DatanodeStorageInfo[] chooseTarget4NewBlock(... ...
    ) throws IOException {
  ... ...
  final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
      numOfReplicas, client, excludedNodes, blocksize,
      favoredDatanodeDescriptors, storagePolicy, flags);
  ... ...
  return targets;
}
DatanodeStorageInfo[] chooseTarget(String src,
    int numOfReplicas, Node writer,
    Set<Node> excludedNodes,
    long blocksize,
    List<DatanodeDescriptor> favoredNodes,
    BlockStoragePolicy storagePolicy,
    EnumSet<AddBlockFlag> flags) {
  return chooseTarget(src, numOfReplicas, writer,
      new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
      excludedNodes, blocksize, storagePolicy, flags);
}

public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
    int numOfReplicas,
    Node writer,
    List<DatanodeStorageInfo> chosen,
    boolean returnChosenNodes,
    Set<Node> excludedNodes,
    long blocksize,
    BlockStoragePolicy storagePolicy,
    EnumSet<AddBlockFlag> flags);
Ctrl + H to find the chooseTarget implementation:
BlockPlacementPolicyDefault.java
public DatanodeStorageInfo[] chooseTarget(String srcPath,
    int numOfReplicas,
    Node writer,
    List<DatanodeStorageInfo> chosenNodes,
    boolean returnChosenNodes,
    Set<Node> excludedNodes,
    long blocksize,
    final BlockStoragePolicy storagePolicy,
    EnumSet<AddBlockFlag> flags) {
  return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
      excludedNodes, blocksize, storagePolicy, flags, null);
}
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
    Node writer,
    List<DatanodeStorageInfo> chosenStorage,
    boolean returnChosenNodes,
    Set<Node> excludedNodes,
    long blocksize,
    final BlockStoragePolicy storagePolicy,
    EnumSet<AddBlockFlag> addBlockFlags,
    EnumMap<StorageType, Integer> sTypes) {
  … …
  int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
  numOfReplicas = result[0];
  int maxNodesPerRack = result[1];
  for (DatanodeStorageInfo storage : chosenStorage) {
    // add localMachine and related nodes to excludedNodes
    // Collect the DNs that must not be chosen.
    addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
  }
  List<DatanodeStorageInfo> results = null;
  Node localNode = null;
  boolean avoidStaleNodes = (stats != null
      && stats.isAvoidingStaleDataNodesForWrite());
  boolean avoidLocalNode = (addBlockFlags != null
      && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
      && writer != null
      && !excludedNodes.contains(writer));
  // Attempt to exclude local node if the client suggests so. If no enough
  // nodes can be obtained, it falls back to the default block placement
  // policy.
  // The client is busy writing and asked not to write locally, so avoid its node.
  if (avoidLocalNode) {
    results = new ArrayList<>(chosenStorage);
    Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
    if (writer != null) {
      excludedNodeCopy.add(writer);
    }
    localNode = chooseTarget(numOfReplicas, writer,
        excludedNodeCopy, blocksize, maxNodesPerRack, results,
        avoidStaleNodes, storagePolicy,
        EnumSet.noneOf(StorageType.class), results.isEmpty(), sTypes);
    if (results.size() < numOfReplicas) {
      // not enough nodes; discard results and fall back
      results = null;
    }
  }
  if (results == null) {
    results = new ArrayList<>(chosenStorage);
    // The actual DN selection happens here.
    localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
        blocksize, maxNodesPerRack, results, avoidStaleNodes,
        storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(),
        sTypes);
  }
  if (!returnChosenNodes) {
    results.removeAll(chosenStorage);
  }
  // sorting nodes to form a pipeline
  return getPipeline(
      (writer != null && writer instanceof DatanodeDescriptor) ? writer
          : localNode,
      results.toArray(new DatanodeStorageInfo[results.size()]));
}
private Node chooseTarget(int numOfReplicas,
    ... ...) {
  writer = chooseTargetInOrder(numOfReplicas, writer, excludedNodes, blocksize,
      maxNodesPerRack, results, avoidStaleNodes, newBlock, storageTypes);
  ... ...
}
protected Node chooseTargetInOrder(int numOfReplicas,
    Node writer,
    final Set<Node> excludedNodes,
    final long blocksize,
    final int maxNodesPerRack,
    final List<DatanodeStorageInfo> results,
    final boolean avoidStaleNodes,
    final boolean newBlock,
    EnumMap<StorageType, Integer> storageTypes)
    throws NotEnoughReplicasException {
  final int numOfResults = results.size();
  if (numOfResults == 0) {
    // The first replica is stored on the local (writer's) node.
    DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
        storageTypes, true);
    writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
        : null;
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
  // The second replica is stored on a different rack.
  if (numOfResults <= 1) {
    chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  if (numOfResults <= 2) {
    final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
    // If the first two replicas are on the same rack, place the third on another rack.
    if (clusterMap.isOnSameRack(dn0, dn1)) {
      chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } else if (newBlock) {
      // For a new block, store the third replica on the same rack as the second.
      chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } else {
      // Otherwise, store it on the writer's rack.
      chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    }
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
      maxNodesPerRack, results, avoidStaleNodes, storageTypes);
  return writer;
}
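In short, for the default replication factor of 3, the policy traced above yields: replica 1 on the writer's own node (or a random node if the writer is outside the cluster), replica 2 on a node on a different rack, and replica 3 on the same rack as replica 2; any further replicas are placed randomly, and getPipeline then orders the chosen nodes by network distance. A toy chooser mirroring those branches (rack and node names invented for illustration):

import java.util.*;

public class PlacementSketch {
  public static void main(String[] args) {
    Map<String, List<String>> racks = new LinkedHashMap<>();
    racks.put("/rack1", Arrays.asList("n1", "n2"));
    racks.put("/rack2", Arrays.asList("n3", "n4"));
    String writerNode = "n1", writerRack = "/rack1";

    List<String> chosen = new ArrayList<>();
    chosen.add(writerRack + "/" + writerNode);  // replica 1: local node
    String remoteRack = racks.keySet().stream()
        .filter(r -> !r.equals(writerRack)).findFirst().orElse(writerRack);
    chosen.add(remoteRack + "/" + racks.get(remoteRack).get(0)); // replica 2: remote rack
    chosen.add(remoteRack + "/" + racks.get(remoteRack).get(1)); // replica 3: same rack as replica 2
    System.out.println(chosen); // [/rack1/n1, /rack2/n3, /rack2/n4]
  }
}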
III. Pipeline setup: sending over the socket

After the NameNode has chosen the target DataNodes, execution returns to the client's DataStreamer, which opens the pipeline.
Step into nextBlockOutputStream.
protected LocatedBlock nextBlockOutputStream() throws IOException {
  LocatedBlock lb;
  DatanodeInfo[] nodes;
  StorageType[] nextStorageTypes;
  String[] nextStorageIDs;
  int count = dfsClient.getConf().getNumBlockWriteRetry();
  boolean success;
  final ExtendedBlock oldBlock = block.getCurrentBlock();
  do {
    errorState.resetInternalError();
    lastException.clear();
    DatanodeInfo[] excluded = getExcludedNodes();
    // Ask the NN which DNs this block should be written to.
    lb = locateFollowingBlock(
        excluded.length > 0 ? excluded : null, oldBlock);
    // Set up the pipeline.
    success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
        0L, false);
    … …
  } while (!success && --count >= 0);
  if (!success) {
    throw new IOException("Unable to create new block.");
  }
  return lb;
}
boolean createBlockOutputStream(DatanodeInfo[] nodes,
    StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
    long newGS, boolean recoveryFlag) {
  ... ...
  // Create a socket to the first DN in the pipeline.
  s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
  // Output stream used to write data to the DN.
  OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
  // Input stream used to read the DN's response to the write.
  InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
  IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
      unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
  unbufOut = saslStreams.out;
  unbufIn = saslStreams.in;
  out = new DataOutputStream(new BufferedOutputStream(unbufOut,
      DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
  blockReplyStream = new DataInputStream(unbufIn);
  // Send the write-block request.
  new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
      dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
      nodes.length, block.getNumBytes(), bytesSent, newGS,
      checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
      (targetPinnings != null && targetPinnings[0]), targetPinnings,
      nodeStorageIDs[0], nodeStorageIDs);
  ... ...
}
public void writeBlock(... ...) throws IOException {
  ... ...
  send(out, Op.WRITE_BLOCK, proto.build());
}
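send here writes the DataTransferProtocol version, the one-byte opcode (WRITE_BLOCK is 80), and the protobuf-encoded OpWriteBlockProto to the DataNode's data transfer port (dfs.datanode.address, 9866 by default in Hadoop 3.x). Part IV follows this request onto the DataNode.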
IV. Pipeline setup: receiving on the socket

1) Step into create.
Ctrl + N to globally search for DataXceiverServer.java, then locate its run method.
public void run() {
  Peer peer = null;
  while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
    try {
      // Accept an incoming socket connection.
      peer = peerServer.accept();
      // Make sure the xceiver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > maxXceiverCount) {
        throw new IOException("Xceiver count " + curXceiverCount
            + " exceeds the limit of concurrent xcievers: "
            + maxXceiverCount);
      }
      // For every block the client sends, start a DataXceiver to handle it.
      new Daemon(datanode.threadGroup,
          DataXceiver.create(peer, datanode, this))
          .start();
    } catch (SocketTimeoutException ignored) {
      ... ...
    }
  }
  ... ...
}
Step into DataXceiver (a thread) and find its run method.
public void run() {
  int opsProcessed = 0;
  Op op = null;
  try {
    synchronized(this) {
      xceiver = Thread.currentThread();
    }
    dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
    peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
    InputStream input = socketIn;
    try {
      IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
          socketIn, datanode.getXferAddress().getPort(),
          datanode.getDatanodeId());
      input = new BufferedInputStream(saslStreams.in,
          smallBufferSize);
      socketOut = saslStreams.out;
    } catch (InvalidMagicNumberException imne) {
      ... ...
      return;
    }
    super.initialize(new DataInputStream(input));
    do {
      updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
      try {
        if (opsProcessed != 0) {
          assert dnConf.socketKeepaliveTimeout > 0;
          peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
        } else {
          peer.setReadTimeout(dnConf.socketTimeout);
        }
        // Read the operation type of this request.
        op = readOp();
      } catch (InterruptedIOException ignored) {
        // Time out while we wait for client rpc
        break;
      } catch (EOFException | ClosedChannelException e) {
        // Since we optimistically expect the next op, it's quite normal to
        // get EOF here.
        LOG.debug("Cached {} closing after {} ops. " +
            "This message is usually benign.", peer, opsProcessed);
        break;
      } catch (IOException err) {
        incrDatanodeNetworkErrors();
        throw err;
      }
      // restore normal timeout
      if (opsProcessed != 0) {
        peer.setReadTimeout(dnConf.socketTimeout);
      }
      opStartTime = monotonicNow();
      // Dispatch on the operation type.
      processOp(op);
      ++opsProcessed;
    } while ((peer != null) &&
        (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
  } catch (Throwable t) {
    ... ...
  }
}
protected final void processOp(Op op) throws IOException {
  switch(op) {
  ... ...
  case WRITE_BLOCK:
    opWriteBlock(in);
    break;
  ... ...
  default:
    throw new IOException("Unknown op " + op + " in data stream");
  }
}
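readOp() reads the one-byte opcode off the stream; WRITE_BLOCK (value 80) routes to opWriteBlock, which decodes the OpWriteBlockProto and unpacks it into the writeBlock call shown next.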
private void opWriteBlock(DataInputStream in) throws IOException {
  final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
  final DatanodeInfo[] targets = PBHelperClient.convert(proto.getTargetsList());
  TraceScope traceScope = continueTraceSpan(proto.getHeader(),
      proto.getClass().getSimpleName());
  try {
    writeBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelperClient.convertStorageType(proto.getStorageType()),
        PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        targets,
        PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
        PBHelperClient.convert(proto.getSource()),
        fromProto(proto.getStage()),
        proto.getPipelineSize(),
        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
        proto.getLatestGenerationStamp(),
        fromProto(proto.getRequestedChecksum()),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
            CachingStrategy.newDefaultStrategy()),
        (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
        (proto.hasPinning() ? proto.getPinning() : false),
        (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())),
        proto.getStorageId(),
        proto.getTargetStorageIdsList().toArray(new String[0]));
  } finally {
    if (traceScope != null) traceScope.close();
  }
}
Ctrl + Alt + B to find the writeBlock implementation in DataXceiver.java.
public void writeBlock(... ...) throws IOException {
  ... ...
  try {
    final Replica replica;
    if (isDatanode ||
        stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
      // open a block receiver
      // Create a BlockReceiver.
      setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
          peer.getRemoteAddressString(),
          peer.getLocalAddressString(),
          stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
          clientname, srcDataNode, datanode, requestedChecksum,
          cachingStrategy, allowLazyPersist, pinning, storageId));
      replica = blockReceiver.getReplica();
    } else {
      replica = datanode.data.recoverClose(
          block, latestGenerationStamp, minBytesRcvd);
    }
    storageUuid = replica.getStorageUuid();
    isOnTransientStorage = replica.isOnTransientStorage();
    //
    // Connect to downstream machine, if appropriate
    // Keep connecting to the downstream machines.
    if (targets.length > 0) {
      InetSocketAddress mirrorTarget = null;
      // Connect to backup machine
      mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
      LOG.debug("Connecting to datanode {}", mirrorNode);
      mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
      // Open a socket to the next replica's DN.
      mirrorSock = datanode.newSocket();
      try {
        ... ...
        if (targetPinnings != null && targetPinnings.length > 0) {
          // Forward the write-block request down the pipeline.
          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
              blockToken, clientname, targets, targetStorageTypes,
              srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
              latestGenerationStamp, requestedChecksum, cachingStrategy,
              allowLazyPersist, targetPinnings[0], targetPinnings,
              targetStorageId, targetStorageIds);
        } else {
          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
              blockToken, clientname, targets, targetStorageTypes,
              srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
              latestGenerationStamp, requestedChecksum, cachingStrategy,
              allowLazyPersist, false, targetPinnings,
              targetStorageId, targetStorageIds);
        }
        mirrorOut.flush();
        DataNodeFaultInjector.get().writeBlockAfterFlush();
        // read connect ack (only for clients, not for replication req)
        if (isClient) {
          BlockOpResponseProto connectAck =
              BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));
          mirrorInStatus = connectAck.getStatus();
          firstBadLink = connectAck.getFirstBadLink();
          if (mirrorInStatus != SUCCESS) {
            LOG.debug("Datanode {} got response for connect" +
                "ack from downstream datanode with firstbadlink as {}",
                targets.length, firstBadLink);
          }
        }
        … …
  // update metrics
  datanode.getMetrics().addWriteBlockOp(elapsed());
  datanode.getMetrics().incrWritesFromClient(peer.isLocal(), size);
}
BlockReceiver getBlockReceiver(
    final ExtendedBlock block, final StorageType storageType,
    final DataInputStream in,
    final String inAddr, final String myAddr,
    final BlockConstructionStage stage,
    final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
    final String clientname, final DatanodeInfo srcDataNode,
    final DataNode dn, DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    final boolean allowLazyPersist,
    final boolean pinning,
    final String storageId) throws IOException {
  return new BlockReceiver(block, storageType, in,
      inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd,
      clientname, srcDataNode, dn, requestedChecksum,
      cachingStrategy, allowLazyPersist, pinning, storageId);
}
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
    final DataInputStream in,
    final String inAddr, final String myAddr,
    final BlockConstructionStage stage,
    final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
    final String clientname, final DatanodeInfo srcDataNode,
    final DataNode datanode, DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    final boolean allowLazyPersist,
    final boolean pinning,
    final String storageId) throws IOException {
  ... ...
  if (isDatanode) { // replication or move
    replicaHandler =
        datanode.data.createTemporary(storageType, storageId, block, false);
  } else {
    switch (stage) {
    case PIPELINE_SETUP_CREATE:
      // Create the replica this pipeline will write to.
      replicaHandler = datanode.data.createRbw(storageType, storageId,
          block, allowLazyPersist);
      datanode.notifyNamenodeReceivingBlock(
          block, replicaHandler.getReplica().getStorageUuid());
      break;
    ... ...
    default: throw new IOException("Unsupported stage " + stage +
        " while receiving block " + block + " from " + inAddr);
    }
  }
  ... ...
}
FsDatasetImpl.java
public ReplicaHandler createRbw(
    StorageType storageType, String storageId, ExtendedBlock b,
    boolean allowLazyPersist) throws IOException {
  try (AutoCloseableLock lock = datasetLock.acquire()) {
    ... ...
    if (ref == null) {
      // There may be multiple temporary write files; pick the next volume to hold this one.
      ref = volumes.getNextVolume(storageType, storageId, b.getNumBytes());
    }
    FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
    // create an rbw file to hold block in the designated volume
    if (allowLazyPersist && !v.isTransientStorage()) {
      datanode.getMetrics().incrRamDiskBlocksWriteFallback();
    }
    ReplicaInPipeline newReplicaInfo;
    try {
      // Create the temporary (RBW) file the output stream writes into.
      newReplicaInfo = v.createRbw(b);
      if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
        throw new IOException("CreateRBW returned a replica of state "
            + newReplicaInfo.getReplicaInfo().getState()
            + " for block " + b.getBlockId());
      }
    } catch (IOException e) {
      IOUtils.cleanup(null, ref);
      throw e;
    }
    volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
    return new ReplicaHandler(newReplicaInfo, ref);
  }
}
FsVolumeImpl.java
public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException {
  File f = createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
  LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
      .setBlockId(b.getBlockId())
      .setGenerationStamp(b.getGenerationStamp())
      .setFsVolume(this)
      .setDirectoryToUse(f.getParentFile())
      .setBytesToReserve(b.getNumBytes())
      .buildLocalReplicaInPipeline();
  return newReplicaInfo;
}
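RBW stands for "replica being written": the temporary file pair (block data plus its .meta checksum file) lives under the volume's rbw directory, typically ${dfs.datanode.data.dir}/current/BP-<blockPoolId>/current/rbw/, and is moved to the finalized directory once the block is complete.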
V. The client receives the DNs' write acknowledgments (Response)

Once the pipeline is established, execution returns to the client's DataStreamer, which starts the corresponding threads.
Ctrl + N to globally search for DataStreamer, then locate its run method.
DataStreamer.java
@Override
public void run() {
  long lastPacket = Time.monotonicNow();
  TraceScope scope = null;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    if (errorState.hasError()) {
      closeResponder();
    }
    DFSPacket one;
    try {
      // process datanode IO errors if any
      boolean doSleep = processDatanodeOrExternalError();
      final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
      synchronized (dataQueue) {
        // wait for a packet to be sent.
        long now = Time.monotonicNow();
        while ((!shouldStop() && dataQueue.size() == 0 &&
            (stage != BlockConstructionStage.DATA_STREAMING ||
            now - lastPacket < halfSocketTimeout)) || doSleep) {
          long timeout = halfSocketTimeout - (now - lastPacket);
          timeout = timeout <= 0 ? 1000 : timeout;
          timeout = (stage == BlockConstructionStage.DATA_STREAMING) ?
              timeout : 1000;
          try {
            // If dataQueue is empty, the code blocks here
            dataQueue.wait(timeout); // until a notify arrives
          } catch (InterruptedException e) {
            LOG.warn("Caught exception", e);
          }
          doSleep = false;
          now = Time.monotonicNow();
        }
        if (shouldStop()) {
          continue;
        }
        // get packet to be sent.
        if (dataQueue.isEmpty()) {
          one = createHeartbeatPacket();
        } else {
          try {
            backOffIfNecessary();
          } catch (InterruptedException e) {
            LOG.warn("Caught exception", e);
          }
          // The queue is not empty: take the first packet.
          one = dataQueue.getFirst(); // regular data packet
          SpanId[] parents = one.getTraceParents();
          if (parents.length > 0) {
            scope = dfsClient.getTracer().
                newScope("dataStreamer", parents[0]);
            scope.getSpan().setParents(parents);
          }
        }
      }
      // get new block from namenode.
      if (LOG.isDebugEnabled()) {
        LOG.debug("stage=" + stage + ", " + this);
      }
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        LOG.debug("Allocating new block: {}", this);
        // Step 1: ask the NameNode for a block and set up the data pipeline.
        setPipeline(nextBlockOutputStream());
        // Step 2: start the ResponseProcessor to listen for packet acks.
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        LOG.debug("Append to block {}", block);
        setupPipelineForAppendOrRecovery();
        if (streamerClosed) {
          continue;
        }
        initDataStreaming();
      }
      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      if (lastByteOffsetInBlock > stat.getBlockSize()) {
        throw new IOException("BlockSize " + stat.getBlockSize() +
            " < lastByteOffsetInBlock, " + this + ", " + one);
      }
      if (one.isLastPacketInBlock()) {
        // wait for all data packets have been successfully acked
        synchronized (dataQueue) {
          while (!shouldStop() && ackQueue.size() != 0) {
            try {
              // wait for acks to arrive from datanodes
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
              LOG.warn("Caught exception", e);
            }
          }
        }
        if (shouldStop()) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }
      // send the packet
      SpanId spanId = SpanId.INVALID;
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          if (scope != null) {
            spanId = scope.getSpanId();
            scope.detach();
            one.setTraceScope(scope);
          }
          scope = null;
          // Step 3: remove the packet being sent from dataQueue.
          dataQueue.removeFirst();
          // Step 4: add that packet to ackQueue.
          ackQueue.addLast(one);
          packetSendTime.put(one.getSeqno(), Time.monotonicNow());
          dataQueue.notifyAll();
        }
      }
      LOG.debug("{} sending {}", this, one);
      // write out data to remote datanode
      try (TraceScope ignored = dfsClient.getTracer().
          newScope("DataStreamer#writeTo", spanId)) {
        // Write the packet out to the socket.
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        errorState.markFirstNodeIfNotMarked();
        throw e;
      }
      lastPacket = Time.monotonicNow();
      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }
      if (shouldStop()) {
        continue;
      }
      // Is this block full?
      if (one.isLastPacketInBlock()) {
        // wait for the close packet has been acked
        synchronized (dataQueue) {
          while (!shouldStop() && ackQueue.size() != 0) {
            dataQueue.wait(1000); // wait for acks to arrive from datanodes
          }
        }
        if (shouldStop()) {
          continue;
        }
        endBlock();
      }
      if (progress != null) { progress.progress(); }
      // This is used by unit test to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
    } catch (Throwable e) {
      ... ...
    } finally {
      if (scope != null) {
        scope.close();
        scope = null;
      }
    }
  }
  closeInternal();
}
private void initDataStreaming() {
  this.setName("DataStreamer for file " + src +
      " block " + block);
  ... ...
  response = new ResponseProcessor(nodes);
  response.start();
  stage = BlockConstructionStage.DATA_STREAMING;
}
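Note the stage transitions: the streamer starts in PIPELINE_SETUP_CREATE, moves to DATA_STREAMING here once the pipeline is up and the ResponseProcessor is started, and switches to PIPELINE_CLOSE once the last packet of the block has been queued.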
Click into response, then into ResponseProcessor, and use Ctrl + F to find its run method.
public void run() {
  ... ...
  // On a successful ack, drop the packet from ackQueue and wake the writer.
  ackQueue.removeFirst();
  packetSendTime.remove(seqno);
  dataQueue.notifyAll();
  ... ...
}
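The elided parts of this run method read a PipelineAck from the downstream DN for each sequence number. On success the packet is removed from ackQueue (as shown) and the writer is notified; on failure the streamer marks the bad node, and the unacked packets are moved from ackQueue back onto dataQueue so they can be resent through a rebuilt pipeline.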