• A look at druid's keepalive mechanism


    This article takes a look at druid's keepalive mechanism.

    DruidDataSource

    public class DruidDataSource extends DruidAbstractDataSource implements DruidDataSourceMBean, ManagedDataSource, Referenceable, Closeable, Cloneable, ConnectionPoolDataSource, MBeanRegistration {
    
    	private int                              keepAliveCheckCount       = 0;
    	private DruidConnectionHolder[]          keepAliveConnections;
    	private volatile boolean                 keepAlive                 = false;
    
    	// from DruidAbstractDataSource
    	protected volatile long                            keepAliveBetweenTimeMillis                = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS * 2;
    	public static final long                           DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = 60 * 1000L;
    
    	public void init() throws SQLException {
    
    		//......
    
                if (keepAlive) {
                    // async fill to minIdle
                    if (createScheduler != null) {
                        for (int i = 0; i < minIdle; ++i) {
                            submitCreateTask(true);
                        }
                    } else {
                        this.emptySignal();
                    }
                }
    
    		//......
    	}
    
    }
    

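    When keepAlive is enabled, DruidDataSource's init method triggers connection creation so that the pool is filled up to minIdle. If createScheduler is not null (it is null by default), init calls submitCreateTask minIdle times; otherwise it calls emptySignal.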

    submitCreateTask

    com/alibaba/druid/pool/DruidDataSource.java

        private void submitCreateTask(boolean initTask) {
            createTaskCount++;
            CreateConnectionTask task = new CreateConnectionTask(initTask);
            if (createTasks == null) {
                createTasks = new long[8];
            }
    
            boolean putted = false;
            for (int i = 0; i < createTasks.length; ++i) {
                if (createTasks[i] == 0) {
                    createTasks[i] = task.taskId;
                    putted = true;
                    break;
                }
            }
            if (!putted) {
                long[] array = new long[createTasks.length * 3 / 2];
                System.arraycopy(createTasks, 0, array, 0, createTasks.length);
                array[createTasks.length] = task.taskId;
                createTasks = array;
            }
    
            this.createSchedulerFuture = createScheduler.submit(task);
        }
    

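    submitCreateTask creates a CreateConnectionTask, records its taskId in the first free slot of the createTasks array (growing the array by half when no slot is free), and then submits the task to createScheduler.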
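
    The slot bookkeeping can be tried in isolation. The following is a minimal sketch, not Druid code: the class name CreateTaskSlots, the id seed starting at 1, and the clear method (modelled on what clearCreateTask presumably does) are assumptions made for illustration.

```java
import java.util.Arrays;

// Stand-alone model of the createTasks bookkeeping; hypothetical, not Druid code.
class CreateTaskSlots {
    private long[] createTasks;   // a value of 0 marks a free slot
    private long taskIdSeed = 1;  // assumption: ids start at 1 so that 0 can mean "free"

    /** Stores a new task id in the first free slot, growing the array by half when full. */
    long register() {
        long taskId = taskIdSeed++;
        if (createTasks == null) {
            createTasks = new long[8];
        }
        for (int i = 0; i < createTasks.length; i++) {
            if (createTasks[i] == 0) {
                createTasks[i] = taskId;
                return taskId;
            }
        }
        // No free slot: grow to 3/2 of the old length and use the first new slot.
        int oldLength = createTasks.length;
        createTasks = Arrays.copyOf(createTasks, oldLength * 3 / 2);
        createTasks[oldLength] = taskId;
        return taskId;
    }

    /** Frees the slot held by taskId (assumed behaviour of clearCreateTask). */
    boolean clear(long taskId) {
        if (createTasks == null || taskId == 0) {
            return false;
        }
        for (int i = 0; i < createTasks.length; i++) {
            if (createTasks[i] == taskId) {
                createTasks[i] = 0;
                return true;
            }
        }
        return false;
    }

    int capacity() {
        return createTasks == null ? 0 : createTasks.length;
    }
}
```

    With this model, the ninth registration grows the array from 8 to 12 slots, and a slot released by clear is reused by the next registration instead of triggering another growth.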

    CreateConnectionTask

    com/alibaba/druid/pool/DruidDataSource.java

        public class CreateConnectionTask implements Runnable {
            private int errorCount;
            private boolean initTask;
            private final long taskId;
    
            public CreateConnectionTask() {
                taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
            }
    
            public CreateConnectionTask(boolean initTask) {
                taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
                this.initTask = initTask;
            }
    
            @Override
            public void run() {
                runInternal();
            }
    
            private void runInternal() {
                for (; ; ) {
                    // addLast
                    lock.lock();
                    try {
                        if (closed || closing) {
                            clearCreateTask(taskId);
                            return;
                        }
    
                        boolean emptyWait = true;
    
                        if (createError != null && poolingCount == 0) {
                            emptyWait = false;
                        }
    
                        if (emptyWait) {
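                            // a connection is only created if some thread is waiting for one
                            if (poolingCount >= notEmptyWaitThreadCount //
                                    && (!(keepAlive && activeCount + poolingCount < minIdle)) // with keepAlive, do not give up while below minIdle
                                    && (!initTask) // tasks submitted during pool initialization must not give up
                                    && !isFailContinuous() // do not give up on failContinuous, otherwise no connection could be created
                                    && !isOnFatalError() // do not give up on onFatalError, otherwise no connection could be created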
                            ) {
                                clearCreateTask(taskId);
                                return;
                            }
    
                            // avoid creating more than maxActive connections
                            if (activeCount + poolingCount >= maxActive) {
                                clearCreateTask(taskId);
                                return;
                            }
                        }
                    } finally {
                        lock.unlock();
                    }
    
                    PhysicalConnectionInfo physicalConnection = null;
    
                    try {
                        physicalConnection = createPhysicalConnection();
                    } catch (OutOfMemoryError e) {
                        LOG.error("create connection OutOfMemoryError, out memory. ", e);
    
                        errorCount++;
                        if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                            // fail over retry attempts
                            setFailContinuous(true);
                            if (failFast) {
                                lock.lock();
                                try {
                                    notEmpty.signalAll();
                                } finally {
                                    lock.unlock();
                                }
                            }
    
                            if (breakAfterAcquireFailure) {
                                lock.lock();
                                try {
                                    clearCreateTask(taskId);
                                } finally {
                                    lock.unlock();
                                }
                                return;
                            }
    
                            this.errorCount = 0; // reset errorCount
                            if (closing || closed) {
                                lock.lock();
                                try {
                                    clearCreateTask(taskId);
                                } finally {
                                    lock.unlock();
                                }
                                return;
                            }
    
                            createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                            return;
                        }
                    } catch (SQLException e) {
                        LOG.error("create connection SQLException, url: " + jdbcUrl, e);
    
                        errorCount++;
                        if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                            // fail over retry attempts
                            setFailContinuous(true);
                            if (failFast) {
                                lock.lock();
                                try {
                                    notEmpty.signalAll();
                                } finally {
                                    lock.unlock();
                                }
                            }
    
                            if (breakAfterAcquireFailure) {
                                lock.lock();
                                try {
                                    clearCreateTask(taskId);
                                } finally {
                                    lock.unlock();
                                }
                                return;
                            }
    
                            this.errorCount = 0; // reset errorCount
                            if (closing || closed) {
                                lock.lock();
                                try {
                                    clearCreateTask(taskId);
                                } finally {
                                    lock.unlock();
                                }
                                return;
                            }
    
                            createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                            return;
                        }
                    } catch (RuntimeException e) {
                        LOG.error("create connection RuntimeException", e);
                        // unknow fatal exception
                        setFailContinuous(true);
                        continue;
                    } catch (Error e) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        LOG.error("create connection Error", e);
                        // unknow fatal exception
                        setFailContinuous(true);
                        break;
                    } catch (Throwable e) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
    
                        LOG.error("create connection unexecpted error.", e);
                        break;
                    }
    
                    if (physicalConnection == null) {
                        continue;
                    }
    
                    physicalConnection.createTaskId = taskId;
                    boolean result = put(physicalConnection);
                    if (!result) {
                        JdbcUtils.close(physicalConnection.getPhysicalConnection());
                        LOG.info("put physical connection to pool failed.");
                    }
                    break;
                }
            }
        }
    

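    CreateConnectionTask mainly creates a physicalConnection and adds it to the pool via put(). When emptyWait is true, it first checks whether the connection is still needed: if no thread is waiting for a connection (and none of the keepAlive, initTask, failContinuous or onFatalError exceptions applies), or if activeCount + poolingCount has already reached maxActive, it calls clearCreateTask(taskId) and returns without creating anything. Unlike CreateConnectionThread, it never calls empty.await().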
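
    The give-up condition is easier to read as a pure function. The sketch below restates the emptyWait branch of runInternal under that assumption; the class and method names (CreateTaskGuard, shouldAbandon) are hypothetical, and the pool state is passed in as plain parameters instead of being read from fields under the lock.

```java
// Pure-function restatement of the emptyWait branch; hypothetical helper, not Druid code.
class CreateTaskGuard {
    static boolean shouldAbandon(int poolingCount, int activeCount, int notEmptyWaitThreadCount,
                                 int minIdle, int maxActive, boolean keepAlive, boolean initTask,
                                 boolean failContinuous, boolean onFatalError) {
        // Nobody is waiting and no rule forces the task to keep going.
        boolean notNeeded = poolingCount >= notEmptyWaitThreadCount
                && !(keepAlive && activeCount + poolingCount < minIdle)
                && !initTask
                && !failContinuous
                && !onFatalError;
        // Creating one more connection would exceed maxActive.
        boolean poolFull = activeCount + poolingCount >= maxActive;
        return notNeeded || poolFull;
    }

    public static void main(String[] args) {
        // 2 idle connections, no waiters, minIdle = 5, maxActive = 10
        System.out.println(shouldAbandon(2, 0, 0, 5, 10, false, false, false, false));
        System.out.println(shouldAbandon(2, 0, 0, 5, 10, true, false, false, false));
    }
}
```

    In this example the first call returns true (nobody needs the connection), while the second returns false because keepAlive is on and the pool is still below minIdle.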

    CreateConnectionThread

        public class CreateConnectionThread extends Thread {
            public CreateConnectionThread(String name) {
                super(name);
                this.setDaemon(true);
            }
    
            public void run() {
                initedLatch.countDown();
    
                long lastDiscardCount = 0;
                int errorCount = 0;
                for (; ; ) {
                    // addLast
                    try {
                        lock.lockInterruptibly();
                    } catch (InterruptedException e2) {
                        break;
                    }
    
                    long discardCount = DruidDataSource.this.discardCount;
                    boolean discardChanged = discardCount - lastDiscardCount > 0;
                    lastDiscardCount = discardCount;
    
                    try {
                        boolean emptyWait = true;
    
                        if (createError != null
                                && poolingCount == 0
                                && !discardChanged) {
                            emptyWait = false;
                        }
    
                        if (emptyWait
                                && asyncInit && createCount < initialSize) {
                            emptyWait = false;
                        }
    
                        if (emptyWait) {
                            // a connection is only created if some thread is waiting for one
                            if (poolingCount >= notEmptyWaitThreadCount //
                                    && (!(keepAlive && activeCount + poolingCount < minIdle))
                                    && !isFailContinuous()
                            ) {
                                empty.await();
                            }
    
                            // avoid creating more than maxActive connections
                            if (activeCount + poolingCount >= maxActive) {
                                empty.await();
                                continue;
                            }
                        }
    
                    } catch (InterruptedException e) {
                        lastCreateError = e;
                        lastErrorTimeMillis = System.currentTimeMillis();
    
                        if ((!closing) && (!closed)) {
                            LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
                        }
                        break;
                    } finally {
                        lock.unlock();
                    }
    
                    PhysicalConnectionInfo connection = null;
    
                    try {
                        connection = createPhysicalConnection();
                    } catch (SQLException e) {
                        LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
                                + ", state " + e.getSQLState(), e);
    
                        errorCount++;
                        if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                            // fail over retry attempts
                            setFailContinuous(true);
                            if (failFast) {
                                lock.lock();
                                try {
                                    notEmpty.signalAll();
                                } finally {
                                    lock.unlock();
                                }
                            }
    
                            if (breakAfterAcquireFailure) {
                                break;
                            }
    
                            try {
                                Thread.sleep(timeBetweenConnectErrorMillis);
                            } catch (InterruptedException interruptEx) {
                                break;
                            }
                        }
                    } catch (RuntimeException e) {
                        LOG.error("create connection RuntimeException", e);
                        setFailContinuous(true);
                        continue;
                    } catch (Error e) {
                        LOG.error("create connection Error", e);
                        setFailContinuous(true);
                        break;
                    }
    
                    if (connection == null) {
                        continue;
                    }
    
                    boolean result = put(connection);
                    if (!result) {
                        JdbcUtils.close(connection.getPhysicalConnection());
                        LOG.info("put physical connection to pool failed.");
                    }
    
                    errorCount = 0; // reset errorCount
    
                    if (closing || closed) {
                        break;
                    }
                }
            }
        }
    

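    The logic of CreateConnectionThread is similar to that of CreateConnectionTask, with a fair amount of duplicated code; the two do not look as if they were written by the same person. The main difference is that where the task gives up and returns, the thread blocks on empty.await() until it is signaled. CreateConnectionThread is started by createAndStartCreatorThread, which is called from DruidDataSource's init method, and it appears to be started only once.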
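
    The wait-and-signal handshake between the creator thread and borrowers can be sketched with a ReentrantLock and two Conditions. This is a heavily simplified model, not Druid's implementation: the class CreatorThreadSketch, its borrow method, and the rule "create whenever poolingCount is below minIdle" are assumptions chosen to keep the example small.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified model of a creator thread parked on an "empty" condition; not Druid code.
class CreatorThreadSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition empty = lock.newCondition();     // the creator waits here
    private final Condition notEmpty = lock.newCondition();  // borrowers wait here
    private final int minIdle;
    private int poolingCount;
    private int createCount;

    CreatorThreadSketch(int minIdle) {
        this.minIdle = minIdle;
    }

    Thread startCreator() {
        Thread creator = new Thread(() -> {
            for (;;) {
                lock.lock();
                try {
                    while (poolingCount >= minIdle) {
                        empty.await();           // nothing to do until someone signals
                    }
                } catch (InterruptedException e) {
                    return;
                } finally {
                    lock.unlock();
                }
                // A real pool would open the physical connection here, outside the lock.
                lock.lock();
                try {
                    poolingCount++;
                    createCount++;
                    notEmpty.signalAll();        // wake up borrowers
                } finally {
                    lock.unlock();
                }
            }
        }, "creator-sketch");
        creator.setDaemon(true);
        creator.start();
        return creator;
    }

    /** Takes one connection, waking the creator so that it refills the pool. */
    void borrow() {
        lock.lock();
        try {
            while (poolingCount == 0) {
                empty.signal();
                notEmpty.awaitUninterruptibly();
            }
            poolingCount--;
            empty.signal();
        } finally {
            lock.unlock();
        }
    }

    int createCount() {
        lock.lock();
        try {
            return createCount;
        } finally {
            lock.unlock();
        }
    }
}
```

    Because the creator blocks instead of returning, a single long-lived thread is enough, which fits the observation that the thread is started only once in init.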

    shrink

        public void shrink(boolean checkTime, boolean keepAlive) {
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                return;
            }
    
            boolean needFill = false;
            int evictCount = 0;
            int keepAliveCount = 0;
            int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
            fatalErrorCountLastShrink = fatalErrorCount;
    
            try {
                if (!inited) {
                    return;
                }
    
                final int checkCount = poolingCount - minIdle;
                final long currentTimeMillis = System.currentTimeMillis();
                for (int i = 0; i < poolingCount; ++i) {
                    DruidConnectionHolder connection = connections[i];
    
                    if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
                        keepAliveConnections[keepAliveCount++] = connection;
                        continue;
                    }
    
                    if (checkTime) {
                        if (phyTimeoutMillis > 0) {
                            long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                            if (phyConnectTimeMillis > phyTimeoutMillis) {
                                evictConnections[evictCount++] = connection;
                                continue;
                            }
                        }
    
                        long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
    
                        if (idleMillis < minEvictableIdleTimeMillis
                                && idleMillis < keepAliveBetweenTimeMillis
                        ) {
                            break;
                        }
    
                        if (idleMillis >= minEvictableIdleTimeMillis) {
                            if (checkTime && i < checkCount) {
                                evictConnections[evictCount++] = connection;
                                continue;
                            } else if (idleMillis > maxEvictableIdleTimeMillis) {
                                evictConnections[evictCount++] = connection;
                                continue;
                            }
                        }
    
                        if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                            keepAliveConnections[keepAliveCount++] = connection;
                        }
                    } else {
                        if (i < checkCount) {
                            evictConnections[evictCount++] = connection;
                        } else {
                            break;
                        }
                    }
                }
    
                int removeCount = evictCount + keepAliveCount;
                if (removeCount > 0) {
                    System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
                    Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
                    poolingCount -= removeCount;
                }
                keepAliveCheckCount += keepAliveCount;
    
                if (keepAlive && poolingCount + activeCount < minIdle) {
                    needFill = true;
                }
            } finally {
                lock.unlock();
            }
    
            if (evictCount > 0) {
                for (int i = 0; i < evictCount; ++i) {
                    DruidConnectionHolder item = evictConnections[i];
                    Connection connection = item.getConnection();
                    JdbcUtils.close(connection);
                    destroyCountUpdater.incrementAndGet(this);
                }
                Arrays.fill(evictConnections, null);
            }
    
            if (keepAliveCount > 0) {
                // keep order
                for (int i = keepAliveCount - 1; i >= 0; --i) {
                    DruidConnectionHolder holer = keepAliveConnections[i];
                    Connection connection = holer.getConnection();
                    holer.incrementKeepAliveCheckCount();
    
                    boolean validate = false;
                    try {
                        this.validateConnection(connection);
                        validate = true;
                    } catch (Throwable error) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("keepAliveErr", error);
                        }
                        // skip
                    }
    
                    boolean discard = !validate;
                    if (validate) {
                        holer.lastKeepTimeMillis = System.currentTimeMillis();
                        boolean putOk = put(holer, 0L, true);
                        if (!putOk) {
                            discard = true;
                        }
                    }
    
                    if (discard) {
                        try {
                            connection.close();
                        } catch (Exception e) {
                            // skip
                        }
    
                        lock.lock();
                        try {
                            discardCount++;
    
                            if (activeCount + poolingCount <= minIdle) {
                                emptySignal();
                            }
                        } finally {
                            lock.unlock();
                        }
                    }
                }
                this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
                Arrays.fill(keepAliveConnections, null);
            }
    
            if (needFill) {
                lock.lock();
                try {
                    int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
                    for (int i = 0; i < fillCount; ++i) {
                        emptySignal();
                    }
                } finally {
                    lock.unlock();
                }
            } else if (onFatalError || fatalErrorIncrement > 0) {
                lock.lock();
                try {
                    emptySignal();
                } finally {
                    lock.unlock();
                }
            }
        }
    

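    DestroyConnectionThread runs destroyTask every timeBetweenEvictionRunsMillis, and DestroyTask's run method mainly calls shrink(true, keepAlive).
    shrink iterates over the first poolingCount entries of connections. When checkTime is true, it first uses idleMillis to decide whether a connection should be evicted; a connection that is not evicted is then checked for keepalive (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) and, if it qualifies, is placed in keepAliveConnections. After releasing the lock, shrink walks keepAliveConnections and calls validateConnection on each entry: on success it updates lastKeepTimeMillis and puts the connection back into the pool, otherwise it calls connection.close(). Finally it clears the keepAliveConnections array.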
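
    The per-connection decision can be modelled as a small classifier. The sketch below covers only the checkTime == true path and leaves out the phyTimeoutMillis and fatal-error branches; the class ShrinkSketch, the Action enum, and the idle times and thresholds used in main are illustrative assumptions, not values taken from Druid.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the checkTime branch of shrink; hypothetical, not Druid code.
class ShrinkSketch {
    enum Action { KEEP, EVICT, KEEP_ALIVE_CHECK }

    /** idleMillis is ordered like the pool array: the longest-idle connection comes first. */
    static List<Action> classify(long[] idleMillis, int minIdle, boolean keepAlive,
                                 long minEvictableIdleTimeMillis, long maxEvictableIdleTimeMillis,
                                 long keepAliveBetweenTimeMillis) {
        int poolingCount = idleMillis.length;
        int checkCount = poolingCount - minIdle;   // how many connections exceed minIdle
        List<Action> actions = new ArrayList<>();
        for (int i = 0; i < poolingCount; i++) {
            long idle = idleMillis[i];
            if (idle < minEvictableIdleTimeMillis && idle < keepAliveBetweenTimeMillis) {
                break;                              // this one and all later ones are too fresh
            }
            if (idle >= minEvictableIdleTimeMillis
                    && (i < checkCount || idle > maxEvictableIdleTimeMillis)) {
                actions.add(Action.EVICT);
                continue;
            }
            if (keepAlive && idle >= keepAliveBetweenTimeMillis) {
                actions.add(Action.KEEP_ALIVE_CHECK);
            } else {
                actions.add(Action.KEEP);
            }
        }
        while (actions.size() < poolingCount) {
            actions.add(Action.KEEP);               // connections skipped by the early break
        }
        return actions;
    }

    public static void main(String[] args) {
        long[] idle = {2_000_000L, 1_900_000L, 130_000L, 10_000L};
        // minIdle = 2, evict after 30 min (7 h hard limit), keepalive check after 2 min
        System.out.println(classify(idle, 2, true, 1_800_000L, 25_200_000L, 120_000L));
    }
}
```

    With these inputs the two oldest connections are evicted, the third has been idle for more than keepAliveBetweenTimeMillis and is scheduled for validation, and the freshest one is left alone.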

    Summary

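    DestroyConnectionThread runs destroyTask every timeBetweenEvictionRunsMillis, and DestroyTask's run method mainly calls shrink(true, keepAlive). That method handles both eviction and keepalive: it iterates over the first poolingCount entries of connections and, when checkTime is true, uses idleMillis to decide whether a connection should be evicted. A connection that is not evicted and satisfies keepAlive && idleMillis >= keepAliveBetweenTimeMillis is placed in keepAliveConnections. shrink then validates each of these with validateConnection, updating lastKeepTimeMillis on success and calling connection.close() on failure, and finally clears the keepAliveConnections array.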

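    jedis implements keepalive by simply calling socket.setKeepAlive(true), while commons-pool has no notion of keepalive at all. In essence, druid's keepalive resembles commons-pool's testWhileIdle. The difference is that druid performs its testWhileIdle check directly in getConnection, which is a little odd. If that check were removed, and the keepAliveBetweenTimeMillis condition were dropped from the keepAlive logic in shrink, the behavior would match commons-pool's testWhileIdle. Put differently, druid's keepalive amounts to testWhileIdle with an added keepAliveBetweenTimeMillis threshold.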
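
    For contrast, socket-level keepalive is a single option on java.net.Socket and involves no application-level validation query. The sketch below only shows that option on an unconnected socket; the class name SocketKeepAliveSketch is hypothetical, and the snippet does not reproduce how jedis actually wires it up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;

// Shows the TCP-level SO_KEEPALIVE option only; hypothetical helper, not jedis code.
class SocketKeepAliveSketch {
    static boolean enableKeepAlive() {
        try (Socket socket = new Socket()) {   // unconnected, so no network traffic happens
            socket.setKeepAlive(true);         // ask the OS to send TCP keepalive probes
            return socket.getKeepAlive();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("SO_KEEPALIVE enabled: " + enableKeepAlive());
    }
}
```

    The probes are sent by the operating system's TCP stack, whereas druid's keepalive is driven by the pool itself, which runs validateConnection on idle connections from shrink.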

  • Original article: https://blog.csdn.net/hello_ejb3/article/details/133453819