前言:
本篇文章仅用于个人学习笔记,仅限于参考,内容如有侵权,请联系删除,谢谢!
一:大数据简介
大数据概念 :
指无法在一定时间范围内用常规软件工具进行捕管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、捉、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。
大数据主要解决,海量数据的采集、存储和分析计算问题。
按顺序给出数据存储单位: bit、 Byte、KB、MB、GB、TB、PB、EB、ZB、YB、BB、NB、DB.
1Byte = 8bit 1K = 1024Byte 1MB = 1024K 1G = 1024M 1T = 1024G1P = 1024T
大数据特点:
1、Volume ( 大量)
截至目前,人类生产的所有印刷材料的数据量是200PB,而历史上全人类总共说过的话的数据量大约是5EB。当前,典型个人计算机硬盘的容量为TB量级,而些大企业的数据量已经接近EB量级。
2、Velocity (高速 )
这是大数据区分于传统数据挖掘的最显著特征。根据IDC的“数字宇宙”的报告,预计到2025年,全球数据使用量将达到163ZB。在如此海量的数据面前,处理数据的效率就是企业的生命。
3、Variety (多样)
这种类型的多样性也让数据被分为结构化数据和非结构化数据。相对于以往便于存储的以数据库/文本为主的结构化数据,非结构化数据越来越多,包括网络日志、音频、视频、图片、地理位置信息等,这些多类型的数据对数据的处理能力提出了更高要求
4、Value ( 低价值密度 )
价值密度的高低与数据总量的大小成反比比如,在一天监控视频中,我们只关心宋宋老师晚上在床上健身那一分钟,如何快速对有价值数据“提纯”成为目前大数据背景下待解决的难题
大数据应用场景:
1、抖音:推荐的都是你喜欢的视频
2、电商站内广告推荐:给用户推荐可能喜欢的商品
3、零售:分析用户消费习惯,为用户购买商品提供方便,从而提升商品销量
4、物流仓储: 京东物流,上午下单下午送达、下午下单次日上午送达
5、保险: 海量数据挖掘及风险预测,助力保险行业精准营销,提升精细化定价能力。
6、金融: 多维度体现用户特征,帮助金融机构推荐优质客户,防范欺诈风险
7、房产:大数据全面助力房地产行业,打造精准投策与营销,选出更合适的地,建造更合适的楼卖给更合适的人。
8、人工智能 + 5G + 物联网 + 虚拟与现实
二:hadoop入门:
hadoop概述:
1) Hadoop是一个由Apache基金会所开发的分布式系统基础架构
2)主要解决,海量数据的存储和海量数据的分析计算问题
3)广义上来说,Hadoop通常是指一个更广泛的概念-Hadoop生态圈
hadoop的优势:
1)高可靠性: Hadoop底层维护多个数据副本,所以即使Hadoop某个计算元素或存储出现故障,也不会导致数据的丢失。
2)高扩展性:在集群间分配任务数据,可方便的扩展数以干计的节点
3)高效性: 在MapReduce的思想下,Hadoop是并行工作的,以加快任务处
理速度。
4)高容错性: 能够自动将失败的任务重新分配
hadoop三大版本区别:
HDFS概述:
Hadoop Distributed File System,简称 HDFS,是一个分布式文件系统。
1)NameNode ( mn ):存储文件的元数据,如文件名,文件目录结构,文件属性( 生成时间、副本数文件权限),以及每个文件的块列表和块所在的DataNode等。
2)DataNode(dn): 在本地文件系统存储文件块数据,以及块数据的校验和。
3)Secondary NameNode(2nn): 每隔一段时间对NameNode元数据备份。
YARN概述:
YetAnother Resource Negotiator 简称 YARN,另一种资源协调者,是 Hadoop 的资源管理器。
MapReduce概述:
MapReduce 将计算过程分为两个阶段: Map 和 Reduce
1) Map 阶段并行处理输入数据
2) Reduce 阶段对 Map 结果进行汇总
hadoop安装,环境搭建:
请参考另一篇博客:写文章-CSDN创作中心https://mp.csdn.net/mp_blog/creation/editor/131597332,该文章记录了hadoop安装所需的环境,工具准备,网络配置等。(首先需要安装VMware,之后安装LInux系统,复制虚拟机三台形成一个集群,之后配置静态IP,保证三台机器可以正常通信。)
CSDN
hadoop_hdfs详细介绍:
hdfs产生背景:
随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。HDFS 只是分布式文件管理系统中的一种。
HDFS定义:
HDFS (Hadoop Distributed File System),它是一个文件系统,用于存储文件,通过目录树来定位文件:其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。
HDFS 的使用场景:适合一次写入,多次读出的场景。一个文件经过创建、写入和关闭 之后就不需要改变。
HDFS优缺点:
优点:
缺点:
HDFS组成架构:
NameNode(nn):
就是Master,它 是一个主管、管理者。
(1)管理HDFS的名称空间;
(2)配置副本策略;
(3)管理数据块(Block)映射信息;
(4)处理客户端读写请求。
DataNode:
就是Slave。NameNode 下达命令,DataNode执行实际的操作。
(1)存储实际的数据块; (2)执行数据块的读/写操作。
Client:就是客户端
(1)文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行上传;
(2)与NameNode交互,获取文件的位置信息;
(3)与DataNode交互,读取或者写入数据;
(4)Client提供一些命令来管理HDFS,比如NameNode格式化;
(5)Client可以通过一些命令来访问HDFS,比如对HDFS增删查改操作;
Secondary NameNode:
并非NameNode的热备。当NameNode挂掉的时候,它并不 能马上替换NameNode并提供服务。 (1)辅助NameNode,分担其工作量,比如定期合并Fsimage和Edits,并推送给NameNode ; (2)在紧急情况下,可辅助恢复NameNode。
关系内容如下:
HDFS文件块大小:
HDFS中的文件在物理上是分块存储(Block),块的大小可以通过配置参数 ( dfs.blocksize)来规定,默认大小在Hadoop2.x/3.x版本中是128M,1.x版本中是64M。
思考:为什么块的大小不能设置太小,也不能设置太大
(1)HDFS的块设置太小,会增加寻址时间,程序一直在找块的开始位置;
(2)如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开 始位置所需的时间。导致程序在处理这块数据时,会非常慢。
总结:HDFS块的大小设置主要取决于磁盘传输速率。
HDFS_shell操作(文件上传,下载):
准备工作先把hadoop集群起来:(集群搭建请参考上面链接)
在hadoop2上:
sbin/start-dfs.sh
sbin/stop-dfs.sh
hadoop fs -mkdir /wcinput hdfs上根目录下创建文件夹
上传文件到hdfs上:
hadoop fs -put wcinput/word.txt /wcinput 上传本地文件到hdfs上面
hadoop fs -moveFromLocal ./suguo.txt /sanguo 从本地剪切粘贴到 HDFS,本地会消失
hadoop fs -copyFromLocal b.txt /shanguo 从本地文件系统中拷贝文件到 HDFS 路径去,本地还在
hadoop fs -put c.txt /shanguo 等同于 copyFromLocal,生产环境更习惯用 put,本地还在
hadoop fs -appendToFile a1.txt /shanguo/a.txt 追加一个文件到已经存在的文件末尾
HDFS下载文件到本地:
-copyToLocal:从 HDFS 拷贝到本地
Hadoop fs -copyToLocal /shanguo/a.txt ./
-get:等同于 copyToLocal,生产环境更习惯用 get
Hadoop fs -get /shanguo/a.txt ./a2.txt 这里可以改名字
HDFS的直接命令:
1)-ls 显示目录信息
hadoop fs -ls /sanguo
2)-cat 显示文件内容
hadoop fs -cat /sanguo/a.txt
3)-chgrp、-chmod、-chown:Linux 文件系统中的用法一样,修改文件所属权限
hadoop fs -chmod 666 /sanguo/shuguo.txt
hadoop fs -chown atguigu:atguigu /sanguo/shuguo.txt
4)-mkdir:创建路径
hadoop fs -mkdir /jinguo
5)-cp:从 HDFS 的一个路径拷贝到 HDFS 的另一个路径
hadoop fs -cp /sanguo/shuguo.txt /jinguo
6)-mv:在 HDFS 目录中移动文件
hadoop fs -mv /sanguo/wuguo.txt /jinguo
7)-tail:显示一个文件的末尾 1kb 的数据
hadoop fs -tail /jinguo/shuguo.txt
8)-rm:删除文件或文件夹
hadoop fs -rm /sanguo/shuguo.txt
9)-rm -r:递归删除目录及目录里面内容
hadoop fs -rm -r /sanguo
10)-du 统计文件夹的大小信息
hadoop fs -du -s -h /jinguo
hadoop fs -du -h /jinguo
11)-setrep:设置 HDFS 中文件的副本数量
hadoop fs -setrep 10 /jinguo/shuguo.txt
这里设置的副本数只是记录在 NameNode 的元数据中,是否真的会有这么多副本,还得 看 DataNode 的数量。因为目前只有 3 台设备,最多也就 3 个副本,只有节点数的增加到 10 台时,副本数才能达到 10。
hdfs_api操作:这里不再记录需要的话回顾视频。p46
HDFS的读写流程:
1)客户端通过DistributedFileSystem模块向Name Node请求上传文件,Name Node检查目录文件是否已经存在,父目录是否存在。
2)Name Node返回是否可以上传。
3)客户端请求第一个Block上传到哪几个DATANode服务器上
4)Namenode返回3个DATa Node节点,分别为dn1,dn2,dn3。
5)客户端通过FSDataOutputStream模块请求往dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
6)dn1,dn2,dn3逐级应答客户端。
7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。
8)当一个Block传输完成之后,客户端再次请求Name Node上传第二个Block的服务器。(重复执行3-7步)。
网络拓扑-节点距离计算
在 HDFS 写数据的过程中,NameNode 会选择距离待上传数据最近距离的 DataNode 接 收数据。那么这个最近距离怎么计算呢?
节点距离:两个节点到达最近的共同祖先的距离总和。
HDFS读数据流程:
1)客户端通过Distributed File System向Name Node请求下载文件,Name Node通过查询元数据,找到文件块所在的DATa Node地址。
2)挑选一台DATa Node(就近原则,然后随机)服务器,请求读取数据。
3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。
4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。
NameNode 和 SecondaryNameNode
NN 和 2NN 工作机制:
思考:
NameNode
中的元数据是存储在哪里的?
首先,我们做个假设,如果存储在
NameNode
节点的磁盘中,因为经常需要进行随机访
问,还有响应客户请求,必然是效率过低。因此,元数据需要存放在内存中。但如果只存在
内存中,一旦断电,元数据丢失,整个集群就无法工作了。
因此产生在磁盘中备份元数据的
FsImage
。
这样又会带来新的问题,当在内存中的元数据更新时,如果同时更新
FsImage
,就会导
致效率过低,但如果不更新,就会发生一致性问题,一旦
NameNode
节点断电,就会产生数
据丢失。
因此,引入
Edits
文件(只进行追加操作,效率很高)。每当元数据有更新或者添
加元数据时,修改内存中的元数据并追加到
Edits
中。
这样,一旦
NameNode
节点断电,可
以通过
FsImage
和
Edits
的合并,合成元数据。
但是,如果长时间添加数据到
Edits
中,会导致该文件数据过大,效率降低,而且一旦
断电,恢复元数据需要的时间过长。因此,需要定期进行
FsImage
和
Edits
的合并,如果这
个操作由
NameNode
节点完成,又会效率过低。
因此,引入一个新的节点
SecondaryNamenode
,
专门用于
FsImage
和
Edits
的合并。
Name Node工作机制:
1)第一阶段:NameNode 启动
(
1
)第一次启动
NameNode
格式化后,创建
Fsimage
和
Edits 文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(
2
)客户端对元数据进行增删改的请求。
(
3
)
NameNode
记录操作日志,更新滚动日志。
(
4
)
NameNode
在内存中对元数据进行增删改。
2
)第二阶段:
Secondary NameNode
工作
(
1
)
Secondary NameNode
询问
NameNode
是否需要
CheckPoint
。直接带回
NameNode
是否检查结果。
(
2
)
Secondary NameNode
请求执行
CheckPoint
。
(
3
)
NameNode
滚动正在写的
Edits
日志。
(
4
)将滚动前的编辑日志和镜像文件拷贝到
Secondary NameNode
。
(
5
)
Secondary NameNode
加载编辑日志和镜像文件到内存,并合并。
(
6
)生成新的镜像文件
fsimage.chkpoint
。
(
7
)拷贝
fsimage.chkpoint
到
NameNode
。
(
8
)
NameNode
将
fsimage.chkpoint
重新命名成
fsimage
。
(9) 具体什么时候合并根据设置的时间或者设置的文件大小比如(一小时或者100万个文件)
Fsimage 和 Edits 解析
NameNode
被格式化之后,将在
/opt/module/hadoop-3.1.3/data/tmp/dfs/name/current
目录中产生如下文件
fsimage_0000000000000000000
fsimage_0000000000000000000.md5
seen_txid
VERSION
(
1
)
Fsimage
文件:
HDFS
文件系统元数据的一个
永久性的检查点
,其中包含
HDFS
文件系统的所有目
录和文件
inode
的序列化信息。
(
2
)
Edits
文件:存放
HDFS
文件系统的所有更新操作的路径,文件系统客户端执行的所有写操作首先
会被记录到
Edits
文件中。
(
3
)
seen_txid
文件保存的是一个数字,就是最后一个
edits_
的数字
(
4
)每 次
NameNode
启动的时候
都会将
Fsimage
文件读入内存,加 载
Edits
里面的更新操作,保证内存
中的元数据信息是最新的、同步的,可以看成
NameNode
启动的时候就将
Fsimage
和
Edits
文件进行了合并。
(
1
)一个数据块在
DataNode
上以文件形式存储在磁盘上,包括两个文件,一个是数据
本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
(
2
)
DataNode
启动后向
NameNode
注册,通过后,周期性(
6
小时)的向
NameNode
上
报所有的块信息。
(
3
)心跳是每
3
秒一次,心跳返回结果带有
NameNode
给该
DataNode
的命令如复制块
数据到另一台机器,或删除某个数据块。如果超过
10
分钟没有收到某个
DataNode
的心跳,
则认为该节点不可用。
(
4
)集群运行中可以安全加入和退出一些机器。
数据完整性:
思考:如果电脑磁盘里面存储的数据是控制高铁信号灯的红灯信号(
1
)和绿灯信号(
0
),
但是存储该数据的磁盘坏了,一直显示是绿灯,是否很危险?同理
DataNode
节点上的数据
损坏了,却没有发现,是否也很危险,那么如何解决呢?
如下是
DataNode
节点保证数据完整性的方法。
(
1
)当
DataNode
读取
Block
的时候,它会计算
CheckSum
。
(
2
)如果计算后的
CheckSum
,与
Block
创建时值不一样,说明
Block
已经损坏。
(
3
)
Client
读取其他
DataNode
上的
Block
。
(
4
)常见的校验算法
crc
(
32
),
md5
(
128
),
sha1
(
160
)
(
5
)
DataNode
在其文件创建后周期验证
CheckSum
。
hadoop_MapReduce:
mr概述:
MapReduce
是一个
分布式运算程序
的编程框架,是用户开发“基于
Hadoop
的数据分析
应用”的核心框架。
MapReduce
核心功能是将
用户编写的业务逻辑代码
和
自带默认组件
整合成一个完整的
分布式运算程序
,并发运行在一个
Hadoop
集群上。
MR的优缺点:
优点:
1
)
MapReduce
易于编程
它简单的实现一些接口,就可以完成一个分布式程序,
这个分布式程序可以分布到大量
廉价的
PC
机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一
样的。就是因为这个特点使得
MapReduce
编程变得非常流行。
2
)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过
简单的增加机器
来扩展它的计算能力。
3
)高容错性
MapReduce
设计的初衷就是使程序能够部署在廉价的
PC
机器上,这就要求它具有很高
的容错性。比如
其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,
不至于这个任务运行失败
,而且这个过程不需要人工参与,而完全是由
Hadoop
内部完成的。
4
)适合
PB
级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
缺点:
1
)不擅长实时计算
MapReduce
无法像
MySQL
一样,在毫秒或者秒级内返回结果。
2
)不擅长流式计算
流式计算的输入数据是动态的,而
MapReduce
的
输入数据集是静态的
,不能动态变化。
这是因为
MapReduce
自身的设计特点决定了数据源必须是静态的。
3
)不擅长
DAG
(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,
MapReduce
并不是不能做,而是使用后,
每个
MapReduce
作业的输出结果都会写入到磁盘,
会造成大量的磁盘
IO
,导致性能非常的低下。
MapReduce 核心思想:
(
1
)分布式的运算程序往往需要分成至少
2
个阶段。
(
2
)第一个阶段的
MapTask
并发实例,完全并行运行,互不相干。
(
3
)第二个阶段的
ReduceTask
并发实例互不相干,但是他们的数据依赖于上一个阶段
的所有
MapTask
并发实例的输出。
(
4
)
MapReduce
编程模型只能包含一个
Map
阶段和一个
Reduce
阶段,如果用户的业
务逻辑非常复杂,那就只能多个
MapReduce
程序,串行运行。
总结:分析
WordCount
数据流走向深入理解
MapReduce
核心思想。
MapReduce 进程:
一个完整的 MapReduce 程序在分布式运行时有三类实例进程:
(
1
)
MrAppMaster
:负责整个程序的过程调度及状态协调。
(
2
)
MapTask
:负责
Map
阶段的整个数据处理流程。
(
3
)
ReduceTask
:负责
Reduce
阶段的整个数据处理流程。
MapReduce
编程规范:
用户编写的程序分成三个部分:
Mapper
、
Reducer
和
Driver
Mapper阶段:
(
1
)用户自定义的
Mapper
要继承自己的父类
(
2
)
Mapper
的输入数据是
KV
对的形式(
KV
的类型可自定义)
(
3
)
Mapper
中的业务逻辑写在
map()
方法中
(
4
)
Mapper
的输出数据是
KV
对的形式(
KV
的类型可自定义)
(
5
)
map()
方法(
MapTask
进程)对每一个
调用一次
Reducer阶段 :
(
1
)用户自定义的
Reducer
要继承自己的父类
(2
)
Reducer
的输入数据类型对应
Mapper
的输出数据类型,也是
KV
(
3
)
Reducer
的业务逻辑写在
reduce()
方法中
(
4
)
ReduceTask
进程对每一组相同
k
的
组调用一次
reduce()
方法
Driver阶段
相当于
YARN
集群的客户端,用于提交我们整个程序到
YARN
集群,提交的是
封装了
MapReduce
程序相关运行参数的
job
对象
Hadoop 序列化
1
)
什么是序列化
序列化
就是
把内存中的对象,转换成字节序列
(或其他数据传输协议)以便于存储到磁
盘(持久化)和网络传输。
反序列化
就是将收到字节序列(或其他数据传输协议)或者是
磁盘的持久化数据,转换
成内存中的对象。
2
)
为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能
由本地的进程使用,不能被发送到网络上的另外一台计算机。
然而
序列化可以存储“活的”
对象,可以将“活的”对象发送到远程计算机。
3
)为什么不用
Java
的序列化
Java
的序列化是一个重量级序列化框架(
Serializable
),一个对象被序列化后,会附带
很多额外的信息(各种校验信息,
Header
,继承体系等),不便于在网络中高效传输。所以,
Hadoop
自己开发了一套序列化机制(
Writable
)。
4
)
Hadoop
序列化特点:
(
1
)紧凑 :
高效使用存储空间。
(
2
)快速:
读写数据的额外开销小。
(
3
)互操作:
支持多语言的交互
MapReduce 框架原理
MapTask
的并行度决定
Map
阶段的任务处理并发度,进而影响到整个
Job
的处理速度。
MapTask
并行度决定机制 :
数据块:
Block
是
HDFS
物理上把数据分成一块一块。
数据块是
HDFS
存储数据单位
。
数据切片:
数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行
存储。
数据切片是
MapReduce
程序计算输入数据的单位
,一个切片会对应启动一个
MapTask
。
多文件合并切片见P93.
分区请查看P98
set.jobsetreducetasks(2) 决定文件的个数,具体怎么分是根据hash值,或者自定义规则。
排序:
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。个任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序,
MapReduce 工作流程:
上面的流程是整个
MapReduce
最全工作流程,但是
Shuffle
过程只是从第
7
步开始到第
16
步结束,具体
Shuffle
过程详解,如下:
(
1
)
MapTask
收集我们的
map()
方法输出的
kv
对,放到内存缓冲区中
(
2
)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
(
3
)多个溢出文件会被合并成大的溢出文件
(
4
)在溢出过程及合并的过程中,都要调用
Partitioner
进行分区和针对
key
进行排序
(
5
)
ReduceTask
根据自己的分区号,去各个
MapTask
机器上取相应的结果分区数据
(
6
)
ReduceTask
会抓取到同一个分区的来自不同
MapTask
的结果文件,
ReduceTask
会
(
7
)合并成大文件后,
Shuffle
的过程也就结束了,后面进入
ReduceTask
的逻辑运算过
程(从文件中取出一个一个的键值对
Group
,调用用户自定义的
reduce()
方法)
注意:
(
1
)
Shuffle
中的缓冲区大小会影响到
MapReduce
程序的执行效率,原则上说,缓冲区
越大,磁盘
io
的次数越少,执行速度就越快。
(
2
)缓冲区的大小可以通过参数调整,参数:
mapreduce.task.io.sort.mb
默认
100M
。
Map
方法之后,
Reduce
方法之前的数据处理过程称之为
Shuffle
。
(
1
)
Read
阶段:
MapTask
通过
InputFormat
获得的
RecordReader
,从输入
InputSplit
中
解析出一个个
key/value
。
(
2
)
Map
阶段:该节点主要是将解析出的
key/value
交给用户编写
map()
函数处理,并
产生一系列新的
key/value
。
(
3
)
Collect
收集阶段:在用户编写
map()
函数中,当数据处理完成后,一般会调用
OutputCollector.collect()
输出结果。在该函数内部,它会将生成的
key/value
分区(调用
Partitioner
),并写入一个环形内存缓冲区中。
(
4
)
Spill
阶段:即
“
溢写
”
,当环形缓冲区满后,
MapReduce
会将数据写到本地磁盘上,
生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
输出类:
(
1
)
Copy
阶段:
ReduceTask
从各个
MapTask
上远程拷贝一片数据,并针对某一片数
据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(
2
)
Sort
阶段:在远程拷贝数据的同时,
ReduceTask
启动了两个后台线程对内存和磁
盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照
MapReduce
语义,用
户编写
reduce()
函数输入数据是按
key
进行聚集的一组数据。为了将
key
相同的数据聚在一
起,
Hadoop
采用了基于排序的策略。由于各个
MapTask
已经实现对自己的处理结果进行了
局部排序,因此,
ReduceTask
只需对所有数据进行一次归并排序即可。
(
3
)
Reduce
阶段:
reduce()
函数将计算结果写到
HDFS
上。
debug调试P77。
分区案例请查看P96。
回顾:
MapTask
并行度由切片个数决定,切片个数由输入文件和切片规则决定。
思考:
ReduceTask
并行度由谁决定?
1
)设置
ReduceTask
并行度(个数)
ReduceTask
的并行度同样影响整个
Job
的执行并发度和执行效率,但与
MapTask
的并
发数由切片数决定不同,
ReduceTask
数量的决定是可以直接手动设置:
//
默认值是
1
,手动设置为
4
job.setNumReduceTasks(4);
注意事项:
(
1
)
ReduceTask=0
,表示没有
Reduce
阶段,输出文件个数和
Map
个数一致。
(
2
)
ReduceTask
默认值就是
1
,所以输出文件个数为一个。
(
3
)如果数据分布不均匀,就有可能在
Reduce
阶段产生数据倾斜
(
4
)
ReduceTask
数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全
局汇总结果,就只能有
1
个
ReduceTask
。
(
5
)具体多少个
ReduceTask
,需要根据集群性能而定。
(
6
)如果分区数不是
1
,但是
ReduceTask
为
1
,是否执行分区过程。答案是:不执行分区过
程。因为在
MapTask
的源码中,执行分区的前提是先判断
ReduceNum
个数是否大于
1
。不大于
1
肯定不执行。
Map
端的主要工作:为来自不同表或文件的
key/value
对,
打标签以区别不同来源的记
录
。然后
用连接字段作为
key
,其余部分和新加的标志作为
value
,最后进行输出。
Reduce
端的主要工作:在
Reduce
端
以连接字段作为
key
的分组已经完成
,我们只需要
在每一个分组当中将那些来源于不同文件的记录(在
Map
阶段已经打标志)分开,最后进
行合并就
ok
了。
reduce端合并将名称字段补充上去,具体实现P144
缺点:这种方式中,合并的操作是在 Reduce 阶段完成,Reduce 端的处理压力太大,Map
节点的运算负载则很低,资源利用率不高,且在 Reduce 阶段极易产生数据倾斜。
解决方案:Map 端实现数据合并。
Map Join:
Map Join
适用于一张表十分小、一张表很大的场景
具体办法:采用 DistributedCache
(1)在 Mapper 的 setup 阶段,将文件读取到缓存集合中。
(2)在 Driver 驱动类中加载缓存。
//缓存普通文件到 Task 运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置 HDFS 路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
优点
思考:在
Reduce
端处理过多的表,非常容易产生数据倾斜。怎么办?
在
Map
端缓存多张表,提前处理业务逻辑,这样增加
Map
端业务,减少
Reduce
端数
据的压力,尽可能的减少数据倾斜。
数据清洗:
“ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取
(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL 一词较常用在数据仓
库,但其对象并不限于数据仓库
在运行核心业务 MapReduce 程序之前,往往要先对数据进行清洗,清理掉不符合用户
要求的数据。
清理的过程往往只需要运行 Mapper 程序,不需要运行 Reduce 程序。
正则表达式P121章节有记录。
MapReduce 开发总结:
1)输入数据接口:InputFormat
(1)默认使用的实现类是:TextInputFormat
(2)TextInputFormat 的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为
key,行内容作为 value 返回。
(3)CombineTextInputFormat 可以把多个小文件合并成一个切片处理,提高处理效率。
2)逻辑处理接口:Mapper
用户根据业务需求实现其中三个方法:map() setup() cleanup ()
3)Partitioner 分区
(1)有默认实现 HashPartitioner,逻辑是根据 key 的哈希值和 numReduces 来返回一个
分区号;key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果业务上有特别的需求,可以自定义分区。
4)Comparable 排序
(1)当我们用自定义的对象作为 key 来输出时,就必须要实现 WritableComparable 接
口,重写其中的 compareTo()方法。
(2)部分排序:对最终输出的每一个文件进行内部排序。
(3)全排序:对所有数据进行排序,通常只有一个 Reduce。
(4)二次排序:排序的条件有两个。
5)Combiner 合并
Combiner 合并可以提高程序执行效率,减少 IO 传输。但是使用时必须不能影响原有的
业务处理结果。
6)逻辑处理接口:Reducer
用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()
7)输出数据接口:OutputFormat
(1)默认实现类是 TextOutputFormat,功能逻辑是:将每一个 KV 对,向目标文本文件
输出一行。
(2)用户还可以自定义 OutputFormat。
Yarn 资源调度器:
Yarn
是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式
的
操作系统平台
,而
MapReduce
等运算程序则相当于运行于
操作系统之上的应用程序。
YARN
主要由
ResourceManager
、
NodeManager
、
ApplicationMaster
和
Container
等组件
构成。
yarn 工作机制:
(
1
)
MR
程序提交到客户端所在的节点。
(
2
)
YarnRunner
向
ResourceManager
申请一个
Application
。
(
3
)
RM
将该应用程序的资源路径返回给
YarnRunner
。
(
4
)该程序将运行所需资源提交到
HDFS
上。
(
5
)程序资源提交完毕后,申请运行
mrAppMaster
。
(
6
)
RM
将用户的请求初始化成一个
Task
。
(
7
)其中一个
NodeManager
领取到
Task
任务。
(
8
)该
NodeManager
创建容器
Container
,并产生
MRAppmaster
(9
)
Container
从
HDFS
上拷贝资源到本地。
(
10
)
MRAppmaster
向
RM
申请运行
MapTask
资源。
(
11
)
RM
将运行
MapTask
任务分配给另外两个
NodeManager
,另两个
NodeManager
分
别领取任务并创建容器。
(
12
)
MR
向两个接收到任务的
NodeManager
发送程序启动脚本,这两个
NodeManager
分别启动
MapTask
,
MapTask
对数据分区排序。
(
13
)
MrAppMaster
等待所有
MapTask
运行完毕后,向
RM
申请容器,运行
ReduceTask
。
(
14
)
ReduceTask
向
MapTask
获取相应分区的数据。
(
15
)程序运行完毕后,
MR
会向
RM
申请注销自己
作业提交全过程详解
(1)作业提交
第 1 步:Client 调用 job.waitForCompletion 方法,向整个集群提交 MapReduce 作业。
第 2 步:Client 向 RM 申请一个作业 id。
第 3 步:RM 给 Client 返回该 job 资源的提交路径和作业 id。
第 4 步:Client 提交 jar 包、切片信息和配置文件到指定的资源提交路径。
第 5 步:Client 提交完资源后,向 RM 申请运行 MrAppMaster。
(2)作业初始化
第 6 步:当 RM 收到 Client 的请求后,将该 job 添加到容量调度器中。
第 7 步:某一个空闲的 NM 领取到该 Job。
第 8 步:该 NM 创建 Container,并产生 MRAppmaster。
第 9 步:下载 Client 提交的资源到本地。
(3)任务分配
第 10 步:MrAppMaster 向 RM 申请运行多个 MapTask 任务资源。
第 11 步:RM 将运行 MapTask 任务分配给另外两个 NodeManager,另两个 NodeManager
分别领取任务并创建容器。
(4)任务运行
第 12 步 : MR 向两个接收到任 务的 NodeManager 发 送 程序启动 脚本 , 这两 个
NodeManager 分别启动 MapTask,MapTask 对数据分区排序。
第 13 步:MrAppMaster 等待所有 MapTask 运行完毕后,向 RM 申请容器,运行 ReduceTask。
第 14 步:ReduceTask 向 MapTask 获取相应分区的数据。
第 15 步:程序运行完毕后,MR 会向 RM 申请注销自己。
(5)进度和状态更新
YARN 中的任务将其进度和状态(包括 counter)返回给应用管理器, 客户端每秒(通过
mapreduce.client.progressmonitor.pollinterval 设置)向应用管理器请求进度更新, 展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外, 客户端每 5 秒都会通过调用 waitForCompletion()来
检查作业是否完成。时间间隔可以通过 mapreduce.client.completion.pollinterval 来设置。作业
完成之后, 应用管理器和 Container 会清理工作状态。作业的信息会被作业历史服务器存储
以备之后用户核查。
Hadoop调度器:
目前,Hadoop 作业调度器主要有三种:FIFO、容量(Capacity Scheduler)和公平(Fair
Scheduler)。Apache Hadoop3.1.3 默认的资源调度器是 Capacity Scheduler。
CDH 框架默认调度器是 Fair Scheduler。
yarn总结:
hadoop生产调优:
加载硬盘P149
同服务器数据均衡
HDFS集群扩容与缩容
节点之间数据均衡
节点退役,先把数据给到其他节点,然后再退役。
HDFS存储优化:
纠删码策略
3块硬盘+2个策略 数据不会再备份,缺少两块后会自动恢复,缺点是耗费内存,优点节省磁盘空间。
异构存储,冷热分离
HDFS 故障排除:
小文件合并-针对于namadate把多个小文件合并成一个
集群迁移:
mapreduce生产经验:
处理数据倾斜:
生产环境,可以直接过滤掉空值;如果想保留空值,就自定义分区,将空值加随机数打
散。最后再二次聚合。
(2)能在 map 阶段提前处理,最好先在 Map 阶段处理。如:Combiner、MapJoin
(3)设置多个 reduce 个数
启动源码解析