• A Comprehensive MapReduce Application: Cleaning Recruitment Data


    Following the hints, complete the code in the editor on the right to clean the data according to the rules below.

    The data is described below (data.json).

    Data location: /root/data/data.json

    {
        "id":4,
        "company_name":"智联招聘网/Zhaopin.com",
        "eduLevel_name":"本科",
        "emplType":"全职",
        "jobName":"大数据工程师010",
        "salary":"20K-30K",
        "createDate":"2019-04-21T12:14:27.000+08:00",
        "endDate":"2019-05-21T12:14:27.000+08:00",
        "city_code":"530",
        "companySize":"1000-9999人",
        "welfare":"",
        "responsibility":"岗位职责:1、负责体系大数据分析的ETL的代码开发及优化;2、...",
        "place":"北京市朝阳区望京阜荣街10号首开广场5层",
        "workingExp":"1-3年"
    }
    Field            Meaning
    id               record ID
    company_name     company name
    eduLevel_name    education requirement
    emplType         employment type
    jobName          job title
    salary           salary
    createDate       post date
    endDate          closing date
    city_code        city code
    companySize      company size
    welfare          benefits
    responsibility   job responsibilities
    place            location
    workingExp       working experience

    MySQL database:

    Username: root; password: 123123

    Database name: mydb

    City code table: province

    Column name   Type           Not null   Auto-increment   Description
    city_code     varchar(255)                               city code
    city_name     varchar(255)                               city name

    HBase database:

    Final result table: job; column family: info

    Cleaning rules:

    • If any attribute of a record is empty, drop the record;

    • Clean the salary field (a minimal sketch follows this list):

      1) mK-nK: (m+n)/2; 2) anything else: 0

    • Convert city codes to city names using the MySQL province table;

    • Store the results in the HBase table job;

    • Set the source file path and the cleaned-data destination: the source path is /root/data/data.json; the cleaned data is stored in the HBase table job.
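    As a standalone illustration of the salary rule (a minimal sketch with a hypothetical helper name, separate from the graded classes below), "20K-30K" becomes 25.0 while non-numeric values fall through to 0:

    public class SalaryRuleSketch {
        // Salary rule: "mK-nK" -> (m+n)/2, anything else -> "0"
        public static String cleanSalary(String salary) {
            if (salary != null && salary.contains("K-")) {
                double m = Double.parseDouble(salary.substring(0, salary.indexOf("K")));
                double n = Double.parseDouble(salary.substring(salary.indexOf("-") + 1, salary.lastIndexOf("K")));
                return (m + n) / 2 + "";
            }
            return "0";
        }

        public static void main(String[] args) {
            System.out.println(cleanSalary("20K-30K")); // prints 25.0
            System.out.println(cleanSalary("面议"));     // prints 0
        }
    }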

    (1) DBHelper class:

    package com;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    public class DBHelper {
        /********** begin **********/
        private static final String driver = "com.mysql.jdbc.Driver";
        private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
        private static final String username = "root";
        private static final String password = "123123";
        private static Connection conn = null;

        // Load the JDBC driver once, when the class is first used
        static {
            try {
                Class.forName(driver);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }

        // Lazily open a single shared connection and reuse it
        public static Connection getConnection() {
            if (conn == null) {
                try {
                    conn = DriverManager.getConnection(url, username, password);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            return conn;
        }

        // Quick manual check that the connection can be established
        public static void main(String[] args) {
            Connection connection = DBHelper.getConnection();
        }
        /********** end **********/
    }

    (2) JsonMap class:

    package com;

    import com.alibaba.fastjson.JSONObject;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.HashMap;
    import java.util.Map;

    public class JsonMap extends Mapper<LongWritable, Text, NullWritable, Put> {
        /********** begin **********/
        Map<String, String> pro = new HashMap<String, String>();
        Put put;

        // Load the city_code -> city_name mapping from MySQL once per map task
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Connection connection = DBHelper.getConnection();
            try {
                Statement statement = connection.createStatement();
                String sql = "select * from province";
                ResultSet resultSet = statement.executeQuery(sql);
                while (resultSet.next()) {
                    String city_code = resultSet.getString(1);
                    String city_name = resultSet.getString(2);
                    pro.put(city_code, city_name);
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Parse one JSON record per input line
            JSONObject jsonObject = JSONObject.parseObject(line);
            String[] data = new String[14];
            data[0] = jsonObject.getString("id");
            data[1] = jsonObject.getString("company_name");
            data[2] = jsonObject.getString("eduLevel_name");
            data[3] = jsonObject.getString("emplType");
            data[4] = jsonObject.getString("jobName");
            // Salary rule: "mK-nK" -> (m+n)/2, anything else -> 0
            String salary = jsonObject.getString("salary");
            if (salary.contains("K-")) {
                Double a = Double.valueOf(salary.substring(0, salary.indexOf("K")));
                Double b = Double.valueOf(salary.substring(salary.indexOf("-") + 1, salary.lastIndexOf("K")));
                data[5] = (a + b) / 2 + "";
            } else {
                data[5] = "0";
            }
            data[6] = jsonObject.getString("createDate");
            data[7] = jsonObject.getString("endDate");
            // Convert the city code to a city name via the province mapping
            String code = jsonObject.getString("city_code");
            data[8] = pro.get(code);
            data[9] = jsonObject.getString("companySize");
            data[10] = jsonObject.getString("welfare");
            data[11] = jsonObject.getString("responsibility");
            data[12] = jsonObject.getString("place");
            data[13] = jsonObject.getString("workingExp");
            // Drop the record if any field is null or empty
            for (String i : data) {
                if (i == null || i.equals("")) {
                    return;
                }
            }
            String columnFamily = "info";
            put = new Put(data[0].getBytes());
            put.addColumn(columnFamily.getBytes(), "company_name".getBytes(), data[1].getBytes());
            put.addColumn(columnFamily.getBytes(), "eduLevel_name".getBytes(), data[2].getBytes());
            put.addColumn(columnFamily.getBytes(), "emplType".getBytes(), data[3].getBytes());
            put.addColumn(columnFamily.getBytes(), "jobName".getBytes(), data[4].getBytes());
            put.addColumn(columnFamily.getBytes(), "salary".getBytes(), data[5].getBytes());
            put.addColumn(columnFamily.getBytes(), "createDate".getBytes(), data[6].getBytes());
            put.addColumn(columnFamily.getBytes(), "endDate".getBytes(), data[7].getBytes());
            put.addColumn(columnFamily.getBytes(), "city_name".getBytes(), data[8].getBytes());
            put.addColumn(columnFamily.getBytes(), "companySize".getBytes(), data[9].getBytes());
            put.addColumn(columnFamily.getBytes(), "welfare".getBytes(), data[10].getBytes());
            put.addColumn(columnFamily.getBytes(), "responsibility".getBytes(), data[11].getBytes());
            put.addColumn(columnFamily.getBytes(), "place".getBytes(), data[12].getBytes());
            put.addColumn(columnFamily.getBytes(), "workingExp".getBytes(), data[13].getBytes());
            context.write(NullWritable.get(), put);
        }
        /********** end **********/
    }
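    The listing leaves the JDBC connection opened in setup() unclosed. An optional cleanup() override inside JsonMap, sketched below under the assumption that DBHelper hands back its shared connection, would release it when the map task finishes:

    // Optional addition to JsonMap: release the MySQL connection when the task ends
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
            java.sql.Connection conn = DBHelper.getConnection();
            if (conn != null) {
                conn.close(); // frees the connection held since setup()
            }
        } catch (java.sql.SQLException e) {
            e.printStackTrace();
        }
    }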

    (3) JsonTest class (the driver):

    package com;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class JsonTest {
        public static void main(String[] args) throws Exception {
            Configuration config = HBaseConfiguration.create();
            // Point the HBase client at ZooKeeper
            config.set("hbase.zookeeper.quorum", "127.0.0.1");
            Connection connection = ConnectionFactory.createConnection(config);
            Admin admin = connection.getAdmin();
            TableName tableName = TableName.valueOf("job");
            // Drop any existing "job" table so each run starts from a clean state
            if (admin.tableExists(tableName)) {
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }
            TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
            ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info")).build(); // build the column family descriptor
            tableDescriptor.setColumnFamily(family); // attach the "info" column family
            admin.createTable(tableDescriptor.build()); // create the table
            /********** begin **********/
            Job job = Job.getInstance(config);
            job.setJarByClass(JsonTest.class);
            job.setMapperClass(JsonMap.class);
            job.setMapOutputKeyClass(NullWritable.class);
            // Map-only job, so set the number of reducers to 0
            job.setNumReduceTasks(0);
            // Input path is hard-coded rather than taken from args
            FileInputFormat.setInputPaths(job, new Path("/root/data/data.json"));
            // Route the job's output to the HBase "job" table (no reducer class)
            TableMapReduceUtil.initTableReducerJob("job", null, job);
            TableMapReduceUtil.addDependencyJars(job);
            job.waitForCompletion(true);
            /********** end **********/
        }
    }

    Start HBase: # start-hbase.sh
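    To spot-check the output after the job completes, a small read-back client can fetch the sample record (row key "4") from the job table. This is a hypothetical verification helper, not part of the exercise:

    package com;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class JobTableCheck {
        public static void main(String[] args) throws Exception {
            Configuration config = HBaseConfiguration.create();
            config.set("hbase.zookeeper.quorum", "127.0.0.1");
            try (Connection connection = ConnectionFactory.createConnection(config);
                 Table table = connection.getTable(TableName.valueOf("job"))) {
                Get get = new Get(Bytes.toBytes("4")); // row key = id of the sample record
                Result result = table.get(get);
                byte[] city = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city_name"));
                System.out.println("city_name = " + (city == null ? "(missing)" : Bytes.toString(city)));
            }
        }
    }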

  • Original post: https://blog.csdn.net/qq_61604164/article/details/127870540