hadoop编程模型
编程模型
一.inputformat
在MapReduce程序的开发过程中,往往需要用到FileInputFormat与TextInputFormat,我们会发现TextInputFormat这个类继承自FileInputFormat,FileInputFormat这个类继承自InputFormat,InputFormat这个类会将文件file按照逻辑进行划分,划分成的每一个split切片将会被分配给一个Mapper任务,文件先被切分成split块,而后每一个split切片对应一个Mapper任务 FileInputFormat的划分机制:
1简单地按照文件的内容长度进行切片
2切片大小,默认等于 block 大小
3切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
默认情况下, split size =block size,在 hadoop 2.x 中为 128M。
注意:bytesRemaining/splitSize > 1.1 不满足的话,那么最后所有剩余的会作为一个切片。从而不会形成例如 129M 文件规划成两个切片的局面。
上文总结:1inputformat的继承,2以及这个类的功能,3划分fileinputformat的划分机制。
总结解释:
1
发现TextInputFormat这个类继承自FileInputFormat,FileInputFormat这个类继承自InputFormat。
2
InputFormat这个类会将文件file按照逻辑进行划分,划分成的每一个split切片将会被分配给一个Mapper任务,文件先被切分成split块,而后每一个split切片对应一个Mapper任务。
3
3.1简单地按照文件的内容长度进行切片
3.2切片大小,默认等于 block 大小
3.3切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
二.MaTask端的工作机制
inputFile通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(默认使用hash分区),然后写入buffer,每个maptask都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。Map端的输入的(k,v)分别是该行的起始偏移量,以及每一行的数据内容,map端的输出(k,v)可以根据需求进行自定义,但是如果输出的是javabean对象,需要对javabean继承writable
上文总结:讲数据进行分割读取,再集体分区,进行写入,快满时临时存储。结束时:合并临时文件生成输出文件。
三.shuffle的过程
shuffle的过程是:Map产生输出开始到Reduc取得数据作为输入之前的过程称作shuffle.
1).Collect 阶段:将 MapTask 的结果输出到默认大小为 100M 的环形缓冲区,保存的是 key/value,Partition 分区信息等。
2).Spill 阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了 combiner,还会将有相同分区号和 key 的数据进行排序。
3).Merge 阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask 最终只产生一个中间数据文件。
4).Copy 阶段: ReduceTask 启动 Fetcher 线程到已经完成 MapTask 的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
5).Merge 阶段:在 ReduceTask 远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
6).Sort 阶段:在对数据进行合并的同时,会进行排序操作,由于 MapTask阶段已经对数据进行了局部的排序,ReduceTask 只需保证 Copy 的数据的最终整体有效性即可。
Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认 100M
上文总结:shuffle的过程分为几步,都叫什么?都有什么具体作用。
四.reduceTask
reducer将已经分好组的数据作为输入,并依次为每个键对应分组执行reduce函数。reduce函数的输入是键以及包含与该键对应的所有值的迭代器。
reduce端的输入是map端的输出,它的输出的(k,v)根据需求进行自定义reducetask 并行度同样影响整个 job 的执行并发度和执行效率,与 maptask的并发数由切片数决定不同,Reducetask 数量的决定是可以直接手动设置:job.setNumReduceTasks(4);
如果数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜。
默认的reduceTask的是1
*Task并行度经验之谈:
最好每个task的执行时间至少一分钟。如果job的每个 map 或者 reduce task 的运行时间都只有30-40秒钟,那么就减少该job的map 或者 reduce 数,每一个task(map|reduce)的setup和加入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task 都非常快就跑完了,就会在 task 的开始和结束的时候浪费太多的时间。
默认情况下,每一个task都是一个新的JVM 实例,都需要开启和销毁的开销。在一些情况下,JVM开启和销毁的时间可能会比实际处理数据的时间要消耗的长,配置 task的M JVM重用可以改善该问题:(mapred.job.reuse.jvm.num.tasks,默认是 1,表示一个 JVM 上最多可以顺序执行的 task 数目(属于同一个 Job)是 1。也就是说一个 task 启一个 JVM)如果 input 的文件非常的大,比如 1TB,可以考虑将 hdfs 上的每个 blocksize 设大,比如设成 256MB 或者 512MB
上文总结: reduceTask的运行有关项,Task并行度经验。
5.outputformat
OutputFormat主要用于描述输出数据的格式,它能够将用户提供的key/value对写入特定格式的文件中。Hadoop 自带了很多 OutputFormat 的实现,它们与InputFormat实现相对应,足够满足我们业务的需要。 OutputFormt的层次图为都指向Outputformat。
OutputFormat是MapReduce输出的基类,所有MapReduce输出都实现了 OutputFormat 接口,主要有:
TextInputFormat
SequenceFileOutputFormat
MultipleOutputs
DBOutputFormat等