GroupingComparator分组案例实操
1.需求
有如下订单数据
表4-2 订单数据
现在需要求出每一个订单中最贵的商品。
(1)输入数据
(2)期望输出数据
1 222.8
2 722.4
3 232.8
2.需求分析
(1)利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。
(2)在Reduce端利用groupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品,如图4-18所示。
图4-18 过程分析
3.代码实现
(1)定义订单信息OrderBean类
package com.atguigu.mapreduce.order; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class OrderBean implements WritableComparable |
(2)编写OrderSortMapper类
package com.atguigu.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class OrderMapper extends Mapper |
(3)编写OrderSortGroupingComparator类
package com.atguigu.mapreduce.order; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class OrderGroupingComparator extends WritableComparator { protected OrderGroupingComparator() { super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean aBean = (OrderBean) a; OrderBean bBean = (OrderBean) b; int result; if (aBean.getOrder_id() > bBean.getOrder_id()) { result = 1; } else if (aBean.getOrder_id() < bBean.getOrder_id()) { result = -1; } else { result = 0; } return result; } } |
(4)编写OrderSortReducer类
package com.atguigu.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; public class OrderReducer extends Reducer |
(5)编写OrderSortDriver类
package com.atguigu.mapreduce.order; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OrderDriver { public static void main(String[] args) throws Exception, IOException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[]{“e:/input/inputorder” , “e:/output1”}; // 1 获取配置信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 设置jar包加载路径 job.setJarByClass(OrderDriver.class); // 3 加载map/reduce类 job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); // 4 设置map输出数据key和value类型 job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); // 5 设置最终输出数据的key和value类型 job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); // 6 设置输入数据和输出数据路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 8 设置reduce端的分组 job.setGroupingComparatorClass(OrderGroupingComparator.class); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } } |