
    MapReduce Shuffle Source Code Walkthrough

    Many of us have memorized the interview-style description of shuffle without really understanding the process. In this post I walk through shuffle by reading the source code to deepen that understanding. I am still a beginner myself and this post draws on a lot of references, so please point out anything that is wrong.

    Shuffle is the stage between the Map Task and the Reduce Task; it is essentially a cross-node, cross-process data transfer. Online materials usually break this part of MapReduce down into six phases:

    1. Collect  2. Spill  3. Merge  4. Copy  5. Merge  6. Sort

    After reading the source, this division makes good sense. First take a look at the shuffle diagram in the official documentation to get an overall picture.

    Map

    Let's start with the Map side. The entry point is the run method of Map Task (org/apache/hadoop/mapred/MapTask.java), which is executed whenever a map task starts.

    @Override
    public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
      throws IOException, ClassNotFoundException, InterruptedException {
      this.umbilical = umbilical;   // a proxy for the task attempt's parent; used in many places later
    
      if (isMapTask()) {
        // If there are no reducers then there won't be any sort. Hence the map 
        // phase will govern the entire attempt's progress.
        if (conf.getNumReduceTasks() == 0) {
          mapPhase = getProgress().addPhase("map", 1.0f);
        } else {
          // If there are reducers then the entire attempt's progress will be 
          // split between the map phase (67%) and the sort phase (33%).
          mapPhase = getProgress().addPhase("map", 0.667f);
          sortPhase  = getProgress().addPhase("sort", 0.333f);
        }
      }
    
      // start the task reporter, which runs a periodic reporting thread (status updates and heartbeats)
      TaskReporter reporter = startReporter(umbilical);
    
      boolean useNewApi = job.getUseNewMapper();
      initialize(job, getJobID(), reporter, useNewApi);  // important: initializes essentially everything the task needs to run
    
      // check if it is a cleanupJobTask
      if (jobCleanup) {
        runJobCleanupTask(umbilical, reporter);
        return;
      }
      if (jobSetup) {
        runJobSetupTask(umbilical, reporter);
        return;
      }
      if (taskCleanup) {
        runTaskCleanupTask(umbilical, reporter);
        return;
      }
    
      if (useNewApi) {
        runNewMapper(job, splitMetaInfo, umbilical, reporter); // core code, step into it
      } else {
        runOldMapper(job, splitMetaInfo, umbilical, reporter);
      }
      done(umbilical, reporter);
    }

    The umbilical object is a bit hard to grasp, and I have not fully figured it out myself. Judging by the name it is a protocol; here is its Javadoc:

    The protocol that a task child process uses to contact its parent process. The parent is a daemon that polls the central master for new map or reduce tasks and runs them as child processes. All communication between the child and the parent goes through this protocol.

    So it looks like an RPC interface. I am not entirely sure what the parent process is: my understanding is that in MRv1 it would be the TaskTracker, and in MRv2 (YARN) it would be the ApplicationMaster; corrections are welcome.
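
    To make the idea concrete, here is a heavily simplified, hypothetical sketch of what an umbilical-style child-to-parent protocol looks like. It is not the real TaskUmbilicalProtocol (which has more methods and different signatures); it only illustrates the kind of calls that flow over this RPC.

    // Hypothetical sketch only -- NOT the real TaskUmbilicalProtocol interface.
    // It illustrates the idea: the child task process reports progress, status
    // and completion back to its parent process over an RPC interface.
    public interface ChildToParentProtocol {
      /** Child periodically reports its progress (0.0f - 1.0f) and a status string. */
      void statusUpdate(String taskAttemptId, float progress, String stateString);

      /** Heartbeat so the parent knows the child JVM is still alive. */
      boolean ping(String taskAttemptId);

      /** Child asks for permission before committing its output. */
      boolean canCommit(String taskAttemptId);

      /** Child signals that the task attempt finished successfully. */
      void done(String taskAttemptId);
    }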

    Step into the runNewMapper method.

    @SuppressWarnings("unchecked")
    private 
    void runNewMapper(final JobConf job,
                      final TaskSplitIndex splitIndex,
                      final TaskUmbilicalProtocol umbilical,
                      TaskReporter reporter
                      ) throws IOException, ClassNotFoundException,
                               InterruptedException {
      // make a task context so we can get the classes -- create the task attempt's context
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                    getTaskID(),
                                                                    reporter);
      // make a mapper -- create the Mapper via reflection
      org.apache.hadoop.mapreduce.Mapper mapper =
        (org.apache.hadoop.mapreduce.Mapper)
          ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
      // make the input format -- create the InputFormat via reflection; it is what reads the input
      org.apache.hadoop.mapreduce.InputFormat inputFormat =
        (org.apache.hadoop.mapreduce.InputFormat)
          ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
      // rebuild the input split -- fetch the split information
      org.apache.hadoop.mapreduce.InputSplit split = null;
      split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
          splitIndex.getStartOffset());
      LOG.info("Processing split: " + split);
    
      org.apache.hadoop.mapreduce.RecordReader input =
        new NewTrackingRecordReader   // wraps the RecordReader created via reflection; the InputFormat reads records through the RecordReader, a big topic in its own right that matters at job submit time
          (split, inputFormat, reporter, taskContext);
      
      job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
      org.apache.hadoop.mapreduce.RecordWriter output = null;
      
      // get an output object
      if (job.getNumReduceTasks() == 0) { // no reduce tasks: write the output directly, no sorting collector
        output = 
          new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
      } else { //  core code: create the collector -- step into it
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
      }
    
      org.apache.hadoop.mapreduce.MapContext 
      mapContext = 
        new MapContextImpl(job, getTaskID(), 
            input, output, 
            committer, 
            reporter, split);
    
      org.apache.hadoop.mapreduce.Mapper.Context 
          mapperContext = 
            new WrappedMapper().getMapContext(
                mapContext);
    
      try {
        input.initialize(split, mapperContext);
        mapper.run(mapperContext);  // invokes our own Mapper implementation
        mapPhase.complete();
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);
        input.close();
        input = null;
        output.close(mapperContext);
        output = null;
      } finally {
        closeQuietly(input);
        closeQuietly(output, mapperContext);
      }
    }

    Now we are entering the Collect phase. Step into NewOutputCollector to see how the collector is created (a small custom Partitioner example follows the code).

      private class NewOutputCollector<K,V>
        extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
        private final MapOutputCollector collector;
        private final org.apache.hadoop.mapreduce.Partitioner partitioner;
        private final int partitions;
    
        @SuppressWarnings("unchecked")
        NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                           JobConf job,
                           TaskUmbilicalProtocol umbilical,
                           TaskReporter reporter
                           ) throws IOException, ClassNotFoundException {
          collector = createSortingCollector(job, reporter);
          partitions = jobContext.getNumReduceTasks();  // the number of partitions equals the number of reduce tasks
          if (partitions > 1) {
            partitioner = (org.apache.hadoop.mapreduce.Partitioner)
              ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
          } else {
            partitioner = new org.apache.hadoop.mapreduce.Partitioner() {
              @Override
              public int getPartition(K key, V value, int numPartitions) {
                return partitions - 1;
              }
            };
          }
        }
    
        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
          collector.collect(key, value, // write the (k,v) pair into the buffer under its partition
                            partitioner.getPartition(key, value, partitions));
        }
    
        @Override
        public void close(TaskAttemptContext context
                          ) throws IOException,InterruptedException {
          try {
            collector.flush();// core method: flush the remaining buffered data out
          } catch (ClassNotFoundException cnf) {
            throw new IOException("can't find class ", cnf);
          }
          collector.close();
        }
      }
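
    For reference, the partition number passed into collect() comes from the job's Partitioner; by default this is HashPartitioner, which hashes the key and takes it modulo the number of reduce tasks. Below is a minimal custom partitioner in the same spirit; the class name and the first-character scheme are just illustrative.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Illustrative custom partitioner: route keys to reducers by their first character.
    public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) {
          return 0;
        }
        // Mask off the sign bit so the result is never negative, then take it modulo
        // the number of reduce tasks -- the same pattern HashPartitioner uses.
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
      }
    }

    It would be registered with job.setPartitionerClass(FirstCharPartitioner.class). Note that with a single reduce task, the anonymous partitioner in the code above simply returns partitions - 1, i.e. 0.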

    Step into createSortingCollector.

    @SuppressWarnings("unchecked")
    private  MapOutputCollector  // the map output collector (generic parameters are elided in this listing)
            createSortingCollector(JobConf job, TaskReporter reporter)
      throws IOException, ClassNotFoundException {
      MapOutputCollector.Context context =
        new MapOutputCollector.Context(this, job, reporter);
    
      Class[] collectorClasses = job.getClasses(  // look up the configured MapOutputCollector classes
        JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);  // the default (and usual) choice is MapOutputBuffer
      int remainingCollectors = collectorClasses.length;
      Exception lastException = null;
      for (Class clazz : collectorClasses) {
        try {
          if (!MapOutputCollector.class.isAssignableFrom(clazz)) {  // check that clazz is MapOutputCollector or a subclass of it
            throw new IOException("Invalid output collector class: " + clazz.getName() +
              " (does not implement MapOutputCollector)");
          }
          Class subclazz =
            clazz.asSubclass(MapOutputCollector.class);
          LOG.debug("Trying map output collector class: " + subclazz.getName());
          MapOutputCollector collector =
            ReflectionUtils.newInstance(subclazz, job); //  create the collector
          collector.init(context);   // initialize it -- step into this
          LOG.info("Map output collector class = " + collector.getClass().getName());
          return collector;
        } catch (Exception e) {
          String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
          if (--remainingCollectors > 0) {
            msg += " (" + remainingCollectors + " more collector(s) to try)";
          }
          lastException = e;
          LOG.warn(msg, e);
        }
      }
      // if every collector failed to initialize, surface the last error
      throw new IOException("Initialization of all the collectors failed. " +
          "Error in last collector was:" + lastException.toString(), lastException);
    }

    This init method is crucial: it sets up both the circular buffer and the spill machinery.

    public void init(MapOutputCollector.Context context
                     // this method mainly initializes the collector object
                    ) throws IOException, ClassNotFoundException {
      job = context.getJobConf();
      reporter = context.getReporter();
      mapTask = context.getMapTask();
      mapOutputFile = mapTask.getMapOutputFile();
      sortPhase = mapTask.getSortPhase();
      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
      partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
    
      //sanity checks
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);  // spill threshold of the circular buffer, 0.8 by default
      final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
          MRJobConfig.DEFAULT_IO_SORT_MB);  //  circular buffer size, 100 MB by default
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }
    
      // sorting: QuickSort by default
      // the sorter is applied before data is spilled from the circular buffer to disk,
      // and it sorts the metadata (the indices), not the serialized records themselves
      sorter = ReflectionUtils.newInstance(job.getClass(
                   MRJobConfig.MAP_SORT_CLASS, QuickSort.class,
                   IndexedSorter.class), job);
      // buffers and accounting
      // initialize the famous circular buffer, which at heart is just a byte array
      int maxMemUsage = sortmb << 20;  // convert MB to bytes
      // each (k,v) pair has four ints of metadata: valstart, keystart, partition, vallen
      // METASIZE is those 4 ints expressed in bytes, i.e. 4 * 4 = 16
      maxMemUsage -= maxMemUsage % METASIZE;  // round the buffer size down to a multiple of METASIZE
      kvbuffer = new byte[maxMemUsage]; // the buffer itself, in bytes (it holds both records and metadata)
      bufvoid = kvbuffer.length;
      kvmeta = ByteBuffer.wrap(kvbuffer)
         .order(ByteOrder.nativeOrder())
         .asIntBuffer();  // view the byte-oriented kvbuffer as the int-oriented kvmeta
      setEquator(0);
      bufstart = bufend = bufindex = equator;
      kvstart = kvend = kvindex;
      // maximum number of metadata entries kvmeta can hold
      maxRec = kvmeta.capacity() / NMETA;
      softLimit = (int)(kvbuffer.length * spillper); // the threshold at which spilling starts
      bufferRemaining = softLimit;
      LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
      LOG.info("soft limit at " + softLimit);
      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
    
      // k/v serialization
      comparator = job.getOutputKeyComparator();
      keyClass = (Class)job.getMapOutputKeyClass();
      valClass = (Class)job.getMapOutputValueClass();
      serializationFactory = new SerializationFactory(job);
      keySerializer = serializationFactory.getSerializer(keyClass);
      keySerializer.open(bb);  // keys are serialized into bb (the BlockingBuffer)
      valSerializer = serializationFactory.getSerializer(valClass);
      valSerializer.open(bb); // values are serialized into bb as well
    
      // output counters
      mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
      mapOutputRecordCounter =
        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
      fileOutputByteCounter = reporter
          .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
    
      // compression -- a codec that reduces the amount of shuffled data
      if (job.getCompressMapOutput()) {
        Class codecClass =
          job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
      } else {
        codec = null;
      }
    
      // combiner
      // the combiner is a map-side "mini reduce"
      final Counters.Counter combineInputCounter =
        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
      combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                             combineInputCounter,
                                             reporter, null);
      if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =
          reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        combineCollector= new CombineOutputCollector(combineOutputCounter, reporter, job);
      } else {
        combineCollector = null;
      }
      // the spill thread
      spillInProgress = false;
      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
      spillThread.setDaemon(true); //  it is a daemon thread
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        spillThread.start();  // start the spill thread
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
      } finally {
        spillLock.unlock();
      }
      if (sortSpillException != null) {
        throw new IOException("Spill thread failed to initialize",
            sortSpillException);
      }
    }

    From this method we can see how the circular buffer is initialized: 100 MB by default, with spilling kicking in at 80% occupancy. The "Collector" is really just a high-level name; concretely it is a MapOutputBuffer object. Both numbers are ordinary job configuration values, as sketched below.
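
    A minimal tuning sketch, assuming the Hadoop 2.x+ property names (mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent) and their usual defaults of 100 MB and 0.80; the class name and the chosen values are only for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SortBufferTuningDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Size of the in-memory sort buffer in MB (MRJobConfig.IO_SORT_MB, default 100).
        conf.setInt("mapreduce.task.io.sort.mb", 200);
        // Fraction of the buffer at which the spill thread starts (default 0.80).
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.90f);
        Job job = Job.getInstance(conf, "sort-buffer-tuning-demo");
        System.out.println("io.sort.mb = "
            + job.getConfiguration().getInt("mapreduce.task.io.sort.mb", 100));
      }
    }

    A larger buffer or a higher threshold means fewer spill files and therefore less merging later, at the cost of map-task heap.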

    The spill thread is also started here, but on the first pass it simply blocks; we will come back to that.

    At this point everything the map needs before it starts is ready. To see how it is used, debug from the write call inside our own mapper: it leads to the write method defined in NewOutputCollector, which in turn calls MapOutputBuffer.collect (shown right after the small mapper example below).
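
    For reference, a typical user mapper looks like the classic word-count mapper below; every context.write() it makes goes through NewOutputCollector.write() and ends up in collect():

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Ordinary user mapper: every context.write() below is routed through
    // NewOutputCollector.write() into MapOutputBuffer.collect().
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
      private static final IntWritable ONE = new IntWritable(1);
      private final Text word = new Text();

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
          if (!token.isEmpty()) {
            word.set(token);
            context.write(word, ONE);   // -> collector.collect(key, value, partition)
          }
        }
      }
    }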

    public synchronized void collect(K key, V value, final int partition
                                     ) throws IOException {
      reporter.progress();
      if (key.getClass() != keyClass) {
        throw new IOException("Type mismatch in key from map: expected "
                              + keyClass.getName() + ", received "
                              + key.getClass().getName());
      }
      if (value.getClass() != valClass) {
        throw new IOException("Type mismatch in value from map: expected "
                              + valClass.getName() + ", received "
                              + value.getClass().getName());
      }
      if (partition < 0 || partition >= partitions) {
        throw new IOException("Illegal partition for " + key + " (" +
            partition + ")");
      }
      checkSpillException();
      bufferRemaining -= METASIZE;  // when a new record is collected, first reserve its metadata bytes, then check
      if (bufferRemaining <= 0) { // the soft limit has been reached
        // start spill if the thread is not running and the soft limit has been
        // reached
        spillLock.lock();
        try {
          do {
            // on the first spill, spillInProgress is false
            if (!spillInProgress) {
              final int kvbidx = 4 * kvindex; // in bytes
              final int kvbend = 4 * kvend;  // in bytes
              // serialized, unspilled bytes always lie between kvindex and
              // bufindex, crossing the equator. Note that any void space
              // created by a reset must be included in "used" bytes
              final int bUsed = distanceTo(kvbidx, bufindex);  // number of buffer bytes currently in use
              final boolean bufsoftlimit = bUsed >= softLimit;  // true means the soft limit has been exceeded
              if ((kvbend + METASIZE) % kvbuffer.length !=
                  equator - (equator % METASIZE)) {
                // spill finished, reclaim space
                resetSpill();
                bufferRemaining = Math.min(
                    distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                    softLimit - bUsed) - METASIZE;  // recompute the remaining budget now that the finished spill has reclaimed its space
                continue;
              } else if (bufsoftlimit && kvindex != kvend) {
                // spill records, if any collected; check latter, as it may
                // be possible for metadata alignment to hit spill pcnt
                startSpill();  // start spilling; internally this wakes up the spill thread
                final int avgRec = (int)
                  (mapOutputByteCounter.getCounter() /
                  mapOutputRecordCounter.getCounter());
                // leave at least half the split buffer for serialization data
                // ensure that kvindex >= bufindex
                final int distkvi = distanceTo(bufindex, kvbidx);
                final int newPos = (bufindex +
                  Math.max(2 * METASIZE - 1,
                          Math.min(distkvi / 2,
                                   distkvi / (METASIZE + avgRec) * METASIZE)))
                  % kvbuffer.length;
                setEquator(newPos);
                bufmark = bufindex = newPos;
                final int serBound = 4 * kvend;
                // bytes remaining before the lock must be held and limits
                // checked is the minimum of three arcs: the metadata space, the
                // serialization space, and the soft limit
                bufferRemaining = Math.min(
                    // metadata max
                    distanceTo(bufend, newPos),
                    Math.min(
                      // serialization max
                      distanceTo(newPos, serBound),
                      // soft limit
                      softLimit)) - 2 * METASIZE;
              }
            }
          } while (false);   // a do { ... } while (false) block, so the code above can use "continue" to re-run it once
        } finally {
          spillLock.unlock();
        }
      }
      // from here: write directly into the buffer; no spill involved
      try {
        // serialize key bytes into buffer
        int keystart = bufindex;
        keySerializer.serialize(key);
        // if the key's bytes were split across bufvoid (the end of the buffer), shift the key
        // so that it is contiguous, which makes key comparison during sort possible
        if (bufindex < keystart) {
          // wrapped the key; must make contiguous
          bb.shiftBufferedKey();
          keystart = 0;
        }
        // serialize value bytes into buffer
        final int valstart = bufindex;
        valSerializer.serialize(value);
        // It's possible for records to have zero length, i.e. the serializer
        // will perform no writes. To ensure that the boundary conditions are
        // checked and that the kvindex invariant is maintained, perform a
        // zero-length write into the buffer. The logic monitoring this could be
        // moved into collect, but this is cleaner and inexpensive. For now, it
        // is acceptable.
        bb.write(b0, 0, 0);
    
        // the record must be marked after the preceding write, as the metadata
        // for this record are not yet written
        int valend = bb.markRecord();
    
        mapOutputRecordCounter.increment(1);
        mapOutputByteCounter.increment(
            distanceTo(keystart, valend, bufvoid)); // update the output counters
    
        // write accounting info
        kvmeta.put(kvindex + PARTITION, partition);
        kvmeta.put(kvindex + KEYSTART, keystart);
        kvmeta.put(kvindex + VALSTART, valstart);
        kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
        // advance kvindex
        kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
      } catch (MapBufferTooSmallException e) {
        LOG.info("Record too large for in-memory buffer: " + e.getMessage());
        spillSingleRecord(key, value, partition);  // a record too large for the buffer is spilled straight to disk
        mapOutputRecordCounter.increment(1);
        return;
      }
    }

    The most important call here is startSpill(). Stepping into it you will find spillReady.signal(), which wakes the spill thread that has been blocked on spillReady.await(); spillReady is a Condition created from the reentrant spillLock. At this point the spill thread starts doing real work. This also touches on how the circular buffer is written and read at the same time, which is fairly abstract; I plan to write a separate post about the circular buffer. A stripped-down sketch of the await/signal handshake follows.
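
    A minimal sketch of that handshake, assuming nothing beyond java.util.concurrent; the class and field names are mine, not Hadoop's, and the real MapOutputBuffer keeps far more state:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    // Stripped-down sketch of the collect()/SpillThread handshake pattern.
    public class SpillHandshakeDemo {
      private final Lock spillLock = new ReentrantLock();
      private final Condition spillReady = spillLock.newCondition(); // the spill thread waits on this
      private boolean spillRequested = false;

      /** Daemon thread: sleeps until collect() asks for a spill. */
      private final Thread spillThread = new Thread(() -> {
        spillLock.lock();
        try {
          while (true) {
            while (!spillRequested) {
              spillReady.await();          // blocks until startSpill() signals
            }
            spillRequested = false;
            System.out.println("sort and spill buffered records here");
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          spillLock.unlock();
        }
      }, "SpillThread");

      public void start() {
        spillThread.setDaemon(true);
        spillThread.start();
      }

      /** Called from collect() when the soft limit is crossed. */
      public void startSpill() {
        spillLock.lock();
        try {
          spillRequested = true;
          spillReady.signal();             // wake the spill thread
        } finally {
          spillLock.unlock();
        }
      }

      public static void main(String[] args) throws InterruptedException {
        SpillHandshakeDemo demo = new SpillHandshakeDemo();
        demo.start();
        demo.startSpill();
        Thread.sleep(100);                 // give the daemon thread a moment before the JVM exits
      }
    }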

    This is the Collect phase in a nutshell: the map side writes its output (k,v) pairs, together with their metadata, into the MapOutputBuffer.

    Since collect() is also what wakes the spill thread, let's look at SpillThread's run method; the important call inside it is sortAndSpill.

    private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions
      final long size = distanceTo(bufstart, bufend, bufvoid) +
                  partitions * APPROX_HEADER_LENGTH;  // estimated length of the spill output
      FSDataOutputStream out = null;
      FSDataOutputStream partitionOut = null;
      try {
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename =
            mapOutputFile.getSpillFileForWrite(numSpills, size);// by default output/spill{n}.out
        out = rfs.create(filename);// create the spill file
    
        final int mstart = kvend / NMETA;
        final int mend = 1 + // kvend is a valid record
          (kvstart >= kvend
          ? kvstart
          : kvmeta.capacity() + kvstart) / NMETA;
        // sort the metadata: first by partition, then by key within each partition
        // (a secondary sort performed over the metadata entries only)
        sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
        int spindex = mstart;
        final IndexRecord rec = new IndexRecord();
        final InMemValBytes value = new InMemValBytes();
        for (int i = 0; i < partitions; ++i) {// loop over the partitions
          // the spill goes to a temporary file in the IFile format
          IFile.Writer writer = null;
          try {
            long segmentStart = out.getPos();
            partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
            writer = new Writer(job, partitionOut, keyClass, valClass, codec,
                                      spilledRecordsCounter);
            if (combinerRunner == null) {
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              // write out all the records belonging to partition i
              while (spindex < mend &&
                  kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                final int kvoff = offsetFor(spindex % maxRec);
                int keystart = kvmeta.get(kvoff + KEYSTART);
                int valstart = kvmeta.get(kvoff + VALSTART);
                key.reset(kvbuffer, keystart, valstart - keystart);
                getVBytesForOffset(kvoff, value);
                writer.append(key, value);
                ++spindex;
              }
            } else {    // run the combiner (a map-side reduce) to shrink the spill
              int spstart = spindex;
              while (spindex < mend &&
                  kvmeta.get(offsetFor(spindex % maxRec)
                            + PARTITION) == i) {
                ++spindex;
              }
              // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }
            }
    
            // close the writer
            writer.close();  // the file goes to local disk, not to HDFS
            if (partitionOut != out) {
              partitionOut.close();
              partitionOut = null;
            }
    
            // record offsets
            // record partition i's segment info in the index record rec
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
            rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
            // spillRec holds the per-partition index info for this spill
            spillRec.putIndex(rec, i);
    
            writer = null;
          } finally {
            if (null != writer) writer.close();
          }
        }
    
        if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
          // create spill index file
          Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);  // write the in-memory index to disk
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        LOG.info("Finished spill " + numSpills);
        ++numSpills;
      } finally {
        if (out != null) out.close();
        if (partitionOut != null) {
          partitionOut.close();
        }
      }
    }

    Clearly a spill produces two kinds of temporary files. One holds the (k,v) data and lives by default at output/spill{x}.out; note that there is no explicit "write the (k,v) data to disk" statement in this code -- that happens inside writer.close(). The other is the index file written by spillRec.writeToFile(indexFilename, job), which stores each partition's index (and is only flushed to disk when the in-memory index cache exceeds its limit; otherwise the index stays in indexCacheList).
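
    Conceptually, each per-partition entry in that index is just three longs, matching the IndexRecord fields set in the code above; the class below only illustrates the layout and is not Hadoop's own type:

    // Conceptual view of one per-partition entry in spill{x}.out.index.
    public class PartitionIndexEntry {
      public final long startOffset; // where this partition's segment begins inside spill{x}.out
      public final long rawLength;   // uncompressed length of the segment
      public final long partLength;  // on-disk (possibly compressed) length of the segment

      public PartitionIndexEntry(long startOffset, long rawLength, long partLength) {
        this.startOffset = startOffset;
        this.rawLength = rawLength;
        this.partLength = partLength;
      }
    }

    When reducer i later asks for its data, the map-side shuffle service can use entry i to seek straight to that partition's segment instead of scanning the whole spill file.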

    While the SpillThread is busy with sortAndSpill, the map task keeps producing new (k,v) pairs and writing them into the MapOutputBuffer, so the buffer is being read and written at the same time. How are collisions avoided? By writing in the opposite direction from a new equator.

    In the (omitted) diagram: the red arrows are the serialized (k,v) data being written, the blue arrows are the metadata being written in the opposite direction, the purple region is the reserved ~20% that must not be overwritten, and the green region is data already written and currently being read by the spill thread.
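
    A toy model of that bidirectional scheme, assuming nothing from Hadoop itself (wrap-around at the ends of the array is omitted): serialized records grow forward from the equator while 16-byte metadata entries grow backward from it, so the two share one byte[] until the soft limit is hit.

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    // Toy model of the bidirectional write scheme -- not Hadoop code.
    public class EquatorDemo {
      static final int META_SIZE = 16;                  // 4 ints: partition, keystart, valstart, vallen

      public static void main(String[] args) {
        byte[] kvbuffer = new byte[1 << 20];            // 1 MB toy buffer
        ByteBuffer meta = ByteBuffer.wrap(kvbuffer).order(ByteOrder.nativeOrder());

        int equator = kvbuffer.length / 2;              // toy equator in the middle
        int bufindex = equator;                         // record bytes grow forward from here
        int metaEnd = equator;                          // metadata grows backward from here

        // "Write" one record: 10 bytes of key followed by 20 bytes of value.
        int keystart = bufindex;
        bufindex += 10;
        int valstart = bufindex;
        bufindex += 20;

        // Write its metadata entry just below the equator.
        metaEnd -= META_SIZE;
        meta.putInt(metaEnd,      0);                   // partition
        meta.putInt(metaEnd + 4,  keystart);            // key start offset
        meta.putInt(metaEnd + 8,  valstart);            // value start offset
        meta.putInt(metaEnd + 12, bufindex - valstart); // value length

        System.out.println("record bytes used: " + (bufindex - equator)
            + ", metadata bytes used: " + (equator - metaEnd));
      }
    }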

    That completes sort-and-spill. One question remains: what about data still sitting in the MapOutputBuffer that never reached the spill threshold? Go back to the close method of NewOutputCollector: the MapOutputBuffer.flush() call there handles exactly that.

    The last map-side stage of shuffle is Merge. There is too much code to paste here; interested readers can look at MapOutputBuffer's mergeParts method, which is called from the flush method above and merges the .out files and index files produced during the spill phase.

    The goal of Merge is simple, but the process is genuinely involved. It first scans the output directory, collecting the paths of the spill .out files into one array and the index files into another. You may wonder why the data was flushed to disk at all if it is going to be read back into memory. That is indeed a weakness of MapReduce: it is very batch oriented, and the disk I/O limits it for workloads such as iterative machine learning. But the step is still necessary: memory used to be expensive, and spilling all the (k,v) and index data to disk guards against OOM. Reading it back afterwards is affordable because by then the big circular buffer is no longer in use, so memory is plentiful again.

    Merge also involves a multi-way merge, and not a trivial one: a partition does not contain just one key, so the source is quite intricate and its comments (which mention a priority queue and a heap) are easy to misread as a heap sort. It is worth a look but not essential (I only half understand it myself); the sketch below shows the core idea stripped of all the IFile and segment machinery.
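
    A minimal k-way merge over already-sorted runs, purely as an illustration of the idea behind merging spill files; Hadoop's Merger does this over IFile segments with raw-byte comparators, which this sketch deliberately ignores:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.PriorityQueue;

    // Minimal k-way merge over sorted runs, the core idea behind merging spill files.
    public class KWayMergeDemo {
      public static List<Integer> merge(List<int[]> sortedRuns) {
        // Heap entries: {value, runIndex, positionInRun}, ordered by value.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < sortedRuns.size(); r++) {
          if (sortedRuns.get(r).length > 0) {
            heap.add(new int[] {sortedRuns.get(r)[0], r, 0});
          }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
          int[] top = heap.poll();
          out.add(top[0]);
          int run = top[1], next = top[2] + 1;
          if (next < sortedRuns.get(run).length) {
            heap.add(new int[] {sortedRuns.get(run)[next], run, next});
          }
        }
        return out;
      }

      public static void main(String[] args) {
        List<int[]> runs = Arrays.asList(new int[] {1, 4, 9}, new int[] {2, 3, 8}, new int[] {5, 6, 7});
        System.out.println(merge(runs)); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
      }
    }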

    Reduce

    I will keep the Reduce side short and only cover the highlights.

    As before, the first step is the run method of Reduce Task, which is invoked automatically when the reduce task starts.

     public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
       throws IOException, InterruptedException, ClassNotFoundException {
       job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    
       if (isMapOrReduce()) { // the three phases of a reduce task
         copyPhase = getProgress().addPhase("copy");
         sortPhase  = getProgress().addPhase("sort");
         reducePhase = getProgress().addPhase("reduce");
       }
       // start thread that will handle communication with parent
       // start the task reporter, which runs a periodic reporting thread (status updates and heartbeats)
       TaskReporter reporter = startReporter(umbilical);
       
       boolean useNewApi = job.getUseNewReducer();
       initialize(job, getJobID(), reporter, useNewApi);// core code: initialize the task
    
       // check if it is a cleanupJobTask
       if (jobCleanup) {
         runJobCleanupTask(umbilical, reporter);
         return;
       }
       if (jobSetup) {
         runJobSetupTask(umbilical, reporter);
         return;
       }
       if (taskCleanup) {
         runTaskCleanupTask(umbilical, reporter);
         return;
       }
       
       // Initialize the codec
       codec = initCodec();
       RawKeyValueIterator rIter = null;
       ShuffleConsumerPlugin shuffleConsumerPlugin = null;
       
       Class combinerClass = conf.getCombinerClass();
       CombineOutputCollector combineCollector = 
         (null != combinerClass) ? 
        new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
    
       Class clazz =
             job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
    // configure the shuffle consumer plugin
       shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
       LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
    
       ShuffleConsumerPlugin.Context shuffleContext = 
         new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                     super.lDirAlloc, reporter, codec, 
                     combinerClass, combineCollector, 
                     spilledRecordsCounter, reduceCombineInputCounter,
                     shuffledMapsCounter,
                     reduceShuffleBytes, failedShuffleCounter,
                     mergedMapOutputsCounter,
                     taskStatus, copyPhase, sortPhase, this,
                     mapOutputFile, localMapFiles);
       shuffleConsumerPlugin.init(shuffleContext);
       // run the shuffle: the remote fetch of map outputs.
       // internally it starts a map-completion event fetcher thread to learn which map tasks have finished,
       // plus (by default) 5 fetcher threads that pull the data; keep stepping in and the core call
       // is doShuffle, which has an in-memory and an on-disk variant.
       // data that exceeds the shuffle memory is merged to disk; the plugin's MergeManager starts
       // merge threads when the shuffle memory cache is close to full.

       // the event fetching looks like plain network I/O but is really an RPC: via the umbilical proxy it
       // obtains the task attempt IDs of completed map tasks and stores them in the scheduler for the shuffle.
    
       rIter = shuffleConsumerPlugin.run();
    
       // free up the data structures
       // mapOutputFilesOnDisk is a sorted set (a TreeSet)
       mapOutputFilesOnDisk.clear();
       
       sortPhase.complete();                         // sort is complete
       setPhase(TaskStatus.Phase.REDUCE); 
       statusUpdate(umbilical);
       Class keyClass = job.getMapOutputKeyClass();
       Class valueClass = job.getMapOutputValueClass();
       RawComparator comparator = job.getOutputValueGroupingComparator();
    
       if (useNewApi) {
         runNewReducer(job, umbilical, reporter, rIter, comparator, 
                       keyClass, valueClass); // run the reduce (the user-defined logic)
       } else {
         runOldReducer(job, umbilical, reporter, rIter, comparator, 
                       keyClass, valueClass);
       }
    
       shuffleConsumerPlugin.close();
       done(umbilical, reporter);
     }

    The key points of Reduce Task are fairly clear: initialize shuffleConsumerPlugin, the Shuffle plugin, and then call its run method so that it pulls the data.

    While the shuffle plugin is being initialized, two components are created: a scheduler and a MergeManager. The MergeManager will matter a lot later.
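
    The plugin itself is selected by configuration and defaults to org.apache.hadoop.mapreduce.task.reduce.Shuffle. A hedged sketch of overriding it -- the property name below is my reading of MRConfig.SHUFFLE_CONSUMER_PLUGIN, and com.example.MyShufflePlugin is a placeholder for a class implementing ShuffleConsumerPlugin:

    import org.apache.hadoop.conf.Configuration;

    public class ShufflePluginConfigDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Default consumer plugin is org.apache.hadoop.mapreduce.task.reduce.Shuffle.
        conf.set("mapreduce.job.reduce.shuffle.consumer.plugin.class",
                 "com.example.MyShufflePlugin"); // placeholder class name
        System.out.println(conf.get("mapreduce.job.reduce.shuffle.consumer.plugin.class"));
      }
    }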

    Next, look at the plugin's run method.

    public RawKeyValueIterator run() throws IOException, InterruptedException {
      // Scale the maximum events we fetch per RPC call to mitigate OOM issues
      // on the ApplicationMaster when a thundering herd of reducers fetch events
      // TODO: This should not be necessary after HADOOP-8942
      int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
          MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
      int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
    
      // Start the map-completion events fetcher thread
      // start an EventFetcher thread that learns which map tasks have completed
      final EventFetcher eventFetcher = 
        new EventFetcher(reduceId, umbilical, scheduler, this,
            maxEventsToFetch);
      eventFetcher.start();
      
      // Start the map-output fetcher threads
      // the fetcher threads pull this reducer's partition of the data from the map side
      boolean isLocal = localMapFiles != null;
      final int numFetchers = isLocal ? 1 :
        jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
      Fetcher[] fetchers = new Fetcher[numFetchers];
      if (isLocal) {
        fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler,
            merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
            localMapFiles);
        fetchers[0].start();
      } else {
        for (int i=0; i < numFetchers; ++i) {
          fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger, 
                                         reporter, metrics, this, 
                                         reduceTask.getShuffleSecret());
          fetchers[i].start();
        }
      }
      
      // Wait for shuffle to complete successfully
      while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
        reporter.progress();
        
        synchronized (this) {
          if (throwable != null) {
            throw new ShuffleError("error in shuffle in " + throwingThreadName,
                                   throwable);
          }
        }
      }
    
      // Stop the event-fetcher thread
      eventFetcher.shutDown();
      
      // Stop the map-output fetcher threads
      for (Fetcher fetcher : fetchers) {
        fetcher.shutDown();
      }
      
      // stop the scheduler
      scheduler.close();
    
      copyPhase.complete(); // copy is already complete
      taskStatus.setPhase(TaskStatus.Phase.SORT);
      reduceTask.statusUpdate(umbilical);
    
      // Finish the on-going merges...
      RawKeyValueIterator kvIter = null;
      try {
        kvIter = merger.close();
      } catch (Throwable e) {
        throw new ShuffleError("Error while doing final merge " , e);
      }
    
      // Sanity check
      synchronized (this) {
        if (throwable != null) {
          throw new ShuffleError("error in shuffle in " + throwingThreadName,
                                 throwable);
        }
      }
      
      return kvIter;
    }

    The key players are two kinds of threads: the event fetcher and the fetchers.

    The event fetcher thread obtains the IDs of completed TaskAttempts and stores them in the scheduler for later use in the shuffle (especially the sort). It is essentially an RPC -- note the umbilical proxy passed in when the EventFetcher is constructed.

    The fetcher threads pull the data this reducer needs from each map task over HTTP (the difference between HTTP and RPC is worth looking up). The core method, buried a few calls deep, is doShuffle, and merge-sort happens while copying. doShuffle has two implementations, in-memory and on-disk (merge likewise comes in these two flavors): a single key can carry so much data that it has to go to disk, but to limit that, once the shuffle buffer reaches its threshold (64 KB by default, as I understand it) the data is merged -- on disk if it is too large -- and that merging is managed by the shuffle plugin's MergeManager.
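
    Several of these knobs are exposed as job configuration. A hedged sketch with the property names as I recall them (please check them against your Hadoop version; the chosen values are only examples):

    import org.apache.hadoop.conf.Configuration;

    public class ReduceShuffleTuningDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Number of parallel fetcher threads (default 5).
        conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
        // Fraction of the reduce task's heap used to hold fetched map outputs in memory.
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
        // When the in-memory buffer is this full, the MergeManager starts an in-memory merge.
        conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
        System.out.println("parallel copies = "
            + conf.getInt("mapreduce.reduce.shuffle.parallelcopies", 5));
      }
    }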

    So Copy, Merge, and Sort overlap in time.

    That concludes this walkthrough of the Shuffle source code.

    References

    1. MapReduce ReduceTask源码解析

    2. MapReduce中的shuffle详解

    3. 环形缓冲区

