今天给大家带来的是Hadoop生态中的Mapreduce,看到这里诸佬们可能就有疑惑了呢,啥是Mapreduce?小小的脑袋大大的疑惑。
在上篇博客中博主使用了王者来举例子,如果把Hadoop当作王者的话,HDFS是后台存储点券数据的系统的话,那么我们今天介绍的Mapreduce就是某者用来计算优惠力度,并且计算游戏里最终到账的点券。(虽然博主不怎么充钱)
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
1)MapReduce易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。
2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
3)高容错性
MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
4)适合PB级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
1)不擅长实时计算
MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。
2)不擅长流式计算
流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。
3)不擅长DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
诸佬们从MapReduce的名字以及上面的介绍中,应该也可以知道,MapReduce实现中最重要的两个概念:Map和Reduce。
Map
Map的任务是:处理原始数据、为数据打标签、对数据进行分发(严格来说这并不完全是map的职责)
处理原始数据
这一阶段是对原始数据进行预处理的阶段,可以从行和列两个角度来考虑。
行:比如我们需要对数据按照时间过滤,只选择本周一的数据,其他数据过滤掉不处理。
列:比如原始数据有10列,我们只需要其中的5列,其他列过滤掉不处理。
举例:
假如我的HDFS上有一周的支出数据,我们想统计周一周二的支出情况,接下来我们会一步步解释这个过程。下图是其中一部分记录:
map:处理原始数据
可以看出过滤掉了非周一周二的数据,并且删除了使用人的字段。
为数据打标签
map处理完原始数据之后,接下来就要将数据分组,从而分配给合适的reduce去处理,分组的第一步就是打标签。
map:为数据打标签
可以看出,对每一条数据加了一条对应天数的标签。
对数据进行分发
打完标签之后,就需要对数据进行分发,严格来说,这并不完全属于Map的职责,其中也用到了一个神秘的中间环节:shuffle。不过入门来看,我们就单纯任务这属于Map。
分发的意思是,打完标签之后,要对数据进行分类处理,然后再发送给Reduce;分类的依据,就是上面对其打的自定义标签。
map:分发有标签的数据
可以看出,对每一条数据,按照标签分配,由原来的一个列表,变成了现在的两个列表。
Map阶段到此完成,接下来的任务就是要等着Reduce来取数了。
Reduce
Reduce的任务是:拉取Map分类好的数据(这也并不完全是Reduce的职责)、执行具体的计算
拉取Map分类好的数据
之前说到,Map已经将数据分类,我们直接拉取Reduce需要的数据就好了;但是要注意的是,我们是在一个分布式的环境中执行的任务,所以,Reduce的数据来源可能是多个Map中属于自己的块。
reduce:获取map分发的数据
可以看到,Reduce按照Map分类的key拉取到了自己应该处理的当日数据。
执行具体的计算
Reduce在拿到所有自己的数据之后,接下来就可以执行自定义的计算逻辑了,最简单的就是计数、去重。
reduce:执行具体的计算
可以看到,Reduce已经完成了所需要的单日支出计算功能。
PS:
Map和Reduce的职责并不是完全绝对的,比如过滤操作可以在Map,也可以在Reduce,只是因为在Map做可以减少传输的数据量,减少网络IO压力和时间消耗,所以做了上述的分工。
1)什么是序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
2)为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
3)为什么不用Java的序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
4)Hadoop序列化特点:
(1)紧凑 :高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)互操作:支持多语言的交互
在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
具体实现bean对象序列化步骤如下7步。
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
public FlowBean() {
super();
}
(3)重写序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
(4)重写反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。
(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。。
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
诸佬们如果想深入理解序列化案例,可以参考硅谷的经典wordcount案例,有时候经常阅读源码也是一个非常好的习惯呢
Hadoop 划分工作为任务。有两种类型的任务:
Map 任务 (分割及映射)
Reduce 任务 (重排,还原)
如上所述完整的执行流程(执行 Map 和 Reduce 任务)是由两种类型的实体的控制,称为Jobtracker : 就像一个主(负责提交的作业完全执行)
多任务跟踪器 : 充当角色就像从机,它们每个执行工作
对于每一项工作提交执行在系统中,有一个 JobTracker 驻留在 Namenode 和 Datanode 驻留多个 TaskTracker。
1)问题引出
MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。
思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?
2)MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
1)Job提交流程源码详解
waitForCompletion()
submit();
// 1建立连接
connect();
// 1)创建提交Job的代理
new Cluster(getConfiguration());
// (1)判断是本地运行环境还是yarn集群运行环境
initialize(jobTrackAddr, conf);
// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的Stag路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取jobid ,并创建Job路径
JobID jobId = submitClient.getNewJobID();
// 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
(1)默认使用的实现类是:TextInputFormat
(2)TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。
(3)CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。
用户根据业务需求实现其中三个方法:map() setup() cleanup ()
(1)有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果业务上有特别的需求,可以自定义分区。
(1)当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。
(2)部分排序:对最终输出的每一个文件进行内部排序。
(3)全排序:对所有数据进行排序,通常只有一个Reduce。
(4)二次排序:排序的条件有两个。
Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。
用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()
(1)默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。
(2)用户还可以自定义OutputFormat。
最后博主再给诸佬奉上几个常见面试题希望大家能三连一波。
map数量是由任务提交时,传来的切片信息决定的,切片有多少,map数量就有多少
科普:什么是切片?切片的数量怎么决定?
举例:输入路径中有两个文件,a.txt(130M),b.txt(1M),切片是一块128M,但是不会跨越文件,每个文件单独切片,所以这个路径提交之后获得的切片数量是3,大小分别是128M,2M,1M
reduce的数量是可以自己设置的
设置合理的map和reduce的个数。合理设置blocksize
避免出现数据倾斜
combine函数
对数据进行压缩
小文件处理优化:事先合并成大文件,combineTextInputformat,在hdfs上用- mapreduce将小文件合并成SequenceFile大文件(key:文件名,value:文件内容)
参数优化
求平均数的时候就不需要用combiner,因为不会减少reduce执行数量,运行结果也会出错。在其他的时候,可以依据情况,使用combiner,来减少map的输出数量,减少拷贝到reduce的文件,从而减轻reduce的压力,节省网络开销,提升执行效率
这些都是必须记住的,面试经常考
map操作
reduce操作
上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
具体Shuffle过程详解,如下:
(1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
(3)多个溢出文件会被合并成大的溢出文件
(4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
(5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
(6)ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
(7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)
注意:
(1)Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。
(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb默认100M。
(1)partition:
(1)过程:经过map函数处理后输出新的<key,value>,它首先被存储到环形缓冲区的kvbuffer,环形缓冲区默认是100M,并且对每个key/value对hash一个partition值,相同的partition值为同一个分区。
(2)作用:分区之后,每个reduce就会处理对应的partition,减少reduce的压力。
(2)sort/combiner/compress:
(1)过程:对环形缓冲区内的partition值和key值进行排序;如果用户设置了combiner,会对每个partition中的数据进行预处理,相当于是map端的reduce;如果用户设置了compress,会对combiner的数据进行压缩。
(2)作用:sort作用是在内部排序,减少reduce的压力;combiner作用是节省网络带宽和本地磁盘的IO;compress作用是减少本地磁盘的读写和减少reduce拷贝map端数据时的网络带宽。
(3)spill:
(1)过程:因为环形缓冲区的内存不够用,所以必须要写到本地磁盘中。将排序好的数据spill到本地磁盘中。
(2)作用:数据量非常大,全部放到内存不现实,所以最后还是会存到本地磁盘中。
(4)merge:
(1)过程:因为会产生多次spill,本身存放数据的out文件和存放数据偏移量索引index文件会产生多个,把多个文件合并在一起。
(2)作用:方便reduce的一次性拷贝。
(1)copy:
(1)过程:reduce拷贝map最终的输出的磁盘数据,一个reduce拷贝每个map节点的相同partition数据。
(2)作用:拷贝后的数据不止一份,先进行合并操作,为后面的排序做准备。
(2)merge:
(1)过程:reduce拷贝map最终的输出的磁盘数据,一个reduce拷贝每个map节点的相同partition数据。
(2)作用:拷贝后的数据不止一份,先进行合并操作,为后面的排序做准备。
(3)、sort:这里和map端的一样,但是reduce端的缓冲区更加灵活一点,如果内存够用,就是内存到内存的merge,不够用了就是内存到磁盘的merge,最后是磁盘到磁盘的merge。
(4)、group:将排序好的数据进行分组,分组默认是将相同的key的value放在一起。作用是为了reduce函数更好的计算相同key值出现的次数。
这里的分片只是逻辑分片,根据文件的字节索引进行分割。比如0—1MB位置定义为第一个分片,1MB-2MB定义为为第二个分片,依次类推……而原来的大文件还是原来的大文件,不会受到影响.
因此,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。
Map Task的个数等于split的个数。 mapreduce在处理大文件的时候,会根据一定的规则,把大文件划分成多个分片,这样能够提高map的并行度。 划分出来的就是InputSplit,每个map处理一个InputSplit,因此,有多少个InputSplit,就有多少个map task。
主要是 InputFormat类 来负责划分Split。InputFormat类有2个重要的作用:
1)将输入的数据切分为多个逻辑上的InputSplit,其中每一个InputSplit作为一个map的输入。
2)提供一个RecordReader,用于将InputSplit的内容转换为可以作为map输入的k,v键值对。
FileInputFormat是InputFormat的子类,是使用比较广泛的类,输入格式如果是hdfs上的文件,基本上用的都是FileInputFormat的子类,如TextInputFormat用来处理普通的文件,SequceFileInputFormat用来处理Sequce格式文件。 FileInputFormat类中的getSplits(JobContext job)方法是划分split的主要逻辑。
每个输入分片的大小是固定的,默认为128M。
分片大小范围可以在mapred-site.xml中设置
优点就是可以实现分块优化,减少网络传输数据,使用本地数据运行map任务。
如果分片跨越两个数据块的话,对于任何一个HDFS节点,分片中的另外一块数据就需要通过网络传输到Map任务节点,效率更低!