JDBC Connection Pool Wrapper for MaxCompute / Hive / Oracle / MySQL

Connecting to assorted third-party data sources usually means writing slightly different boilerplate each time. This post wraps the JDBC connections for MaxCompute, Hive, Oracle and MySQL so that a connection pool of the matching type can be created simply by passing in different parameters.

Base connection parameter class

This class wraps the basic JDBC connection parameters. If a data source needs more than these, extend the class and add the extra properties.

    @Data
    public class BaseJdbcConnParam implements Serializable {

        /** driver name */
        private String driverName;

        /** IP */
        private String ip;

        /** db server port */
        private Integer port;

        /** db name */
        private String dbName;

        /** db connection username */
        private String username;

        /** db connection password */
        private String password;
    }

Abstract connection utility class

It provides the following (a hypothetical subclass sketch follows the class):

• 1. Constructor: stores the connection parameters and builds the matching connection object.
• 2. Building the concrete connection: subclasses implement buildConnection().
• 3. Getting the connection: once built, it is returned directly by getConnection().
    /**
     * @Description Abstract parent class for connection utilities
     * @Author itdl
     * @Date 2022/08/15 09:54
     */
    public abstract class AbstractConnUtil<P extends BaseJdbcConnParam> {

        /** connection params */
        protected final P connParam;

        /** jdbc connection object */
        protected final Connection connection;

        /**
         * Constructor: stores the parameters and builds the underlying connection
         * @param connParam connection parameters
         */
        public AbstractConnUtil(P connParam) {
            this.connParam = connParam;
            this.connection = buildConnection();
        }

        /**
         * Build the connection object
         * @return connection
         */
        protected abstract Connection buildConnection();

        /**
         * Get the connection
         */
        public Connection getConnection() {
            return connection;
        }
    }
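
To see how a new data source would plug in, here is a hypothetical sketch (not part of the original code) for PostgreSQL. PostgresJdbcConnParam is an assumed subclass of BaseJdbcConnParam with no extra fields, and ResultCode.CONN_UNKNOWN_ERR is reused from the existing error codes; only buildConnection() has to be supplied:

    // Hypothetical PostgreSQL support: the parameter class and the error code mapping are assumptions.
    public class PostgresConnUtil extends AbstractConnUtil<PostgresJdbcConnParam> {

        public PostgresConnUtil(PostgresJdbcConnParam connParam) {
            super(connParam);
        }

        @Override
        protected Connection buildConnection() {
            try {
                // load the driver and build the JDBC URL from the base parameters
                Class.forName("org.postgresql.Driver");
                String jdbcUrl = String.format("jdbc:postgresql://%s:%s/%s",
                        connParam.getIp(), connParam.getPort(), connParam.getDbName());
                Properties props = new Properties();
                props.put("user", connParam.getUsername());
                props.put("password", connParam.getPassword());
                return DriverManager.getConnection(jdbcUrl, props);
            } catch (ClassNotFoundException | SQLException e) {
                throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
            }
        }
    }

The pool in the next section would also need one more instanceof branch in createConnection() to recognise the new parameter type.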

Connection pool management

It provides the following (a usage sketch follows the class):

• 1. Create a pool of the matching connection type from the connection parameters and a maximum pool size.
• 2. Get a connection; if the pool is empty, wait for another thread to release one (at most ten minutes).
• 3. Release a connection: put it back into the pool and wake up the waiting threads.
• 4. Close every connection in the pool.
    /**
     * @Description Connection pool manager
     * @Author itdl
     * @Date 2022/08/16 09:42
     */
    @Slf4j
    public class DbConnPool<T extends BaseJdbcConnParam> {

        /** idle connections */
        private final LinkedList<Connection> connPool = new LinkedList<Connection>();

        /** maximum pool size */
        private final Integer maxPoolSize;

        private final T connParam;

        /**
         * Constructor
         * @param connParam connection parameters
         * @param maxPoolSize pool size
         */
        public DbConnPool(T connParam, Integer maxPoolSize) {
            this.maxPoolSize = maxPoolSize;
            this.connParam = connParam;
            // fill the pool
            for (int i = 0; i < maxPoolSize; i++) {
                connPool.addLast(this.createConnection());
            }
        }

        /**
         * Create a database connection of the type matching the parameter class
         * @return connection
         */
        private Connection createConnection() {
            if (connParam instanceof OracleJdbcConnParam) {
                final OracleConnUtil util = new OracleConnUtil((OracleJdbcConnParam) connParam);
                return util.getConnection();
            }
            if (connParam instanceof HiveJdbcConnParam) {
                final HiveConnUtil util = new HiveConnUtil((HiveJdbcConnParam) connParam);
                return util.getConnection();
            }
            if (connParam instanceof MysqlJdbcConnParam) {
                final MysqlConnUtil util = new MysqlConnUtil((MysqlJdbcConnParam) connParam);
                return util.getConnection();
            }
            if (connParam instanceof MaxComputeJdbcConnParam) {
                final MaxComputeJdbcUtil util = new MaxComputeJdbcUtil((MaxComputeJdbcConnParam) connParam);
                return util.getConnection();
            }
            throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);
        }

        /**
         * Borrow a connection. If the pool is empty, wait for another thread to release one (at most ten minutes).
         * @return connection
         */
        public synchronized Connection getConnection() {
            long deadline = System.currentTimeMillis() + 10 * 60 * 1000L;
            // wait in a loop so that spurious wakeups and other threads grabbing the released connection are handled
            while (connPool.isEmpty()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    log.info("========== pool still empty after waiting 10 minutes, giving up ==========");
                    throw new BizException(ResultCode.CONN_POOL_EMPTY_ERR);
                }
                try {
                    log.info("========== pool is empty, waiting for another thread to release a connection ==========");
                    wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new BizException(ResultCode.CONN_POOL_EMPTY_ERR);
                }
            }
            // take the first connection from the pool
            return connPool.removeFirst();
        }

        /**
         * Return a connection to the pool after use
         * @param conn the connection to release
         */
        public synchronized void freeConnection(Connection conn) {
            this.connPool.addLast(conn);
            // wake up threads waiting for a connection
            notifyAll();
        }

        /**
         * Close every connection in the pool
         */
        public synchronized void close() {
            for (Connection connection : connPool) {
                SqlUtil.close(connection);
            }
            connPool.clear();
        }
    }
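
A minimal usage sketch of the pool, assuming the MysqlJdbcConnParam class defined later in this post and the localhost credentials used in the test code. Connections are borrowed with getConnection() and must always be handed back with freeConnection(), otherwise waiting threads eventually time out:

    // Sketch: borrow a connection, use it, and always return it to the pool.
    MysqlJdbcConnParam param = new MysqlJdbcConnParam();
    param.setDriverName("com.mysql.cj.jdbc.Driver");
    param.setIp("localhost");
    param.setPort(3306);
    param.setDbName("test_db");
    param.setUsername("root");
    param.setPassword("root");

    DbConnPool<MysqlJdbcConnParam> pool = new DbConnPool<>(param, 2);
    Connection conn = pool.getConnection();
    try {
        // work with the connection, e.g. through the SqlUtil class below
    } finally {
        pool.freeConnection(conn);   // wakes up any thread waiting in getConnection()
    }
    pool.close();                    // closes every connection currently held by the pool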

SQL utility class

Given a Connection and a database dialect, this class wraps the dialect-specific SQL execution; it is the core of the SQL handling (a worked example of the generated paging SQL follows the class).

    /**
     * @Description SQL utility class
     * @Author itdl
     * @Date 2022/08/10 17:13
     */
    @Slf4j
    public class SqlUtil {
        /** SQL for querying MySQL table comments */
        public static final String SELECT_TABLES_MYSQL = "select table_name, table_comment from information_schema.tables where TABLE_SCHEMA = '%s'";
        /** SQL for querying MaxCompute table comments */
        public static final String SELECT_TABLES_MAX_COMPUTE = "select table_name, table_comment from information_schema.tables where TABLE_SCHEMA = '%s'";
        /** SQL for querying Oracle table comments */
        public static final String SELECT_TABLES_ORACLE = "SELECT t2.TABLE_NAME as table_name, t2.COMMENTS as table_comment FROM user_tables t1 inner join user_tab_comments t2 on t1.TABLE_NAME = t2.TABLE_NAME";
        /** SQL for querying Hive table comments: list the table names first, then fetch each table's DDL and extract the comment with a regex */
        public static final String SELECT_TABLES_HIVE = "show tables";
        public static final String SELECT_TABLES_2_HIVE = "describe extended %s";
        /** count SQL for pagination - MySQL */
        private static final String SELECT_COUNT_MYSQL = "select count(1) from (%s) z";
        /** count SQL for pagination - MaxCompute */
        private static final String SELECT_COUNT_MAX_COMPUTE = "select count(1) from (%s) z;";
        /** count SQL for pagination - Oracle */
        private static final String SELECT_COUNT_ORACLE = "select count(1) from (%s) z";
        /** count SQL for pagination - Hive */
        private static final String SELECT_COUNT_HIVE = "select count(1) from (%s) z";
        /** SQL that enables full table scan on MaxCompute */
        private static final String FULL_SCAN_MAX_COMPUTE = "set odps.sql.allow.fullscan=true;";
        /** paging SQL - MySQL */
        private static final String SELECT_PAGE_MYSQL = "select z.* from (%s) z limit %s, %s";
        /** paging SQL - MaxCompute */
        private static final String SELECT_PAGE_MAX_COMPUTE = "select z.* from (%s) z limit %s, %s;";
        /** paging SQL - Hive */
        private static final String SELECT_PAGE_HIVE = "select * from (select row_number() over () as row_num_01,u.* from (%s) u) mm where mm.row_num_01 between %s and %s";
        /** paging SQL - Oracle */
        private static final String SELECT_PAGE_ORACLE = "select * from (SELECT ROWNUM as row_num_01,z.* from (%s) z) h where h.row_num_01 > %s and h.row_num_01 <= %s";

        /** database connection */
        private final Connection connection;
        /** database dialect */
        private final Integer dbDialect;
        /** supported dialects */
        private static final List<Integer> supportDbTypes =
                Arrays.asList(DbDialectEnum.ORACLE.getCode(), DbDialectEnum.HIVE.getCode(), DbDialectEnum.MYSQL.getCode(), DbDialectEnum.MAX_COMPUTE.getCode());

        public SqlUtil(Connection connection, Integer dbDialect) {
            if (!supportDbTypes.contains(dbDialect)) {
                throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);
            }
            this.connection = connection;
            this.dbDialect = dbDialect;
        }

        /**
         * List all tables and their comments through the connection
         * @param schemaName database name; for Oracle the username, for MaxCompute the project name
         */
        public List<TableMetaInfo> getTables(String schemaName) {
            List<TableMetaInfo> result = new ArrayList<>();
            String sql = "";
            switch (this.dbDialect) {
                case 1:
                    sql = SELECT_TABLES_ORACLE;
                    break;
                case 2:
                    sql = SELECT_TABLES_HIVE;
                    break;
                case 3:
                    if (StringUtils.isBlank(schemaName)) {
                        throw new BizException(ResultCode.SELECT_TABLES_SCHEMA_NOT_NULL_ERR);
                    }
                    sql = String.format(SELECT_TABLES_MYSQL, schemaName);
                    break;
                case 4:
                    if (StringUtils.isBlank(schemaName)) {
                        throw new BizException(ResultCode.SELECT_TABLES_SCHEMA_NOT_NULL_ERR);
                    }
                    sql = String.format(SELECT_TABLES_MAX_COMPUTE, schemaName);
                    break;
                default:
                    break;
            }
            if (StringUtils.isBlank(sql)) {
                throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);
            }
            // execute the SQL
            final List<LinkedHashMap<String, Object>> resultMaps = querySql(sql);
            if (ObjectUtils.isEmpty(resultMaps)) {
                return Lists.newArrayList();
            }
            // Hive needs special handling
            List<TableMetaInfo> result1 = getHiveTableMetaInfos(result, resultMaps);
            if (result1 != null) return result1;
            // convert the result rows
            return resultMaps.stream().map(m -> {
                final TableMetaInfo info = new TableMetaInfo();
                Object tableNameObj = m.get("table_name");
                String tableName = tableNameObj == null ? (m.get("TABLE_NAME") == null ? "" : String.valueOf(m.get("TABLE_NAME"))) : String.valueOf(tableNameObj);
                Object tableCommentObj = m.get("table_comment");
                String tableComment = tableCommentObj == null ? (m.get("TABLE_COMMENT") == null ? "" : String.valueOf(m.get("TABLE_COMMENT"))) : String.valueOf(tableCommentObj);
                info.setTableName(tableName);
                info.setComment(tableComment);
                return info;
            }).collect(Collectors.toList());
        }

        /**
         * Get the column list (name and comment) for a table
         * @param tableName table name
         */
        public List<TableColumnMetaInfo> getColumnsByTableName(String tableName) {
            try {
                List<TableColumnMetaInfo> list = new ArrayList<>();
                final DatabaseMetaData metaData = connection.getMetaData();
                final ResultSet columns = metaData.getColumns(null, null, tableName, null);
                while (columns.next()) {
                    String columnName = columns.getString("COLUMN_NAME");
                    String remarks = columns.getString("REMARKS");
                    remarks = StringUtils.isBlank(remarks) ? "" : remarks;
                    final TableColumnMetaInfo metaInfo = new TableColumnMetaInfo(tableName, columnName, remarks);
                    list.add(metaInfo);
                }
                close(columns);
                return list;
            } catch (SQLException e) {
                e.printStackTrace();
                return Lists.newArrayList();
            }
        }

        /**
         * Execute a query
         * @param querySql query SQL
         * @param fullScan whether to enable MaxCompute full table scan first
         * @return List<LinkedHashMap<String, Object>>; LinkedHashMap keeps the column order stable when serialized
         */
        public List<LinkedHashMap<String, Object>> queryData(String querySql, boolean... fullScan) {
            Statement statement = null;
            ResultSet resultSet = null;
            try {
                // create the statement
                statement = this.connection.createStatement();
                // enable full table scan if requested
                for (boolean b : fullScan) {
                    if (b) {
                        statement.execute(FULL_SCAN_MAX_COMPUTE);
                        break;
                    }
                }
                // run the query
                resultSet = statement.executeQuery(querySql);
                // build the result list
                return buildListMap(resultSet);
            } catch (SQLException e) {
                e.printStackTrace();
                throw new BizException(ResultCode.SQL_EXEC_ERR);
            } finally {
                // close resultSet and statement
                close(resultSet, statement);
            }
        }

        /**
         * Execute a paged query using the dialect-specific paging template
         * @param querySql query SQL
         * @return List<LinkedHashMap<String, Object>>
         */
        public List<LinkedHashMap<String, Object>> queryData(String querySql, Integer page, Integer size) {
            Statement statement = null;
            ResultSet resultSet = null;
            try {
                // 1. strip semicolons
                querySql = querySql.replaceAll(";", "");
                // create the statement
                statement = this.connection.createStatement();
                // 2. wrap the SQL with the dialect-specific paging template
                int offset = (page - 1) * size;
                String execSql = "";
                switch (this.dbDialect) {
                    case 1:
                        // oracle
                        execSql = String.format(SELECT_PAGE_ORACLE, querySql, offset, size);
                        break;
                    case 2:
                        // hive
                        execSql = String.format(SELECT_PAGE_HIVE, querySql, offset, size);
                        break;
                    case 3:
                        // mysql
                        execSql = String.format(SELECT_PAGE_MYSQL, querySql, offset, size);
                        break;
                    case 4:
                        // maxCompute
                        execSql = String.format(SELECT_PAGE_MAX_COMPUTE, querySql, offset, size);
                        break;
                    default:
                        break;
                }
                // enable full table scan for MaxCompute
                if (DbDialectEnum.MAX_COMPUTE.getCode().equals(this.dbDialect)) {
                    statement.execute(FULL_SCAN_MAX_COMPUTE);
                }
                log.info("=======>>>paging SQL: {}", execSql);
                // run the query
                resultSet = statement.executeQuery(execSql);
                // build the result list
                return buildListMap(resultSet);
            } catch (SQLException e) {
                e.printStackTrace();
                throw new BizException(ResultCode.SQL_EXEC_ERR);
            } finally {
                // close resultSet and statement
                close(resultSet, statement);
            }
        }

        /**
         * Paged query
         * @param querySql query SQL
         * @param page page number, starting from 1
         * @param size page size
         * @return paged result
         */
        public PageResult<LinkedHashMap<String, Object>> pageQueryMap(String querySql, Integer page, Integer size) {
            // 1. strip semicolons
            querySql = querySql.replaceAll(";", "");
            String countSql = "";
            switch (this.dbDialect) {
                case 1:
                    // oracle
                    countSql = String.format(SELECT_COUNT_ORACLE, querySql);
                    break;
                case 2:
                    // hive
                    countSql = String.format(SELECT_COUNT_HIVE, querySql);
                    break;
                case 3:
                    // mysql
                    countSql = String.format(SELECT_COUNT_MYSQL, querySql);
                    break;
                case 4:
                    // maxCompute
                    countSql = String.format(SELECT_COUNT_MAX_COMPUTE, querySql);
                    break;
                default:
                    break;
            }
            log.info("=======>>>count SQL for paging: {}", countSql);
            // query the total count (enable full table scan for MaxCompute)
            final List<LinkedHashMap<String, Object>> countMap = queryData(countSql, DbDialectEnum.MAX_COMPUTE.getCode().equals(this.dbDialect));
            if (CollectionUtils.isEmpty(countMap)) {
                return new PageResult<>(0L, new ArrayList<>());
            }
            long count = 0L;
            for (Object value : countMap.get(0).values()) {
                count = Long.parseLong(String.valueOf(value));
            }
            if (count == 0) {
                return new PageResult<>(0L, new ArrayList<>());
            }
            // run the paged query
            final List<LinkedHashMap<String, Object>> resultList = queryData(querySql, page, size);
            return new PageResult<>(count, resultList);
        }

        /**
         * Paged query mapped onto a DTO class
         * @param querySql query SQL
         * @param page page number, starting from 1
         * @param size page size
         * @return paged result
         */
        public <T> PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz) {
            final PageResult<LinkedHashMap<String, Object>> result = pageQueryMap(querySql, page, size);
            List<T> rows = new ArrayList<>();
            for (LinkedHashMap<String, Object> row : result.getRows()) {
                final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);
                rows.add(t);
            }
            return new PageResult<>(result.getTotal(), rows);
        }

        /**
         * Get Hive table comments
         * @param result result list to fill
         * @param resultMaps result of "show tables"
         * @return List<TableMetaInfo>, or null when the dialect is not Hive
         */
        private List<TableMetaInfo> getHiveTableMetaInfos(List<TableMetaInfo> result, List<LinkedHashMap<String, Object>> resultMaps) {
            if (dbDialect.equals(DbDialectEnum.HIVE.getCode())) {
                for (LinkedHashMap<String, Object> resultMap : resultMaps) {
                    final String tabName = String.valueOf(resultMap.get("tab_name"));
                    final String descTableCommentSql = String.format(SELECT_TABLES_2_HIVE, tabName);
                    List<LinkedHashMap<String, Object>> resultMapsComments = querySql(descTableCommentSql);
                    // col_name -> Detailed Table Information
                    String comments = resultMapsComments.stream()
                            .filter(m -> "Detailed Table Information".equals(m.get("col_name")))
                            .map(m -> String.valueOf(m.get("data_type"))).findFirst()
                            .orElse("");
                    comments = ReUtil.get("parameters:\\{(?!.*?\\().*transient_lastDdlTime.*?comment=(.*?)\\}", comments, 1);
                    if (StringUtils.isBlank(comments)) {
                        comments = "";
                    }
                    if (comments.contains(",")) {
                        comments = comments.substring(0, comments.lastIndexOf(","));
                    }
                    result.add(new TableMetaInfo(tabName, comments));
                    log.info("===========>>>fetched comment for table {}: {}", tabName, comments);
                    resultMapsComments.clear();
                }
                return result;
            }
            return null;
        }

        /**
         * Execute a query
         * @param sql SQL statement
         * @return rows as LinkedHashMap so the column order survives serialization
         */
        public List<LinkedHashMap<String, Object>> querySql(String sql) {
            Statement statement = null;
            ResultSet resultSet = null;
            try {
                statement = connection.createStatement();
                resultSet = statement.executeQuery(sql);
                return buildListMap(resultSet);
            } catch (SQLException e) {
                e.printStackTrace();
                throw new BizException(ResultCode.SQL_EXEC_ERR);
            } finally {
                // close resources
                close(resultSet, statement);
            }
        }

        /**
         * Close JDBC objects. When passing several objects, order matters: put the one to close first at the front.
         * @param objs objects to close
         */
        public static void close(Object... objs) {
            if (objs == null || objs.length == 0) {
                return;
            }
            for (Object obj : objs) {
                if (obj instanceof Statement) {
                    try {
                        ((Statement) obj).close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if (obj instanceof ResultSet) {
                    try {
                        ((ResultSet) obj).close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if (obj instanceof Connection) {
                    try {
                        ((Connection) obj).close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * Convert a ResultSet into a List<LinkedHashMap<String, Object>>
         * @param resultSet the result set
         * @return list of rows
         */
        private List<LinkedHashMap<String, Object>> buildListMap(ResultSet resultSet) throws SQLException {
            if (resultSet == null) {
                return Lists.newArrayList();
            }
            List<LinkedHashMap<String, Object>> resultList = new ArrayList<>();
            // result set metadata
            ResultSetMetaData metaData = resultSet.getMetaData();
            while (resultSet.next()) {
                // number of columns
                int columnCount = metaData.getColumnCount();
                LinkedHashMap<String, Object> map = new LinkedHashMap<>();
                for (int i = 0; i < columnCount; i++) {
                    String columnName = metaData.getColumnName(i + 1);
                    // skip the row-number column added by the paging templates
                    if ("mm.row_num_01".equalsIgnoreCase(columnName)
                            || "row_num_01".equalsIgnoreCase(columnName)) {
                        continue;
                    }
                    // strip the "mm." alias prefix from Hive results
                    if (columnName.startsWith("mm.")) {
                        columnName = columnName.substring(columnName.indexOf(".") + 1);
                    }
                    Object object = resultSet.getObject(columnName);
                    // MaxCompute returns NULL values as the literal string "\N"
                    if ("\\N".equalsIgnoreCase(String.valueOf(object))) {
                        map.put(columnName, "");
                    } else {
                        map.put(columnName, object);
                    }
                }
                resultList.add(map);
            }
            return resultList;
        }
    }
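
To make the paging templates concrete, this is roughly what pageQueryMap builds for the MySQL dialect with page = 2 and size = 10 (t_user is a placeholder table name):

    String querySql = "select * from t_user";   // placeholder query
    int page = 2, size = 10;
    int offset = (page - 1) * size;             // 10

    // count SQL (SELECT_COUNT_MYSQL template)
    String countSql = String.format("select count(1) from (%s) z", querySql);
    // -> select count(1) from (select * from t_user) z

    // paging SQL (SELECT_PAGE_MYSQL template)
    String pageSql = String.format("select z.* from (%s) z limit %s, %s", querySql, offset, size);
    // -> select z.* from (select * from t_user) z limit 10, 10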

MaxCompute JDBC connection pool wrapper

MaxCompute already ships a JDBC driver (odps-jdbc), so in the end we obtain an ordinary Connection. Official documentation: https://help.aliyun.com/document_detail/161246.html

MaxCompute JDBC connection parameters

    /**
     * @author itdl
     * @description JDBC connection parameters for MaxCompute
     * @date 2022/08/08 10:07
     */
    @Data
    public class MaxComputeJdbcConnParam extends BaseJdbcConnParam {
        /** Aliyun accessId, acts as the username */
        private String aliyunAccessId;
        /** Aliyun accessKey, acts as the password */
        private String aliyunAccessKey;
        /** maxcompute endpoint */
        private String endpoint;
        /** project name */
        private String projectName;
    }

MaxCompute JDBC connection implementation

It simply extends the parent class AbstractConnUtil and implements the abstract buildConnection() method; a short usage sketch follows the class.

    /**
     * @Description MaxCompute JDBC connection implementation
     * @Author itdl
     * @Date 2022/08/08 14:26
     */
    @Slf4j
    public class MaxComputeJdbcUtil extends AbstractConnUtil<MaxComputeJdbcConnParam> {
        /** JDBC driver class name */
        private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver";

        /**
         * Constructor
         * @param connParam connection parameters
         */
        public MaxComputeJdbcUtil(MaxComputeJdbcConnParam connParam) {
            super(connParam);
        }

        @Override
        protected Connection buildConnection() {
            return buildConn();
        }

        /**
         * Create the connection
         * @return database connection
         */
        private Connection buildConn() {
            try {
                Class.forName(DRIVER_NAME);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                throw new BizException(ResultCode.MAX_COMPUTE_DRIVE_LOAD_ERR);
            }
            try {
                Properties dbProperties = new Properties();
                dbProperties.put("user", connParam.getAliyunAccessId());
                dbProperties.put("password", connParam.getAliyunAccessKey());
                dbProperties.put("remarks", "true");
                // JDBC URL template
                String jdbcUrlTemplate = "jdbc:odps:%s?project=%s&useProjectTimeZone=true";
                // obtain the connection through the DriverManager
                return DriverManager.getConnection(
                        String.format(jdbcUrlTemplate, connParam.getEndpoint(), connParam.getProjectName()), dbProperties);
            } catch (SQLException e) {
                e.printStackTrace();
                throw new BizException(ResultCode.CONN_USER_PWD_ERR);
            }
        }
    }
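
A minimal sketch of using the resulting connection directly (the credentials and table name are placeholders; the endpoint is the one used in the test code below). On partitioned tables MaxCompute may reject full-table reads unless the session flag is set, which is why SqlUtil executes the set statement before its queries:

    MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam();
    param.setAliyunAccessId("your-access-id");
    param.setAliyunAccessKey("your-access-key");
    param.setEndpoint("http://service.cn-chengdu.maxcompute.aliyun.com/api");
    param.setProjectName("your_project");

    Connection conn = new MaxComputeJdbcUtil(param).getConnection();
    try (Statement stmt = conn.createStatement()) {
        // enable full table scan for this session, as SqlUtil does
        stmt.execute("set odps.sql.allow.fullscan=true;");
        try (ResultSet rs = stmt.executeQuery("select * from your_table limit 10")) {
            while (rs.next()) {
                // read columns from rs as needed
            }
        }
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        SqlUtil.close(conn);
    }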

The test code for all of the connections is gathered at the end of the post. It starts several threads that borrow connections, list the tables with their comments, list the columns with their comments, and run the paged-query wrapper by passing a page number, a page size and an ordinary SQL statement.

Hive JDBC connection pool wrapper

Hive JDBC connection parameters

Besides the basic JDBC fields, the Hive parameters also carry the Kerberos-related fields: once Kerberos is enabled on Hive, authentication needs a keytab file and a krb5.conf configuration file. Both show up again in the parameter class and in the test code.

    /**
     * @Description Hive JDBC connection params
     * @Author itdl
     * @Date 2022/08/10 16:40
     */
    @Data
    @EqualsAndHashCode(callSuper = false)
    public class HiveJdbcConnParam extends BaseJdbcConnParam {
        /** enable kerberos authentication */
        private boolean enableKerberos;
        /** principal */
        private String principal;
        /** krb5.conf file path on disk */
        private String kbr5FilePath;
        /** keytab file path on disk */
        private String keytabFilePath;
    }

Hive JDBC connection implementation

After obtaining the Hive JDBC connection, table comments should in principle be readable from the connection metadata, but Chinese comments come back garbled even though they display correctly in Hue. I have not found a fix for that route, so as a fallback the code looks up each table's DDL by name and extracts the table comment from it with a regular expression.
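
Before turning to the connection class itself, here is a minimal sketch of that extraction, applying the same regular expression used in SqlUtil.getHiveTableMetaInfos above to a trimmed-down sample of the "describe extended" output (a real Detailed Table Information string is much longer; java.util.regex is used here instead of Hutool's ReUtil to keep the demo self-contained):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class HiveCommentRegexDemo {
        public static void main(String[] args) {
            // trimmed-down sample of the "Detailed Table Information" value returned by "describe extended"
            String detail = "Table(tableName:t_user, dbName:test_db, "
                    + "parameters:{transient_lastDdlTime=1660000000, comment=user table}, tableType:MANAGED_TABLE)";
            Pattern pattern = Pattern.compile("parameters:\\{(?!.*?\\().*transient_lastDdlTime.*?comment=(.*?)\\}");
            Matcher matcher = pattern.matcher(detail);
            if (matcher.find()) {
                // prints "user table"
                System.out.println(matcher.group(1));
            }
        }
    }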

    /**
     * @Description hive connection util
     * @Author itdl
     * @Date 2022/08/10 16:52
     */
    @Slf4j
    public class HiveConnUtil extends AbstractConnUtil<HiveJdbcConnParam> {

        public HiveConnUtil(HiveJdbcConnParam connParam) {
            super(connParam);
        }

        /**
         * Get the connection
         * @return connection
         */
        public Connection getConnection() {
            return connection;
        }

        @Override
        protected Connection buildConnection() {
            try {
                // Class.forName("org.apache.hive.jdbc.HiveDriver");
                Class.forName(connParam.getDriverName());
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                throw new BizException(ResultCode.HIVE_DRIVE_LOAD_ERR);
            }
            // with Kerberos enabled, the keytab is required
            // build the jdbcUrl
            String jdbcUrl = "jdbc:hive2://%s:%s/%s";
            String ip = connParam.getIp();
            String port = connParam.getPort() + "";
            String dbName = connParam.getDbName();
            final String username = connParam.getUsername();
            final String password = connParam.getPassword();
            // is kerberos authentication enabled
            final boolean enableKerberos = connParam.isEnableKerberos();
            Connection connection;
            // obtain the connection
            try {
                Properties dbProperties = new Properties();
                dbProperties.put("user", username);
                dbProperties.put("password", password);
                // with "remarks" set, table comments become available, but Chinese comments come back garbled
                dbProperties.put("remarks", "true");
                if (!enableKerberos) {
                    jdbcUrl = String.format(jdbcUrl, ip, port, dbName);
                    connection = DriverManager.getConnection(jdbcUrl, dbProperties);
                } else {
                    final String principal = connParam.getPrincipal();
                    final String kbr5FilePath = connParam.getKbr5FilePath();
                    final String secretFilePath = connParam.getKeytabFilePath();
                    String format = "jdbc:hive2://%s:%s/%s;principal=%s";
                    jdbcUrl = String.format(format, ip, port, dbName, principal);
                    // use Hadoop security authentication
                    System.setProperty("java.security.krb5.conf", kbr5FilePath);
                    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
                    // on Windows you may additionally need HADOOP_HOME or hadoop.home.dir to be set
                    // Kerberos authentication
                    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
                    conf.set("hadoop.security.authentication", "Kerberos");
                    conf.set("keytab.file", secretFilePath);
                    conf.set("kerberos.principal", principal);
                    UserGroupInformation.setConfiguration(conf);
                    try {
                        UserGroupInformation.loginUserFromKeytab(username, secretFilePath);
                    } catch (IOException e) {
                        e.printStackTrace();
                        throw new BizException(ResultCode.KERBEROS_AUTH_FAIL_ERR);
                    }
                    try {
                        connection = DriverManager.getConnection(jdbcUrl, dbProperties);
                    } catch (SQLException e) {
                        e.printStackTrace();
                        throw new BizException(ResultCode.KERBEROS_AUTH_SUCCESS_GET_CONN_FAIL_ERR);
                    }
                }
                log.info("=====>>>hive connection obtained: username: {}, jdbcUrl: {}", username, jdbcUrl);
                return connection;
            } catch (SQLException e) {
                e.printStackTrace();
                throw new BizException(ResultCode.HIVE_CONN_USER_PWD_ERR);
            } catch (BizException e) {
                throw e;
            } catch (Exception e) {
                e.printStackTrace();
                throw new BizException(ResultCode.HIVE_CONN_ERR);
            }
        }
    }

Oracle JDBC connection parameters

It only needs to extend the base class.

    /**
     * @Description Oracle JDBC connection parameters
     * @Author itdl
     * @Date 2022/08/15 09:50
     */
    public class OracleJdbcConnParam extends BaseJdbcConnParam {
    }

Oracle JDBC connection implementation

It handles both ordinary users and DBA users (usernames ending in "as sysdba").

    /**
     * @Description Oracle JDBC connection utility
     * @Author itdl
     * @Date 2022/08/15 09:52
     */
    @Slf4j
    public class OracleConnUtil extends AbstractConnUtil<OracleJdbcConnParam> {

        /**
         * Constructor
         * @param connParam connection parameters
         */
        public OracleConnUtil(OracleJdbcConnParam connParam) {
            super(connParam);
        }

        @Override
        protected Connection buildConnection() {
            try {
                Class.forName("oracle.jdbc.driver.OracleDriver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                throw new BizException(ResultCode.ORACLE_DRIVE_LOAD_ERR);
            }
            // build the jdbcUrl
            String jdbcUrl = "jdbc:oracle:thin:@//%s:%s/%s";
            final String ip = connParam.getIp();
            final String port = connParam.getPort() + "";
            final String dbName = connParam.getDbName();
            final String username = connParam.getUsername();
            final String password = connParam.getPassword();
            jdbcUrl = String.format(jdbcUrl, ip, port, dbName);
            // obtain the connection
            Connection connection;
            try {
                Properties dbProperties = new Properties();
                // for a DBA account, the username ends with "as sysdba"
                String dba = "as sysdba";
                dbProperties.put("password", password);
                dbProperties.put("remarks", "true");
                if (username.trim().endsWith(dba)) {
                    dbProperties.put("user", username.trim().substring(0, username.trim().indexOf(dba) - 1));
                    dbProperties.put("defaultRowPrefetch", "15");
                    dbProperties.put("internal_logon", "sysdba");
                    connection = DriverManager.getConnection(jdbcUrl, dbProperties);
                } else {
                    dbProperties.put("user", username);
                    connection = DriverManager.getConnection(jdbcUrl, dbProperties);
                }
                log.info("=====>>>oracle connection obtained: username: {}, jdbcUrl: {}", username, jdbcUrl);
                return connection;
            } catch (SQLException e) {
                e.printStackTrace();
                if (e.getMessage().contains("TNS:listener")) {
                    throw new BizException(ResultCode.CONN_LISTENER_UNKNOWN_ERR);
                }
                if (e.getMessage().contains("ORA-01017")) {
                    throw new BizException(ResultCode.CONN_USER_PWD_ERR);
                }
                // the Oracle driver localizes its messages; this matches the Chinese variant of
                // "IO Error: Got minus one from a read call"
                if (e.getMessage().contains("IO 错误: Got minus one from a read call")) {
                    throw new BizException(ResultCode.CONN_CONN_TOO_MANY_ERR);
                }
                throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
            } catch (Exception e) {
                throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
            }
        }
    }

MySQL JDBC connection pool wrapper

MySQL JDBC connection parameters

It only needs to extend the base class.

    /**
     * @Description MySQL JDBC connection parameters
     * @Author itdl
     * @Date 2022/08/15 09:50
     */
    public class MysqlJdbcConnParam extends BaseJdbcConnParam {
    }

MySQL JDBC connection implementation

Note the connection property useInformationSchema=true: with it set, table and column comments can be read directly from the Connection metadata (a short example follows the class).

    /**
     * @Description MySQL JDBC connection utility
     * @Author itdl
     * @Date 2022/08/15 09:52
     */
    @Slf4j
    public class MysqlConnUtil extends AbstractConnUtil<MysqlJdbcConnParam> {

        /**
         * Constructor
         * @param connParam connection parameters
         */
        public MysqlConnUtil(MysqlJdbcConnParam connParam) {
            super(connParam);
        }

        @Override
        protected Connection buildConnection() {
            try {
                Class.forName("com.mysql.cj.jdbc.Driver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                throw new BizException(ResultCode.MYSQL_DRIVE_LOAD_ERR);
            }
            // build the jdbcUrl
            String jdbcUrl = "jdbc:mysql://%s:%s/%s?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8";
            final String ip = connParam.getIp();
            final String port = connParam.getPort() + "";
            final String dbName = connParam.getDbName();
            final String username = connParam.getUsername();
            final String password = connParam.getPassword();
            jdbcUrl = String.format(jdbcUrl, ip, port, dbName);
            // obtain the connection
            try {
                Properties dbProperties = new Properties();
                dbProperties.put("user", username);
                dbProperties.put("password", password);
                dbProperties.put("remarks", "true");
                // allow table remarks to be read from the metadata
                dbProperties.setProperty("useInformationSchema", "true");
                Connection connection = DriverManager.getConnection(jdbcUrl, dbProperties);
                log.info("=====>>>mysql connection obtained: username: {}, jdbcUrl: {}", username, jdbcUrl);
                return connection;
            } catch (SQLException e) {
                e.printStackTrace();
                if (e.getMessage().contains("Unknown database")) {
                    throw new BizException(ResultCode.CONN_UNKNOWN_DB_ERR);
                }
                throw new BizException(ResultCode.CONN_USER_PWD_ERR);
            } catch (Exception e) {
                throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
            }
        }
    }
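
A minimal sketch of what useInformationSchema=true enables, assuming the classes above are on the classpath (database and credentials are the placeholders used in the test code): table comments then appear in the standard REMARKS column of DatabaseMetaData.getTables:

    MysqlJdbcConnParam param = new MysqlJdbcConnParam();
    param.setDriverName("com.mysql.cj.jdbc.Driver");
    param.setIp("localhost");
    param.setPort(3306);
    param.setDbName("test_db");
    param.setUsername("root");
    param.setPassword("root");

    Connection conn = new MysqlConnUtil(param).getConnection();
    DatabaseMetaData metaData = conn.getMetaData();
    // list the tables of the schema together with their comments (REMARKS)
    try (ResultSet rs = metaData.getTables("test_db", null, "%", new String[]{"TABLE"})) {
        while (rs.next()) {
            System.out.println(rs.getString("TABLE_NAME") + " -> " + rs.getString("REMARKS"));
        }
    }
    SqlUtil.close(conn);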

Test code for connecting to each database

    @SpringBootTest(classes = DbConnectionDemoApplication.class)
    @RunWith(value = SpringRunner.class)
    @Slf4j
    public class DbConnectionDemoApplicationTests {

        private DbConnPool<?> connPool = null;

        @Test
        public void testMysqlConn() throws InterruptedException {
            // build the connection parameters
            final MysqlJdbcConnParam connParam = new MysqlJdbcConnParam();
            final String ip = "localhost";
            final Integer port = 3306;
            final String username = "root";
            final String password = "root";
            final String dbname = "test_db";
            // set the parameters
            // driverName is only actually read by HiveConnUtil; the other utilities hard-code their driver class
            connParam.setDriverName(Driver.class.getName());
            connParam.setIp(ip);
            connParam.setPort(port);
            connParam.setUsername(username);
            connParam.setPassword(password);
            connParam.setDbName(dbname);
            // create the connection pool
            connPool = new DbConnPool<>(connParam, 2);
            handler01(dbname, DbDialectEnum.MYSQL);
            new Thread(() -> handler01(dbname, DbDialectEnum.MYSQL)).start();
            new Thread(() -> handler01(dbname, DbDialectEnum.MYSQL)).start();
            Thread.sleep(60 * 1000);
        }

        @Test
        public void testOracleConn() throws InterruptedException {
            // build the connection parameters
            final OracleJdbcConnParam connParam = new OracleJdbcConnParam();
            final String ip = "your Oracle IP address";
            final Integer port = 1521;
            // for an admin account, append " as sysdba" to the username
            final String username = "username";
            final String password = "password";
            final String dbname = "instance/service name";
            // set the parameters
            connParam.setDriverName(Driver.class.getName());
            connParam.setIp(ip);
            connParam.setPort(port);
            connParam.setUsername(username);
            connParam.setPassword(password);
            connParam.setDbName(dbname);
            // create the connection pool
            connPool = new DbConnPool<>(connParam, 2);
            final DbDialectEnum dbDialectEnum = DbDialectEnum.ORACLE;
            // run the checks (for Oracle the schemaName is the username)
            handler01(username, dbDialectEnum);
            // two more threads borrowing connections
            new Thread(() -> handler01(username, dbDialectEnum)).start();
            new Thread(() -> handler01(username, dbDialectEnum)).start();
            Thread.sleep(60 * 1000);
        }

        @Test
        public void testHiveConn() throws InterruptedException {
            // build the connection parameters
            final HiveJdbcConnParam connParam = new HiveJdbcConnParam();
            final String ip = "hive server host";
            final Integer port = 10000;
            final String username = "account@REALM";
            final String password = "";
            final String dbname = "database name";
            final String principal = "hive/_HOST@REALM";
            final String kbr5FilePath = "C:\\workspace\\krb5.conf";
            final String keytabFilePath = "C:\\workspace\\zhouyu.keytab";
            // set the parameters
            connParam.setDriverName(Driver.class.getName());
            connParam.setIp(ip);
            connParam.setPort(port);
            connParam.setUsername(username);
            connParam.setPassword(password);
            connParam.setDbName(dbname);
            connParam.setEnableKerberos(true);
            connParam.setPrincipal(principal);
            connParam.setKbr5FilePath(kbr5FilePath);
            connParam.setKeytabFilePath(keytabFilePath);
            // create the connection pool
            connPool = new DbConnPool<>(connParam, 2);
            final DbDialectEnum dbDialectEnum = DbDialectEnum.HIVE;
            // run the checks
            handler01(username, dbDialectEnum);
            // two more threads borrowing connections
            new Thread(() -> handler01(username, dbDialectEnum)).start();
            new Thread(() -> handler01(username, dbDialectEnum)).start();
            Thread.sleep(10 * 60 * 1000);
        }

        @Test
        public void testMaxComputeConn() throws InterruptedException {
            // build the connection parameters
            final MaxComputeJdbcConnParam connParam = new MaxComputeJdbcConnParam();
            String accessId = "your Aliyun accessId";
            String accessKey = "your Aliyun accessKey";
            String endpoint = "http://service.cn-chengdu.maxcompute.aliyun.com/api";
            String projectName = "project name (= database name)";
            // set the parameters
            connParam.setDriverName(Driver.class.getName());
            connParam.setAliyunAccessId(accessId);
            connParam.setAliyunAccessKey(accessKey);
            connParam.setEndpoint(endpoint);
            connParam.setProjectName(projectName);
            // create the connection pool
            connPool = new DbConnPool<>(connParam, 2);
            final DbDialectEnum dbDialectEnum = DbDialectEnum.MAX_COMPUTE;
            // run the checks (for MaxCompute the schemaName is the project name)
            handler01(projectName, dbDialectEnum);
            // two more threads borrowing connections
            new Thread(() -> handler01(projectName, dbDialectEnum)).start();
            new Thread(() -> handler01(projectName, dbDialectEnum)).start();
            Thread.sleep(60 * 1000);
        }

        private void handler01(String schemaName, DbDialectEnum dbDialectEnum) {
            final Connection connection = connPool.getConnection();
            // build the SQL utility
            final SqlUtil sqlUtil = new SqlUtil(connection, dbDialectEnum.getCode());
            // list the tables and comments
            final List<TableMetaInfo> tables = sqlUtil.getTables(schemaName);
            log.info("=============== tables and comments: start ===================");
            log.info(tables.toString());
            log.info("=============== tables and comments: end ===================");
            // list the columns and comments of the first table
            final String tableName = tables.get(0).getTableName();
            final List<TableColumnMetaInfo> columns = sqlUtil.getColumnsByTableName(tableName);
            log.info("=============== columns and comments of the first table: start ===================");
            log.info(columns.toString());
            log.info("=============== columns and comments of the first table: end ===================");
            final PageResult<LinkedHashMap<String, Object>> pageResult = sqlUtil.pageQueryMap("select * from " + tableName, 1, 10);
            log.info("=============== paged SQL query: start ===================");
            log.info("total: {}", pageResult.getTotal());
            log.info("rows: {}", JSONObject.toJSONString(pageResult.getRows()));
            log.info("=============== paged SQL query: end ===================");
            connPool.freeConnection(connection);
        }

        @After
        public void close() {
            if (connPool != null) {
                connPool.close();
                log.info("================== connection pool closed ================");
            }
        }
    }
Original article: https://blog.csdn.net/qq_35267557/article/details/126394020