Kafka To HBase To Hive


    Contents

    1. Create tables in HBase

    2. Write APIs

    2.1 Plain write to HBase (write row by row)

    2.2 Plain write to HBase (buffered writes)

    2.3 Design-pattern write to HBase (buffered writes)

    3. Map HBase tables into Hive


    1. Create tables in HBase

    hbase(main):003:0> create_namespace 'events_db'                                                 

    hbase(main):004:0> create 'events_db:users','profile','region','registration'

    hbase(main):005:0> create 'events_db:user_friend','uf'

    hbase(main):006:0> create 'events_db:events','schedule','location','creator','remark'

    hbase(main):007:0> create 'events_db:event_attendee','euat'

    hbase(main):008:0> create 'events_db:train','eu'

    hbase(main):011:0> list_namespace_tables 'events_db'

    TABLE                                                                               

    event_attendee                                                                       

    events                                                                              

    train                                                                               

    user_friend                                                                         

    users                                                                               

    5 row(s)
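
    The same check can be done from Java with the HBase Admin API. Below is a minimal sketch (not part of the original post) that assumes the same kb129 ZooKeeper quorum used by the writer programs later in this article; the class name is only illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class ListEventsDbTables {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set(HConstants.ZOOKEEPER_QUORUM, "kb129"); // assumption: same quorum as the writers below
            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Admin admin = connection.getAdmin()) {
                // Print every table in the events_db namespace created above
                for (TableName tableName : admin.listTableNamesByNamespace("events_db")) {
                    System.out.println(tableName.getNameAsString());
                }
            }
        }
    }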

    2. Write APIs

    2.1 Plain write to HBase (write row by row)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.io.IOException;
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Properties;

    /**
     * Consume the Kafka topic "userfriends" into HBase.
     * Target HBase table: events_db:user_friend
     */
    public class UserFriendToHB {
        static int num = 0; // running count of rows written

        public static void main(String[] args) {
            // Kafka consumer configuration
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kb129:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "user_friend_group1");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Collections.singleton("userfriends"));

            // Configure and connect to HBase
            Configuration conf = HBaseConfiguration.create();
            conf.set(HConstants.HBASE_DIR, "hdfs://kb129:9000/hbase");
            conf.set(HConstants.ZOOKEEPER_QUORUM, "kb129");
            conf.set(HConstants.CLIENT_PORT_STR, "2181");
            Connection connection = null;
            try {
                connection = ConnectionFactory.createConnection(conf);
                Table ufTable = connection.getTable(TableName.valueOf("events_db:user_friend"));
                ArrayList<Put> datas = new ArrayList<>();
                while (true) {
                    ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                    // Clear the batch before building it from this poll
                    datas.clear();
                    for (ConsumerRecord<String, String> record : poll) {
                        // Each record value is "userid,friendid"
                        String[] split = record.value().split(",");
                        // Row key: hash of userid + friendid
                        int i = (split[0] + split[1]).hashCode();
                        Put put = new Put(Bytes.toBytes(i));
                        put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("userid"), split[0].getBytes());
                        put.addColumn("uf".getBytes(), "friend".getBytes(), split[1].getBytes());
                        datas.add(put);
                    }
                    num = num + datas.size();
                    System.out.println("---------num:" + num);
                    if (datas.size() > 0) {
                        ufTable.put(datas);
                    }
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
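
    Note that enable.auto.commit is set to false but the loop never commits offsets, so the group never stores a position and, with auto.offset.reset=earliest, a restart re-reads the topic from the beginning. A minimal adjustment, assuming you want offsets tied to the HBase write (at-least-once delivery), would be to commit right after the batch is persisted:

    if (datas.size() > 0) {
        ufTable.put(datas);    // write the batch to HBase first
        consumer.commitSync(); // then commit the consumed offsets (at-least-once)
    }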

    2.2 Plain write to HBase (buffered writes)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.io.IOException;
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Properties;

    /**
     * Consume the Kafka topic "userfriends" into HBase using a BufferedMutator.
     * Target HBase table: events_db:user_friend
     */
    public class UserFriendToHB2 {
        static int num = 0; // running count of rows written

        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kb129:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "user_friend_group1");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Collections.singleton("userfriends"));

            // Configure and connect to HBase
            Configuration conf = HBaseConfiguration.create();
            conf.set(HConstants.HBASE_DIR, "hdfs://kb129:9000/hbase");
            conf.set(HConstants.ZOOKEEPER_QUORUM, "kb129");
            conf.set(HConstants.CLIENT_PORT_STR, "2181");
            Connection connection = null;
            try {
                connection = ConnectionFactory.createConnection(conf);
                BufferedMutatorParams bufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf("events_db:user_friend"));
                bufferedMutatorParams.setWriteBufferPeriodicFlushTimeoutMs(10000); // flush at least every 10 s
                bufferedMutatorParams.writeBufferSize(10 * 1024 * 1024);           // flush when the buffer reaches 10 MB
                BufferedMutator bufferedMutator = connection.getBufferedMutator(bufferedMutatorParams);
                ArrayList<Put> datas = new ArrayList<>();
                while (true) {
                    ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                    datas.clear(); // clear the batch before building it from this poll
                    for (ConsumerRecord<String, String> record : poll) {
                        String[] split = record.value().split(",");
                        int i = (split[0] + split[1]).hashCode();
                        Put put = new Put(Bytes.toBytes(i));
                        put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("userid"), split[0].getBytes());
                        put.addColumn("uf".getBytes(), "friend".getBytes(), split[1].getBytes());
                        datas.add(put);
                    }
                    num = num + datas.size();
                    System.out.println("---------num:" + num);
                    if (datas.size() > 0) {
                        bufferedMutator.mutate(datas);
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
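
    The BufferedMutator only sends puts to HBase when the buffer-size or periodic-flush threshold is reached, so anything still buffered is lost if the process exits without flushing. If the loop is ever given an exit condition (or a shutdown hook), a sketch of the cleanup, using the same variable names as above, would be:

    bufferedMutator.flush(); // push any buffered puts to HBase
    bufferedMutator.close();
    connection.close();
    consumer.close();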

    2.3 Design-pattern write to HBase (buffered writes)

    (1) IWorker interface

    public interface IWorker {
        void fillData(String targetName);
    }

    (2) Worker implementation class

    import nj.zb.kb23.kafkatohbase.oop.writer.IWriter;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class Worker implements IWorker {
        private KafkaConsumer<String, String> consumer = null;
        private IWriter writer = null;

        public Worker(String topicName, String consumerGroupId, IWriter writer) {
            this.writer = writer;
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kb129:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
            consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Collections.singleton(topicName));
        }

        @Override
        public void fillData(String targetName) {
            int num = 0;
            while (true) {
                // Poll Kafka and delegate assembly and writing to the injected IWriter
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                int returnNum = writer.write(targetName, records);
                num += returnNum;
                System.out.println("---------num:" + num);
            }
        }
    }

    (3) IWriter interface

    import org.apache.kafka.clients.consumer.ConsumerRecords;

    /**
     * Assembles the ConsumerRecords consumed from Kafka and writes them to the
     * given table in the target store; returns the number of rows written.
     */
    public interface IWriter {
        int write(String targetTableName, ConsumerRecords<String, String> records);
    }

    (4) HBaseWriter implementation class

    import nj.zb.kb23.kafkatohbase.oop.handler.IParseRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import java.io.IOException;
    import java.util.List;

    public class HBaseWriter implements IWriter {
        private Connection connection = null;
        private BufferedMutator bufferedMutator = null;
        private IParseRecord handler = null;

        /**
         * Initialize the HBaseWriter: open the HBase connection and keep the
         * handler that turns ConsumerRecords into Puts.
         */
        public HBaseWriter(IParseRecord handler) {
            this.handler = handler;
            Configuration conf = HBaseConfiguration.create();
            conf.set(HConstants.HBASE_DIR, "hdfs://kb129:9000/hbase");
            conf.set(HConstants.ZOOKEEPER_QUORUM, "kb129");
            conf.set(HConstants.CLIENT_PORT_STR, "2181");
            try {
                connection = ConnectionFactory.createConnection(conf);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        private void getBufferedMutator(String targetTableName) {
            BufferedMutatorParams bufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf(targetTableName));
            bufferedMutatorParams.setWriteBufferPeriodicFlushTimeoutMs(10000); // flush at least every 10 s
            bufferedMutatorParams.writeBufferSize(10 * 1024 * 1024);           // flush when the buffer reaches 10 MB
            // Created lazily, once; the mutator stays bound to the first table name it was created for
            if (bufferedMutator == null) {
                try {
                    bufferedMutator = connection.getBufferedMutator(bufferedMutatorParams);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        @Override
        public int write(String targetTableName, ConsumerRecords<String, String> records) {
            if (records.count() > 0) {
                this.getBufferedMutator(targetTableName);
                List<Put> datas = handler.parse(records);
                try {
                    bufferedMutator.mutate(datas);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                return datas.size();
            } else {
                return 0;
            }
        }
    }

    (5) IParseRecord interface

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import java.util.List;

    /**
     * Assembles each consumed record into a Put.
     */
    public interface IParseRecord {
        List<Put> parse(ConsumerRecords<String, String> records);
    }

    (6) Table-specific handler classes (assembling the Puts)

    UsersHandler

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import java.util.ArrayList;
    import java.util.List;

    public class UsersHandler implements IParseRecord {
        List<Put> datas = new ArrayList<>();

        @Override
        public List<Put> parse(ConsumerRecords<String, String> records) {
            datas.clear();
            for (ConsumerRecord<String, String> record : records) {
                // Fields: user_id, locale, birthyear, gender, then optionally joinedAt, location, timezone
                String[] users = record.value().split(",");
                Put put = new Put(users[0].getBytes()); // row key: user_id
                put.addColumn("profile".getBytes(), "birthyear".getBytes(), users[2].getBytes());
                put.addColumn("profile".getBytes(), "gender".getBytes(), users[3].getBytes());
                put.addColumn("region".getBytes(), "locale".getBytes(), users[1].getBytes());
                if (users.length > 4) {
                    put.addColumn("registration".getBytes(), "joinedAt".getBytes(), users[4].getBytes());
                }
                if (users.length > 5) {
                    put.addColumn("region".getBytes(), "location".getBytes(), users[5].getBytes());
                }
                if (users.length > 6) {
                    put.addColumn("region".getBytes(), "timezone".getBytes(), users[6].getBytes());
                }
                datas.add(put);
            }
            return datas;
        }
    }

    TrainHandler

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import java.util.ArrayList;
    import java.util.List;

    public class TrainHandler implements IParseRecord {
        List<Put> datas = new ArrayList<>();

        @Override
        public List<Put> parse(ConsumerRecords<String, String> records) {
            datas.clear();
            for (ConsumerRecord<String, String> record : records) {
                String[] trains = record.value().split(",");
                // Append a random suffix so duplicate (user, event) pairs do not collide on the row key
                double random = Math.random();
                Put put = new Put((trains[0] + trains[1] + random).getBytes());
                put.addColumn("eu".getBytes(), "user".getBytes(), trains[0].getBytes());
                put.addColumn("eu".getBytes(), "event".getBytes(), trains[1].getBytes());
                put.addColumn("eu".getBytes(), "invited".getBytes(), trains[2].getBytes());
                put.addColumn("eu".getBytes(), "timestamp".getBytes(), trains[3].getBytes());
                put.addColumn("eu".getBytes(), "interested".getBytes(), trains[4].getBytes());
                put.addColumn("eu".getBytes(), "not_interested".getBytes(), trains[5].getBytes());
                datas.add(put);
            }
            return datas;
        }
    }

    EventsHandler

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import java.util.ArrayList;
    import java.util.List;

    public class EventsHandler implements IParseRecord {
        List<Put> datas = new ArrayList<>();

        @Override
        public List<Put> parse(ConsumerRecords<String, String> records) {
            datas.clear();
            for (ConsumerRecord<String, String> record : records) {
                String[] events = record.value().split(",");
                Put put = new Put(events[0].getBytes()); // row key: event_id
                put.addColumn("creator".getBytes(), "user_id".getBytes(), events[1].getBytes());
                put.addColumn("schedule".getBytes(), "start_time".getBytes(), events[2].getBytes());
                put.addColumn("location".getBytes(), "city".getBytes(), events[3].getBytes());
                put.addColumn("location".getBytes(), "state".getBytes(), events[4].getBytes());
                put.addColumn("location".getBytes(), "zip".getBytes(), events[5].getBytes());
                put.addColumn("location".getBytes(), "country".getBytes(), events[6].getBytes());
                put.addColumn("location".getBytes(), "lat".getBytes(), events[7].getBytes());
                put.addColumn("location".getBytes(), "lng".getBytes(), events[8].getBytes());
                put.addColumn("remark".getBytes(), "common_words".getBytes(), events[9].getBytes());
                datas.add(put);
            }
            return datas;
        }
    }

    EventAttendHandler

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import java.util.ArrayList;
    import java.util.List;

    public class EventAttendHandler implements IParseRecord {
        @Override
        public List<Put> parse(ConsumerRecords<String, String> records) {
            List<Put> datas = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records) {
                String[] splits = record.value().split(",");
                // Row key: eventid + friendid + state
                Put put = new Put((splits[0] + splits[1] + splits[2]).getBytes());
                put.addColumn(Bytes.toBytes("euat"), Bytes.toBytes("eventid"), splits[0].getBytes());
                put.addColumn("euat".getBytes(), "friendid".getBytes(), splits[1].getBytes());
                put.addColumn("euat".getBytes(), "state".getBytes(), splits[2].getBytes());
                datas.add(put);
            }
            return datas;
        }
    }
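
    The user_friend feed is still handled by the standalone classes from sections 2.1 and 2.2. To fold it into this design as well, a hypothetical UserFriendHandler (not part of the original code) could follow the same pattern, reusing the row-key and column layout from those classes:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import java.util.ArrayList;
    import java.util.List;

    public class UserFriendHandler implements IParseRecord {
        @Override
        public List<Put> parse(ConsumerRecords<String, String> records) {
            List<Put> datas = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records) {
                // Record value: "userid,friendid", as in sections 2.1 and 2.2
                String[] split = record.value().split(",");
                // Row key: hash of userid + friendid, same as UserFriendToHB
                Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
                put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("userid"), split[0].getBytes());
                put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("friend"), split[1].getBytes());
                datas.add(put);
            }
            return datas;
        }
    }

    It would then be wired up in the main program below like the other handlers, for example new Worker("userfriends", "user_friend_group1", new HBaseWriter(new UserFriendHandler())).fillData("events_db:user_friend").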

    (7) Main program

    import nj.zb.kb23.kafkatohbase.oop.handler.*;
    import nj.zb.kb23.kafkatohbase.oop.worker.Worker;
    import nj.zb.kb23.kafkatohbase.oop.writer.HBaseWriter;
    import nj.zb.kb23.kafkatohbase.oop.writer.IWriter;

    /**
     * Consume the Kafka topic ... into HBase.
     * Target HBase table: events_db:...
     * Swap the handler / topic / target table to load a different dataset.
     */
    public class KfkToHbTest {
        static int num = 0; // counter

        public static void main(String[] args) {
            //IParseRecord handler = new EventAttendHandler();
            //IWriter writer = new HBaseWriter(handler);
            //String topic = "eventattendees";
            //String consumerGroupId = "eventattendees_group1";
            //String targetName = "events_db:event_attendee";
            //Worker worker = new Worker(topic, consumerGroupId, writer);
            //worker.fillData(targetName);

            /*EventsHandler eventsHandler = new EventsHandler();
            IWriter writer = new HBaseWriter(eventsHandler);
            Worker worker = new Worker("events", "events_group1", writer);
            worker.fillData("events_db:eventsb");*/

            /*UsersHandler usersHandler = new UsersHandler();
            IWriter writer = new HBaseWriter(usersHandler);
            Worker worker = new Worker("users_raw", "users_group1", writer);
            worker.fillData("events_db:users");*/

            TrainHandler trainHandler = new TrainHandler();
            IWriter writer = new HBaseWriter(trainHandler);
            Worker worker = new Worker("train", "train_group1", writer);
            worker.fillData("events_db:train2");
        }
    }

    3. Map HBase tables into Hive

    create database if not exists events;
    use events;

    -- Map the HBase users table into Hive as an external table
    create external table hb_users(
        userId string,
        birthyear int,
        gender string,
        locale string,
        location string,
        timezone string,
        joinedAt string
    ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties (
        'hbase.columns.mapping'=':key,profile:birthyear,profile:gender,region:locale,region:location,region:timezone,registration:joinedAt'
    )
    tblproperties ('hbase.table.name'='events_db:users');
    select * from hb_users limit 3;
    select count(1) from hb_users;

    -- Copy the mapped external table into a managed ORC table so the data is stored safely;
    -- once it is created, the table in HBase can be dropped directly
    create table users stored as orc as select * from hb_users;
    select * from users limit 3;
    select count(1) from users;
    drop table hb_users;

    -- 38209 1494
    select count(*) from users where birthyear is null;
    select round(avg(birthyear), 0) from users;
    select `floor`(avg(birthyear)) from users;

    -- Fill the null birthyear values with the average and overwrite the table
    with
    tb as ( select `floor`(avg(birthyear)) avgAge from users ),
    tb2 as ( select userId, nvl(birthyear, tb.avgAge), gender, locale, location, timezone, joinedAt from users, tb)
    insert overwrite table users
    select * from tb2;

    -- 109 rows have an empty-string gender
    select count(gender) count from users where gender is null or gender = "";

    --------------------------------------------------------
    create external table hb_events(
        event_id string,
        user_id string,
        start_time string,
        city string,
        state string,
        zip string,
        country string,
        lat float,
        lng float,
        common_words string
    ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties (
        'hbase.columns.mapping'=':key,creator:user_id,schedule:start_time,location:city,location:state,location:zip,location:country,location:lat,location:lng,remark:common_words'
    )
    tblproperties ('hbase.table.name'='events_db:events');
    select * from hb_events limit 10;
    create table events stored as orc as select * from hb_events;
    select count(*) from hb_events;
    select count(*) from events;
    drop table hb_events;
    select event_id from events group by event_id having count(event_id) > 1;
    with
    tb as (select event_id, row_number() over (partition by event_id) rn from events)
    select event_id from tb where rn > 1;
    select user_id, count(event_id) num from events group by user_id order by num desc;

    -----------------------------------------------------
    create external table if not exists hb_user_friend(
        row_key string,
        userid string,
        friendid string
    ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties ('hbase.columns.mapping'=':key,uf:userid,uf:friend')
    tblproperties ('hbase.table.name'='events_db:user_friend');
    select * from hb_user_friend limit 3;
    create table user_friend stored as orc as select * from hb_user_friend;
    select count(*) from hb_user_friend;
    select count(*) from user_friend;
    drop table hb_user_friend;

    -----------------------------------------------------------
    create external table if not exists hb_event_attendee(
        row_key string,
        eventid string,
        friendid string,
        attendtype string
    ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties ('hbase.columns.mapping'=':key,euat:eventid,euat:friendid,euat:state')
    tblproperties ('hbase.table.name'='events_db:event_attendee');
    select * from hb_event_attendee limit 3;
    select count(*) from hb_event_attendee;
    create table event_attendee stored as orc as select * from hb_event_attendee;
    select * from event_attendee limit 3;
    select count(*) from event_attendee;
    drop table hb_event_attendee;

    --------------------------------------------------------------
    create external table if not exists hb_train(
        row_key string,
        userid string,
        eventid string,
        invited string,
        `timestamp` string,
        interested string
    ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties ('hbase.columns.mapping'=':key,eu:user,eu:event,eu:invited,eu:timestamp,eu:interested')
    tblproperties ('hbase.table.name'='events_db:train');
    select * from hb_train limit 3;
    select count(*) from hb_train;
    create table train stored as orc as select * from hb_train;
    select * from train limit 3;
    select count(*) from train;
    drop table hb_train;

    -----------------------------------------------
    -- Plain external tables over delimited files on HDFS
    create external table locale(
        locale_id int,
        locale string
    )
    row format delimited fields terminated by '\t'
    location '/events/data/locale';
    select * from locale;

    create external table time_zone(
        time_zone_id int,
        time_zone string
    )
    row format delimited fields terminated by ','
    location '/events/data/timezone';
    select * from time_zone;
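
    The row-count checks above can also be run from Java over JDBC if HiveServer2 is available. This is a rough sketch (not from the original post) that assumes HiveServer2 listens on kb129:10000 with no authentication and that the hive-jdbc driver is on the classpath; adjust the URL and credentials for your cluster.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class HiveCountCheck {
        public static void main(String[] args) throws Exception {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            // Assumption: HiveServer2 at kb129:10000, database "events", no auth
            try (Connection conn = DriverManager.getConnection("jdbc:hive2://kb129:10000/events", "", "");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("select count(*) from users")) {
                if (rs.next()) {
                    System.out.println("users rows: " + rs.getLong(1));
                }
            }
        }
    }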

Original post: https://blog.csdn.net/weixin_63713552/article/details/134016370