根据提示,在右侧编辑器补充代码,对数据按照一定规则进行清洗。
数据说明如下: a.txt
数据切分方式:,
数据所在位置:/user/test/input/a.txt
15733218050,15778423030,1542457633,1542457678,450000,530000
呼叫者手机号 | 接受者手机号 | 开始时间戳(s) | 接受时间戳(s) | 呼叫者地址省份编码 | 接受者地址省份编码 |
---|---|---|---|---|---|
15733218050 | 15778423030 | 1542457633 | 1542457678 | 450000 | 530000 |
Mysql
数据库:
用户名:root
密码:123123
数据库名:mydb
用户表:userphone
列名 | 类型 | 非空 | 是否自增 | 介绍 |
---|---|---|---|---|
id | int(11) | √ | √ | 用户ID |
phone | varchar(255) | | | 手机号 |
trueName | varchar(255) | | | 真实姓名 |
地址省份表:allregion
列名 | 类型 | 非空 | 是否自增 | 介绍 |
---|---|---|---|---|
id | int(11) | √ | √ | 用户ID |
CodeNum | varchar(255) | | | 编号 |
Address | varchar(255) | | | 地址 |
清洗规则:
处理数据中的时间戳(秒级),将其转化为 2017-06-21 07:01:58(年-月-日 时:分:秒)这种格式;
处理数据中的省份编码,与 MySQL 中的表数据对应,将其转换成省份名称;
处理用户手机号,与 MySQL 中的表数据对应,关联用户的真实姓名;
处理数据中的开始时间与结束时间并计算通信时长(以秒为单位);
设置数据来源文件路径及清洗后的数据存储路径:数据来源路径为 /user/test/input/a.txt(HDFS);清洗后的数据存放于 /user/test/output(HDFS)。
数据清洗后如下:
邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市
用户名A | 用户名B | 用户A的手机号 | 用户B的手机号 | 开始时间 | 结束时间 | 通话时长(s) | 用户A地址省份 | 用户B地址省份 |
---|---|---|---|---|---|---|---|---|
邓二 | 张倩 | 13666666666 | 15151889601 | 2018-03-29 10:58:12 | 2018-03-29 10:58:42 | 30 | 黑龙江省 | 上海市 |
step/com/LogMR.java
- package com;
- import java.io.IOException;
- import java.sql.Connection;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import java.sql.Statement;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- public class LogMR {
- /********** begin **********/
- static class MyMapper extends Mapper<LongWritable, Text, PhoneLog, NullWritable> {
- Map<String, String> userMap = new HashMap<>();
- Map<String, String> addressMap = new HashMap<>();
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- PhoneLog pl = new PhoneLog();
- Text text = new Text();
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- Connection connection = DBHelper.getConnection();
- try {
- Statement statement = connection.createStatement();
- String sql = "select * from userphone";
- ResultSet resultSet = statement.executeQuery(sql);
- while (resultSet.next()) {
- String phone = resultSet.getString(2);
- String trueName = resultSet.getString(3);
- userMap.put(phone, trueName);
- }
- String sql2 = "select * from allregion";
- ResultSet resultSetA = statement.executeQuery(sql2);
- while (resultSetA.next()) {
- String phone = resultSetA.getString(2);
- String trueName = resultSetA.getString(3);
- addressMap.put(phone, trueName);
- }
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- String str = value.toString();
- String[] split = str.split(",");
- if (split.length == 6) {
- String trueName1 = userMap.get(split[0]);
- String trueName2 = userMap.get(split[1]);
- String address1 = addressMap.get(split[4]);
- String address2 = addressMap.get(split[5]);
- long startTimestamp = Long.parseLong(split[2]);
- String startTime = sdf.format(startTimestamp * 1000);
- long endTimestamp = Long.parseLong(split[3]);
- String endTime = sdf.format(endTimestamp * 1000);
- long timeLen = endTimestamp - startTimestamp;
- pl.SetPhoneLog(trueName1, trueName2, split[0], split[1], startTime, endTime, timeLen, address1,
- address2);
- context.write(pl, NullWritable.get());
- }
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
- job.setJarByClass(LogMR.class);
- job.setMapperClass(MyMapper.class);
- job.setMapOutputKeyClass(PhoneLog.class);
- job.setMapOutputValueClass(NullWritable.class);
- job.setNumReduceTasks(0);
- Path inPath = new Path("/user/test/input/a.txt");
- Path out = new Path("/user/test/output");
- FileInputFormat.setInputPaths(job, inPath);
- FileOutputFormat.setOutputPath(job, out);
- job.waitForCompletion(true);
- }
- /********** end **********/
- }
step/com/DBHelper.java
- package com;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.SQLException;
/**
 * Hands out a single shared JDBC connection to the {@code mydb} MySQL database.
 *
 * <p>The connection is created on first use and cached in a static field. Callers
 * share it and should not assume exclusive ownership.
 */
public class DBHelper {
    /********** begin **********/
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
    private static final String username = "root"; // database user name
    private static final String password = "123123"; // database password; set at install time, differs per machine
    private static Connection conn = null; // cached shared connection

    static {
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException ex) {
            // Not fatal on its own: JDBC 4 drivers on the classpath register themselves
            // with DriverManager, so getConnection() may still succeed. If it does not,
            // getConnection() reports the failure with its cause.
            System.err.println("DBHelper: JDBC driver class " + driver + " not found: " + ex);
        }
    }

    /**
     * Returns the shared connection, opening a new one when none exists yet or the
     * cached one has been closed.
     *
     * <p>Synchronized so that concurrent first calls do not each open a connection.
     *
     * @return an open connection, never {@code null}
     * @throws IllegalStateException if the connection cannot be established; the
     *         underlying {@link SQLException} is attached as the cause
     */
    public static synchronized Connection getConnection() {
        try {
            if (conn == null || conn.isClosed()) {
                conn = DriverManager.getConnection(url, username, password);
            }
            return conn;
        } catch (SQLException e) {
            // Drop any half-usable cached connection so the next call retries cleanly.
            conn = null;
            throw new IllegalStateException("Unable to connect to MySQL at " + url, e);
        }
    }
    /********** end **********/
}
step/com/PhoneLog.java
- package com;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.io.WritableComparable;
- public class PhoneLog implements WritableComparable<PhoneLog> {
- private String userA;
- private String userB;
- private String userA_Phone;
- private String userB_Phone;
- private String startTime;
- private String endTime;
- private Long timeLen;
- private String userA_Address;
- private String userB_Address;
- public PhoneLog() {
- }
- public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone, String startTime,
- String endTime, Long timeLen, String userA_Address, String userB_Address) {
- this.userA = userA;
- this.userB = userB;
- this.userA_Phone = userA_Phone;
- this.userB_Phone = userB_Phone;
- this.startTime = startTime;
- this.endTime = endTime;
- this.timeLen = timeLen;
- this.userA_Address = userA_Address;
- this.userB_Address = userB_Address;
- }
- public String getUserA_Phone() {
- return userA_Phone;
- }
- public void setUserA_Phone(String userA_Phone) {
- this.userA_Phone = userA_Phone;
- }
- public String getUserB_Phone() {
- return userB_Phone;
- }
- public void setUserB_Phone(String userB_Phone) {
- this.userB_Phone = userB_Phone;
- }
- public String getUserA() {
- return userA;
- }
- public void setUserA(String userA) {
- this.userA = userA;
- }
- public String getUserB() {
- return userB;
- }
- public void setUserB(String userB) {
- this.userB = userB;
- }
- public String getStartTime() {
- return startTime;
- }
- public void setStartTime(String startTime) {
- this.startTime = startTime;
- }
- public String getEndTime() {
- return endTime;
- }
- public void setEndTime(String endTime) {
- this.endTime = endTime;
- }
- public Long getTimeLen() {
- return timeLen;
- }
- public void setTimeLen(Long timeLen) {
- this.timeLen = timeLen;
- }
- public String getUserA_Address() {
- return userA_Address;
- }
- public void setUserA_Address(String userA_Address) {
- this.userA_Address = userA_Address;
- }
- public String getUserB_Address() {
- return userB_Address;
- }
- public void setUserB_Address(String userB_Address) {
- this.userB_Address = userB_Address;
- }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(userA);
- out.writeUTF(userB);
- out.writeUTF(userA_Phone);
- out.writeUTF(userB_Phone);
- out.writeUTF(startTime);
- out.writeUTF(endTime);
- out.writeLong(timeLen);
- out.writeUTF(userA_Address);
- out.writeUTF(userB_Address);
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- userA = in.readUTF();
- userB = in.readUTF();
- userA_Phone = in.readUTF();
- userB_Phone = in.readUTF();
- startTime = in.readUTF();
- endTime = in.readUTF();
- timeLen = in.readLong();
- userA_Address = in.readUTF();
- userB_Address = in.readUTF();
- }
- @Override
- public String toString() {
- return userA + "," + userB + "," + userA_Phone + "," + userB_Phone + "," + startTime + "," + endTime + ","
- + timeLen + "," + userA_Address + "," + userB_Address;
- }
- @Override
- public int compareTo(PhoneLog pl) {
- if(this.hashCode() == pl.hashCode()) {
- return 0;
- }
- return -1;
- }
- }
最后重启 Hadoop(执行 start-all.sh),完成评测。