FlowBean
- package com.atguigu.mr.flow;
-
- import org.apache.hadoop.io.Writable;
-
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
-
- public class FlowBean implements Writable {
- // 流量
- private long upFlow;
- private long downFlow;
- private long sumFlow;
-
- public void set(long upFlow,long downFlow){
- this.upFlow=upFlow;
- this.downFlow=downFlow;
- this.sumFlow=upFlow+downFlow;
- }
-
-
- @Override
- public String toString() {
- return upFlow +"\t"+downFlow+"\t"+sumFlow;
- }
-
- public long getUpFlow() {
- return upFlow;
- }
-
- public void setUpFlow(long upFlow) {
- this.upFlow = upFlow;
- }
-
- public long getDownFlow() {
- return downFlow;
- }
-
- public void setDownFlow(long downFlow) {
- this.downFlow = downFlow;
- }
-
- public long getSumFlow() {
- return sumFlow;
- }
-
- public void setSumFlow(long sumFlow) {
- this.sumFlow = sumFlow;
- }
-
- /**
- *
- * 序列化方法,框架调用该方法将数据序列化到执行缓存
- * @param dataOutput 框架给我们的装数据的箱子。
- * @throws IOException
- */
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- dataOutput.writeLong(upFlow);
- dataOutput.writeLong(downFlow);
- dataOutput.writeLong(sumFlow);
-
- }
-
- /**
- *反序列化方法,框架调用这个方法将数据从箱子里面取出来
- * @param dataInput 装数据的箱子
- * @throws IOException
- */
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- this.upFlow=dataInput.readLong();
- this.downFlow=dataInput.readLong();
- this.sumFlow=dataInput.readLong();
-
- }
- }
- package com.atguigu.mr.flow;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- import java.io.IOException;
-
- public class Flowdriver {
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- Job job =Job.getInstance(new Configuration());
-
- job.setMapperClass(Flowmapper.class);
- job.setReducerClass(FlowReduce.class);
-
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(FlowBean.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FlowBean.class);
-
- FileInputFormat.setInputPaths(job,new Path("F:\\input"));
- FileOutputFormat.setOutputPath(job,new Path("F:\\aa\\output"));
-
- boolean b=job.waitForCompletion(true);
- System.exit(b?0:1);
-
-
- }
- }
- package com.atguigu.mr.flow;
-
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
-
- import java.io.IOException;
-
- public class Flowmapper extends Mapper
{ - private Text phone =new Text();
- private FlowBean flowBean =new FlowBean();
- /**
- * 对数据进行封装
- * @param key
- * @param value
- * @param context
- * @throws IOException
- * @throws InterruptedException
- */
-
-
- @Override
- protected void map(LongWritable key, Text value, Mapper
.Context context) throws IOException, InterruptedException { - // super.map(key, value, context);
- // 拿到一行数据 按照\t切分
- String[] fields=value.toString().split("\t");
-
- // 封装手机号
- phone.set(fields[1]);
- flowBean.set(
- Long.parseLong(fields[fields.length-3]),
- Long.parseLong(fields[fields.length-2])
- );
-
- // 将phone和手机号输出
- context.write(phone,flowBean);
-
- }
-
- }
FlowReduce
- package com.atguigu.mr.flow;
-
- import org.apache.hadoop.classification.InterfaceAudience;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
-
- import java.io.IOException;
-
- public class FlowReduce extends Reducer
{ - private FlowBean result=new FlowBean();
- /**
- * 按照手机号进行分组,--然后在这里累加
- * @param key 手机号
- * @param values 手机号所有的流量
- * @param context
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- protected void reduce(Text key, Iterable
values, Reducer.Context context) throws IOException, InterruptedException { - // super.reduce(key, values, context);
- // 讲一个手机号的所有流量进行累加
- long sumUpFlow =0;
- long sumDownFlow=0;
- for(FlowBean value:values){
- sumUpFlow+=value.getUpFlow();
- sumDownFlow+=value.getDownFlow();
- }
-
- result.set(sumUpFlow,sumDownFlow);
- // 将累加的流量输出
- context.write(key,result);
-
- }
- }