
可以设计了数据源异常, 已经工厂接口, 三种不通的实现, 我们主要看pooled和unpooled的设计。
/**
* @author Clinton Begin 数据源工厂接口
*/
public interface DataSourceFactory {
/*设置属性*/
void setProperties(Properties props);
/*获取数据源*/
DataSource getDataSource();
}
public class UnpooledDataSource implements DataSource {
/*类加载器*/
private ClassLoader driverClassLoader;
/*驱动属性配置*/
private Properties driverProperties;
/*所有的驱动*/
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();
/*驱动*/
private String driver;
/*地址链接*/
private String url;
/*用户名*/
private String username;
/*密码*/
private String password;
/*是否自动提交*/
private Boolean autoCommit;
/*默认的事务隔离级别*/
private Integer defaultTransactionIsolationLevel;
/*网络链接超时时间*/
private Integer defaultNetworkTimeout;
}
/*静态代码块注册驱动*/
static {
/*DriverManager*/
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
registeredDrivers.put(driver.getClass().getName(), driver);
}
}
private Connection doGetConnection(Properties properties) throws SQLException {
initializeDriver(); //初始化驱动
Connection connection = DriverManager.getConnection(url, properties); //获取连接
configureConnection(connection); //对获取的连接进行一些配置
return connection;
}
上面一个步骤获取的是通过spi机制加载的所有驱动, 但是我们自己传入的驱动名称有咩有被加载需要进行一个初始化判断。
private synchronized void initializeDriver() throws SQLException {
/*我们自己传入的驱动名称是否已经被spi机制加载*/
if (!registeredDrivers.containsKey(driver)) {
Class<?> driverType;
try {
if (driverClassLoader != null) { /*进行加载*/
driverType = Class.forName(driver, true, driverClassLoader);
} else {
driverType = Resources.classForName(driver);
}
// DriverManager requires the driver to be loaded via the system ClassLoader.
// http://www.kfu.com/~nsayer/Java/dyn-jdbc.html
Driver driverInstance = (Driver) driverType.getDeclaredConstructor().newInstance(); //获取驱动的实例
DriverManager.registerDriver(new DriverProxy(driverInstance));/*静态代理, 注册驱动*/
registeredDrivers.put(driver, driverInstance);
} catch (Exception e) {
throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
}
}
}
private void configureConnection(Connection conn) throws SQLException {
if (defaultNetworkTimeout != null) { /*超时时间*/
conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), defaultNetworkTimeout);
}
if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
conn.setAutoCommit(autoCommit);/*是否自动提交*/
}
if (defaultTransactionIsolationLevel != null) {/*隔离级别*/
conn.setTransactionIsolation(defaultTransactionIsolationLevel);
}
}
看了这个配置,有没有觉得又是一个很恐怖的地方。。。,为啥还需要传入一个线程池。。
到这里非池化的数据源实现就已经全部看完了, 代码是不难的
public class PooledDataSource implements DataSource {
private static final Log log = LogFactory.getLog(PooledDataSource.class);
/*管理连接资源的对象*/
private final PoolState state = new PoolState(this);
/*使用非赤池化连接源获取连接*/
private final UnpooledDataSource dataSource;
}
public class PoolState {
protected PooledDataSource dataSource;
/*空闲连接*/
protected final List<PooledConnection> idleConnections = new ArrayList<>();
/*活跃连接*/
protected final List<PooledConnection> activeConnections = new ArrayList<>();
}
从以上结构可以看出 其实Mybatis池化数据源就是套的非池化数据源,然后使用一个连接管理池对象保存连接。
private PooledConnection popConnection(String username, String password) throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
int localBadConnectionCount = 0;
while (conn == null) {
synchronized (state) { //对同一个的池状态连接池进行加锁,保证线程安全
if (!state.idleConnections.isEmpty()) { //空闲连接数不是空的
// Pool has available connection
conn = state.idleConnections.remove(0); //获取一个连接
if (log.isDebugEnabled()) {
log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
}
} else { //空闲连接数没有了
// Pool does not have available connection
if (state.activeConnections.size() < poolMaximumActiveConnections) { //活跃连接数没有达到最大连接数
// Can create new connection 创建一个连接注意 这里是代理过的
conn = new PooledConnection(dataSource.getConnection(), this);
if (log.isDebugEnabled()) {
log.debug("Created connection " + conn.getRealHashCode() + ".");
}
} else { //不能创建新的连接
// Cannot create new connection
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();/*获取最老的活跃连接的持有时间*/
if (longestCheckoutTime > poolMaximumCheckoutTime) { //是否到达了最大持有时间
// Can claim overdue connection 持有时间到了,需要给别人用了
state.claimedOverdueConnectionCount++; /*到达最大持有时间放弃持有连接的数量+1*/
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;/*持有时间总计*/
state.accumulatedCheckoutTime += longestCheckoutTime;/*持有时间总计*/
state.activeConnections.remove(oldestActiveConnection); //从活跃连接中移除
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {//不是自动提交
try {
oldestActiveConnection.getRealConnection().rollback(); //进行回滚
} catch (SQLException e) {
/*
Just log a message for debug and continue to execute the following
statement like nothing happened.
Wrap the bad connection with a new PooledConnection, this will help
to not interrupt current executing thread and give current thread a
chance to join the next competition for another valid/good database
connection. At the end of this loop, bad {@link @conn} will be set as null.
*/
log.debug("Bad connection. Could not roll back");
}
}
/*创建代理连接*/
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
oldestActiveConnection.invalidate(); //设置老的连接是无效的
if (log.isDebugEnabled()) {
log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
}
} else { //说明没有可以用的了, 需要进行阻塞
// Must wait
try {
if (!countedWait) {
state.hadToWaitCount++;/*阻塞次数 + 1*/
countedWait = true;
}
if (log.isDebugEnabled()) {
log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
}
long wt = System.currentTimeMillis();
state.wait(poolTimeToWait); /*限时等待*/
state.accumulatedWaitTime += System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
break;
}
}
}
}
if (conn != null) {
// ping to server and check the connection is valid or not
if (conn.isValid()) {
if (!conn.getRealConnection().getAutoCommit()) { //进行回滚
conn.getRealConnection().rollback();
}
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
state.activeConnections.add(conn);
state.requestCount++;
state.accumulatedRequestTime += System.currentTimeMillis() - t;
} else {
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
}
state.badConnectionCount++;
localBadConnectionCount++;
conn = null;
if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Could not get a good connection to the database.");
}
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
}
}
}
}
}
if (conn == null) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
return conn;
}
整体逻辑代码是不难的。
里面需要注意的是 连接对象并不是直接使用的使用驱动获取的连接对象,为了对连接支持多样的控制,比如最大持有时间等,需要记录封住有一个对象,这个对象持有真实的连接对象和其他控制参数。
class PooledConnection implements InvocationHandler {
/*连接关闭方法名称*/
private static final String CLOSE = "close";
/*连接的JDBC规范接口*/
private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
private final int hashCode;
/*池化数据源*/
private final PooledDataSource dataSource;
/*真实的连接,即从驱动获取的连接对象*/
private final Connection realConnection;
/*对真实的连接代理之后的代理对象*/
private final Connection proxyConnection;
/*被线程持有时间*/
private long checkoutTimestamp;
/*创建时间*/
private long createdTimestamp;
/*上一次使用时间*/
private long lastUsedTimestamp;
/*连接类型编码*/
private int connectionTypeCode;
/*是否有效*/
private boolean valid;
}
重点来了, 熟悉动态代理的应该会注意到Mybatis自己封装的PooledConnection 实现动态代理的接口InvocationHandler, 目的是为了什么了呢? 想一下, 数据源里面只有获取连接的方法,没有回收的方法,那么连接回收要在哪里做呢?
方法1: 在我们实现数据源之后添加一个回收的方法。 里面进行回收逻辑。
方法2: 因为连接最终是要关闭的,所以当连接要被释放的时候需要调用连接对象的close方法。我们可以对这个方法进行代理, 把当前要回收的连接重新放入空闲连接里面,而不是真正调用连接对象的回收方法。
如果使用方法1,绝对是一种很不好的做法。 因为作为规范数据源接口并没有回收的方法, 如果在实现类里面硬加一个回收方法让用户去调用,是不可取。
而方法二 close作为规范连接接口的方法,任何连接的回收都需要调用close, 所以只需要进行增强一下,别人在使用的时候都是无感知的。
可见连接已经被代理了在构造方法里面。。。。
public PooledConnection(Connection connection, PooledDataSource dataSource) {
this.hashCode = connection.hashCode();
this.realConnection = connection;
this.dataSource = dataSource;
this.createdTimestamp = System.currentTimeMillis();
this.lastUsedTimestamp = System.currentTimeMillis();
this.valid = true;
this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
}
代理逻辑代码
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName(); /*获取执行的方法名*/
if (CLOSE.equals(methodName)) { /*说明调用的是关闭连接的方法*/
dataSource.pushConnection(this);/*调用回收的方法*/
return null;
}
try {
if (!Object.class.equals(method.getDeclaringClass())) {
// issue #579 toString() should never fail
// throw an SQLException instead of a Runtime
checkConnection(); //判断连接是否有效
}
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
可以如果调用了close方法, 调用的还是数据源自己的方法,和上面说的方法1的实现是不是打脸了呢? pushConnection 方法声明的是受保护的,只能自己使用,用户一般不能直接使用。所以这个方法用户也是无感知的。
protected void pushConnection(PooledConnection conn) throws SQLException {
synchronized (state) {/*加锁*/
state.activeConnections.remove(conn);/*从活跃连接中移除*/
if (conn.isValid()) {/*如果连接还有效*/
/*如果空闲连接数还没有达到最大值*/
if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
state.accumulatedCheckoutTime += conn.getCheckoutTime();//持有时间累加
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
/*创建新的保证连接对象*/
PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
/*加入空闲连接*/
state.idleConnections.add(newConn);
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
conn.invalidate();
if (log.isDebugEnabled()) {
log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
}
/*唤醒阻塞的线程*/
state.notifyAll();
} else {
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
/*关闭连接*/
conn.getRealConnection().close();
if (log.isDebugEnabled()) {
log.debug("Closed connection " + conn.getRealHashCode() + ".");
}
/*置为无效*/
conn.invalidate();
}
} else {
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
}
state.badConnectionCount++;
}
}
}
整个连接对象是存在于连接池中, 如果连接池对象被垃圾回收掉了, 那么这连接对象怎么释放资源(Sckoct连接时操作系统底层通信机制,需要占用系统资源,不使用了需要手动进行关闭回收)
所以还需要一个收尾工作, 在java中对象要被垃圾回收都会调用finalize 方法, 所以在整个方法中释放连接资源。
protected void finalize() throws Throwable {
forceCloseAll();
super.finalize();
}
public void forceCloseAll() {
synchronized (state) {//加搜
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
for (int i = state.activeConnections.size(); i > 0; i--) {
try {
PooledConnection conn = state.activeConnections.remove(i - 1);
conn.invalidate();
Connection realConn = conn.getRealConnection();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
realConn.close();
} catch (Exception e) {
// ignore
}
}
for (int i = state.idleConnections.size(); i > 0; i--) {
try {
PooledConnection conn = state.idleConnections.remove(i - 1);
conn.invalidate();
Connection realConn = conn.getRealConnection();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
realConn.close();
} catch (Exception e) {
// ignore
}
}
}
if (log.isDebugEnabled()) {
log.debug("PooledDataSource forcefully closed/removed all connections.");
}
}
整体逻辑就是 回滚置为无效关闭连接。。
Mybatis 得池化和非飞池化 的实现整体来说还是比较容易理解的。