• Hadoop 3.x from Beginner to Expert - Stage 5 (Illustrated MapReduce Source Code Analysis)


    Start of the source code analysis

    How MapTask parallelism is determined

    • A data block is how HDFS physically splits a file into chunks.
    • A data split (input split) is a logical division of the whole file (it determines the number of MapTasks). Note that record integrity is preserved at split boundaries: for example, if the word hello sits right at a split point, it will not be broken into hel and lo.

    The source code corresponding to the figure above

    package org.apache.hadoop.mapreduce.lib.input;

    FileInputFormat's getSplits(JobContext job)

    1. Compute the minimum split size:

    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

    protected long getFormatMinSplitSize() {
      return 1;
    }

    public static long getMinSplitSize(JobContext job) {
      return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
    }

    // not configured by default
    public static final String SPLIT_MINSIZE =
      "mapreduce.input.fileinputformat.split.minsize";

    2. Compute the maximum split size:

    long maxSize = getMaxSplitSize(job);

    public static long getMaxSplitSize(JobContext context) {
      return context.getConfiguration().getLong(SPLIT_MAXSIZE,
                                                Long.MAX_VALUE);
    }

    // not configured by default
    public static final String SPLIT_MAXSIZE =
      "mapreduce.input.fileinputformat.split.maxsize";
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      StopWatch sw = new StopWatch().start();
      // compute the minimum split size
      long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
      // compute the maximum split size
      long maxSize = getMaxSplitSize(job);

      // generate splits
      List<InputSplit> splits = new ArrayList<InputSplit>();
      // list the status of the input files
      List<FileStatus> files = listStatus(job);

      boolean ignoreDirs = !getInputDirRecursive(job)
        && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
      for (FileStatus file: files) {
        if (ignoreDirs && file.isDirectory()) {
          continue;
        }
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
          BlockLocation[] blkLocations;
          if (file instanceof LocatedFileStatus) {
            blkLocations = ((LocatedFileStatus) file).getBlockLocations();
          } else {
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            blkLocations = fs.getFileBlockLocations(file, 0, length);
          }
          // check whether the file is splittable; snappy-compressed files, for example,
          // are not, so their MapTask count cannot be increased by splitting
          if (isSplitable(job, path)) {
            long blockSize = file.getBlockSize();
            // blockSize defaults to 128 MB on a cluster and 32 MB in local mode
            long splitSize = computeSplitSize(blockSize, minSize, maxSize);

            long bytesRemaining = length;
            // only create another split while the remainder exceeds SPLIT_SLOP (1.1) times the split size
            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
              splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                          blkLocations[blkIndex].getHosts(),
                          blkLocations[blkIndex].getCachedHosts()));
              bytesRemaining -= splitSize;
            }

            if (bytesRemaining != 0) {
              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
              splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                         blkLocations[blkIndex].getHosts(),
                         blkLocations[blkIndex].getCachedHosts()));
            }
          } else { // not splitable
            if (LOG.isDebugEnabled()) {
              // Log only if the file is big enough to be splitted
              if (length > Math.min(file.getBlockSize(), minSize)) {
                LOG.debug("File is not splittable so no parallelization "
                    + "is possible: " + file.getPath());
              }
            }
            splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                        blkLocations[0].getCachedHosts()));
          }
        } else {
          //Create empty hosts array for zero length files
          splits.add(makeSplit(path, 0, length, new String[0]));
        }
      }
      // Save the number of input files for metrics/loadgen
      job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
      sw.stop();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Total # of splits generated by getSplits: " + splits.size()
            + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
      }
      return splits;
    }

    The formula used above to compute the split size

    protected long computeSplitSize(long blockSize, long minSize,
                                    long maxSize) {
      return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    In other words, it always picks the middle of the three values. To change the split size, either make maxSize smaller than blockSize, or make minSize larger than blockSize.

    Summary:

    • If maxSize is smaller than blockSize, the final split size is maxSize.
    • If minSize is larger than blockSize, the final split size is minSize.
    • If neither value is changed, the split size defaults to blockSize (see the driver sketch below for changing the two thresholds).
    • Each file is split independently.
    • A remaining chunk is only split off when it is larger than 1.1 times the split size.
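
    A minimal driver sketch for changing those thresholds from code. The class name and sizes here are illustrative assumptions, not from the original course code; the setters simply write the two split.minsize/split.maxsize configuration keys shown earlier.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SplitSizeDemo {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "split-size-demo");
            // maxSize smaller than the 128 MB block size: splits shrink to 64 MB
            FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
            // or make minSize larger than the block size: splits grow to 256 MB
            // FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
        }
    }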

    InputFormat: data input

    public abstract class InputFormat<K, V> {

      // produces the input splits
      public abstract
        List<InputSplit> getSplits(JobContext context
                                   ) throws IOException, InterruptedException;

      // creates the RecordReader responsible for reading the data
      public abstract
        RecordReader<K,V> createRecordReader(InputSplit split,
                                             TaskAttemptContext context
                                             ) throws IOException,
                                                      InterruptedException;
    }

    Class hierarchy

    FileInputFormat inheritance diagram

    TextInputFormat implements createRecordReader(), providing the actual data reading for FileInputFormat.

    CombineFileInputFormat overrides getSplits(); its purpose is to solve the small-files problem.

    public List<InputSplit> getSplits(JobContext job)
      throws IOException {
      long minSizeNode = 0;
      long minSizeRack = 0;
      long maxSize = 0;
      Configuration conf = job.getConfiguration();

      // the values specified by setxxxSplitSize() takes precedence over the
      // values that might have been specified in the config
      if (minSplitSizeNode != 0) {
        minSizeNode = minSplitSizeNode;
      } else {
        minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
      }
      if (minSplitSizeRack != 0) {
        minSizeRack = minSplitSizeRack;
      } else {
        minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
      }
      if (maxSplitSize != 0) {
        maxSize = maxSplitSize;
      } else {
        maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
        // If maxSize is not configured, a single split will be generated per
        // node.
      }
      if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
        throw new IOException("Minimum split size pernode " + minSizeNode +
                              " cannot be larger than maximum split size " +
                              maxSize);
      }
      if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
        throw new IOException("Minimum split size per rack " + minSizeRack +
                              " cannot be larger than maximum split size " +
                              maxSize);
      }
      if (minSizeRack != 0 && minSizeNode > minSizeRack) {
        throw new IOException("Minimum split size per node " + minSizeNode +
                              " cannot be larger than minimum split " +
                              "size per rack " + minSizeRack);
      }

      // all the files in input set
      List<FileStatus> stats = listStatus(job);
      List<InputSplit> splits = new ArrayList<InputSplit>();
      if (stats.size() == 0) {
        return splits;
      }

      // In one single iteration, process all the paths in a single pool.
      // Processing one pool at a time ensures that a split contains paths
      // from a single pool only.
      for (MultiPathFilter onepool : pools) {
        ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();

        // pick one input path. If it matches all the filters in a pool,
        // add it to the output set
        for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
          FileStatus p = iter.next();
          if (onepool.accept(p.getPath())) {
            myPaths.add(p); // add it to my output set
            iter.remove();
          }
        }
        // create splits for all files in this pool.
        getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
      }

      // create splits for all files that are not in any pool.
      getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

      // free up rackToNodes map
      rackToNodes.clear();
      return splits;
    }

    Its split mechanism

    Setting it as the job's InputFormat:

    job.setInputFormatClass(CombineTextInputFormat.class);
    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4 MB

    Summary:

    • The job of an InputFormat is to read the input files and to define the split rules based on them.
    • TextInputFormat uses the default split rule, i.e. FileInputFormat's.
    • CombineTextInputFormat overrides the split rule; it introduces the notion of virtual storage and computes the final split sizes from those virtual-storage chunks.

    The Shuffle mechanism

    The process that happens after the map method and before the reduce method is called shuffle.

    MapTask

    Inside the run method:

    if (isMapTask()) {
      // If there are no reducers then there won't be any sort. Hence the map
      // phase will govern the entire attempt's progress.
      // i.e. if the number of ReduceTasks is zero, no sort phase is added
      if (conf.getNumReduceTasks() == 0) {
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be
        // split between the map phase (67%) and the sort phase (33%).
        // i.e. when there are ReduceTasks, map runs first and then sort
        mapPhase = getProgress().addPhase("map", 0.667f);
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }

    ReduceTask

    Inside the run method there are copy and sort phases:

    if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }

    Summary: if there are no ReduceTasks, only the map method runs; if there is a reduce stage, the shuffle runs as

    map-sort-copy-sort-reduce

    The shuffle mechanism, illustrated

    • The map method first produces <k, v> pairs, which go into the circular buffer. While being processed in the buffer, each record is assigned a partition number by the default partitioner (unless a custom one is configured), computed by hashing the key and taking it modulo the number of reducers. When the buffer reaches 80% full, a spill to a file occurs.
    • After a spill is triggered, the data of each partition is sorted by key (lexicographic order by default). Only the index entries are sorted; the data itself is not moved.
    • If a Combiner is configured, a local pre-aggregation runs first.
    • To relieve the ReduceTasks, the multiple spill files of a MapTask are merged on the map side into one large file that records the partition boundaries. The merge also sorts, and the merged file is written to disk (given the hardware of the time, holding very large data sets in memory instead of writing them to disk would cause out-of-memory errors).
    • Then comes the ReduceTask stage: it first copies the files, spilling to disk if they are too large, and then re-sorts the per-partition data from disk. (Note that the data arriving from the map side is already sorted within each partition, e.g. all data belonging to partition 1 is sorted together.) The reduce side also performs a grouping step, grouping records that share the same key.

    Partitioner

    The number of partitions produced by the partitioner is tied to the number of ReduceTasks.

    The default partitioner hashes the key and takes it modulo numReduceTasks:

    public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

      public void configure(JobConf job) {}

      /** Use {@link Object#hashCode()} to partition. */
      public int getPartition(K2 key, V2 value,
                              int numReduceTasks) {
        // Integer.MAX_VALUE is 0111... in binary; key.hashCode() may be negative,
        // and ANDing with it clears the sign bit, yielding a non-negative number
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }
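
    A quick standalone check of that masking trick. This snippet is illustrative only and not part of the Hadoop source; the string and reducer count are arbitrary assumptions.

    public class HashMaskDemo {
        public static void main(String[] args) {
            String key = "polygenelubricants";   // a String whose hashCode() happens to be negative
            int numReduceTasks = 3;              // hypothetical reducer count
            System.out.println(key.hashCode());                                        // negative value
            System.out.println(key.hashCode() & Integer.MAX_VALUE);                    // sign bit cleared
            System.out.println((key.hashCode() & Integer.MAX_VALUE) % numReduceTasks); // always in [0, numReduceTasks)
        }
    }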

    Partitioner internals: MapTask sequence-diagram analysis

    Dissecting the partitioner code

    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
      collector = createSortingCollector(job, reporter);
      // public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
      // if mapreduce.job.reduces is not configured, the default is 1 ReduceTask
      partitions = jobContext.getNumReduceTasks();
      if (partitions > 1) {
        // with more than one ReduceTask, the default is HashPartitioner.class;
        // a custom partitioner is picked up from mapreduce.job.partitioner.class,
        // as stepping into jobContext.getPartitionerClass() shows
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        // with a single ReduceTask, everything goes to partition partitions - 1 = 0
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }
    }

    A custom partitioner

    // <Student, IntWritable> is the <K, V> pair emitted by the map method
    public class MyPartition extends Partitioner<Student, IntWritable> {
        @Override
        public int getPartition(Student student, IntWritable intWritable, int numPartitions) {
            return student.getId() % 2;
        }
    }

    job.setPartitionerClass(MyPartition.class);

    How the setter works

    public void setPartitionerClass(Class<? extends Partitioner> cls
                                    ) throws IllegalStateException {
      ensureState(JobState.DEFINE);
      // PARTITIONER_CLASS_ATTR is mapreduce.job.partitioner.class
      conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                    Partitioner.class);
    }

    Notes on partitioners:

    • Partition numbers must start at 0 and increase one by one.
    • If the number of reducers is set to 1, the default partitions - 1 partitioner is used (your own partitioner is never invoked).
    • The number of ReduceTasks must not be smaller than the number of custom partitions. For example, with 2 reducers only partitions 0 and 1 can be handled; if the custom partitioner returns 3, the job fails because there is no ReduceTask 3 to process that partition.
    • If the number of ReduceTasks is greater than the number of partitions, some output files will be empty.

    Summary:

    In other words, if the partitioner returns partition number 0, that data is handled by reducer 0. If it returns 2, the job fails unless the number of ReduceTasks is set to at least 3: with 3 ReduceTasks the corresponding partition numbers are 0, 1 and 2, so the partitioner's return values must be covered by 0, 1, 2 for the job to run. If the partitioner returns 0, 1, 2, 3, partition 3 has no ReduceTask to process it and the job errors out; if it only ever returns 0 and 1 while there are 3 ReduceTasks, the ReduceTask for partition 2 receives no data and produces an empty file. A matching driver sketch follows.
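
    To make the MyPartition class above (which returns id % 2, i.e. partitions 0 and 1) line up with the ReduceTask count, the driver must request exactly two reducers. A minimal hedged driver sketch, assuming hypothetical StudentMapper and StudentReduce classes alongside the Student and MyPartition classes from this article:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class PartitionDriver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "partition-demo");
            job.setJarByClass(PartitionDriver.class);
            job.setMapperClass(StudentMapper.class);      // hypothetical mapper emitting <Student, IntWritable>
            job.setReducerClass(StudentReduce.class);     // hypothetical reducer
            job.setMapOutputKeyClass(Student.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setPartitionerClass(MyPartition.class);   // getPartition returns id % 2, so partitions 0 and 1
            job.setNumReduceTasks(2);                     // must cover partitions 0 and 1;
                                                          // 1 bypasses MyPartition, 3 leaves reducer 2 empty

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }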

    WritableComparable sorting (the key to shuffle-phase sorting)

    Hadoop's default sort order is lexicographic (dictionary) order, and the default sorting algorithm is quicksort.

    Dictionary order: keys are ordered the way words are ordered in a dictionary.

    Where sorting happens

    Kinds of sorting

    • Partial sort: the output of each reducer is sorted within each partition.
    • Total sort: with a single reducer and a single output file, the whole output is sorted.
    • Auxiliary (grouping) sort: the key comparison performed when the reduce side groups records.

    Hadoop's comparison class and interface

    • WritableComparable: the sorting interface (supports both serialization and comparison)
    • WritableComparator: the comparator

    How does Hadoop perform comparisons?

    It uses the WritableComparator comparator.

    Example of a custom comparator

    Student
    public class Student implements WritableComparable<Student> {
        private int id;
        private String name;
        private int sore;

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getSore() {
            return sore;
        }

        public void setSore(Integer sore) {
            this.sore = sore;
        }

        public Student() {
            super();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeUTF(name);
            out.writeInt(sore);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.name = in.readUTF();
            this.sore = in.readInt();
        }

        @Override
        public String toString() {
            return this.id + "\t" + this.name + "\t" + this.sore;
        }

        @Override
        public int compareTo(Student o) {
            // int thisValue = this.sore;
            // int thatValue = o.sore;
            // return (thisValue < thatValue ? 1 : (thisValue == thatValue ? 0 : -1));
            return 0;
        }
    }
    MyWritableComparator
    public class MyWritableComparator extends WritableComparator {
        public MyWritableComparator() {
            super(Student.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            Student aStudent = (Student) a;
            Student bStudent = (Student) b;
            return -aStudent.getId().compareTo(bStudent.getId());
        }
    }

    Setting the comparator

    job.setSortComparatorClass(MyWritableComparator.class);

    Implementing sorting through the comparison interface

    /**
     * Because the mapper compares records by key, the key class implements WritableComparable here.
     */
    public class Student implements WritableComparable<Student> {
        private int id;
        private String name;
        private int sore;

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getSore() {
            return sore;
        }

        public void setSore(Integer sore) {
            this.sore = sore;
        }

        public Student() {
            super();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeUTF(name);
            out.writeInt(sore);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.name = in.readUTF();
            this.sore = in.readInt();
        }

        @Override
        public String toString() {
            return this.id + "\t" + this.name + "\t" + this.sore;
        }

        @Override
        public int compareTo(Student o) {
            int thisValue = this.sore;
            int thatValue = o.sore;
            return (thisValue < thatValue ? 1 : (thisValue == thatValue ? 0 : -1));
            // return 0;
        }
    }

    How Hadoop's comparison works internally

    Method call chain inside MapTask

    run -> runNewMapper -> NewOutputCollector -> createSortingCollector -> collector.init(context)

    -> // k/v serialization: comparator = job.getOutputKeyComparator();

    // k/v serialization
    comparator = job.getOutputKeyComparator();

    The line above is the key entry point into the comparator source code.

    public RawComparator getOutputKeyComparator() {
      // public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
      // mapreduce.job.output.key.comparator.class is not set by default
      Class<? extends RawComparator> theClass = getClass(
        JobContext.KEY_COMPARATOR, null, RawComparator.class);
      if (theClass != null)
        // if a comparator was configured, this branch is taken
        return ReflectionUtils.newInstance(theClass, this);
      // default path: if the key class implements WritableComparable,
      // the matching comparator is looked up (or built)
      return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
    }
    // getMapOutputKeyClass() returns what was set via job.setMapOutputKeyClass(Student.class);

    public static WritableComparator get(
        Class<? extends WritableComparable> c, Configuration conf) {
      // look up the comparator registered for class c in the comparators map;
      // built-in types such as IntWritable register their own comparator, so for them
      // this lookup is non-null, otherwise it is null
      WritableComparator comparator = comparators.get(c);
      if (comparator == null) {
        // force the static initializers to run
        forceInit(c);
        // look to see if it is defined now
        comparator = comparators.get(c);
        // if not, use the generic one
        if (comparator == null) {
          comparator = new WritableComparator(c, conf, true);
        }
      }
      // Newly passed Configuration objects should be used.
      ReflectionUtils.setConf(comparator, conf);
      return comparator;
    }

    =========================
    Below is the construction of the WritableComparator:

    protected WritableComparator(Class<? extends WritableComparable> keyClass,
                                 Configuration conf,
                                 boolean createInstances) {
      this.keyClass = keyClass;
      this.conf = (conf != null) ? conf : new Configuration();
      // note: createInstances must be true for the comparator to be able to compare keys
      if (createInstances) {
        key1 = newKey();
        key2 = newKey();
        buffer = new DataInputBuffer();
      } else {
        key1 = key2 = null;
        buffer = null;
      }
    }

    ============ Since this is a comparator, here are its compare methods ============

    public int compare(WritableComparable a, WritableComparable b) {
      return a.compareTo(b);
    }

    @Override
    public int compare(Object a, Object b) {
      return compare((WritableComparable)a, (WritableComparable)b);
    }

    Summary:
    The compare methods that ultimately run when MapReduce compares keys are:

    public int compare(WritableComparable a, WritableComparable b) {
      return a.compareTo(b);
    }

    @Override
    public int compare(Object a, Object b) {
      return compare((WritableComparable)a, (WritableComparable)b);
    }

    So either implement the WritableComparable interface, or define a custom comparator:

    public class MyWritableComparator extends WritableComparator {
        public MyWritableComparator() {
            // true sets createInstances
            super(Student.class, true);
        }

        // override WritableComparator's compare method; the framework calls this
        // comparator method when comparing keys
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            Student aStudent = (Student) a;
            Student bStudent = (Student) b;
            return -aStudent.getId().compareTo(bStudent.getId());
        }
    }

    Then set it:

    job.setSortComparatorClass(MyWritableComparator.class);

    The following shows IntWritable's comparator implementation (this code also explains the behavior described above): Hadoop's built-in data types all register their own comparators.

    public static class Comparator extends WritableComparator {
      public Comparator() {
        super(IntWritable.class);
      }

      @Override
      public int compare(byte[] b1, int s1, int l1,
                         byte[] b2, int s2, int l2) {
        int thisValue = readInt(b1, s1);
        int thatValue = readInt(b2, s2);
        return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
      }
    }

    static {                                    // register this comparator
      // the class registers the comparator for its own type here
      WritableComparator.define(IntWritable.class, new Comparator());
    }

    Summary:

    MapReduce sorting can be set up in two ways: either define a custom WritableComparator, or have the key class implement WritableComparable. In the end a WritableComparator is always what gets used; when the key only implements WritableComparable, the framework builds a default WritableComparator around that interface:

    if (comparator == null) {
        comparator = new WritableComparator(c, conf, true);
    }

    Combiner (local aggregation)

    • A Combiner is a component of an MR program in addition to the Mapper and Reducer.
    • The parent class of a Combiner is Reducer.
    • A Combiner runs on the node of each MapTask, whereas the Reducer receives the output of all Mappers globally.
    • The purpose of a Combiner is to locally aggregate the output of each MapTask, reducing the amount of data sent over the network.

    So if all it does is aggregate, the Combiner can reuse the Reducer code directly (a standalone combiner sketch follows the setter below):

    job.setCombinerClass(StudentReduce.class);
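
    When reusing the reducer class is not convenient, a standalone combiner is just another Reducer subclass. A minimal hedged sketch for a word-count-style job with <Text, IntWritable> map output; the class name is illustrative and not from the original course code.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // a combiner is simply a Reducer that runs on the map side, pre-aggregating locally
    public class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable sum = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int total = 0;
            for (IntWritable v : values) {
                total += v.get();
            }
            sum.set(total);
            context.write(key, sum);   // output types must match the map output <K, V>
        }
    }

    // in the driver: job.setCombinerClass(SumCombiner.class);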

    Effect diagram

    ReduceTask grouping comparator (auxiliary comparator)

    Description:

    Reduce-side grouping: <key, value> pairs whose keys compare as equal are assigned to the same group.

    job.setGroupingComparatorClass(MyWritableComparator.class);

    Where it takes effect

    Tracing the grouping comparator through the source

    ReduceTask method call chain:
    run -> RawComparator comparator = job.getOutputValueGroupingComparator();

    public RawComparator getOutputValueGroupingComparator() {
      // GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class"
      // not set by default
      Class<? extends RawComparator> theClass = getClass(
        JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
      if (theClass == null) {
        // if no grouping comparator is configured, fall back to the sort comparator,
        // i.e. the default comparator derived from WritableComparable
        return getOutputKeyComparator();
      }
      return ReflectionUtils.newInstance(theClass, this);
    }

    ================ This method is analyzed the same way as the MapReduce sort comparator above ============

    public RawComparator getOutputKeyComparator() {
      Class<? extends RawComparator> theClass = getClass(
        JobContext.KEY_COMPARATOR, null, RawComparator.class);
      if (theClass != null)
        return ReflectionUtils.newInstance(theClass, this);
      return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
    }

    Example using sorting plus the auxiliary (grouping) sort

    Requirements:

    • Sort by id and then by amount.
    • Put records with the same id into one group.

    In this case we can have the key implement WritableComparable so that it sorts by id and amount, and then define a custom WritableComparator that groups by id (keys that compare as equal end up in the same group); a sketch follows the setter below.

    job.setGroupingComparatorClass(MyWritableComparator.class);
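
    A minimal hedged sketch of that grouping comparator, assuming a hypothetical OrderBean key whose getId() returns an Integer and which already implements WritableComparable (sorting by id, then by amount):

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class OrderGroupingComparator extends WritableComparator {
        public OrderGroupingComparator() {
            // true -> instantiate keys so compare(WritableComparable, WritableComparable) can run
            super(OrderBean.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            OrderBean left = (OrderBean) a;
            OrderBean right = (OrderBean) b;
            // keys whose ids compare as equal fall into the same reduce group
            return left.getId().compareTo(right.getId());
        }
    }

    // in the driver: job.setGroupingComparatorClass(OrderGroupingComparator.class);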

    How it works:

    Grouping operates on the already sorted data: each record is compared with the one that follows it. If they compare as equal, they belong to the same group; if not, a new group begins. That is why grouping requires sorting first. Grouping does not physically separate the data; it merely decides, by comparing the previous record with the next one, whether a new group should start.

    Why the key always appears to be the same object while reading

    The OrderBean key is just a reference; as the data changes, Hadoop makes this reference point at the new data on the heap.

    When there is nothing to write for a value, NullWritable.get() can be used.

    OutputFormat

    public interface OutputFormat<K, V> {

      // responsible for writing out the data
      RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                         String name, Progressable progress)
        throws IOException;

      // checks whether the output path already exists
      void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
    }

    The default abstract subclass, FileOutputFormat

    public void checkOutputSpecs(FileSystem ignored, JobConf job)
      throws FileAlreadyExistsException,
             InvalidJobConfException, IOException {
      // Ensure that the output directory is set and not already there
      Path outDir = getOutputPath(job);
      if (outDir == null && job.getNumReduceTasks() != 0) {
        throw new InvalidJobConfException("Output directory not set in JobConf.");
      }
      if (outDir != null) {
        FileSystem fs = outDir.getFileSystem(job);
        // normalize the output directory
        outDir = fs.makeQualified(outDir);
        setOutputPath(job, outDir);

        // get delegation token for the outDir's file system
        TokenCache.obtainTokensForNamenodes(job.getCredentials(),
                                            new Path[] {outDir}, job);

        // check its existence
        // this is the error thrown when the output directory already exists
        if (fs.exists(outDir)) {
          throw new FileAlreadyExistsException("Output directory " + outDir +
                                               " already exists");
        }
      }
    }

    The default concrete subclass is TextOutputFormat.

    Counters

    Introduction

    A counter adds the corresponding value on every call, and the totals are printed to the console as shown in the figure below.

    context.getCounter("mycount", "mymapcount").increment(1);
    // each call increments the counter by one
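
    A minimal hedged sketch of a mapper that bumps that counter once per input record; the mapper class and its output types are illustrative assumptions, not from the original course code.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class CountingMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // counter group "mycount", counter "mymapcount": +1 for every record the mapper sees
            context.getCounter("mycount", "mymapcount").increment(1);
            context.write(value, NullWritable.get());
        }
    }

    // the counter totals appear with the job counters printed to the console when the job finishes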

    Illustrated Job submission flow (source level)

    https://www.aliyundrive.com/s/HDBhGwM1zEV  

    The source walkthrough starts from job.waitForCompletion(true)

     

    Illustrated flow of Job submission and the separate job thread it then runs

     https://www.aliyundrive.com/s/gaDEn28iRgb 

     

    In-depth illustrated walkthrough of the MapTask source code

    https://www.aliyundrive.com/s/3FTs5ZLMAge 

    In-depth illustrated walkthrough of the ReduceTask source code

    https://www.aliyundrive.com/s/WFKqi3yzx8C

    Baidu Netdisk download for the source-code diagrams

    Link: https://pan.baidu.com/s/184FKNuk3ryUDTyqg7auQVg
    Extraction code: yyds

  • Original post: https://blog.csdn.net/S1124654/article/details/125534276