• MapReduce(二)


    一:序列化概述

    什么是序列化?

    序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。

    反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

    为什么要序列化

    一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。

    为什么不用Java的序列化

    Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,

    Hadoop自己开发了一套序列化机制(Writable)。

    Hadoop序列化特点:  

    (1)紧凑 :高效使用存储空间。

    (2)快速:读写数据的额外开销小。

    (3)互操作:支持多语言的交互。

    二:自定义bean对象实现序列化接口(Writable)

    在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。

    具体实现bean对象序列化步骤如下7步。

    (1)必须实现Writable接口

    (2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

    1. public FlowBean()
    2. {
    3. super();
    4. }

     (3)重写序列化方法

    1. @Override
    2. public void write(DataOutput out) throws IOException {
    3. out.writeLong(upFlow);
    4. out.writeLong(downFlow);
    5. out.writeLong(sumFlow);
    6. }

    (4)重写反序列化方法

    1. @Override public void readFields(DataInput in) throws IOException {
    2. upFlow = in.readLong();
    3. downFlow = in.readLong();
    4. sumFlow = in.readLong();
    5. }

    (5)注意反序列化的顺序和序列化的顺序完全一致

    (6)要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。

    (7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为

    MapReduce框中的Shuffle过程要求对key必须能排序。

    1. @Override public int compareTo(FlowBean o) {
    2. // 倒序排列,从大到小
    3. return this.sumFlow > o.getSumFlow() ? -1 : 1;
    4. }

    三:序列化案例实操

    需求:

    案例分析:

     代码实现:

    (1)编写流量统计的Bean对象

    1. package com.atguigu.mapreduce.combineTextinputformat;
    2. import org.apache.hadoop.conf.Configuration;
    3. import org.apache.hadoop.fs.Path;
    4. import org.apache.hadoop.io.IntWritable;
    5. import org.apache.hadoop.io.Text;
    6. import org.apache.hadoop.mapreduce.Job;
    7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    9. import java.io.IOException;
    10. public class WordCountDriver {
    11. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    12. //1.获取job
    13. Configuration conf=new Configuration();
    14. Job job=Job.getInstance(conf);
    15. //2.设置jar包路径
    16. job.setJarByClass(WordCountDriver.class);
    17. //3.关联mapper和reducer
    18. job.setMapperClass(WordCountMapper.class);
    19. job.setReducerClass(WordCountReduce.class);
    20. //4.设置map输出的KV类型
    21. job.setMapOutputKeyClass(Text.class);
    22. job.setMapOutputValueClass(IntWritable.class);
    23. //5.设置最终输出的KV类型
    24. job.setOutputKeyClass(Text.class);
    25. job.setOutputValueClass(IntWritable.class);
    26. //如果不设置InputFormat,它默认使用的是TextInputFormat.class
    27. // job.setInputFormatClass(CombineTextInputFormat.class);
    28. //虚拟存储切片最大值设置4m
    29. // CombineTextInputFormat.setMaxInputSplitSize(job,4194304);
    30. //6.设置输出路径和输出路径
    31. FileInputFormat.setInputPaths(job,new Path("D:\\inputinputformat"));
    32. FileOutputFormat.setOutputPath(job,new Path("D:\\outputcompare"));
    33. //7.提交job
    34. boolean result=job.waitForCompletion(true);
    35. System.exit(result?0:1);
    36. }
    37. }

    (2)编写Mapper类

    1. package com.atguigu.mapreduce.writable;
    2. import org.apache.hadoop.io.LongWritable;
    3. import org.apache.hadoop.io.Text;
    4. import org.apache.hadoop.mapreduce.Mapper;
    5. import java.io.IOException;
    6. public class FlowMapper extends Mapper {
    7. private Text outK=new Text();
    8. private FlowBean outV=new FlowBean();
    9. @Override
    10. protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
    11. //1.读一行
    12. //1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
    13. String line = value.toString();
    14. //2.切割
    15. //1,13736230513,192.196.100.1,www.atguigu.com,2481,24681,200
    16. String[] split=line.split("\t");
    17. //3.抓取想要的数据
    18. //电话号码:13736230513
    19. //上行流量:2481
    20. //下行流量:24681
    21. String phone=split[1];
    22. String up=split[split.length-3];
    23. String down=split[split.length-2];
    24. //4.封装
    25. outK.set(phone);
    26. outV.setUpFlow(Long.getLong(up));
    27. outV.setDownFlow(Long.getLong(down));
    28. outV.setSumFlow();
    29. //5.写出
    30. context.write(outK,outV);
    31. }
    32. }

    (3)编写Reducer类

    1. package com.atguigu.mapreduce.writable;
    2. import org.apache.hadoop.io.Text;
    3. import org.apache.hadoop.mapreduce.Reducer;
    4. import java.io.IOException;
    5. public class FlowReduce extends Reducer {
    6. private FlowBean outV=new FlowBean();
    7. @Override
    8. protected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
    9. //1.遍历集合累加值
    10. long totalUp=0;
    11. long totalDown=0;
    12. for(FlowBean value:values){
    13. totalUp+=value.getUpFlow();
    14. totalDown+=value.getDownFlow();
    15. }
    16. //2.outV
    17. outV.setSumFlow(totalUp);
    18. outV.setDownFlow(totalDown);
    19. outV.setSumFlow();
    20. //3.写出
    21. context.write(key,outV);
    22. }
    23. }

    (4)编写Driver驱动类

    1. package com.atguigu.mapreduce.writable;
    2. import org.apache.hadoop.conf.Configuration;
    3. import org.apache.hadoop.fs.Path;
    4. import org.apache.hadoop.io.Text;
    5. import org.apache.hadoop.mapreduce.Job;
    6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    8. import java.io.FileInputStream;
    9. import java.io.IOException;
    10. public class FlowDriver {
    11. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    12. //1.获取job对象
    13. Configuration conf=new Configuration();
    14. Job job=Job.getInstance(conf);
    15. //2.设置jar包
    16. job.setJarByClass(FlowDriver.class);
    17. //3.关联mapper和Reducer
    18. job.setMapperClass(FlowMapper.class);
    19. job.setReducerClass(FlowReduce.class);
    20. //4.设置mapper 输出的key和value类型
    21. job.setMapOutputKeyClass(Text.class);
    22. job.setMapOutputValueClass(FlowBean.class);
    23. //5.设置最终数据输出的key和value类型
    24. job.setOutputKeyClass(Text.class);
    25. job.setOutputValueClass(FlowBean.class);
    26. //6.设置数据的输入路径和输出路径
    27. FileInputFormat.setInputPaths(job,new Path("D:\\In\\phone_data.txt"));
    28. FileOutputFormat.setOutputPath(job,new Path("D:\\Out"));
    29. //7.提交job
    30. Boolean result=job.waitForCompletion(true);
    31. System.exit(result?0:1);
    32. }
    33. }

     四:MapReduce框架原理

     4.1 InputFormat数据输入

    4.1.1 切片与MapTask并行度决定机制

    1)问题引出

    MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。

    思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

    2)MapTask并行度决定机制

    数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。

    数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。

    五:Job提交流程源码详解 

    1. waitForCompletion()
    2. submit();
    3. // 1建立连接
    4. connect();
    5. // 1)创建提交Job的代理
    6. new Cluster(getConfiguration());
    7. // (1)判断是本地运行环境还是yarn集群运行环境
    8. initialize(jobTrackAddr, conf);
    9. // 2 提交job
    10. submitter.submitJobInternal(Job.this, cluster)
    11. // 1)创建给集群提交数据的Stag路径
    12. Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    13. // 2)获取jobid ,并创建Job路径
    14. JobID jobId = submitClient.getNewJobID();
    15. // 3)拷贝jar包到集群
    16. copyAndConfigureFiles(job, submitJobDir);
    17. rUploader.uploadFiles(job, jobSubmitDir);
    18. // 4)计算切片,生成切片规划文件
    19. writeSplits(job, submitJobDir);
    20. maps = writeNewSplits(job, jobSubmitDir);
    21. input.getSplits(job);
    22. // 5)向Stag路径写XML配置文件
    23. writeConf(conf, submitJobFile);
    24. conf.writeXml(out);
    25. // 6)提交Job,返回提交状态
    26. status = submitClient.submitJob(jobId, submitJobDir.toString(),job.getCredentials());

    六: FileInputFormat切片源码解析(input.getSplits(job))

     切片机制:

    1、切片机制

    (1)简单地按照文件的内容长度进行切片

    (2)切片大小,默认等于Block大小

    (3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

    2、案例分析

    (1)输入数据有两个文件:

    file1.txt320M

    file2.txt10M

    (2)经过FileInputFormat的切片机制

    运算后,形成的切片信息如下:

    file1.txt.split1--0~128

    file1.txt.split2--128~256

    file1.txt.split3--256~320

    file2.txt.split1--0~10M

    (1)源码中计算切片大小的公式

    1. Math.max(minSize,Math.min(maxSize,blockSize));
    2. mapreduce.input.fileinputformat.split.minsize=1默认值为1
    3. mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue默认值Long.MAXValue
    4. 因此,默认情况下,切片大小=blocksize。

    (2)切片大小设置

    1. maxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值。
    2. minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大。

    (3)获取切片信息API

    1. //获取切片的文件名称
    2. Stringname=inputSplit.getPath().getName();//根据文件类型获取切片信息
    3. FileSplitinputSplit=(FileSplit)context.getInputSplit();

     七:TextInputFormat

    1)FileInputFormat实现类

    思考:在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢? FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、

    NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。

    2)TextInputFormat

    TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。

    八: CombineTextInputFormat切片机制

    框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的

    MapTask,处理效率极其低下。

    1)应用场景:

    CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

    2)虚拟存储切片最大值设置

    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

    注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

    3)切片机制

    生成切片过程包括:虚拟存储过程和切片过程二部分。

     

    (1)虚拟存储过程:

    将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。

    例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个

    4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。

    (2)切片过程:

    (a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。

    (b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。

    (c)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为: 1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)

    最终会形成3个切片,大小分别为:(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

    实操案例:

    代码实现过程:

    (1)不做任何处理,运行WordCount案例程序,观察切片个数为4。 

    number of splits:4 

    (2)在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为3。

    1. (a)驱动类中添加代码如下:
    2. // 如果不设置InputFormat,它默认用的是TextInputFormat.class
    3. job.setInputFormatClass(CombineTextInputFormat.class);
    4. //虚拟存储切片最大值设置4m
    5. CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
    6. (b)运行如果为3个切片。
    7. number of splits:3

    (3)在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为1。

    1. (a)驱动中添加代码如下:
    2. // 如果不设置InputFormat,它默认用的是TextInputFormat.class
    3. job.setInputFormatClass(CombineTextInputFormat.class);
    4. //虚拟存储切片最大值设置20m
    5. CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
    6. (b)运行如果为1个切片
    7. number of splits:1

    九:MapReduce工作流程

     

    上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第

    16步结束,具体Shuffle过程详解,如下:

    (1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中

    (2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

    (3)多个溢出文件会被合并成大的溢出文件

    (4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序

    (5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据

    (6)ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会

    将这些文件再进行合并(归并排序)

    (7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)

    注意:

    (1)Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

    (2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb默认100M。 

    十:Shuffle机制  

    Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

     

    十一:Partition分区  

    1、问题引出

    要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

    2、默认Partitioner分区 

    1. publicclassHashPartitionerextendsPartitioner{
    2. publicintgetPartition(Kkey,Vvalue,intnumReduceTasks)
    3. {
    4. return(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks;
    5. }
    6. }

    默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个

    key存储到哪个分区。

     3、自定义Partitioner步骤

    (1)自定义类继承Partitioner,重写getPartition()方法

    1. publicclassCustomPartitionerextendsPartitioner<Text,FlowBean>{
    2. @Override
    3. publicintgetPartition(Textkey,FlowBeanvalue,intnumPartitions){
    4. //控制分区代码逻辑
    5. ……returnpartition;
    6. }
    7. }

    (2)在Job驱动中,设置自定义Partitioner

    job.setPartitionerClass(CustomPartitioner.class);

    (3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

    job.setNumReduceTasks(5);

    4、分区总结

    (1)如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

    (2)如果1的数量的结果数,则有一部分分区数据无处安放,会Exception;

    (3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000;

    (4)分区号必须从零开始,逐一累加。

    5、案例分析

    例如:假设自定义分区数为5,则

    (1)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件

    (2)job.setNumReduceTasks(2);会报错

    (3)job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件

  • 相关阅读:
    如何提高麻辣烫店利润?
    视频融合云平台EasyCVR增加多级分组,可灵活管理接入设备
    java-php-python-ssm基于水果商城设计计算机毕业设计
    day41 jdk8新特性Stream流 数据库安装
    相机滤镜软件Nevercenter CameraBag Photo mac中文版特点介绍
    数据结构与算法基础(青岛大学-王卓)(9)
    手写一个数字动态滚动加载组件,从0加载到指定数字
    昨天的事情想说一下
    Oracle/PLSQL: Coalesce Function
    Linux 本地RStudio 工具安装&远程访问
  • 原文地址:https://blog.csdn.net/JiaXingNashishua/article/details/125961864