• hadoop MapReduce运营商案例关于用户基站停留数据统计


    如果需要文件和代码的话可评论区留言邮箱,我给你发源代码

    本文来自博客园,作者:Arway,转载请注明原文链接:https://www.cnblogs.com/cenjw/p/hadoop-mapReduce-operator-case.html

    实验要求

    统计每个用户在不同时段中各个基站的停留时间。

    1.功能描述

    用户的手机,连接到不同的基站会产生一条记录。
    数据格式为:用户标识 设备标识 基站位置 通讯的日期 通讯时间
    example: 0000009999 0054785806 00000089 2016-02-21 21:55:37

    需要得到的数据格式为:
    用户标识 时段 基站位置 停留时间
    example: 0000000001 09-18 00000003 15
    用户0000000001在09-18点这个时间段在基站00000003停留了15分钟

    2.实现思路

    程序运行支持传入时间段,比如“09-18-24”,表示分为0点到9点,9点到18点,18点到24点三个时间段。

    • (1)Mapper阶段
      对输入的数据,算出它属于哪个时间段。
      k1:每行记录在文本中的偏移量。
      v2:一条记录
      k2用“用户ID,时间段”输出。
      v2用“基站位置,时间”。时间用unix time

    • (2)Reducer阶段
      对获取的v3(v3是一个集合,每个元素是v2,相当于按照k2对v2分组)进行排序,以时间升序排序。
      计算两两之间的时间间隔,保存到另一个集合中,两个不同的时间间隔中,从基站A移动到基站B,这样获取到在A基站的停留的时间。
      同理从基站B移动到基站C,基站C移动到基站D,依次类推,所有的时间都获取到。再把时间累加起来,就可以获取到总的时间。

    本文来自博客园,作者:Arway,转载请注明原文链接:https://www.cnblogs.com/cenjw/p/hadoop-mapReduce-operator-case.html

    代码实现

    PhoneMain.java

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    package phoneMapReduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * Created by ue50 on 11/13/19. */ public class PhoneMain { public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException { //String.equals()比较字符串的值是否相同 if(args == null || "0".equals(args[0])) { throw new RuntimeException("argument is not right!"); } //Configuration是作业的配置信息类 Configuration configuration = new Configuration(); //set(String name, String value)设置配置项 configuration.set("timeRange", args[0]); Job job = Job.getInstance(configuration); job.setJarByClass(PhoneMain.class); job.setMapperClass(PhoneMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setReducerClass(PhoneReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //FileInputFormat.setInputPaths(job, new Path("hdfs://xdata-m0:8020/user/ue50/pos.txt")); //FileOutputFormat.setOutputPath(job, new Path("hdfs://xdata-m0:8020/user/ue50/out")); FileInputFormat.setInputPaths(job, new Path(args[1])); FileOutputFormat.setOutputPath(job, new Path(args[2])); job.waitForCompletion(true); } }

    Mapper阶段
    PhoneMapper.java

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    package phoneMapReduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * Created by ue50 on 11/13/19. */ public class PhoneMapper extends Mapper<LongWritable, Text, Text, Text> { private int[] timeRangeList; @Override //setup()被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作 protected void setup(Context context) throws IOException,InterruptedException { //Configuration是作业的配置信息类,通过Configuration可以实现在多个mapper和多个reducer任务之间共享信息 Configuration configuration = context.getConfiguration(); //get(String name)根据配置项的键name获取相应的值 String timeRange = configuration.get("timeRange");//运行时传入的时间段,比如“09-18-24” String[] timeRangeString = timeRange.split("-"); timeRangeList = new int[timeRangeString.length]; for(int i = 0; i < timeRangeString.length;i++) { //timeRangeList数组保存传入的时间,如:09、18、24 timeRangeList[i] = Integer.parseInt(timeRangeString[i]); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String values[] = value.toString().split("\\s+");//对一条记录"用户标识 设备标识 基站位置 通讯的时间"按空格拆分 String userId = values[0];//用户标识 String baseStation = values[2];//基站位置 String timeString = values[4];//访问时间,如:21:55:37 String[] times = timeString.split(":");//对访问时间按':'拆分 int hour = Integer.parseInt(times[0]);//小时 //startHour、endHour时间段的起止时间 int startHour = 0; int endHour = 0; for(int i = 0; i < timeRangeList.length; i++) { if(hour < timeRangeList[i]) { if(i == 0) { startHour = 0; } else { startHour = timeRangeList[i-1]; } endHour = timeRangeList[i]; break; } } if(startHour == 0 && endHour == 0) { return; } //k2:用户标识 时间段 v2:基站位置-访问时间 context.write(new Text(userId + "\t" + startHour + "-" + endHour + "\t"), new Text(baseStation + "-" + timeString)); } }

    Reducer阶段

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    package phoneMapReduce; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; /** * Created by ue50 on 11/13/19. */ public class PhoneReducer extends Reducer<Text, Text, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { List<String> valueList = new LinkedList<String>();//基于链表的动态数组 //Map是一种把键对象和值对象映射的集合,TreeMap是一个有序的key-value集合, //它是通过红黑树实现的,TreeMap中的元素默认按照key的自然排序排列 Map<String, Long> residenceTimeMap = new TreeMap<String, Long>(); for(Text value : values) { String item = value.toString(); valueList.add(item);//"基站位置-访问时间"的集合 } if(valueList == null || valueList.size() <= 1) { return; } //Comparator是比较器 //Collections.sort()方法中的自定义比较器,根据比较器的实现逻辑对valueList进行排序 Collections.sort(valueList, new Comparator<String>() {//匿名内部类 @Override //重写比较器中的比较方法:compare方法 public int compare(String o1, String o2) { o1 = o1.split("-")[1]; o2 = o2.split("-")[1]; return o1.compareTo(o2);//根据访问时间对valueList排序,第一个参数.compareTo(第二个参数)升序 } }); for(int i = 0;i < valueList.size()-1; i++) { String station = valueList.get(i).split("-")[0];//基站位置 String time1 = valueList.get(i).split("-")[1];//访问时间 String time2 = valueList.get(i + 1).split("-")[1]; //对日期/时间进行格式化,HH:24小时制 DateFormat dateFormat = new SimpleDateFormat("HH:hh:ss"); //Date对象用于处理日期与时间 Date date1 = null; Date date2 = null; try{ date1 = dateFormat.parse(time1);//parse():把String型的字符串转换成特定格式的Date类型 date2 = dateFormat.parse(time2); }catch (ParseException e) { e.printStackTrace(); } //date1.before(date2),当date1小于date2时,返回TRUE,当大于等于时,返回false; if(date1.before(date2)) { long time = date2.getTime() - date1.getTime();//getTime方法返回的是毫秒数 Long count = residenceTimeMap.get(station);//返回key关联的值,没有值返回null if(count == null) { residenceTimeMap.put(station, time);//<基站位置,停留时间> } else { residenceTimeMap.put(station, count + time);//将停留时间累积 } } } valueList = null; //TreeMap的keySet():以升序返回一个具有TreeMap键的Set视图 Set<String> keySet = residenceTimeMap.keySet();//keySet:<基站位置> for(String mapKey : keySet) { long minute = residenceTimeMap.get(mapKey);//停留时间毫秒 minute = minute/1000/60;//分钟 //minute = minute/1000;//秒 context.write(new Text(key +"\t" + mapKey +"\t"), new LongWritable(minute)); } residenceTimeMap = null; } }

    如果需要文件和代码的话可评论区留言邮箱,我给你发源代码

  • 相关阅读:
    【The design pattern of Attribute-Based Dynamic Routing Pattern (ADRP)】
    UIAutomatorViewer排查问题
    【Vue3】--setup两个属性+computed+watch【练习代码已上传至Gitee】
    shell脚本语句控制命令(exit、break、continue)
    Mac 环境下 Java JDK 的安装与环境变量配置详解(已完美解决)
    【附源码】计算机毕业设计java元江特色农产品售卖平台设计与实现
    在QGIS中加载显示3DTiles数据
    本地笔记同步到博客
    Seata 环境搭建
    CV计算机视觉每日开源代码Paper with code速览-2023.11.14
  • 原文地址:https://www.cnblogs.com/cenjw/p/hadoop-mapReduce-operator-case.html