(1)HDFS 产生背景
随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。HDFS 只是分布式文件管理系统中的一种。
(2)HDFS 定义
HDFS(Hadoop Distributed File System),它是一个文件系统,用于存储文件,通过目录树来定位文件;其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。
HDFS 的使用场景:适合一次写入,多次读出的场景。一个文件经过创建、写入和关闭之后就不需要改变。






hadoop fs 具体命令 OR hdfs dfs 具体命令
两个是完全相同的。






这里设置的副本数只是记录在 NameNode 的元数据中,是否真的会有这么多副本,还得
看 DataNode 的数量。因为目前只有 3 台设备,最多也就 3 个副本,只有节点数的增加到 10
台时,副本数才能达到 10。

验证 Hadoop 环境变量是否正常。双击 winutils.exe,如果报如下错误。说明缺少微软运行库(正版系统往往有这个问题)。再资料包里面有对应的微软运行库安装包双击安装即可。

4)在 IDEA 中创建一个 Maven 工程 HdfsClientDemo,并导入相应的依赖坐标+日志添加
<dependencies>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-clientartifactId>
<version>3.1.3version>
dependency>
<dependency>
<groupId>junitgroupId>
<artifactId>junitartifactId>
<version>4.12version>
dependency>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-log4j12artifactId>
<version>1.7.30version>
dependency>
dependencies>
在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
5)创建包名:com.xxxx.hdfs
6)创建 HdfsClient 类
package com.xxxx.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* 客户端代码常用套路:
* 1、获取一个客户端对象
* 2、执行相关的操作命令
* 3、关闭资源
*/
public class HdfsClient {
@Test
public void testMkdirs() throws IOException, URISyntaxException, InterruptedException {
//创建一个配置文件
Configuration configuration = new Configuration();
//获取文件系统,url:连接的集群nn地址
//FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration);
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration,"lln");
//创建目录
fs.mkdirs(new Path("/xiyou/huaguoshan/"));
//关闭资源
fs.close();
}
}
优化
package com.xxxx.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* 客户端代码常用套路:
* 1、获取一个客户端对象
* 2、执行相关的操作命令
* 3、关闭资源
*/
public class HdfsClient {
private FileSystem fs;
//初始化
@Before
public void init() throws URISyntaxException, IOException, InterruptedException {
//创建一个配置文件
Configuration configuration = new Configuration();
//获取文件系统,url:连接的集群nn地址
fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration,"lln");
}
//关闭资源
@After
public void close() throws IOException {
fs.close();
}
@Test
public void testMkdirs() throws IOException {
//创建目录
fs.mkdirs(new Path("/xiyou/huaguoshan1/"));
}
}
@Test
public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {
// 1 获取文件系统
Configuration configuration = new Configuration();
//configuration.set("dfs.replication", "2");//副本数量
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "lln");
// 2 上传文件
//参数解读:参数一:是否删除源数据,参数二:是否允许覆盖,参数三:原数据路径,参数四:目的地路径
fs.copyFromLocalFile(false, false, new Path("d:/sunwukong.txt"), new Path("/xiyou/huaguoshan1"));
// 3 关闭资源
fs.close();
}
<configuration>
<property>
<name>dfs.replicationname>
<value>1value>
property>
configuration>
参数优先级排序:
@Test
public void testCopyToLocalFile() throws IOException{
//参数解读:参数一:是否删除源数据,参数二:源文件路径,参数三:目标地址路径win,参数四:是否开启文件校验
fs.copyToLocalFile(false,new Path("hdfs://hadoop102/xiyou/huaguoshan"),new Path("D:\\"),true);
}
@Test
public void testDelete() throws IOException{
//参数解读:参数一:要删除的路径,参数二:是否递归删除
//删除文件
//fs.delete(new Path("/jdk-8u212-linux-x64.tar.gz"),false);
//删除空目录
//fs.delete(new Path("/sanguo"),false);
//删除非空目录
fs.delete(new Path("/xiyou"),true);
}
@Test
public void testMv() throws IOException {
//对文件名称的修改
//参数解读:参数一:源文件路径,参数二:目标文件路径
//fs.rename(new Path("/input/word.txt"),new Path("/input/lln.txt"));
//文件个更名和修改
//fs.rename(new Path("/input/lln.txt"),new Path("/word.txt"));
//目录的更名
fs.rename(new Path("/input"),new Path("/output"));
}
@Test
public void testDetail() throws IOException {
//获取所有文件信息
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
//遍历文件
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
System.out.println("========" + fileStatus.getPath() + "=========");
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getOwner());
System.out.println(fileStatus.getGroup());
System.out.println(fileStatus.getLen());
System.out.println(fileStatus.getModificationTime());
System.out.println(fileStatus.getReplication());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPath().getName());
// 获取块信息
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
System.out.println(Arrays.toString(blockLocations));
}
}
@Test
public void testListStatus() throws IOException {
//判断是文件还是文件夹
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for(FileStatus fileStatus :listStatus){
// 如果是文件
if (fileStatus.isFile()) {
System.out.println("f:"+fileStatus.getPath().getName());
}else {
System.out.println("d:"+fileStatus.getPath().getName());
}
}
}

(1)客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。
(2)NameNode 返回是否可以上传。
(3)客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。
(4)NameNode 返回 3 个 DataNode 节点,分别为 dn1、dn2、dn3。
(5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
(6)dn1、dn2、dn3 逐级应答客户端。
(7)客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet会放入一个应答队列等待应答。
(8)当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务器。(重复执行 3-7 步)。
在 HDFS 写数据的过程中,NameNode 会选择距离待上传数据最近距离的 DataNode 接
收数据。那么这个最近距离怎么计算呢?
节点距离:两个节点到达最近的共同祖先的距离总和。

(1)机架感知说明
http://hadoop.apache.org/docs/r3.1.3/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html#Data_Replication

(2)源码说明
Crtl + n 查找 BlockPlacementPolicyDefault,在该类中查找 chooseTargetInOrder 方法。
(3)Hadoop3.1.3 副本节点选择


(1)客户端通过 DistributedFileSystem 向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址。
(2)挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。
(3)DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位来做校验)。
(4)客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。
注:串行读


第一阶段:NameNode 启动
(1)第一次启动 NameNode 格式化后,创建 Fsimage 和 Edits 文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode 记录操作日志,更新滚动日志。
(4)NameNode 在内存中对元数据进行增删改。
第二阶段:Secondary NameNode 工作
(1)Secondary NameNode 询问 NameNode 是否需要 CheckPoint。直接带回 NameNode是否检查结果。
(2)Secondary NameNode 请求执行 CheckPoint。
(3)NameNode 滚动正在写的 Edits 日志。
(4)将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode。
(5)Secondary NameNode 加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件 fsimage.chkpoint。
(7)拷贝 fsimage.chkpoint 到 NameNode。
(8)NameNode 将 fsimage.chkpoint 重新命名成 fsimage。





1)通常情况下,SecondaryNameNode 每隔一小时执行一次。 [hdfs-default.xml] dfs.namenode.checkpoint.period 3600s
2)一分钟检查一次操作次数,当操作次数达到 1 百万时,SecondaryNameNode 执行一次
dfs.namenode.checkpoint.txns</name>
1000000</value>
操作动作次数</description>
</property>
dfs.namenode.checkpoint.check.period</name>
60s</value>
1 分钟检查一次操作次数</description>
</property>







需要注意的是 hdfs-site.xml 配置文件中的 heartbeat.recheck.interval 的单位为毫秒,dfs.heartbeat.interval 的单位为秒。
dfs.namenode.heartbeat.recheck-interval</name>
300000</value>
</property>
dfs.heartbeat.interval</name>
3</value>
</property>
