如果你的Reduce服务器cpu核数足够多,能够同时处理这么多map服务器,就是并行,否则就是并发(并发包括并行)
MapReduce不擅长串行计算,因为它的效率很低,每次的中间计算结果它都要持久化到磁盘,磁盘的读写效率都比较低;而Spark它的中间结果是可以在内存当中处理的,那这个效率就会很高。
Mapper的输出类型对应Reduce的输入类型
分类和聚合
发送到Reduce,它就会把相同的key的value进行后续的处理,你是加是减是乘还是除,那跟你的业务逻辑有关系。那我们单词统计就是将相同key的value值加在一起。
你从Map到Reduce上下游的数据类型肯定是一致的。
Job的输出结果路径有一个注意事项:它不能提前存在,如果提前存在会报FilealReadyExit异常。
(2)在pom.xml文件中添加如下依赖
org.apache.hadoop</groupId>
hadoop-client</artifactId>
3.1.3</version>
</dependency>
junit</groupId>
junit</artifactId>
4.12</version>
</dependency>
org.slf4j</groupId>
slf4j-log4j12</artifactId>
1.7.30</version>
</dependency>
</dependencies>
(2)在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
写代码始终关注它的输入和输出是什么样子
map是一行内容调用一次
package com.atguigu.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
* KEYIN,map阶段输入的key的类型:偏移量,longwritabele
* VALUEIN,map阶段输入value类型:Test
* KEYOUT,map阶段输出的key类型:Text
* VALUEOUT,map阶段输出的value类型:intwritable
*
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outk = new Text(); // 为了避免每次执行就要大量创建对象,将其提到上面来
private IntWritable outV = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1、获取一行
// atguigu atguigu
String line = value.toString();
// 2、切割
// atguigu
// atguigu
String[] words = line.split(" ");
// 3、循环写出
for (String word : words) {
//封装outK
outk.set(word);
//写出
context.write(outk, outV);
}
}
}
Mapper的输出类型对应Reduce的输入类型
package com.atguigu.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/* (Mapper的输出类型对应Reduce的输入类型)
* KEYIN,reduce阶段输入的key的类型:偏移量,Text
* VALUEIN,reduce阶段输入value类型:intwritable
* KEYOUT,reduce阶段输出的key类型:Text
* VALUEOUT,reduce阶段输出的value类型:intwritable
*
*/
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
private IntWritable outv = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum=0;
// atguigu,(1,1)
// 累加
for (IntWritable value : values) {
sum+=value.get();
}
// 写出
outv.set(sum);
context.write(key,outv);
}
}
package com.atguigu.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import sun.plugin.dom.core.Text;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1、获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2、设置jar包路径
job.setJarByClass(WordCountDriver.class);
//3、关联mapper和reducer,纽带为job
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4、设置map输出的kv类型(反射)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5、设置最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6、设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("E:\\Hadoop-Input\\inputword")); //输入路径
FileOutputFormat.setOutputPath(job,new Path("E:\\Hadoop-Input\\output1")); //输出路径
//6、提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
(1)用maven打jar包,需要添加的打包插件依赖
maven-compiler-plugin</artifactId>
3.6.1</version>
这个修改过后的,再进行打包才是我们想要的
要选中选中WordCountDriver
以后在企业里开发,通常情况下是在windows环境下搭建一个hadoop环境进行一个编写,编写完之后的代码进行打包,打包之后上传到HDFS,然后进行执行命令。(在企业里都用hive了,谁还用MapReduce)
/*
把hadoop102的内存当中的数据序列化方式到磁盘,然后将序列化文件拷贝传输到对方(将序列化文件变成字节码文件就允许传输了),传输之后再进行一个加载,加载到hadoop103的内存当中。
其中在内存往字节码文件变换的过程,它就叫序列化;将字节码文件加载到内存的过程,它叫反序列化
那么我们为什么产生序列化的这种方式啊?
就是因为在一个系统当中的内存的对象传输到另一个服务器里面的时候它没法传,那么我们就通过序列化的方式传过去之后再进行一个反序列化过程。
为什么要系列化?
需要在不同服务器传递内存数据时,用序列化
*/
为什么使用序列化?因为hadoop的序列化更轻,用起来更高效
//FlowBean类
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/*
* 1、定义类实现writable接口
* 2、重写序列化和反序列化方法
* 3、重写空参构造
* 4、toString方法
*
* */
public class FlowBean implements Writable {
private long upFlow; //上行流量
private long downFlow;// 下行流量
private long sumFlow; // 总流量
//空参构造
public FlowBean() {
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
// 将setSumFlow重载一下
public void setSumFlow() {
this.sumFlow = this.downFlow+this.upFlow;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow=dataInput.readLong();
this.downFlow=dataInput.readLong();
this.sumFlow=dataInput.readLong();
}
@Override
public String toString() {
return upFlow +"\t" + downFlow + "\t" + sumFlow;
}
}
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
private Text outk=new Text();
private FlowBean outV=new FlowBean();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
// 1.获取一行
// 1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
String line = value.toString(); // 获取一行,需要将它转换成String类型
//2 切割
// 1,13736230513,192.196.100.1,www.atguigu.com,2481,24681,200 7-3=4
// 2 13846544121 192.196.100.2 264 0 200 6-3=3
String[] split = line.split("\t");
// 3抓取想要的数据
// 手机号:13736230513
// 上行流量和下行流量:2481,24681
String phone =split[1];
String up =split[split.length-3];
String down=split[split.length-2];
//4 封装
outk.set(phone);
outV.setUpFlow(Long.parseLong(up));
outV.setDownFlow(Long.parseLong(down));
outV.setSumFlow();
//5.写出
context.write(outk,outV);
}
}
reduce可以把重复的手机号再聚合起来一起算流量
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
private FlowBean outV= new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
// 1 遍历集合累加值(因为只累加同一手机号的上下行流量,所以读取到别的手机号需要置0,不能够拿出去)
long totalUp=0;
long totaldown=0;
for (FlowBean value : values) {
totalUp+=value.getUpFlow();
totaldown+=value.getDownFlow();
}
// 2 封装outk.outv
outV.setUpFlow(totalUp);
outV.setDownFlow(totaldown);
outV.setSumFlow();
// 3 写出
context.write(key,outV);// key 就是传进来的手机号,value就是传进来的封装后的outV
}
}
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1 获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置jar
job.setJarByClass(FlowDriver.class);
// 3 关联mapper 和 reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4 设置mapper 输出的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 5 设置最终数据输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6 设置数据的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("E:\\Hadoop-Input\\inputflow"));
FileOutputFormat.setOutputPath(job, new Path("E:\\Hadoop-output\\output2"));
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
略
// 按照切片的个数来开启mapTask。你切了多少片,你就开启多少个mapTask;而且每个文件单独切,单独去计算不要和在一起。
略
本地块大小32M,集群128M。剩下的部分小于1.1倍不再切片,大于1.1.倍按照切片大小正常去切割。每一个文件单独切片
/*
1、这个环就叫缓冲区,写百分之百没办法缓冲,所以写入到缓冲区中的数据达到缓冲区大小80%的时候,此时再写进缓冲区的内容就是会反向写,然后之前80%的数据会写入到磁盘中。
2、这里有两个线程,一个线程从把数据从内存里挪到硬盘里,另一个线程把从mapper来的数据写到内存里。
3、因为有两个线程,一个负责把80%的写到磁盘,一个负责把20%存KV。
4、这里是有两个线程,一个在溢出写入进磁盘一个在反向逆写。
5、写到磁盘中,而且还必须等80%部分的数据写到磁盘完毕才行,就是为了防止反向写的时候会将以前的数据进行覆盖。所以这个设计非常巧妙。
6、这里的排序并不是进来个数据就排个序,而是所有的到达80%进行溢写前然后对数据排序。(排序发生的时间是在80%进行溢写前进,不是写一个进行一次排序。)
7、分区是为了把相同的key放到一个区
*/
接上图:
/*
Shuffle是从Map方法之后,Reduce方法之前这段数据处理的过程,这里面可以进行排序、分区、合并、压缩等等很多事情。
排序的手段叫快排,对key的索引排,一定是索引排,按照字典的顺序排
切片对应的是MapTask,分区对应的是ReduceTask
切片是split,一个切片对应一个MapTask,MapTask调用map方法将切片存入缓冲区,在缓冲区里在对切片进行分区。
在对分区进行排序、合并等操作形成分区0、1、2、3等,之后ReduceTask根据配置好的分区拉取数据、在ReduceTask中,一个分区对应一个ReduceTask
reduce拉取map输出数据的时候是拉的两组数据,混合的时候又乱序了,所以需要重新排序。
shuffle清洗数据,主要是找出相同的key
*/
// 自定义分区可以输出多个文件
在整个mapTask阶段,mapTask阶段它执行了两次排序,分别是在环形缓冲区溢写之前进行了一次快排(对key的索引排序,按照字典的顺序排);对溢写文件又进行了一次merge归并操作。
在reduceTask阶段又有一段归并排序。
mapReduce当中的Key为什么一定要排序?是为了提高相应的效率
进到环形缓冲区, 不是进来一条数据就一定对你先排序,而是到达一定阈值(80%)之后,要往磁盘上溢写之前进行一次排序,这个排序的过程是在内存当中完成的。
略
算平均值这种,它会影响你最终计算的结果
combin只是把这个maptask里汇总了,reduce还要汇总
略
Read阶段中的RecorderReader中的k对应的偏移量,v对应的是一行的内容。
ReduceTask首先干的第一件事就是拉取自己指定分区的数据,这个阶段叫做Copy阶段,拉的过程。
相同的key进入到Reduce中,这个阶段叫做Reduce阶段。最后让OutPutFormat输出。
略
略
略
略
将另一个表加载到内存中,,通过获取pid获取其它内容,然后进而将表进行连接
略
略