• MapReduce: Traffic Statistics


    FlowBean

    package com.atguigu.mr.flow;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class FlowBean implements Writable {

        // Traffic fields: upstream, downstream, and their sum
        private long upFlow;
        private long downFlow;
        private long sumFlow;

        public void set(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }

        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }

        public long getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }

        public long getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }

        public long getSumFlow() {
            return sumFlow;
        }

        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }

        /**
         * Serialization method: the framework calls this to write the fields
         * out into its buffer.
         * @param dataOutput the "box" the framework gives us to put the data in
         * @throws IOException
         */
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(upFlow);
            dataOutput.writeLong(downFlow);
            dataOutput.writeLong(sumFlow);
        }

        /**
         * Deserialization method: the framework calls this to take the data
         * back out of the "box", in the same order it was written.
         * @param dataInput the box holding the data
         * @throws IOException
         */
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            this.upFlow = dataInput.readLong();
            this.downFlow = dataInput.readLong();
            this.sumFlow = dataInput.readLong();
        }
    }
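
    Since readFields must read the longs back in exactly the order write emitted them, a quick in-memory round trip is a handy sanity check. Below is a minimal sketch; the FlowBeanRoundTrip class and the sample values are illustrative additions, not part of the original post:

    package com.atguigu.mr.flow;

    import java.io.*;

    // Hypothetical round-trip check for FlowBean's Writable contract
    public class FlowBeanRoundTrip {
        public static void main(String[] args) throws IOException {
            FlowBean in = new FlowBean();
            in.set(1116, 954); // assumed sample up/down flow values

            // Serialize: write() emits upFlow, downFlow, sumFlow in that order
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            in.write(new DataOutputStream(bytes));

            // Deserialize: readFields() reads the three longs back in the same order
            FlowBean out = new FlowBean();
            out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            System.out.println(out); // expected output: "1116\t954\t2070"
        }
    }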

    FlowDriver

    package com.atguigu.mr.flow;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class FlowDriver {

        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            Job job = Job.getInstance(new Configuration());

            // Lets the framework locate the job jar when submitting to a cluster
            job.setJarByClass(FlowDriver.class);

            job.setMapperClass(Flowmapper.class);
            job.setReducerClass(FlowReduce.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            FileInputFormat.setInputPaths(job, new Path("F:\\input"));
            FileOutputFormat.setOutputPath(job, new Path("F:\\aa\\output"));

            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
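
    The hard-coded F:\ paths mean the job runs in local mode straight from the IDE; note that the output directory must not already exist, or FileOutputFormat will fail the submission. If you instead package the job for a cluster, a launch along these lines is typical (the jar name is a placeholder, and the paths would then point into HDFS):

    hadoop jar flow.jar com.atguigu.mr.flow.FlowDriver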

    Flowmapper

    package com.atguigu.mr.flow;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class Flowmapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        private Text phone = new Text();
        private FlowBean flowBean = new FlowBean();

        /**
         * Wrap each input record into a (phone, FlowBean) pair.
         * @param key     byte offset of the line within the input split
         * @param value   one line of input
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Take one line of input and split it on \t
            String[] fields = value.toString().split("\t");

            // Wrap the phone number
            phone.set(fields[1]);

            // Upstream and downstream traffic are the 3rd- and 2nd-to-last fields
            flowBean.set(
                    Long.parseLong(fields[fields.length - 3]),
                    Long.parseLong(fields[fields.length - 2])
            );

            // Emit the phone number and its flow bean
            context.write(phone, flowBean);
        }
    }
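
    The mapper assumes tab-separated log lines with the phone number in the second field and the upstream/downstream byte counts in the third- and second-to-last fields. For a hypothetical line in that shape, such as the one below, it would emit the phone number 13736230513 as the key and a FlowBean of up=2481, down=24681, sum=27162 as the value:

    1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200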

    FlowReduce

    package com.atguigu.mr.flow;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class FlowReduce extends Reducer<Text, FlowBean, Text, FlowBean> {

        private FlowBean result = new FlowBean();

        /**
         * Records arrive grouped by phone number; accumulate the totals here.
         * @param key     the phone number
         * @param values  all flow records for that phone number
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            // Accumulate all the traffic for one phone number
            long sumUpFlow = 0;
            long sumDownFlow = 0;
            for (FlowBean value : values) {
                sumUpFlow += value.getUpFlow();
                sumDownFlow += value.getDownFlow();
            }
            result.set(sumUpFlow, sumDownFlow);

            // Emit the accumulated traffic
            context.write(key, result);
        }
    }
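
    Two object-reuse details are worth noting. Hadoop's values iterator hands back the same FlowBean instance on every iteration (it just refills the fields via readFields), which is safe here because the loop only reads the getters; and reusing the single result bean across reduce() calls is safe because context.write serializes the value immediately. Each output line is the key followed by FlowBean.toString(), so with input like the sample line shown earlier, the result file would contain one tab-separated record per phone number (values assumed from that sample):

    13736230513	2481	24681	27162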

  • Original article: https://blog.csdn.net/weixin_74711824/article/details/134426641