• 【Hadoop】学习笔记(五)


    三、MapReduce

    3.6、Hadoop序列化

    序列化:就是把内存中的对象转换成字节序列(或其他数据传输协议),以便于存储到磁盘(持久化)和网络传输。

    反序列化:就是将收到的字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

    因为Java自带的序列化存储了很多额外信息(各种校验、Header、继承体系等),是一个重量级序列化框架,不便于在网络中传输,所以Hadoop有自己的一套序列化。

    Hadoop序列化的特点:

    • 紧凑:存储空间少
    • 快速:传输速度快
    • 互操作性:支持多语言的交互

    实际开发中,Hadoop自带的Text、IntWritable等基本的序列化类型往往不够用,需要自定义一些可序列化的 JavaBean。

    自定义需要序列化的类:

    1. 必须实现Writable接口
    2. 反序列化时,需要反射调用空参构造函数,所以必须要有空参构造
    3. 实现序列化方法write():
    4. 实现反序列化方法readFields():
    5. 注意反序列化顺序要和序列化的顺序完全一致(先进先出)
    6. 如果想把结果显示在文件中,还需要重写 toString()方法
    7. 如果要把自定义的类的对象放在key中传输,则还需要实现 Comparable接口,因为MapReduce框架中的Shuffle过程要求key必须能够排序
    package com.study.mapreduce.writable;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class MyWritable implements Writable,Comparable<MyWritable> {
    
        private Integer id;
        private Long scale;
        private Integer age;
    
        /**
         * 需要有无参构造器
         */
        public MyWritable() {
        }
    
        /**
         * 序列化
         * @param out
         * @throws IOException
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeLong(scale);
            out.writeInt(age);
        }
    
        /**
         * 反序列化
         * @param in
         * @throws IOException
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            // 读取顺序要和write方法写的顺序一致,即先进先出
            id = in.readInt();
            scale = in.readLong();
            age = in.readInt();
        }
    
        /**
         * 如果想当做key在MapReduce中传输,需要实现Comparable,因为Shuffle过程要求key必须能排序
         * @param o
         * @return
         */
        @Override
        public int compareTo(MyWritable o) {
            return this.id > o.getId() ? -1 : 1;
        }
        
         /**
         * 为方便查看,还可以重写toString()方法
         * @return
         */
        @Override
        public String toString() {
            MessageFormat mf = new MessageFormat("MyWritable:{id:[0], scale:[1], age:[2]}");
            return mf.format(new Object[]{id, scale, age});
        }
    
        // 生成getter/setter
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public Long getScale() {
            return scale;
        }
    
        public void setScale(Long scale) {
            this.scale = scale;
        }
    
        public Integer getAge() {
            return age;
        }
    
        public void setAge(Integer age) {
            this.age = age;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    3.7、MapReduce框架

    3.7.1、MapReduce框架原理

    MapReduce框架运行流程:

    在这里插入图片描述

    3.7.2、InputFormat数据输入

    InputFormat是一个抽象类,它有很多实现类,例如 FileInputFormat、CombineFileInputFormat、TextInputFormat等。

    前面的WordCount示例中,我们使用文件输入,使用的就是 FileInputFormat。

    InputFormat类有两个抽象方法:

    • getSplits:判断当前的输入的文件是否可以被切分
    • createRecordReader:创建一个Reader
    3.7.2.1、切片与MapTask并行度决定机制

    MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个 Job 的处理速度。

    数据块:Block是HDFS物理上把数据分成一块一块的,数据块是HDFS存储数据单位。

    数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。

    数据块是物理上的分开存储,例如一个129Mb的数据文件,在存入 hdfs 时,因为hdfs一个数据块默认大小只有128Mb,所以会被分成两个数据块存储:block0存储128Mb,block1存储1Mb,这两个数据块可能存储在不同的服务器上,这个是物理上的分开存储。

    数据切片是逻辑上的切片,不是真正的物理磁盘上分开存储。例如将这个129Mb的文件切成一个100Mb和一个29Mb两个片,那么只会找个位置记录下来:0-100索引位置属于第一个片,100-129索引位置属于第二个片,并不影响物理上的存储。

    切片的大小会影响到执行的效率:

    • 如果切片个数太少,例如 1Gb 的数据只切2个切片,那么就只会有两个MapTask并行执行,效率太低;
    • 如果切片个数太多,例如 1Kb 的数据切了5个切片,那么初始化的过程可能比真正执行花费的时间还长,效率也不高;
    • 如果切片大小和数据块大小不一致,例如每个数据块128Mb,每个切片设置为100Mb,那么数据块上剩余的28Mb就需要通过网络IO传递给其他节点进行执行,效率也不高;

    如图:对于一个300Mb的文件切成3片(即产生3个MapTask)
    如果切片大小设置为100Mb(红色线条),那么:

    ● 第一个MapTask只能执行DataNode1上存储的128Mb数据中的100Mb
    ● MapTask2需要通过网络IO获取DataNode1上剩余的28Mb,加上DataNode2上的一部分,组成100Mb
    ● MapTask3需要通过网络IO获取DataNode2上剩余的数据,加上DataNode3上的进行处理

    这样就会因为这些网络IO导致效率低下。

    如果切片代销设置为128Mb(蓝色线条),那么:

    ● MapTask1执行本机DataNode1上的128Mb
    ● MapTask2执行本机DataNode2上的128Mb
    ● MapTask3执行本机DataNode3上的数据

    没有了网络IO,效率就会高很多
    在这里插入图片描述
    Hadoop默认参数中,切片大小就是等于数据块大小。(也可以自定义配置,但不推荐)

    切片时,不考虑数据集整体,而是逐个针对每一个文件单独切片。

    例如:输入数据中有两个文件(a.txt、b.txt),其中 a.txt 大小为300Mb, b.txt大小为100Mb。切片时,不会按整体400Mb切分,而是逐个对这两个文件进行切分。切片大小为128Mb的话:

    先将a.txt切分成3个片:128Mb、128Mb、44Mb,然后再将b.txt切分成一个100Mb的片。最后产生4个切片、4个MapTask。

    而不是将b.txt追加到a.txt上,因为b.txt的block和a.txt的block不是同一个,如果合一起切就又会产生网络IO。

    3.7.2.2、job提交流程的源码分析

    根据WordCount程序可知,MapReduce中真正执行job的是job.waitForCompletion(true)方法。job.waitForCompletion(true)方法中调用了submit()提交job。

    submit()方法的源码如下:

    public void submit() throws IOException, InterruptedException, ClassNotFoundException {
        ensureState(JobState.DEFINE);  // 确认job的状态为DEFINE(未运行)状态
        setUseNewAPI();  // 如果使用的是mapred包(hadoop 1.x)中的类,做一些特殊配置进行兼容
    
        // 获取hadoop集群的连接
        // connect()方法内部会调用到Cluster类构造方法,Cluster构造方法会调用initialize()初始化方法
        // Cluster的initialize()初始化方法中,会得到两个Provider:YarnClientProtocolProvider、LocalClientProtocolProvider
        // 如果配置项mapreduce.framework.name的值为yarn,则使用YarnClientProtocolProvider;如果为local,则使用LocalClientProtocolProvider
        // mapreduce.framework.name在mapred-default.xml默认值为local,我们本地的hadoop没有修改该配置项,所以本地运行WordCount会使用本地模式运行,即输入、输出路径都是本地路径。而我们的hadoop102集群中,在mapred-site.xml中将该值配置为了yarn,所以在hadoop102上运行WordCount程序时走的是Yarn,即输入、输出路径都是hdfs路径。
        connect(); 
        final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
        
        status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
            public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
                // 向集群提交job信息
                return submitter.submitJobInternal(Job.this, cluster);
            }
        });
        state = JobState.RUNNING;  // 将DEFINE(未运行)状态改为 RUNNING(运行)
        LOG.info("The url to track the job: " + getTrackingURL());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    submitter.submitJobInternal(Job.this, cluster)向集群提交job信息的源码:

    JobStatus submitJobInternal(Job job, Cluster cluster) 
      throws ClassNotFoundException, InterruptedException, IOException {
    
        // 校验output文件夹是否存在。如果存在,会抛出异常output文件夹已存在 
        checkSpecs(job);
    
        Configuration conf = job.getConfiguration();
        addMRFrameworkToDistributedCache(conf);
    
        // 获取临时路径(该路径后面会用于临时存放job的切片等信息,处理完毕后该文件夹会清空)
        
        // 该路径的前面一部分可以通过mapreduce.jobtracker.staging.root.dir进行配置
        // 如果没有配置,则默认取/tmp/hadoop/mapred/staging
        // 然后再后面加上 <当前用户名(用户名为空则取dummy)+随机数>/.staging
        // 例如:/tmp/hadoop/mapred/staging/tengyer2113150384/.staging
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
        
    	// ......配置一些命令行信息。省略
        
        // 创建一个jobID,提交的每个任务都有一个唯一的jobID,例如:job_local2113150384_0001
        JobID jobId = submitClient.getNewJobID();
        job.setJobID(jobId);
        // 在上面创建的/tmp/hadoop/mapred/staging/tengyer2113150384/.staging基础上,
        // 创建一个新的path:/tmp/hadoop/mapred/staging/tengyer2113150384/.staging/job_local2113150384_0001
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        JobStatus status = null;
        try {
            
          // ........ 中间这一块是设置一些配置信息、缓存信息,省略    
         
          // 将job的jar包、依赖、配置文件等内容提交到集群
          // yarn模式才会将jar包提交到集群,local模式不提交
          copyAndConfigureFiles(job, submitJobDir);
    
          Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
            
          LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
            
          // 切片,将切片信息临时保存到/tmp/hadoop/mapred/staging/tengyer2113150384/.staging/job_local2113150384_0001
          // 会生成 job.split、.job.split.crc、job.splitmetainfo、.job.splitmetainfo.crc文件,保存切片信息
          int maps = writeSplits(job, submitJobDir);
           // 将切片的个数赋值给将来要创建的MapTask数量,有几个切片就有几个MapTask
          conf.setInt(MRJobConfig.NUM_MAPS, maps); 
          LOG.info("number of splits:" + maps);
    
          int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
              MRJobConfig.DEFAULT_JOB_MAX_MAP);
          if (maxMaps >= 0 && maxMaps < maps) {
            throw new IllegalArgumentException("The number of map tasks " + maps +
                " exceeded limit " + maxMaps);
          }
    
          // ....... 队列、缓存等信息。省略
    
          // 在/tmp/hadoop/mapred/staging/tengyer2113150384/.staging/job_local2113150384_0001下创建 jbo.xml、.job.xml.crc
          // job.xml中保存了job运行需要的配置参数信息
          writeConf(conf, submitJobFile);
          
          //
          // Now, actually submit the job (using the submit name)
          // 真正的提交 job 
          //
          printTokens(jobId, job.getCredentials());
          status = submitClient.submitJob(
              jobId, submitJobDir.toString(), job.getCredentials());
          if (status != null) {
            return status;
          } else {
            throw new IOException("Could not launch job");
          }
        } finally {
          if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (jtFs != null && submitJobDir != null)
              jtFs.delete(submitJobDir, true);
    
          }
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    最后,在job.waitForCompletion(true)运行完monitorAndPrintJob()方法后,/tmp/hadoop/mapred/staging/tengyer2113150384/.staging/job_local2113150384_0001文件夹被清空。

    在这里插入图片描述

    3.7.2.3、切片源码

    submitter.submitJobInternal(Job.this, cluster)向集群提交job信息的源码中,执行到int maps = writeSplits(job, submitJobDir);时会进行切片。

    writeSplits(job, submitJobDir)中,会进行判断,Hadoop 1.x的程序会调用 writeOldSplits(jConf, jobSubmitDir),Hadoop 2.x的程序会调用writeNewSplits(job, jobSubmitDir)

    writeNewSplits(job, jobSubmitDir)会调用具体的InputFormat实现类的getSplits(job)去进行切片。·

    public List<InputSplit> getSplits(JobContext job) throws IOException {
        StopWatch sw = new StopWatch().start();
        
        // getFormatMinSplitSize()在本类中固定返回1
        // getMinSplitSize(job)获取配置项mapreduce.input.fileinputformat.split.minsize的值,在mapred-default.xml中该配置项默认值为0
        // 所以两个数取最大值结果为1
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        
        // 获取配置项mapreduce.input.fileinputformat.split.maxsize的值,mapred-default.xml中默认没有该配置项
        // 没有该配置项时,取默认值:Long.MAX_VALUE
        long maxSize = getMaxSplitSize(job);
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
       
        boolean ignoreDirs = !getInputDirRecursive(job)
            && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
        
        // 循环遍历所有输入文件,分别对每个文件进行分片
        // 这也证明了:切片时,不考虑数据集整体,而是逐个针对每一个文件单独切片。
        for (FileStatus file: files) {
            if (ignoreDirs && file.isDirectory()) {
                continue;
            }
            
            // 获取文件路径、内容长度
            Path path = file.getPath();
            long length = file.getLen();
            if (length != 0) {
                BlockLocation[] blkLocations;
                if (file instanceof LocatedFileStatus) {
                    blkLocations = ((LocatedFileStatus) file).getBlockLocations();
                } else {
                    FileSystem fs = path.getFileSystem(job.getConfiguration());
                    blkLocations = fs.getFileBlockLocations(file, 0, length);
                }
                
                // 判断文件是否允许切片
                // 对于普通文本文件,从文件中间截断不会有影响。
                // 但是如果是一些压缩文件(有些压缩文件也支持切片),截断后内容不完整程序就无法处理类。所以一些压缩文件就只允许切成1个片(即没有切片)
                if (isSplitable(job, path)) {
                    
                    // 获取数据块大小。
                    // local本地模式,默认的数据块大小是 32Mb
                    // Hadoop 1.x:默认的数据块大小是 64Mb
                    // Hadoop 2.x/3.x:默认的数据块大小是 128Mb
                    long blockSize = file.getBlockSize(); 
                    
                    // 计算切片大小
                    // 该方法的实现为:Math.max(minSize, Math.min(maxSize, blockSize))
                    // 由前面的配置可知,minSize为1,maxSize为Long.MAX_VALUE,blockSize为 32Mb(当前是本地模式运行)
                    // 所以  Math.min(maxSize, blockSize) = 32Mb, Math.max(minSize, 32Mb) = 32Mb
                    // 即:默认情况下,切片大小等于块大小
                    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    
                    
                    // 进行切片,只有大于切片大小的 SPLIT_SLOP(本类中该值固定为1.1) 倍才切。                
                    // 例如,传入一个 66Mb 的文件:
                    // 第一次进while时,66Mb > 35.2Mb(32Mb * 1.1), 会被切分一个 32Mb的片,剩余 34Mb
                    // 第二次进while时, 34Mb < 35.2Mb (32Mb * 1.1), 那么剩下的 34Mb 就不再切分了,就当做1个片处理
                    
                    // 这样做的好处是:防止过度切分,导致最后一台服务器过于空闲,初始化时间比真正处理数据时间还长,拉低效率。
                    // 例如上面剩下的 34Mb 如果继续切分成 32Mb 和 2 Mb的片,最后处理 2Mb 的那台机器真正处理的数据量太少,造成资源浪费、效率降低
                    long bytesRemaining = length;
                    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                        splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                             blkLocations[blkIndex].getHosts(),
                                             blkLocations[blkIndex].getCachedHosts()));
                        bytesRemaining -= splitSize;
                    }
    
                    if (bytesRemaining != 0) {
                        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                        splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                                             blkLocations[blkIndex].getHosts(),
                                             blkLocations[blkIndex].getCachedHosts()));
                    }
                } else { // not splitable
                    
                    // 对于不能切片的文件,直接设置成1个片
                    
                    if (LOG.isDebugEnabled()) {
                        // Log only if the file is big enough to be splitted
                        if (length > Math.min(file.getBlockSize(), minSize)) {
                            LOG.debug("File is not splittable so no parallelization "
                                      + "is possible: " + file.getPath());
                        }
                    }
                    splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                                         blkLocations[0].getCachedHosts()));
                }
            } else { 
                //Create empty hosts array for zero length files
                splits.add(makeSplit(path, 0, length, new String[0]));
            }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        sw.stop();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Total # of splits generated by getSplits: " + splits.size()
                      + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
        }
        return splits;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    3.7.2.4、流程

    在这里插入图片描述

    3.7.2.5、修改切片的大小

    根据切片的源码可知,切片的大小为:

    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
    
    • 1

    所以,如果要修改切片大小,可以通过调整 minSize、maxSize 进行修改:

    • 如果想要调整的切片大小,大于数据块的大小,就需要调整 minSize的大小。max(minSize, min(Long.MAX_VALUE, blockSize))时就可以取到 minSize
    • 如果想要调整的切片大小,小于数据块的大小,就需要调整 maxSize的大小。max(1, min(maxSize, blockSize)) 时 就可以取到 maxSize
    • minSize可以通过mapred-site.xml的mapreduce.input.fileinputformat.split.minsize配置(默认在mapred-default.xml中配置为0);
    • maxSize可以通过mapred-site.xml的mapreduce.input.fileinputformat.split.maxsize配置(默认在mapred-default.xml中没有配置);
    3.7.2.6、CombineTextInputFormat切片机制

    默认的TextInputFormat切片机制是第任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask。这样如果有大量小文件,就会产生大量的MapTask,处理效率变低。

    CombineTextInputFormat用于小文件过多的场景,可以将多个小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个或多个MapTask处理。

    3.7.3、MapReduce工作流程

    3.7.3.1、详细的执行过程

    在这里插入图片描述
    在这里插入图片描述

    流程:

    1. 客户端准备待处理文本
    2. 客户端在真正submit()前,规划好需要切分的片,在stage文件夹生成切片信息、job信息、jar包等文件
    3. 将stage中的切片信息文件、job信息、jar包提交给Yarn
    4. Yarn根据切片信息,进行切片划分MapTask
    5. MapTask中首先由InputFormat处理文件(默认是FileInputFormat的实现类TextInputFormat),如果isSplit方法判断结果允许分片,则创建RecorderReader读取文件并处理成为 key-value形式(TextInputFormat创建的是LineRecordReader,处理结果中 key 为偏移量,value为这一行的内容)。将处理的 key-value 传递给Mapper 作为输入
    6. Mapper 对传入的 key-value 进行逻辑运算,并将结果通过 context.write(key, value)输出给 ouputCollector
    7. Mapper的 key-value 结果会写到内存中的环形缓冲区中。因为计算结果直接输出到磁盘时,效率比较低下,所以设计了一个缓冲区。先将结果存入缓冲区,达到比例时才溢出(spill)写到磁盘上,溢写的时候根据 key 进行排序(默认根据key字典序排序)。
      在写入环形缓冲区之前,会先按照一定的规则对Mapper输出的键值对进行分区(partition)。默认情况下,只有一个ReduceTask,所以不分区。最后分区的数量就是ReduceTask的数量。
      环形缓冲区默认100Mb,分为两部分,一部分用来写索引(记录key和value的位置、分区信息等元数据),一部分用来写真实的数据记录。
      当环形缓冲区中存储的数据达到 80% 时,就开始逆着向磁盘中溢写:找到索引和数据的中间位置,倒序着向磁盘中写入。此时因为缓冲区还剩20%,所以即使有新数据进来也可以正常写入到内存缓冲区。
      当环形缓冲区达到100%时,因为末尾的数据刚刚已经溢写到了磁盘,所以数据可以反方向的向环中写入,覆盖掉已经写入磁盘中的数据。如果内存中的数据写入过多,将要覆盖掉还没写入磁盘的数据时,程序就会进入等待,等这部分数据被写入磁盘后,内存中新数据才能将这部分旧数据覆盖。
    8. 存入的数据会记录分区信息,当数据达到 80% 时,在溢写前会先进行排序,使用的是快排算法。排序时是修改的索引元数据中的keystart、valuestart,而不是直接移动 key-value
    9. 数据每达到一次 80%,就产生一次溢写文件,最后就会产生大量的溢写文件。
      每次溢写只产生一个溢写文件,虽然有多个分区,但是这些分区数据都存储在这一个溢写文件中,只是会把它们在一个文件中分隔开。
    10. 对第9步产生的大量溢写文件进行合并(merge)和排序。
      因为第9步的文件中,每个文件在第8步时都单独进行了排序,所以每个文件内部都是有序的。所以再对这些有序文件进行排序时,就可以使用归并排序算法。
      合并后的文件依然是按不同分区分隔的,所以排序时也是按照分区进行排序的,不同分区并没有进行整体排序。
    11. 预聚合,对分区内的相同 key 进行一次预聚合,方便后面发送给下一步。但是预聚合有前提条件,不是每次都能预聚合,例如:{a: 1, a: 1, b: 1} 被合并为:{a:2, b:1}
    12. MapTask任务完成后,启动对应数量的ReduceTask处理。
      对于数据量较少的文件,一般都会等所有的MapTask都完成时才启动ReduceTask。
      但是对于MapTask特别多时,可以配置推测执行等策略,在部分MapTask工作完成后就进行部分Reduce合并
    13. ReduceTask主动从MapTask拉取自己指定分区的数据(不是MapTask推送给ReduceTask)。
      每个MapTask的指定分区内数据是有序的,但是ReduceTask会对应多个MapTask,所以还需要对该ReduceTask拿到的所有MapTask指定分区的数据进行合并和归并排序。
    14. 因为13步进行了归并排序,所以可以从前向后遍历所有的 key-value, 如果 key 和前一个相同,就接着获取下一个 key ,直到出现不同的 key ,然后将前面的这些相同 key 的值作为一个集合,连同 key 一起将这一组内容传入reduce()方法。在reduce()方法中执行对应的业务逻辑
    15. 还可以进行分组操作
    16. 将reduce()方法的结果写成数据文件,使用的是OutputFormat(默认使用的TextOutputFormat)
  • 相关阅读:
    第二章——古典密码学及算法实现
    低代码中的工作流:简化开发流程,提升效率
    F28069教程3-中断 PIE
    java-php-python-ssm选课排课系统计算机毕业设计
    重量级ORM框架--持久化框架Hibernate【JPA注解开发】
    WebGIS(一):高德地图API-显示地图
    Swoole Compiler 加密PHP源代码(简版)
    简单讲解Android Fragment(一)
    MyBatis核心知识点介绍
    无穷滚动加载(v-infinite-scroll)
  • 原文地址:https://blog.csdn.net/lushixuan12345/article/details/126556842