• 什么是MapReduce?MapReduce整体架构搭建使用介绍


    前言

    本文是MapReduced的详细介绍,MapReduce是hadoop体系下的一种计算模型(计算框架|编程框架),主要是用来对存储在hdfs上的数据进行统计,分析的,分布式计算框架,用来解决分布式大数据平台下数据如何计算,资源调度,任务监控 主要用来整合hadoop集群中的资源(CPU 内存),进行统一调度 同时监控任务的执行情况,联合多个服务器节点的硬件,共同完成一个计算。突破单机服务器的计算能力,还介绍了Yarn分布式集群搭建使用,MapReduce工作的原理源码分析

    MapReduce

    入门

    MapReduce是hadoop体系下的一种计算模型(计算框架|编程框架),主要是用来对存储在hdfs上的数据进行统计,分析的。

    MapReduce的核心思想

    分而治之:大任务拆分小任务。

    在这里插入图片描述

    MapReduce

    概念:分布式计算框架,用来解决分布式大数据平台下数据如何计算。
    简单:分而治之
    Job
    MapTask * 多个并行
    ReduceTask

    1. Job(一个大型任务)[Application]
      一组MapReduce又统称为一个Job作业
    2. MapTask(拆分后的小任务)
      局部计算 并行
    3. Reduce(整合任务)
      对局部计算结果进行汇总计算。
    yarn

    yarn集群核心组成

    NodeManager

    ResourceManager

    在这里插入图片描述

    作用(包工队)
    资源调度,任务监控 主要用来整合hadoop集群中的资源(CPU 内存),进行统一调度 同时监控任务的执行情况
    总结: 联合多个服务器节点的硬件,共同完成一个计算。突破单机服务器的计算能力。

    组成部分

    1. ResourceManager(包工头)
      集群计算资源的管理器,也是yarn架构中的主节点。
      功能:
      1. 监控集群资源
      2. 为计算分配资源。
    2. NodeManager(干活的)
      yarn集群计算资源的提供者,也是yarn架构中的从节点。
      功能
      1. 真正执行计算任务的节点。
    3. 监控本节点的资源情况(CPU 内存 网络 硬盘),并通过心跳向RM汇报。

    在这里插入图片描述

    MapReduce特点

    1. 易于编程:只需要使用hadoop接口进行编程,即可实现多台计算机分布式计算和分布式存储。
    2. 高扩展性:存储空间不足或者计算能力不足,则可以添加计算机完成。
    3. 容错性高:如果某个节点宕机,hadoop可以自动切换讲计算任务转移到其他节点上完成,不会影响计算结果。
      如果计算任务执行了一半失败,出错,内部自动重试机制。
    4. 应用场景:PB级别以上海量数据的离线处理,无法实时处理和流失动态处理。(每日)

    Yarn伪分布式搭建

    1.准备单机的HDFS架构
    要求:安装了并配置了HDFS架构的服务器。
    验证:jps

    [root@hadoop10 ~]# jps
    2224 Jps
    2113 SecondaryNameNode
    1910 DataNode
    1806 NameNode
    
    • 1
    • 2
    • 3
    • 4
    • 5
    关闭掉hdfs
    	stop-dfs.sh
    
    • 1
    • 2
    # 2 初始化配置文件
    
    • 1
    # 拷贝得到mapred-site.xml
    [root@hadoop10 hadoop]# cp mapred-site.xml.template mapred-site.xml
    1. mapred-site.xml
    	
    	<property>
            <name>mapreduce.framework.namename>
            <value>yarnvalue>
        property>
    2. yarn-site.xml
    	
    	<property>
            <name>yarn.nodemanager.aux-servicesname>
            <value>mapreduce_shufflevalue>
        property>
    	
        <property>
            <name>yarn.resourcemanager.hostnamename>
            <value>Hadoopvalue>
        property>
    3. slaves配置文件
    	指定:DataNode和NodeManager节点的ip地址。
    	① Datanode节点ip
    	② Nodemanager的节点ip
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    # 3. 启动yarn集群
    1. 启动HDFS集群
    	start-dfs.sh
    2. 启动yarn集群
    	start-yarn.sh
    	关闭yarn
    	stop-yarn.sh
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    # 验证
    1. jps
    [root@hadoop11 ~]# jps
        6160 DataNode   --- 数据存储节点
        6513 ResourceManager -- 计算机资源调度节点
        6614 NodeManager -- 局部计算节点
        6056 NameNode  -- 文件元数据存储节点
        6349 SecondaryNameNode -- checkpoint节点。
        6831 Jps
    2. 访问yarn的资源调度器web网页。
    	http://resourcemanager所在节点ip:8088
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述

    MapReduce编码

    需求

    在这里插入图片描述

    MapReduce2.0工作机制

    • 数据变化(要干什么)
      在这里插入图片描述

    • 工作角色(谁来干)

    在这里插入图片描述

    MapReduce数据流转机制

    在这里插入图片描述> 1. InputFormat(mr自动处理)

    讲block文件转化成split,其中每条数据是key-value组成。
    key是数据偏移量
    value是每条数据
    2. Map(程序员编码)
    将split逐条输入给map,由map负责,对每条数据进行处理,转化为keyOut-valueOut
    3. Shuffle(MR的默认处理器)
    对map输出的每条数据的key-value进行排序,分组。
    4. Reduce(程序员编码)
    对Shuffle分组后的数据的key-value进行处理,转化为新的key-value。
    5. OutputFormat
    讲reduce产生的数据,存储HDFS文件系统中

    MR编码准备
    # 导入pom依赖
    
    • 1
    
    <dependency>
        <groupId>org.apache.hadoopgroupId>
        <artifactId>hadoop-commonartifactId>
        <version>2.9.2version>
    dependency>
    
    
    <dependency>
        <groupId>org.apache.hadoopgroupId>
        <artifactId>hadoop-hdfsartifactId>
        <version>2.9.2version>
    dependency>
    
    
    <dependency>
        <groupId>junitgroupId>
        <artifactId>junitartifactId>
        <version>4.12version>
    dependency>
    
    
    <dependency>
        <groupId>org.apache.hadoopgroupId>
        <artifactId>hadoop-mapreduce-client-coreartifactId>
        <version>2.9.2version>
    dependency>
    
    <dependency>
        <groupId>org.apache.hadoopgroupId>
        <artifactId>hadoop-mapreduce-client-commonartifactId>
        <version>2.9.2version>
    dependency>
    <dependency>
        <groupId>org.apache.hadoopgroupId>
        <artifactId>hadoop-mapreduce-client-jobclientartifactId>
        <version>2.9.2version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    # 导入log4j配置文件
    
    • 1
    MR编码
    # 编写map程序
    
    • 1
    /*Mapper:
        * 接受:k(0)-v(yangdd yangdd)
        * 输出:k(name)-v(1)
        *
        * */
    /**
         * 继承类上的泛型:
         * Keyin
         * ValueIn
         * KeyOut
         * ValueOut
         *
         */
    static class WordCountMapper extends Mapper<LongWritable,Text, Text, IntWritable>{
        /**
             * 执行时机:每读取一行k-v,调用一次map方法
             * @param key 输入k
             * @param value 输入v
             * @param context 输出k-v写出工具。
             * @throws IOException
             * @throws InterruptedException
             */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //1. 接受k-v
            //2. 对v进行拆分
            String sv = value.toString();
            String[] names = sv.split(" ");
            //遍历数组,将得到每个name,作为k输出。
            for (String name : names) {
                //3. 将k(name)-v(1)
                context.write(new Text(name),new IntWritable(1));
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    # 编写reduce程序
    
    • 1
    /*Reducer:
        * 对maptask输出后,mapreduce合并后的k-vs中的value之,累加和。
        * keyint
        * valuein
        * keyout
        * valueout
        * */
    static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        /**
             * 执行时机:每读取Reduce端合并后的一组数据(k-vs),调用一次reduce方法。
             * @param key 输入k
             * @param values 输入value [1,2,3,1]
             * @param context 输出k-v
             * @throws IOException
             * @throws InterruptedException
             */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //1: 接受k-vs
            //2. 对vs 遍历累加
            int sum = 0;
            for (IntWritable value : values) {
                sum = sum+value.get();
            }
            //3. 输出
            // k(name)-v(累加和)
            context.write(key,new IntWritable(sum));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    # 编写job程序
    
    • 1
    public static void main(String[] args) throws Exception{
        /*组装Job 启动Job*/
        //1. 初始化hdfs的配置文件 入口
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://192.168.199.10:9000");
        //2. 创建job,未来是要运行在yarn集群中。
        Job job = Job.getInstance(conf);
        job.setJarByClass(JobForWordCount.class);
        //3. 配置job(MapTask一端): TextInputFormat keyout valueout Mapper
        TextInputFormat.addInputPath(job,new Path("/baizhi/mapreduce/demo1/namecount.txt"));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setMapperClass(WordCountMapper.class);
        //4. 配置job(ReduceTask一端): TextOutputFormat keyout valueout Reducer
        TextOutputFormat.setOutputPath(job,new Path("/baizhi/mapreduce/demo1/namecountout"));//最后一集目录不能存在,执行目录。
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //5. 启动job
        boolean b = job.waitForCompletion(true);
        System.out.println(b);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    # 本地直接运行。
    	使用本地的方式提交任务,需要HDFS开启写入文件的权限。
    	hdfs dfs -chmod -R 777 /hdfs
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    MapReduce核心api

    在这里插入图片描述

    • InputFormat
    • MapTask
    • maper
    • ReduceTask
    • reducer
    • OutputFormat
    Mapreduce补充细节

    Hadoop的MapReduce适合做大数据的离线处理,不适合做实时处理。

    mapreduce的sort排序,无法取消。

    生产中提交MR任务1

    在这里插入图片描述

    # 打包
    # 1. 设置maven的打包的环境
    
    • 1
    • 2
    <properties>
        
        <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
    properties>
    <build>
        
        <finalName>mr1finalName>
        
        <plugins>
            <plugin>
                <groupId>org.apache.maven.pluginsgroupId>
                <artifactId>maven-jar-pluginartifactId>
                <version>2.4version>
                <configuration>
                    <archive>
                        
                        <manifest>
                            <mainClass>demo1.job.WordCountJobmainClass>
                        manifest>
                    archive>
                configuration>
            plugin>
        plugins>
    build>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    # 2. 执行打包
    	在当前项目所在的目录下执行如下命令
    	> mvn package
    # 3. 上传jar到hadoop的ResourceManager所在的机器
    # 4. 执行程序
    	> yarn jar mr1.jar
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    maven自动化部署插件wagon
    # 1. 配置maven远程提交插件
    1. 添加maven的ssh扩展
    2. 添加maven的远程拷贝插件wagon(货车)
    
    • 1
    • 2
    • 3
    
    <extensions>
        <extension>
            <groupId>org.apache.maven.wagongroupId>
            <artifactId>wagon-sshartifactId>
            <version>2.8version>
        extension>
    extensions>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    
    <plugin>
        <groupId>org.codehaus.mojogroupId>
        <artifactId>wagon-maven-pluginartifactId>
        <version>1.0version>
        <configuration>
            
            <fromFile>target/${project.build.finalName}.jarfromFile>
            
            <url>scp://用户名:密码@ip:/opt/appurl>
        configuration>
    plugin>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    3. 添加远程执行命令,和参数。
    
    • 1
    # 清空
    	mvn clean
    # 打包本地jar
    	mvn package
    # 远程上传jar
    	mvn wagon:upload-single
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    ApplicationMaster

    ResourceManager:任务分配,和nodemanager管理;领导、团队管理[工头]

    NodeManager: 负责运行执行MapTask和ReduceTask。具体干活的人。[工人]

    MRAppMaster:监控、管理 MapReduce任务的执行(开始-过程-结束)。工地监工。

    只有在启动mapreduce程序,才会启动MRAppMaster

    在这里插入图片描述

    在这里插入图片描述

    负责某个任务全部执行过程的监控管理。(监工)

    1. 提交job
      启动ApplicationMaster|MRAppMaster
    2. 管理整个job的运行过程
      ① 向ResourceManager申请资源。
      ② 在NodeManager中启动一个运行环境,执行代码。()
      ③ 跟踪应用job的执行过程和状态
      ④ Job故障管理;
      一旦job任务执行失败(MapTask),AppMaster,自定让NodeManager重启执行任务代码。
    配置yarn的日志服务器-Historyserver

    Hadoop自带了一个历史服务器,可以通过历史服务器查看已经运行完的Mapreduce作业记录

    ​ 比如用了多少个Map、用了多少个Reduce、作业提交时间、作业启动时间、作业完成时间等信息。

    默认未启动。

    在这里插入图片描述

    # 1. 配置mapred-site.xml,指定历史日志服务器的地址
    
    • 1
    
    <property>
        <name>mapreduce.jobhistory.addressname>
        <value>hadoop10:10020value>
    property>
    
    <property>
        <name>mapreduce.jobhistory.webapp.addressname>
        <value>hadoop10:19888value>
    property>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    # 2. 配置yarn-site.xml,指定开启日志聚合和日志保留时间,使得日志文件保存在hdfs上。
    
    
        yarn.log-aggregation-enable
        true
    
    
     
        yarn.log-aggregation.retain-seconds
        604800
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    # 3. 启动历史日志服务器
    0. 启动hdfs
    	
    1. 重启yarn
    	[root@hadoop10 ~]# stop-yarn.sh
    	[root@hadoop10 ~]# start-yarn.sh
    2. 启动
    	[root@hadoop10 ~]# mr-jobhistory-daemon.sh start historyserver
    	如果需要关闭执行如下命令
    	[root@hadoop10 ~]# mr-jobhistory-daemon.sh stop historyserver
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    # 4. 查看日志
    	1. 访问http://ip:8088(访问yarn集群,看到执行过的job信息)
    	2. 点击"Applications"找到刚才执行的job的"history"
    	3. 点击logs
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述
    在这里插入图片描述

    MapReduce详解

    Hadoop序列化

    案例数据

    手机使用的流量数据,每次手机上网记录一条信息。

    需求:统计每个手机号的 上传总流量 下载总流量 总流量

    分析核心点:

    希望那些数据相同的合并在一起,map端就以它为key输出即可。

    # 案例数据
    id				手机号		 						 ip地址					上传	  下载	状态码
    1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
    1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
    1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
    1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
    1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
    1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
    1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
    1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
    # 期望结果
    13726230503	 上传流量:4962  下载流量:49362  总数据流量:  54324
    13826544101	 上传流量:528  下载流量:0  总数据流量:  528
    13926251106	 上传流量:480  下载流量:0  总数据流量:  480
    13926435656	 上传流量:264  下载流量:3024  总数据流量:  3288
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    # hadoop序列化
    	mapreduce执行过程中,被处理的key-value数据,需要在网络中传输,就需要对象转化为字节,字节转化为对象,这就是序列化和反序列化过程;
    	key和value都要经过序列化传输。
    1. Java序列化(序列化数据+对象描述信息)
    	序列化会包含java的继承关系,验证信息,验证信息。(重量级)
    	不便于在网络中传输。
    2. Hadoop序列化(仅关注数据序列化)
    	空间紧凑
    	传输快速,网络开销小。
    结论:
    	mapreduce中所有key-value都要支持序列化。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述

    hadoop内置可序列化类型

    Java类型Hadoop Writable类型
    booleanBooleanWritable
    byteByteWritable
    intIntWritable
    longLongWritable
    floatFloatWritable
    doubleDoubleWritable
    stringText
    arrayArrayWritable
    mapMapWritable
    nullNullWritable

    自定义序列化类型
    将要封装的数据,放在一个类中。
    自定义一个类实现WritableComparable

    1. 可以被hadoop序列化传输。
    2. 可以支持排序。

    注意序列化和反序列化的属性操作顺序要完全一致

    在这里插入图片描述

    //序列化示例代码
    public class PhoneLogWritable implements WritableComparable<PhoneLogWritable> {
        private Logger log = Logger.getLogger(PhoneLogWritable.class);
        private int upload;
        private int download;
        private int sum;
        public PhoneLogWritable(int upload, int download, int sum) {
            this.upload = upload;
            this.download = download;
            this.sum = sum;
        }
        public PhoneLogWritable() {
            log.info("----对象创建----");
        }
        public int compareTo(PhoneLogWritable o) {
            log.info("--比较--");
            return this.sum-o.sum;
        }
        public void write(DataOutput dataOutput) throws IOException {
            log.info("------write---");
            dataOutput.writeInt(upload);
            dataOutput.writeInt(download);
            dataOutput.writeInt(sum);
        }
        public void readFields(DataInput dataInput) throws IOException {
            log.info("--read---");
            upload = dataInput.readInt();
            download = dataInput.readInt();
            sum = dataInput.readInt();
        }
        @Override
        public boolean equals(Object o) {
            System.out.println("--equals---");
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            PhoneLogWritable that = (PhoneLogWritable) o;
            return upload == that.upload &&
                    download == that.download &&
                    sum == that.sum;
        }
        @Override
        public int hashCode() {
            System.out.println("--hashcode---");
            return Objects.hash(upload, download, sum);
        }
        public int getUpload() {
            return upload;
        }
        public void setUpload(int upload) {
            this.upload = upload;
        }
        public int getDownload() {
            return download;
        }
        public void setDownload(int download) {
            this.download = download;
        }
        public int getSum() {
            return sum;
        }
        public void setSum(int sum) {
            this.sum = sum;
        }
        @Override
        public String toString() {
            return "PhoneLogWritable{" +
                    "upload=" + upload +
                    ", download=" + download +
                    ", sum=" + sum +
                    '}';
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    数据清洗

    业务: 将原始数据文件中的脏东西(无效数据,无用数据)洗掉。

    1. 将原始的数据文件(日志文件)中,无效的行数据,去除。
    2. 将本次功能处理,不需要的业务数据,去除。
    
    • 1
    • 2
    # mapreduce中可以没有reduce
    效果:之进行map阶段的执行。执行完毕后即输出到文件中。
    # 代码实现
    1. 取消mapreduce的job有关reduce的所有设置
    2. 保留并设置如下
    	job.setNumReduceTasks(0);//取消reduce
    	FileOutputFormat.setOutputPath(job,new Path("/app/mapreduce/demo3/out"));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    # 原数据
    id				手机号		 						 ip地址					上传	  下载	状态码
    1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
    1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
    1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
    1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
    1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
    1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
    1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
    1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
    1363157995052	13826544109	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0
    1363157995052	null	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	240	0	200
    1363157991076	13926435659	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	null	null	null
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    # 测试案例
    	删除其中手机号不符合要求,上传流量确实和下载流量确实的数据,并仅保留手机号 上传流量 下载流量。(效果如下)
        13726230503	2481	24681
        13826544101	264	0
        13926435656	132	1512
        13926251106	240	0
        13726230503	2481	24681
        13826544101	264	0
        13926435656	132	1512
        13926251106	240	0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    计数器Counter

    用来记录hadoop执行过程的工具,可以理解为hadoop的日志。
    形式
    group1
    name1 数量

    代码
    context.getCounter(“map阶段”,“map输出”).increment(1L);

    • 效果如下

    在这里插入图片描述

    排序

    1. 简介
      Shuffle期间,MapReduce会对map输出的数据,对key进行排序。
    2. 时机:
    3. map输出之后,shuffle过程中。
    4. map输出之后的map端。
    5. 规则:
    6. key如果是Text类型按照字典顺序,进行字符串排序。
    7. key如果是IntWritable LongWritable 则按照数字大小进行升序排序。
      在这里插入图片描述
    # 测试数据
    用户id	观众人数
    团团	300
    小黑	200
    哦吼	400
    卢本伟	100
    八戒	250
    悟空	100
    唐僧	100
    # 需求:按照观众人数升序排序?
    悟空	100
    唐僧	100
    卢本伟	100
    小黑	200
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    默认排序
    # 默认排序
     	默认按照数据类型内置CompareTo方法规则进行排序
    # 案例
    用户id	观众人数
    团团	300
    小黑	200
    哦吼	400
    卢本伟	100
    八戒	250
    悟空	100
    唐僧	100
    # 期望结果
    卢本伟	100
    悟空	100
    唐僧	100
    小黑	200
    八戒	250
    团团	300
    哦吼	400
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    编码:

    1:将需要排序的值作为MapTask输出的key

    2:默认排序规则,是Hadoop API的内置类型写好的排序规则

    自定义排序
    # 自定义排序
    # 案例
    团团	300
    小黑	200
    哦吼	400
    卢本伟	100
    八戒	250
    悟空	100
    唐僧	100
    # 期望
    哦吼	400
    团团	300
    八戒	250
    小黑	200
    卢本伟	100
    悟空	100
    唐僧	100
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在这里插入图片描述

    在这里插入图片描述

    二次排序(二排)

    本质上在排序时候,调用了排序key的两个属性参数排序

    二次排序
    核心:排序所依据的字段作为map输出的key。

    # 测试数据
    用户id	观众人数	直播时长
    团团	300	1000
    小黑	200	2000
    哦吼	400	7000
    卢本伟	100	6000
    八戒	250	5000
    悟空	100	4000
    唐僧	100	3000
    # 需求:按照观众人数降序排序,如果观众人数相同,按照直播时长降序。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    核心思路:

    1. 排序所依据的字段,要作为key。
    2. 实现hadoop的序列化。
      重写WritableComparable的compareTo方法
    /**
     * 1 可序列化(write readFiled)
     * 2 可排序(compareTo)
     */
    public class LivePlayLog implements WritableComparable<LivePlayLog> {
        private int viewer;//观众人数
        private long length;//直播时长
        public LivePlayLog(){}
    
        /**
         * 作用:将该对象作为map输出的key
         *   1. mapreduce执行过程中,排序时候会调用该方法
         *   2. 默认在合并操作时候,会将key相同的value合并在一起。
         * 返回值:
         *      1 升序排序(this-o)
         *      -1 降序排序
         *      0 key相同的。
         * @param o
         * @return
         */
        @Override
        public int compareTo(LivePlayLog o) {
            if(this.viewer != o.viewer){
                return -(this.viewer-o.viewer);
            }else if(this.length != o.length){
                return -(int)(this.length-o.length);
            }else{
                return 0;
            }
        }
    
        @Override
        public void write(DataOutput dataOutput) throws IOException {
                dataOutput.writeInt(viewer);
                dataOutput.writeLong(length);
    
        }
    
        @Override
        public void readFields(DataInput dataInput) throws IOException {
                viewer = dataInput.readInt();
                length = dataInput.readLong();
        }
       ......
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    MapReduce优化

    MapTask并行度

    MapTask并行度,是不是越大越好?

    1. MapTask的并行度的产生
    2. inputformat根据配置信息,获得hdfs中文件的split大小和位置。
    3. 每个split就会启动一个MapTask,进行处理。
    4. 总结MapTask并行度决定机制
      split的个数。
    5. 概念:
      block:hdfs文件的最小单元。
      split:文件切分信息,虚拟的文件切片。
    6. 默认:blocksize的大小就是split的大小128M,也就是一个MapTask执行的任务。
      这样能够减少多个节点的MapTask之间的网络IO。
    7. 切片操作是针对1个文件,多个文件的切片不会合并。

    在这里插入图片描述

    InputFormat(优化1)
    1. 作用
    1. 对hdfs中的文件进行split切片,计算。(逻辑切分 start end)
    2. 读入的结果交给MapTask进行处理。
    # 2. TextInputFormat
    	接口:org.apache.hadoop.mapreduce.InputFormat
    	实现类: 
    		org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    		特点:逐行读入,并形成key(偏移量)-value(行数据),key是偏移量,value是当前行的数据。
    	1. 指定一个输入文件
    		TextInputFormat.addInputPath(job,new Path("/hdfs文件"));
    	2. 指定一个输入目录
    		TextInputFormat.addInputPath(job,new Path("/hdfs目录"));
    	3. 指定多个输入文件
    		job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.addInputPath(job,new Path("/hdfs/文件1.txt"));
            FileInputFormat.addInputPath(job,new Path("/hdfs/文件2.txt"));
            FileInputFormat.addInputPath(job,new Path("/hdfs/文件3.txt"));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    # 3. CombineTextInputFormat
    	特点: 将多个小block合并成1个split处理,设置切片大小为10M。
    	应用: 海量的小数据文件产生海量小block,合并成大的split,减少split数量,减少MapTask数量,提高MapReduce性能。
        代码:
    		job.setInputFormatClass(CombineTextInputFormat.class);
    		CombineTextInputFormat.setMaxInputSplitSize(job,10485760);//10M,只要加起来不超过10M的block数据,都会合并成1个split处理。
            FileInputFormat.addInputPath(job,new Path("/hdfs/目录"));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Combiner合并(优化2)

    # 案例
    # 测试案例(消费记录)
    姓名	消费金额
    张三	100
    王五	200
    李四	300
    张三	400
    王五	500
    张三	600
    # 期望结果
    张三	1100
    李四	300
    王五	700
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    # 案例实现思路中存在的效率问题
    问题:
       累加的计算任务,几乎全部压在Reduce程序,程序只有1个,压力过大,效率太低。
    解决方案:
       核心:将任务尽可能前置
       方案:将累加(Reduce)的操作在Map端提前执行好
       优势:
           ① MapTask本地存放执行结果大大减少。
           ② Reduce下载MapTask执行结果,效率提升。
           ③ ReduceTask归并排序和合并数据操作效率提升了。
           ④ ReduceTask执行数据量大大减少,效率提升。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    # Combiner说明
       概念:合并汇总
            MapTask端局部Reduce操作(合并merge、执行Reducer.reduce)
       时机:MapTask输出后,排序之后,执行Combiner操作,之后将结果存放在本地。
       代码:
            job.setCombinerClass(Reducer的类.class);
       应用场景:
            适合:累加、统计总数、排名 (支持可迭代性)
            不适合:平均值。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    ReduceTask并行度(优化3)

    # 测试案例:商品浏览日志
    日期			域名					商品url					  商品名       pid     驻留时间
    2020年3月3日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	30
    2020年3月3日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	60
    2020年3月3日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	100
    2020年3月3日	www.baizhiedu.com	/product/detail/10002.html	xps15	10002	10
    2020年3月3日	www.baizhiedu.com	/product/detail/10003.html	thinkpadx390	10003	200
    2020年3月3日	www.baizhiedu.com	/product/detail/10004.html	iphoneX	10004	100
    2020年3月3日	www.baizhiedu.com	/product/detail/10003.html	thinkpadx390	10003	100
    2020年3月3日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	120
    2020年3月4日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	200
    2020年3月5日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	25
    2020年3月6日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	20
    
    # 期望结果
    pid    访问次数
    10001	7
    10002	1
    10003	2
    10004	1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    # 提高ReduceTask的数量,提高Reduce的并行度,提高效率
    	1. 增加ReduceTask的并行度(数量) ,可以启动多个ReduceTask程序处理mapTask的汇总结果,可以提高效率。
    	2. 每个ReduceTask输出结果,都会单独的输出到1个文件。(注意)
    # ReduceTask的数量是可以在程序中手动指定
    		默认数量为:  1个 Reduce
    		修改代码:    job.setNumReduceTasks(数字);  0 就是没有   数字是几就是几个
    		            job.setNumReduceTasks(0)
    		            job.setNumReduceTasks(2)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    默认分区partition

    • 默认分区规则
      在这里插入图片描述

    在这里插入图片描述

    • 分区流程(发生时机)

    MapReduce分区的整个流程

    1. 当MapTask任务中的mapper.map()输出结果后,会先根据map输出的key判断分区。(默认按照key.hashcode%reduceTasks)
      不同的key-value进入不同的分区。(从此分道扬镳)
    2. 对分区后数据各自做排序。(免去了分区之间数据的比较交换排序操作)
    3. 如果设置Combiner,会自动对各自分区做本地reduce汇总操作。
    4. 将结果输出mapTask机器本地。(分区存放:分区0、分区1)
    5. ReduceTask阶段拷贝MapTask输出结果,按照分区拷贝。
      a: ReduceTask0 从所有MapTask阶段拷贝所有的分区0的数据。(n多个分区0数据)
      b: 合并所有远程拷贝到的分区0的文件数据,排序(归并排序)
      c: 合并当前分区0中的key的value。(merge)[k-v1,v2,v3]
      d: 启动1个执行ReduceTask,输出到文件中。
    6. ReduceTask阶段拷贝MapTask输出结果,按照分区拷贝。
      a: ReduceTask1 从所有MapTask阶段拷贝所有的分区1的数据。(n多个分区1数据)
      b: 合并所有远程拷贝到的分区1的文件数据,排序(归并排序)
      c: 合并当前分区1中的key的value。(merge)[k-v1,v2,v3]
      d: 启动1个执行ReduceTask,输出到文件中。
      5和6 reduce阶段各自处理各自分区的数据

    在这里插入图片描述

    自定义Partition(优化4)

    # 自定义partition
    将下面数据分区处理:
    人名  科目 成绩
    张三	语文	10
    李四	数学	30
    王五	语文	20
    赵6	英语	40
    张三	数据	50
    李四	语文	10
    张三	英语	70
    李四	英语	80
    王五	英语	45
    王五	数学	10
    赵6	数学	10
    赵6	语文	100
    思路:
    	1:分区依据要作为key
    	2:排序字段也要作为key。
    	3:避免合并,key要唯一,不重复(所有key都不一样)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在这里插入图片描述

    思路:通过修改Reduce的个数,设置分区的个数。

    自定义分区编码

    ① 定义分区类

    执行时机:

    Map输出key-value,后,会调用getPartition方法,决定当前key-value进入哪个分区。
    注意:
    分区号0开始。
    当前key属于那个分区,就返回对应分区的编号。

    在这里插入图片描述

    ② 使用分区类

    job.setPartitionerClass(自定义Partitioner.class);

    ③ 设定reducer个数(开启分区)

    job.setNumReduceTasks(数字);//reduceTask数量要和分区数量一样。

    MapReduce工作原理(终极版)

    Spill溢写

    环形缓冲区:
    1. map输出的结果k-v会存入环形缓冲区(从start下标开始写,写到80%,则触发溢写程序的线程。环形缓冲区继续写入)
    溢写过程:
    2. 当环形缓冲区中的数据,达到80%,则开始溢写。(每次写够80%/MapTask处理完毕,就开始溢写。)
    	mapred-site.xml中修改mapreduce.map.sort.spill.percent的值。
    3. 从缓冲区中读取key-value,自带分区号。
    4. 每次溢写,对本次溢写范围内的数据做排序。
    5. 如果设置combiner,则执行map端的reduce合并处理(可选,未必有)
    6. 将本次溢写的数据(key有序)写入到本地的磁盘上,产生一个溢写文件。
    7. 循环2~6,将产生多个溢写文件在本地磁盘中。
    mapper处理完毕后
    8. 将各个分区中,多次溢写的文件,再进行一次合并排序,每个分区,多次溢写产生的多个有序文件,合并成1个整体key有序的文件。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述
    在这里插入图片描述

    MapReduce排序
    1:maptask输出环形缓冲区,缓冲区每次溢写,发生一轮排序。--每次溢写
    2:Maptask多次溢写产生的多个溢写文件,要做归并排序(整体排序)---map端本地产生多个溢写文件。
    3:ReduceTask汇总多个MapTask的结果文件,归并排序(整体排序)--- reduce下载maptask处理结果产生的多个文件,归并排序。
    
    • 1
    • 2
    • 3

    Shuffle(混洗-洗牌)

    过程,人为对MapReduce整体中部分过程,做了称呼。

    总结:站在数据角度,k-v从Mapper离开,一直到传给Reducer方法,中间过程,叫做shuffle

    • map阶段
    1. mapper输出结果key-value
      ① 获得ko-vo获得分区号。(Partitioner.getpartion())
      ② 将ko-vo写出到环形缓冲区中。
    2. 一旦环形缓冲区中数据达到溢写条件(80%,写完了),触发溢写的线程2。
      ① 读取环形缓冲区中的数据,本次溢写对应的范围内数据80%;
      ② 根据分区号,分区内排序、(Combiner)
      ③ 将分区内,排序后的key-value数据,写入本地磁盘的文件中。(一次溢写产生一个文件)
      ④ 每次达到溢写条件(80%,写完了),①~③,在mapTask本地磁盘形成溢写文件。(多次溢写,产生多个溢写文件)
      ⑤ 最后在本地完成一次分区内多个溢写文件 归并排序,产生1个文件(maptask处理结果)。

    reduce阶段

    3. 根据分区号,启动ReduceTask,下载多个MapTask处理结果中的对应分区文件
    	MapTaskA(分区0)----ReduceTask0
    	MapTaskB(分区0)----ReduceTask0
    4. 将当前分区中,来自不同MapTask的分区文件,归并排序。(为了reduce的merge操作效率)
    	产生1个大的本分区的文件,且内容key有序。
    5. merge操作: 将多个maptsk下载文件合并、排序、分组、合并。
    6. 逐步读取k-vs,调用reducer.reduce();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    源码分析

    • 启动mapreduce任务 yarn jar xx.jar

      1. 对源文件进行逻辑切片,切分多个splits。
      2. 代码执行从job.job.waitForCompletion(true)开始;
    • MapTask阶段

      入口:MapTask.run()开始

      1. 创建MapTask,创建mapper对象,获得inputformat。
      2. 循环读取key-value,交给mapper.map(key,value)
      3. mapper.map(){context.write(key,value)},进入到环形缓冲区。
        缓冲区放入(key,value,分区号)
    • 溢写过程:

      读key-value
      排序
      combiner(可选)
      写入本地磁盘文件中。
      合并分区内的多个溢写文件。(mapper的所有map执行结束后)
      在这里插入图片描述

    reduceTask阶段源码

    1. ReduceTask调用Reducer.reduce方法逻辑
      在这里插入图片描述

    全工作流程总结

    在这里插入图片描述

    • mapreduce工作流程(终极版)

    提交job的准备
    1. 创建InputFormat,读取数据
    ① 获得文件split

    MapTask过程
    1. InputFormat—LineRecordReader.
    ② 读取split范围内的数据,k-v。
    2. 调用Mapper.run(),来对split范围内数据进行处理
    Mapper.run(){
    setup(context);
    while(xxx){
    // 每读取一条key-value,调用一次map方法。
    map(key,value,context);
    }
    cleanup(context);
    }
    3. mapper输出结果
    ① 获得ko-vo获得分区号。(Partitioner.getpartion(key,value,num))
    ② 将ko-vo,连带分区号,一块写出到环形缓冲区中。
    4. 一旦环形缓冲区中数据达到溢写条件(80%,写完了)—溢写过程。
    ① 读取环形缓冲区中的数据(key-value-分区号)
    ② 根据分区号,分区排序、(Combiner)
    ③ 将处理结果溢写到磁盘中文件中。
    ④ 每次达到溢写条件(80%,写完了),①~③,在mapTask本地磁盘形成各分区内的一个溢写文件。(多个溢写文件。)
    ⑤ 最后mapper处理完毕后边,触发一次溢写,产生一个溢写文件。
    ⑥ 最后,对MapTask本地,分区内,的多个溢写文件,合并成一个大文件。(发生一次归并排序)

    ReduceTask过程:
    1. 根据分区号,启动ReduceTask,下载多个MapTask处理结果中的对应分区文件
    MapTaskA(分区0)----ReduceTask0
    MapTaskB(分区0)----ReduceTask0
    2. 将当前分区中,来自不同MapTask的分区文件,归并排序。
    产生1个大的本分区的文件,且内容key有序。
    3. merge操作,将有序的结果,合并key的value。
    4. 循环调用reducer的reduce方法,处理汇总的数据
    Reducer.run(){
    setup(context);
    while(xxx){
    reduce.reduce(key,values);
    context.write(k,v)
    }
    cleanup(context);
    }
    5. ReduceTask调用OutputFormat将结果写入到hdfs文件中。

    Shuffle:
    # map阶段
    3. mapper输出结果
    ① 获得ko-vo获得分区号。(Partitioner.getpartion(key,value,num))
    ② 将ko-vo,连带分区号,一块写出到环形缓冲区中。
    4. 一旦环形缓冲区中数据达到溢写条件(80%,写完了)—溢写过程。
    ① 读取环形缓冲区中的数据(key-value-分区号)
    ② 根据分区号,分区排序、(Combiner)
    ③ 将处理结果溢写到磁盘中文件中。
    ④ 每次达到溢写条件(80%,写完了),①~③,在mapTask本地磁盘形成各分区内的一个溢写文件。(多个溢写文件。)
    ⑤ 最后mapper处理完毕后边,触发一次溢写,产生一个溢写文件。
    ⑥ 最后,对MapTask本地,分区内,的多个溢写文件,合并成一个大文件。(发生一次归并排序)
    # reduce阶段
    1. 根据分区号,启动ReduceTask,下载多个MapTask处理结果中的对应分区文件
    MapTaskA(分区0)----ReduceTask0
    MapTaskB(分区0)----ReduceTask0
    2. 将当前分区中,来自不同MapTask的分区文件,归并排序。
    产生1个大的本分区的文件,且内容key有序。
    3. merge操作,将有序的结果,合并key的value。

    源码分析

    • MapTask
    public class MapTask{
        // 1. 启动一个新的Mapper程序。
    	private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(){
            创建inputFormat
            创建split
            创建mapper
            执行mapper处理数据。mapper.run(mapperContext);
                context.write(key,value)---分区--放入环形缓冲区内。
            关闭输出(NewOutputCollector),准备溢写。output.close(mapperContext);
                1. 排序
                2. combiner
                3. 到文件
        }
        //2. 收集key-value进入环形缓冲区
        public void collect(K key, V value)
            先分区再进入缓冲区
        //3. sortAndSpill() 溢写过程
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在这里插入图片描述

    # mapper对象的代码机制
    	1. 每启动MapTask,执行一次runNewMapper方法,创建一个mapper类。
    	2. 每读取key-value,调用mapper.map();
    
    • 1
    • 2
    • 3
    # 2. InputFormat
    
    • 1
    public abstract class InputFormat<K, V>
        //切片方法
    	public abstract List<InputSplit> getSplits()...
        //根据split信息返回一个RecordReader(用来读取数据)
    	public abstract RecordReader<K,V> createRecordReader(InputSplit split...
    public abstract class FileInputFormat<K, V> extends InputFormat<K, V>{
        public List<InputSplit> getSplits(JobContext job){
    		long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//获得最小值 1
        	long maxSize = getMaxSplitSize(job);//获得最大值 LongMax
            ...
            long splitSize = computeSplitSize(blockSize, minSize, maxSize);//获取split的切片大小。对应配置文件(split.minsize)
            ...
            //当文件剩余大小大于split大小的1.1倍时,进行分片
            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                //获取block块的索引位置
              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
              //分片
              splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                          blkLocations[blkIndex].getHosts(),
                          blkLocations[blkIndex].getCachedHosts()));
              //源文件减去已经分片大小
              bytesRemaining -= splitSize;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    
    
    • 1

    在这里插入图片描述

    在这里插入图片描述

    # 3. ReduceTask
    
    • 1
    private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewReducer(JobConf job...{
        创建reducer
            调用reducer的run
            调用setup 循环调用reduce方法,cleanup
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    # 4. OutputFormat
    
    • 1
    public class TextOutputFormat{
        // 将key-value写出到文件中。
        public synchronized void write(K key, V value)
    }
    
    • 1
    • 2
    • 3
    • 4
    # 作业
    1. 完成分区代码任务:
    1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
    1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
    1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
    1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
    1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
    1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
    1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
    1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
    	要求:统计每个手机号的总上传流量,总下载流量 总流量。,按照手机区放在不同的文件内。
    	(只考虑手机号开头  137 138 139 其他)
    2. 整理MapTask ReduceTask Shuffle执行流程===(MapReduce完整流程)
    	文字
    	图
    3. 思考题(非必做)
    	张三	10
    	李四	20
    	王五	30
    	悟空	15
    	八戒	90
    	沙僧	100
    	李旭	150
    	统计,其中的最大值?
    	结果:
    	李旭	150
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    Yarn分布式集群搭建

    # 0-1保证HDFS分布式集群搭建环境确保正确。
    1. jps看到如下结果
    	NameNode
    	DataNode
    	SecondaryNameNode
    2. 查看hadoop11:50070.
    	在datanode标签页看到3个正常的datanode节点信息。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    # 0-2关闭所有NameNode节点和DataNode节点
    	stop-dfs.sh
    
    • 1
    • 2
    # 1:初始化yarn相关配置
    1. mapred-site.xml
    	
            mapreduce.framework.name
            yarn
        
    2. yarn-site.xml
    	
            yarn.nodemanager.aux-services
            mapreduce_shuffle
        
    	
        
            yarn.resourcemanager.hostname
            Hadoop
        
    3. slaves
    	配置nodemanager(从机)所在的ip。
    	配置datanode(从机)所在的ip
    # 2:同步该配置到其他节点服务器上。
    [root@hadoop11 etc]# scp -r hadoop/ root@hadoop12:/opt/installs/hadoop2.9.2/etc/
    [root@hadoop11 etc]# scp -r hadoop/ root@hadoop13:/opt/installs/hadoop2.9.2/etc/
    [root@hadoop11 etc]# scp -r hadoop/ root@hadoop14:/opt/installs/hadoop2.9.2/etc/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    # 3:启动yarn集群
    # 在namenode所在主机上
    1. 启动HDFS集群
    	start-dfs.sh
    # 在Resourcemanager所在主机上
    2. 启动yarn集群
    	start-yarn.sh
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    # 4:验证
    1. jps
    
    • 1
    • 2
    [root@hadoop11 ~]# jps
        6160 DataNode
        6513 ResourceManager
        6614 NodeManager
        6056 NameNode
        6349 SecondaryNameNode
        6831 Jps
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    2. 访问yarn的资源调度器web网页。
    	http://主节点ResourceManager节点的ip:8088
    
    • 1
    • 2
    # 关闭集群
    	1. 先关闭yarn
    		stop-yarn.sh
    	2. 在关闭hdfs
    		stop-hdfs.sh
    
    • 1
    • 2
    • 3
    • 4
    • 5

    经典案例

    TOPN

    典型:最火主播、最畅销书、最热门的商品----TopN

    需求:获得主播观众人数前3名的信息。
    # 原始数据
    主播id 观众人数  时长
    团团	2345	1000
    小黑	67123	2000
    哦吼	3456	7000
    卢本伟	912345	6000
    
    
    
    八戒	1234	5000
    悟空	456	4000
    唐僧	123345	3000
    # 期望结果
    卢本伟  912345
    唐僧    123345
    小黑    67123
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    # 方案1 傻×方案
    1. 按照观众人数,降序排序。
    2. reduce端输出前3个。
    
    • 1
    • 2
    • 3
    # 方案2 牛×方案
    1. 在每个MapTask端先各自计算Top3,并只输出top3.
    2. Reduce端只需要统计多个MapTask的Top3结果,只输出前3个。
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    MapReduce源码 笔记总结

    验证MapReduce执行流程

    提交job

    //4. 启动job
    boolean b = job.waitForCompletion(true);
    |-
        // 提交job
        submit();
    	// 如果参数为true,在监控job的执行,并打印日志。
    	monitorAndPrintJob();
    |-
        // 提交job
        return submitter.submitJobInternal(Job.this, cluster);
    	|- 
            // 为Job 创建Split:对本次job操作的HDFS文件,进行split切片。
         	int maps = writeSplits(job, submitJobDir);
    			|- 
                    // 使用新API,创建split
          			maps = writeNewSplits(job, jobSubmitDir);
    				|-
                        // 反射获得当前job绑定的InputFormat对象:TextInputFormat
        				InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
        				// 根据inputformat,获得split切片。(这里进入FileInputFormat类):
        				// InputSplit:offset length  host
        				List<InputSplit> splits = input.getSplits(job);
     					|- 自此,进入split具体切片操作---->FileInputFormat
    		// 真正的提交job。
          	status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
    	// 修改job的状态为RUNING
        state = JobState.RUNNING;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    FileInputFormat

     /** 
       * 对文件生成逻辑上的切片Split,对应InputSplit,多个split对应List集合。
       * @param job the job context
       * @throws IOException
       */
      public List<InputSplit> getSplits(JobContext job) throws IOException{
    	// 最小值  1,本质上就是【mapreduce.input.fileinputformat.split.minsize】
    	long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        // 最大值 LongMax, 本质上对应:【mapreduce.input.fileinputformat.split.maxsize】
        long maxSize = getMaxSplitSize(job);
          ...
        // 创建空的InputSplit的List,一会切一个,放里面放一个Split信息。
        List<InputSplit> splits = new ArrayList<InputSplit>();
         ...
        // 获得blockSize=128MB
        long blockSize = file.getBlockSize();//128MB
        // 获得splitSize=128MB【计算方式:minSize blockSize maxSize 在三者取其中,可以通过调节参数,修改split的大小】
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        
        // 循环条件:如果剩余的字节大小 > splitSize的1.1倍。
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
              // block的序号
              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
              // 构造一个split(文件路径 start length host 内存host)
              splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                   blkLocations[blkIndex].getHosts(),
                                   blkLocations[blkIndex].getCachedHosts()));
              // 切一刀,减去当前split的字节数。
              bytesRemaining -= splitSize;
          }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    # 计算splitSize
    
    • 1

    在这里插入图片描述

    # split逻辑切片的源码
    
    • 1

    在这里插入图片描述

    MapTask

    Map任务的启动
    MapTask类

    // map程序入口
    public void run(final JobConf job, final TaskUmbilicalProtocol umbilical){
        // 使用newMapper的API运行MapTask
        runNewMapper(job, splitMetaInfo, umbilical, reporter);
    }
    private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job,
                        final TaskSplitIndex splitIndex,
                        final TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ){
        // 创建1一个Mapper对象
        org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
          (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
        // 创建一个输入工具,InputFormat
        org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
          (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
            ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
        // 获得split信息。
        split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
            splitIndex.getStartOffset());
         // 创建一个RecordReader,inputformat中负责读取数据的。//LineRecordReader
        org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
          new NewTrackingRecordReader<INKEY,INVALUE>
            (split, inputFormat, reporter, taskContext);
        
        // 初始化当前split范围的数据读取:开启输入流。
          input.initialize(split, mapperContext);
        
        //启动Mapper的run方法的执行。-------------------------【数据处理核心位置】
        mapper.run(mapperContext);
        // job 状态更新()
        mapPhase.complete();
        
        // 进入 NewOutputCollector?????
        output.close(mapperContext);// 将清空环形缓冲区中的数据,最后溢写一次。
        |-
            // 进入--->MapOutputBuffer:环形缓冲区对象
            collector.flush();
        	|-
                // spill finished
              	resetSpill();
        		// 排序并溢写。下面继续
              	sortAndSpill();
        		// 合并溢写文件。到此MapTask阶段基本结束。
          		mergeParts();
        	// 关闭缓冲区的流
        	collector.close();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • Mapper输出结果的环形缓冲区
    // 一个split切片处理,创建一个Mapper对象
    class 自定义Mapper extends Mapper{
    	public void run(Context context) throws IOException, InterruptedException{
    		//1: 调用setup 一次。:一般用来覆盖后,天加初始化资源操作。---调用1次。
        	setup(context);
        	 // 循环读取 行数据 k(偏移量)-v(行)
              while (context.nextKeyValue()) {
                //2: 每读1行,调用1次map方法。
                map(context.getCurrentKey(), context.getCurrentValue(), context);
              }
              //3: 调用cleanup一次:一般覆盖,重写一些释放资源的代码----调用1次。
         	 cleanup(context);
    	}
    	
    	// 数据处理map方法:被子类覆盖。
    	protected void map(KEYIN key, VALUEIN value, 
                         Context context) throws IOException, InterruptedException {
                         }
    }
    
    // 自定义Mapper的子类。
    static class NameCountMap extends Mapper<LongWritable, Text,Text, IntWritable> {
            /**
             * @param key      输入map数据的key
             * @param value   输入map数据的value
             * @param context map处理完毕后输出的每条数据 包含key-value
             * @throws IOException
             * @throws InterruptedException
             * @do 接收输入的keyvalue转化为输出的keyvalue,交给shuffle分组排序
             */
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException, IOException {
                //局部计算
                String[] names = value.toString().split(" ");
                for (String name : names) {
                    context.write(new Text(name),new IntWritable(1));
                    // context写出key-value,写入到了环形缓冲区中:NewOutputCollector
                }
            }
        }
    //MapTask内部类:环形缓冲区操作类类(从context.write方法进入这里)
    private class NewOutputCollector<K,V>
        extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
        // map写出key-value,调用该方法。
        @Override
        public void write(K key, V value){
           // 从Maper写出去:这里用的partition类,默认是HashPartition
          // ----> MapOutputBuffer:缓冲区,暂时存放map的输出key-value
          collector.collect(key, value,
                            partitioner.getPartition(key, value, partitions));
        }
    }
    //环形缓冲区类
    public static class MapOutputBuffer<K extends Object, V extends Object>{
        // 将key value partition 存入环形缓冲区中。
        public synchronized void collect(key,value){
            // bufferRemaining参考:上面 spillper 和 softLimit
            bufferRemaining -= METASIZE;// 环形缓冲区80%容量内,剩余多少。
            // 如果缓冲区80% 用完了,多线程环形溢写线程,启动spill操作。
            if (bufferRemaining <= 0) {
                // spill finished, reclaim space
                    resetSpill();
                // 开始溢写。
                    startSpill();
                spillLock.unlock();// 唤醒SpillThread.run(),进入SpillTread类的run方法。
            }
            ...
            // 序列化key到缓冲区中。
            int keystart = bufindex;
            keySerializer.serialize(key);
            
            ...
                
            // 序列化value到缓冲区中。
            final int valstart = bufindex;
            valSerializer.serialize(value);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 溢写过程
    // 溢写操作的线程类:MapTask内部类
    protected class SpillThread extends Thread{
        public void run() {
            // 进入一次溢写。
            sortAndSpill();// 一旦唤醒,就开始执行溢写操作。(溢写过程中,进行排序。)
            |--- 进入环形缓冲区操作类的sortAndSpill方法内部
                // 新建一个溢写文件,参数接受了一个分区数。
            	final SpillRecord spillRec = new SpillRecord(partitions);
            	// 快速排序,对环形缓冲区中的元素进行排序
            	sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
            	// 判断是否需要combine
                if (combinerRunner == null) {
                    // 直接写入文件
                }else{
                    // 先做combine再溢写。
                }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    ReduceTask

    public class ReduceTask extends Task{
    	public void run(JobConf job, final TaskUmbilicalProtocol umbilical){
            runNewReducer(job, umbilical, reporter, rIter, comparator, 
                        keyClass, valueClass);
        }
        private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewReducer(JobConf job,
                         final TaskUmbilicalProtocol umbilical,
                         final TaskReporter reporter,
                         RawKeyValueIterator rIter,
                         RawComparator<INKEY> comparator,
                         Class<INKEY> keyClass,
                         Class<INVALUE> valueClass
                         ) {
            // 根据job绑定的Reducer的类,反射创建出Reducer对象。
        org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
          (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
            // 进入Reducer的run方法
            reducer.run(reducerContext);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    Reducer的方法和子类覆盖的方法
    每个reduce任务,创建1个Reducer对象。

    /**
       * Advanced application writers can use the 
       * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
       * control how the reduce task works.
       * ReduceTask处理一个合并结果,调用run
       */
      public void run(Context context) throws IOException, InterruptedException {
        //1. 调用setup方法。 1次。
        setup(context);
        try {
          while (context.nextKey()) {
            // 2:循环读取一组k-vs,调用reduce方法处理:循环调用。
            reduce(context.getCurrentKey(), context.getValues(), context);
            // If a back up store is used, reset it
            Iterator<VALUEIN> iter = context.getValues().iterator();
            if(iter instanceof ReduceContext.ValueIterator) {
              ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
            }
          }
        } finally {
          // 3:调用cleanup方法。 reduce方法之后调用一次。
          cleanup(context);
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    ReduceTask

    public class ReduceTask extends Task{
    	public void run(JobConf job, final TaskUmbilicalProtocol umbilical){
            runNewReducer(job, umbilical, reporter, rIter, comparator, 
                        keyClass, valueClass);
        }
        private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewReducer(JobConf job,
                         final TaskUmbilicalProtocol umbilical,
                         final TaskReporter reporter,
                         RawKeyValueIterator rIter,
                         RawComparator<INKEY> comparator,
                         Class<INKEY> keyClass,
                         Class<INVALUE> valueClass
                         ) {
            // 根据job绑定的Reducer的类,反射创建出Reducer对象。
        org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
          (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
            // 进入Reducer的run方法
            reducer.run(reducerContext);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    Reducer的方法和子类覆盖的方法
    每个reduce任务,创建1个Reducer对象。

    /**
       * Advanced application writers can use the 
       * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
       * control how the reduce task works.
       * ReduceTask处理一个合并结果,调用run
       */
      public void run(Context context) throws IOException, InterruptedException {
        //1. 调用setup方法。 1次。
        setup(context);
        try {
          while (context.nextKey()) {
            // 2:循环读取一组k-vs,调用reduce方法处理:循环调用。
            reduce(context.getCurrentKey(), context.getValues(), context);
            // If a back up store is used, reset it
            Iterator<VALUEIN> iter = context.getValues().iterator();
            if(iter instanceof ReduceContext.ValueIterator) {
              ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
            }
          }
        } finally {
          // 3:调用cleanup方法。 reduce方法之后调用一次。
          cleanup(context);
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    总结:仰天大笑出门去,我辈岂是蓬蒿人

  • 相关阅读:
    取多条数据相同用户时间最小的那条数据
    算法合集(1)——二分查找
    Linux上所有RPM软件包下载地址&&windoes上各种安装包下载地址
    任务及任务切换
    B. Codeforces Subsequences
    牛顿法及Python实现
    python 生成html文件并端口展示
    【3D图像分割】基于Pytorch的 VNet 3D 图像分割4(改写数据流篇)
    (附源码)APP+springboot垃圾自动分类管理系统 毕业设计 160846
    Android中级——ListView和RecycleView解析
  • 原文地址:https://blog.csdn.net/weixin_48207312/article/details/126358557