Mapper端的Shuffle:
Reducer端的Shuffle:
Shuffle整体流程:
将统计结果按照条件输出到不同文件中(分区),比如将MapReduce结果按照手机号前3位输出到不同的文件中。
分区的个数决定了会产生多少个ReduceTask,也决定了最后生成的结果文件。
当不配置ReduceTask个数时,默认只有1个ReduceTask,也就只有1个分区。此时走的分区类是Hadoop的一个内部类,其分区方法getPartition会固定返回一个0,即最后所有的结果都生成到0号文件(part-r-00000)中。
当配置了ReduceTask个数大于1,但是没有指定分区类时,Hadoop 默认使用的分区类是HashPartitioner,分区方式是 hash分区 :
将key取 hash 值,然后对ReduceTask个数取余。key.hashcode() % numReduceTask(每个分区都会产生一个ReduceTask,所以ReduceTask个数就是分区个数)
// 设置ReduceTask个数为2,
// 每个ReduceTask产生一个结果文件,最后就会产生2个结果文件:0号文件(part-r-00000)、1号文件(part-r-00001)
// 按照key的hash值,对2取余,结果为0的存入0号文件,结果为1的存入1号文件
job.setNumReduceTasks(2);
用户可以自定义分区类Partitioner:
//第二步
job.setPartitionerClass(MyPartitioner.class);
//第三步
job.setNumReduceTasks(5);
自定义Partitioner:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* 根据手机号前3位分区
*/
public class MyPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
String phoneStart = key.toString().substring(0, 3);
int result = 0;
// 分区号必须从0开始,逐一增加
switch (phoneStart) {
case "133":
result = 0;
break;
case "139":
result = 1;
break;
case "192":
result = 2;
break;
case "188":
result = 3;
break;
default:
break;
}
return result;
}
}
在Driver中配置使用该分区类,根据分区类中的逻辑设置ReduceTask个数:
// 设置分区类、ReduceTask个数
job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(4);
如果Partitioner中分区结果有 5 个,而设置的ReduceTask个数是 4 个(比分区个数少),程序运行时就可能出现IOException:Illegal partition for xxxx(4)
。
如果设置的ReduceTask为1,那么就不会走我们的Partitioner,而是全部输出的0号文件(走的Hadoop默认的Partitioner内部类)。
如果Partitioner中分区结果有5个,但是设置的 ReduceTask 个数是 6 个,程序可以正常运行,最后会产生 6 个结果文件,且第6个结果文件是空的。这样最后分配的第6个ReduceTask被浪费了,分配了该节点但是没有处理数据。
总结:
MapTask和ReduceTask均会对数据按照key进行排序。该操作是Hadoop的默认行为,任何应用程序中的数据均会被排序,不管业务逻辑上是否需要。
默认的排序是按照字典顺序排序,且实现该排序的方法是快速排序算法。
所以mapper()方法的结果中,key必须要是可以排序的。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝(拉取fetch)相应的数据文件。如果文件大小超过一定阈值,则溢写到磁盘上,否则存储在内存中:
排序分类:
如果使用自定义的 JavaBean 作为key传输,那么这个 JavaBean 需要实现 WritableComparable 接口并重写compareTo方法。
WritableComparable接口继承了 Writable 和 Comparable。我们也可以直接在 JavaBean 里面实现 Writable 和 Comparable
在Reduce
r之前,对当前Mapper
计算结果先进行一次小的合并再输出。例如第一个MapTask上计算结果是{a:1, a:1, a:1, b:1, c:1, c:1}
,经过Combiner合并后,MapTask输出结果为{a:3, b:1, c:2}
。(Combiner只对当前MapTask合并,而Reducer是对从MapTask上拉取来的所有数据进行合并)
特点:
编写自定义Combiner:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Combiner就是一个小的Reducer,继承的类也是Reducer
* 只在当前的MapTask上运行,对当前MapTask上的结果进行汇总
*/
public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable outValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get(); // value是IntWritable类型,需要调用get()进行类型转换
}
outValue.set(sum);
context.write(key, outValue);
}
}
在 job 中使用 Combiner:
// 设置Combiner
job.setCombinerClass(MyCombiner.class);
// 将 ReduceTask 数量设置为0,程序就不会运行ReduceTask。
// 没有ReduceTask,也就不会执行Shuffle阶段,所以Shuffle阶段的Combiner也就不会执行
job.setNumReduceTasks(0);
一般情况下,Combiner和Reducer的逻辑完全一样,所以可以直接将Reducer设置到Combiner中:
// 因为MyCombiner和WordCountReducer的逻辑完全一样,所以可以直接使用Reducer
job.setCombinerClass(WordCountReducer.class);
OutputFormat是MapReduce输出的基类,所有MapReduce的输出类都实现了 OutputFormat接口。
默认使用的是OutputFormat下的 FileOutputFormat下的TextOutputFormat。
自定义OutputFormat:例如在FileOutputFormat基础上自定义
自定义OutputFormat示例:
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.UUID;
/**
* 自定义输出:
* 在job中设置输出文件夹路径,如果结果的key带有spring就输出到指定文件夹下的test01.txt,否则输出到指定文件夹下的test02.txt
*/
public class MyOutputFormat extends FileOutputFormat<Text, IntWritable> {
@Override
public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
// 返回一个我们自定义的输出Writer
MyRecordWriter writer = new MyRecordWriter(job);
return writer;
}
}
/**
* 自定义RecordWriter类,需要继承RecordWriter
*/
class MyRecordWriter extends RecordWriter<Text, IntWritable> {
private FSDataOutputStream fsOut1;
private FSDataOutputStream fsOut2;
public MyRecordWriter(TaskAttemptContext job) {
try {
// 创建输出流
FileSystem fileSystem = FileSystem.get(job.getConfiguration());
String defaultPath = "/app/WordCount/myoutput/" + UUID.randomUUID().toString();
String pathParent = job.getConfiguration().get(FileOutputFormat.OUTDIR, defaultPath); // 读取job设置的输出路径
// 创建两个输出的文件
// 获取job中配置的SpringFileName、OtherFileName
String subPath1 = pathParent + "/" + job.getConfiguration().get("SpringFileName");
String subPath2 = pathParent + "/" + job.getConfiguration().get("OtherFileName");
fsOut1 = fileSystem.create(new Path(subPath1));
fsOut2 = fileSystem.create(new Path(subPath2));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 写操作
*/
@Override
public void write(Text key, IntWritable value) throws IOException, InterruptedException {
// 根据key是否为spring,输出到指定文件
if("spring".equalsIgnoreCase(key.toString())) {
fsOut1.writeBytes(key + "@" + value + "\n");
} else {
fsOut2.writeBytes(key + "@" + value + "\n");
}
}
/**
* 关闭IO流
*/
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(fsOut1);
IOUtils.closeStream(fsOut2);
}
}
在 job 中使用我们自定义的OutputFormat:
// 设置自定义OutputFormat
job.setOutputFormatClass(MyOutputFormat.class);
// 给job添加一个属性,用作配置输出文件路径。属性名为FileOutputFormat.OUTDIR常量
// 程序中可以通过 job.getConfiguration().get(FileOutputFormat.OUTDIR) 获取
// FileOutputFormat的 _SUCCESS 文件默认也会生成到该路径下
FileOutputFormat.setOutputPath(job, new Path("/app/WordCount/myoutput/output1"));
// 给job再添加两个属性SpringFileName、OtherFileName
job.getConfiguration().set("SpringFileName", "test01.txt");
job.getConfiguration().set("OtherFileName", "test02.txt");
MapTask工作流程图:
ReduceTask工作机制:
MapTask的并行度由切片个数决定,切片个数由输入文件和切片规则决定。
ReduceTask的并行度同样影响整个 job 的执行并发度和效率,但与MapTask的并发数不同,ReduceTask数量可以直接手工设置:
// 设置ReduceTask的个数,默认值是1
job.setNumReduceTasks(4);
当ReduceTask = 0 时,表示没有Reduce阶段,输出的文件个数和MapTask个数一致(直接将MapTask的结果输出到文件。
ReduceTask默认值是1,且OutputFormat默认每个ReduceTask有1个结果文件,所以最终输出的文件只有1个。
如果ReduceTask个数设置为1,即使设置了自定义分区,最后依然使用Hadoop默认的不分区。即 执行分区的前提是 ReduceTask 个数大于1。
如果数据分布不均匀,就可能在Reduce阶段产生数据倾斜。此时如果分区不合理,就会造成某几个ReduceTask过于繁忙,而其他的ReduceTask又过于空闲。
有些情况下需要计算全局的汇总结果,这种情况就只能有1个ReduceTask。
类似SQL中的 join,需要将两个不同文件进行关联输出,例如一张是详情数据、一张是码值数据,需要关联输出的详情中进行了码值转换。
Map端的主要工作:为来自不同表或文件的 key-value 对,打标签以区别不同来源的记录。然后用连接字段作为 key ,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就可以了。
示例,将订单表和产品表使用MapReduce做 join。
在Mapper中,将 pid 设置为 key,这样相同 key 的订单数据、产品数据 就都能进入同一个 reduce 方法中处理。
然后在这个reduce方法中,判断如果是产品表,就获取产品名称;如果是订单表,就先存入集合。最后再遍历订单集合,将产品名称设置进去。
订单表表数据:
id(订单编号) | pid(产品编号) | amount(数量) |
---|---|---|
1001 | 01 | 1 |
1002 | 02 | 1 |
1003 | 03 | 4 |
1004 | 01 | 2 |
1005 | 01 | 3 |
1006 | 02 | 5 |
产品表:
pid(产品编号) | pname(产品名称) |
---|---|
01 | 苹果 |
02 | 桔子 |
03 | 香蕉 |
最终输出结果:订单编号、产品名称、数量
编写JavaBean:
package com.study.mapreduce.reducejoin;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* JavaBean结果
*/
public class OrderInfo implements Writable {
private String id; // 订单编号
private String pid; // 产品编号
private Integer amount; // 数量
private String pname; // 产品名称
private String _from_table; // 标记该条数据的来源表:order、product
public OrderInfo() {
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id); // 序列化String时,使用 writeUTF写出、readUTF读取
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(_from_table);
}
@Override
public void readFields(DataInput in) throws IOException {
id = in.readUTF();
pid = in.readUTF();
amount = in.readInt();
pname = in.readUTF();
_from_table = in.readUTF();
}
// 重写toString,将JavaBean输出到文件时会调用toString方法输出
@Override
public String toString() {
return "OrderInfo{" +
"id='" + id + '\'' +
", pid='" + pid + '\'' +
", amount=" + amount +
", pname='" + pname + '\'' +
", _from_table='" + _from_table + '\'' +
'}';
}
//getter setter
}
编写Mapper:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;
import java.io.IOException;
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, OrderInfo> {
private Logger logger = Logger.getLogger(ReduceJoinMapper.class);
private Text outKey = new Text();
private String fileName = ""; // 当前MapTask处理的文件名称
/**
* 重写初始化方法,在初始化方法中通过切片信息获取到文件名称,避免每次都在map方法中读取耗费资源
* 因为每个切片对应一个MapTask,所以这个MapTask的初始化方法就是这个切片的初始化方法
* 因为一个文件可以切多个片,但是一个切片只会对应一个文件,所以这个切片对应的文件信息是固定的
*/
@Override
protected void setup(Mapper<LongWritable, Text, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
// JobSubmitter 的 writeNewSplits 中会调用 input.getSplits(job) 进行切片
// 我们使用的默认InputFormat,也就是 TextInputFormat。TextInputFormat 的 getSplits 方法在 FileInputFormat(TextInputFormat的父类)中
// FileInputFormat 的 getSplits 方法会调用makeSplit创建 FileSplit 类型的切片对象
fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
String[] valueArray = value.toString().split("@");
logger.info("---------------------------------------------------------");
logger.info("fileName:" + fileName);
OrderInfo orderInfo = new OrderInfo();
if(fileName.startsWith("order")) { // 处理 order.txt文件的内容
orderInfo.setId(valueArray[0]);
orderInfo.setPid(valueArray[1]);
orderInfo.setAmount(Integer.valueOf(valueArray[2]));
orderInfo.set_from_table("_ORDER_"); // 设置来源表为order表
orderInfo.setPname(""); // 属性值不能有null,否则会报NPE空指针异常。可以设置一个默认值
outKey.set(valueArray[1]);
} else { // 处理 product.txt文件的内容
orderInfo.setPid(valueArray[0]);
orderInfo.setPname(valueArray[1]);
orderInfo.set_from_table("_PRODUCT_"); // 设置来源表为product
// 防止空指针异常,对null属性设置一个默认值
orderInfo.setId("");
orderInfo.setAmount(-1);
outKey.set(valueArray[0]);
}
context.write(outKey, orderInfo);
logger.info(orderInfo);
logger.info("-------------------------------------------");
}
}
编写Reducer:
package com.study.mapreduce.reducejoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class ReduceJoinReducer extends Reducer<Text, OrderInfo, Text, OrderInfo> {
private Logger logger = Logger.getLogger(ReduceJoinReducer.class);
@Override
protected void reduce(Text key, Iterable<OrderInfo> values, Reducer<Text, OrderInfo, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
logger.info("=====================================");
logger.info("key: " + key);
List<OrderInfo> orderInfoList = new ArrayList<>();
String pname = "";
Iterator<OrderInfo> iterator = values.iterator();
while (iterator.hasNext()) {
OrderInfo orderInfo = iterator.next();
logger.info(orderInfo);
if("_PRODUCT_".equals(orderInfo.get_from_table())) {
pname = orderInfo.getPname();
} else {
// 此处不能直接使用:orderInfoList.add(orderInfo);
// Hadoop迭代器为了优化效率,使用了对象重用。
// 迭代时value始终指向同一个内存地址,改变的只是引用地址中的字段属性
// 即这个循环了这么多次的orderInfo,其实是同一个对象,只是对象的属性值在不断变化
// 所以这里需要 new 一个新对象来存放这些属性,然后将这个新对象塞给集合
OrderInfo orderInfoTemp = new OrderInfo();
// orderInfoTemp.setId(orderInfo.getId());
// orderInfoTemp.setPid(orderInfo.getPid());
// orderInfoTemp.setAmount(orderInfo.getAmount());
// orderInfoTemp.set_from_table(orderInfo.get_from_table());
try {
// 使用BeanUtils工具对同名属性进行赋值
// 第一个参数:要设置值的目标对象
// 第二个参数:源对象
BeanUtils.copyProperties(orderInfoTemp, orderInfo);
} catch (Exception e) {
e.printStackTrace();
}
orderInfoList.add(orderInfoTemp);
}
}
logger.info("**********************************");
for (OrderInfo orderInfo : orderInfoList) {
orderInfo.setPname(pname);
context.write(key, orderInfo);
logger.info(orderInfo);
}
logger.info("===========================================");
}
}
编写Driver:
package com.study.mapreduce.reducejoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ReduceJoinDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration config = new Configuration();
Job job = Job.getInstance(config);
job.setJarByClass(ReduceJoinDriver.class);
job.setMapperClass(ReduceJoinMapper.class);
job.setReducerClass(ReduceJoinReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(OrderInfo.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(OrderInfo.class);
FileInputFormat.setInputPaths(job, new Path("/app/order/input"));
FileOutputFormat.setOutputPath(job, new Path("/app/order/output/output2"));
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1); // 程序退出
}
}
join合并的操作如果在Reduce段完成,Reduce端的处理压力就会很大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易发生数据倾斜。
因为一般情况下,MapTask的节点数量要多于ReduceTask,可以在Map段进行数据join合并。
适用场景:一张表很小、另一张表很大。先将小表读取加载进内存中,然后在Map端进行 join合并。
具体方法:采用DistributedCache
// 缓存普通文件到Task运行节点
job.addCacheFile(new URI("file:///app/cache/product.txt"));
// 如果是集群运行,需要设置成 hdfs 路径
// 因为在Map端已经完成了join,达到了目的,不再需要ReduceTask,可以设置取消ReduceTask
job.setNumReduceTasks(0);
编写Driver:
package com.study.mapreduce.mapjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class MapJoinDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
Configuration config = new Configuration();
Job job = Job.getInstance(config);
job.setJarByClass(MapJoinDriver.class);
job.setMapperClass(MapJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
// 不需要汇总出Value,可以设置为null(NullWritable.get())
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置取消ReduceTask
job.setNumReduceTasks(0);
// 加载缓存数据
job.addCacheFile(new URI("/app/order/input/product.txt"));
FileInputFormat.setInputPaths(job, new Path("/app/order/input/order.txt"));
FileOutputFormat.setOutputPath(job, new Path("/app/order/output/output3"));
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1); // 程序退出
}
}
编写Mapper:
package com.study.mapreduce.mapjoin;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private Map<String, String> productMap = new HashMap<>(); // product表数据
private Text outKey = new Text();
/**
* 在MapTask初始化时加载缓存文件
*/
@Override
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 获取product.txt缓存文件
URI[] cacheFiles = context.getCacheFiles();
URI productFile = cacheFiles[0];
// 读取product.txt文件
FileSystem fileSystem = FileSystem.get(context.getConfiguration());
FSDataInputStream inputStream = fileSystem.open(new Path(productFile));
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
String line = "";
while((line = br.readLine()) != null) {
String[] strArray = line.split("@");
productMap.put(strArray[0], strArray[1]);
}
IOUtils.closeStream(br);
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("@");
String id = fields[0];
String pid = fields[1];
int amount = Integer.valueOf(fields[2]);
String pname = productMap.get(pid);
String resultKey = id +"\t" + pname + "\t" + amount;
outKey.set(resultKey);
context.write(outKey, NullWritable.get());
}
}