• 聊聊druid的borrow行为


    本文主要研究一下druid的borrow行为

    getConnection

    com/alibaba/druid/pool/DruidDataSource.java

        public DruidPooledConnection getConnection() throws SQLException {
            return getConnection(maxWait);
        }
    
        public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
            init();
    
            if (filters.size() > 0) {
                FilterChainImpl filterChain = new FilterChainImpl(this);
                return filterChain.dataSource_connect(this, maxWaitMillis);
            } else {
                return getConnectionDirect(maxWaitMillis);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    DruidDataSource的getConnection方法内部调用的是getConnectionDirect(maxWaitMillis)

    getConnectionDirect

    com/alibaba/druid/pool/DruidDataSource.java

        public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
            int notFullTimeoutRetryCnt = 0;
            for (; ; ) {
                // handle notFullTimeoutRetry
                DruidPooledConnection poolableConnection;
                try {
                    poolableConnection = getConnectionInternal(maxWaitMillis);
                } catch (GetConnectionTimeoutException ex) {
                    if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
                        notFullTimeoutRetryCnt++;
                        if (LOG.isWarnEnabled()) {
                            LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
                        }
                        continue;
                    }
                    throw ex;
                }
    
                if (testOnBorrow) {
                    boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                    if (!validate) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("skip not validate connection.");
                        }
    
                        discardConnection(poolableConnection.holder);
                        continue;
                    }
                } else {
                    if (poolableConnection.conn.isClosed()) {
                        discardConnection(poolableConnection.holder); // 传入null,避免重复关闭
                        continue;
                    }
    
                    if (testWhileIdle) {
                        final DruidConnectionHolder holder = poolableConnection.holder;
                        long currentTimeMillis = System.currentTimeMillis();
                        long lastActiveTimeMillis = holder.lastActiveTimeMillis;
                        long lastExecTimeMillis = holder.lastExecTimeMillis;
                        long lastKeepTimeMillis = holder.lastKeepTimeMillis;
    
                        if (checkExecuteTime
                                && lastExecTimeMillis != lastActiveTimeMillis) {
                            lastActiveTimeMillis = lastExecTimeMillis;
                        }
    
                        if (lastKeepTimeMillis > lastActiveTimeMillis) {
                            lastActiveTimeMillis = lastKeepTimeMillis;
                        }
    
                        long idleMillis = currentTimeMillis - lastActiveTimeMillis;
    
                        long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
    
                        if (timeBetweenEvictionRunsMillis <= 0) {
                            timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
                        }
    
                        if (idleMillis >= timeBetweenEvictionRunsMillis
                                || idleMillis < 0 // unexcepted branch
                        ) {
                            boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                            if (!validate) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("skip not validate connection.");
                                }
    
                                discardConnection(poolableConnection.holder);
                                continue;
                            }
                        }
                    }
                }
    
                if (removeAbandoned) {
                    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                    poolableConnection.connectStackTrace = stackTrace;
                    poolableConnection.setConnectedTimeNano();
                    poolableConnection.traceEnable = true;
    
                    activeConnectionLock.lock();
                    try {
                        activeConnections.put(poolableConnection, PRESENT);
                    } finally {
                        activeConnectionLock.unlock();
                    }
                }
    
                if (!this.defaultAutoCommit) {
                    poolableConnection.setAutoCommit(false);
                }
    
                return poolableConnection;
            }
        }
    
        public boolean isFull() {
            lock.lock();
            try {
                return this.poolingCount + this.activeCount >= this.maxActive;
            } finally {
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104

    getConnectionDirect在一个for循环里头进行获取连接,首先执行getConnectionInternal(maxWaitMillis),若出现GetConnectionTimeoutException异常,则在notFull且notFullTimeoutRetryCnt小于等于this.notFullTimeoutRetryCount时会递增notFullTimeoutRetryCnt,然后continue继续循环,否则直接抛出GetConnectionTimeoutException跳出循环

    获取到连接之后,判断是否是testOnBorrow,如果是则执行testConnectionInternal,若校验不成功则执行discardConnection,然后继续循环;若非testOnBorrow则判断conn是否closed,若是则执行discardConnection,然后继续循环,若非closed则进入testWhileIdle的逻辑(druid直接在getConnection的时候执行testWhileIdle有点令人匪夷所思)

    最后是removeAbandoned,维护connectedTimeNano,将当前连接放到activeConnections中

    getConnectionInternal

    com/alibaba/druid/pool/DruidDataSource.java

        private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
            if (closed) {
                connectErrorCountUpdater.incrementAndGet(this);
                throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
            }
    
            if (!enable) {
                connectErrorCountUpdater.incrementAndGet(this);
    
                if (disableException != null) {
                    throw disableException;
                }
    
                throw new DataSourceDisableException();
            }
    
            final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
            final int maxWaitThreadCount = this.maxWaitThreadCount;
    
            DruidConnectionHolder holder;
    
            for (boolean createDirect = false; ; ) {
                if (createDirect) {
                    createStartNanosUpdater.set(this, System.nanoTime());
                    if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
                        PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
                        holder = new DruidConnectionHolder(this, pyConnInfo);
                        holder.lastActiveTimeMillis = System.currentTimeMillis();
    
                        creatingCountUpdater.decrementAndGet(this);
                        directCreateCountUpdater.incrementAndGet(this);
    
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("conn-direct_create ");
                        }
    
                        boolean discard;
                        lock.lock();
                        try {
                            if (activeCount < maxActive) {
                                activeCount++;
                                holder.active = true;
                                if (activeCount > activePeak) {
                                    activePeak = activeCount;
                                    activePeakTime = System.currentTimeMillis();
                                }
                                break;
                            } else {
                                discard = true;
                            }
                        } finally {
                            lock.unlock();
                        }
    
                        if (discard) {
                            JdbcUtils.close(pyConnInfo.getPhysicalConnection());
                        }
                    }
                }
    
                try {
                    lock.lockInterruptibly();
                } catch (InterruptedException e) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    throw new SQLException("interrupt", e);
                }
    
                try {
                    if (maxWaitThreadCount > 0
                            && notEmptyWaitThreadCount >= maxWaitThreadCount) {
                        connectErrorCountUpdater.incrementAndGet(this);
                        throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
                                + lock.getQueueLength());
                    }
    
                    if (onFatalError
                            && onFatalErrorMaxActive > 0
                            && activeCount >= onFatalErrorMaxActive) {
                        connectErrorCountUpdater.incrementAndGet(this);
    
                        StringBuilder errorMsg = new StringBuilder();
                        errorMsg.append("onFatalError, activeCount ")
                                .append(activeCount)
                                .append(", onFatalErrorMaxActive ")
                                .append(onFatalErrorMaxActive);
    
                        if (lastFatalErrorTimeMillis > 0) {
                            errorMsg.append(", time '")
                                    .append(StringUtils.formatDateTime19(
                                            lastFatalErrorTimeMillis, TimeZone.getDefault()))
                                    .append("'");
                        }
    
                        if (lastFatalErrorSql != null) {
                            errorMsg.append(", sql \n")
                                    .append(lastFatalErrorSql);
                        }
    
                        throw new SQLException(
                                errorMsg.toString(), lastFatalError);
                    }
    
                    connectCount++;
    
                    if (createScheduler != null
                            && poolingCount == 0
                            && activeCount < maxActive
                            && creatingCountUpdater.get(this) == 0
                            && createScheduler instanceof ScheduledThreadPoolExecutor) {
                        ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
                        if (executor.getQueue().size() > 0) {
                            createDirect = true;
                            continue;
                        }
                    }
    
                    if (maxWait > 0) {
                        holder = pollLast(nanos);
                    } else {
                        holder = takeLast();
                    }
    
                    if (holder != null) {
                        if (holder.discard) {
                            continue;
                        }
    
                        activeCount++;
                        holder.active = true;
                        if (activeCount > activePeak) {
                            activePeak = activeCount;
                            activePeakTime = System.currentTimeMillis();
                        }
                    }
                } catch (InterruptedException e) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    throw new SQLException(e.getMessage(), e);
                } catch (SQLException e) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    throw e;
                } finally {
                    lock.unlock();
                }
    
                break;
            }
    
            if (holder == null) {
                long waitNanos = waitNanosLocal.get();
    
                final long activeCount;
                final long maxActive;
                final long creatingCount;
                final long createStartNanos;
                final long createErrorCount;
                final Throwable createError;
                try {
                    lock.lock();
                    activeCount = this.activeCount;
                    maxActive = this.maxActive;
                    creatingCount = this.creatingCount;
                    createStartNanos = this.createStartNanos;
                    createErrorCount = this.createErrorCount;
                    createError = this.createError;
                } finally {
                    lock.unlock();
                }
    
                StringBuilder buf = new StringBuilder(128);
                buf.append("wait millis ")
                        .append(waitNanos / (1000 * 1000))
                        .append(", active ").append(activeCount)
                        .append(", maxActive ").append(maxActive)
                        .append(", creating ").append(creatingCount);
    
                if (creatingCount > 0 && createStartNanos > 0) {
                    long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000);
                    if (createElapseMillis > 0) {
                        buf.append(", createElapseMillis ").append(createElapseMillis);
                    }
                }
    
                if (createErrorCount > 0) {
                    buf.append(", createErrorCount ").append(createErrorCount);
                }
    
                List sqlList = this.getDataSourceStat().getRuningSqlList();
                for (int i = 0; i < sqlList.size(); ++i) {
                    if (i != 0) {
                        buf.append('\n');
                    } else {
                        buf.append(", ");
                    }
                    JdbcSqlStatValue sql = sqlList.get(i);
                    buf.append("runningSqlCount ").append(sql.getRunningCount());
                    buf.append(" : ");
                    buf.append(sql.getSql());
                }
    
                String errorMessage = buf.toString();
    
                if (createError != null) {
                    throw new GetConnectionTimeoutException(errorMessage, createError);
                } else {
                    throw new GetConnectionTimeoutException(errorMessage);
                }
            }
    
            holder.incrementUseCount();
    
            DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
            return poolalbeConnection;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213

    getConnectionInternal方法先判断是否closed,如果是则抛出DataSourceClosedException,接着判断是否enable,如果不是则抛出DataSourceDisableException,紧接着for循环,它主要根据createDirect来执行不同逻辑,第一次默认createDirect为false;

    createDirect为false,对于notEmptyWaitThreadCount大于等于maxWaitThreadCount则抛出SQLException,对于poolingCount为0且activeCount小于maxActive,createScheduler的queue大小大于0的,则设置createDirect为true;否则对于maxWait大于0的,执行pollLast(nanos),否则执行takeLast()

    createDirect为true,会通过DruidDataSource.this.createPhysicalConnection()创建物理连接,对于activeCount小于maxActive的,则维护activeCount跳出循环,否则标记discard为true,通过JdbcUtils.close(pyConnInfo.getPhysicalConnection())关闭连接

    pollLast

        private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
            long estimate = nanos;
    
            for (; ; ) {
                if (poolingCount == 0) {
                    emptySignal(); // send signal to CreateThread create connection
    
                    if (failFast && isFailContinuous()) {
                        throw new DataSourceNotAvailableException(createError);
                    }
    
                    if (estimate <= 0) {
                        waitNanosLocal.set(nanos - estimate);
                        return null;
                    }
    
                    notEmptyWaitThreadCount++;
                    if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                        notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
                    }
    
                    try {
                        long startEstimate = estimate;
                        estimate = notEmpty.awaitNanos(estimate); // signal by
                        // recycle or
                        // creator
                        notEmptyWaitCount++;
                        notEmptyWaitNanos += (startEstimate - estimate);
    
                        if (!enable) {
                            connectErrorCountUpdater.incrementAndGet(this);
    
                            if (disableException != null) {
                                throw disableException;
                            }
    
                            throw new DataSourceDisableException();
                        }
                    } catch (InterruptedException ie) {
                        notEmpty.signal(); // propagate to non-interrupted thread
                        notEmptySignalCount++;
                        throw ie;
                    } finally {
                        notEmptyWaitThreadCount--;
                    }
    
                    if (poolingCount == 0) {
                        if (estimate > 0) {
                            continue;
                        }
    
                        waitNanosLocal.set(nanos - estimate);
                        return null;
                    }
                }
    
                decrementPoolingCount();
                DruidConnectionHolder last = connections[poolingCount];
                connections[poolingCount] = null;
    
                long waitNanos = nanos - estimate;
                last.setLastNotEmptyWaitNanos(waitNanos);
    
                return last;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    pollLast方法在poolingCount为0时执行emptySignal,另外主要是处理notEmpty这个condition,然后取connections[poolingCount]

    takeLast

        DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
            try {
                while (poolingCount == 0) {
                    emptySignal(); // send signal to CreateThread create connection
    
                    if (failFast && isFailContinuous()) {
                        throw new DataSourceNotAvailableException(createError);
                    }
    
                    notEmptyWaitThreadCount++;
                    if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                        notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
                    }
                    try {
                        notEmpty.await(); // signal by recycle or creator
                    } finally {
                        notEmptyWaitThreadCount--;
                    }
                    notEmptyWaitCount++;
    
                    if (!enable) {
                        connectErrorCountUpdater.incrementAndGet(this);
                        if (disableException != null) {
                            throw disableException;
                        }
    
                        throw new DataSourceDisableException();
                    }
                }
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                notEmptySignalCount++;
                throw ie;
            }
    
            decrementPoolingCount();
            DruidConnectionHolder last = connections[poolingCount];
            connections[poolingCount] = null;
    
            return last;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    takeLast方法在poolingCount为0的时候执行emptySignal,然后通过notEmpty.await()进行阻塞等待,最后返回connections[poolingCount]

    emptySignal

        private void emptySignal() {
            if (createScheduler == null) {
                empty.signal();
                return;
            }
    
            if (createTaskCount >= maxCreateTaskCount) {
                return;
            }
    
            if (activeCount + poolingCount + createTaskCount >= maxActive) {
                return;
            }
            submitCreateTask(false);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    emptySignal方法,对于createScheduler为null的执行empty.signal(),之后判断task数量即maxActive判断,最后执行submitCreateTask(false)

    submitCreateTask

        private void submitCreateTask(boolean initTask) {
            createTaskCount++;
            CreateConnectionTask task = new CreateConnectionTask(initTask);
            if (createTasks == null) {
                createTasks = new long[8];
            }
    
            boolean putted = false;
            for (int i = 0; i < createTasks.length; ++i) {
                if (createTasks[i] == 0) {
                    createTasks[i] = task.taskId;
                    putted = true;
                    break;
                }
            }
            if (!putted) {
                long[] array = new long[createTasks.length * 3 / 2];
                System.arraycopy(createTasks, 0, array, 0, createTasks.length);
                array[createTasks.length] = task.taskId;
                createTasks = array;
            }
    
            this.createSchedulerFuture = createScheduler.submit(task);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    submitCreateTask会创建CreateConnectionTask,然后提交到createScheduler执行

    CreateConnectionTask

    com/alibaba/druid/pool/DruidDataSource.java

        public class CreateConnectionTask implements Runnable {
            private int errorCount;
            private boolean initTask;
            private final long taskId;
    
            public CreateConnectionTask() {
                taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
            }
    
            public CreateConnectionTask(boolean initTask) {
                taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
                this.initTask = initTask;
            }
    
            @Override
            public void run() {
                runInternal();
            }
    
            private void runInternal() {
                for (; ; ) {
                    // addLast
                    lock.lock();
                    try {
                        if (closed || closing) {
                            clearCreateTask(taskId);
                            return;
                        }
    
                        boolean emptyWait = true;
    
                        if (createError != null && poolingCount == 0) {
                            emptyWait = false;
                        }
    
                        if (emptyWait) {
                            // 必须存在线程等待,才创建连接
                            if (poolingCount >= notEmptyWaitThreadCount //
                                    && (!(keepAlive && activeCount + poolingCount < minIdle)) // 在keepAlive场景不能放弃创建
                                    && (!initTask) // 线程池初始化时的任务不能放弃创建
                                    && !isFailContinuous() // failContinuous时不能放弃创建,否则会无法创建线程
                                    && !isOnFatalError() // onFatalError时不能放弃创建,否则会无法创建线程
                            ) {
                                clearCreateTask(taskId);
                                return;
                            }
    
                            // 防止创建超过maxActive数量的连接
                            if (activeCount + poolingCount >= maxActive) {
                                clearCreateTask(taskId);
                                return;
                            }
                        }
                    } finally {
                        lock.unlock();
                    }
    
                    PhysicalConnectionInfo physicalConnection = null;
    
                    try {
                        physicalConnection = createPhysicalConnection();
                    } catch (OutOfMemoryError e) {
                        LOG.error("create connection OutOfMemoryError, out memory. ", e);
    
                        errorCount++;
                        if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                            // fail over retry attempts
                            setFailContinuous(true);
                            if (failFast) {
                                lock.lock();
                                try {
                                    notEmpty.signalAll();
                                } finally {
                                    lock.unlock();
                                }
                            }
    
                            if (breakAfterAcquireFailure) {
                                lock.lock();
                                try {
                                    clearCreateTask(taskId);
                                } finally {
                                    lock.unlock();
                                }
                                return;
                            }
    
                            this.errorCount = 0; // reset errorCount
                            if (closing || closed) {
                                lock.lock();
                                try {
                                    clearCreateTask(taskId);
                                } finally {
                                    lock.unlock();
                                }
                                return;
                            }
    
                            createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                            return;
                        }
                    } catch (SQLException e) {
                        LOG.error("create connection SQLException, url: " + jdbcUrl, e);
    
                        errorCount++;
                        if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                            // fail over retry attempts
                            setFailContinuous(true);
                            if (failFast) {
                                lock.lock();
                                try {
                                    notEmpty.signalAll();
                                } finally {
                                    lock.unlock();
                                }
                            }
    
                            if (breakAfterAcquireFailure) {
                                lock.lock();
                                try {
                                    clearCreateTask(taskId);
                                } finally {
                                    lock.unlock();
                                }
                                return;
                            }
    
                            this.errorCount = 0; // reset errorCount
                            if (closing || closed) {
                                lock.lock();
                                try {
                                    clearCreateTask(taskId);
                                } finally {
                                    lock.unlock();
                                }
                                return;
                            }
    
                            createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                            return;
                        }
                    } catch (RuntimeException e) {
                        LOG.error("create connection RuntimeException", e);
                        // unknow fatal exception
                        setFailContinuous(true);
                        continue;
                    } catch (Error e) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        LOG.error("create connection Error", e);
                        // unknow fatal exception
                        setFailContinuous(true);
                        break;
                    } catch (Throwable e) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
    
                        LOG.error("create connection unexecpted error.", e);
                        break;
                    }
    
                    if (physicalConnection == null) {
                        continue;
                    }
    
                    physicalConnection.createTaskId = taskId;
                    boolean result = put(physicalConnection);
                    if (!result) {
                        JdbcUtils.close(physicalConnection.getPhysicalConnection());
                        LOG.info("put physical connection to pool failed.");
                    }
                    break;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183

    CreateConnectionTask通过for循环,然后加锁处理minIdle及maxActive,最后通过createPhysicalConnection创建物理连接

    createPhysicalConnection

    com/alibaba/druid/pool/DruidAbstractDataSource.java

        public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {
            String url = this.getUrl();
            Properties connectProperties = getConnectProperties();
    
            String user;
            if (getUserCallback() != null) {
                user = getUserCallback().getName();
            } else {
                user = getUsername();
            }
    
            String password = getPassword();
            PasswordCallback passwordCallback = getPasswordCallback();
    
            if (passwordCallback != null) {
                if (passwordCallback instanceof DruidPasswordCallback) {
                    DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;
    
                    druidPasswordCallback.setUrl(url);
                    druidPasswordCallback.setProperties(connectProperties);
                }
    
                char[] chars = passwordCallback.getPassword();
                if (chars != null) {
                    password = new String(chars);
                }
            }
    
            Properties physicalConnectProperties = new Properties();
            if (connectProperties != null) {
                physicalConnectProperties.putAll(connectProperties);
            }
    
            if (user != null && user.length() != 0) {
                physicalConnectProperties.put("user", user);
            }
    
            if (password != null && password.length() != 0) {
                physicalConnectProperties.put("password", password);
            }
    
            Connection conn = null;
    
            long connectStartNanos = System.nanoTime();
            long connectedNanos, initedNanos, validatedNanos;
    
            Map variables = initVariants
                    ? new HashMap()
                    : null;
            Map globalVariables = initGlobalVariants
                    ? new HashMap()
                    : null;
    
            createStartNanosUpdater.set(this, connectStartNanos);
            creatingCountUpdater.incrementAndGet(this);
            try {
                conn = createPhysicalConnection(url, physicalConnectProperties);
                connectedNanos = System.nanoTime();
    
                if (conn == null) {
                    throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
                }
    
                initPhysicalConnection(conn, variables, globalVariables);
                initedNanos = System.nanoTime();
    
                validateConnection(conn);
                validatedNanos = System.nanoTime();
    
                setFailContinuous(false);
                setCreateError(null);
            } catch (SQLException ex) {
                setCreateError(ex);
                JdbcUtils.close(conn);
                throw ex;
            } catch (RuntimeException ex) {
                setCreateError(ex);
                JdbcUtils.close(conn);
                throw ex;
            } catch (Error ex) {
                createErrorCountUpdater.incrementAndGet(this);
                setCreateError(ex);
                JdbcUtils.close(conn);
                throw ex;
            } finally {
                long nano = System.nanoTime() - connectStartNanos;
                createTimespan += nano;
                creatingCountUpdater.decrementAndGet(this);
            }
    
            return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    createPhysicalConnection通过try catch去创建物理连接,若有异常则会通过JdbcUtils.close(conn)去关闭连接

    testConnectionInternal

        protected boolean testConnectionInternal(DruidConnectionHolder holder, Connection conn) {
            String sqlFile = JdbcSqlStat.getContextSqlFile();
            String sqlName = JdbcSqlStat.getContextSqlName();
    
            if (sqlFile != null) {
                JdbcSqlStat.setContextSqlFile(null);
            }
            if (sqlName != null) {
                JdbcSqlStat.setContextSqlName(null);
            }
            try {
                if (validConnectionChecker != null) {
                    boolean valid = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);
                    long currentTimeMillis = System.currentTimeMillis();
                    if (holder != null) {
                        holder.lastValidTimeMillis = currentTimeMillis;
                        holder.lastExecTimeMillis = currentTimeMillis;
                    }
    
                    if (valid && isMySql) { // unexcepted branch
                        long lastPacketReceivedTimeMs = MySqlUtils.getLastPacketReceivedTimeMs(conn);
                        if (lastPacketReceivedTimeMs > 0) {
                            long mysqlIdleMillis = currentTimeMillis - lastPacketReceivedTimeMs;
                            if (lastPacketReceivedTimeMs > 0 //
                                    && mysqlIdleMillis >= timeBetweenEvictionRunsMillis) {
                                discardConnection(holder);
                                String errorMsg = "discard long time none received connection. "
                                        + ", jdbcUrl : " + jdbcUrl
                                        + ", version : " + VERSION.getVersionNumber()
                                        + ", lastPacketReceivedIdleMillis : " + mysqlIdleMillis;
                                LOG.warn(errorMsg);
                                return false;
                            }
                        }
                    }
    
                    if (valid && onFatalError) {
                        lock.lock();
                        try {
                            if (onFatalError) {
                                onFatalError = false;
                            }
                        } finally {
                            lock.unlock();
                        }
                    }
    
                    return valid;
                }
    
                if (conn.isClosed()) {
                    return false;
                }
    
                if (null == validationQuery) {
                    return true;
                }
    
                Statement stmt = null;
                ResultSet rset = null;
                try {
                    stmt = conn.createStatement();
                    if (getValidationQueryTimeout() > 0) {
                        stmt.setQueryTimeout(validationQueryTimeout);
                    }
                    rset = stmt.executeQuery(validationQuery);
                    if (!rset.next()) {
                        return false;
                    }
                } finally {
                    JdbcUtils.close(rset);
                    JdbcUtils.close(stmt);
                }
    
                if (onFatalError) {
                    lock.lock();
                    try {
                        if (onFatalError) {
                            onFatalError = false;
                        }
                    } finally {
                        lock.unlock();
                    }
                }
    
                return true;
            } catch (Throwable ex) {
                // skip
                return false;
            } finally {
                if (sqlFile != null) {
                    JdbcSqlStat.setContextSqlFile(sqlFile);
                }
                if (sqlName != null) {
                    JdbcSqlStat.setContextSqlName(sqlName);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98

    testConnectionInternal主要通过validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout)来校验连接,如果validConnectionChecker为null则通过jdbc执行validationQuery进行校验

    discardConnection

        public void discardConnection(DruidConnectionHolder holder) {
            if (holder == null) {
                return;
            }
    
            Connection conn = holder.getConnection();
            if (conn != null) {
                JdbcUtils.close(conn);
            }
    
            lock.lock();
            try {
                if (holder.discard) {
                    return;
                }
    
                if (holder.active) {
                    activeCount--;
                    holder.active = false;
                }
                discardCount++;
    
                holder.discard = true;
    
                if (activeCount <= minIdle) {
                    emptySignal();
                }
            } finally {
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    discardConnection方法主要是关闭connection,之后枷锁处理一些统计标记

    小结

    DruidDataSource的getConnection方法内部调用的是getConnectionDirect(maxWaitMillis)

    getConnectionDirect在一个for循环里头进行获取连接,首先执行getConnectionInternal(maxWaitMillis),若出现GetConnectionTimeoutException异常,则在notFull且notFullTimeoutRetryCnt小于等于this.notFullTimeoutRetryCount时会递增notFullTimeoutRetryCnt,然后continue继续循环,否则直接抛出GetConnectionTimeoutException跳出循环

    获取到连接之后,判断是否是testOnBorrow,如果是则执行testConnectionInternal,若校验不成功则执行discardConnection,然后继续循环;若非testOnBorrow则判断conn是否closed,若是则执行discardConnection,然后继续循环,若非closed则进入testWhileIdle的逻辑

    最后是removeAbandoned,维护connectedTimeNano,将当前连接放到activeConnections中

    整体代码看下来感觉跟commons-pool相比,druid代码的实现感觉有点粗糙,抽象层级不够高,代码充斥大量统计标记、状态位的处理,维护起来得很小心,另外druid直接在getConnection的时候执行testWhileIdle有点令人匪夷所思

  • 相关阅读:
    重学JavaSE 第9章 : 异常、try-catch-finally、throws、throw、自定义异常、try-with-resources
    current file is not included in a workspace moduleg 存在多个 main函数的 Go项目 无法成功导包
    文心一言 VS 讯飞星火 VS chatgpt (122)-- 算法导论10.4 3题
    如何找到‘.‘ is not recognized as an internal or external command的根本原因和解决方案
    某大厂面试题:说一说Java、Spring、Dubbo三者SPI机制的原理和区别
    使用Pytorch进行多卡训练
    鸿鹄工程项目管理系统em Spring Cloud+Spring Boot+前后端分离构建工程项目管理系统
    javascript:如何在 array.reduce 中返回一个对象?
    19.Spring源码解读之简单手写spring框架
    elementui表单的验证问题
  • 原文地址:https://blog.csdn.net/hello_ejb3/article/details/133279788