有时候需要连接第三方的各种数据源,总是要去写不同的代码,于是将MaxCompute, Hive, Oracle, Mysql等JDBC连接封装起来,只需要传入不同的参数即可创建一个不同类型的连接池。
连接参数基础类封装
封装了JDBC基础的连接参数,如果不需要这些属性可以继承该类,增加新的属性即可。
- @Data
- public class BaseJdbcConnParam implements Serializable {
-
- /**
- * driver name
- */
- private String driverName;
-
- /**
- * IP
- */
- private String ip;
-
- /**
- * db server port
- */
- private Integer port;
-
- /**
- * db name
- */
- private String dbName;
-
- /**
- * db connection username
- */
- private String username;
-
- /**
- * db connection password
- */
- private String password;
- }
抽象连接工具类封装
功能如下:
- /**
- * @Description 抽象连接工具类父类
- * @Author itdl
- * @Date 2022/08/15 09:54
- */
- public abstract class AbstractConnUtil<P extends BaseJdbcConnParam> {
-
- /**
- * connection params
- */
- protected final P connParam;
-
- /**
- * jdbc connection object
- */
- protected final Connection connection;
-
-
- /**
- * 构造函数, 构造工具类对象
- * @param connParam 连接参数
- */
- public AbstractConnUtil(P connParam) {
- this.connParam = connParam;
- this.connection = buildConnection();
- }
-
-
- /**
- * 构建连接对象
- * @return 连接对象
- */
- protected abstract Connection buildConnection();
-
-
- /**
- * 获取连接
- */
- public Connection getConnection() {
- return connection;
- }
- }
连接池管理
功能如下:
- /**
- * @Description 连接池管理
- * @Author itdl
- * @Date 2022/08/16 09:42
- */
- @Slf4j
- public class DbConnPool<T extends BaseJdbcConnParam> {
- /**
- * 用于存放连接
- */
- private final LinkedList<Connection> connPool = new LinkedList<Connection>();
-
- /**
- * 最大连接池数量
- */
- private final Integer maxPoolSize;
-
- private final T connParam;
-
- /**
- * 构造函数
- * @param connParam 连接参数
- * @param maxPoolSize 连接池大小
- */
- public DbConnPool(T connParam, Integer maxPoolSize) {
- this.maxPoolSize = maxPoolSize;
- this.connParam = connParam;
- // 初始化连接池
- for (int i = 0; i < maxPoolSize; i++) {
- connPool.addLast(this.createConnection());
- }
- }
-
- /**
- * 创建数据库连接
- * @return 连接
- */
- private Connection createConnection() {
- if (connParam instanceof OracleJdbcConnParam){
- final OracleConnUtil util = new OracleConnUtil((OracleJdbcConnParam) connParam);
- return util.getConnection();
- }
-
- if (connParam instanceof HiveJdbcConnParam){
- final HiveConnUtil util = new HiveConnUtil((HiveJdbcConnParam) connParam);
- return util.getConnection();
- }
-
- if (connParam instanceof MysqlJdbcConnParam){
- final MysqlConnUtil util = new MysqlConnUtil((MysqlJdbcConnParam) connParam);
- return util.getConnection();
- }
-
- if (connParam instanceof MaxComputeJdbcConnParam){
- final MaxComputeJdbcUtil util = new MaxComputeJdbcUtil((MaxComputeJdbcConnParam) connParam);
- return util.getConnection();
- }
-
- throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);
- }
-
-
- /**
- * 获取连接
- * @return 连接
- */
- public synchronized Connection getConnection(){
- if (connPool.size() == 0){
- // throw new BizException(ResultCode.CONN_POOL_EMPTY_ERR);
- // 最长等待十分钟
- try {
- log.info("==========连接池已经空了, 请等待其他线程释放==========");
- wait(10 * 60 * 1000);
- } catch (InterruptedException e) {
- log.info("==========连接池已经空了, 等待了10分钟还没有释放,抛出异常==========");
- e.printStackTrace();
- throw new BizException(ResultCode.CONN_POOL_EMPTY_ERR);
- }
- }
- // 去除最上面一个连接 如果没有连接了,将会抛出异常
- return connPool.removeFirst();
- }
-
- /**
- * 用完后释放连接
- * @param conn 要释放的连接
- */
- public synchronized void freeConnection(Connection conn){
- // 通知连接已经释放
- notifyAll();
- this.connPool.addLast(conn);
- }
-
- /**
- * 关闭连接池
- */
- public synchronized void close(){
- for (Connection connection : connPool) {
- SqlUtil.close(connection);
- }
- }
-
- }
SQL操作工具类
根据连接对象Connection和数据库房源,封装不同的sql执行。执行SQL核心功能封装。
- /**
- * @Description SQL操作工具类
- * @Author itdl
- * @Date 2022/08/10 17:13
- */
- @Slf4j
- public class SqlUtil {
- /**查询mysql表注释sql*/
- public static final String SELECT_TABLES_MYSQL = "select table_name, table_comment from information_schema.tables where TABLE_SCHEMA = '%s'";
- /**查询MaxCompute表注释sql*/
- public static final String SELECT_TABLES_MAX_COMPUTE = "select table_name, table_comment from information_schema.tables where TABLE_SCHEMA = '%s'";
- /**查询oracle表注释sql*/
- public static final String SELECT_TABLES_ORACLE = "SELECT t2.TABLE_NAME as table_name, t2.COMMENTS as table_comment FROM user_tables t1 inner join user_tab_comments t2 on t1.TABLE_NAME = t2.TABLE_NAME";
- /**查询hive表注释sql, 先查询表名,根据表名获取建表语句,正则提取表注释*/
- public static final String SELECT_TABLES_HIVE = "show tables";
- public static final String SELECT_TABLES_2_HIVE = "describe extended %s";
-
- /**分页数量统计Mysql*/
- private static final String SELECT_COUNT_MYSQL = "select count(1) from (%s) z";
- /**分页数量统计MaxCompute*/
- private static final String SELECT_COUNT_MAX_COMPUTE = "select count(1) from (%s) z;";
- /**分页数量统计Hive*/
- private static final String SELECT_COUNT_ORACLE = "select count(1) from (%s) z";
- /**分页数量统计Oracle*/
- private static final String SELECT_COUNT_HIVE = "select count(1) from (%s) z";
- /**maxCompute开启全表扫描sql*/
- private static final String FULL_SCAN_MAX_COMPUTE = "set odps.sql.allow.fullscan=true;";
-
- /**分页查询sql-Mysql*/
- private static final String SELECT_PAGE_MYSQL = "select z.* from (%s) z limit %s, %s";
- /**分页查询sql-MaxCompute*/
- private static final String SELECT_PAGE_MAX_COMPUTE = "select z.* from (%s) z limit %s, %s;";
- /**分页查询sql-Hive*/
- private static final String SELECT_PAGE_HIVE = "select * from (select row_number() over () as row_num_01,u.* from (%s) u) mm where mm.row_num_01 between %s and %s";
- /**分页查询sql-Oracle*/
- private static final String SELECT_PAGE_ORACLE = "select * from (SELECT ROWNUM as row_num_01,z.* from (%s) z) h where h.row_num_01 > %s and h.row_num_01 <= %s";
-
- /**数据库连接*/
- private final Connection connection;
-
- /**数据库方言*/
- private final Integer dbDialect;
-
- /**支持的方言列表*/
- private static final List<Integer> supportDbTypes =
- Arrays.asList(DbDialectEnum.ORACLE.getCode(), DbDialectEnum.HIVE.getCode(), DbDialectEnum.MYSQL.getCode(), DbDialectEnum.MAX_COMPUTE.getCode());
-
- public SqlUtil(Connection connection, Integer dbDialect) {
- if (!supportDbTypes.contains(dbDialect)){
- throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);
- }
- this.connection = connection;
- this.dbDialect = dbDialect;
- }
-
-
- /**
- * 根据connection获取所有的表和对应的注释
- */
- public List<TableMetaInfo> getTables(String schemaName){
- List<TableMetaInfo> result = new ArrayList<>();
- String sql = "";
- switch (this.dbDialect){
- case 1:
- sql = SELECT_TABLES_ORACLE;
- break;
- case 2:
- sql = SELECT_TABLES_HIVE;
- break;
- case 3:
- if (StringUtils.isBlank(schemaName)){
- throw new BizException(ResultCode.SELECT_TABLES_SCHEMA_NOT_NULL_ERR);
- }
- sql = String.format(SELECT_TABLES_MYSQL, schemaName);
- break;
- case 4:
- if (StringUtils.isBlank(schemaName)){
- throw new BizException(ResultCode.SELECT_TABLES_SCHEMA_NOT_NULL_ERR);
- }
- sql = String.format(SELECT_TABLES_MAX_COMPUTE, schemaName);
- default:
- break;
- }
-
- if (StringUtils.isBlank(sql)){
- throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);
- }
-
- // 执行SQL语句
- final List<LinkedHashMap<String, Object>> resultMaps = querySql(sql);
- if (ObjectUtils.isEmpty(resultMaps)){
- return Lists.newArrayList();
- }
-
- // hive单独处理
- List<TableMetaInfo> result1 = getHiveTableMetaInfos(result, resultMaps);
- if (result1 != null) return result1;
-
- // 转换结果
- return resultMaps.stream().map(
- m->{
- final TableMetaInfo info = new TableMetaInfo();
- Object tableNameObj = m.get("table_name");
- String tableName = tableNameObj == null ? m.get("TABLE_NAME") == null ? "" : String.valueOf(m.get("TABLE_NAME")) : String.valueOf(tableNameObj);
- Object tableCommentObj = m.get("table_comment");
- String tableComment = tableCommentObj == null ? m.get("TABLE_COMMENT") == null ? "" : String.valueOf(m.get("TABLE_COMMENT")) : String.valueOf(tableCommentObj);
- info.setTableName(tableName);
- info.setComment(tableComment);
- return info;
- }
- ).collect(Collectors.toList());
- }
-
-
- /**
- * 根据schemeName,表名获取字段列表
- * @param tableName 一般是数据库 oracle是用户名
- */
- public List<TableColumnMetaInfo> getColumnsByTableName(String tableName){
- try {
- List<TableColumnMetaInfo> list = new ArrayList<>();
- final DatabaseMetaData metaData = connection.getMetaData();
- final ResultSet columns = metaData.getColumns(null, null, tableName, null);
- while (columns.next()){
- String columnName = columns.getString("COLUMN_NAME");
- String remarks = columns.getString("REMARKS");
- remarks = StringUtils.isBlank(remarks) ? "" : remarks;
- final TableColumnMetaInfo metaInfo = new TableColumnMetaInfo(tableName, columnName, remarks);
- list.add(metaInfo);
- }
- return list;
- } catch (SQLException e) {
- e.printStackTrace();
- return Lists.newArrayList();
- }
- }
-
-
- /**
- * 执行sql查询
- * @param querySql 查询sql
- * @return List
- */
- public List<LinkedHashMap<String, Object>> queryData(String querySql, boolean... fullScan){
- Statement statement = null;
- ResultSet resultSet = null;
- try {
- // 创建statement
- statement = this.connection.createStatement();
-
- // 执行全表扫描sql
- for (boolean b : fullScan) {
- if (b){
- statement.execute(FULL_SCAN_MAX_COMPUTE);
- break;
- }
- }
- // 执行查询语句
- resultSet = statement.executeQuery(querySql);
-
- // 构建结果返回
- return buildListMap(resultSet);
- } catch (SQLException e) {
- e.printStackTrace();
- throw new BizException(ResultCode.SQL_EXEC_ERR);
- } finally {
- // 关闭resultSet, statement
- close(resultSet, statement);
- }
- }
-
-
- /**
- * 执行sql查询
- * @param querySql 查询sql
- * @return List
- */
- public List<LinkedHashMap<String, Object>> queryData(String querySql, Integer page, Integer size){
- Statement statement = null;
- ResultSet resultSet = null;
- try {
- // 1、替换分号
- querySql = querySql.replaceAll(";", "");
- // 创建statement
- statement = this.connection.createStatement();
- // 2、格式化SQL
- int offset = (page - 1 ) * size;
- String execSql = "";
- switch (this.dbDialect){
- case 1:
- // oracle
- execSql = String.format(SELECT_PAGE_ORACLE, querySql, offset, size);
- break;
- case 2:
- // hive
- execSql = String.format(SELECT_PAGE_HIVE, querySql, offset, size);
- break;
- case 3:
- // mysql
- execSql = String.format(SELECT_PAGE_MYSQL, querySql, offset, size);
- break;
- case 4:
- // maxCompute
- execSql = String.format(SELECT_PAGE_MAX_COMPUTE, querySql, offset, size);
- break;
- default:
- break;
- }
- // maxCompute开启全表扫描
- if (DbDialectEnum.MAX_COMPUTE.getCode().equals(this.dbDialect)){
- statement.execute(FULL_SCAN_MAX_COMPUTE);
- }
- log.info("=======>>>执行分页sql为:{}", execSql);
- // 执行查询语句
- resultSet = statement.executeQuery(execSql);
- // 构建结果返回
- return buildListMap(resultSet);
- } catch (SQLException e) {
- e.printStackTrace();
- throw new BizException(ResultCode.SQL_EXEC_ERR);
- } finally {
- // 关闭resultSet, statement
- close(resultSet, statement);
- }
- }
-
-
- /**
- * 执行分页查询
- * @param querySql 分页查询sql
- * @param page 页码 从1开始 第n页传n
- * @param size 每页记录数
- * @return 分页查询结果
- */
- public PageResult<LinkedHashMap<String, Object>> pageQueryMap(String querySql, Integer page, Integer size){
- // 1、替换分号
- querySql = querySql.replaceAll(";", "");
- String countSql = "";
- switch (this.dbDialect){
- case 1:
- // oracle
- countSql = String.format(SELECT_COUNT_ORACLE, querySql);
- break;
- case 2:
- // hive
- countSql = String.format(SELECT_COUNT_HIVE, querySql);
- break;
- case 3:
- // mysql
- countSql = String.format(SELECT_COUNT_MYSQL, querySql);
- break;
- case 4:
- // maxCompute
- countSql = String.format(SELECT_COUNT_MAX_COMPUTE, querySql);
- break;
- default:
- break;
- }
- log.info("=======>>>执行分页统计总数sql为:{}", countSql);
- // 查询总数
- final List<LinkedHashMap<String, Object>> countMap = queryData(countSql, DbDialectEnum.MAX_COMPUTE.getCode().equals(this.dbDialect));
- if (CollectionUtils.isEmpty(countMap)){
- return new PageResult<>(0L, new ArrayList<>());
- }
-
- long count = 0L;
- for (Object value : countMap.get(0).values()) {
- count = Long.parseLong(String.valueOf(value));
- }
-
- if (count == 0){
- return new PageResult<>(0L, new ArrayList<>());
- }
-
- // 执行分页查询 开启全表扫描
- final List<LinkedHashMap<String, Object>> resultList = queryData(querySql, page, size);
- return new PageResult<>(count, resultList);
- }
-
-
- /**
- * 执行分页查询
- * @param querySql 分页查询sql
- * @param page 页码 从1开始 第n页传n
- * @param size 每页记录数
- * @return 分页查询结果
- */
- public <T>PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz){
- final PageResult<LinkedHashMap<String, Object>> result = pageQueryMap(querySql, page, size);
- List<T> rows = new ArrayList<>();
- for (LinkedHashMap<String, Object> row : result.getRows()) {
- final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);
- rows.add(t);
- }
- return new PageResult<>(result.getTotal(), rows);
- }
-
-
-
- /**
- * 获取hive的表注释
- * @param result 结果
- * @param resultMaps show tables结果
- * @return List
- */
- private List<TableMetaInfo> getHiveTableMetaInfos(List<TableMetaInfo> result, List<LinkedHashMap<String, Object>> resultMaps) {
- if (dbDialect.equals(DbDialectEnum.HIVE.getCode())){
- for (LinkedHashMap<String, Object> resultMap : resultMaps) {
- final String tabName = String.valueOf(resultMap.get("tab_name"));
- final String descTableCommentSql = String.format(SELECT_TABLES_2_HIVE, tabName);
- List<LinkedHashMap<String, Object>> resultMapsComments = querySql(descTableCommentSql);
- // col_name -> Detailed Table Information
- String comments = resultMapsComments.stream()
- .filter(m -> "Detailed Table Information".equals(m.get("col_name")))
- .map(m -> String.valueOf(m.get("data_type"))).findFirst()
- .orElse("");
- comments = ReUtil.get("parameters:\\{(?!.*?\\().*transient_lastDdlTime.*?comment=(.*?)\\}", comments,1);
- if (StringUtils.isBlank(comments)) {
- comments = "";
- }
- if (comments.contains(",")){
- comments = comments.substring(0, comments.lastIndexOf(","));
- }
- result.add(new TableMetaInfo(tabName, comments));
- log.info("===========>>>获取表{}的注释成功:{}", tabName, comments);
- resultMapsComments.clear();
- }
- return result;
- }
- return null;
- }
-
-
- /**
- * 执行SQL查询
- * @param sql sql语句
- * @return 数据列表,使用LinkedHashMap是为了防止HashMap序列化后导致顺序乱序
- */
- public List<LinkedHashMap<String, Object>> querySql(String sql){
- // 执行sql
- Statement statement = null;
- ResultSet resultSet = null;
- try {
- statement = connection.createStatement();
- resultSet = statement.executeQuery(sql);
- return buildListMap(resultSet);
- } catch (SQLException e) {
- e.printStackTrace();
- throw new BizException(ResultCode.SQL_EXEC_ERR);
- }finally {
- // 关闭
- close(resultSet, statement);
- }
- }
-
-
- /**
- * 关闭对象 传入多个时注意顺序, 需要先关闭哪个就传在参数前面
- * @param objs 对象动态数组
- */
- public static void close(Object ...objs){
- if (objs == null || objs.length == 0){
- return;
- }
-
- for (Object obj : objs) {
- if (obj instanceof Statement){
- try {
- ((Statement) obj).close();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
-
- if (obj instanceof ResultSet){
- try {
- ((ResultSet) obj).close();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
-
- if (obj instanceof Connection){
- try {
- ((Connection) obj).close();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
- }
-
-
- /**
- * @Description 功能描述:将resultSet构造为List
- * @Author itdl
- * @Date 2022/4/18 21:13
- * @Param {@link ResultSet} resultSet
- * @Return {@link List < Map
>} - **/
- private List<LinkedHashMap<String, Object>> buildListMap(ResultSet resultSet) throws SQLException {
- if (resultSet == null) {
- return Lists.newArrayList();
- }
-
- List<LinkedHashMap<String, Object>> resultList = new ArrayList<>();
- // 获取元数据
- ResultSetMetaData metaData = resultSet.getMetaData();
- while (resultSet.next()) {
- // 获取列数
- int columnCount = metaData.getColumnCount();
- LinkedHashMap<String, Object> map = new LinkedHashMap<>();
- for (int i = 0; i < columnCount; i++) {
- String columnName = metaData.getColumnName(i + 1);
- // 过滤掉查询的结果包含序号的
- if("mm.row_num_01".equalsIgnoreCase(columnName)
- || "row_num_01".equalsIgnoreCase(columnName)){
- continue;
- }
-
- // 去除hive查询结果的mm.别名前缀
- if (columnName.startsWith("mm.")){
- columnName = columnName.substring(columnName.indexOf(".") + 1);
- }
-
- Object object = resultSet.getObject(columnName);
-
- // maxCompute里面的空返回的是使用\n
- if ("\\N".equalsIgnoreCase(String.valueOf(object))) {
- map.put(columnName, "");
- } else {
- map.put(columnName, object);
- }
- }
-
- resultList.add(map);
- }
- return resultList;
- }
- }
MaxCompute JDBC连接池封装
MaxCompute 已经有了JDBC连接方式 也就是 odbc-jdbc, 最终能够获取一个Connection. 官方文档:https://help.aliyun.com/document_detail/161246.html
封装MaxCompute JDBC连接参数
- /**
- * @author itdl
- * @description maxCompute使用JDBC的连接参数
- * @date 2022/08/08 10:07
- */
- @Data
- public class MaxComputeJdbcConnParam extends BaseJdbcConnParam{
- /**阿里云accessId 相当于用户名 */
- private String aliyunAccessId;
- /**阿里云accessKey 相当于密码 */
- private String aliyunAccessKey;
- /** maxcompute_endpoint */
- private String endpoint;
- /**项目名称*/
- private String projectName;
- }
封装MaxCompute JDBC连接实现类
就是实现父类AbstractConnUtil,实现抽象方法buildConnection
- /**
- * @Description maxCompute JDBC连接实现
- * @Author itdl
- * @Date 2022/08/08 14:26
- */
- @Slf4j
- public class MaxComputeJdbcUtil extends AbstractConnUtil<MaxComputeJdbcConnParam>{
- /**JDBC 驱动名称*/
- private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver";
- /**
- * 构造函数, 构造工具类对象
- *
- * @param connParam 连接参数
- */
- public MaxComputeJdbcUtil(MaxComputeJdbcConnParam connParam) {
- super(connParam);
- }
-
- @Override
- protected Connection buildConnection() {
- return buildConn();
- }
-
- /**
- * 创建连接
- * @return 数据库连接
- */
- private Connection buildConn() {
- try {
- Class.forName(DRIVER_NAME);
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- throw new BizException(ResultCode.MAX_COMPUTE_DRIVE_LOAD_ERR);
- }
-
- try {
- Properties dbProperties = new Properties();
- dbProperties.put("user", connParam.getAliyunAccessId());
- dbProperties.put("password", connParam.getAliyunAccessKey());
- dbProperties.put("remarks", "true");
- // JDBCURL连接模板
- String jdbcUrlTemplate = "jdbc:odps:%s?project=%s&useProjectTimeZone=true";
- // 使用驱动管理器连接获取连接
- return DriverManager.getConnection(
- String.format(jdbcUrlTemplate, connParam.getEndpoint(), connParam.getProjectName()), dbProperties);
- } catch (SQLException e) {
- e.printStackTrace();
- throw new BizException(ResultCode.CONN_USER_PWD_ERR);
- }
- }
- }
连接测试代码一起放在结尾,将会开启多个线程获取连接,然后去获取表名,表注释,字段名,字段注释,传入page, size和普通sql就可以实现分页查询的封装方法
Hive JDBC连接池封装
Hive JDBC连接参数
Hive连接参数封装,除了基础的JDBC所需字段,还需要kerberos相关字段,因为hive开启kerberos认证后,需要使用kertab密钥文件和kbr5.conf配置文件去认证。将会在参数和测试代码中得到重复的体现。
- /**
- * @Description Hive JDBC connection params
- * @Author itdl
- * @Date 2022/08/10 16:40
- */
- @Data
- @EqualsAndHashCode(callSuper = false)
- public class HiveJdbcConnParam extends BaseJdbcConnParam {
- /**
- * enable kerberos authentication
- */
- private boolean enableKerberos;
-
- /**
- * principal
- */
- private String principal;
-
- /**
- * kbr5 file path in dick
- */
- private String kbr5FilePath;
-
- /**
- * keytab file path in dick
- */
- private String keytabFilePath;
- }
Hive JDBC获取连接实现
Hive获取JDBC连接之后,本来可以从Connection的元数据中获取表的注释,但是获取的中文注释居然是乱码,但是我们Hue上查看表注释又是正常,暂时没找到这种方式如何解决,从而退而求其次,通过表名去获取建表语句,从建表语句中通过正则表达式提取表的注释。
- /**
- * @Description hive connection util
- * @Author itdl
- * @Date 2022/08/10 16:52
- */
- @Slf4j
- public class HiveConnUtil extends AbstractConnUtil<HiveJdbcConnParam>{
-
- public HiveConnUtil(HiveJdbcConnParam connParam) {
- super(connParam);
- }
-
- /**
- * 获取连接
- * @return 连接
- */
- public Connection getConnection() {
- return connection;
- }
-
- @Override
- protected Connection buildConnection(){
- try {
- // Class.forName("org.apache.hive.jdbc.HiveDriver");
- Class.forName(connParam.getDriverName());
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- throw new BizException(ResultCode.HIVE_DRIVE_LOAD_ERR);
- }
- // 开启kerberos后需要私钥
- // 拼接jdbcUrl
- String jdbcUrl = "jdbc:hive2://%s:%s/%s";
- String ip = connParam.getIp();
- String port = connParam.getPort() + "";
- String dbName = connParam.getDbName();
- final String username = connParam.getUsername();
- final String password = connParam.getPassword();
- // is enable kerberos authentication
- final boolean enableKerberos = connParam.isEnableKerberos();
- // 格式化
- Connection connection;
- // 获取连接
- try {
- Properties dbProperties = new Properties();
- dbProperties.put("user", username);
- dbProperties.put("password", password);
- // 加上remark后, 能够获取到标注释 但是会出现中文乱码
- dbProperties.put("remarks", "true");
- if (!enableKerberos) {
- jdbcUrl = String.format(jdbcUrl, ip, port, dbName);
- connection = DriverManager.getConnection(jdbcUrl, dbProperties);
- } else {
- final String principal = connParam.getPrincipal();
- final String kbr5FilePath = connParam.getKbr5FilePath();
- final String secretFilePath = connParam.getKeytabFilePath();
-
- String format = "jdbc:hive2://%s:%s/%s;principal=%s";
- jdbcUrl = String.format(format, ip, port, dbName, principal);
-
- // 使用hadoop安全认证
- System.setProperty("java.security.krb5.conf", kbr5FilePath);
- System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
- // 解决windows中执行可能出现找不到HADOOP_HOME或hadoop.home.dir问题
- // Kerberos认证
- org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
- conf.set("hadoop.security.authentication", "Kerberos");
- conf.set("keytab.file", secretFilePath);
- conf.set("kerberos.principal", principal);
- UserGroupInformation.setConfiguration(conf);
- try {
- UserGroupInformation.loginUserFromKeytab(username, secretFilePath);
- } catch (IOException e) {
- e.printStackTrace();
- throw new BizException(ResultCode.KERBEROS_AUTH_FAIL_ERR);
- }
- try {
- connection = DriverManager.getConnection(jdbcUrl, dbProperties);
- } catch (SQLException e) {
- e.printStackTrace();
- throw new BizException(ResultCode.KERBEROS_AUTH_SUCCESS_GET_CONN_FAIL_ERR);
- }
- }
- log.info("=====>>>获取hive连接成功:username:{},jdbcUrl: {}", username, jdbcUrl);
- return connection;
- } catch (SQLException e) {
- e.printStackTrace();
- throw new BizException(ResultCode.HIVE_CONN_USER_PWD_ERR);
- } catch (BizException e){
- throw e;
- }
- catch (Exception e) {
- e.printStackTrace();
- throw new BizException(ResultCode.HIVE_CONN_ERR);
- }
- }
- }
Oracle JDBC连接参数封装
只需要继承父类即可
- /**
- * @Description Oracle连接的JDBC参数
- * @Author itdl
- * @Date 2022/08/15 09:50
- */
- public class OracleJdbcConnParam extends BaseJdbcConnParam{
-
- }
Oracle JDBC连接实现类
包括了普通用户的认证和dba用户的认证
- /**
- * @Description Oracle获取jdbc连接工具类
- * @Author itdl
- * @Date 2022/08/15 09:52
- */
- @Slf4j
- public class OracleConnUtil extends AbstractConnUtil<OracleJdbcConnParam> {
- /**
- * 构造函数, 构造工具类对象
- *
- * @param connParam 连接参数
- */
- public OracleConnUtil(OracleJdbcConnParam connParam) {
- super(connParam);
- }
-
- @Override
- protected Connection buildConnection() {
- try {
- Class.forName("oracle.jdbc.driver.OracleDriver");
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- throw new BizException(ResultCode.ORACLE_DRIVE_LOAD_ERR);
- }
- // 拼接jdbcUrl
- String jdbcUrl = "jdbc:oracle:thin:@//%s:%s/%s";
- final String ip = connParam.getIp();
- final String port = connParam.getPort() + "";
- final String dbName = connParam.getDbName();
- final String username = connParam.getUsername();
- final String password = connParam.getPassword();
-
- // 格式化
- jdbcUrl = String.format(jdbcUrl, ip, port, dbName);
- // 获取连接
- Connection connection;
- try {
- Properties dbProperties = new Properties();
- // 用户名 如果是dba,则后面跟了as sysdba
- String dba = "as sysdba";
- dbProperties.put("password", password);
- dbProperties.put("remarks", "true");
- if (username.trim().endsWith(dba)) {
- dbProperties.put("user", username.trim().substring(0, username.trim().indexOf(dba) - 1));
- dbProperties.put("defaultRowPrefetch", "15");
- dbProperties.put("internal_logon", "sysdba");
- connection = DriverManager.getConnection(jdbcUrl, dbProperties);
- } else {
- dbProperties.put("user", username);
- connection = DriverManager.getConnection(jdbcUrl, dbProperties);
- }
- log.info("=====>>>获取oracle连接成功:username:{}, jdbcUrl: {}", username, jdbcUrl);
- return connection;
- } catch (SQLException e) {
- e.printStackTrace();
- if (e.getMessage().contains("TNS:listener")) {
- throw new BizException(ResultCode.CONN_LISTENER_UNKNOWN_ERR);
- }
- if (e.getMessage().contains("ORA-01017")) {
- throw new BizException(ResultCode.CONN_USER_PWD_ERR);
- }
-
- if (e.getMessage().contains("IO 错误: Got minus one from a read call")) {
- throw new BizException(ResultCode.CONN_CONN_TOO_MANY_ERR);
- }
- throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
- } catch (Exception e) {
- throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
- }
- }
- }
Mysql JDBC连接池封装
Mysql JDBC连接参数封装
只需要继承父类即可
- /**
- * @Description Mysql连接的JDBC参数
- * @Author itdl
- * @Date 2022/08/15 09:50
- */
- public class MysqlJdbcConnParam extends BaseJdbcConnParam{
-
- }
Mysql JDBC连接实现
需要注意的是连接的属性里面配置useInformationSchema=true,表示可以直接从Connection中获取表和字段的注释。
- /**
- * @Description Mysql获取jdbc连接工具类
- * @Author itdl
- * @Date 2022/08/15 09:52
- */
- @Slf4j
- public class MysqlConnUtil extends AbstractConnUtil<MysqlJdbcConnParam> {
- /**
- * 构造函数, 构造工具类对象
- *
- * @param connParam 连接参数
- */
- public MysqlConnUtil(MysqlJdbcConnParam connParam) {
- super(connParam);
- }
-
- @Override
- protected Connection buildConnection() {
- try {
- Class.forName("com.mysql.cj.jdbc.Driver");
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- throw new BizException(ResultCode.MYSQL_DRIVE_LOAD_ERR);
- }
-
- // 拼接jdbcUrl
- String jdbcUrl = "jdbc:mysql://%s:%s/%s?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8";
-
- final String ip = connParam.getIp();
- final String port = connParam.getPort() + "";
- final String dbName = connParam.getDbName();
- final String username = connParam.getUsername();
- final String password = connParam.getPassword();
-
- // 格式化
- jdbcUrl = String.format(jdbcUrl, ip, port, dbName);
- // 获取连接
- try {
- Properties dbProperties = new Properties();
- dbProperties.put("user", username);
- dbProperties.put("password", password);
- dbProperties.put("remarks", "true");
- // 设置可以获取tables remarks信息
- dbProperties.setProperty("useInformationSchema", "true");
- Connection connection = DriverManager.getConnection(jdbcUrl,dbProperties);
- log.info("=====>>>获取mysql连接成功:username:{}, jdbcUrl: {}", username, jdbcUrl);
- return connection;
- } catch (SQLException e) {
- e.printStackTrace();
- if (e.getMessage().contains("Unknown database")){
- throw new BizException(ResultCode.CONN_UNKNOWN_DB_ERR);
- }
- throw new BizException(ResultCode.CONN_USER_PWD_ERR);
- } catch (Exception e) {
- throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
- }
- }
- }
测试代码连接各自数据库
- @SpringBootTest(classes = DbConnectionDemoApplication.class)
- @RunWith(value = SpringRunner.class)
- @Slf4j
- class DbConnectionDemoApplicationTests {
-
- private DbConnPool<?> connPool = null;
-
- @Test
- public void testMysqlConn() throws InterruptedException {
- // 创建连接参数
- final MysqlJdbcConnParam connParam = new MysqlJdbcConnParam();
- final String ip = "localhost";
- final Integer port = 3306;
- final String username = "root";
- final String password = "root";
- final String dbname = "test_db";
-
- // 设置参数
- connParam.setDriverName(Driver.class.getName());
- connParam.setIp(ip);
- connParam.setPort(port);
- connParam.setUsername(username);
- connParam.setPassword(password);
- connParam.setDbName(dbname);
-
- // 创建连接池
- connPool = new DbConnPool<>(connParam, 2);
-
- handler01(dbname, DbDialectEnum.MYSQL);
-
- new Thread(() -> handler01(dbname, DbDialectEnum.MYSQL)).start();
- new Thread(() -> handler01(dbname, DbDialectEnum.MYSQL)).start();
-
- Thread.sleep(60 * 1000);
- }
-
-
- @Test
- public void testOracleConn() throws InterruptedException {
- // 创建连接参数
- final OracleJdbcConnParam connParam = new OracleJdbcConnParam();
- final String ip = "你的Oracle的IP地址";
- final Integer port = 1521;
- // 如果是admin账号 用户后面+ as sysdba
- final String username = "用户名";
- final String password = "密码";
- final String dbname = "实例/服务名";
-
- // 设置参数
- connParam.setDriverName(Driver.class.getName());
- connParam.setIp(ip);
- connParam.setPort(port);
- connParam.setUsername(username);
- connParam.setPassword(password);
- connParam.setDbName(dbname);
-
- // 创建连接池
- connPool = new DbConnPool<>(connParam, 2);
-
- final DbDialectEnum dbDialectEnum = DbDialectEnum.ORACLE;
-
- // 处理操作(oracle的schemaName就是用户名)
- handler01(username, dbDialectEnum);
-
- // 新建两个线程获取连接
- new Thread(() -> handler01(username, dbDialectEnum)).start();
- new Thread(() -> handler01(username, dbDialectEnum)).start();
-
- Thread.sleep(60 * 1000);
- }
-
-
- @Test
- public void testHiveConn() throws InterruptedException {
- // 创建连接参数
- final HiveJdbcConnParam connParam = new HiveJdbcConnParam();
- final String ip = "连接的域名";
- final Integer port = 10000;
- // 如果是admin账号 用户后面+ as sysdba
- final String username = "账号@域名";
- final String password = "";
- final String dbname = "数据库名";
- final String principal = "hive/_HOST@域名";
- final String kbr5FilePath = "C:\\workspace\\krb5.conf";
- final String keytabFilePath = "C:\\workspace\\zhouyu.keytab";
-
- // 设置参数
- connParam.setDriverName(Driver.class.getName());
- connParam.setIp(ip);
- connParam.setPort(port);
- connParam.setUsername(username);
- connParam.setPassword(password);
- connParam.setDbName(dbname);
- connParam.setEnableKerberos(true);
- connParam.setPrincipal(principal);
- connParam.setKbr5FilePath(kbr5FilePath);
- connParam.setKeytabFilePath(keytabFilePath);
-
- // 创建连接池
- connPool = new DbConnPool<>(connParam, 2);
-
- final DbDialectEnum dbDialectEnum = DbDialectEnum.HIVE;
-
- // 处理操作(oracle的schemaName就是用户名)
- handler01(username, dbDialectEnum);
-
- // 新建两个线程获取连接
- new Thread(() -> handler01(username, dbDialectEnum)).start();
- new Thread(() -> handler01(username, dbDialectEnum)).start();
-
- Thread.sleep(10 * 60 * 1000);
- }
-
-
- @Test
- public void testMaxComputeConn() throws InterruptedException {
- // 创建连接参数
- final MaxComputeJdbcConnParam connParam = new MaxComputeJdbcConnParam();
- String accessId = "你的阿里云accessId";
- String accessKey = "你的阿里云accessKey";
- String endpoint = "http://service.cn-chengdu.maxcompute.aliyun.com/api";
- String projectName = "项目名=数据库名";
-
- // 设置参数
- connParam.setDriverName(Driver.class.getName());
- connParam.setAliyunAccessId(accessId);
- connParam.setAliyunAccessKey(accessKey);
- connParam.setEndpoint(endpoint);
- connParam.setProjectName(projectName);
-
- // 创建连接池
- connPool = new DbConnPool<>(connParam, 2);
-
- final DbDialectEnum dbDialectEnum = DbDialectEnum.MAX_COMPUTE;
-
- // 处理操作(oracle的schemaName就是用户名)
- handler01(projectName, dbDialectEnum);
-
- // 新建两个线程获取连接
- new Thread(() -> handler01(projectName, dbDialectEnum)).start();
- new Thread(() -> handler01(projectName, dbDialectEnum)).start();
-
- Thread.sleep(60 * 1000);
- }
-
-
- private void handler01(String schemaName, DbDialectEnum dbDialectEnum) {
- final Connection connection = connPool.getConnection();
-
- // 构建工具类
- final SqlUtil sqlUtil = new SqlUtil(connection, dbDialectEnum.getCode());
-
- // 获取表和注释
- final List<TableMetaInfo> tables = sqlUtil.getTables(schemaName);
- log.info("===============获取所有表和注释开始===================");
- log.info(tables.toString());
- log.info("===============获取所有表和注释结束===================");
-
- // 获取字段和注释
- final String tableName = tables.get(0).getTableName();
- final List<TableColumnMetaInfo> columns = sqlUtil.getColumnsByTableName(tableName);
- log.info("===============获取第一个表的字段和注释开始===================");
- log.info(columns.toString());
- log.info("===============获取第一个表的字段和注释结束===================");
-
-
- final PageResult<LinkedHashMap<String, Object>> pageResult = sqlUtil.pageQueryMap("select * from " + tableName, 1, 10);
- log.info("===============SQL分页查询开始===================");
- log.info("总数:{}", pageResult.getTotal());
- log.info("记录数:{}", JSONObject.toJSONString(pageResult.getRows()));
- log.info("===============SQL分页查询结束===================");
-
- connPool.freeConnection(connection);
- }
-
- @After
- public void close(){
- if (connPool != null){
- connPool.close();
- log.info("==================连接池成功关闭================");
- }
- }
- }