• Python中连接池的分析和应用


    连接池(Connection Pool)是在系统初始化的时候,将网络连接作为对象存储在内存中,当用户需要进行连接时,从连接池中取一个已经建立好的空连接对象,使用完后不关闭连接,将连接放回连接池,而不必频繁地创建和关闭连接,这可以减少连接的开销和提高系统响应速度。
    连接池的工作原理:

    1. 连接的创建和初始化:初始阶段,设置最小连接数、最多连接数以及连接的时间等参数,然后会创建一定数量的数据库连接。这些连接通常被称为连接池的“空闲连接”。
    2. 连接的获取:当应用程序需要与数据库交互时,可以在连接池中寻找是否有空闲的连接,如果有则使用,没有就创建一个连接或等待其他客户端释放连接。
    3. 连接的使用:应用程序可以使用数据库连接来执行查询、插入、更新或删除操作。一旦操作完成,连接仍然可以保持开放,以便稍后的重复使用。
    4. 连接的归还:当应用程序不再需要连接时,它将连接归还到连接池,以便其他应用程序可以再次使用它。
    5. 连接的管理:连接池通常会跟踪连接的状态和可用性。如果连接变得无效(例如,由于连接断开或发生错误),连接池会自动替换它。

    连接池的优势包括:

    • 减少连接开销:连接池避免了频繁创建和关闭连接,从而减少了连接的开销,提高了数据库操作的性能。
    • 提高响应速度:由于连接池中已经预先创建了一些连接,当有数据库操作请求时,可以直接从连接池中获取一个空闲的连接,避免了等待创建连接的时间,从而提高了响应速度。
    • 资源管理:连接池可以通过配置参数来动态调整连接的数量和状态,从而能够更好地适应不同的业务场景和负载情况,避免过多的数据库连接对数据库服务器造成负担。
    • 连接复用:连接池允许多个应用程序共享连接,以实现连接的复用,这对于高负载系统非常有用。

    Python 库中的连接池

    redis第三方库 redis的连接池

    # python3.7.9 redis 5.0.1 
    # redis/connection.py
    
    class ConnectionPool(object):
        def __init__(self, connection_class=Connection, max_connections=None,
                     **connection_kwargs):
            # 连接池最大连接数,默认可以创建2**31个
            max_connections = max_connections or 2 ** 31
            if not isinstance(max_connections, (int, long)) or max_connections < 0:
                raise ValueError('"max_connections" must be a positive integer')
    
            self.connection_class = connection_class
            self.connection_kwargs = connection_kwargs
            self.max_connections = max_connections
            # 保护_checkpid()临界区
            self._fork_lock = threading.Lock()
            self.reset()
        
        def reset(self):
            self._lock = threading.Lock()
            # 连接池大小
            self._created_connections = 0
            # 可用得连接
            self._available_connections = []
            # 正在使用得连接
            self._in_use_connections = set()
        	# 连接池的进程ID
            self.pid = os.getpid()
        
        # 创建一个连接,并加到连接池中
        def make_connection(self):
            "Create a new connection"
            if self._created_connections >= self.max_connections:
                raise ConnectionError("Too many connections")
            self._created_connections += 1
            return self.connection_class(**self.connection_kwargs)
    	
        # 从连接池中获取一个连接
        def get_connection(self, command_name, *keys, **options):
            "Get a connection from the pool"
            self._checkpid()
            with self._lock:
                try:
                    # 从可用的连接池中pop一个连接
                    connection = self._available_connections.pop()
                except IndexError:
                    # 如果无可用的连接,就创建一个
                    connection = self.make_connection()
                # 然后加入到已用的列表中
                self._in_use_connections.add(connection)
    
            try:
                # ensure this connection is connected to Redis
                connection.connect()
                try:
                    if connection.can_read():
                        raise ConnectionError('Connection has data')
                except ConnectionError:
                    connection.disconnect()
                    connection.connect()
                    if connection.can_read():
                        raise ConnectionError('Connection not ready')
            except BaseException:
                # release the connection back to the pool so that we don't
                # leak it
                self.release(connection)
                raise
    
            return connection
        
        # 释放一个连接
        def release(self, connection):
            "Releases the connection back to the pool"
            self._checkpid()
            with self._lock:
                try:
                    # 从已用的列表中移除
                    self._in_use_connections.remove(connection)
                except KeyError:
                    # Gracefully fail when a connection is returned to this pool
                    # that the pool doesn't actually own
                    pass
    
                if self.owns_connection(connection):
                    # 如果是连接池中的连接,再加到可用的连接池中
                    self._available_connections.append(connection)
                else:
                    # 如果是普通的连接,则断开连接
                    self._created_connections -= 1
                    connection.disconnect()
                    return
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    内置模块 http 中的连接池

    # urllib3/connectionpool.py
    class HTTPConnectionPool(ConnectionPool, RequestMethods):
        def __init__(self):
            # 初始化连接池
            self.pool = self.QueueCls(maxsize)
        
        def _new_conn(self):
            """新建一个连接"""
            conn = self.ConnectionCls(
                host=self.host,
                port=self.port,
                timeout=self.timeout.connect_timeout,
                strict=self.strict,
                **self.conn_kw
            )
            return conn
    
    	def _get_conn(self, timeout=None):
            """获取一个连接"""
            conn = None
            try:
                conn = self.pool.get(block=self.block, timeout=timeout)
    
            except AttributeError:  # self.pool is None
                raise ClosedPoolError(self, "Pool is closed.")
            """
            .........
            """
            # 获取一个连接,如果不存在创建一个新连接
            return conn or self._new_conn()
            
        def _put_conn(self, conn):
            """连接使用完后,放回连接池"""
            try:
                self.pool.put(conn, block=False)
                return  # Everything is dandy, done.
            except AttributeError:
                # self.pool is None.
                pass
        
        def close(self):
            """关闭所有连接"""
            if self.pool is None:
                return
            # Disable access to the pool
            old_pool, self.pool = self.pool, None
    
            try:
                while True:
                    conn = old_pool.get(block=False)
                    if conn:
                        conn.close()
    
            except queue.Empty:
                pass  # Done.
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
  • 相关阅读:
    kubernetes 高可用集群
    python基础学习笔记1
    Google Earth Engine(GEE)——核函数应用
    网站收录查询-批量网站收录查询软件
    jquery导航图片全屏滚动、首页全屏轮播图,各式相册
    面向对象开发方法
    Leetcode 895. Maximum Frequency Stack (Heap 题)
    Navicat 常见错误代码汇总
    337. 打家劫舍 III
    使用卡特兰数来解决的问题
  • 原文地址:https://blog.csdn.net/u010442378/article/details/134381506