排序是MapReduce框架中最重要的操作之一
MapTask和ReduceTask均会按照key进行排序.该操作属于Hadoop的默认行为。任何程序中的数据都会进行排序,而不管逻辑上是否需要。
默认排序是按照字典序进行排序,而实现排序的方法是快速排序
对于MapTask,它会将处理的结果暂时存放在环形缓冲区,当环形缓冲区使用率到达一定阈值,再对缓冲区中数据进行一次快速排序,并将这些数据溢写到磁盘上,而当数据处理完毕后,它会磁盘对所有文件进行归并排序
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写到磁盘上,否则存在内存中。如果磁盘上文件数目到达一定数目,则会进行一次归并排序以生成一个跟大文件,如果内存中文件大小或数目超过一定阈值,则会进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask会统一对内存和磁盘上的数据进行一次归并排序
排序分类
(1)部分排序
MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序
(2)全排序
最终输出结果只有一个文件,且内部有序,实现方法是设置一个ReduceTask,但该放法在处理大型文件时效率极低,因为一代机器处理所有文件,完全丧失了MapReduce提供的并行架构
(3)辅助排序
在Reduce端对key进行分组。应用:在接收key为bean对象时,想让一个或几个字段相同(全部字段比较不同)的key进入相同的reduce方法。可以采用分组排序
(4)二次排序
在自定义排序过程中,如果compareTo中判断条件为两个即为二次排序
自定义排序WritableComparable原理分
bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。
- @Override
- public int compareTo(FlowBean o) {
- // 总流量的倒叙排序
- if(this.sumFlow>o.sumFlow)
- return -1;
- else
- if(this.sumFlow
- return 1;
- else
- {
- //按上行流量的正序排
- if(this.upFlow>o.upFlow)
- return 1;
- else if (this.upFlow
- return -1;
- else{
- return 0;}
- }
- }
WritableComparable案例实操
1 全排序
根据序列化案例产生的结果在对总数量进行倒序排序
(1)输入数据
pjhone_data.tat
处理后数据 part-r-0000
需求分析

代码:FlowBean类
- import org.apache.hadoop.io.WritableComparable;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
-
- public class FlowBean implements WritableComparable
{ -
- private long upFlow; //上行流量
- private long downFlow; //下行流量
- private long sumFlow; //总流量
-
- //提供无参构造
- public FlowBean() {
- }
-
- //生成三个属性的getter和setter方法
- public long getUpFlow() {
- return upFlow;
- }
-
- public void setUpFlow(long upFlow) {
- this.upFlow = upFlow;
- }
-
- public long getDownFlow() {
- return downFlow;
- }
-
- public void setDownFlow(long downFlow) {
- this.downFlow = downFlow;
- }
-
- public long getSumFlow() {
- return sumFlow;
- }
-
- public void setSumFlow(long sumFlow) {
- this.sumFlow = sumFlow;
- }
-
- public void setSumFlow() {
- this.sumFlow = this.upFlow + this.downFlow;
- }
-
- //实现序列化和反序列化方法,注意顺序一定要一致
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(this.upFlow);
- out.writeLong(this.downFlow);
- out.writeLong(this.sumFlow);
-
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.upFlow = in.readLong();
- this.downFlow = in.readLong();
- this.sumFlow = in.readLong();
- }
-
- //重写ToString,最后要输出FlowBean
- @Override
- public String toString() {
- return upFlow + "\t" + downFlow + "\t" + sumFlow;
- }
-
- @Override
- public int compareTo(FlowBean o) {
-
- //按照总流量比较,倒序排列
- if(this.sumFlow > o.sumFlow){
- return -1;
- }else if(this.sumFlow < o.sumFlow){
- return 1;
- }else {
- return 0;
- }
- }
- }
Mapper类
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import java.io.IOException;
-
- public class FlowMapper extends Mapper
{ - private FlowBean outK = new FlowBean();
- private Text outV = new Text();
-
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-
- //1 获取一行数据
- String line = value.toString();
-
- //2 按照"\t",切割数据
- String[] split = line.split("\t");
-
- //3 封装outK outV
- outK.setUpFlow(Long.parseLong(split[1]));
- outK.setDownFlow(Long.parseLong(split[2]));
- outK.setSumFlow();
- outV.set(split[0]);
-
- //4 写出outK outV
- context.write(outK,outV);
- }
- }
Reducer类
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- import java.io.IOException;
-
- public class FlowReducer extends Reducer
{ - @Override
- protected void reduce(FlowBean key, Iterable
values, Context context) throws IOException, InterruptedException { -
- //遍历values集合,循环写出,避免总流量相同的情况
- for (Text value : values) {
- //调换KV位置,反向写出
- context.write(value,key);
- }
- }
- }
Driver类
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import java.io.IOException;
-
- public class FlowDriver {
-
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
-
- //1 获取job对象
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
-
- //2 关联本Driver类
- job.setJarByClass(FlowDriver.class);
-
- //3 关联Mapper和Reducer
- job.setMapperClass(FlowMapper.class);
- job.setReducerClass(FlowReducer.class);
-
- //4 设置Map端输出数据的KV类型
- job.setMapOutputKeyClass(FlowBean.class);
- job.setMapOutputValueClass(Text.class);
-
- //5 设置程序最终输出的KV类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FlowBean.class);
-
- //6 设置输入输出路径
- FileInputFormat.setInputPaths(job, new Path("D:\\inputflow2"));
- FileOutputFormat.setOutputPath(job, new Path("D:\\comparout"));
-
- //7 提交Job
- boolean b = job.waitForCompletion(true);
- System.exit(b ? 0 : 1);
- }
- }
区内排序:
要求每个省份手机号按照总流量内部排序
基于前一个需求,增加自定义分区类,分区按照省份手机号设置。
(1)添加指定分区类
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Partitioner;
-
- public class ProvincePartitioner2 extends Partitioner
{ -
- @Override
- public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
- //获取手机号前三位
- String phone = text.toString();
- String prePhone = phone.substring(0, 3);
-
- //定义一个分区号变量partition,根据prePhone设置分区号
- int partition;
- if("136".equals(prePhone)){
- partition = 0;
- }else if("137".equals(prePhone)){
- partition = 1;
- }else if("138".equals(prePhone)){
- partition = 2;
- }else if("139".equals(prePhone)){
- partition = 3;
- }else {
- partition = 4;
- }
-
- //最后返回分区号partition
- return partition;
- }
- }
(2)在驱动中添加分区
- // 设置自定义分区器
-
- job.setPartitionerClass(ProvincePartitioner2.class);
-
-
-
- // 设置对应的ReduceTask的个数
-
- job.setNumReduceTasks(5);
-
相关阅读:
黑马瑞吉外卖之新增分类
7年阿里测试岗,我眼中的阿里虽然不完美,但值得去学5年
Quarto 入门教程 (2):如何使用并编译出不同文档
林木种苗生产vr虚拟实训教学降低培训等待周期
5个实用的WhatsApp 群发消息模板推荐!
如何实现毫米波波束成形和大规模MiMo?
Android 指定有线网或Wifi进行网络请求
【C++】运算符重载 ④ ( 一元运算符重载 | 使用 全局函数 实现 前置 ++ 自增运算符重载 | 使用 全局函数 实现 前置 - - 自减运算符重载 )
关于wukong-kong项目在树莓派启动后只运行一次卡死的问题的解决方法
工程师如何对待开源
-
原文地址:https://blog.csdn.net/m0_65136138/article/details/126090239