连接池(Connection Pool)是在系统初始化的时候,将网络连接作为对象存储在内存中,当用户需要进行连接时,从连接池中取一个已经建立好的空连接对象,使用完后不关闭连接,将连接放回连接池,而不必频繁地创建和关闭连接,这可以减少连接的开销和提高系统响应速度。
连接池的工作原理:
连接池的优势包括:
# python3.7.9 redis 5.0.1
# redis/connection.py
class ConnectionPool(object):
def __init__(self, connection_class=Connection, max_connections=None,
**connection_kwargs):
# 连接池最大连接数,默认可以创建2**31个
max_connections = max_connections or 2 ** 31
if not isinstance(max_connections, (int, long)) or max_connections < 0:
raise ValueError('"max_connections" must be a positive integer')
self.connection_class = connection_class
self.connection_kwargs = connection_kwargs
self.max_connections = max_connections
# 保护_checkpid()临界区
self._fork_lock = threading.Lock()
self.reset()
def reset(self):
self._lock = threading.Lock()
# 连接池大小
self._created_connections = 0
# 可用得连接
self._available_connections = []
# 正在使用得连接
self._in_use_connections = set()
# 连接池的进程ID
self.pid = os.getpid()
# 创建一个连接,并加到连接池中
def make_connection(self):
"Create a new connection"
if self._created_connections >= self.max_connections:
raise ConnectionError("Too many connections")
self._created_connections += 1
return self.connection_class(**self.connection_kwargs)
# 从连接池中获取一个连接
def get_connection(self, command_name, *keys, **options):
"Get a connection from the pool"
self._checkpid()
with self._lock:
try:
# 从可用的连接池中pop一个连接
connection = self._available_connections.pop()
except IndexError:
# 如果无可用的连接,就创建一个
connection = self.make_connection()
# 然后加入到已用的列表中
self._in_use_connections.add(connection)
try:
# ensure this connection is connected to Redis
connection.connect()
try:
if connection.can_read():
raise ConnectionError('Connection has data')
except ConnectionError:
connection.disconnect()
connection.connect()
if connection.can_read():
raise ConnectionError('Connection not ready')
except BaseException:
# release the connection back to the pool so that we don't
# leak it
self.release(connection)
raise
return connection
# 释放一个连接
def release(self, connection):
"Releases the connection back to the pool"
self._checkpid()
with self._lock:
try:
# 从已用的列表中移除
self._in_use_connections.remove(connection)
except KeyError:
# Gracefully fail when a connection is returned to this pool
# that the pool doesn't actually own
pass
if self.owns_connection(connection):
# 如果是连接池中的连接,再加到可用的连接池中
self._available_connections.append(connection)
else:
# 如果是普通的连接,则断开连接
self._created_connections -= 1
connection.disconnect()
return
内置模块 http 中的连接池
# urllib3/connectionpool.py
class HTTPConnectionPool(ConnectionPool, RequestMethods):
def __init__(self):
# 初始化连接池
self.pool = self.QueueCls(maxsize)
def _new_conn(self):
"""新建一个连接"""
conn = self.ConnectionCls(
host=self.host,
port=self.port,
timeout=self.timeout.connect_timeout,
strict=self.strict,
**self.conn_kw
)
return conn
def _get_conn(self, timeout=None):
"""获取一个连接"""
conn = None
try:
conn = self.pool.get(block=self.block, timeout=timeout)
except AttributeError: # self.pool is None
raise ClosedPoolError(self, "Pool is closed.")
"""
.........
"""
# 获取一个连接,如果不存在创建一个新连接
return conn or self._new_conn()
def _put_conn(self, conn):
"""连接使用完后,放回连接池"""
try:
self.pool.put(conn, block=False)
return # Everything is dandy, done.
except AttributeError:
# self.pool is None.
pass
def close(self):
"""关闭所有连接"""
if self.pool is None:
return
# Disable access to the pool
old_pool, self.pool = self.pool, None
try:
while True:
conn = old_pool.get(block=False)
if conn:
conn.close()
except queue.Empty:
pass # Done.