Following the hints, complete the code in the editor on the right so that the data is cleaned according to the rules below.

Data description: data.json, located at /root/data/data.json. A sample record:
{ "id":4, "company_name":"智联招聘网/Zhaopin.com", "eduLevel_name":"本科", "emplType":"全职", "jobName":"大数据工程师010", "salary":"20K-30K", "createDate":"2019-04-21T12:14:27.000+08:00", "endDate":"2019-05-21T12:14:27.000+08:00", "city_code":"530", "companySize":"1000-9999人", "welfare":"", "responsibility":"岗位职责:1、负责体系大数据分析的ETL的代码开发及优化;2、...", "place":"北京市朝阳区望京阜荣街10号首开广场5层", "workingExp":"1-3年" }
id | company_name | eduLevel_name | emplType | jobName | salary | createDate | endDate | city_code | companySize | welfare | responsibility | place | workingExp |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
record ID | company name | education requirement | employment type | job title | salary | publish date | end date | city code | company size | benefits | job responsibilities | work location | working experience |
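
Each line of data.json is one self-contained JSON object, so a record can be inspected with fastjson before any MapReduce wiring. The sketch below is only illustrative (the class name `ParseDemo` and the abbreviated record are made up); it uses the same `JSONObject.getString` calls as the mapper later in this write-up.

```java
import com.alibaba.fastjson.JSONObject;

public class ParseDemo {
    public static void main(String[] args) {
        // One line of data.json, abbreviated to a few fields for readability
        String line = "{\"id\":4,\"company_name\":\"智联招聘网/Zhaopin.com\",\"salary\":\"20K-30K\",\"city_code\":\"530\"}";
        JSONObject jsonObject = JSONObject.parseObject(line);
        // Every field is read back as a string, mirroring the mapper below
        System.out.println(jsonObject.getString("id"));        // 4
        System.out.println(jsonObject.getString("salary"));    // 20K-30K
        System.out.println(jsonObject.getString("city_code")); // 530
    }
}
```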
MySQL database:
- username: root
- password: 123123
- database name: mydb
- city-code table: province
Column | Type | Not null | Auto-increment | Description |
---|---|---|---|---|
city_code | varchar(255) | | | city code |
city_name | varchar(255) | | | city name |
HBase:
- final result table: job
- column family: info
Cleaning rules:
- If any field of a record is empty, drop that record.
- Clean the salary field: 1) "mK-nK" becomes (m+n)/2; 2) anything else becomes 0 (see the stand-alone sketch after this list).
- Convert the city code into a city name using the MySQL table province.
- Store the results in the HBase table job.
- Data source and destination: the input file is /root/data/data.json; the cleaned data is written to the HBase table job.
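
A minimal stand-alone sketch of the salary rule, assuming a made-up helper name `cleanSalary` (the same logic appears inside the mapper below):

```java
public class SalaryRule {
    // Hypothetical helper, for illustration only: "mK-nK" -> (m+n)/2, anything else -> "0"
    static String cleanSalary(String salary) {
        if (salary != null && salary.contains("K-")) {
            double m = Double.parseDouble(salary.substring(0, salary.indexOf("K")));
            double n = Double.parseDouble(salary.substring(salary.indexOf("-") + 1, salary.lastIndexOf("K")));
            return (m + n) / 2 + "";
        }
        return "0";
    }

    public static void main(String[] args) {
        System.out.println(cleanSalary("20K-30K")); // 25.0, matching the sample record
        System.out.println(cleanSalary("面议"));     // 0
    }
}
```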
(1) DBHelper class code:
```java
package com;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBHelper {
    /********** begin **********/
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
    private static final String username = "root";
    private static final String password = "123123";
    private static Connection conn = null;

    static {
        try {
            // Load the MySQL JDBC driver once when the class is loaded
            Class.forName(driver);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // Lazily create and cache a single shared connection
    public static Connection getConnection() {
        if (conn == null) {
            try {
                conn = DriverManager.getConnection(url, username, password);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        return conn;
    }

    // Simple smoke test for the connection
    public static void main(String[] args) {
        Connection connection = DBHelper.getConnection();
    }
    /********** end **********/
}
```
(2) JsonMap class code:
```java
package com;

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

public class JsonMap extends Mapper<LongWritable, Text, NullWritable, Put> {
    /********** begin **********/
    // city_code -> city_name mapping loaded from MySQL
    Map<String, String> pro = new HashMap<String, String>();
    Put put;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the province table once per mapper so city codes can be translated to names
        Connection connection = DBHelper.getConnection();
        try {
            Statement statement = connection.createStatement();
            String sql = "select * from province";
            ResultSet resultSetA = statement.executeQuery(sql);
            while (resultSetA.next()) {
                String city_code = resultSetA.getString(1);
                String city_name = resultSetA.getString(2);
                pro.put(city_code, city_name);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Parse one JSON record per input line
        JSONObject jsonObject = JSONObject.parseObject(line);
        String[] data = new String[14];
        data[0] = jsonObject.getString("id");
        data[1] = jsonObject.getString("company_name");
        data[2] = jsonObject.getString("eduLevel_name");
        data[3] = jsonObject.getString("emplType");
        data[4] = jsonObject.getString("jobName");
        // Salary rule: "mK-nK" becomes (m+n)/2, anything else becomes 0
        String salary = jsonObject.getString("salary");
        if (salary != null && salary.contains("K-")) {
            Double a = Double.valueOf(salary.substring(0, salary.indexOf("K")));
            Double b = Double.valueOf(salary.substring(salary.indexOf("-") + 1, salary.lastIndexOf("K")));
            data[5] = (a + b) / 2 + "";
        } else {
            data[5] = "0";
        }
        data[6] = jsonObject.getString("createDate");
        data[7] = jsonObject.getString("endDate");
        // Translate the city code into a city name via the province table
        String code = jsonObject.getString("city_code");
        data[8] = pro.get(code);
        data[9] = jsonObject.getString("companySize");
        data[10] = jsonObject.getString("welfare");
        data[11] = jsonObject.getString("responsibility");
        data[12] = jsonObject.getString("place");
        data[13] = jsonObject.getString("workingExp");
        // Drop the record if any field is null or empty
        for (String i : data) {
            if (i == null || i.equals("")) {
                return;
            }
        }
        // Row key is the record id; all other fields go into the "info" column family
        String columnFamily = "info";
        put = new Put(data[0].getBytes());
        put.addColumn(columnFamily.getBytes(), "company_name".getBytes(), data[1].getBytes());
        put.addColumn(columnFamily.getBytes(), "eduLevel_name".getBytes(), data[2].getBytes());
        put.addColumn(columnFamily.getBytes(), "emplType".getBytes(), data[3].getBytes());
        put.addColumn(columnFamily.getBytes(), "jobName".getBytes(), data[4].getBytes());
        put.addColumn(columnFamily.getBytes(), "salary".getBytes(), data[5].getBytes());
        put.addColumn(columnFamily.getBytes(), "createDate".getBytes(), data[6].getBytes());
        put.addColumn(columnFamily.getBytes(), "endDate".getBytes(), data[7].getBytes());
        put.addColumn(columnFamily.getBytes(), "city_name".getBytes(), data[8].getBytes());
        put.addColumn(columnFamily.getBytes(), "companySize".getBytes(), data[9].getBytes());
        put.addColumn(columnFamily.getBytes(), "welfare".getBytes(), data[10].getBytes());
        put.addColumn(columnFamily.getBytes(), "responsibility".getBytes(), data[11].getBytes());
        put.addColumn(columnFamily.getBytes(), "place".getBytes(), data[12].getBytes());
        put.addColumn(columnFamily.getBytes(), "workingExp".getBytes(), data[13].getBytes());
        context.write(NullWritable.get(), put);
    }
    /********** end **********/
}
```
(3) JsonTest class code (the driver class):
```java
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class JsonTest {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        // Point the HBase client at the ZooKeeper quorum
        config.set("hbase.zookeeper.quorum", "127.0.0.1");
        Connection connection = ConnectionFactory.createConnection(config);
        Admin admin = connection.getAdmin();
        TableName tableName = TableName.valueOf("job");
        // Recreate the target table so every run starts from an empty "job" table
        if (admin.tableExists(tableName)) {
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
        TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
        // Build the "info" column family and attach it to the table descriptor
        ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info")).build();
        tableDescriptor.setColumnFamily(family);
        admin.createTable(tableDescriptor.build());

        /********** begin **********/
        Job job = Job.getInstance(config);
        job.setJarByClass(JsonTest.class);
        job.setMapperClass(JsonMap.class);
        job.setMapOutputKeyClass(NullWritable.class);
        // Map-only job: the mapper emits Puts directly, so no reduce tasks are needed
        job.setNumReduceTasks(0);
        // Input path is hard-coded rather than taken from command-line arguments
        FileInputFormat.setInputPaths(job, new Path("/root/data/data.json"));
        // Route the job's output to the HBase table "job" via TableOutputFormat
        TableMapReduceUtil.initTableReducerJob("job", null, job);
        TableMapReduceUtil.addDependencyJars(job);
        job.waitForCompletion(true);
        /********** end **********/
    }
}
```
Start HBase before running the job: `start-hbase.sh`
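
To spot-check the output once the job finishes, a row can be read back with the plain HBase client API. The sketch below is only an illustrative assumption (the class name `JobTableCheck` and the row key `"4"` are made up, not part of the assignment):

```java
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class JobTableCheck {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "127.0.0.1");
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf("job"))) {
            // Row keys are the record ids written by JsonMap; "4" is just an example id
            Result result = table.get(new Get(Bytes.toBytes("4")));
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneQualifier(cell)) + " = "
                        + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
}
```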