适用场景:
单例模式只允许创建一个对象,因此节省内存,加快对象访问速度,因此对象需要被公用的场合适合使用,如多个模块使用同一个数据源连接对象等等。如:
以下都是单例模式的经典使用场景:
接下来就是以 pymongo模块的来举例说明
pymongo 是 python 操作 mongodb 的官方库,它pymongo 提供了mongdb和python交互的所有方法,文档地址:https://www.mongodb.com/docs/drivers/pymongo/
pip install pymongo
先使用常规的连接方式来创建 pymongo 的连接看下,下面展示的是一个测试用例代码,具体源码在 gitee 上
源码的具体链接:https://gitee.com/allen-huang/python/blob/master/python-code/do-mongodb/test_client.py
...
def test_client1(self):
"""
简单连接1
@return:
"""
# 创建连接
client = MongoClient(host="127.0.0.1", port=27017, username="admin", password="YrnKzEubSv6")
# 选择一个库下面的集合,如果集合不存在,则自动新建
coll = client['test']['user']
print(coll.find_one({'uid': 1}))
pass
...

...
def test_client2(self):
"""
简单连接方式2:使用连接字符串
@return:
"""
# 创建连接
uri = f"mongodb://admin:YrnKzEubSv6@127.0.0.1:27017/?maxPoolSize=20"
client = MongoClient(uri, connectTimeoutMS=5000)
# 选择一个库下面的集合,如果集合不存在,则自动新建
coll = client['test']['user']
print(coll.find_one({'uid': 1}))
pass
...

这里创建单例模式,是基于__new__方法结合 pymongo的连接池来实现,pymongo 的连接池是内部已经集成了的,只要设置 maxPoolSize 参数就行。
import threading
from pymongo import MongoClient
from conf.load_config import Config
class MongoPool(object):
"""
mongodb连接池创建
"""
_pool = None
_instance = None
_lock = threading.Lock() # 线程锁,是属于同步锁
def __new__(cls):
if not cls._instance: # 在并发进来的时候,只会创建一次,如果已经创建了,就不用排队再去等待上锁了再判断实例是否为空,这样子可以提高性能。
with cls._lock: # 类似JAVA的synchronized,自动获取和释放锁
if not cls._instance:
cls._instance = cls.get_pool()
return cls._instance
@classmethod
def get_pool(cls):
"""
获取连接池实例
"""
user = Config['mongodb']['user']
host = Config['mongodb']['host']
port = Config['mongodb']['port']
password = Config['mongodb']['password']
maxPoolSize = Config['mongodb']['maxPoolSize']
connectTimeoutMS = Config['mongodb']['connectTimeoutMS']
# 连接mongodb的URI
uri = f"mongodb://{user}:{password}@{host}:{port}/?maxPoolSize={maxPoolSize}"
client = MongoClient(uri, connectTimeoutMS=connectTimeoutMS)
return client
是基于 yaml 来的,根据不同服务器环境变量来 自动匹配配置文件,pyYaml的使用参考这一篇文章。
安装:
pip install pyyaml
可以根据服务器各自的环境变量 来匹配(开发、测试、生产环境)的配置文件,这样子的好处就是各自的环境配置都不受到影响,提高的系统的稳定
我这边测试的是mac本地,在.zshrc文件中,环境变量共用了之前 chatgpt 项目设置好的环境变量CHATGPT_PYTHON_ENV,在测试和生产环境的原理也一样,只需要改成CHATGPT_PYTHON_ENV="test"和CHATGPT_PYTHON_ENV="prod"就行。

源码的链接地址:https://gitee.com/allen-huang/python/tree/master/conf/load_config.py
import os
from os.path import abspath, dirname
import yaml
# 获取各个环境对应的配置文件
CONFIG_FILES = {
'dev': 'config-dev.yaml',
'test': 'config-test.yaml',
'prod': 'config-prod.yaml'
}
# 获取配置
Config = {}
def load_config():
global Config
# 获取当前运行环境
env = os.getenv('CHATGPT_PYTHON_ENV', 'dev')
if env not in CONFIG_FILES:
raise ValueError('没有该环境的配置文件')
with open(abspath(dirname(__file__)) + '/' + CONFIG_FILES[env], 'r', encoding='utf-8') as f:
try:
Config = yaml.safe_load(f)
except yaml.YAMLError as e:
raise e
# 读取当前项目的运行环境,如果没有则默认是dev环境
try:
load_config()
except Exception as e:
print(e)
开启10个线程测试, 这里不采用单元测试,而是采用__main__里面执行线程,因为单元测试使用多线程会有问题,线程执行不全。
源码地址:https://gitee.com/allen-huang/python/blob/master/python-code/do-mongodb/test_client_singleton.py
import threading
import time
from db.mongo_pool import MongoPool
def task(arg):
mongo_obj = MongoPool()
# 连接池对象是否是同一个地址
str = f"执行编号:{arg}, 连接对象的内存地址: {id(mongo_obj)}"
# 等待2秒
time.sleep(2)
print(str)
# 使用多线程进行测试
if __name__ == '__main__':
thread_list = []
for i in range(10):
t = threading.Thread(target=task, args=(i,))
thread_list.append(t)
# 开始运行线程
t.start()
for t in thread_list:
# 等待线程结束
t.join()

下面是测试用例的代码,具体源码看链接https://gitee.com/allen-huang/python/blob/master/python-code/do-mongodb/test_client.py
...
def test_client3(self):
"""
简单连接方式3:使用单例
@return:
"""
data = MongoPool().test.user.find_one({'uid': 1})
print(data)
...
