• Hikari连接池2--获取和归还连接


    基于SpringBoot 2.2.7.RELEASE 依赖的 HikariCP 3.4.3。
    源码包中源码和实际Class文件反编译代码有出入,以Class反编译代码为准。
    Hikari连接池有两篇
    Hikari连接池1–初始化连接池
    Hikari连接池2–获取和归还连接

    3、获取连接

    //com.zaxxer.hikari.HikariDataSource#getConnection()
    public Connection getConnection() throws SQLException {
        if (isClosed()) {
            throw new SQLException("HikariDataSource " + this + " has been closed.");
        }
    	//fastPathPool 非空,则从 fastPathPool 获取连接
        if (fastPathPool != null) {
            return fastPathPool.getConnection();
        }
    
        // See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
        HikariPool result = pool;
        if (result == null) {
            synchronized (this) {
                result = pool;
                if (result == null) {
                    validate();
                    LOGGER.info("{} - Starting...", getPoolName());
                    try {
                        //创建新的连接池,代码在前面
                        pool = result = new HikariPool(this);
                        this.seal();
                    }
                    catch (PoolInitializationException pie) {
                        if (pie.getCause() instanceof SQLException) {
                            throw (SQLException) pie.getCause();
                        }
                        else {
                            throw pie;
                        }
                    }
                    LOGGER.info("{} - Start completed.", getPoolName());
                }
            }
        }
    	//从 pool 中获取连接
    	return result.getConnection();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    从连接池中获取连接

    //com.zaxxer.hikari.pool.HikariPool#getConnection()
    public Connection getConnection() throws SQLException {
       return getConnection(connectionTimeout);
    }
    
    • 1
    • 2
    • 3
    • 4
    private final ConcurrentBag<PoolEntry> connectionBag;
    //这个锁包装了一个 Semaphore
    private final SuspendResumeLock suspendResumeLock;
    //com.zaxxer.hikari.pool.HikariPool#getConnection(long)
    public Connection getConnection(final long hardTimeout) throws SQLException {
        //获取锁
        suspendResumeLock.acquire();
        //开始时间
        final long startTime = currentTime();
    
        try {
            //保存超时时间
            long timeout = hardTimeout;
            //时间有剩余,尝试获取连接,这里会一直尝试获取连接,类似于自旋锁
            do {
                //获取poolEntry
                PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
                //超时未获取到,跳出,获取连接异常
                if (poolEntry == null) {
                    break; // We timed out... break and throw exception
                }
    
                final long now = currentTime();
                if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
                    closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
                    timeout = hardTimeout - elapsedMillis(startTime);
                }
                else {
                    metricsTracker.recordBorrowStats(poolEntry, startTime);
                    return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
                }
            } while (timeout > 0L);
    
            metricsTracker.recordBorrowTimeoutStats(startTime);
            throw createTimeoutException(startTime);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
        }
        finally {
            suspendResumeLock.release();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    3.1、从ConcurrentBag中获取PoolEntry

    当获取不到entry时,会阻塞timeout的时间

    private final SynchronousQueue<T> handoffQueue;
    private final CopyOnWriteArrayList<T> sharedList;
    private final AtomicInteger waiters;
    private final ThreadLocal<List<Object>> threadList;
    //com.zaxxer.hikari.util.ConcurrentBag#borrow
    public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
        // Try the thread-local list first
        //先从threadlocal中尝试获取
        final List<Object> list = threadList.get();
        for (int i = list.size() - 1; i >= 0; i--) {
            final Object entry = list.remove(i);
            @SuppressWarnings("unchecked")
            final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
            //如果从ThreadLocal中获取到了并且将entry的状态设置为正在使用,则返回
            if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                return bagEntry;
            }
        }
    
        // Otherwise, scan the shared list ... then poll the handoff queue
        //ThreadLocal中没有获取到,则从pool中尝试获取
        //增加等到获取连接的数量
        final int waiting = waiters.incrementAndGet();
        try {
            //遍历bag list
            for (T bagEntry : sharedList) {
                //如果将空闲的entry设置为正在使用,则将当前bag返回
                if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                    // If we may have stolen another waiter's connection, request another bag add.
                    if (waiting > 1) {
                        listener.addBagItem(waiting - 1);
                    }
                    return bagEntry;
                }
            }
    
            listener.addBagItem(waiting);
    		//没有获取到bag
            timeout = timeUnit.toNanos(timeout);
            do {
                final long start = currentTime();
                //尝试从 handoffQueue 中获取,核心线程池之外的线程
                final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
                //不管是否获取到,都返回,获取到了,返回线程,没获取到,看是否获取连接超时异常
                if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                    return bagEntry;
                }
    
                timeout -= elapsedNanos(start);
            } while (timeout > 10_000);
    
            return null;
        }
        finally {
            //减少等待获取连接的计数
            waiters.decrementAndGet();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    3.2、创建Connection

    //com.zaxxer.hikari.pool.PoolEntry#createProxyConnection
    Connection createProxyConnection(final ProxyLeakTask leakTask, final long now) {
        return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);
    }
    
    • 1
    • 2
    • 3
    • 4

    创建HikariProxyConnection

    //com.zaxxer.hikari.pool.ProxyFactory#getProxyConnection
    static ProxyConnection getProxyConnection(PoolEntry entry, Connection connection, FastList<Statement> openStatements, ProxyLeakTask leakTask, long now, boolean isReadOnly, boolean isAutoCommit) {
        return new HikariProxyConnection(entry, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);
    }
    
    • 1
    • 2
    • 3
    • 4
    protected HikariProxyConnection(PoolEntry entry, Connection connection, FastList openStatements, ProxyLeakTask leakTask, long now, boolean isReadOnly, boolean isAutoCommit) {
        super(entry, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);
    }
    
    • 1
    • 2
    • 3

    HikariProxyConnection 继承了 ProxyConnection

    protected ProxyConnection(final PoolEntry poolEntry,
                                 final Connection connection,
                                 final FastList<Statement> openStatements,
                                 final ProxyLeakTask leakTask,
                                 final long now,
                                 final boolean isReadOnly,
                                 final boolean isAutoCommit) {
        this.poolEntry = poolEntry;
        this.delegate = connection;
        this.openStatements = openStatements;
        this.leakTask = leakTask;
        this.lastAccess = now;
        this.isReadOnly = isReadOnly;
        this.isAutoCommit = isAutoCommit;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    4、关闭连接

    HikariProxyConnection 没有实现 close 方法,因此关闭连接在他的父类

    ProxyConnection 实现了 Connection,重写了 close 方法

    private int dirtyBits;
    private boolean isAutoCommit;
    private boolean isCommitStateDirty;
    protected Connection delegate;
    //com.zaxxer.hikari.pool.ProxyConnection#close
    public final void close() throws SQLException {
        // Closing statements can cause connection eviction, so this must run before the conditional below
        //关闭连接绑定的所有statement
        closeStatements();
    	//连接不是关闭状态
        if (delegate != ClosedConnection.CLOSED_CONNECTION) {
            //关闭任务
            leakTask.cancel();
    
            try {
                if (isCommitStateDirty && !isAutoCommit) {
                    //回滚
                    delegate.rollback();
                    //更新最后访问时间
                    lastAccess = currentTime();
                    LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
                }
    
                if (dirtyBits != 0) {
                    //重置连接状态
                    poolEntry.resetConnectionState(this, dirtyBits);
                    lastAccess = currentTime();
                }
    
                delegate.clearWarnings();
            }
            catch (SQLException e) {
                // when connections are aborted, exceptions are often thrown that should not reach the application
                if (!poolEntry.isMarkedEvicted()) {
                    throw checkException(e);
                }
            }
            finally {
                delegate = ClosedConnection.CLOSED_CONNECTION;
                poolEntry.recycle(lastAccess);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    4.1、关闭连接绑定的所有statement

    //实现了List接口
    private final FastList<Statement> openStatements;
    //com.zaxxer.hikari.pool.ProxyConnection#closeStatements
    private synchronized void closeStatements() {
        final int size = openStatements.size();
        if (size > 0) {
            for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {
                try (Statement ignored = openStatements.get(i)) {
                    // automatic resource cleanup
                }
                catch (SQLException e) {
                    LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",
                                poolEntry.getPoolName(), delegate);
                    leakTask.cancel();
                    poolEntry.evict("(exception closing Statements during Connection.close())");
                    delegate = ClosedConnection.CLOSED_CONNECTION;
                }
            }
    
            openStatements.clear();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    4.2、获取关闭的连接

    private static final class ClosedConnection {
        static final Connection CLOSED_CONNECTION = getClosedConnection();
    
        private static Connection getClosedConnection() {
            InvocationHandler handler = (proxy, method, args) -> {
                final String methodName = method.getName();
                if ("isClosed".equals(methodName)) {
                    return Boolean.TRUE;
                }
                else if ("isValid".equals(methodName)) {
                    return Boolean.FALSE;
                }
                if ("abort".equals(methodName)) {
                    return Void.TYPE;
                }
                if ("close".equals(methodName)) {
                    return Void.TYPE;
                }
                else if ("toString".equals(methodName)) {
                    return ClosedConnection.class.getCanonicalName();
                }
    
                throw new SQLException("Connection is closed");
            };
    
            return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    4.3、归还线程

    //com.zaxxer.hikari.pool.PoolEntry#recycle
    void recycle(final long lastAccessed) {
        if (connection != null) {
            this.lastAccessed = lastAccessed;
            hikariPool.recycle(this);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    //com.zaxxer.hikari.pool.HikariPool#recycle
    void recycle(final PoolEntry poolEntry) {
        metricsTracker.recordConnectionUsage(poolEntry);
    
        connectionBag.requite(poolEntry);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    //com.zaxxer.hikari.util.ConcurrentBag#requite
    public void requite(final T bagEntry) {
        //将entry的状态重新置为未使用,这里不需要加锁,因为只有获取到连接的线程才可以释放连接
        bagEntry.setState(STATE_NOT_IN_USE);
    
        //遍历等待队列,唤醒等待者,尝试获取连接
        for (int i = 0; waiters.get() > 0; i++) {
            //bagEntry.getState() != STATE_NOT_IN_USE 这里是因为又被别的线程获取到连接了
            //handoffQueue.offer 又放回到扩展线程队列里面了,代表当前连接池核心连接够用
            if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
                return;
            }//唤醒等待线程
            else if ((i & 0xff) == 0xff) {
                parkNanos(MICROSECONDS.toNanos(10));
            }
            else {
                Thread.yield();
            }
        }
    
        final List<Object> threadLocalList = threadList.get();
        if (threadLocalList.size() < 50) {
            threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    5、总结

    个人感觉 Hikari 连接池快有几点原因:

    1、将核心连接池和额外连接池进行区分,核心连接池用 CopyOnWriteArrayList,额外连接池用 SynchronousQueue,这样,连接的创建销毁不会影响到核心连接池。

    2、核心连接池获取链接和归还连接不涉及元素的删除,移动等操作,获取链接判断BagEntry的状态,如果是未使用状态,则通过cas变更状态为正在使用,cas避免了获取连接时删除连接的锁错作。归还连接时只用当前线程将BagEntry的状态重新设置为未使用状态即可,避免将连接添加到连接池时的锁错作。

  • 相关阅读:
    Android ZXing 二维码/条形码 开源库的简单应用
    位图介绍以及基本用法
    Ubuntu 发布 qt 程序(c++)
    服务行业能有一个像支付宝这样可靠的交易平台吗?
    【开发日常】insmod: error inserting ‘*.ko‘: -1 Unknown symbol in module原理分析
    量子计算(二):为什么需要量子计算
    一文看懂网络安全五年之巨变
    Flyweight 享元
    索引的设计原则
    程序人生 | 与足球共舞的火柴人(致敬格拉利什,赋予足球更深的意义)
  • 原文地址:https://blog.csdn.net/xuwenjingrenca/article/details/126911419