• 第1关:MapReduce综合应用案例 — 电信数据清洗


    根据提示,在右侧编辑器补充代码,对数据按照一定规则进行清洗。

    数据说明如下: a.txt

    数据切分方式:,

    数据所在位置:/user/test/input/a.txt

    15733218050,15778423030,1542457633,1542457678,450000,530000

    15733218050 | 15778423030 | 1542457633 | 1542457678 | 450000 | 530000
    呼叫者手机号 | 接受者手机号 | 开始时间戳(s) | 接受时间戳(s) | 呼叫者地址省份编码 | 接受者地址省份编码

    Mysql数据库:

    用户名:root 密码:123123

    数据库名:mydb

    用户表:userphone

    列名 | 类型 | 非空 | 是否自增 | 介绍
    id | int(11) |  |  | 用户ID
    phone | varchar(255) |  |  | 手机号
    trueName | varchar(255) |  |  | 真实姓名

    地址省份表:allregion

    列名 | 类型 | 非空 | 是否自增 | 介绍
    id | int(11) |  |  | 用户ID
    CodeNum | varchar(255) |  |  | 编号
    Address | varchar(255) |  |  | 地址

    清洗规则:

    • 处理数据中的时间戳(秒级)将其转化为2017-06-21 07:01:58,年-月-日 时:分:秒 这种格式;

    • 处理数据中的省份编码,结合mysql的表数据对应,将其转换成省份名称;

    • 处理用户手机号,与mysql的表数据对应,关联用户的真实姓名;

    • 处理数据中的开始时间与结束时间并计算通信时长(以秒为单位);

    • 设置数据来源文件路径及清洗后的数据存储路径: 数据来源路径为: /user/test/input/a.txt (HDFS); 清洗后的数据存放于:/user/test/output (HDFS)

    数据清洗后如下:

    邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市

    邓二 | 张倩 | 13666666666 | 15151889601 | 2018-03-29 10:58:12 | 2018-03-29 10:58:42 | 30 | 黑龙江省 | 上海市
    用户名A | 用户名B | 用户A的手机号 | 用户B的手机号 | 开始时间 | 结束时间 | 通话时长(s) | 用户A的省份 | 用户B的省份

    step/com/LogMR.java

    1. package com;
    2. import java.io.IOException;
    3. import java.sql.Connection;
    4. import java.sql.ResultSet;
    5. import java.sql.SQLException;
    6. import java.sql.Statement;
    7. import java.text.SimpleDateFormat;
    8. import java.util.ArrayList;
    9. import java.util.HashMap;
    10. import java.util.Iterator;
    11. import java.util.List;
    12. import java.util.Map;
    13. import org.apache.hadoop.conf.Configuration;
    14. import org.apache.hadoop.fs.FileSystem;
    15. import org.apache.hadoop.fs.Path;
    16. import org.apache.hadoop.io.LongWritable;
    17. import org.apache.hadoop.io.NullWritable;
    18. import org.apache.hadoop.io.Text;
    19. import org.apache.hadoop.mapreduce.Job;
    20. import org.apache.hadoop.mapreduce.Mapper;
    21. import org.apache.hadoop.mapreduce.Reducer;
    22. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    23. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    24. public class LogMR {
    25. /********** begin **********/
    26. static class MyMapper extends Mapper<LongWritable, Text, PhoneLog, NullWritable> {
    27. Map<String, String> userMap = new HashMap<>();
    28. Map<String, String> addressMap = new HashMap<>();
    29. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    30. PhoneLog pl = new PhoneLog();
    31. Text text = new Text();
    32. @Override
    33. protected void setup(Context context) throws IOException, InterruptedException {
    34. Connection connection = DBHelper.getConnection();
    35. try {
    36. Statement statement = connection.createStatement();
    37. String sql = "select * from userphone";
    38. ResultSet resultSet = statement.executeQuery(sql);
    39. while (resultSet.next()) {
    40. String phone = resultSet.getString(2);
    41. String trueName = resultSet.getString(3);
    42. userMap.put(phone, trueName);
    43. }
    44. String sql2 = "select * from allregion";
    45. ResultSet resultSetA = statement.executeQuery(sql2);
    46. while (resultSetA.next()) {
    47. String phone = resultSetA.getString(2);
    48. String trueName = resultSetA.getString(3);
    49. addressMap.put(phone, trueName);
    50. }
    51. } catch (SQLException e) {
    52. e.printStackTrace();
    53. }
    54. }
    55. @Override
    56. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    57. String str = value.toString();
    58. String[] split = str.split(",");
    59. if (split.length == 6) {
    60. String trueName1 = userMap.get(split[0]);
    61. String trueName2 = userMap.get(split[1]);
    62. String address1 = addressMap.get(split[4]);
    63. String address2 = addressMap.get(split[5]);
    64. long startTimestamp = Long.parseLong(split[2]);
    65. String startTime = sdf.format(startTimestamp * 1000);
    66. long endTimestamp = Long.parseLong(split[3]);
    67. String endTime = sdf.format(endTimestamp * 1000);
    68. long timeLen = endTimestamp - startTimestamp;
    69. pl.SetPhoneLog(trueName1, trueName2, split[0], split[1], startTime, endTime, timeLen, address1,
    70. address2);
    71. context.write(pl, NullWritable.get());
    72. }
    73. }
    74. }
    75. public static void main(String[] args) throws Exception {
    76. Configuration conf = new Configuration();
    77. Job job = Job.getInstance(conf);
    78. job.setJarByClass(LogMR.class);
    79. job.setMapperClass(MyMapper.class);
    80. job.setMapOutputKeyClass(PhoneLog.class);
    81. job.setMapOutputValueClass(NullWritable.class);
    82. job.setNumReduceTasks(0);
    83. Path inPath = new Path("/user/test/input/a.txt");
    84. Path out = new Path("/user/test/output");
    85. FileInputFormat.setInputPaths(job, inPath);
    86. FileOutputFormat.setOutputPath(job, out);
    87. job.waitForCompletion(true);
    88. }
    89. /********** end **********/
    90. }

    step/com/DBHelper.java

    1. package com;
    2. import java.sql.Connection;
    3. import java.sql.DriverManager;
    4. import java.sql.SQLException;
    /**
     * Provides a single shared JDBC connection to the {@code mydb} MySQL database.
     */
    public class DBHelper {
        /********** begin **********/
        private static final String driver = "com.mysql.jdbc.Driver";
        private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
        private static final String username = "root";// database user name
        private static final String password = "123123";// database password: chosen when the database was installed, so it differs per installation
        private static Connection conn = null; // shared database connection, opened on first use

        static {
            try {
                // Load the JDBC driver class once when this helper is first used.
                Class.forName(driver);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }

        /**
         * Returns the shared connection, opening it on first use and re-opening it if the
         * previously cached connection has been closed.
         *
         * @return the open connection, or {@code null} if the database cannot be reached
         */
        public static Connection getConnection() {
            try {
                // A cached connection that was closed by a caller must not be handed out again.
                if (conn == null || conn.isClosed()) {
                    conn = DriverManager.getConnection(url, username, password);
                }
            } catch (SQLException e) {
                e.printStackTrace();
                // Never return a stale handle after a failure; callers check for null.
                conn = null;
            }
            return conn;
        }
        /********** end **********/
    }

    step/com/PhoneLog.java

    1. package com;
    2. import java.io.DataInput;
    3. import java.io.DataOutput;
    4. import java.io.IOException;
    5. import org.apache.hadoop.io.Writable;
    6. import org.apache.hadoop.io.WritableComparable;
    7. public class PhoneLog implements WritableComparable<PhoneLog> {
    8. private String userA;
    9. private String userB;
    10. private String userA_Phone;
    11. private String userB_Phone;
    12. private String startTime;
    13. private String endTime;
    14. private Long timeLen;
    15. private String userA_Address;
    16. private String userB_Address;
    17. public PhoneLog() {
    18. }
    19. public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone, String startTime,
    20. String endTime, Long timeLen, String userA_Address, String userB_Address) {
    21. this.userA = userA;
    22. this.userB = userB;
    23. this.userA_Phone = userA_Phone;
    24. this.userB_Phone = userB_Phone;
    25. this.startTime = startTime;
    26. this.endTime = endTime;
    27. this.timeLen = timeLen;
    28. this.userA_Address = userA_Address;
    29. this.userB_Address = userB_Address;
    30. }
    31. public String getUserA_Phone() {
    32. return userA_Phone;
    33. }
    34. public void setUserA_Phone(String userA_Phone) {
    35. this.userA_Phone = userA_Phone;
    36. }
    37. public String getUserB_Phone() {
    38. return userB_Phone;
    39. }
    40. public void setUserB_Phone(String userB_Phone) {
    41. this.userB_Phone = userB_Phone;
    42. }
    43. public String getUserA() {
    44. return userA;
    45. }
    46. public void setUserA(String userA) {
    47. this.userA = userA;
    48. }
    49. public String getUserB() {
    50. return userB;
    51. }
    52. public void setUserB(String userB) {
    53. this.userB = userB;
    54. }
    55. public String getStartTime() {
    56. return startTime;
    57. }
    58. public void setStartTime(String startTime) {
    59. this.startTime = startTime;
    60. }
    61. public String getEndTime() {
    62. return endTime;
    63. }
    64. public void setEndTime(String endTime) {
    65. this.endTime = endTime;
    66. }
    67. public Long getTimeLen() {
    68. return timeLen;
    69. }
    70. public void setTimeLen(Long timeLen) {
    71. this.timeLen = timeLen;
    72. }
    73. public String getUserA_Address() {
    74. return userA_Address;
    75. }
    76. public void setUserA_Address(String userA_Address) {
    77. this.userA_Address = userA_Address;
    78. }
    79. public String getUserB_Address() {
    80. return userB_Address;
    81. }
    82. public void setUserB_Address(String userB_Address) {
    83. this.userB_Address = userB_Address;
    84. }
    85. @Override
    86. public void write(DataOutput out) throws IOException {
    87. out.writeUTF(userA);
    88. out.writeUTF(userB);
    89. out.writeUTF(userA_Phone);
    90. out.writeUTF(userB_Phone);
    91. out.writeUTF(startTime);
    92. out.writeUTF(endTime);
    93. out.writeLong(timeLen);
    94. out.writeUTF(userA_Address);
    95. out.writeUTF(userB_Address);
    96. }
    97. @Override
    98. public void readFields(DataInput in) throws IOException {
    99. userA = in.readUTF();
    100. userB = in.readUTF();
    101. userA_Phone = in.readUTF();
    102. userB_Phone = in.readUTF();
    103. startTime = in.readUTF();
    104. endTime = in.readUTF();
    105. timeLen = in.readLong();
    106. userA_Address = in.readUTF();
    107. userB_Address = in.readUTF();
    108. }
    109. @Override
    110. public String toString() {
    111. return userA + "," + userB + "," + userA_Phone + "," + userB_Phone + "," + startTime + "," + endTime + ","
    112. + timeLen + "," + userA_Address + "," + userB_Address;
    113. }
    114. @Override
    115. public int compareTo(PhoneLog pl) {
    116. if(this.hashCode() == pl.hashCode()) {
    117. return 0;
    118. }
    119. return -1;
    120. }
    121. }

    最后重启 Hadoop(在命令行执行 start-all.sh),然后完成评测。

  • 相关阅读:
    5个高清图片素材网站,无水印,免费商用。
    leetcode-LCP 06. 拿硬币
    git常用指令
    2022-01-08 如何高质量完成需求开发
    CountDownLatch闭锁原理解析
    2023南京财经大学计算机考研信息汇总
    go语言中如何实现同步操作呢
    无锡地铁4号线一期工程天河停车场项目中智能照明监控系统的应用
    2、计算机图形学——视图变换
    Linux C应用编程-2-Makefile编写
  • 原文地址:https://blog.csdn.net/qq_61604164/article/details/127868559