基本语法:
# 使用hadoop命令操作hdfs
#可操作任意文件系统,不仅仅是hdfs文件系统,还能操作本地、谷歌GFS等,使用范围更广
hadoop fs <具体命令>
# 或者 使用hdfs命令操作hdfs
#只能操作hdfs文件系统(包括与local fs之间的操作)
hdfs dfs <具体命令> # 最后其实也是调用的:hadoop fs <具体命令>
# 或者使用hadoop dfs操作hdfs,
hadoop dfs <具体命令> # 不推荐,已过时
很多命令、参数和Linux命令很相似,例如:
# 列出hdfs上的文件/文件夹列表,类似linux的ls命令
hadoop fs -ls /
# 创建文件夹,类似Linux的mkdir命令
hadoop fs -mkdir /wcinput
# 删除hdfs上的文件夹,类似linux的 rm -rf命令
hadoop fs -rm -f -r /wcoutput
# 统计文件夹的大小信息
# -s 列出总大小,不加该参数时会列出文件夹下每个文件大小
# -h 以适当的单位展示
# 输出结果第一项表示文件大小,第二项表示所有副本加一起的总大小(默认情况下,一个文件有3个副本,所以默认情况下第二项结果等于第一项的3倍)
hadoop fs -du -s -h /jinguo
# 从本地剪切,粘贴到hdfs
hadoop fs -moveFromLocal shuguo.txt /sanguo
# 从本地复制,粘贴到hdfs
hadoop fs -copyFromLocal weiguo.txt /sanguo
# 上传,同 copyFromLocal 功能一样
hadoop fs -put aa.txt /wcinput
# 从服务器复制,粘贴到本地
hadoop fs -copyToLocal /sanguo/shuguo.txt ./
# 下载,等同于 copyToLocal
hadoop fs -get /wcinput/aa.txt a.txt
# 将liubei.txt追加到shuguo.txt文件末尾
hadoop fs -appendToFile liubei.txt /sanguo/shuguo.txt
#设置HDFS中文件副本数量
hadoop fs -setrep 10 /jinguo/weiguo.txt
虽然我们编写 java 代码的电脑是作为客户端去连接 hdfs 服务器,但是 hdfs 要求如果要读写hdfs就需要在客户端也安装 hadoop。但是hdfs官方又没有Windows版的安装包:

创建Maven工程,添加依赖
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-clientartifactId>
<version>3.2.3version>
dependency>
<dependency>
<groupId>junitgroupId>
<artifactId>junitartifactId>
<version>4.12version>
<scope>testscope>
dependency>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-log4j12artifactId>
<version>1.7.30version>
dependency>
配置log4j日志:log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/hadoop-client.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
HdfsClient
package com.example.bigdatademo.HdfsClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
public class HdfsClient {
private FileSystem fs;
@BeforeEach
/**
* 获取客户端对象
*/
public void init() throws URISyntaxException, IOException, InterruptedException {
Configuration configuration = new Configuration();
String uri = "hdfs://master:9000"; // 连接的集群NN地址
String user = "root"; // 用户
fs = FileSystem.get(new URI(uri), configuration, user);
}
@AfterEach
/**
* 关闭资源
*/
public void close() throws IOException {
fs.close();
}
@Test
/**
* 测试创建目录
*/
public void testMkdirs() throws IOException, URISyntaxException, InterruptedException {
// 在HDFS上创建一个文件夹
fs.mkdirs(new Path("/xiyouji/huaguoshan"));
}
@Test
/**
* 测试上传
*/
public void testPut() throws IOException {
// 上传完毕是否删除原数据
boolean delSrc = false;
// 如果hdfs上已有该文件,是否允许覆盖,(不允许覆盖时,如果目的地已经存在该文件,则抛出异常)
boolean overwrite = false;
// 源数据路径
Path src = new Path("C:/Users/YoRHa/Desktop/word.txt");
// 目的地路径,可以加上协议写成完整路径 hdfs://hadoop102:8020/xiyouji/huaguoshan/,也可以不加
Path dst = new Path("/xiyouji/huaguoshan/");
fs.copyFromLocalFile(delSrc, overwrite, src, dst);
}
@Test
/**
* 测试从hdfs下载
*/
public void testGet() throws IOException {
// 下载完毕后,是否删除hdfs上的源文件
boolean delSrc = false;
// hdfs源数据路径(文件或文件夹),也可以加上hdfs://hadoop102/写成完整路径
Path src = new Path("/xiyouji/huaguoshan/word.txt");
// 目的地路径是一个文件夹,程序会将hdfs中的文件下载到该文件夹
Path dst = new Path("C:/Users/YoRHa/Desktop/temp/");
// 是否进行CRC完整性校验。true则不生成crc校验文件,false会生成
boolean useRawLocalFileSystem = true;
fs.copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem);
}
@Test
/**
* 测试删除hdfs文件
*/
public void testRM() throws IOException {
// 要删除的路径(文件或文件夹)
Path path = new Path("/xiyouji/huaguoshan/word.txt");
// 是否递归删除,删除非空文件夹时需要递归删除文件夹下的内容,类似 rm 的 -r 参数。删除文件或空文件夹时可以不递归
boolean recursive = false;
fs.delete(path, recursive);
}
@Test
/**
* 测试移动和重命名hdfs文件
*/
public void testMV() throws IOException {
//更改前的文件
//Path src = new Path("/xiyouji/huaguoshan/word.txt");
//更改后的文件
//Path dst = new Path("/xiyouji/huaguoshan/ss.txt");
// 重命名
//fs.rename(src, dst);
// 文件移动位置并重命名
//fs.rename(new Path("/xiyouji/huaguoshan/ss.txt"), new Path("/xiyouji/word.txt"));
//目录更名
fs.rename(new Path( "/xiyouji/huaguoshan"), new Path( "/xiyouji/shuliandong"));
}
@Test
/**
* 列举文件夹下文件详情, ls 命令
*/
public void testLS() throws IOException {
Path path = new Path("/xiyouji");
boolean recursive = false; // 是否递归
// 类似ls命令,返回值是一个迭代器
RemoteIterator<LocatedFileStatus> listFilesIterator = fs.listFiles(path, recursive);
while(listFilesIterator.hasNext()) {
LocatedFileStatus fileStatus = listFilesIterator.next(); // 获取到文件属性
System.out.println("==================="+fileStatus.getPath()+"===================");
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getOwner());
System.out.println(fileStatus.getGroup());
System.out.println(fileStatus.getLen());
System.out.println(fileStatus.getModificationTime());
System.out.println(fileStatus.getReplication());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPath().getName());
//获取块信息
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
System.out.println(Arrays.toString(blockLocations));
}
}
@Test
/**
* HDFS 文件和文件夹判断
*/
public void testListstatus () throws IOException, InterruptedException, URISyntaxException {
FileStatus[] listStatus = fs.listStatus(new Path( "/"));
for (FileStatus status : listStatus) {
if (status.isFile()) {
System.out.println("文件: " + status.getPath().getName());
} else {
System.out.println("目录: " + status.getPath().getName());
}
}
}
}
参数优先级
代码里面的配置 > 在项目资源目录下的配置文件 > hdfs-site.xml >hdfs-default.xml

在第7步中,客户端要向DataNode写数据时,是以一种管道(pipeline)的方式,先向一个DataNode写入,然后由该DataNode继续发给下一个、下一个再发给下下个。不是客户端多线程同时写多个DataNode。
客户端向DataNode传输数据时,也不是直接串行将整个文件块写入,而是将文件块拆分成多个64k的数据包(packet),多个数据包并行的对同一个DataNode进行写入。
DataNode写数据完成后会向给自己发送数据的前一级发送应答信号。如果DataNode写数据失败,没有正确发送应答信号,它的前一级会重试重新向该DataNode传输数据。
写数据流程:

根据节点选择的官方说明,当使用的默认3个节点副本时,hdfs选择的副本存储节点为:


在第1步时,NameNode接到了客户端的请求,会判断客户端的用户是否有权限读,并且判断hdfs中是否有该文件,然后将元数据响应给客户端。
在第3步时,客户端要从DataNode中读取数据,而一个文件块会有多个副本,客户端会考虑哪个DataNode离自己最近,并且该DataNode的访问量负载不是很高才从这个DataNode上下载。
在第5步请求第二个文件块blk_2时,是在已经读取完成了blk_1之后才会发出该请求,是串行读,不是多线程并行。最后将读到的blk_2数据追加到blk_1末尾,就可以拼接成一个完整的文件。
所以,我们在 hadoop 服务器上的data文件夹中找到hadoop存放数据的文件夹$HADOOP_HOME/data/dfs/data/current/BP-xxxxxxx/current/finalized/subdir0/subdir0,在里面将某个文件的几个blk_xxx按顺序拼接,也能恢复出原文件:

在没有 SecondaryNameNode 的集群中,NameNode的工作流程是:
有了SecondaryNameNode的集群,SecondaryNameNode会定期询问NameNode是否到达检查点(checkpoint),如果到达了检查点,SecondaryNameNode就辅助NameNode执行edits信息向fsimage中写入。
NameNode工作流程为:
SecondaryNameNode工作流程为:
因为checkpoint拷贝的时候,客户端如果向DataNode发出写元数据的请求,这部分元数据的修改被记录到了NameNode的edit_inprogress中,而SecondaryNameNode没有这个文件。所以如果NameNode宕机,使用SecondaryNameNode临时充当NameNode的话,会丢失掉这部分信息。
NameNode被格式化之后,将在$HADOOP_HOME/data/dfs/name/current目录下产生如下文件:
如果NameNode中存储的元数据信息有修改变动,还会在该文件夹下生成:
FSImage文件:HDFS文件系统元数据的一个永久性的检查点。其中包含HDFS文件系统的所有目录和文件inode的序列化信息
Edits文件:存放HDFS文件系统的所有更新操作的路径,文件系统客户端执行的所有写操作首先会被记录到Edits文件中。因为它只是记录本次修改的这个操作指令,并不直接去FSImage上修改指定元数据,所以Edits写的速度比较快。
seen_txid:保存的是一个数字。就是最后一个 edits_xxx 的数字
VERSION:记录了NameNode的命令空间编号namespaceID,还有集群编号clusterID等信息。
每次NameNode启动时候都会将FSImage文件读入内存,加载Edis里面的更新操作,保证内存中的元数据信息是最新的、同步的,可以看成NameNode启动时候就将FSImage和Edits文件进行了合并。
使用cat等命令无法直接查看FSImage和Edits文件,可以借助 hdfs oiv查看FSImage,借助 hdfs oev查看Edits文件。
使用oiv命令查看FSImage文件:
hdfs oiv -p <文件类型> -d <FSImage文件> -o <转换后要输出的文件>
FSImage中并没有存储每个数据块存储在哪台DataNode上。而是在系统上电启动时,DataNode主动向NameNode汇报自己服务器上存储的数据块信息
使用oev命令查看Edits文件:
hdfs oev -p <文件类型> -i <Edits文件> -o <转换后要输出的文件>
默认执行CheckPoint的情况:
间隔时间的配置:hdfs-default.xml
<property>
<name>dfs.namenode.checkpoint.periodname>
<value>3600svalue>
property>
操作次数设置、检查操作次数的时间间隔设置:hdfs-site.xml
<property>
<name>dfs.namenode.checkpoint.txnsname>
<value>1000000value>
<describe>操作动作次数describe>
property>
<property>
<name>dfs.namenode.checkpoint.check.periodname>
<value>60svalue>
<describe>1分钟检查一次操作次数describe>
property>

一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件:一个是数据本身,一个是元数据(包括数据块的长度、块数据的校验和,以及时间戳)。
DataNode启动后向NameNode注册,通过后,周期性(默认6小时)的向NameNode上报所有的块信息。
心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令(如复制数据块到另一台机器,或删除某个数据块)。如果超过10分钟 + 30秒(超过10分钟,再给10次机会)还没有收到某个DataNode的心跳,就认为该节点不可用,认为该节点宕机了,不会再向该节点传输信息。
集群运行中,可以安全的加入或退出一些机器。
DN向NN汇报当前节点信息的时间间隔,默认6小时:hdfs-site.xml
<property>
<name>dfs.blockreport.intervalMsecname>
<value>21600000value>
<describe>Determines block reporting interval in millisecondsdescribe>
property>
DN扫描自己节点块信息列表的时间(默认也是6小时):hdfs-site.xml
<property>
<name>dfs.datanode.directoryscan.intervalname>
<value>21600svalue>
property>
即:每隔6小时,DataNode扫描自己节点上的所有数据块信息,然后上报给NameNode。
DataNode保证数据完整性的做法:
常见的校验算法有:crc(32)、md5(128)、sha1(160)。
DataNode在其文件创建后周期性验证CheckSum

其中dfs.namenode.heartbeat.recheck-interval (单位毫秒)和 dfs.heartbeat.interval (单位秒)可以在 hdfs-site.xml 中进行配置。
<property>
<name>dfs.namenode.heartbeat.recheck-intervalname>
<value>300000value>
<describe>默认为300秒,即5分钟describe>
property>
<property>
<name>dfs.heartbeat.intervalname>
<value>3value>
<describe>心跳时间默认3秒describe>
property>
所以,如果是默认配置,最后真正判断的超时时间为 : 2 * 5分钟 + 10 * 3秒 = 10分钟30秒
每个节点的上次心跳发送时间,可以在浏览器上查看:http://hadoop102:9870,进入 Datanodes,即可看到每个节点的上次发送心跳时间Last contact