• SpringBoot 封装 HBase 操作工具类


            最近项目中用到了Hbase相关的操作并封装成工具类,我的Hbase服务器端版本是2.1.0,图示如下:

            特此记录便于日后查阅。

    一、pom.xml 依赖

    1. <dependency>
    2. <groupId>org.apache.hbase</groupId>
    3. <artifactId>hbase-shaded-client</artifactId>
    4. <version>2.1.0</version>
    5. </dependency>
    6. <dependency>
    7. <groupId>org.apache.hadoop</groupId>
    8. <artifactId>hadoop-common</artifactId>
    9. <version>3.0.0</version>
    10. </dependency>

    二、application.yml 项目配置

            此处我是自定义HBase配置,后面会有专门的配置类来加载这个配置

    datasource:
      hbase:
        zookeeper:
          port: 2181
          quorum: 10.0.61.12,10.0.61.22,10.0.61.24
          znode:
            parent: ''

    三、HbaseConfig 自定义配置类

    1. import lombok.Data;
    2. import org.apache.hadoop.hbase.HBaseConfiguration;
    3. import org.apache.hadoop.hbase.client.Connection;
    4. import org.apache.hadoop.hbase.client.ConnectionFactory;
    5. import org.springframework.beans.factory.annotation.Value;
    6. import org.springframework.context.annotation.Configuration;
    7. import org.springframework.stereotype.Component;
    8. import java.io.IOException;
    9. /**
    10. * @description: Hbase配置类
    11. * @author: zhangzhixiang
    12. * @createDate: 2022/11/24
    13. * @version: 1.0
    14. */
    15. @Data
    16. @Component
    17. @Configuration
    18. public class HbaseConfig {
    19. @Value("${datasource.hbase.zookeeper.quorum}")
    20. private String zookeeper;
    21. @Value("${datasource.hbase.zookeeper.znode.parent}")
    22. private String parent;
    23. @Value("${datasource.hbase.zookeeper.port}")
    24. private String port;
    25. public Connection getConnection() throws IOException {
    26. org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
    27. config.set("hbase.zookeeper.quorum", zookeeper);
    28. config.set("hbase.zookeeper.property.clientPort", port);
    29. if (parent != null && !"".equals(parent)) {
    30. config.set("zookeeper.znode.parent", parent);
    31. }
    32. Connection connection = ConnectionFactory.createConnection(config);
    33. return connection;
    34. }
    35. }

    四、HbaseUtil 工具类

            首先添加 SpringContext 工具类,下面会用到:

    1. import org.springframework.beans.BeansException;
    2. import org.springframework.context.ApplicationContext;
    3. import org.springframework.context.ApplicationContextAware;
    4. import org.springframework.core.env.Environment;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * @Description:
    8. * @Author:zhangzhixiang
    9. * @Date: 2022/7/25
    10. * @Version: 1.0
    11. */
    12. @Component
    13. public class SpringContext implements ApplicationContextAware {
    14. public static ApplicationContext context;
    15. public static Environment env;
    16. @Override
    17. public void setApplicationContext(ApplicationContext context) throws BeansException {
    18. SpringContext.context = context;
    19. SpringContext.env = context.getEnvironment();
    20. }
    21. public static Object getBean(String name) {
    22. return context.getBean(name);
    23. }
    24. public static T getBean(Class clazz) {
    25. return context.getBean(clazz);
    26. }
    27. public static ApplicationContext getContext() {
    28. return context;
    29. }
    30. public static Environment getEnv() {
    31. return env;
    32. }
    33. public static String getProperty(String key) {
    34. return getProperty(key, "");
    35. }
    36. public static String getProperty(String key, String defaultValue) {
    37. return env.getProperty(key, defaultValue);
    38. }
    39. public static T getProperty(String key, Class targetType) {
    40. return env.getProperty(key, targetType);
    41. }
    42. public static String getActiveProfile() {
    43. return env.getActiveProfiles()[0];
    44. }
    45. }

             然后我们来写 HbaseUtil 工具类的代码:

    1. import com.swkj.common.base.context.SpringContext;
    2. import com.swkj.common.base.log.GLog;
    3. import com.swkj.common.base.log.LogFactory;
    4. import com.swkj.common.hbase.config.HbaseConfig;
    5. import org.apache.hadoop.hbase.Cell;
    6. import org.apache.hadoop.hbase.HColumnDescriptor;
    7. import org.apache.hadoop.hbase.HTableDescriptor;
    8. import org.apache.hadoop.hbase.TableName;
    9. import org.apache.hadoop.hbase.client.*;
    10. import org.apache.hadoop.hbase.filter.CompareFilter;
    11. import org.apache.hadoop.hbase.filter.RowFilter;
    12. import org.apache.hadoop.hbase.filter.SubstringComparator;
    13. import org.apache.hadoop.hbase.util.Bytes;
    14. import org.springframework.context.annotation.DependsOn;
    15. import org.springframework.stereotype.Component;
    16. import java.io.IOException;
    17. import java.util.ArrayList;
    18. import java.util.List;
    19. import java.util.NavigableMap;
    20. /**
    21. * @description: Hbase工具类
    22. * @author: zhangzhixiang
    23. * @createDate: 2022/11/24
    24. * @version: 1.0
    25. */
    26. @DependsOn("springContext")
    27. @Component
    28. public class HbaseUtil {
    29. private static final GLog LOG = LogFactory.getLogger(HbaseUtil.class);
    30. private static HbaseConfig hbaseConfig = (HbaseConfig) SpringContext.getBean("hbaseConfig");
    31. private static Connection connection = null;
    32. private static Admin admin = null;
    33. private HbaseUtil() {
    34. if (connection == null) {
    35. try {
    36. connection = hbaseConfig.getConnection();
    37. admin = connection.getAdmin();
    38. } catch (IOException e) {
    39. LOG.error("HbaseUtils实例初始化失败!错误信息为:" + e.getMessage(), e);
    40. }
    41. }
    42. }
    43. /**
    44. * 创建表
    45. *
    46. * @param tableName 表名
    47. * @param columnFamily 列族(数组)
    48. */
    49. public void createTable(String tableName, String[] columnFamily) throws IOException {
    50. TableName name = TableName.valueOf(tableName);
    51. //如果存在则删除
    52. if (admin.tableExists(name)) {
    53. admin.disableTable(name);
    54. admin.deleteTable(name);
    55. LOG.error("create htable error! this table {} already exists!", name);
    56. } else {
    57. HTableDescriptor desc = new HTableDescriptor(name);
    58. for (String cf : columnFamily) {
    59. desc.addFamily(new HColumnDescriptor(cf));
    60. }
    61. admin.createTable(desc);
    62. }
    63. }
    64. /**
    65. * 插入记录(单行单列族-多列多值)
    66. *
    67. * @param tableName 表名
    68. * @param row 行名
    69. * @param columnFamilys 列族名
    70. * @param columns 列名(数组)
    71. * @param values 值(数组)(且需要和列一一对应)
    72. */
    73. public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {
    74. TableName name = TableName.valueOf(tableName);
    75. Table table = connection.getTable(name);
    76. Put put = new Put(Bytes.toBytes(row));
    77. for (int i = 0; i < columns.length; i++) {
    78. put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
    79. table.put(put);
    80. }
    81. }
    82. /**
    83. * 插入记录(单行单列族-单列单值)
    84. *
    85. * @param tableName 表名
    86. * @param row 行名
    87. * @param columnFamily 列族名
    88. * @param column 列名
    89. * @param value 值
    90. */
    91. public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException {
    92. TableName name = TableName.valueOf(tableName);
    93. Table table = connection.getTable(name);
    94. Put put = new Put(Bytes.toBytes(row));
    95. put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
    96. table.put(put);
    97. }
    98. /**
    99. * 删除一行记录
    100. *
    101. * @param tablename 表名
    102. * @param rowkey 行名
    103. */
    104. public void deleteRow(String tablename, String rowkey) throws IOException {
    105. TableName name = TableName.valueOf(tablename);
    106. Table table = connection.getTable(name);
    107. Delete d = new Delete(rowkey.getBytes());
    108. table.delete(d);
    109. }
    110. /**
    111. * 删除单行单列族记录
    112. *
    113. * @param tablename 表名
    114. * @param rowkey 行名
    115. * @param columnFamily 列族名
    116. */
    117. public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException {
    118. TableName name = TableName.valueOf(tablename);
    119. Table table = connection.getTable(name);
    120. Delete d = new Delete(rowkey.getBytes()).addFamily(Bytes.toBytes(columnFamily));
    121. table.delete(d);
    122. }
    123. /**
    124. * 删除单行单列族单列记录
    125. *
    126. * @param tablename 表名
    127. * @param rowkey 行名
    128. * @param columnFamily 列族名
    129. * @param column 列名
    130. */
    131. public void deleteColumn(String tablename, String rowkey, String columnFamily, String column) throws IOException {
    132. TableName name = TableName.valueOf(tablename);
    133. Table table = connection.getTable(name);
    134. Delete d = new Delete(rowkey.getBytes()).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
    135. table.delete(d);
    136. }
    137. /**
    138. * 查找一行记录
    139. *
    140. * @param tablename 表名
    141. * @param rowKey 行名
    142. */
    143. public static String selectRow(String tablename, String rowKey) throws IOException {
    144. String record = "";
    145. TableName name = TableName.valueOf(tablename);
    146. Table table = connection.getTable(name);
    147. Get g = new Get(rowKey.getBytes());
    148. Result rs = table.get(g);
    149. NavigableMap<byte[], NavigableMap<byte[], NavigableMapbyte[]>>> map = rs.getMap();
    150. for (Cell cell : rs.rawCells()) {
    151. StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRowArray())).append("\t")
    152. .append(Bytes.toString(cell.getFamilyArray())).append("\t")
    153. .append(Bytes.toString(cell.getQualifierArray())).append("\t")
    154. .append(Bytes.toString(cell.getValueArray())).append("\n");
    155. String str = stringBuffer.toString();
    156. record += str;
    157. }
    158. return record;
    159. }
    160. /**
    161. * 查找单行单列族单列记录
    162. *
    163. * @param tablename 表名
    164. * @param rowKey 行名
    165. * @param columnFamily 列族名
    166. * @param column 列名
    167. * @return
    168. */
    169. public static String selectValue(String tablename, String rowKey, String columnFamily, String column) throws IOException {
    170. TableName name = TableName.valueOf(tablename);
    171. Table table = connection.getTable(name);
    172. Get g = new Get(rowKey.getBytes());
    173. g.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
    174. Result rs = table.get(g);
    175. return Bytes.toString(rs.value());
    176. }
    177. /**
    178. * 查询表中所有行(Scan方式)
    179. *
    180. * @param tablename
    181. * @return
    182. */
    183. public String scanAllRecord(String tablename) throws IOException {
    184. String record = "";
    185. TableName name = TableName.valueOf(tablename);
    186. Table table = connection.getTable(name);
    187. Scan scan = new Scan();
    188. ResultScanner scanner = table.getScanner(scan);
    189. try {
    190. for (Result result : scanner) {
    191. for (Cell cell : result.rawCells()) {
    192. StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRowArray())).append("\t")
    193. .append(Bytes.toString(cell.getFamilyArray())).append("\t")
    194. .append(Bytes.toString(cell.getQualifierArray())).append("\t")
    195. .append(Bytes.toString(cell.getValueArray())).append("\n");
    196. String str = stringBuffer.toString();
    197. record += str;
    198. }
    199. }
    200. } finally {
    201. if (scanner != null) {
    202. scanner.close();
    203. }
    204. }
    205. return record;
    206. }
    207. /**
    208. * 根据rowkey关键字查询报告记录
    209. *
    210. * @param tablename
    211. * @param rowKeyword
    212. * @return
    213. */
    214. public List scanReportDataByRowKeyword(String tablename, String rowKeyword) throws IOException {
    215. ArrayList list = new ArrayList<>();
    216. Table table = connection.getTable(TableName.valueOf(tablename));
    217. Scan scan = new Scan();
    218. //添加行键过滤器,根据关键字匹配
    219. RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
    220. scan.setFilter(rowFilter);
    221. ResultScanner scanner = table.getScanner(scan);
    222. try {
    223. for (Result result : scanner) {
    224. //TODO 此处根据业务来自定义实现
    225. list.add(null);
    226. }
    227. } finally {
    228. if (scanner != null) {
    229. scanner.close();
    230. }
    231. }
    232. return list;
    233. }
    234. /**
    235. * 根据rowkey关键字和时间戳范围查询报告记录
    236. *
    237. * @param tablename
    238. * @param rowKeyword
    239. * @return
    240. */
    241. public List scanReportDataByRowKeywordTimestamp(String tablename, String rowKeyword, Long minStamp, Long maxStamp) throws IOException {
    242. ArrayList list = new ArrayList<>();
    243. Table table = connection.getTable(TableName.valueOf(tablename));
    244. Scan scan = new Scan();
    245. //添加scan的时间范围
    246. scan.setTimeRange(minStamp, maxStamp);
    247. RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
    248. scan.setFilter(rowFilter);
    249. ResultScanner scanner = table.getScanner(scan);
    250. try {
    251. for (Result result : scanner) {
    252. //TODO 此处根据业务来自定义实现
    253. list.add(null);
    254. }
    255. } finally {
    256. if (scanner != null) {
    257. scanner.close();
    258. }
    259. }
    260. return list;
    261. }
    262. /**
    263. * 删除表操作
    264. *
    265. * @param tablename
    266. */
    267. public void deleteTable(String tablename) throws IOException {
    268. TableName name = TableName.valueOf(tablename);
    269. if (admin.tableExists(name)) {
    270. admin.disableTable(name);
    271. admin.deleteTable(name);
    272. }
    273. }
    274. }

    五、使用

            接下来只需要在项目业务类里注入 HbaseUtil 就可以使用了:

    1. @Autowired
    2. private HbaseUtil hbaseUtil;

            测试方法:

    1. import com.swkj.common.hbase.utils.HbaseUtil;
    2. import org.junit.Test;
    3. import org.junit.runner.RunWith;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.boot.test.context.SpringBootTest;
    6. import org.springframework.test.context.ActiveProfiles;
    7. import org.springframework.test.context.junit4.SpringRunner;
    8. /**
    9. * @description: Hbase工具类测试
    10. * @author: zhangzhixiang
    11. * @createDate: 2022/11/24
    12. * @version: 1.0
    13. */
    14. @RunWith(SpringRunner.class)
    15. @SpringBootTest
    16. @ActiveProfiles(profiles = "local")
    17. public class HbaseServiceTest {
    18. @Autowired
    19. private HbaseUtil hbaseUtil;
    20. @Test
    21. public void testHbase() {
    22. try {
    23. hbaseUtil.createTable("Student", new String[]{"StuInfo", "Grades"});
    24. hbaseUtil.insertOneRecord("Student", "0001", "StuInfo", "name", "Tom Green");
    25. hbaseUtil.insertOneRecord("Student", "0002", "StuInfo", "Age", "18");
    26. System.out.println("=================" + hbaseUtil.selectValue("Student", "0001", "StuInfo", "name"));
    27. System.out.println("=================" + hbaseUtil.selectValue("Student", "0002", "StuInfo", "Age"));
    28. System.out.println("=================" + hbaseUtil.selectRow("Student", "0001"));
    29. System.out.println("=================" + hbaseUtil.selectRow("Student", "0002"));
    30. } catch (Exception e) {
    31. e.printStackTrace();
    32. }
    33. }
    34. }

            到此 SpringBoot 封装 HBase 操作工具类介绍完成。

  • 相关阅读:
    手动从0搭建ABP框架-ABP官方完整解决方案和手动搭建简化解决方案实践
    springboot集成nacos作配置中心,动态配置不生效
    Redis实现微博好友功能微服务(关注,取关,共同关注)
    如何在用pip配置文件设置HTTP爬虫IP
    机器学习与图像识别(二)—— OpenCV环境折腾。。
    华为机试题刷题总结
    怎样做ChatGPT应用开发?
    【开发教程10】疯壳·开源蓝牙智能健康手表-OTA镜像制作及下载技术文档
    记将一个大型客户端应用项目迁移到 dotnet 6 的经验和决策
    【夯实算法基础】最近公共祖先
  • 原文地址:https://blog.csdn.net/qq_19734597/article/details/128080643