创建一个 Maven 工程并引入依赖
<dependency>
<groupId>org.apache.hbasegroupId>
<artifactId>hbase-serverartifactId>
<version>1.3.1version>
dependency>
<dependency>
<groupId>org.apache.hbasegroupId>
<artifactId>hbase-clientartifactId>
<version>1.3.1version>
dependency>
创建 API 测试类并获取 HBase 客户端连接
public class TestAPI {
private static Connection conn = null;
private static Admin admin = null; // DDL 操作客户端
static {
try {
// 1.获取配置信息对象
// 过时API
// HBaseConfiguration conf1 = new HBaseConfiguration();
// conf1.set("hbase.zookeeper.quorum","hadoop102,hadoop103,hadoop104");
// 新API
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","hadoop102,hadoop103,hadoop104");
// 2.创建连接对象(新API)
conn = ConnectionFactory.createConnection(conf);
// 3.创建hbase管理员对象
// 过时API
// HBaseAdmin admin1 = new HBaseAdmin(conf1);
// 新API
admin = conn.getAdmin();
} catch(Exception e) {
e.printStackTrace();
}
}
// 关闭资源
public static void close() {
try {
if(admin != null) {
admin.close();
}
if(conn != null) {
conn.close();
}
} catch(Exception e) {
e.printStackTrace();
}
}
}
public static boolean isTableExist(String tableName) {
// 过时API
// boolean exist = admin1.tableExists(tableName);
// 新API
boolean exist = admin.tableExists(TableName.valueOf(tableName));
return exist;
}
public static void createTable(String tableName, String... cfs) {
// 1.判断传入列族信息
if(cfs.length()<=0) {
System.out.println("必须设置一个列族信息");
return;
}
// 2.判断表是否存在
if(isTableExist(tableName)) {
System.out.println(tableName + "表已经存在");
return;
}
// 3.创建表描述器
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
// 4.循环添加列族
for(String cfs : cfs) {
// 5.创建列族描述器
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cfs);
// 6.添加单个列族
hTableDescriptor.addFamily(hColumnDescriptor);
}
// 7.创建表
admin.createTable(hTableDescriptor);
System.out.println(tableName + "表创建成功!");
}
public static void deleteTable(String tableName) {
// 1.判断表是否存在
if(!isTableExist(tableName)) {
System.out.println(tableName + "表已经不存在!");
return;
}
// 2.使表下线
admin.disableTable(TableName.valueOf(tableName));
// 3.删除表
admin.deleteTable(TableName.valueOf(tableName));
System.out.println(tableName + "表删除成功!");
}
public static void createNamespace(String ns) {
// 1.创建命名空间描述器
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(ns).build();
try {
// 2.创建命名空间
admin.createNamespace(namespaceDescriptor);
System.out.println(ns + "命名空间创建成功!");
} catch(NamespaceExistException e) { // 生产上判断 ns 是否存在
System.out.println(ns + "命名空间已经存在!");
} catch(Exception e) {
e.printStackTrace();
}
}
public static void putData(String tableName, String rowKey, String cf, String cn, String value) {
// 1.创建hbase表操作对象
Table table = conn.getTable(TableName.valueOf(tableName));
// 2.创建 put 对象
// HBase 底层存储的数据格式都是 byte[],可以通过 hbase.util 包下的 Bytes 类进行数据类型转换
Put put = new Put(Bytes.toBytes(rowKey));
// 3.给put对象赋值
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
// 4.插入数据
table.put(put);
// 5.关闭资源
table.close();
}
public static void getData(String tableName, String rowKey, String cf, String cn) {
// 1.创建hbase表操作对象
Table table = conn.getTable(TableName.valueOf(tableName));
// 2.创建get对象
Get get = new Get(Bytes.toBytes(rowKey));
// 2.1.指定获取的列族
// get.addFamily(Bytes.toBytes(cf));
// 2.2.指定获取的列族和列
// get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
// 2.3.指定获取数据的版本数
// get.setMaxVersions(); // 相当于 scan 'tb',{RAW=>TRUE,VERSIONS=>10}
// 3.获取一行的数据
Result result = table.get(get);
// 4.获取一行的所有 cell
for(Cell cell : result.rawCells()) {
// 5.打印数据信息
System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
",qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell))
+ ",value:" + Bytes.toString(CellUtil.cloneValue(cell))
);
}
// 6.关闭资源
table.close();
}
public static void scanTable(String tableName) {
// 1.创建hbase表操作对象
Table table = conn.getTable(TableName.valueOf(tableName));
// 2.构建scan对象
Scan scan = new Scan(); // 全表
// Scan scan = new Scan(Bytes.toBytes("1001"),Bytes.toBytes("1003")); // 限定 rowKey 范围,左闭右开
// 3.扫描表
ResultScanner resultScanner = table.getScanner(scan);
// 4.解析resultScanner
for(Result result : resultScanner) { // 按 rowKey 从小到大遍历获取
// 5.解析result
for(Cell cell : result.rawCells()) {
// 6.打印数据信息
System.out.println("rk:" + Bytes.toString(CellUtil.cloneRow(cell)) +
",family:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
",qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell))
+ ",value:" + Bytes.toString(CellUtil.cloneValue(cell))
);
}
}
// 7.关闭资源
table.close();
}
delete 操作最终还是 put 操作
public static void deleteData(String tableName, String rowKey, String cf, String cn) {
// 1.创建hbase表操作对象
Table table = conn.getTable(TableName.valueOf(tableName));
// 2.构建delete对象
Delete delete = new Delete(Bytes.toBytes(rowKey)); // 相当于 deleteall 命令
// 2.1.设置删除列
// delete.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn)); // 删除指定版本的列,不指定则为最大版本,小于该版本的列信息可以查询出来(慎用)
// delete.addColumns(Bytes.toBytes(cf),Bytes.toBytes(cn)); // 删除列的所有版本
// 2.2.设置删除的列族
// delete.addFamily(Bytes.toBytes(cf));
// 3.删除数据
table.delete(delete);
// 4.关闭资源
table.close();
}
查看 MR 操作 HBase 所需的 jar 包:
cd /opt/module/hbase
bin/hbase mapredcp
在 Hadoop 中导入环境变量
临时生效:
# 在命令行执行
export HBASE_HOME=/opt/module/hbase
export HADOOP_HOME=/opt/module/hadoop-2.7.2
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
永久生效:
# 在 /etc/profile.d/my_env.sh 配置
export HBASE_HOME=/opt/module/hbase
export HADOOP_HOME=/opt/module/hadoop-2.7.2
# 在 hadoop-env.sh 中配置,在有关 HADOOP_CLASSPATH 的 for 循环之后添加
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
分发配置到其他节点
启动 Hadoop 集群和 HBase 集群
统计 stu 表中有多少行数据
/opt/module/hadoop-2.7.2/bin/yarn jar \
/opt/module/hbase/lib/hbase-server-1.3.1.jar \
rowcounter student
使用 MapReduce 将本地数据导入到 HBase
# 在本地创建一个 tsv 格式的文件:fruit.tsv
vim fruit.tsv
1001 Apple Red
1002 Pear Yellow
1003 Pineapple Yellow
# 在 HDFS 中创建 input_fruit 文件夹并上传 fruit.tsv 文件
/opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
/opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
# 创建 Hbase 表
Hbase(main):001:0> create 'fruit','info'
# 执行 MapReduce 到 HBase 的 fruit 表中(若表不存在则报错)
/opt/module/hadoop-2.7.2/bin/yarn jar \
/opt/module/hbase/lib/hbase-server-1.3.1.jar \
importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop102:9000/input_fruit/fruit.tsv
# 使用 scan 命令查看导入后的结果
Hbase(main):001:0> scan 'fruit'
案例 1:实现将 HDFS 中的数据写入到 Hbase 表中
编码:
// 构建 FruitMapper 用于读取 HDFS 中的文件数据
public class FruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
@override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
// 构建 FruitReducer 用于将 HDFS 中的文件数据写入 Hbase
// TableReducer 默认的输出value类型是 Mutation,其子类有 put/delete 等
public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {
@override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 1.遍历获取每行数据
for(Text value : values) {
// 2.切割一行数据
String[] fields = value.toString().split("\t");
// 3.构建put对象
Put put = new Put(Bytes.toBytes(fields[0]));
// 4.为put对象赋值
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(fields[1]));
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("color"),Bytes.toBytes(fields[2]));
// 5.数据写入hbase
context.write(NullWritable.get(),put);
}
}
}
// 构建 FruitDriver
public class FruitDriver implements Tool {
private Configuration conf = null;
@override
public int run(String[] args) throws Exception {
// 1.获取job
Job job = Job.getInstance(conf);
// 2.设置驱动类路径
job.setJarByClass(FruitDriver.class);
// 3.设置Mapper及输出KV类型
job.setMapperClass(FruitMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
// 4.设置Reducer(不能使用普通的 Reducer 设置)
TableMapReduceUtil.initTableReducerJob(args[1], FruitReducer.class, job);
// 5.设置输入路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 6.提交job
boolean result = job.waitForCompletion(true);
return result ? 0 : 1;
}
@override
public void setConf(Configuration conf) {
this.conf = conf;
}
@override
public Configuration getConf() {
return conf;
}
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
int status = ToolRunner.run(conf, new FruitDriver(), args);
System.exit(status);
} catch(Exception e) {
e.printStackTrace();
}
}
}
将代码打包后上传到 hbase 集群服务器并执行
# 首先创建 hbase fruit 表
# 执行:
/opt/module/hadoop-2.7.2/bin/yarn jar \
/opt/module/hbase/Hbase-0.0.1-SNAPSHOT.jar com.xxx.FruitDriver \ /input_fruit/fruit.tsv fruit
案例 2:将 fruit 表中的一部分数据,通过 MR 迁入到 fruit2 表中
编码
// 构建 Fruit2Mapper 用于读取 Hbase 中的 Fruit 表数据
// TableMapper 默认的KV输入类型为 ImmutableBytesWritable, Result
public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> {
@override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
// 1.构建PUT对象
Put put = new Put(key.get()); // key 是 rowKey 值
// 2.解析Result
for(Cell cell : value.rawCells()) {
// 3.获取列为 name 的 cell
if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
// 4.为put对象赋值
put.add(cell);
}
}
// 5.写出
context.write(key, put);
}
}
// 构建 Fruit2Reducer 用于将数据写入 Hbase
// TableReducer 默认的输出value类型是 Mutation,其子类有 put/delete 等
public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
// 遍历写出
for(Put put : values) {
context.write(NullWritable.get(), put);
}
}
}
// 构建 Fruit2Driver
public class Fruit2Driver implements Tool {
private Configuration conf = null;
@override
public int run(String[] args) throws Exception {
// 1.获取job
Job job = Job.getInstance(conf);
// 2.设置驱动类路径
job.setJarByClass(Fruit2Driver.class);
// 3.设置Mapper及输出KV类型(不能使用普通的 Mapper 设置)
TableMapReduceUtil.initTableMapperJob(args[0], new Scan(), Fruit2Mapper.class, ImmutableBytesWritable.class, Put.class, job);
// 4.设置Reducer(不能使用普通的 Reducer 设置)
TableMapReduceUtil.initTableReducerJob(args[1], Fruit2Reducer.class, job);
// 5.提交job
boolean result = job.waitForCompletion(true);
return result ? 0 : 1;
}
@override
public void setConf(Configuration conf) {
this.conf = conf;
}
@override
public Configuration getConf() {
return conf;
}
public static void main(String[] args) {
try {
// Configuration conf = new Configuration();
Configuration conf = HbaseConfiguration.create();
int status = ToolRunner.run(conf, new Fruit2Driver(), args);
System.exit(status);
} catch(Exception e) {
e.printStackTrace();
}
}
}
集群测试:将代码打包后上传到 hbase 集群服务器并执行
# 首先创建 hbase fruit2 表
# 执行:
/opt/module/hadoop-2.7.2/bin/yarn jar \
/opt/module/hbase/Hbase-0.0.1-SNAPSHOT.jar com.xxx.Fruit2Driver \
fruit fruit2
本地测试:
hbase-site.xml 文件hbase/conf/hbase-site.xml 的内容拷贝到上面创建的文件中在 /etc/profile.d/my_env.sh 中配置 Hive 和 HBase 环境变量
vim /etc/profile.d/my_env.sh
export HBASE_HOME=/opt/module/hbase
export HIVE_HOME=/opt/module/hive
使用软链接将操作 HBase 的 Jar 包关联到 Hive
ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbaseserver-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
在 hive-site.xml 中添加 Zookeeper 连接信息
<property>
<name>hive.zookeeper.quorumname>
<value>hadoop102,hadoop103,hadoop104value>
<description>The list of ZooKeeper servers to talk to. This is
only needed for read/write locks.description>
property>
<property>
<name>hive.zookeeper.client.portname>
<value>2181value>
<description>The port of ZooKeeper servers to talk to. This is
only needed for read/write locks.description>
property>
下载 Hive 对应版本的源码包,使用 Eclipse 工具创建一个普通 Java 工程 import 这个 hbase-handler 文件夹到工程,在工程下创建一个 lib 文件并导入 hive/lib 下所有的 .jar 文件(add build path),使用 export 重新编译 hive-hbase-handler-1.2.2.jar (取消勾选 lib 目录),将新编译的 jar 包替换掉 hive/lib 下的 jar 包
案例 1:建立 Hive 表,关联 HBase 表,插入数据到 Hive 表的同时能够影响 HBase 表
# 在 Hive 中创建表同时关联 HBase,然后分别进入 Hive 和 HBase 查看
CREATE TABLE hive_hbase_emp_table
(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
# 在 Hive 中创建临时中间表,用于 load 文件中的数据
# 提示:不能将数据直接 load 进 Hive 所关联 HBase 的那张表中
CREATE TABLE emp
(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)
row format delimited fields terminated by '\t';
# 向 Hive 中间表中 load 数据
load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;
# 通过 insert 命令将中间表中的数据导入到 Hive 关联 Hbase 的那张表中
insert into table hive_hbase_emp_table select * from emp;
# 查看 Hive 以及关联的 HBase 表中是否已经成功的同步插入了数据
select * from hive_hbase_emp_table;
scan 'hbase_emp_table'
案例 2:针对已经存在的 HBase 表 hbase_emp_table,在 Hive 中创建一个外部表来关联这张表,并使用 HQL 操作
# 在 Hive 中创建外部表关联 HBase 表(必须创建外部表)
CREATE EXTERNAL TABLE relevance_hbase_emp
(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
# 使用 HQL 操作
select * from relevance_hbase_emp;