• CENTOS上的网络安全工具(十一)走向Hadoop(3) MapReduce示例解析


            关于MapReduce的介绍及源码分析网上也不少,但是有些东西看过只是看过,没有亲自手撸一遍的过程,总感觉还是有点那么不够实在。所以为了搞明白MapReduce的Grep示例具体怎么工作的,我们上一篇首先构建了可以支撑我们手撸的环境,这一篇就基于这个环境尝试解剖一下Grep。当然,解剖的程度是以我自己感觉大致明白为界限的,毕竟水平有限也没打算过于深究。只是若读者感兴趣,完全可以在该篇基础上继续深入,如果有兴趣写出来最好,余必拜访学习:)

            一、MapReduce的执行逻辑

            第一段不能免俗,先来张MapReduce的执行逻辑图。只不过,如果只看这张图,会感觉似乎明白MapReduce在干什么,但似乎又确实不知道MapReduce在干什么——我刚接触到这个玩意儿的时候,就这么个感觉。

            毕竟要说他在干什么,打个比方就行:一个叫做DFS的图书馆里,存了好几百书架的书,现在需要统计一下,这些书总共有多少本。如果你是一个人,做法无非就是从第一个书架开始,一本一本数;像快一点的话,那就来一群人,分开数完一加就是。

            但是要像真弄清楚,这个比方还得再戏谑……啊不,再细致点:

            这一群人,组建了一个叫MapReduce的公司。其中,有个大佬比较NB,是董事会请来的CEO,大家都听他(JobTracer进程)的,所以他来安排这个任务原则上怎么干:绝大部分的人(Mapper类),每人负责若干书架,一个书架一个书架的数,数完就会报<第几书架,共多少本>;一个(或少数)会算加法的聪明人(Reducer类)负责统计,把所有的<第几书架,共多少本>报告记下来,把本数加起来,最后报告<总数:多少本>。

            如果把这个数图书馆的事作为一个工程项目的话,Mapper和Reducer这种具体搬砖的人,就是现实生活中苦逼的技术线码农们。如果书架不多还好说,如果书架太多,码农们可能数着数着就忘记自己接下来该数哪个书架了,或者一些聪明的混混们跑去摸鱼去了,Reducer可能好久也凑不齐需要统计的数据。JobTracer作为行政线唯一的大佬,盯着码农们干活这种事当然不能亲自操刀,所以还要找一些具备PUA特长的中层们(TaskTracer进程),这些中层们负责盯着码农们干活,干得快的,就多分点任务给他,让他多数几个书架;摸鱼太久的,叫到办公室批评教育洗洗脑,重新分配任务从头数;累傻累晕的直接拖出去,招个新人进来顶岗……

            老板JobTracer会把任务情况写在公司门口的大白板上,比如任务地点在哪,公司派出多少人,行政线是谁,技术线可调度人员多少,相互怎么配合等等,这个白板叫做Configuration;然后任务(Job类)执行路线会写在一个叫Grep的红头文件里面,安排秘书ToolRunner下发。这个Tool红头文件其实是一个有格式的工单,规定了开展工作必须的要素(Job类——工作内容,Configured接口——执行工作内容需要的附件,也就是白板上的内容,Tool接口——秘书用这个接口签字下发)。JobTracer和TaskTracer会按照这个文件精神督促搬砖的干活。

            当然,光靠学习文件精神是没法把活干起来的,所以Mapper和Reducer们还需要工作文档来记录工作内容,这个文档叫做Context,Mapper和Reducer都会管理和使用。

            哦,对了。为了防止搬砖的对任务理解不一致,Tool文件中规定了任务的输入输出格式,分别叫做InputFormat和OutputFormat。如果遇到任何与规定不符的输入,Mapper和Reducer会拒绝干活,毕竟输入错了责任在输出方,自己傻不愣登接着往下干,责任就说不清,锅就被自己背上了

            :P

            嗯,大公司就有大公司的道理,看起来很简单的事情,真要招一群人来做,其实就没那么简单了。上面这一段看起来就比较复杂,但其实HR部门(Yarn)还没考虑进来。如果公司的业务比数书架要复杂得多,那可能还得有个规划办(ZooKeeper)。

            二、MapReduce主要组件梳理

            上面我们大概了解了一下MapReduce公司的组织架构和各部门的职能分工,接着就从代码的角度挨个梳理:

            1.Configuration

           

    1. 1 package org.apache.hadoop.conf;
    2. 2
    3. 3 /** Base class for things that may be configured with a {@link Configuration}. */
    4. 4 public class Configured implements Configurable {
    5. 5
    6. 6 private Configuration conf;
    7. 7
    8. 8 /** Construct a Configured. */
    9. 9 public Configured() {
    10. 10 this(null);
    11. 11 }
    12. 12
    13. 13 /** Construct a Configured. */
    14. 14 public Configured(Configuration conf) {
    15. 15 setConf(conf);
    16. 16 }
    17. 17
    18. 18 // inherit javadoc
    19. 19 public void setConf(Configuration conf) {
    20. 20 this.conf = conf;
    21. 21 }
    22. 22
    23. 23 // inherit javadoc
    24. 24 public Configuration getConf() {
    25. 25 return conf;
    26. 26 }
    27. 27
    28. 28 }

            Hadoop集群环境配置需要设置4个xml文件,因为之前我们只安装了Single Node版本,所以还没涉及到这一块。不过在官方的Cluster Setup指南里,是如下图这般说的——Site-specific configuration那一行。

            Hadoop使用了Configuration类来维护管理这些xml文件中的配置信息。Configuration源代码中一段描述指出了这个事实:       

    1. /*
    2. *

      Unless explicitly turned off, Hadoop by default specifies two

    3. * resources, loaded in-order from the classpath:
      1. *
    4. *
    5. * core-default.xml: Read-only defaults for hadoop.
  • *
  • core-site.xml: Site-specific configuration for a given hadoop
  • * installation.
  • *
  • */
  •         当然,也可以自己构建xml文件放在同目录下,使用Configuration进行操作——然而,这些内容我们就不打算关注了。理解mapreduce只需要知道,Configuration记录了hadoop的配置信息,而mapreduce任务如何分解、如何分发等等是需要使用这些信息的,所以,每个job里面都需要携带并传递Configuration的句柄,以确保想知道配置的时候能够读得到。

            Configuration类实现了Iterable和Writable接口。Hadoop.io.Writable主要是2个方法:

                    方法1:write(Java.io.DataOutput arg)

                    方法2:readFields(Java.io.DataInput arg)

            Java.lang.Iterable主要是2个方法:

                    方法1:forEach(Consumer action)

                    方法2:spliterator()

           从此也可见Configuration的主要功能就是读写枚举配置信息。

           2.Configurable接口和Configured类

            Configured类用于具体实现Configurable接口

            上图来自org.apache.hadoop.conf-Configured_weixin_34148508的博客-CSDN博客 文章。

            Configurable接口主要维护一个私有属性:

            属性1:private Configuration conf

            并定义2个方法:

            方法1:setConf(Configuration conf)

            方法2:getConf(Configuration conf)

            可见,该接口主要确定继承自Configured类的类可维护Configuration配置环境,并提供对该配置环境的管理手段。

            3.Tool接口

    1. public interface Tool extends Configurable {
    2. int run(String [] args) throws Exception;
    3. }
    4. }
    1. Public class Configured implements Configurable{
    2. Private Configuration conf;
    3. Public Configured(Configuration conf){setConf(conf);}//构造方法
    4. Public void setConf(Configuration conf)
    5. {
    6. This.conf=conf;
    7. }
    8. Public getConf()
    9. {
    10. Return conf;
    11. }
    12. }

               Tool接口继承自Configurable接口,新增了1个方法:

            方法1:run(String[] args)

            由于后面可以看到,ToolRunner会调用实现MapReduce算法的类的run方法,以启动一个MapReduce工作。所以,实现MapReduce算法的类需要实现Tool接口。

            在Tool类的注释中,给出了Tool.run()的典型实现方法:

    1. public class MyApp extends Configured implements Tool {
    2. public int run(String[] args) throws Exception {
    3. Configuration conf = getConf();//获取Hadoop环境
    4. JobConf Job = new JobConf(conf,MyApp.class);//构建MR环境
    5. Path in = new Path(args[1])
    6. Path out = new Path(args[2])
    7. //设置MapReduce任务
    8. Job.setJobName(“my-app”);
    9. Job.setInputPath(in);      //该方法并不存在,可能是伪代码
    10. Job.setOutputPath(out);     //该方法并不存在,可能是伪代码
    11. Job.setMapperClass(MyMapper.class)
    12. Job.setReducerClass(MyReducer.class)
    13. //提交MapReduce任务并等待结束
    14. RunningJob runningJob = JobClient.runJob(Job);
    15. if(runningJob.isSuccessful())return 0;
    16. else return 1;
    17. }
    18.     public static void main(String[] args) throws Exception{
    19.         //新建Hadoop环境,新建MR任务,并使用ToolRunner启动
    20.         int res = ToolRunner.run(new Configuration(),new MyApp(),args);
    21.         system.exit(res);
    22.     }
    23. }

            4.Grep类

            Grep类用于实现“Grep”MapReduce算法,所以需要实现Tool接口;且如前所述,Tool.run()中需要使用Hadoop的环境配置,故Grep也需要实现Configurable接口。所以,Grep的定义方式是:

            public class Grep extends Configurable implements Tool{}

            和Tool类注释中给出的示例大体上是一样的,区别在于Tool类中示例使用了JobConf类直接控制MR任务的配置,并使用JobClient类,以JobConf为配置参数直接提交任务;而Grep类示例中,使用Job类,似乎是封装更严实,提供的选项更多些:

    1. //设置并构置Hadoop环境
    2. Configuration conf = getConf();
    3. Job grepJob = Job.getInstance(conf);
    4. grepJob.setJobName("grep-search");
    5. grepJob.setJarByClass(Grep.class);
    6. //配置MapReduce环境和任务
    7. FileInputFormat.setInputPaths(grepJob, args[0]);
    8. grepJob.setMapperClass(RegexMapper.class);
    9. grepJob.setCombinerClass(LongSumReducer.class);
    10. grepJob.setReducerClass(LongSumReducer.class);
    11. FileOutputFormat.setOutputPath(grepJob, tempDir);
    12. grepJob.setOutputFormatClass(SequenceFileOutputFormat.class);
    13. grepJob.setOutputKeyClass(Text.class);
    14. grepJob.setOutputValueClass(LongWritable.class);
    15. //启动并等待任务
    16. grepJob.waitForCompletion(true);

            实际上,Grep是在Job.waitForCompletion(true)中,调用了Job.submit();在Job.submit()中,调用了JobSubmitter.submitJobInternal(Job.this,cluster)。

            而在Tool的示例中,是使用静态方法JobClient.runJob(JobConf job),以传入的Hadoop配置环境conf为参数新建一个JobClient对象,并以该对象调用了JobClient类的submitJob方法,其中又调用了JobClient.submitJobInternal(conf);在submitJobInternal中则新建Job对象,并调用了重写的Job.run()方法;该重写方法主要作用是跳过MR配置过程——因为已经配置过了,直接使用Job.submit()提交MR任务,最终也还是归于JobSubmitter.submitJobInternal(Job.this,cluster)。

            输入输出方面,需要注意的是JobConf并没有实现setInputPath方法,但是在JobConf的注释里给出了对应的示例,使用的是FileInputFormat的setInputPaths方法,这个就和Grep示例一样了,内部都是调用的Configuration.set方法。

            总而言之——

            简单的理解,开始一个MR任务的主要动作,其实都是在配置Configuration的内部参数,手段是通过Configuration的set方法、setClass方法、setJar方法等等。配置完成后,以配置好的Configuration为参数创建一个Job对象,submit就行了。

            5.ToolRunner类        

    1. public class ToolRunner {
    2. public static int run(Configuration conf, Tool tool, String[] args)
    3. throws Exception{
    4. if(conf == null) {
    5. conf = new Configuration();
    6. }
    7. GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    8. //set the configuration back, so that Tool can configure itself
    9. tool.setConf(conf)
    10. String[] toolArgs = parser.getRemainingArgs();
    11. return tool.run(toolArgs);
    12. }
    13. public static int run(Tool tool, String[] args)
    14. throws Exception{
    15. return run(tool.getConf(), tool, args);
    16. }
    17. public static void printGenericCommandUsage(PrintStream out) {
    18. GenericOptionsParser.printGenericCommandUsage(out);
    19. }
    20. }

            ToolRunner是一个封装类,主要目的是提供了一个静态run函数,用于启动实现tool.run接口的mapreduce任务,通常该run函数在main方法中调用。函数包含3个参数:

            参数1:Configuration conf是整个mapreduce的运行环境封装

            参数2:Tool tool是Mapreduce的算法流程封装

            参数3:args argstring输入输出。

            概要代码如下:

    1.     If(conf==nullptr)conf = new Configuration();
    2.     tool.setConf(conf);
    3.     tool.run(args);

            6.RegexMapper

            Grep使用MapReduce提供的正则搜索Mapper算法RegexMapper,对输入文件进行分片处理并按行匹配模板。RegexMapper类继承自Mapper类。

            Mapper作为模板类共4个参数,用来定义输入KV对和输出KV对的数据类型:

            参数1,2:KEYIN,VALUEIN

            参数3,4:KEYOUT,VALUEOUT

            Mapper内部还嵌入了一个Context抽象类,该类用于管理Mapper算法内部的数据处理,比如:①输出Mapper结果时,是调用Context的write方法;②枚举需要Mapper的输入时,调用的是Context的getCurrentKey、getCurrentValue和nextKeyvalue方法。

            实际上,实现任何一个Map算法都应该从继承Mapper类开始,并重写(override)Mapper的setup、cleanup、map、run方法。

            方法1:setup(Context context),在初始化时调用一次,可用于设置全局参数,比如在RegexMapper中,此处用于初始化pattern、group这2个私有变量的值,其具体取值是通过Grep在run方法中调用conf.set(RegexMapper.PATTERN,args[2])和conf.set(RegexMapper.GROUP,args[3])设置在常量中的。

            方法2:cleanup(Context context),用于在对象销毁时提供清理资源的机会

            方法3:map(KEYIN key,VALUEIN value,Context context),用于定制map方法。每次map被调用时,实际都是一个键值对被传输进来,定制用户需要根据输入key,value值来确定输出键值对,并通过Context.write方法输出。比如mapper中内置了这个默认实现,直接将输入的key,value通过Context.write给写了出去。

            方法4:run(Context context)。run方法代码如下,可见当mapper类被加载时,其主要工作就是从context中枚举被拆分的键值对,然后调用map方法。

    1. {
    2. setup(context);
    3. try {
    4. while (context.nextKeyValue()) {
    5. map(context.getCurrentKey(), context.getCurrentValue(), context);
    6. }
    7.     }
    8. finally {
    9.       cleanup(context);
    10.     }
    11. }

            7.TextInputFormat/FileInputFormat/InputFormat

            在示例代码中,第一次MR操作仅仅对输出的键值对数据类型做了配置,没有对输入进行配置。事实上,在未显式设置的情况下,MapReduce默认使用TextInputFormat作为输入类型,即输入文件为文本文件,输入键值对为类型,其中Text为文本文件中的每一行,为TextInputFormat中的createRecordReader方法通过实例化LineRecordReader构造。

            TextInputFormat类定义为public abstract class TextInputFormat extends FileInputFormat,可见默认情况下其键值对的类型已明确定义,且TextInputForamt继承自FileInputFormat。

            继承自FileInputFormat类定义为Public abstract class FileInputFormat extends InputFormat,即该类继承自InputFormat。

            TextInputFormat主要重写了3个方法:

            方法1:RecordReader createRecordReader(InputWritable split, TaskAttemptContext context);通过实例化LineRecordReader类,将根据JobSubmitDir中的信息读取的文件分块进一步切分为键值对。

            方法2:protected Boolean isSplitable(JobContext context,Path file),用来指示通过JobSubmit提交的splite是否需要进一步切分。

    FileInputFormat主要重写了1个方法:

            方法3:public List getSplits(JobContext job);主要用于将大文件切分为MapReduce适宜调度的块。该方法将在工作调度程序中使用。

    InputFormat是一个抽象类,主要定义了需要被重写的initialize、getSplits、createRecordReader方法。

            8.LineRecordReader/RecordReader

            RecordReader是用于维护管理键值对的抽象类,主要定义了5个方法。

            方法1:initialize(InputSplit,TaskAttemptContext);在初始化时调用一次,可在这里实现对split的进一步拆分,构建键值对列表

            方法2:nextKeyValue();移动到下一个键值对;

            方法3:KEYIN getCurrentKey();得到当前键值对的键;

            方法4:VALUEIN getCurrentValue();得到当前键值对的值;

            方法5:float getProgress();得到当前RecordReader的处理进度(0.0-1.0)

            LineRecordReader继承了recordReader,重写了initialize、nextKeyValue、getCurrentKey、getCurrentValue和getProgress等方法。值得提一下的是,LineRecordReader似乎并未在initialize中直接将split完全拆开后保存,而是仅构建了维护信息,读取了首个键值对。其余键值对仅在nextkeyValue被调用时通过in.readLine(value,……)临时读出。这应该是针对海量文件批处理的特点考虑,毕竟都放在内存里面可能就爆了。

            RecordReader的5个方法,和Context所实现的TaskAttemptContext接口有明显的意义对应关系,容易猜到,在mapper的run方法中,context对应的接口,应该是调用了RecordReader的方法。这个实际上是在MapContextImpl类的nextKeyValue中实现的——MapContextImpl实现了Context规定的接口。

            9.MapReduce的mapper的总体执行流程

            前面提到,整个MapReduce的工作流程都是在JobTracer和TaskTracer的管理下运转的。事实上,这也是人云亦云的结果。我们在3.3.4版本的Single Node中没有观察到这2个进程。当然,这可能是版本(原描述来自于2.0版本)和模式(Single Node不可能出现多进程)的结果。然而我们还是选择相信,因为即使在SingleNode模式下的多线程方式,其工作流程和人云的方式也是非常相似的。至于Multi模式就不尝试了,毕竟我们的目的只是大致了解一下MR的工作流程而已,Single Node模式的流程如下图所示。也由于不求甚解的原因,部分代码并没有深究,所以不能保证完全正确:

             图中,实线箭头代表进程执行流程和函数调用关系;虚线代表参数关联关系;空心箭头代表线程迁移关系;更细的空心箭头则代表数据在被处理过程中的形态变化。

            Grep调用时,InputFormat的具体类型是TextInputFormat,其父类是FileInputFormat,此时getSplits的作用是按照Hadoop默认的块大小切分文件。

            JobSplitWriter.createSplitFiles将切分的文件的文件名、偏移等信息写回文件系统,代码中称其为JobSubmitDir,临时文件位置在/tmp/Hadoop/mapred/staging/下。

            然后,在JobSubmitter中开始正式提交工作,使用的是submit name

            调用submitClient.submitJob()后就看不到代码了,下一次命中是在Local Job Runner Map Task Executor线程中。

            MapTask.run(),这里读取JobSubmitDir,并根据任务状态(setup/cleanup/runnewmapper或者runoldmapper)执行动作。如调用runNewMapper函数,构建mapper;进一步调用NewTrackingRecordReader方法,其中会调用具体输入类TextInputFormat的createRecordReader方法,基于LineRecordReader类,在split的基础上,再次拆分split为RecordReader。

            然后,MapTask会真正的启动Map任务:

            Input.initialize(split,mapperContext);

            Mapper.run(mapperContext);

            在Mapper.run中对RecordReader拆分的键值对进行处理。

  • 相关阅读:
    windows如何使用自带的MD5加密字符串
    磁性核壳四氧化三铁颗粒负载金纳米星|磁性Fe3O4-POSS-COOH|超顺磁四氧化三铁聚多巴胺核壳结构纳米粒子
    Git链接上游仓库
    java 对接微信支付Native下单API报错: java.security.InvalidKeyException: Illegal key size
    【HTML+CSS】博客系统(页面设计)
    Python画图系列——折线图
    c: Queue Calling in Ubuntu
    python 爬取网站首页并获取资源文件
    【matplotlib 实战】--饼图
    是什么让EDI如此困难?
  • 原文地址:https://blog.csdn.net/lhyzws/article/details/126652064