hbase(main):003:0> create_namespace 'events_db'
hbase(main):004:0> create 'events_db:users','profile','region','registration'
hbase(main):005:0> create 'events_db:user_friend','uf'
hbase(main):006:0> create 'events_db:events','schedule','location','creator','remark'
hbase(main):007:0> create 'events_db:event_attendee','euat'
hbase(main):008:0> create 'events_db:train','eu'
hbase(main):011:0> list_namespace_tables 'events_db'
TABLE
event_attendee
events
train
user_friend
users
5 row(s)
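Before wiring up the consumers, it can be worth confirming from the Java side that the tables are reachable. A minimal sketch, assuming the same kb129 ZooKeeper quorum used throughout this post and the usual HBase client imports (exception handling omitted):

    Configuration conf = HBaseConfiguration.create();
    conf.set(HConstants.ZOOKEEPER_QUORUM, "kb129");
    conf.set(HConstants.CLIENT_PORT_STR, "2181");
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
        // should print true for each of the five tables created above
        System.out.println(admin.tableExists(TableName.valueOf("events_db:user_friend")));
    }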
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

/**
 * Consumes the Kafka topic "userfriends" into the HBase table events_db:user_friend.
 */
public class UserFriendToHB {
    static int num = 0; // running count of rows written

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kb129:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "user_friend_group1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("userfriends"));

        // Configure the HBase client and open a connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://kb129:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "kb129");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");

        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            Table ufTable = connection.getTable(TableName.valueOf("events_db:user_friend"));
            ArrayList<Put> datas = new ArrayList<>();

            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                // clear the batch before assembling the Puts for this poll
                datas.clear();
                for (ConsumerRecord<String, String> record : poll) {
                    //System.out.println(record.value());
                    String[] split = record.value().split(",");
                    // row key: hash of userid + friendid, so re-consuming overwrites the same row
                    int i = (split[0] + split[1]).hashCode();
                    Put put = new Put(Bytes.toBytes(i));
                    put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("userid"), split[0].getBytes());
                    put.addColumn("uf".getBytes(), "friend".getBytes(), split[1].getBytes());
                    datas.add(put);
                }

                num = num + datas.size();
                System.out.println("---------num:" + num);
                if (datas.size() > 0) {
                    ufTable.put(datas);
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
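Note that enable.auto.commit is set to false and the loop never commits offsets, so with auto.offset.reset=earliest a restarted consumer re-reads the topic from the beginning; the deterministic row key simply overwrites the same rows. If resuming from the last processed position is preferred, one option is to commit synchronously after a successful write (a sketch reusing the consumer, ufTable and datas variables above):

    if (datas.size() > 0) {
        ufTable.put(datas);      // write the batch first
        consumer.commitSync();   // then commit the offsets of the records just polled
    }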
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

/**
 * Same pipeline as UserFriendToHB (topic "userfriends" into events_db:user_friend),
 * but writes through a BufferedMutator instead of Table.put.
 */
public class UserFriendToHB2 {
    static int num = 0; // running count of rows written

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kb129:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "user_friend_group1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("userfriends"));

        // Configure the HBase client and open a connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://kb129:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "kb129");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");

        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            BufferedMutatorParams bufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf("events_db:user_friend"));
            bufferedMutatorParams.setWriteBufferPeriodicFlushTimeoutMs(10000); // flush at most every 10 s even if the buffer is not full
            bufferedMutatorParams.writeBufferSize(10 * 1024 * 1024);           // flush once 10 MB of mutations are buffered
            BufferedMutator bufferedMutator = connection.getBufferedMutator(bufferedMutatorParams);

            ArrayList<Put> datas = new ArrayList<>();

            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                datas.clear(); // clear the batch before assembling the Puts for this poll
                for (ConsumerRecord<String, String> record : poll) {
                    //System.out.println(record.value());
                    String[] split = record.value().split(",");
                    int i = (split[0] + split[1]).hashCode();
                    Put put = new Put(Bytes.toBytes(i));
                    put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("userid"), split[0].getBytes());
                    put.addColumn("uf".getBytes(), "friend".getBytes(), split[1].getBytes());
                    datas.add(put);
                }

                num = num + datas.size();
                System.out.println("---------num:" + num);
                if (datas.size() > 0) {
                    bufferedMutator.mutate(datas);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
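With a BufferedMutator the Puts sit in a client-side buffer until the 10 MB size or the 10-second periodic-flush threshold is reached, so stopping the process can drop whatever is still buffered. A try-with-resources variant (a sketch reusing the conf and bufferedMutatorParams objects above) guarantees that close(), which flushes the remaining buffer, runs even if the loop exits with an exception:

    try (Connection conn = ConnectionFactory.createConnection(conf);
         BufferedMutator mutator = conn.getBufferedMutator(bufferedMutatorParams)) {
        // ... poll, build the Put batch, and call mutator.mutate(datas) exactly as above ...
    }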
(1) IWorker interface
public interface IWorker {
    void fillData(String targetName);
}
(2) Worker implementation class
import nj.zb.kb23.kafkatohbase.oop.writer.IWriter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Worker implements IWorker {
    private KafkaConsumer<String, String> consumer = null;
    private IWriter writer = null;

    public Worker(String topicName, String consumerGroupId, IWriter writer) {
        this.writer = writer;
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kb129:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton(topicName));
    }

    @Override
    public void fillData(String targetName) {
        int num = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            int returnNum = writer.write(targetName, records);
            num += returnNum;
            System.out.println("---------num:" + num);
        }
    }
}
(3) IWriter interface
import org.apache.kafka.clients.consumer.ConsumerRecords;

/**
 * Assembles the ConsumerRecords polled from Kafka and writes them into the
 * specified table of the target database.
 */
public interface IWriter {
    int write(String targetTableName, ConsumerRecords<String, String> records);
}
(4) Writer implementation class (HBaseWriter)
import nj.zb.kb23.kafkatohbase.oop.handler.IParseRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.io.IOException;
import java.util.List;

public class HBaseWriter implements IWriter {
    private Connection connection = null;
    private BufferedMutator bufferedMutator = null;
    private IParseRecord handler = null;

    /**
     * Initializes the HBaseWriter with the handler that turns records into Puts.
     */
    public HBaseWriter(IParseRecord handler) {
        this.handler = handler;
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://kb129:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "kb129");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");

        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void getBufferedMutator(String targetTableName) {
        BufferedMutatorParams bufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf(targetTableName));
        bufferedMutatorParams.setWriteBufferPeriodicFlushTimeoutMs(10000); // flush at most every 10 s
        bufferedMutatorParams.writeBufferSize(10 * 1024 * 1024);           // flush once 10 MB is buffered

        if (bufferedMutator == null) {
            try {
                bufferedMutator = connection.getBufferedMutator(bufferedMutatorParams);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public int write(String targetTableName, ConsumerRecords<String, String> records) {
        if (records.count() > 0) {
            this.getBufferedMutator(targetTableName);
            List<Put> datas = handler.parse(records);
            try {
                bufferedMutator.mutate(datas);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return datas.size();
        } else {
            return 0;
        }
    }
}
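One thing to keep in mind: getBufferedMutator only creates a mutator the first time it is called and ignores the table name afterwards, so a single HBaseWriter instance should only ever be pointed at one table. If one writer had to serve several tables, a hypothetical variant could cache one mutator per table name and have write() fetch it on every call (sketch only; java.util.Map/HashMap imports assumed):

    private final Map<String, BufferedMutator> mutators = new HashMap<>();

    private BufferedMutator getBufferedMutator(String targetTableName) throws IOException {
        BufferedMutator mutator = mutators.get(targetTableName);
        if (mutator == null) {
            BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(targetTableName));
            params.setWriteBufferPeriodicFlushTimeoutMs(10000);
            params.writeBufferSize(10 * 1024 * 1024);
            mutator = connection.getBufferedMutator(params);
            mutators.put(targetTableName, mutator); // reuse the mutator for later batches to this table
        }
        return mutator;
    }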
(5) IParseRecord interface
import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.List;

/**
 * Assembles the polled records into a list of Put objects.
 */
public interface IParseRecord {
    List<Put> parse(ConsumerRecords<String, String> records);
}
(6) Table-specific handler classes (assembling the Puts)
UsersHandler
import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class UsersHandler implements IParseRecord {
    List<Put> datas = new ArrayList<>();

    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        datas.clear();
        for (ConsumerRecord<String, String> record : records) {
            String[] users = record.value().split(",");
            Put put = new Put(users[0].getBytes());
            put.addColumn("profile".getBytes(), "birthyear".getBytes(), users[2].getBytes());
            put.addColumn("profile".getBytes(), "gender".getBytes(), users[3].getBytes());
            put.addColumn("region".getBytes(), "locale".getBytes(), users[1].getBytes());
            if (users.length > 4) {
                put.addColumn("registration".getBytes(), "joinedAt".getBytes(), users[4].getBytes());
            }
            if (users.length > 5) {
                put.addColumn("region".getBytes(), "location".getBytes(), users[5].getBytes());
            }
            if (users.length > 6) {
                put.addColumn("region".getBytes(), "timezone".getBytes(), users[6].getBytes());
            }
            datas.add(put);
        }
        return datas;
    }
}
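UsersHandler indexes users[1] through users[3] unconditionally, so a malformed line with fewer fields would throw ArrayIndexOutOfBoundsException and kill the consumer loop. A defensive option (a sketch, to be placed at the top of the for loop) is to skip short records:

    if (users.length < 4) {
        // not enough fields for user_id, locale, birthyear and gender; skip this record
        continue;
    }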
TrainHandler
import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class TrainHandler implements IParseRecord {
    List<Put> datas = new ArrayList<>();

    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        datas.clear();
        for (ConsumerRecord<String, String> record : records) {
            String[] trains = record.value().split(",");
            double random = Math.random();
            Put put = new Put((trains[0] + trains[1] + random).getBytes());
            put.addColumn("eu".getBytes(), "user".getBytes(), trains[0].getBytes());
            put.addColumn("eu".getBytes(), "event".getBytes(), trains[1].getBytes());
            put.addColumn("eu".getBytes(), "invited".getBytes(), trains[2].getBytes());
            put.addColumn("eu".getBytes(), "timestamp".getBytes(), trains[3].getBytes());
            put.addColumn("eu".getBytes(), "interested".getBytes(), trains[4].getBytes());
            put.addColumn("eu".getBytes(), "not_interested".getBytes(), trains[5].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
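The Math.random() suffix keeps rows from colliding when the same (user, event) pair occurs more than once, but it also means re-consuming the topic inserts duplicate rows. A deterministic alternative (a hypothetical sketch, not part of the original code) derives the suffix from the Kafka partition and offset of the record, so replays overwrite the same rows:

    Put put = new Put((trains[0] + trains[1] + record.partition() + "_" + record.offset()).getBytes());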
EventsHandler
import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class EventsHandler implements IParseRecord {
    List<Put> datas = new ArrayList<>();

    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        datas.clear();
        for (ConsumerRecord<String, String> record : records) {
            String[] events = record.value().split(",");
            Put put = new Put(events[0].getBytes());
            put.addColumn("creator".getBytes(), "user_id".getBytes(), events[1].getBytes());
            put.addColumn("schedule".getBytes(), "start_time".getBytes(), events[2].getBytes());
            put.addColumn("location".getBytes(), "city".getBytes(), events[3].getBytes());
            put.addColumn("location".getBytes(), "state".getBytes(), events[4].getBytes());
            put.addColumn("location".getBytes(), "zip".getBytes(), events[5].getBytes());
            put.addColumn("location".getBytes(), "country".getBytes(), events[6].getBytes());
            put.addColumn("location".getBytes(), "lat".getBytes(), events[7].getBytes());
            put.addColumn("location".getBytes(), "lng".getBytes(), events[8].getBytes());
            put.addColumn("remark".getBytes(), "common_words".getBytes(), events[9].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
EventAttendHandler
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class EventAttendHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            String[] splits = record.value().split(",");
            Put put = new Put((splits[0] + splits[1] + splits[2]).getBytes());
            put.addColumn(Bytes.toBytes("euat"), Bytes.toBytes("eventid"), splits[0].getBytes());
            put.addColumn("euat".getBytes(), "friendid".getBytes(), splits[1].getBytes());
            put.addColumn("euat".getBytes(), "state".getBytes(), splits[2].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
(7) Main program
import nj.zb.kb23.kafkatohbase.oop.handler.*;
import nj.zb.kb23.kafkatohbase.oop.worker.Worker;
import nj.zb.kb23.kafkatohbase.oop.writer.HBaseWriter;
import nj.zb.kb23.kafkatohbase.oop.writer.IWriter;

/**
 * Consumes the chosen Kafka topic into the corresponding events_db:... HBase table.
 */
public class KfkToHbTest {
    static int num = 0; // counter

    public static void main(String[] args) {
        //IParseRecord handler = new EventAttendHandler();
        //IWriter writer = new HBaseWriter(handler);
        //String topic = "eventattendees";
        //String consumerGroupId = "eventattendees_group1";
        //String targetName = "events_db:event_attendee";
        //Worker worker = new Worker(topic, consumerGroupId, writer);
        //worker.fillData(targetName);

        /*EventsHandler eventsHandler = new EventsHandler();
        IWriter writer = new HBaseWriter(eventsHandler);
        Worker worker = new Worker("events", "events_group1", writer);
        worker.fillData("events_db:events");*/

        /*UsersHandler usersHandler = new UsersHandler();
        IWriter writer = new HBaseWriter(usersHandler);
        Worker worker = new Worker("users_raw", "users_group1", writer);
        worker.fillData("events_db:users");*/

        TrainHandler trainHandler = new TrainHandler();
        IWriter writer = new HBaseWriter(trainHandler);
        Worker worker = new Worker("train", "train_group1", writer);
        worker.fillData("events_db:train");
    }
}
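Since every pipeline differs only in topic, consumer group, handler and target table, the commented-out blocks above could also be collapsed into a small helper (a hypothetical sketch, not part of the original code):

    private static void run(String topic, String groupId, IParseRecord handler, String targetTable) {
        IWriter writer = new HBaseWriter(handler);
        new Worker(topic, groupId, writer).fillData(targetTable);
    }

    // e.g.
    // run("train", "train_group1", new TrainHandler(), "events_db:train");
    // run("users_raw", "users_group1", new UsersHandler(), "events_db:users");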
With the topics landed in HBase, the tables are mapped into Hive as external tables and then copied into ORC-backed managed tables for analysis:

create database if not exists events;
use events;

create external table hb_users(
    userId string,
    birthyear int,
    gender string,
    locale string,
    location string,
    timezone string,
    joinedAt string
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with SERDEPROPERTIES (
'hbase.columns.mapping'=':key,profile:birthyear,profile:gender,region:locale,region:location,region:timezone,registration:joinedAt'
)
tblproperties ('hbase.table.name'='events_db:users');

select * from hb_users limit 3;
select count(1) from hb_users;

-- Copy the mapped data into an ORC managed table so it is stored safely in Hive;
-- once created, the table in HBase can be dropped directly.
create table users stored as orc as select * from hb_users;
select * from users limit 3;
select count(1) from users;
drop table hb_users;

-- 38209 1494
select count(*) from users where birthyear is null;

select round(avg(birthyear), 0) from users;
select `floor`(avg(birthyear)) from users;

-- Fill null birthyear with the average and overwrite the table
with
tb as ( select `floor`(avg(birthyear)) avgAge from users ),
tb2 as ( select userId, nvl(birthyear, tb.avgAge),gender,locale,location,timezone,joinedAt from users,tb)
insert overwrite table users
select * from tb2;

-- 109 rows have a null or empty gender
select count(gender) count from users where gender is null or gender = "";

--------------------------------------------------------
create external table hb_events(
    event_id string,
    user_id string,
    start_time string,
    city string,
    state string,
    zip string,
    country string,
    lat float,
    lng float,
    common_words string
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with SERDEPROPERTIES (
'hbase.columns.mapping'=':key,creator:user_id,schedule:start_time,location:city,location:state,location:zip,location:country,location:lat,location:lng,remark:common_words'
)
tblproperties ('hbase.table.name'='events_db:events');

select * from hb_events limit 10;
create table events stored as orc as select * from hb_events;
select count(*) from hb_events;
select count(*) from events;
drop table hb_events;

select event_id from events group by event_id having count(event_id) >1;
with
tb as (select event_id, row_number() over (partition by event_id) rn from events)
select event_id from tb where rn > 1;

select user_id, count(event_id) num from events group by user_id order by num desc;

-----------------------------------------------------
create external table if not exists hb_user_friend(
    row_key string,
    userid string,
    friendid string
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with SERDEPROPERTIES ('hbase.columns.mapping'=':key,uf:userid,uf:friend')
tblproperties ('hbase.table.name'='events_db:user_friend');

select * from hb_user_friend limit 3;
create table user_friend stored as orc as select * from hb_user_friend;
select count(*) from hb_user_friend;
select count(*) from user_friend;
drop table hb_user_friend;

-----------------------------------------------------------
create external table if not exists hb_event_attendee(
    row_key string,
    eventid string,
    friendid string,
    attendtype string
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with SERDEPROPERTIES ('hbase.columns.mapping'=':key,euat:eventid,euat:friendid,euat:state')
tblproperties ('hbase.table.name'='events_db:event_attendee');

select * from hb_event_attendee limit 3;
select count(*) from hb_event_attendee;
create table event_attendee stored as orc as select * from hb_event_attendee;
select * from event_attendee limit 3;
select count(*) from event_attendee;
drop table hb_event_attendee;

--------------------------------------------------------------
create external table if not exists hb_train(
    row_key string,
    userid string,
    eventid string,
    invited string,
    `timestamp` string,
    interested string
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with SERDEPROPERTIES ('hbase.columns.mapping'=':key,eu:user,eu:event,eu:invited,eu:timestamp,eu:interested')
tblproperties ('hbase.table.name'='events_db:train');

select * from hb_train limit 3;
select count(*) from hb_train;
create table train stored as orc as select * from hb_train;
select * from train limit 3;
select count(*) from train;
drop table hb_train;

-----------------------------------------------
create external table locale(
    locale_id int,
    locale string
)
row format delimited fields terminated by '\t'
location '/events/data/locale';

select * from locale;

create external table time_zone(
    time_zone_id int,
    time_zone string
)
row format delimited fields terminated by ','
location '/events/data/timezone';

select * from time_zone;