• 详解 HBase 的常用 API


    一、环境准备

    • 创建一个 Maven 工程并引入依赖

      <dependency>
          <groupId>org.apache.hbasegroupId>
          <artifactId>hbase-serverartifactId>
          <version>1.3.1version>
      dependency>
      <dependency>
          <groupId>org.apache.hbasegroupId>
          <artifactId>hbase-clientartifactId>
          <version>1.3.1version>
      dependency>
      
    • 创建 API 测试类并获取 HBase 客户端连接

      public class TestAPI {
          private static Connection conn = null;
          private static Admin admin = null;  // DDL 操作客户端
          
          static {
              try { 
                  // 1.获取配置信息对象
                  // 过时API
                  // HBaseConfiguration conf1 = new HBaseConfiguration();
                  // conf1.set("hbase.zookeeper.quorum","hadoop102,hadoop103,hadoop104");
      
                  // 新API
                  Configuration conf = HBaseConfiguration.create();
                  conf.set("hbase.zookeeper.quorum","hadoop102,hadoop103,hadoop104");
      
                  // 2.创建连接对象(新API)
                  conn = ConnectionFactory.createConnection(conf);
      
                  // 3.创建hbase管理员对象
                  // 过时API
                  // HBaseAdmin admin1 = new HBaseAdmin(conf1);
      
                  // 新API
                  admin = conn.getAdmin();
                  
              } catch(Exception e) {
              	e.printStackTrace();    
              }
          }
          
          // 关闭资源
          public static void close() {
              try {
                  if(admin != null) {
                      admin.close();
                  }
                  
                  if(conn != null) {
                      conn.close();
                  }
              } catch(Exception e) {
                  e.printStackTrace();
              }
          }
      }
      

    二、HBase API 实操

    1. 判断表是否存在

    public static boolean isTableExist(String tableName) {
        // 过时API
        // boolean exist = admin1.tableExists(tableName);
        
        // 新API
        boolean exist = admin.tableExists(TableName.valueOf(tableName));
        
        return exist;
    }
    

    2. 创建表

    public static void createTable(String tableName, String... cfs) {
        // 1.判断传入列族信息
        if(cfs.length()<=0) {
            System.out.println("必须设置一个列族信息");
            return;
        }
        
        // 2.判断表是否存在
        if(isTableExist(tableName)) {
            System.out.println(tableName + "表已经存在");
            return;
        }
        
        // 3.创建表描述器
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
        
        // 4.循环添加列族
        for(String cfs : cfs) {
            // 5.创建列族描述器
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cfs);
            // 6.添加单个列族
            hTableDescriptor.addFamily(hColumnDescriptor);
        }
        
        // 7.创建表
        admin.createTable(hTableDescriptor);
        
        System.out.println(tableName + "表创建成功!");
    }
    

    3. 删除表

    public static void deleteTable(String tableName) {
        // 1.判断表是否存在
        if(!isTableExist(tableName)) {
            System.out.println(tableName + "表已经不存在!");
            return;
        }
        
        // 2.使表下线
        admin.disableTable(TableName.valueOf(tableName));
        
        // 3.删除表
        admin.deleteTable(TableName.valueOf(tableName));
        
        System.out.println(tableName + "表删除成功!");
    }
    

    4. 创建命名空间

    public static void createNamespace(String ns) {
        // 1.创建命名空间描述器
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(ns).build();
        
        try {
        	// 2.创建命名空间
        	admin.createNamespace(namespaceDescriptor); 
            
            System.out.println(ns + "命名空间创建成功!");
        } catch(NamespaceExistException e) { // 生产上判断 ns 是否存在
            System.out.println(ns + "命名空间已经存在!");
        } catch(Exception e) {
            e.printStackTrace();
        }
        
    }
    

    5. 插入数据

    public static void putData(String tableName, String rowKey, String cf, String cn, String value) {
        // 1.创建hbase表操作对象
        Table table = conn.getTable(TableName.valueOf(tableName));
        
        // 2.创建 put 对象
        // HBase 底层存储的数据格式都是 byte[],可以通过 hbase.util 包下的 Bytes 类进行数据类型转换
        Put put = new Put(Bytes.toBytes(rowKey));
        
        // 3.给put对象赋值
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
        
        // 4.插入数据
        table.put(put);
        
        // 5.关闭资源
        table.close();
        
    }
    

    6. 获取数据

    6.1 get
    public static void getData(String tableName, String rowKey, String cf, String cn) {
        // 1.创建hbase表操作对象
        Table table = conn.getTable(TableName.valueOf(tableName));
        
        // 2.创建get对象
        Get get = new Get(Bytes.toBytes(rowKey));
        // 2.1.指定获取的列族
        // get.addFamily(Bytes.toBytes(cf));
        // 2.2.指定获取的列族和列
        // get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
        // 2.3.指定获取数据的版本数
        // get.setMaxVersions();  // 相当于 scan 'tb',{RAW=>TRUE,VERSIONS=>10}
        
        // 3.获取一行的数据
        Result result = table.get(get);
        
        // 4.获取一行的所有 cell
        for(Cell cell : result.rawCells()) {
            // 5.打印数据信息
            System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
                              ",qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) 
                               + ",value:" + Bytes.toString(CellUtil.cloneValue(cell))
                              );
        }
        
        // 6.关闭资源
        table.close();
    }
    
    6.2 scan
    public static void scanTable(String tableName) {
        // 1.创建hbase表操作对象
        Table table = conn.getTable(TableName.valueOf(tableName));
        
        // 2.构建scan对象
        Scan scan = new Scan(); // 全表
        // Scan scan = new Scan(Bytes.toBytes("1001"),Bytes.toBytes("1003")); // 限定 rowKey 范围,左闭右开
        
        // 3.扫描表
        ResultScanner resultScanner = table.getScanner(scan);
        
        // 4.解析resultScanner
        for(Result result : resultScanner) { // 按 rowKey 从小到大遍历获取
            // 5.解析result
            for(Cell cell : result.rawCells()) {
                // 6.打印数据信息
            System.out.println("rk:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                ",family:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
                              ",qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) 
                               + ",value:" + Bytes.toString(CellUtil.cloneValue(cell))
                              );
            }
        }
        
        // 7.关闭资源
        table.close();
    }
    

    7. 删除数据

    delete 操作最终还是 put 操作

    public static void deleteData(String tableName, String rowKey, String cf, String cn) {
        // 1.创建hbase表操作对象
        Table table = conn.getTable(TableName.valueOf(tableName));
        
        // 2.构建delete对象
        Delete delete = new Delete(Bytes.toBytes(rowKey)); // 相当于 deleteall 命令
        // 2.1.设置删除列
        // delete.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn)); // 删除指定版本的列,不指定则为最大版本,小于该版本的列信息可以查询出来(慎用)
        // delete.addColumns(Bytes.toBytes(cf),Bytes.toBytes(cn)); // 删除列的所有版本
        
        // 2.2.设置删除的列族
        // delete.addFamily(Bytes.toBytes(cf));
        
        // 3.删除数据
        table.delete(delete);
        
        // 4.关闭资源
        table.close();
    }
    
    • 只指定 rowKey 删除:删除指定 rowKey 的所有列族所有版本数据,标记为 deleteFamily
    • 指定 rowKey + 列族 [+ 版本]:删除指定 rowKey 的指定列族所有版本 (小于等于该版本) 数据,标记为 deleteFamily
    • 指定 rowKey + 列族 + 列 [+ 版本]:
      • addColumns():删除指定 rowKey 的指定列族的指定列的所有版本 (小于等于该版本) 数据,标记为 deleteColumn
      • addColumn():删除指定 rowKey 的指定列族的指定列的最新版本 (该指定版本) 数据,标记为 delete,生产上慎用

    三、与 MapReduce 交互

    1. 环境搭建

    • 查看 MR 操作 HBase 所需的 jar 包:

      cd /opt/module/hbase
      bin/hbase mapredcp
      
    • Hadoop 中导入环境变量

      • 临时生效:

        # 在命令行执行
        export HBASE_HOME=/opt/module/hbase
        export HADOOP_HOME=/opt/module/hadoop-2.7.2
        export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
        
      • 永久生效:

        # 在 /etc/profile.d/my_env.sh 配置
        export HBASE_HOME=/opt/module/hbase
        export HADOOP_HOME=/opt/module/hadoop-2.7.2
        
        # 在 hadoop-env.sh 中配置,在有关 HADOOP_CLASSPATH 的 for 循环之后添加
        export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
        
    • 分发配置到其他节点

    • 启动 Hadoop 集群和 HBase 集群

    2. 案例实操

    2.1 官方案例
    • 统计 stu 表中有多少行数据

      /opt/module/hadoop-2.7.2/bin/yarn jar \
      /opt/module/hbase/lib/hbase-server-1.3.1.jar \ 
      rowcounter student
      
    • 使用 MapReduce 将本地数据导入到 HBase

      # 在本地创建一个 tsv 格式的文件:fruit.tsv
      vim fruit.tsv
      1001  Apple  Red
      1002  Pear  Yellow
      1003  Pineapple Yellow
      
      # 在 HDFS 中创建 input_fruit 文件夹并上传 fruit.tsv 文件
      /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
      /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
      
      # 创建 Hbase 表
      Hbase(main):001:0> create 'fruit','info'
      
      # 执行 MapReduce 到 HBase 的 fruit 表中(若表不存在则报错)
      /opt/module/hadoop-2.7.2/bin/yarn jar \
      /opt/module/hbase/lib/hbase-server-1.3.1.jar \
      importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
      hdfs://hadoop102:9000/input_fruit/fruit.tsv
      
      # 使用 scan 命令查看导入后的结果
      Hbase(main):001:0> scan 'fruit'
      
    2.2 自定义案例
    • 案例 1:实现将 HDFS 中的数据写入到 Hbase 表中

      • 编码:

        // 构建 FruitMapper 用于读取 HDFS 中的文件数据
        public class FruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
            @override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                context.write(key, value);
            }
        }
        
        // 构建 FruitReducer 用于将 HDFS 中的文件数据写入 Hbase
        // TableReducer 默认的输出value类型是 Mutation,其子类有 put/delete 等
        public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {
            @override
            protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                // 1.遍历获取每行数据
                for(Text value : values) {
                    // 2.切割一行数据
                    String[] fields = value.toString().split("\t");
                    // 3.构建put对象
                    Put put = new Put(Bytes.toBytes(fields[0]));
                    // 4.为put对象赋值
                    put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(fields[1]));
                    put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("color"),Bytes.toBytes(fields[2]));
                    // 5.数据写入hbase
                    context.write(NullWritable.get(),put);
                }
            }
        }
        
        // 构建 FruitDriver
        public class FruitDriver implements Tool {
            private Configuration conf = null;
            
            @override
            public int run(String[] args) throws Exception {
                // 1.获取job
                Job job = Job.getInstance(conf);
                // 2.设置驱动类路径
                job.setJarByClass(FruitDriver.class);
                // 3.设置Mapper及输出KV类型
                job.setMapperClass(FruitMapper.class);
                job.setMapOutputKeyClass(LongWritable.class);
                job.setMapOutputValueClass(Text.class);
                
                // 4.设置Reducer(不能使用普通的 Reducer 设置)
                TableMapReduceUtil.initTableReducerJob(args[1], FruitReducer.class, job);
                
                // 5.设置输入路径
                FileInputFormat.setInputPaths(job, new Path(args[0]));
                
                // 6.提交job
                boolean result = job.waitForCompletion(true);
                
                return result ? 0 : 1;
            }
            
            @override
            public void setConf(Configuration conf) {
                this.conf = conf;
            }
            
            @override
            public Configuration getConf() {
                return conf;
            }
            
            public static void main(String[] args) {
                try {
                	Configuration conf = new Configuration();
        			int status = ToolRunner.run(conf, new FruitDriver(), args);
        			System.exit(status);    
                } catch(Exception e) {
                    e.printStackTrace();
                }
            }
        }
        
      • 将代码打包后上传到 hbase 集群服务器并执行

        # 首先创建 hbase fruit 表
        # 执行:
        /opt/module/hadoop-2.7.2/bin/yarn jar \
        /opt/module/hbase/Hbase-0.0.1-SNAPSHOT.jar com.xxx.FruitDriver \ /input_fruit/fruit.tsv fruit
        
    • 案例 2:将 fruit 表中的一部分数据,通过 MR 迁入到 fruit2 表中

      • 编码

        // 构建 Fruit2Mapper 用于读取 Hbase 中的 Fruit 表数据
        // TableMapper 默认的KV输入类型为 ImmutableBytesWritable, Result
        public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> {
            @override
            protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
                // 1.构建PUT对象
                Put put = new Put(key.get()); // key 是 rowKey 值
                // 2.解析Result
                for(Cell cell : value.rawCells()) {
                    // 3.获取列为 name 的 cell
                    if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        // 4.为put对象赋值
                        put.add(cell);
                    }
                }
                
                // 5.写出
                context.write(key, put);
            }
        }
        
        // 构建 Fruit2Reducer 用于将数据写入 Hbase
        // TableReducer 默认的输出value类型是 Mutation,其子类有 put/delete 等
        public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
            @override
            protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        		// 遍历写出
                for(Put put : values) {
                    context.write(NullWritable.get(), put);
                }
            }
        }
        
        // 构建 Fruit2Driver
        public class Fruit2Driver implements Tool {
            private Configuration conf = null;
            
            @override
            public int run(String[] args) throws Exception {
                // 1.获取job
                Job job = Job.getInstance(conf);
                // 2.设置驱动类路径
                job.setJarByClass(Fruit2Driver.class);
                
                // 3.设置Mapper及输出KV类型(不能使用普通的 Mapper 设置)
                TableMapReduceUtil.initTableMapperJob(args[0], new Scan(), Fruit2Mapper.class, ImmutableBytesWritable.class, Put.class, job);
                
                // 4.设置Reducer(不能使用普通的 Reducer 设置)
                TableMapReduceUtil.initTableReducerJob(args[1], Fruit2Reducer.class, job);
                
                // 5.提交job
                boolean result = job.waitForCompletion(true);
                
                return result ? 0 : 1;
            }
            
            @override
            public void setConf(Configuration conf) {
                this.conf = conf;
            }
            
            @override
            public Configuration getConf() {
                return conf;
            }
            
            public static void main(String[] args) {
                try {
                	// Configuration conf = new Configuration();
                    Configuration conf = HbaseConfiguration.create();
        			int status = ToolRunner.run(conf, new Fruit2Driver(), args);
        			System.exit(status);    
                } catch(Exception e) {
                    e.printStackTrace();
                }
            }
        }
        
      • 集群测试:将代码打包后上传到 hbase 集群服务器并执行

        # 首先创建 hbase fruit2 表
        # 执行:
        /opt/module/hadoop-2.7.2/bin/yarn jar \
        /opt/module/hbase/Hbase-0.0.1-SNAPSHOT.jar com.xxx.Fruit2Driver \
        fruit fruit2
        
      • 本地测试:

        • 在 Maven 工程的 resources 目录下创建 hbase-site.xml 文件
        • hbase/conf/hbase-site.xml 的内容拷贝到上面创建的文件中
        • 执行 Fruit2Driver 程序 main 方法

    四、集成 Hive

    1. 与 Hive 对比

    • Hive
      • 数据仓库:Hive 的本质其实就相当于将 HDFS 中已经存储的文件在 Mysql 中做了一个双射关系,以方便使用 HQL 去管理查询
      • 用于数据分析、清洗:Hive 适用于离线的数据分析和清洗,延迟较高
      • 基于 HDFS、MapReduce:Hive 存储的数据依旧在 DataNode 上,编写的 HQL 语句终将是转换为 MapReduce 代码执行
    • HBase
      • 数据库:是一种面向列族存储的非关系型数据库
      • 用于存储结构化和非结构化的数据:适用于单表非关系型数据的存储,不适合做关联查询,类似 JOIN 等操作
      • 基于 HDFS:数据持久化存储的体现形式是 HFile,存放于 DataNode 中,被 ResionServer 以 region 的形式进行管理
      • 延迟较低,接入在线业务使用:面对大量的企业数据, HBase 可以直线单表大量数据的存储,同时提供了高效的数据访问速度

    2. 集成使用

    2.1 环境搭建
    • /etc/profile.d/my_env.sh 中配置 Hive 和 HBase 环境变量

      vim /etc/profile.d/my_env.sh
      
      export HBASE_HOME=/opt/module/hbase
      export HIVE_HOME=/opt/module/hive
      
    • 使用软链接将操作 HBase 的 Jar 包关联到 Hive

      ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jar
      ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbaseserver-1.3.1.jar
      ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
      ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
      ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
      ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
      ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
      ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
      
    • hive-site.xml 中添加 Zookeeper 连接信息

      <property>
          <name>hive.zookeeper.quorumname>
          <value>hadoop102,hadoop103,hadoop104value>
          <description>The  list  of  ZooKeeper  servers  to  talk  to.  This  is 
          only needed for read/write locks.description>
      property>
      <property>
          <name>hive.zookeeper.client.portname>
          <value>2181value>
          <description>The  port  of  ZooKeeper  servers  to  talk  to.  This  is 
          only needed for read/write locks.description>
      property>
      
    • 下载 Hive 对应版本的源码包,使用 Eclipse 工具创建一个普通 Java 工程 import 这个 hbase-handler 文件夹到工程,在工程下创建一个 lib 文件并导入 hive/lib 下所有的 .jar 文件(add build path),使用 export 重新编译 hive-hbase-handler-1.2.2.jar (取消勾选 lib 目录),将新编译的 jar 包替换掉 hive/lib 下的 jar 包

    2.2 案例实操
    • 案例 1:建立 Hive 表,关联 HBase 表,插入数据到 Hive 表的同时能够影响 HBase 表

      # 在 Hive 中创建表同时关联 HBase,然后分别进入 Hive 和 HBase 查看
      CREATE TABLE hive_hbase_emp_table
      (
          empno int,
          ename string,
          job string,
          mgr int,
          hiredate string,
          sal double,
          comm double,
          deptno int
      )
      STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
      WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
      TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
      
      # 在 Hive 中创建临时中间表,用于 load 文件中的数据
      # 提示:不能将数据直接 load 进 Hive 所关联 HBase 的那张表中
      CREATE TABLE emp
      (
          empno int,
          ename string,
          job string,
          mgr int, 
          hiredate string,
          sal double,
          comm double,
          deptno int
      )
      row format delimited fields terminated by '\t';
      
      # 向 Hive 中间表中 load 数据
      load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;
      
      # 通过 insert 命令将中间表中的数据导入到 Hive 关联 Hbase 的那张表中
      insert into table hive_hbase_emp_table select * from emp;
      
      # 查看 Hive 以及关联的 HBase 表中是否已经成功的同步插入了数据
      select * from hive_hbase_emp_table;
      
      scan 'hbase_emp_table'
      
    • 案例 2:针对已经存在的 HBase 表 hbase_emp_table,在 Hive 中创建一个外部表来关联这张表,并使用 HQL 操作

      # 在 Hive 中创建外部表关联 HBase 表(必须创建外部表)
      CREATE EXTERNAL TABLE relevance_hbase_emp
      (
          empno int,
          ename string,
          job string,
          mgr int,
          hiredate string,
          sal double,
          comm double,
          deptno int
      )
      STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
      WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") 
      TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
      
      # 使用 HQL 操作
      select * from relevance_hbase_emp;
      
  • 相关阅读:
    python 获取双色球开奖数据的实现
    物联网毕设 --(STM32f407连接云平台检测数据)
    uniapp 版本检查更新
    【SpringSecurity】九、Base64与JWT
    ubuntu系统vscode中配置C++环境(tasks.json、launch.json)
    shell脚本入门-多命令处理与变量
    深度学习 opencv python 实现中国交通标志识别 计算机竞赛_1
    CUDA安装
    多维时序 | MATLAB实现WOA-CNN-BiGRU-Attention多变量时间序列预测(SE注意力机制)
    linux内核分析:x86,BIOS到bootloader,内核初始化,syscall, 进程与线程
  • 原文地址:https://blog.csdn.net/weixin_44480009/article/details/139754716