• 【Hadoop】HDFS API 操作大全


    🍁 博主 "开着拖拉机回家"带您 Go to New World.✨🍁

    🦄 个人主页——🎐开着拖拉机回家_Linux,大数据运维-CSDN博客 🎐✨🍁

    🪁🍁 希望本文能够给您带来一定的帮助🌸文章粗浅,敬请批评指正!🍁🐥

    🪁🍁🪁🍁🪁🍁🪁🍁 🪁🍁🪁🍁🪁🍁🪁 🪁🍁🪁🍁🪁🍁🪁🍁🪁🍁🪁🍁

    🍁🪁🍁 🪁🍁🪁🍁感谢点赞和关注 ,每天进步一点点!加油!🍁🪁🍁 🪁🍁🪁🍁

    目录

    🍁 博主 "开着拖拉机回家"带您 Go to New World.✨🍁

    一、FileSystem文件抽象类

    1.1文件读取API

    1.2文件操作API

    1.3抽象FileSystem类的具体实现子类

    1.4FileSystem IO输入系统相关类

    1.5FileSystem IO输出系统相关类

    二、HDFS的API操作

    2.1测试集群版本信息

    2.2文件上传下载和移动

    2.3文件读写操作

    2.4文件状态信息获取

    2.5实战案例


    一、FileSystem文件抽象类


    为了提供对不同数据访问的一致接口,Hadoop借鉴了Linux虚拟文件系统的概念,为此Hadopo提供了一个抽象的文件系统模型FileSystem,HDFS 是其中的一个实现。

    FileSystem是Hadoop中所有文件系统的抽象父类,它定义了文件系统所具有的基本特征和基本操作。

    1.1文件读取API


    HadoopFileSystem操作

    Java操作

    Linux操作

    描述

    URL.openStream

    FileSystem.open

    FileSystem.create

    FileSystem.append

    URL.openStream

    open

    打开一个文件

    FSDataInputStream.read

    InputStream.read

    read

    读取文件中的数据

    FSDataInputStream.write

    OutputStream.write

    write

    向文件中写入数据

    FSDataInputStream.close

    FSDataOutputStream.close

    InputStream.close

    OutputStream.close

    close

    关闭一个文件

    FSDataInputStream.seek

    RandomAccessFile.seek

    lseek

    改变文件读写位置

    FileSystem.getContentSummary

    du/wc

    获取文件存储信息

    1.2文件操作API


    HadoopFileSystem操作

    Java操作

    Linux操作

    描述

    FileSystem.getFileStatus

    FileSystem.get*

    File.get*

    stat

    获取文件/目录的属性

    FileSystem.set*

    File.set*

    chomd

    修改文件属性

    FileSystem.createNewFile

    File.createNewFile

    create

    创建一个文件

    FileSystem.delete

    File.delete

    remove

    删除一个文件

    FileSystem.rename

    File.renameTo

    rename

    移动或先修改文件/目录名

    FileSystem.mkdirs

    File.mkdir

    mkdir

    创建目录

    FileSystem.delete

    File.delete

    rmdir

    从一个目录下删除一个子目录

    FileSystem.listStatus

    File.list

    readdir

    读取一个目录下的项目

    FileSystem.setWorkingDirectory

    getcwd/getwd

    返回当前工作目录

    FileSystem.setWorkingDirectory

    chdir

    更改当前的工作目录

    1.3抽象FileSystem类的具体实现子类


    1.4FileSystem IO输入系统相关类


    1.5FileSystem IO输出系统相关类



    二、HDFS的API操作


    2.1测试集群版本信息

    2.2文件上传下载和移动

    1. /**
    2. * 本地文件上传到 HDFS
    3. *
    4. * @param srcPath 本地路径 + 文件名
    5. * @param dstPath Hadoop路径
    6. * @param fileName 文件名
    7. */
    8. def copyToHDFS(srcPath: String, dstPath: String, fileName: String): Boolean = {
    9. var path = new Path(dstPath)
    10. val fileSystem: FileSystem = path.getFileSystem(conf)
    11. val isFile = new File(srcPath).isFile
    12. // 判断路径是否存在
    13. val existDstPath: Boolean = fileSystem.exists(path)
    14. if (!existDstPath) {
    15. fileSystem.mkdirs(path)
    16. }
    17. // 本地文件存在
    18. if (isFile) {
    19. // HDFS 采用 路径+ 文件名
    20. path = new Path(dstPath + File.separator + fileName)
    21. // false: 是否删除 目标文件,false: 不覆盖
    22. fileSystem.copyFromLocalFile(false, false, new Path(srcPath), path)
    23. return true
    24. }
    25. false
    26. }
    27. /**
    28. * Hadoop文件下载到本地
    29. *
    30. * @param srcPath hadoop 源文件
    31. * @param dstPath 目标文件
    32. * @param fs 文件访问对象
    33. */
    34. def downLoadFromHDFS(srcPath: String, dstPath: String, fs: FileSystem): Unit = {
    35. val srcPathHDFS = new Path(srcPath)
    36. val dstPathLocal = new Path(dstPath)
    37. // false: 不删除源文件
    38. fs.copyToLocalFile(false, srcPathHDFS, dstPathLocal)
    39. }
    40. /**
    41. * 检查Hadoop文件是否存在并删除
    42. *
    43. * @param path HDFS文件
    44. */
    45. def checkFileAndDelete(path: String, fs: FileSystem) = {
    46. val dstPath: Path = new Path(path)
    47. if (fs.exists(dstPath)) {
    48. // false: 是否递归删除,否
    49. fs.delete(dstPath, false)
    50. }
    51. }
    52. /**
    53. * 获取指定目录下,正则匹配后的文件列表
    54. *
    55. * @param dirPath hdfs路径
    56. * @param regexRule 正则表达式 ,如:"^(?!.*[.]tmp$).*$" ,匹配非 .tmp结尾的文件
    57. */
    58. def listStatusHDFS(dirPath: String, regexRule: String, fs: FileSystem): util.ArrayList[Path] = {
    59. val path = new Path(dirPath)
    60. val pattern: Pattern = Pattern.compile(regexRule)
    61. // 匹配的文件
    62. val fileList = new util.ArrayList[Path]()
    63. val fileStatusArray: Array[FileStatus] = fs.listStatus(path)
    64. for (fileStatus <- fileStatusArray) {
    65. // 文件 全路径
    66. val filePath: Path = fileStatus.getPath()
    67. val fileName: String = filePath.getName.toLowerCase
    68. if (regexRule.equals("")) {
    69. // 如果匹配规则为空 则获取目录下的全部文件
    70. fileList.add(filePath)
    71. log.info("match file : " + fileName)
    72. } else {
    73. // 正则匹配文件
    74. if (pattern.matcher(fileName).matches()) {
    75. fileList.add(filePath)
    76. log.info("match file : " + fileName)
    77. }
    78. }
    79. }
    80. fileList
    81. }
    82. /**
    83. * 文件移动或重命名到指定目录, 如:文件00000 重命名为00001
    84. *
    85. * @param srcPath 源文件路径
    86. * @param dstPath 源文件路径
    87. * @param fs 文件操作对象
    88. */
    89. def renameToHDFS(srcPath: String, dstPath: String, fs: FileSystem): Boolean = {
    90. var renameFlag = false
    91. val targetPath = new Path(dstPath)
    92. // 目标文件存在先删除
    93. if (fs.exists(targetPath)) {
    94. fs.delete(targetPath, false)
    95. }
    96. renameFlag = fs.rename(new Path(srcPath), targetPath)
    97. if (renameFlag) {
    98. log.info("renamed file " + srcPath + " to " + targetPath + " success!")
    99. } else {
    100. log.info("renamed file " + srcPath + " to " + targetPath + " failed!")
    101. }
    102. renameFlag
    103. }

    2.3文件读写操作


    Hadoop抽象文件系统也是使用流机制进行文件的读写。Hadoop抽象文件系统中,用于读文件数据的流是FSDataInputStream,对应地,写文件通过抽象类FSDataOutputStream实现。

    1. /**
    2. * 读取HDFS文件
    3. *
    4. * @param inPutFilePath 源文件路径
    5. * @param fs 文件操作对象
    6. */
    7. def readFromHDFS(inPutFilePath: String, OutputFilePath: String, fs: FileSystem) = {
    8. var fSDataInputStream: FSDataInputStream = null
    9. var bufferedReader: BufferedReader = null
    10. val srcPath = new Path(inPutFilePath)
    11. if (fs.exists(srcPath)) {
    12. val fileStatuses: Array[FileStatus] = fs.listStatus(srcPath)
    13. for (fileStatus <- fileStatuses) {
    14. val filePath: Path = fileStatus.getPath
    15. // 判断文件大小
    16. if (fs.getContentSummary(filePath).getLength > 0) {
    17. fSDataInputStream = fs.open(filePath)
    18. bufferedReader = new BufferedReader(new InputStreamReader(fSDataInputStream))
    19. var line = bufferedReader.readLine()
    20. while (line != null) {
    21. print(line + "\n") // 打印
    22. line = bufferedReader.readLine()
    23. }
    24. }
    25. }
    26. }
    27. fSDataInputStream.close()
    28. bufferedReader.close()
    29. }
    30. /**
    31. * 读取HDFS文件, 处理完成 重新写入
    32. *
    33. * @param inPutFilePath 源文件路径
    34. * @param OutputFilePath 输出文件到新路径
    35. * @param fs 文件操作对象
    36. */
    37. def writeToHDFS(inPutFilePath: String, OutputFilePath: String, fs: FileSystem) = {
    38. var fSDataInputStream: FSDataInputStream = null
    39. var fSDataOutputStream: FSDataOutputStream = null
    40. var bufferedReader: BufferedReader = null
    41. var bufferedWriter: BufferedWriter = null
    42. val srcPath = new Path(inPutFilePath)
    43. var count = 0
    44. if (fs.exists(srcPath)) {
    45. val fileStatuses: Array[FileStatus] = fs.listStatus(srcPath)
    46. for (fileStatus <- fileStatuses) {
    47. val filePath: Path = fileStatus.getPath
    48. // 判断文件大小
    49. if (fs.getContentSummary(filePath).getLength > 0) {
    50. fSDataInputStream = fs.open(filePath)
    51. bufferedReader = new BufferedReader(new InputStreamReader(fSDataInputStream))
    52. val outputFilePath = new Path(OutputFilePath + count)
    53. fSDataOutputStream = fs.create(outputFilePath)
    54. bufferedWriter = new BufferedWriter(new OutputStreamWriter(fSDataOutputStream, "UTF-8"))
    55. var line = bufferedReader.readLine()
    56. while (line != null) {
    57. val bytes: Array[Byte] = line.getBytes("UTF-8")
    58. bufferedWriter.write(new String(bytes) + "\n")
    59. line = bufferedReader.readLine()
    60. }
    61. bufferedWriter.flush()
    62. count += 1
    63. }
    64. }
    65. }
    66. fSDataInputStream.close()
    67. bufferedReader.close()
    68. bufferedWriter.close()
    69. }

    测试结果如下:

    2.4文件状态信息获取


    FileSystem. getContentSummary()提供了类似Linux命令du、df提供的功能。du表示"disk usage",它会报告特定的文件和每个子目录所使用的磁盘空间大小;命令df则是"diskfree"的缩写,用于显示文件系统上已用的和可用的磁盘空间的大小。du、df是Linux中查看磁盘和文件系统状态的重要工具。

    getContentSummary()方法的输入是一个文件或目录的路径,输出是该文件或目录的一些存储空间信息,这些信息定义在ContentSummary,包括文件大小、文件数、目录数、文件配额,已使用空间和已使用文件配额等。

    1. /**
    2. * HDFS路径下文件信息统计
    3. *
    4. * @param dirPath hdfs路径
    5. **/
    6. def listHDFSStatus(dirPath: String, fs: FileSystem) = {
    7. val path = new Path(dirPath)
    8. // 匹配的文件
    9. val contentSummary: ContentSummary = fs.getContentSummary(path)
    10. println("/tmp/kangll 目录下子目录个数: ", contentSummary.getDirectoryCount)
    11. println("/tmp/kangll 目录下文件个数: ", contentSummary.getFileCount)
    12. println("/tmp/kangll 目录下文件大小: ", contentSummary.getLength)
    13. println("/tmp/kangll 目录下文件和子目录个数: ", contentSummary.getFileAndDirectoryCount)
    14. }

    /tmp/kangll目录信息获取结果:

    2.5实战案例


    案例说明: HDFS 文件清理, 根据文件大小、个数、程序休眠时间控制 匀速 批量删除 HDFS 文件,当文件越大 ,需要配置 删除个数更少,休眠时间更长,防止 NameNode 负载过大,减轻DataNode磁盘读写压力,从而不影响线上业务情况下清理过期数据。

    1. package com.kangll.common.utils
    2. import java.text.SimpleDateFormat
    3. import java.util.concurrent.TimeUnit
    4. import java.util.{Calendar, Date, Properties}
    5. import org.apache.hadoop.conf.Configuration
    6. import org.apache.hadoop.fs.{ContentSummary, FileStatus, FileSystem, Path}
    7. import org.apache.log4j.Logger
    8. import scala.collection.mutable.ListBuffer
    9. /** ***************************************************************************************
    10. *
    11. * @auther kangll
    12. * @date 2023/09/12 12:10
    13. * @desc HDFS 文件清理, 根据文件大小、个数、程序休眠时间控制 匀速 批量删除
    14. * HDFS 文件,当文件越大 ,需要配置 删除个数更少,休眠时间更长,防止
    15. * NameNode 负载过大,减轻DataNode磁盘读写压力,从而不影响线上业务下删除
    16. *
    17. *
    18. * 1.遍历文件夹下的文件个数据, 当遍历的文件夹下的文件个数到达阈值时 将
    19. * 文件所述的 父路径直接删除
    20. *
    21. * ****************************************************************************************/
    22. object CleanHDFSFileUtil {
    23. // 删除文件总数统计
    24. var HDFS_FILE_SUM = 0
    25. // 批次删除文件个数显示
    26. var HDFS_FILE_BATCH_DEL_NUM = 0
    27. val start = System.currentTimeMillis()
    28. /**
    29. *
    30. * @param fs 文件操作对象
    31. * @param pathName 文件根路径
    32. * @param fileList 批次清理的 buffer
    33. * @param saveDay 根据文件属性 获取文件创建时间 选择文件保留最近的天数
    34. * @param sleepTime 休眠时间,防止一次性删除太多文件 导致 datanode 文件负载太大
    35. * @param fileBatchCount 批次删除文件的个数, 相当于是 上报到 namenode 文件清理队列的大小,参数越大 队列越大,datanode 磁盘负载相对来说就高
    36. * @return
    37. */
    38. def listPath(fs: FileSystem, pathName: String, fileList: ListBuffer[String], saveDay: Int, sleepTime: Long, fileBatchCount: Int): ListBuffer[String] = {
    39. val fm = new SimpleDateFormat("yyyy-MM-dd")
    40. // 获取当前时间
    41. val currentDay = fm.format(new Date())
    42. val dnow = fm.parse(currentDay)
    43. val call = Calendar.getInstance()
    44. call.setTime(dnow)
    45. call.add(Calendar.DATE, -saveDay)
    46. // 获取保留天前的时期
    47. val saveDayDate = call.getTime
    48. // 遍历文件
    49. val fileStatuses = fs.listStatus(new Path(pathName))
    50. for (status <- fileStatuses) {
    51. // 获取到文件名
    52. val filePath = status.getPath
    53. if (status.isFile) {
    54. // 获取到文件修改时间
    55. val time: Long = status.getModificationTime
    56. val hdfsFileDate = fm.parse(fm.format(new Date(time)))
    57. if (saveDayDate.after(hdfsFileDate)) {
    58. fileList += filePath.toString
    59. // 获取文件个数
    60. val cs: ContentSummary = fs.getContentSummary(filePath)
    61. HDFS_FILE_SUM += cs.getFileCount.toInt
    62. HDFS_FILE_BATCH_DEL_NUM += cs.getFileCount.toInt
    63. if (HDFS_FILE_BATCH_DEL_NUM >= fileBatchCount) {
    64. val end = System.currentTimeMillis()
    65. println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    66. println("++++++++++++++++ 遍历文件数量达到 " + HDFS_FILE_BATCH_DEL_NUM + " 个,删除HDFS文件 ++++++++++++++++")
    67. println("++++++++++++++++++++++++++++ 休眠 " + sleepTime + " S ++++++++++++++++++++++++++++")
    68. println("++++++++++++++++++++++++ 删除文件总数:" + HDFS_FILE_SUM + " ++++++++++++++++++++++++++")
    69. println("++++++++++++++++++++++++ 程序运行时间:" + (end - start) / 1000 + " s ++++++++++++++++++++++++")
    70. println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    71. HDFS_FILE_BATCH_DEL_NUM = 0
    72. TimeUnit.MILLISECONDS.sleep(sleepTime)
    73. }
    74. // 文件删除根据绝对路径删除
    75. println("+++++ 删除文件: " + filePath + "+++++")
    76. // 递归删除
    77. fs.delete(filePath, true)
    78. }
    79. } else {
    80. // 递归文件夹
    81. listPath(fs, filePath.toString, fileList, saveDay, sleepTime, fileBatchCount)
    82. }
    83. }
    84. println("+++++++++++++++++++++++++ 删除文件总数:" + HDFS_FILE_SUM + " +++++++++++++++++++++++++")
    85. fileList
    86. }
    87. /**
    88. * 删除空文件夹
    89. *
    90. * @param fs 文件操作对象
    91. * @param pathName 路径
    92. * @param pathSplitLength 文件按照"/"拆分后的长度
    93. */
    94. def delEmptyDirectory(fs: FileSystem, pathName: String, pathSplitLength: Int) = {
    95. // 遍历文件
    96. val fileStatuses = fs.listStatus(new Path(pathName))
    97. for (status <- fileStatuses) {
    98. if (status.isDirectory) {
    99. val path: Path = status.getPath
    100. // /kangll/winhadoop/temp/wmall_batch_inout/day/1660878372 = 7
    101. val delPathSplitLength = path.toString.substring(6, path.toString.length).split("/").length
    102. // filePath /kangll/winhadoop/temp/wmall_batch_inout/day 子时间戳文件夹两个
    103. // val hdfsPathListCount = fileStatuses.length
    104. val hdfsPathListCount = fs.listStatus(path).length
    105. if (delPathSplitLength == pathSplitLength && hdfsPathListCount == 0) {
    106. println("+++++++++++++++++ 删除空文件夹 : " + path + " +++++++++++++++++++")
    107. fs.delete(path, true)
    108. }
    109. }
    110. }
    111. }
    112. def main(args: Array[String]): Unit = {
    113. val logger = Logger.getLogger("CleanHDFSFileUtil")
    114. val conf = new Configuration()
    115. conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
    116. conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
    117. val fs = FileSystem.get(conf)
    118. val fileList = new ListBuffer[String]
    119. val hdfsDir = if (args.size > 0) args(0).toString else System.exit(0).toString
    120. val saveDay = if (args.size > 1) args(1).toInt else 2
    121. val sleepTime = if (args.size > 2) args(2).toLong else 10
    122. val fileBatchCount = if (args.size > 3) args(3).toInt else 5
    123. /*
    124. 默认不启用文件夹删除,参数为 文件夹绝对路径Split后的数组长度
    125. 如 路径 /winhadoop/temp/wmall_batch_inout/thirty" 配置为 7
    126. */
    127. val pathSplitLength = if (args.size > 4) args(4).toInt else 20
    128. // 删除文件
    129. listPath(fs, hdfsDir, fileList, saveDay, sleepTime, fileBatchCount)
    130. // 删除空文件夹
    131. delEmptyDirectory(fs, hdfsDir, pathSplitLength)
    132. fs.close()
    133. }
    134. }

    调用脚本

    1. #
    2. # 脚本功能: 过期文件清理
    3. # 作 者: kangll
    4. # 创建时间: 2023-09-14
    5. # 修改内容: 控制删除文件的批次个数,程序休眠时间传入
    6. # 当前版本: 1.0v
    7. # 调度周期: 一天一次
    8. # 脚本参数: 删除文件夹、文件保留天数、程序休眠时间、批次删除个数
    9. # 1.文件根路径,子文件夹递归遍历
    10. # 2.文件保留天数
    11. # 3.程序休眠时间 防止 DataNode 删除文件负载过大,单位 秒
    12. # 4.批次删除文件个数 ,如配置 100,当满足文件个数100时, 整批执行 delete,紧接着程序休眠
    13. # 5.默认不启用文件夹删除,也就是不传参,参数为 文件夹绝对路径Split后的数组长度
    14. # /winhadoop/temp/wmall_batch_inout/thirty/时间戳/ Split后 长度为7,默认删除时间戳文件夹
    15. #
    16. ### 对应的新删除程序
    17. jarPath=/hadoop/project/del_spark2-1.0-SNAPSHOT.jar
    18. ### 集群日志
    19. java -classpath $jarPath com.kangll.common.utils.CleanHDFSFileUtil /spark2-history 3 10 100

    参考 :

    hadoop抽象文件系统filesystem框架介绍_org.apache.hadoop.fs.filesystem_souy_c的博客-CSDN博客

    Hadoop FileSystem文件系统的概要学习 - 回眸,境界 - 博客园

    hadoop抽象文件系统filesystem框架介绍_org.apache.hadoop.fs.filesystem_souy_c的博客-CSDN博客

  • 相关阅读:
    YOLOv5 PyQt5 | PyQt5快速入门 | 2/3
    Rxjava学习(一)简单分析Rxjava调用流程
    springsecurity实现单点登录
    嵌入式Qt-控制硬件:滑动条控制RGB灯
    Django之视图层
    如何用Java实现SpringCloud Alibaba Sentinel的熔断功能?
    spark任务,使用 repartition 对数据进行了重新分区,但任务输入数据大小仍存在不均衡
    干货!高并发下秒杀商品,你必须知道的9个细节
    派金SDK接入文档
    【图像处理】基于双目视觉的物体体积测量算法研究(Matlab代码实现)
  • 原文地址:https://blog.csdn.net/qq_35995514/article/details/132990790