• Hadoop总结


    hadoop总结

    1、安装操作

    1.1、虚拟机配置

    1. 配置静态网络:

      其他已经配置好

      虚拟机里操作:

      vi /etc/sysconfig/network-scripts/ifcfg-ens33
      #修改
      BOOTPROTO='static'
      ONBOOT='yes'
      #添加
      IPADDR=192.168.51.100
      NETMASK=225.225.225.0
      GATEWAY=192.168.51.1
      DNS1=8.8.8.8
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9

      然后重启网卡

      systemctl restart network
      
      • 1

      安装常用软件

      yum -y install vim
      yum -y install net-tools
      yum -y install openssl-devel
      
      • 1
      • 2
      • 3

      然后克隆三台,记得各自改ip

    2. 关闭防火墙:

      systemctl stop firewalld
      systemctl disable firewalld
      
      • 1
      • 2
    3. 关闭selinux:

      vi /etc/sysconfig/selinux
      
      SELINUX=disabled
      
      • 1
      • 2
      • 3
    4. 更改主机名

      vi /etc/hostname
      
      node1
      node2
      node3
      
      • 1
      • 2
      • 3
      • 4
      • 5
    5. 做主机名和IP地址的映射

      vi /etc/hosts
      
      192.268.51.110 node1 node01
      192.268.51.120 node2 node02
      192.268.51.130 node3 node03
      
      • 1
      • 2
      • 3
      • 4
      • 5
    6. 三台机器时钟同步

      必须连接到外网

      yum -y install ntpdatei
      
      • 1

      阿里云时钟同步服务器

      ntpdate ntp4.aliyun.com
      
      • 1

      三天机器定时任务

      crontab -e
      
      • 1

      添加如下内容

      */1 * * * * /usr/sbin/ntpdate ntp4.aliyun.com;
      
      • 1
    7. 添加普通用户,使其具有sudo权限

      useradd hadoop
      passwod hadoop
      
      • 1
      • 2

      设置123456

      添加root权限

      visudo
      
      • 1

      增加内容

      hadoop ALL=(ALL)  ALL
      
      • 1
    8. 三台定义统一目录

      mkdir -p /kkb/soft #软件压缩包存放目录
      mkdir -p /kkb/install #软件解压后存放目录
      chown -R hadoop:hadoop /kkb
      
      • 1
      • 2
      • 3
    9. 配置免密码登录

      1. 三台机器下各生成公钥私钥

        ssh-keygen -t rsa
        
        • 1
      2. 三台机器在hadoop用户下,执行拷贝公钥到node01服务器

        ssh-copy-id node01
        
        • 1
      3. 将node01的公钥拷贝给node02与node03

        cd /home/hadoop/.ssh/
        
        scp authorized_keys node02:$PWD
        scp authorized_keys node03:$PWD
        
        • 1
        • 2
        • 3
        • 4
      4. 验证

        ssh node02
        
        • 1

    1.2、安装jdk

    解压,配置环境变量

    1.3、hadoop安装

    1. 上传文件,解压

    2. 配置环境变量

    3. 修改hadoop-env.sh

      cd /kkb/install/hadoop/etc/hadoop/
      vim hadoop-env.sh
      
      export JAVA_HOME=/kkb/insyall/jdk
      
      • 1
      • 2
      • 3
      • 4
    4. 修改core-site.xml

      vim  core-site.xml
      
      
      
      • 1
      • 2
      • 3
    5. 修改hdfs-site.xml

    6. 修改mapred-site.xml

    7. 修改yarn-site.xml

    8. 修改work文件

      第一台机器执行

      vim worker
      #更换内容
      node01
      node02
      node03
      
      • 1
      • 2
      • 3
      • 4
      • 5
    9. 创建文件存放目录

      node01机器下创建以下目录

      mkdir -p /kkb/install/hadoop/hadoopDatas/tempDatas
      mkdir -p /kkb/install/hadoop/hadoopDatas/namenodeDAtas
      mkdir -p /kkb/install/hadoop/hadoopDatas/datanodeDatas
      mkdir -p /kkb/install/hadoop/hadoopDatas/dfs/nn/edits
      mkdir -p /kkb/install/hadoop/hadoopDatas/dfs/snn/name
      mkdir -p /kkb/install/hadoop/hadoopDatas/dfs/nn/snn/edits
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    10. 安装包分发与rsync

      scp拷贝

      scp -r 文件名 username@host:文件路径
      
      • 1
    11. 格式化集群

      在node01执行一遍就行

      hdfs namenode -format
      
      • 1
    12. 集群启动

      start-dfs.sh
      start-yarn.sh
      
      mapred  --daemon start historyserver
      
      • 1
      • 2
      • 3
      • 4
    13. 验证是否成功

      hdfs集群访问地址Browsing HDFS

      yarn集群访问地址 All Applications

      jobhistory集群访问地址 JobHistory

    2、hadoop

    Hadoop起源

    hadoop的创始者是Doug Cutting,起源于Nutch项目,该项目是作者尝试构建的一个开源的Web搜索引擎。起初该项目遇到了阻碍,因为始终无法将计算分配给多台计算机。后受到谷歌发表的关于GFS和MapReduce相关的论文的启发,最终让Nutch可以在多台计算机上稳定的运行。后来雅虎对这项技术产生了很大的兴趣,并组建了团队开发,从Nutch中剥离出分布式计算模块命名为“Hadoop”。最终Hadoop在雅虎的帮助下能够真正的处理海量的Web数据。

    狭义上:指hadoop软件

    广义上:指Hadoop生态圈,包含 hadoop,durid,flink,zookeeper,shark,hue,ooize,elasticsearch,solr,phoenix,hive,hbase,flume,storm,sqoop,Kafka,spark,impala

    版本区别:

    • 0.x:最早一个版本
    • 1.x:第二代开源本,修复了0.x的bug,包含,mapreduce和hdfs
    • 2.x:架构映入yarn平台等许多新特性,将mapreduce拆分为yarn和mapreduce,同样包含hdfs
    • 3.x:在2.x的版本下,引入了hdfs的新特性,架构和2.x没区别

    运行模式:

    1. 单机版

      无需守护进程,所有程序都在一个jvm上执行。在独立模式下调试mr比较方便。一般用于学习或者调试

    2. 伪分布式模式

      守护进程运行在本地机器上,模拟一个小规模集群,可以配置一台机器的集群。

    3. 完全分布式模式

      守护进程在一个集群上,需要多台机器

    架构介绍:

    2.0版本以后Hadoop由三个模块组成:分布式文件存储hdfs,分布式计算mapreduce,资源管理调度引擎yarn

    主从架构

    hdfs:namenode(管理集群,存元数据信息),SecondaryNameNode,datanode(存数据)

    yarn:REsourceManager(主要资源调度分配),NodeManager(提供具体资源)

    3、hadoop高可用

    Hadoop HA 高可用

    4、hdfs

    4.1、hdfs 架构

    分块存储

    block块:默认128mb,具有三个副本

    • 文件保存到hdfs会按128mb切分为一个个block块

    • 以block块的形式保存在hdfs中

      • hadoop1.x默认64mb

      • Hadoop2以后,默认为128mb,也可在hdfs-site.xml中设定

        <property>
            <name>dfs.blocksizename>
            <value>字节数value>
        property>
        
        • 1
        • 2
        • 3
        • 4

        hdfs参考属性:https://hadoop.apache.org/docs/r3.2.3/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml

        其他属性类似位置

    • block元数据

      • 每个block的元数据大小为150字节
      • 一个1k的文件的元数据和一个128mb的块的元数据差不多一样
    • 抽象成块的好处

      1. 文件可以大于集群的任何一个磁盘,分块可以很好的存储
      2. 使用块抽象而不是文件可以简化存储子系统
      3. 块适合数据备份,进而提高数据的容错能力和可用性

    机架感知

    通过一个机架感知的过程,NameNode可以确定每一个 DataNode所属的机架id(这也是NameNode采用NetworkTopology数据结构来存储数据节点的原因)。

    一个简单但没有优化的策略就是将副本存放在不同的机架上,这样可以防止当整个机架失效时数据
    的丢失,并且允许读数据的时候充分利用多个机架的带宽。这种策略设置可以将副本均匀分布在集
    群中,有利于当组件失效的情况下的均匀负载,但是,因为这种策略的一个写操作需要传输到多个
    机架,这增加了写的代价。

    HDFS的存放策略是将一个副本存放在本地机架节点上,一个副本存放在同一个机架的另一个节点上,最后一个副本放在不同机 架的节点上。这种策略减少了机架间的数据传输,提高了写操作的效率。机架的错误远远比节点的错误少,所以这种策略不会影响到数据的可靠性和可用性

    hdfs架构

    包括:

    1. HDFS Client:文件切分。文件上传 HDFS 的时候,Client 将文件切分成 一个一个的Block,然后进行存储。 与 NameNode 交互,获取文件的位置信息。 与 DataNode 交互,读取或者写入数据。 Client 提供一些命令来管理 HDFS,比如启动或者关闭HDFS。 Client 可以通过一些命令来访问 HDFS。
    2. NameNode:管理整个文件系统的元数据,以及每一个路劲(文件)对应的数据块信息,配置副本策略,处理客户端读写请求。
    3. DataNode:管理用户的文件数据块,每一个数据库都可以在多个datanode上存多个副本 执行数据块的读/写操作。
    4. Secondary NameNode:用来监控hdfs的富足后台程序,每隔一段时间获取hdfs元数据的快照,主要作用是辅助namenode管理数据信息,分担其工作量。 定期合并 fsimage和fsedits,并推送给NameNode。 在紧急情况下,可辅助恢复 NameNode。

    扩展

    • 块缓存:

      对于频繁访问的文件,其对应的块可能被缓存子啊datanode的内存中,以堆外块缓存的形式存在

    • 文件权限验证

      与linux的文件系统的机制类似。建议使用kerberos或ranger来做权限验证。

    4.2、hdfs shell操作

    俩种风格

    • hadoop fs开头
    • hdfs dfs开头
    • 俩种命令效果相同
    1. 帮助信息

      hadoop fs -help ls
      
      • 1
    2. 查看指定目录的文件列表

      hadoop fs -ls /
      
      • 1
    3. 在hdfs中创建文件

      hadoop fs -touchz /a.txt
      
      • 1
    4. 向文件追加内容

      hadoop fs -appenfToFile b.txt /a.txt  #将本地磁盘b.txt的内容追加到hdfs中a.txt的末尾 
      
      • 1
    5. 查看文件内容

      hadoop fs -cat /a.txt
      
      • 1
    6. 本地文件上传到hdfs

      hadoop fs -put ./shuguo.txt /test
      hadoop fs -copyFromLocal ./shuguo.txt /test  #复制上去
      hadoop fs -moveFromLocal ./shuguo.txt /test  #剪切上去
      
      • 1
      • 2
      • 3
    7. 下载文件

      hadoop fs -get /test/shuguo.txt ./
      hadoop fs -copyToLocal /test/shuguo.txt ./
      
      • 1
      • 2
    8. 创建目录

      hadoop fs -mkdir /shell
      
      • 1
    9. 删除文件

      hadoop fs -rm /a.txt
      hadoop fs -rm -skipTrash /a  #彻底删除,不放入hdfs垃圾桶
      
      • 1
      • 2
    10. 修改文件名称

      hadoop fs -mv /a  /b
      
      • 1
    11. 拷贝文件

      hadoop fs -cp /a.txt  /b
      
      • 1
    12. 递归删除

      hadoop fs -rm -r /a
      
      • 1
    13. 列出本地文件的内容(默认是hdfs文件系统)

      hadoop fs -ls file:///home/hadoop/
      
      • 1
    14. 查找文件

      hadoop fs -find / -name asd  #在hdfs的/目录下,查找asd的文件
      
      • 1

    组合查询

    1. hdfs和getconf结合

      1. 查询帮助信息

        hdfs getconf
        
        • 1
      2. 获取namenode的节点名称

        hdfs getconf -namenodes
        
        • 1
      3. 获取hdfs最小块信息

        hdfs getconf -confKey dfs.namenode.fs-limits.min-block-size
        
        • 1

        相同命令可以获取其他属性值

      4. hdfs的namenode的RPC地址

        hdfs getconf -nnRpcAddresses
        
        • 1
    2. hdfs和dfsadmin结合使用

      1. 查询帮助信息

        hdfs dfsadmin
        
        • 1
      2. 查看当前的模式

        hdfs dfsadmin -safemode get
        
        • 1
      3. 进入/退出安全模式

        hdfs dfsadmin -safemode enter
        hdfs dfsadmin -safemode leave
        
        • 1
        • 2
    3. hdfs和fsck结合使用

      1. 显示hdfs块信息

        hdfs fsck /02-041-0029.mp4 -file -blocks -locations #查看文件块信息
        
        • 1
    4. 其他命令

      1. 检查压缩库本地安装情况

        hadoop checknative
        
        • 1
      2. 格式化名称节点(一般只在初始搭建集群后用)

        hadoop namenode -format
        
        • 1
      3. 执行jar包

        hadoop jar a.jar pi 3 3
        yarn jar a.jar pi 3 3
        
        • 1
        • 2

    4.3、hdfs优缺点

    • 优点

      1. 高容错

        自动保存多个副本。都一个副本丢失后,可以自动恢复

      2. 适合批处理

        把数据位置暴露给计算框架,通过移动计算而不是移动数据,提高效率

      3. 是和大数据处理

        • 数据规模:可以处理GB,TB,甚至PB级别的大规模数据;
        • 文件规模:能够处理百万规模的文件数量,数量相当大,
        • 节点规模:能够处理10k节点的规模
      4. 流式数据访问

        一次写入,多次读取,不能随机修改,只能追加,保证数据的一致性

      5. 可构建在廉价机器上

        多副本机制,提高可靠性。提供容错和回复机制。

    • 缺点

      1. 不适合低延时数据访问
        1. 做不到低延时存储和读取数据,适合高吞吐的场景(某一时间写入大量数据)
      2. 无法高效的对大量小文件进行存储
        1. 存储文件太小,元数据占用namenode的内存(内存有限)
        2. 小文件的存储寻道时间会超过读取时间,违反了hdfs的设计目标
      3. 不支持并发写入,文件随机修改
        1. 一个文件只能有一个写,bu’enbuen那个多个线程同时写(租约机制)
        2. 仅支持数据append,不支持数据随即修改
    • hdfs安全模式

      是hdfs的一种特殊状态,只允许读,不允许写

      • 进入安全模式的方式:
        • 在namenode主节点启动时,hdfs进入安全模式(前30秒)
        • 在 block 丢失率达到 0.1%
        • 手动进入安全模式
      • 退出安全模式的方式
        1. 在 HDFS 集群正常冷启动完成后,自动退出
        2. 手动退出安全模式(但是并不能真正地解决问题)

    4.4、hdfs java编程

    1. 首先需要创建configuration
    2. 然后创建文件系统流
    3. 操作
    4. 关闭文件系统流

    例子

    //创建configuration
    Configuration configuration= new Configuration();
    configuration.set("fs.defaultFS","hdfs://192.168.51.110:8020");
    //创建文件系统流
    FileSystem fileSystem = FileSystem.get(configuration);
    //操作
    boolean mkdir= fileSystem.mkdirs(new Path("/gao/dir2"));
    System.out.println(mkdir);
    //关闭文件系统流
    fileSystem.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    4.5、datanode的工作机制及存储

    datanode工作机制

    1. 一个数据块在datanode上以文件存储与磁盘,包括俩个文件
      • 一个是数据本身
      • 一个是元数据包括数据块长度,块数据的校验和,以及时间戳
      • hdfs-site.xml里定义了数据的存储路径(dfs.datanode.data.dir)
    2. datanode启动后向namenode注册,通过后,周期性(6h)向namenode报告所有的块信息
    3. 心跳是每三秒一次
      • 心跳返回结果带有namenode给该给datanode的命令如复制块数据到另一个机器,或删除某个数据块等等
      • 如果超过十分钟没有收到datanode的心跳,则该节点不可用
    4. 集群运行中可以安全的加入和退出一些机器

    数据完整性

    1. 当客户端向hdfs写数据时候

      • 会计算数据的checksum(校验和),以保证数据通过网络出书,到达datanode后,没有丢失数据
    2. 当DataNode读取Block的时候,

      • 它会计算CheckSum。
      • 如果计算后的CheckSum,与Block创建时值不一样,说明Block已经损坏。
      • Client读取其他DataNode上的Block。
    3. DataNode在其文件创建后周期验证CheckSum。

    掉线时限参数设置

    DataNode工作机制_数据库

    需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。

    datanode的目录结构

    • namenode需要自己创建,datanode在初始化时候自己创建,不需要额外格式化
    • 在主节点的 /usr/local/study/hadoop/data/datanodedata/current 可以查看版本号
      • storageID:存储 id 号
      • clusterID 集群 id,全局唯一
      • cTime 属性标记了 datanode 存储系统的创建时间,对于刚刚格式化的存储系统,这个属性为 0;但是在文件系统升级之后,该值会更新到新的时间戳。
      • datanodeUuid:datanode 的唯一识别码
      • storageType:存储类型
      • layoutVersion 是一个负整数。通常只有 HDFS 增加新 特性时才会更新这个版本号。

    datanode的多目录配置

    datanode也可以配置多个目录,每个目录存储的数据不一样,数据不是副本

    修改配置文件hdfs-site.xml

    <property>
    <name>dfs.datanode.data.dirname>
    	<value>
            file:///${hadoop.tmp.dir}/dfs/data1,
       		file:///${hadoop.tmp.dir}/dfs/data2
        value>
    property>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    4.6、hdfs的读写流程

    HDFS写流程

    1. 客户端通过对FileSystem.create() 对象创建建文件,DistributedFileSystem会创建输出流FSDataOutputStream。

    2. DistributedFileSystem 通过RPC远程调用名称节点,在文件系统的命名空间中创建一个新的文件,此时该文件中还没有相应的数据块。

    3. 名称节会执行一些检查,比如文件是否已经存在、客户端是都有权限创建文件等。检查通过后,名称节点会构造一个新文件,并添加文件信息。如果检查不通过,文件创建失败会向客户端抛一个 IOException 异常。

    4. DistributedFileSystem利用DFSOutputStream来实例化FSDataOutputStream,返回给客户端,客户端使用这个输出流写入数据(new DFSDataOutputStream)。

      了解:DFSOutputStream负责处理 DataNode 和 NameNode 之间的通信

    5. 客户端向输出流FSDataOutputStream中写入的数据会被分成一个个的分包,这些分包被放入DFSOutputStream对象的内部队列“dataQueue”

    6. 输出流FSDataOutputStream会向名称节点申请保存文件和副本数据块的若干个数据节点,这些数据节点(DataNode)形成一个数据流管道(Pipeline)。队列中的分包最后被打包成数据包,发往数据流管道的第一个数据节点, 第一个数据节点将数据包发送给第二个节点,第二个数据节点发送给第三个数据节点,数据包流经管道上个各个数据节点(即流水线复制策略)

    7. 接受确认包,因为各个数据节点位于不同的机器,数据需要通过网络发送。为了保证所有数据节点的数据都是准确的,接收到数据的数据节点要向发送这发送"确认包"(ACK Packet)。确认包沿着数据流管道逆流而上,从数据流管道一次经过各个数据节点并最终发往客户端(DataStreamer),客户端收到应答时,它将u四月 那个的分包从内部队列移除。不断执行(4)~(7)步,直到数据全部写完。

    8. 客户端调用close()方法关闭输出流。
         了解:此时,客户端不会再向输出流中写入数据,所有,当DFSOutputStream对象内部队列中的分包都收到应答后,就可以使用ClientProtocal.complete()方法通知名称节点关闭文件,完成一次正常的写文件的过程。

    在这里插入图片描述

    HDFS读流程

    1. 客户端通过FileSystem.open()打开文件,相应地,在HDFS文件系统DistributedFileSystem具体实现了FileSystem。调用open()后,DistributedFileSystem会创建输出流FSDataInputStream。对于HDFS而言,具体输入流是DFSInputStream.
    2. 在DFSInputStream的构造函数中,输入流是通过ClientProtocal.getBlockLocations()远程调用名称节点,获取文件开始部分数据块的保存位置。
      1. 对于该数据块,名称节点返回保存该数据块的所有数据节点的名称,同时根据客户端的远近对数据节点进行排序,
      2. 然后,DistributedFileSystem利用DFSOutputStream来实例化FSDataOutputStream,返回给客户端,同时返回给数据块的数据节点的地址。
    3. 获得输入流FSDataOutputStream后,客户端调用read()函数开始读取数据。输入流根据前面排序结果,选择距离客户端最近的数据节点建立连接并读取数据。
    4. 数据从数据节点读到客户端,当该数据块读取完毕时,FSDataOutputStream关闭和该数据节点的连接。
    5. 输入流通过getBlockLocations()方法查找下一个数据块(如果客户端缓存中已经包含了该数据块的位置信息,就不需要调用该方法)。
    6. 找到该数据块的最佳数据节点,读取数据。
    7. 当客户端读取完毕数据的时候,调用FSDataOutputStream的close()函数,关闭输入流。

    注意:在读取数据的过程中,如果客户端与数据节点通信时出现错误,就会尝试连接包含此数据块的下一个数据节点
    在这里插入图片描述

    hdfs 读写容错

    2.2 一文了解HDFS数据读、写原理及容错机制 - 知乎 (zhihu.com)

    4.7、namenode和secondarynamenode解析

    namenode负责集群的元数据管理,因为元素据随机访问,所有元数据需要存放在内存中。但如果只存在内存中,一旦断电,元数据丢失,整个集群就无法工作了。因此产生在磁盘中备份元数据的FsImage。

    当在内存中的元数据更新时,如果同时更新FsImage,就会导致效率过低,但如果不更新,就会发生一致性问题,一旦NameNode节点断电,就会产生数据丢失。==因此,引入Edits文件(只进行追加操作,效率很高)。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到Edits中。==这样,一旦NameNode节点断电,可以通过FsImage和Edits的合并,合成元数据。

    但是,如果长时间添加数据到Edits中,会导致该文件数据过大,效率降低,而且一旦断电,恢复元数据需要的时间过长。因此,需要定期进行FsImage和Edits的合并,如果这个操作由NameNode节点完成,又会效率过低。因此,引入一个新的节点SecondaryNamenode,专门用于FsImage和Edits的合并。

    checkpoint流程

    • 第一阶段:NameNode启动
      1. 第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
      2. 客户端对元数据进行增删改的请求。
      3. NameNode记录操作日志,更新滚动日志。
      4. NameNode在内存中对元数据进行增删改。
    • 第二阶段:Secondary NameNode工作
      1. Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。
      2. Secondary NameNode请求执行CheckPoint。
      3. NameNode滚动正在写的Edits日志。
      4. 将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。
      5. Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
      6. 生成新的镜像文件fsimage.chkpoint。
      7. 拷贝fsimage.chkpoint到NameNode。
      8. NameNode将fsimage.chkpoint重新命名成fsimage。

    namenode的多目录配置

    namenode也可以配置多个目录,每个目录存储的数据不一样,数据不是副本

    修改配置文件hdfs-site.xml

    <property>
    <name>dfs.namenode.data.dirname>
    	<value>
            file:///${hadoop.tmp.dir}/dfs/namenodedata1,
       		file:///${hadoop.tmp.dir}/dfs/namenodedata2
        value>
    property>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    4.8、hdfs小文件治理

    出现问题原因

    • NameNode存储着文件系统的元数据,每个文件、目录、块大概有150字节的元数据;

    • 因此文件数量的限制也由NN内存大小决定,如果小文件过多则会造成NN的压力过大

    • 且HDFS能存储的数据总量也会变小

    解决方案

    (1) HAR文件方案

    · 本质启动mr程序,所以需要启动yarn

    在这里插入图片描述

    用法:archive -archiveName .har -p [-r ]*

    在这里插入图片描述
    在这里插入图片描述

    第一步:创建归档文件

    注意:归档文件一定要保证yarn集群启动

    hadoop archive  -archiveName myhar.har -p /user/hadoop /user
    
    • 1

    第二步:查看归档文件内容

    hdfs dfs -ls -R /user/myhar.har
    hdfs dfs -ls -R har:///user/myhar.har
    
    • 1
    • 2

    第三步:解压归档文件

    hdfs dfs -mkdir -p /user/har
    hdfs dfs -cp har:///user/myhar.har/* /user/har/
    hdfs dfs -R /user/har
    
    • 1
    • 2
    • 3

    (2) Sequence Files方案

    在这里插入图片描述

    • SequenceFile文件,主要由一条条record记录组成;

    • 具体结构(如上图):

      • 一个SequenceFile首先有一个4字节的header(文件版本号)
      • 接着是若干record记录
      • 每个record是键值对形式的;键值类型是可序列化类型,如IntWritable、Text
      • 记录间会随机的插入一些同步点sync marker,用于方便定位到记录边界
    • SequenceFile文件可以作为小文件的存储容器;

      • 每条record保存一个小文件的内容
      • 小文件名作为当前record的键;
      • 小文件的内容作为当前record的值;
      • 如10000个100KB的小文件,可以编写程序将这些文件放到一个SequenceFile文件。
    • 一个SequenceFile是可分割的,所以MapReduce可将文件切分成块,每一块独立操作。

    • 不像HAR,SequenceFile支持压缩。记录的结构取决于是否启动压缩

      • 支持两类压缩:

        • 不压缩NONE,如上图
        • 压缩RECORD,如上图
        • 压缩BLOCK,如下图,①一次性压缩多条记录;②每一个新块Block开始处都需要插入同步点
      • 在大多数情况下,以block(注意:指的是SequenceFile中的block)为单位进行压缩是最好的选择

      • 因为一个block包含多条记录,利用record间的相似性进行压缩,压缩效率更高

      • 把已有的数据转存为SequenceFile比较慢。比起先写小文件,再将小文件写入SequenceFile,一个更好的选择是直接将数据写入一个SequenceFile文件,省去小文件作为中间媒介.

        在这里插入图片描述

    • 向SequenceFile写入数据

      package com.kaikeba.hadoop.sequencefile;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IOUtils;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.SequenceFile;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.io.compress.BZip2Codec;
      
      import java.io.IOException;
      import java.net.URI;
      
      public class SequenceFileWriteNewVersion {
      
          //模拟数据源;数组中一个元素表示一个文件的内容
          private static final String[] DATA = {
                  "The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models.",
                  "It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.",
                  "Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer",
                  "o delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.",
                  "Hadoop Common: The common utilities that support the other Hadoop modules."
          };
      
          public static void main(String[] args) throws IOException {
              //输出路径:要生成的SequenceFile文件名
              String uri = "hdfs://node01:8020/writeSequenceFile";
      
              Configuration conf = new Configuration();
              FileSystem fs = FileSystem.get(URI.create(uri), conf);
              //向HDFS上的此SequenceFile文件写数据
              Path path = new Path(uri);
      
              //因为SequenceFile每个record是键值对的
              //指定key类型
              IntWritable key = new IntWritable(); //key数字 -> int -> IntWritable
              //指定value类型
              Text value = new Text();//value -> String -> Text
      
              //创建向SequenceFile文件写入数据时的一些选项
              //要写入的SequenceFile的路径
              SequenceFile.Writer.Option pathOption       = SequenceFile.Writer.file(path);
              //record的key类型选项
              SequenceFile.Writer.Option keyOption        = SequenceFile.Writer.keyClass(IntWritable.class);
              //record的value类型选项
              SequenceFile.Writer.Option valueOption      = SequenceFile.Writer.valueClass(Text.class);
              //SequenceFile压缩方式:NONE | RECORD | BLOCK三选一
              //方案一:RECORD、不指定压缩算法
      //        SequenceFile.Writer.Option compressOption   = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD);
      //        SequenceFile.Writer writer = SequenceFile.createWriter(conf, pathOption, keyOption, valueOption, compressOption);
      
              //方案二:BLOCK、不指定压缩算法
      //        SequenceFile.Writer.Option compressOption   = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK);
      //        SequenceFile.Writer writer = SequenceFile.createWriter(conf, pathOption, keyOption, valueOption, compressOption);
      
              //方案三:使用BLOCK、压缩算法BZip2Codec;压缩耗时间
              //再加压缩算法
              BZip2Codec codec = new BZip2Codec();
              codec.setConf(conf);
              SequenceFile.Writer.Option compressAlgorithm = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, codec);
              //创建写数据的Writer实例
              SequenceFile.Writer writer = SequenceFile.createWriter(conf, pathOption, keyOption, valueOption, compressAlgorithm);
      
      
              for (int i = 0; i < 100000; i++) {
                  //分别设置key、value值
                  key.set(100000 - i);
                  value.set(DATA[i % DATA.length]); //%取模 3 % 3 = 0;
                  System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                  //在SequenceFile末尾追加内容
                  writer.append(key, value);
              }
              //关闭流
              IOUtils.closeStream(writer);
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
    • 命令查看SequenceFile内容

      hadoop fs -text /writeSequenceFile
      
      • 1
    • 读取SequenceFile文件

      package com.kaikeba.hadoop.sequencefile;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IOUtils;
      import org.apache.hadoop.io.SequenceFile;
      import org.apache.hadoop.io.Writable;
      import org.apache.hadoop.util.ReflectionUtils;
      
      import java.io.IOException;
      
      public class SequenceFileReadNewVersion {
      
          public static void main(String[] args) throws IOException {
              //要读的SequenceFile
              String uri = "hdfs://node01:8020/writeSequenceFile";
              Configuration conf = new Configuration();
              Path path = new Path(uri);
      
              //Reader对象
              SequenceFile.Reader reader = null;
              try {
                  //读取SequenceFile的Reader的路径选项
                  SequenceFile.Reader.Option pathOption = SequenceFile.Reader.file(path);
      
                  //实例化Reader对象
                  reader = new SequenceFile.Reader(conf, pathOption);
      
                  //根据反射,求出key类型对象
                  Writable key = (Writable)
                          ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                  //根据反射,求出value类型对象
                  Writable value = (Writable)
                          ReflectionUtils.newInstance(reader.getValueClass(), conf);
      
                  long position = reader.getPosition();
                  System.out.println(position);
      
                  while (reader.next(key, value)) {
                      String syncSeen = reader.syncSeen() ? "*" : "";
                      System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                      //移动到下一个record开头的位置
                      position = reader.getPosition(); // beginning of next record
                  }
              } finally {
                  IOUtils.closeStream(reader);
              }
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50

    4.9、hdfs其他功能介绍

    多个集群之间数据拷贝

    在我们实际工作当中,极有可能会遇到将测试集群的数据拷贝到生产环境集群,或者将生产环境集群的数据拷贝到测试集群,那么就需要我们在多个集群之间进行数据的远程拷贝,hadoop自带也有命令可以帮我们实现这个功能

    1. 本地文件拷贝scp
    cd /kkb/soft
    scp -r jdk-8u141-linux-x64.tar.gz hadoop@node02:/kkb/soft
    
    • 1
    • 2
    1. 集群之间的数据拷贝distcp
    hadoop distcp hdfs://node01:8020/jdk-8u141-linux-x64.tar.gz hdfs://cluster2:8020/
    
    • 1

    hdfs快照snapshot管理

    快照顾名思义,就是相当于对我们的hdfs文件系统做一个备份,我们可以通过快照对我们指定的文件夹设置备份,但是添加快照之后,并不会立即复制所有文件,而是指向同一个文件。当写入发生时,才会产生新文件。快照的管理一般是运维人员来做。

    基本用法:

    • 开启快照权限:

      1、开启指定目录的快照功能(创建快照之前要执行次步骤)
      hdfs dfsadmin -allowSnapshot 路径
      2、禁用指定目录的快照功能(默认就是禁用状态)
      hdfs dfsadmin -disallowSnapshot 路径
      
      • 1
      • 2
      • 3
      • 4
    • 创建快照:

      3、给某个路径创建快照snapshot
      hdfs dfs -createSnapshot 路径
      
      4、指定快照名称进行创建快照snapshot
      hdfs dfs -createSanpshot 路径 名称
      
      5、给快照重新命名
      hdfs dfs -renameSnapshot 路径 旧名称 新名称
      
      6、列出当前用户所有可快照目录
      hdfs lsSnapshottableDir
      
      7、比较两个快照的目录不同之处
      hdfs snapshotDiff 路径1 路径2
      
      8、删除快照snapshot
      hdfs dfs -deleteSnapshot
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17

    hdfs回收站

    任何一个文件系统,基本上都会有垃圾桶机制,也就是删除的文件,不会直接彻底清掉,我们一把都是将文件放置到垃圾桶当中去,过一段时间之后,自动清空垃圾桶当中的文件,这样对于文件的安全删除比较有保证,避免我们一些误操作,导致误删除文件或者数据

    1. 回收站配置俩个参数

      fs.trash.interval 默认值为0,0表示禁用回收站,不是0就表示启动了回收站,这个代表回收站的文件的存活时间,过了这个时间文件就会被删掉。
      fs.trash.checkpoint.interval=0 默认值也为0,表示检查回收站的间隔时间。
      要求fs .interval <=fs.trash.interval。
      
      • 1
      • 2
      • 3
    2. 启用回收站

      修改core-site.xml

      fs.trash.interval
      10080
      
      • 1
      • 2
    3. 通过javaAPI删除的数据,不会进入回收站,需要调用moveToTrash()方法才会进入回收站

      Trash trash = New Trash(conf);
      trash.moveToTrash(path);
      
      • 1
      • 2
    4. 查看回收站

      回收站在集群的/user/hadoop/.Trash/ 这个路径下

    5. 回复回收站数据

      hdfs dfs -mv trashFileDir hdfsdir
      //trashFileDir :回收站的文件路径
      //hdfsdir :将文件移动到hdfs的哪个路径下
      
      • 1
      • 2
      • 3
    6. 清空回收站

      hdfs dfs -expunge
      
      • 1

    5、MapReduce

    5.1、概念

    定义

    Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架。

    Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。

    思想

    思想核心为分而治之

    Map负责分,把复杂任务分解为简单任务(这些任务可以并行计算,彼此之间几乎没有依赖关系)

    Reduce负责合,即对map阶段的结果进行全局汇总

    编程模型

    image-20220603172820674

    • map阶段:

      • map阶段有一个关键的map()函数

      • 此函数的输入是键值对

      • 输出是一系列键值对,输出写入本地磁盘

    • reduce阶段

      • reduce阶段有一个关键的函数reduce()函数
      • 此函数的输入也是键值对(map的输出(kv对))
      • 输出也是一系列键值对,结果最终写入hdfs
    • Map&Reduce

    5.2、mapreduce过程

    mapreduce开发的八个步骤

    • map的2个步骤
      • 第一步:设置inputFormat类,将数据切分成为key,value对;此kv对对作为第二部的输入
      • 第二步:自定义map逻辑,处理我们第一步的传过来的kv对数据,然后转换成新的key,value对,并输出
    • shuffle阶段的4个步骤
      • 第三步:对上一步输出的key,value对进行分区。(相同key的kv对属于同一分区)
      • 第四步:对每个分区的数据按照key进行排序
      • 第五步:对分区中的数据进行规约(combine操作),降低shu’jushuju 的网络拷贝(可选步骤)
      • 第六步:对排序后的kv对进行分组;分组的过程中,key相同的kv对为一组;将相同的kv对的所有value放到一个集合当中(每组数据调用一次reduce方法)
    • reduce阶段的2个步骤
      • 第七步:对多个map的任务进行合并,排序,写reduce函数自己的逻辑,对输入的key,value对进行处理,转换成新的key,value对进行输出
      • 第八步:设置将输出的key,value对数据保存到文件中

    5.3、常用数据类型

    hadoop没有沿用java当中的基本的数据类型,而是自己进行了一套数据封装,常用的如下:

    java中的类型hadoop writable类型
    BooleanBooleanWritable
    ByteByteWritable
    IntIntWritable
    FloatFloatWritable
    LongLongWritable
    DoubleDoubleWritable
    StringText
    MapMapWritable
    ArrayArrayWritable
    byte[]BytesWritable

    5.4、词频统计实现

    map:

    public class myMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            //获得当前行数据
            String line=value.toString();
            //获得一个个单词
            String[] words = line.split(",");
            //每个单词编程kv对
            for (String word : words) {
                //将kv对输出出去
                context.write(new Text(word),new IntWritable(1));
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    reduce:

    public class myReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum=0;
            for (IntWritable value : values) {
                int count = value.get();
                sum+=count;
            }
            context.write(key,new IntWritable(sum));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    主函数:

    public class wordCount extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            int run = ToolRunner.run(new Configuration(), new wordCount(), args);
            System.exit(run);
        }
        @Override
        public int run(String[] args) throws Exception {
            Job job=Job.getInstance(super.getConf(),"wordCount");
            job.setJarByClass(wordCount.class);
            //第一步:读取文件,解析成key,value对
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job,new Path(args[0]));
            //第二步:自定义map逻辑,对kv转换成新的kv输出
            job.setMapperClass(myMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            //第三步:分区:相同的k数据发送到同一个reduce里面去,key合并,value形成一个集合
            //第四步:排序 对key2排序,字典顺序排序
            //第五步:规约 combiner过程,调优步骤 可选
            //第六步:分组
            //第七步:自定义reduce逻辑,转换kv
            job.setReducerClass(myReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            //第八步:输出
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job,new Path(args[1]));
    
            job.setNumReduceTasks(Integer.parseInt(args[2]));
            boolean b = job.waitForCompletion(true);
            return b?0:1;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    jar包运行方式

     //hadoop jar jar包  主类位置 输入文件位置(hdfs中) 输出文件位置(hdfs中) 进程数目
    hadoop jar MR-1.0-SNAPSHOT.jar com.wordcount.wordCount /a.txt /wordcount02 3
    
    • 1
    • 2

    5.5、MR切片机制

    img

    • 在mapreduce中,每个maptask处理一个切片split

    • MapTask并行度决定机制

    • 数据块:Block是HDFS物理上把数据分成一块一块。

    • 数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

    • 分片大小

      Math.max(minSize, Math.min(maxSize, blockSize));
      
      • 1
    • 如何空值maptask数量

      • 调整maxSize和minSize
      • maxSize调的比blocksize小,那么切片会变小
      • minsize调的比blockSize大,那么切片会变大

    5.6、inputFormat数据输入详解

    job提交流程源码

    waitForCompletion()
    submit();
    //1建立连接
    connect();
    //1)创建提交Job的代理
    newCluster(getConfiguration());
    //(1)判断是本地yarn还是远程
    initialize(jobTrackAddr,conf);
    //2提交job
    submitter.submitJobInternal(Job.this,cluster)
    //1)创建给集群提交数据的Stag路径
    PathjobStagingArea=
    JobSubmissionFiles.getStagingDir(cluster,conf);
    //2)获取jobid,并创建Job路径
    JobIDjobId=submitClient.getNewJobID();
    //3)拷贝jar包到集群
    copyAndConfigureFiles(job,submitJobDir);
    rUploader.uploadFiles(job,jobSubmitDir);
    //4)计算切片,生成切片规划文件
    writeSplits(job,submitJobDir);
    maps=writeNewSplits(job,jobSubmitDir);
    input.getSplits(job);
    //5)向Stag路径写XML配置文件
    writeConf(conf,submitJobFile);
    conf.writeXml(out);
    //6)提交Job,返回提交状态
    status=submitClient.submitJob(jobId,
    submitJobDir.toString(),job.getCredentials());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    img

    FileInputFormat切片源码解析

    1. 程序先找到你数据存储的目录
    2. 开始遍历处理(规划切片)目录下的每个文件
    3. 遍历第一个文件xx.txt
      1. 获取文件大小fs.sizeOf(xx.txt)
      2. 计算切片大小( Math.max(minSize, Math.min(maxSize, blockSize));)
      3. 默认情况下,切片大小=blocksize
      4. 开始切,形成第一个切片,然后第二个,依次往后(其中每切一片,都要将剩下部分的大小和块的1.1倍作比较,大于这个大小才会继续切,否则就不切了)
      5. 将切片信息写到一个切片规划文件中。
      6. 整个切片的核心过程 在getSplits(job)方法中
      7. InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等。
    4. 提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。

    FileInputFormat切片机制

    • FileInputFormat中默认的切片机制

      1. 简单地按照文件的内容长度进行切片
      2. 切片大小,默认等于block大小
      3. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
    • FileInputFormat切片大小的参数配置

      1. 通过分析源码,在FileInputFormat中,计算切片大小的逻辑:
        Math.max(minSize, Math.min(maxSize, blockSize));
      2. 切片主要由这几个值来运算决定
        mapreduce.input.fileinputformat.split.minsize=1 默认值为1
        mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
        因此,默认情况下,切片大小=blocksize。
        maxsize(切片最大值):参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值。
        minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blocksize还大。
    • 获取切片信息API

      // 根据文件类型获取切片信息
      FileSplit inputSplit = (FileSplit) context.getInputSplit();
      // 获取切片的文件名称
      String name = inputSplit.getPath().getName();
      
      • 1
      • 2
      • 3
      • 4

    FileInputFormat子类

    输入类FileInputFormat(切片)及其4个实现类(kv)的用法

    mapreduce的切片机制——里面有一些例子

    5.7、mapreduce分区(Partitioner)

    mapreduce 分区

    分区指的是:将MapReduce统计的结果按照条件输出到不同的文件中。

    public int getPartition(K key,V value,int numReduceTasks){
        return (key.hashCode()&Intger.MAX_VALUE)%numReduceTasks;
    }
    
    • 1
    • 2
    • 3

    **默认分区:**是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

    自定义Partitioner步骤:

    1. 自定义继承Partitioner,重写getPartition()方法
    2. 在Job驱动中,设置自定义Partitioner
    3. 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

    分区总结

    • 如果ReduceTask的数量>getPartition的结果数,则会多生产几个空的输出文件;
    • 如果1ReduceTask的数量
    • 如果ReduceTask的数量=1,无论MapTask端输出多少分区文件,最终都是ReduceTask,最终也只是产生一个结果文件;
    • 分区号必须从零开始,逐一累加;

    5.8、mapreduce序列化与反序列化

    Hadoop MapReduce 序列化 与 序列化

    Writble是hadoop的序列化格式

    • hadoop定义了这样一个接口。一个类需要支持可序列化只需要实现这个接口即可。

    Writabley有一个子接口是WritableComparable

    • WritableComparable即可实现序列化也可实现反序列化

    5.9、mapreduce排序

    Hadoop MapReduce 排序

    • 排序是 MR 中非常重要的操作之一,

      • MapTask 和 ReduceTask 都会对数据按照 key 进行排序
      • 该操作是默认行为。任何 MR 程序中数据均会被排序,而不看逻辑是否需要。
      • 默认排序是按照字典顺序排序的,且实现该排序的方法是快速排序
    • MapTask 中,

      它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率到一定的阈值,再对缓冲区数据进行一次快排,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

    • ReduceTask 中,

      • 它从每个 MapTask 上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写到磁盘上,否则储存在内存上。

      • 如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件。

      • 如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完后,ReduceTask 统一对内存和磁盘上的所有数据进行一次归并排序。

    5.10、mapreduce 合并(combine)

    • Combiner合并也属于Shuffle机制
    • Combiner的父类是Reducer
    • Combiner和Reducer 的区别在于运行时的位置
      • Combiner是在每一个MapTask所在的节点运行的
      • Reducer接收全局所有Mapper的输出结果
    • Combiner的意义就是对每一个MapTask的输出进行局部汇总,主要目的是为了减小网络的传输量
    • 并不是所有的场景都能适用Combiner,一般主要用于求和操作

    实现Combiner的步骤就是继承Reducer,最后在Driver类通过setCombinerClass(类.class)设置进去就好了

    5.11、mapreduce分组

    MapReduce中分组组件

    分组是mapreduce中shuffle组件当中reduce端的一个功能组件,主要的作用是决定哪些数据作 为一组

    5.12、outputFormat详解

    Hadoop OutputFormat

    OutputFormat可以说是MapReduce处理过程的最后一步,由它负责把输出的信息输送到哪个文件或者哪个数据库,等等

    OutputFormatMapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口,下图为OutputFormat的几个常见实现类(请忽略画横线的,那是我自定义的)

    它的默认输出格式为TextOutputFormat

    自定义output

    • 自定义一个类继承FileOutputFormat,实现RecordWriter方法,实际上只需要调用RecordWriter最终返回就可以了
    • 改写RecordWriter方法,需要实现writeclose方法

    5.13、shuffle的压缩

    MapReduce 数据压缩

    压缩的好处和坏处:

    • 压缩的优点:减少磁盘IO、减少磁盘存储空间
    • 压缩的缺点:增加CPU开销

    压缩方式

    • 代码中压缩,如链接

    • 修改mapred-site.xml进行压缩

      #map数据压缩
      <property>
      	<name>mapreduce.map.output.compressname>
          <value>truevalue>
      property>
      <property>
      	<name>mapreduce.map.output.compress.codecname>
          <value>org.apache.hadoop.io.compress.SnappyCodecvalue>
      property>
      #reduce端压缩
      <property>
      	<name>mapreduce.output.fileoutputformat.compressname>
          <value>truevalue>
      property>
      <property>
      	<name>apreduce.output.fileoutputformat.compress.typename>
          <value>RECODEvalue>
      property>
      <property>
      	<name>apreduce.output.fileoutputformat.compress.codecname>
          <value>org.apache.hadoop.io.compress.SnappyCodecvalue>
      property>
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22

    5.14、计数器,累加器

    MapReduce 计数器

    5.15、MR的join操作

    MapReduce Join应用 | ReduceJoin案例实操

    MapReduce Join应用 | MapJoin案例实操

    5.16、MapTask与ReduceTask工作机制

    (6条消息) Hadoop MapReduce 内核源码解析 | MapTask与ReduceTask工作机制_lesileqin的博客-CSDN博客

    6、yarn

    Hadoop Yarn详解

    Yarn 内存分配管理机制及相关参数配置

    Yarn 调度器Scheduler详解

  • 相关阅读:
    记一次hook mac地址实现伪装硬件码
    【C语言】转义字符\xhh和\ddd到底如何判断?被兔子个数支配的恐惧你也有吗?(每日小细节001)
    C语言枚举和联合体
    在visual studio里配置Qt插件并运行Qt工程
    Git 2.37 发布,带来重大变化!!
    基于SSM框架流浪猫救援网站的设计与实现 毕业设计-附源码201502
    WEB前端网页设计 HTML CSS 网页设计参数 - 【浏览器背景图片】
    2022FW柯罗芭KLOVA 用极简主义演绎服装美学
    广告学概论试题总汇
    数据要素价值:在数字时代的血脉中流淌
  • 原文地址:https://blog.csdn.net/m0_46272485/article/details/126125154