最近项目中用到了Hbase相关的操作并封装成工具类,我的Hbase服务器端版本是2.1.0,图示如下:

特此记录便于日后查阅。
一、pom.xml 引入依赖
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-shaded-client</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.0.0</version>
</dependency>
二、application.yml 项目配置
此处我是自定义HBase配置,后面会有专门的配置类来加载这个配置
datasource:
  hbase:
    zookeeper:
      port: 2181
      quorum: 10.0.61.12,10.0.61.22,10.0.61.24
      znode:
        parent: ''
三、HbaseConfig 自定义配置类
- import lombok.Data;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.Connection;
- import org.apache.hadoop.hbase.client.ConnectionFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
-
- /**
- * @description: Hbase配置类
- * @author: zhangzhixiang
- * @createDate: 2022/11/24
- * @version: 1.0
- */
- @Data
- @Component
- @Configuration
- public class HbaseConfig {
-
- @Value("${datasource.hbase.zookeeper.quorum}")
- private String zookeeper;
-
- @Value("${datasource.hbase.zookeeper.znode.parent}")
- private String parent;
-
- @Value("${datasource.hbase.zookeeper.port}")
- private String port;
-
- public Connection getConnection() throws IOException {
- org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
- config.set("hbase.zookeeper.quorum", zookeeper);
- config.set("hbase.zookeeper.property.clientPort", port);
- if (parent != null && !"".equals(parent)) {
- config.set("zookeeper.znode.parent", parent);
- }
- Connection connection = ConnectionFactory.createConnection(config);
- return connection;
- }
- }
四、HbaseUtil 工具类
首先添加 SpringContext 工具类,下面会用到:
- import org.springframework.beans.BeansException;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.core.env.Environment;
- import org.springframework.stereotype.Component;
-
- /**
- * @Description:
- * @Author:zhangzhixiang
- * @Date: 2022/7/25
- * @Version: 1.0
- */
- @Component
- public class SpringContext implements ApplicationContextAware {
- public static ApplicationContext context;
- public static Environment env;
-
- @Override
- public void setApplicationContext(ApplicationContext context) throws BeansException {
- SpringContext.context = context;
- SpringContext.env = context.getEnvironment();
- }
-
- public static Object getBean(String name) {
- return context.getBean(name);
- }
-
- public static
T getBean(Class clazz) { - return context.getBean(clazz);
- }
-
- public static ApplicationContext getContext() {
- return context;
- }
-
- public static Environment getEnv() {
- return env;
- }
-
- public static String getProperty(String key) {
- return getProperty(key, "");
- }
-
- public static String getProperty(String key, String defaultValue) {
- return env.getProperty(key, defaultValue);
- }
-
- public static
T getProperty(String key, Class targetType) { - return env.getProperty(key, targetType);
- }
-
- public static String getActiveProfile() {
- return env.getActiveProfiles()[0];
- }
- }
然后我们来写 HbaseUtil 工具类的代码:
- import com.swkj.common.base.context.SpringContext;
- import com.swkj.common.base.log.GLog;
- import com.swkj.common.base.log.LogFactory;
- import com.swkj.common.hbase.config.HbaseConfig;
- import org.apache.hadoop.hbase.Cell;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.*;
- import org.apache.hadoop.hbase.filter.CompareFilter;
- import org.apache.hadoop.hbase.filter.RowFilter;
- import org.apache.hadoop.hbase.filter.SubstringComparator;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.springframework.context.annotation.DependsOn;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.NavigableMap;
-
- /**
- * @description: Hbase工具类
- * @author: zhangzhixiang
- * @createDate: 2022/11/24
- * @version: 1.0
- */
- @DependsOn("springContext")
- @Component
- public class HbaseUtil {
-
- private static final GLog LOG = LogFactory.getLogger(HbaseUtil.class);
- private static HbaseConfig hbaseConfig = (HbaseConfig) SpringContext.getBean("hbaseConfig");
- private static Connection connection = null;
- private static Admin admin = null;
-
- private HbaseUtil() {
- if (connection == null) {
- try {
- connection = hbaseConfig.getConnection();
- admin = connection.getAdmin();
- } catch (IOException e) {
- LOG.error("HbaseUtils实例初始化失败!错误信息为:" + e.getMessage(), e);
- }
- }
- }
-
- /**
- * 创建表
- *
- * @param tableName 表名
- * @param columnFamily 列族(数组)
- */
- public void createTable(String tableName, String[] columnFamily) throws IOException {
- TableName name = TableName.valueOf(tableName);
- //如果存在则删除
- if (admin.tableExists(name)) {
- admin.disableTable(name);
- admin.deleteTable(name);
- LOG.error("create htable error! this table {} already exists!", name);
- } else {
- HTableDescriptor desc = new HTableDescriptor(name);
- for (String cf : columnFamily) {
- desc.addFamily(new HColumnDescriptor(cf));
- }
- admin.createTable(desc);
- }
- }
-
- /**
- * 插入记录(单行单列族-多列多值)
- *
- * @param tableName 表名
- * @param row 行名
- * @param columnFamilys 列族名
- * @param columns 列名(数组)
- * @param values 值(数组)(且需要和列一一对应)
- */
- public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {
- TableName name = TableName.valueOf(tableName);
- Table table = connection.getTable(name);
- Put put = new Put(Bytes.toBytes(row));
- for (int i = 0; i < columns.length; i++) {
- put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
- table.put(put);
- }
- }
-
- /**
- * 插入记录(单行单列族-单列单值)
- *
- * @param tableName 表名
- * @param row 行名
- * @param columnFamily 列族名
- * @param column 列名
- * @param value 值
- */
- public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException {
- TableName name = TableName.valueOf(tableName);
- Table table = connection.getTable(name);
- Put put = new Put(Bytes.toBytes(row));
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
- table.put(put);
- }
-
- /**
- * 删除一行记录
- *
- * @param tablename 表名
- * @param rowkey 行名
- */
- public void deleteRow(String tablename, String rowkey) throws IOException {
- TableName name = TableName.valueOf(tablename);
- Table table = connection.getTable(name);
- Delete d = new Delete(rowkey.getBytes());
- table.delete(d);
- }
-
- /**
- * 删除单行单列族记录
- *
- * @param tablename 表名
- * @param rowkey 行名
- * @param columnFamily 列族名
- */
- public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException {
- TableName name = TableName.valueOf(tablename);
- Table table = connection.getTable(name);
- Delete d = new Delete(rowkey.getBytes()).addFamily(Bytes.toBytes(columnFamily));
- table.delete(d);
- }
-
- /**
- * 删除单行单列族单列记录
- *
- * @param tablename 表名
- * @param rowkey 行名
- * @param columnFamily 列族名
- * @param column 列名
- */
- public void deleteColumn(String tablename, String rowkey, String columnFamily, String column) throws IOException {
- TableName name = TableName.valueOf(tablename);
- Table table = connection.getTable(name);
- Delete d = new Delete(rowkey.getBytes()).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
- table.delete(d);
- }
-
- /**
- * 查找一行记录
- *
- * @param tablename 表名
- * @param rowKey 行名
- */
- public static String selectRow(String tablename, String rowKey) throws IOException {
- String record = "";
- TableName name = TableName.valueOf(tablename);
- Table table = connection.getTable(name);
- Get g = new Get(rowKey.getBytes());
- Result rs = table.get(g);
- NavigableMap<byte[], NavigableMap<byte[], NavigableMap
byte[]>>> map = rs.getMap(); - for (Cell cell : rs.rawCells()) {
- StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRowArray())).append("\t")
- .append(Bytes.toString(cell.getFamilyArray())).append("\t")
- .append(Bytes.toString(cell.getQualifierArray())).append("\t")
- .append(Bytes.toString(cell.getValueArray())).append("\n");
- String str = stringBuffer.toString();
- record += str;
- }
- return record;
- }
-
- /**
- * 查找单行单列族单列记录
- *
- * @param tablename 表名
- * @param rowKey 行名
- * @param columnFamily 列族名
- * @param column 列名
- * @return
- */
- public static String selectValue(String tablename, String rowKey, String columnFamily, String column) throws IOException {
- TableName name = TableName.valueOf(tablename);
- Table table = connection.getTable(name);
- Get g = new Get(rowKey.getBytes());
- g.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
- Result rs = table.get(g);
- return Bytes.toString(rs.value());
- }
-
- /**
- * 查询表中所有行(Scan方式)
- *
- * @param tablename
- * @return
- */
- public String scanAllRecord(String tablename) throws IOException {
- String record = "";
- TableName name = TableName.valueOf(tablename);
- Table table = connection.getTable(name);
- Scan scan = new Scan();
- ResultScanner scanner = table.getScanner(scan);
- try {
- for (Result result : scanner) {
- for (Cell cell : result.rawCells()) {
- StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRowArray())).append("\t")
- .append(Bytes.toString(cell.getFamilyArray())).append("\t")
- .append(Bytes.toString(cell.getQualifierArray())).append("\t")
- .append(Bytes.toString(cell.getValueArray())).append("\n");
- String str = stringBuffer.toString();
- record += str;
- }
- }
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- }
- return record;
- }
-
- /**
- * 根据rowkey关键字查询报告记录
- *
- * @param tablename
- * @param rowKeyword
- * @return
- */
- public List scanReportDataByRowKeyword(String tablename, String rowKeyword) throws IOException {
- ArrayList
list = new ArrayList<>(); -
- Table table = connection.getTable(TableName.valueOf(tablename));
- Scan scan = new Scan();
-
- //添加行键过滤器,根据关键字匹配
- RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
- scan.setFilter(rowFilter);
-
- ResultScanner scanner = table.getScanner(scan);
- try {
- for (Result result : scanner) {
- //TODO 此处根据业务来自定义实现
- list.add(null);
- }
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- }
- return list;
- }
-
- /**
- * 根据rowkey关键字和时间戳范围查询报告记录
- *
- * @param tablename
- * @param rowKeyword
- * @return
- */
- public List scanReportDataByRowKeywordTimestamp(String tablename, String rowKeyword, Long minStamp, Long maxStamp) throws IOException {
- ArrayList
list = new ArrayList<>(); -
- Table table = connection.getTable(TableName.valueOf(tablename));
- Scan scan = new Scan();
- //添加scan的时间范围
- scan.setTimeRange(minStamp, maxStamp);
-
- RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
- scan.setFilter(rowFilter);
-
- ResultScanner scanner = table.getScanner(scan);
- try {
- for (Result result : scanner) {
- //TODO 此处根据业务来自定义实现
- list.add(null);
- }
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- }
- return list;
- }
-
- /**
- * 删除表操作
- *
- * @param tablename
- */
- public void deleteTable(String tablename) throws IOException {
- TableName name = TableName.valueOf(tablename);
- if (admin.tableExists(name)) {
- admin.disableTable(name);
- admin.deleteTable(name);
- }
- }
- }
五、使用
接下来只需要在项目业务类里注入 HbaseUtil 就可以使用了:
- @Autowired
- private HbaseUtil hbaseUtil;
测试方法:
- import com.swkj.common.hbase.utils.HbaseUtil;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.ActiveProfiles;
- import org.springframework.test.context.junit4.SpringRunner;
-
- /**
- * @description: Hbase工具类测试
- * @author: zhangzhixiang
- * @createDate: 2022/11/24
- * @version: 1.0
- */
- @RunWith(SpringRunner.class)
- @SpringBootTest
- @ActiveProfiles(profiles = "local")
- public class HbaseServiceTest {
-
- @Autowired
- private HbaseUtil hbaseUtil;
-
- @Test
- public void testHbase() {
- try {
- hbaseUtil.createTable("Student", new String[]{"StuInfo", "Grades"});
- hbaseUtil.insertOneRecord("Student", "0001", "StuInfo", "name", "Tom Green");
- hbaseUtil.insertOneRecord("Student", "0002", "StuInfo", "Age", "18");
- System.out.println("=================" + hbaseUtil.selectValue("Student", "0001", "StuInfo", "name"));
- System.out.println("=================" + hbaseUtil.selectValue("Student", "0002", "StuInfo", "Age"));
- System.out.println("=================" + hbaseUtil.selectRow("Student", "0001"));
- System.out.println("=================" + hbaseUtil.selectRow("Student", "0002"));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
到此 SpringBoot 封装 HBase 操作工具类介绍完成。