我们知道使用pymysql连接数据库一般需要下面的步骤:
conn = pymysql.connect(host, port, ...)
cursor = conn.cursor()
sql = "select * from goods;"
cursor.execute(sql)
res = cursor.fetchall() # 查看所有,可以查看一部分
print(res)
cursor.close()
conn.close()
连接池是connection对象的缓冲区,他里面会存放一些connection,当程序需要使用connection时,如果连接池中有,则直接从连接池获取,不需要再重新创建connection。连接池让程序能够复用连接。资源复用.
可以尽量避免连接三次握手四次挥手等非业务流程带来的损耗。池化技术,本质上说就是资源复用。
1、连接池维护着两个容器空闲池和活动池
2、空闲池用于存放未使用的连接,活动池存放正在使用的连接,活动池中的连接使用完之后要归还回空闲池
3、当需要连接时,先判断空闲池是否有连接,如果有则取出一个放置到活动池供程序使用
4、如果没有,则判断活动池中连接是否达到最大连接数,如果没有,则创建一个连接放到活动池供程序使用。
5、如果空闲池中没有连接,活动池中连接也达到上限,则不能创建新连接,此时会判断是否等待超时,如果没有等待超时则需等待活动池中的连接归还回空闲池
6、如果等待超时,可以采取多种处理方式,例如:直接抛出异常,或将活动池中使用最久的连接移除掉归还回空闲池以供程序使用
首先解决一个问题,为什么需要多个连接?对于数据库而言,每个连接都是独占的,如果一个业务占用连接,而又进入了IO等待,那么这个业务就会一直占用这个连接,其他业务也只能等待。如果开多条连接,不同的业务就可以利用不同的连接通道来处理。
一般情况下,线程池线程数量尽量与连接池中的连接数量一致。这样做可以避免不同线程之间抢资源或是资源过多线程处理不过来导致浪费的情况。注意这里说的一般情况,是业务服务器与数据库服务器性能差不多的情况。其实,数据库服务器也是使用多线程来处理业务,如果连接数过多,数据库会处理不过来。
另外,如果不使用连接池,就需要不同的线程去绑定不同的连接,使得整个系统的耦合度变高。还要注意一点的是,在一个数据库服务器中可能有多个连接池。比如,一个连接池对应写业务,与一个只处理写业务的线程池,共同处理写数据库,而再用另一套连接池只处理读业务,与只读数据库连接,这样做就实现了读写分离。
import threading
from typing import List
import pymysql
from dbutils.pooled_db import PooledDB
from MovieRecSystem.config import redis_config, mysql_config
from MovieRecSystem.utils.logger_util import logger
def _encode(v, encoding='UTF-8') -> str:
return str(v, encoding=encoding)
'''
单例是redis, 然后提供静态方法
可以使用其它的
'''
class DB(object):
# 单例模式
# 必须要有一个静态的属性, 或者说类上的属性
__lock = threading.Lock() # 锁
__pool = None # 只要是单例就会有一个静态的属性, mysql就是我们的线程池
def __init__(self):
self.pool = DB.__get_conn_pool() # 链接时是需要时间的,所以创建连接池! 保证整个运行只有一个线程
@staticmethod
def __get_conn_pool():
'''
就是获取一个连接池(因为是单例), 获取锁之后再添加实例(和redis一样)
但是不代表已经连接成功了,还需要connection
:return:
'''
if DB.__pool is None:
DB.__lock.acquire()
try:
if DB.__pool is None:
# 获取配置信息
_cfg = mysql_config.cfg.copy()
if 'creator' not in _cfg:
raise ValueError("必须给定creator参数")
if _cfg['creator'] != 'pymysql':
raise ValueError("creator参数必须给定为pymysql")
del _cfg['creator']
DB.__pool = PooledDB(pymysql, **_cfg)
# DB.__pool = PooledDB(
# creator=pymysql, # 使用链接数据库的模块
# maxconnections=500, # 连接池允许的最大连接数,0和None表示不限制连接数
# mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
# maxcached=50, # 链接池中最多闲置的链接,0和None不限制
# blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
# maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
# setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
# ping=1,
# host="121.40.96.93",
# port=3306,
# user="gerry",
# password="123456",
# database="rec_system",
# charset="utf8"
# )
except Exception as e:
raise ValueError("创建Mysql数据库连接池异常!") from e
finally:
DB.__lock.release()
return DB.__pool
def _get_connection(self):
conn = self.pool.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
return conn, cursor
@staticmethod
def _close_connection(conn, cursor):
if cursor:
try:
cursor.close()
except Exception as e:
logger.error("关闭Mysql连接cursor异常。", exc_info=e)
if conn:
try:
conn.close()
except Exception as e:
logger.error("关闭Mysql连接conn异常。", exc_info=e)
# 执行查询结果(这个可以放到缓存里)
def query_sql(self, sql, **params):
conn, cursor = self._get_connection()
try:
cursor.execute(sql, params)
result = cursor.fetchall()
except Exception as e:
raise ValueError(f"Query查询异常,当前sql语句为:{sql}, 参数类别为:{params}.") from e
finally:
self._close_connection(conn, cursor)
return result
# 执行
def execute_sql(self, sql, **params):
conn, cursor = self._get_connection()
try:
cursor.execute(sql, params)
result = cursor.lastrowid
conn.commit()
except Exception as e:
conn.rollback()
raise ValueError(f"Execute执行异常,当前sql语句为:{sql}, 参数类别为:{params}.") from e
finally:
self._close_connection(conn, cursor)
return result