• Flask 数据库 连接池、DBUtils、http 连接池


    1、DBUtils 简介、使用

    DBUtils 简介

    DBUtils 是一套用于管理 数据库 "连接池" 的Python包,为 "高频度、高并发"  的数据库访问提供更好的性能,可以自动管理连接对象的创建和释放。并允许对非线程安全的数据库接口进行线程安全包装和连接。该连接可在各种多线程环境中使用。

    使用场景:如果使用的是流行的对象关系映射器 SQLObject 或 SQLAlchemy 之一,则不需要 DBUtils,因为它们带有自己的连接池。SQLObject 2 (SQL-API) 实际上是从 DBUtils 中借用了一些代码,将池化分离到一个单独的层中。

    DBUtils 提供两种外部接口:

    • PersistentDB :提供线程专用的数据库连接,并自动管理连接。
    • PooledDB :提供线程间可共享的数据库连接,并自动管理连接。

    另外,实际使用的数据库驱动也有所依赖,比如SQLite数据库只能使用PersistentDB作连接池。 下载地址:http://www.webwareforpython.org/downloads/DBUtils/

    使用 DBUtils 数据库 连接池

    安装:pip install DBUtils

    示例:MySQLdb 模块使用

    连接池对象只初始化一次,一般可以作为模块级代码来确保。 PersistentDB 的连接例子:

    1. import DBUtils.PersistentDB
    2. # maxusage 则为一个连接最大使用次数
    3. persist = DBUtils.PersistentDB.PersistentDB(dbpai=MySQLdb,maxusage=1000,**kwargs)
    4. # 获取连接池
    5. conn = persist.connection()
    6. # 关闭连接池
    7. conn.close()

    参数 dbpai 指定使用的数据库模块,兼容 DB-API 。下面是支持 DB-API 2 规范的数据库模块

    pip install pymysql(mysql)
    pip install pymssql(sqlserver)
    pip install cx_Oracle(oracle)
    pip install phoenixdb(hbase)
    pip install sqlite3(sqlite3 python自带)

    DBUtils 仅提供给了连接池管理,实际的数据库操作依然是由符合 DB-API 2 标准的目标数据库模块完成的。

    示例:pymysql 模块使用

    PooledDB 使用方法同 PersistentDB,只是参数有所不同。

    • dbapi :数据库接口
    • mincached :启动时开启的空连接数量
    • maxcached :连接池最大可用连接数量
    • maxshared :连接池最大可共享连接数量
    • maxconnections :最大允许连接数量
    • blocking :达到最大数量时是否阻塞
    • maxusage :单个连接最大复用次数
    • setsession :用于传递到数据库的准备会话,如 [”set name UTF-8″] 。

    conn = pooled.connection()
    cur = conn.cursor()
    cur.execute(sql)
    res = cur.fetchone()
    cur.close()   # 或者 del cur
    conn.close()  # 或者 del conn

    1. import pymysql
    2. from dbutils.pooled_db import PooledDB
    3. # 定义连接参数
    4. pool = PooledDB(
    5. creator=pymysql,
    6. maxconnections=6,
    7. mincached=2,
    8. maxcached=5,
    9. blocking=True,
    10. host='localhost',
    11. user='root',
    12. passwd='123456',
    13. db='mydb',
    14. port=3306,
    15. charset='utf8mb4'
    16. )
    17. def main():
    18. # 从连接池获取连接
    19. conn = pool.connection()
    20. cursor = conn.cursor()
    21. # 执行 SQL 语句
    22. sql = "SELECT * FROM students"
    23. cursor.execute(sql)
    24. result = cursor.fetchall()
    25. # 处理查询结果
    26. for row in result:
    27. print(row)
    28. # 关闭游标和连接
    29. cursor.close()
    30. conn.close()
    31. if __name__ == '__main__':
    32. main()

    示例:面向对象 使用 DBUtils

    1. """
    2. 使用DBUtils数据库连接池中的连接,操作数据库
    3. """
    4. import json
    5. import pymysql
    6. import datetime
    7. from DBUtils.PooledDB import PooledDB
    8. import pymysql
    9. class MysqlClient(object):
    10. __pool = None;
    11. def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
    12. maxusage=100, setsession=None, reset=True,
    13. host='127.0.0.1', port=3306, db='test',
    14. user='root', passwd='123456', charset='utf8mb4'):
    15. """
    16. :param mincached:连接池中空闲连接的初始数量
    17. :param maxcached:连接池中空闲连接的最大数量
    18. :param maxshared:共享连接的最大数量
    19. :param maxconnections:创建连接池的最大数量
    20. :param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
    21. :param maxusage:单个连接的最大重复使用次数
    22. :param setsession:optional list of SQL commands that may serve to prepare
    23. the session, e.g. ["set datestyle to ...", "set time zone ..."]
    24. :param reset:how connections should be reset when returned to the pool
    25. (False or None to rollback transcations started with begin(),
    26. True to always issue a rollback for safety's sake)
    27. :param host:数据库ip地址
    28. :param port:数据库端口
    29. :param db:库名
    30. :param user:用户名
    31. :param passwd:密码
    32. :param charset:字符编码
    33. """
    34. if not self.__pool:
    35. self.__class__.__pool = PooledDB(pymysql,
    36. mincached, maxcached,
    37. maxshared, maxconnections, blocking,
    38. maxusage, setsession, reset,
    39. host=host, port=port, db=db,
    40. user=user, passwd=passwd,
    41. charset=charset,
    42. cursorclass=pymysql.cursors.DictCursor
    43. )
    44. self._conn = None
    45. self._cursor = None
    46. self.__get_conn()
    47. def __get_conn(self):
    48. self._conn = self.__pool.connection();
    49. self._cursor = self._conn.cursor();
    50. def close(self):
    51. try:
    52. self._cursor.close()
    53. self._conn.close()
    54. except Exception as e:
    55. print(e)
    56. def __execute(self, sql, param=()):
    57. count = self._cursor.execute(sql, param)
    58. print(count)
    59. return count
    60. @staticmethod
    61. def __dict_datetime_obj_to_str(result_dict):
    62. """把字典里面的datatime对象转成字符串,使json转换不出错"""
    63. if result_dict:
    64. result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
    65. result_dict.update(result_replace)
    66. return result_dict
    67. def select_one(self, sql, param=()):
    68. """查询单个结果"""
    69. count = self.__execute(sql, param)
    70. result = self._cursor.fetchone()
    71. """:type result:dict"""
    72. result = self.__dict_datetime_obj_to_str(result)
    73. return count, result
    74. def select_many(self, sql, param=()):
    75. """
    76. 查询多个结果
    77. :param sql: qsl语句
    78. :param param: sql参数
    79. :return: 结果数量和查询结果集
    80. """
    81. count = self.__execute(sql, param)
    82. result = self._cursor.fetchall()
    83. """:type result:list"""
    84. [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
    85. return count, result
    86. def execute(self, sql, param=()):
    87. count = self.__execute(sql, param)
    88. return count
    89. def begin(self):
    90. """开启事务"""
    91. self._conn.autocommit(0)
    92. def end(self, option='commit'):
    93. """结束事务"""
    94. if option == 'commit':
    95. self._conn.autocommit()
    96. else:
    97. self._conn.rollback()
    98. if __name__ == "__main__":
    99. mc = MysqlClient()
    100. sql1 = 'SELECT * FROM shiji WHERE id = 1'
    101. result1 = mc.select_one(sql1)
    102. print(json.dumps(result1[1], ensure_ascii=False))
    103. sql2 = 'SELECT * FROM shiji WHERE id IN (%s,%s,%s)'
    104. param = (2, 3, 4)
    105. print(json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False))

    不用 连接池

    1. import MySQLdb
    2. conn= MySQLdb.connect(host='localhost',user='root',passwd='pwd',db='myDB',port=3306)
    3. #import pymysql
    4. #conn = pymysql.connect(host='localhost', port='3306', db='game', user='root', password='123456', charset='utf8')
    5. cur=conn.cursor()
    6. SQL="select * from table1"
    7. r=cur.execute(SQL)
    8. r=cur.fetchall()
    9. cur.close()
    10. conn.close()

    使用 连接池

    1. import MySQLdb
    2. from DBUtils.PooledDB import PooledDB
    3. #5为连接池里的最少连接数
    4. pool = PooledDB(MySQLdb,5,host='localhost',user='root',passwd='pwd',db='myDB',port=3306)
    5. # 以后每次需要数据库连接就是用connection()函数获取连接就好了
    6. conn = pool.connection()
    7. cur=conn.cursor()
    8. SQL="select * from table1"
    9. r=cur.execute(SQL)
    10. r=cur.fetchall()
    11. cur.close()
    12. conn.close()

    多线程 使用 连接池

    1. import sys
    2. import threading
    3. import MySQLdb
    4. import DBUtils.PooledDB
    5. connargs = { "host":"localhost", "user":"user1", "passwd":"123456", "db":"test" }
    6. def test(conn):
    7. try:
    8. cursor = conn.cursor()
    9. count = cursor.execute("select * from users")
    10. rows = cursor.fetchall()
    11. for r in rows: pass
    12. finally:
    13. conn.close()
    14. def testloop():
    15. print ("testloop")
    16. for i in range(1000):
    17. conn = MySQLdb.connect(**connargs)
    18. test(conn)
    19. def testpool():
    20. print ("testpool")
    21. pooled = DBUtils.PooledDB.PooledDB(MySQLdb, **connargs)
    22. for i in range(1000):
    23. conn = pooled.connection()
    24. test(conn)
    25. def main():
    26. t = testloop if len(sys.argv) == 1 else testpool
    27. for i in range(10):
    28. threading.Thread(target = t).start()
    29. if __name__ == "__main__":
    30. main()

    虽然测试方式不是很严谨,但从测试结果还是能感受到 DBUtils 带来的性能提升。当然,我们我们也可以在 testloop() 中一直重复使用一个不关闭的 Connection,但这却不适合实际开发时的情形。

    2、Flask 配置,蓝图,数据库连接池,上下文原理

    Flask之配置文件,蓝图,数据库连接池,上下文原理:https://www.cnblogs.com/yunweixiaoxuesheng/p/8418135.html

    配  置

    方式一,使用字典方式配置

    app.config['SESSION_COOKE_NAME'] = 'session_liling'

    方式二,引入文件,设置

    from flask import Flask

    app = Flask(__name__)

    app.config.from_pyfile('settings.py')    # 引用settings.py中的AAAA
    print(app.config['AAAA'])        # 123

    # settings.py
    AAAA = 123

    方法三,使用环境变量设置,推荐使用

    from flask import Flask

    app = Flask(__name__)

    import os

    os.environ['FLASK-SETTINGS'] = 'settings.py'

    app.config.from_envvar('FLASK-SETTINGS')

    方式四,通过对象方式导入使用,可根据不同环境选择不同的配置,推荐使用

    from flask import Flask

    app = Flask(__name__)

    app.config.from_object('settings.BaseConfig')
    print(app.config['NNNN'])  # 123


    # settings.py

    class BaseConfig(object):  # 公用配置
        NNNN = 123


    class TestConfig(object):
        DB = '127.0.0.1'


    class DevConfig(object):
        DB = '192.168.1.1'


    class ProConfig(object):
        DB = '47.18.1.1'

    不同的文件 引用配置

    1. from flask import Flask,current_app
    2. app = Flask(__name__)
    3. app.secret_key = 'adfadsfhjkhakljsdfh'
    4. app.config.from_object('settings.BaseConfig')
    5. @app.route('/index')
    6. def index():
    7. print(current_app.config['NNNN'])
    8. return 'xxx'
    9. if __name__ == '__main__':
    10. app.run()

    instance_path、instance_relative_config

    1. from flask import Flask,current_app
    2. app = Flask(__name__,instance_path=None,instance_relative_config=False)
    3. # 默认 instance_relative_config = False 那么 instance_relative_config和instance_path都不会生效
    4. # instance_relative_config=True,instance_path才会生效,app.config.from_pyfile('settings.py')将会失效
    5. # 配置文件找的路径,按instance_path的值作为配置文件路径
    6. # 默认instance_path=None,None会按照当前路径下的instance文件夹为配置文件的路径
    7. # 如果设置路径,按照设置的路径查找配置文件。
    8. app.config.from_pyfile('settings.py')
    9. @app.route('/index')
    10. def index():
    11. print(current_app.config['NNNN'])
    12. return 'xxx'
    13. if __name__ == '__main__':
    14. app.run()

    蓝  图

    对应用程序的目录结构进行分配,一般适用于小中型企业

    代码:

    1. # crm/__init__.py
    2. # 创建flask项目,用蓝图注册不同的模块
    3. from flask import Flask
    4. from .views import account
    5. from .views import order
    6. app = Flask(__name__)
    7. app.register_blueprint(account.account)
    8. app.register_blueprint(order.order)
    9. ------------------------------------------------------------------------
    10. # manage.py
    11. # 启动文件
    12. import crm
    13. if __name__ == '__main__':
    14. crm.app.run()
    15. ------------------------------------------------------------------------
    16. # crm/views/account.py
    17. # 视图函数模块,Blueprint,将函数引入app
    18. from flask import Blueprint
    19. account = Blueprint('account',__name__,url_prefix='/xxx')
    20. @account.route('/login')
    21. def login():
    22. return 'Login'

    Flask 数据库 连接池

    https://www.cnblogs.com/TheLand/p/9178305.html

    ORM ( 对象关系映射 )

    ORM(Object-Relational Mapping,对象关系映射)是一种编程技术,用于在关系型数据库和面向对象编程语言之间建立映射关系。它允许开发人员使用面向对象的方式来操作数据库,而无需直接编写或执行 SQL 查询。

    ORM 提供了一个抽象层,将数据库表格映射为对象,并提供了一组方法和工具,以便于进行数据库的增删改查操作。开发人员可以通过使用对象和方法来表示和操作数据,而不必关心底层的 SQL 语句和数据库细节。

    常见的 ORM 框架包括:

    • SQLAlchemy:是 python 操作数据库的一个库,能够进行 orm 映射,是一个功能强大的 Python ORM 框架,支持多种数据库后端,提供了高级的查询功能和事务管理等特性。是为高效和高性能的数据库访问设计,实现了完整的企业级持久模型。SQLAlchemy 的理念是,SQL 数据库的量级和性能重要于对象集合;而对象集合的抽象又重要于表和行。
    • Flask-SQLAlchemy:Flask-SQLAlchemy 是一个与 Flask 框架集成的 SQLAlchemy 扩展,它简化了在 Flask 应用程序中使用 SQLAlchemy 进行数据库操作的过程。它提供了一组简单而强大的工具和功能,使得与数据库的交互变得更加轻松和高效。
    • Django ORM:Django 框架自带的 ORM,提供了简单易用的接口,支持多种数据库后端,并具有强大的查询和模型关联功能。
    • Hibernate:Java 领域中最流行的 ORM 框架,为 Java 对象和关系型数据库之间提供了映射和管理。

    使用 ORM 的好处包括:

    • 提高开发效率:ORM 提供了面向对象的编程接口,使得开发人员能够更快速地进行数据库操作,减少了编写和调试 SQL 语句的工作量。
    • 跨数据库平台:ORM 框架通常支持多种数据库后端,使得开发人员能够轻松地切换或同时使用不同的数据库系统。
    • 数据库抽象和安全性:ORM 隐藏了底层的数据库细节,提供了一层抽象,有助于维护和管理数据库结构,并提供了安全性保护,如参数绑定和防止 SQL 注入。
    • 更好的可维护性和可测试性:使用 ORM 可以提高代码的可读性和可维护性,使得进行单元测试和集成测试更加容易。

    注意:ORM 并不能解决所有数据库问题。在某些情况下,复杂的查询和性能要求可能需要直接使用原生 SQL。因此,根据具体的需求和场景,谨慎选择和使用合适的 ORM 框架。

    为什么 使用 数据库 连接池

    • 多连接:如果不用连接池时,每次操作都要链接数据库,链接次数过多,数据库会耗费过多资源,数量过大的话,数据库会过载,导致程序运行缓慢。
    • 单连接:在程序中全局创建连接,导致程序会一直使用一个连接,避免了反复连接造成的问题。但是多线程时就得加锁。这样就变成串行,没法实现并发

    解决方法:

    • 方式 1:为每一个线程创建一个链接(是基于本地线程来实现的。thread.local),每个线程独立使用自己的数据库链接,该线程关闭不是真正的关闭,本线程再次调用时,还是使用的最开始创建的链接,直到线程终止,数据库链接才关闭。如果线程比较多还是会创建很多连接
    • 方式 2:创建一个链接池,为所有线程提供连接,使用时来进行获取,使用完毕后在放回到连接池。假设最大链接数有10个,其实也就是一个列表,当你pop一个,人家会在append一个,链接池的所有的链接都是按照排队的这样的方式来链接的。链接池里所有的链接都能重复使用,共享的, 即实现了并发,又防止了链接次数太多

    基于 DBUtils 数据库连接池

    数据库连接池避免每次操作都要连接数据库,一直使用一个连接,多线程也会出现问题,可加锁,但变为串行

    1. import pymysql
    2. import threading
    3. from threading import RLock
    4. LOCK = RLock()
    5. CONN = pymysql.connect(
    6. host='127.0.0.1',
    7. port=3306,
    8. user='root',
    9. password='123',
    10. database='ok1',
    11. charset='utf8'
    12. )
    13. def task(arg):
    14. with LOCK:
    15. cursor = CONN.cursor()
    16. cursor.execute('select * from book')
    17. result = cursor.fetchall()
    18. cursor.close()
    19. print(result)
    20. for i in range(10):
    21. t = threading.Thread(target=task, args=(i,))
    22. t.start()

    线程之间 的 数据隔离

    "本地线程" 可以实现线程之间的数据隔离。保证每个线程都只有自己的一份数据,在操作时不会影响别人的,即使是多线程,自己的值也是互相隔离的

    1. import threading
    2. import time
    3. # 本地线程对象
    4. local_values = threading.local()
    5. def func(num):
    6. """
    7. # 第一个线程进来,本地线程对象会为他创建一个
    8. # 第二个线程进来,本地线程对象会为他创建一个
    9. {
    10. 线程1的唯一标识:{name:1},
    11. 线程2的唯一标识:{name:2},
    12. }
    13. :param num:
    14. :return:
    15. """
    16. local_values.name = num # 4
    17. # 线程停下来了
    18. time.sleep(2)
    19. # 第二个线程: local_values.name,去local_values中根据自己的唯一标识作为key,获取value中name对应的值
    20. print(local_values.name, threading.current_thread().name)
    21. for i in range(5):
    22. th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
    23. th.start()

    模式一:每个线程创建一个连接

    基于threading.local实现创建每个连接。
    每个线程会创建一个连接,该线程没有真正关闭。
    再次调用该线程时,还是使用原有的连接。
    线程真正终止的时候,连接才会关闭。

    1. from DBUtils.PersistentDB import PersistentDB
    2. import pymysql
    3. POOL = PersistentDB(
    4. creator=pymysql, # 使用链接数据库的模块
    5. maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
    6. setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    7. ping=0,
    8. # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    9. closeable=False,
    10. # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
    11. threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    12. host='127.0.0.1',
    13. port=3306,
    14. user='root',
    15. password='123',
    16. database='pooldb',
    17. charset='utf8'
    18. )
    19. def func():
    20. # conn = SteadyDBConnection()
    21. conn = POOL.connection()
    22. cursor = conn.cursor()
    23. cursor.execute('select * from tb1')
    24. result = cursor.fetchall()
    25. cursor.close()
    26. conn.close() # 不是真的关闭,而是假的关闭。 conn = pymysql.connect() conn.close()
    27. conn = POOL.connection()
    28. cursor = conn.cursor()
    29. cursor.execute('select * from tb1')
    30. result = cursor.fetchall()
    31. cursor.close()
    32. conn.close()
    33. import threading
    34. for i in range(10):
    35. t = threading.Thread(target=func)
    36. t.start()

    模式二:线程复用连接池 (推荐)

    创建一个连接池,为所有线程提供连接,线程使用连接时获取连接,使用完毕放回连接池。
    线程不断地重用连接池里的连接。

    1. import time
    2. import pymysql
    3. import threading
    4. from DBUtils.PooledDB import PooledDB, SharedDBConnection
    5. POOL = PooledDB(
    6. creator=pymysql, # 使用链接数据库的模块
    7. maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
    8. mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    9. maxcached=5, # 链接池中最多闲置的链接,0和None不限制
    10. maxshared=3,
    11. # 链接池中最多共享的链接数量,0和None表示全部共享。
    12. # PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,
    13. # _maxcached永远为0,所以永远是所有链接都共享。
    14. blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    15. maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
    16. setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    17. # ping MySQL服务端,检查是否服务可用。
    18. # 如:0 = None = never, 1 = default = whenever it is requested,
    19. # 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    20. ping=0,
    21. host='127.0.0.1',
    22. port=3306,
    23. user='root',
    24. password='123456',
    25. database='flask_test',
    26. charset='utf8'
    27. )
    28. def func():
    29. # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
    30. # 否则
    31. # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
    32. # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
    33. # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,
    34. # 再封装到PooledDedicatedDBConnection中并返回。
    35. # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
    36. # PooledDedicatedDBConnection
    37. conn = POOL.connection()
    38. # print(th, '链接被拿走了', conn1._con)
    39. # print(th, '池子里目前有', pool._idle_cache, '\r\n')
    40. cursor = conn.cursor()
    41. cursor.execute('select * from userinfo')
    42. result = cursor.fetchall()
    43. print(result)
    44. conn.close()
    45. conn = POOL.connection()
    46. # print(th, '链接被拿走了', conn1._con)
    47. # print(th, '池子里目前有', pool._idle_cache, '\r\n')
    48. cursor = conn.cursor()
    49. cursor.execute('select * from userinfo')
    50. result = cursor.fetchall()
    51. conn.close()
    52. func()

    上下文管理

    所谓上下文,像考试题目根据上下文,回答一下问题。
    程序中,泛指的外部环境,像wsgi来的网络请求,而且通常只有上文。
    flask中的上下文,被使用在 current_app,session,request 上。

    flask 本地线程

    1. from flask import session
    2. try:
    3. from greenlet import getcurrent as get_ident # grenlet协程模块
    4. except ImportError:
    5. try:
    6. from thread import get_ident
    7. except ImportError:
    8. from _thread import get_ident # get_ident(),获取线程的唯一标识
    9. class Local(object): # 引用session中的LocalStack下的Local
    10. __slots__ = ('__storage__', '__ident_func__') # __slots__该类在外面调用时,只能调用定义的字段,其他的不能调用
    11. def __init__(self):
    12. # object.__setattr__为self设置值,等价于self.__storage__ = {}
    13. # 为父类object中包含的__steattr__方法中的self.__storage__ = {}
    14. # 由于类内包含__steattr__,self.xxx(对象.xxx)时会自动会触发__steattr__,
    15. # 当前__steattr__中storage = self.__storage__又会像self.xxx要值,故会造成递归
    16. # 所以在父类中__steattr__方法赋值,避免self.xxx调用__setattr__造成的递归
    17. object.__setattr__(self, '__storage__', {})
    18. object.__setattr__(self, '__ident_func__', get_ident) # 赋值为协程
    19. def __iter__(self):
    20. return iter(self.__storage__.items())
    21. def __release_local__(self):
    22. self.__storage__.pop(self.__ident_func__(), None)
    23. def __getattr__(self, name):
    24. try:
    25. return self.__storage__[self.__ident_func__()][name]
    26. except KeyError:
    27. raise AttributeError(name)
    28. def __setattr__(self, name, value):
    29. ident = self.__ident_func__() # 获取单钱线程(协程)的唯一标识
    30. storage = self.__storage__ # {}
    31. try:
    32. storage[ident][name] = value # { 111 : {'stack':[] },222 : {'stack':[] } }
    33. except KeyError:
    34. storage[ident] = {name: value}
    35. def __delattr__(self, name):
    36. try:
    37. del self.__storage__[self.__ident_func__()][name]
    38. except KeyError:
    39. raise AttributeError(name)
    40. _local = Local() # flask的本地线程功能,类似于本地线程,如果有人创建Local对象并,设置值,每个线程里一份
    41. _local.stack = [] # _local.stack会调用__setattr__的self.__ident_func__()取唯一标识等

    特殊栈

    1. from flask import session
    2. try:
    3. from greenlet import getcurrent as get_ident
    4. except ImportError:
    5. try:
    6. from thread import get_ident
    7. except ImportError:
    8. from _thread import get_ident # 获取线程的唯一标识 get_ident()
    9. class Local(object):
    10. __slots__ = ('__storage__', '__ident_func__')
    11. def __init__(self):
    12. # self.__storage__ = {}
    13. # self.__ident_func__ = get_ident
    14. object.__setattr__(self, '__storage__', {})
    15. object.__setattr__(self, '__ident_func__', get_ident)
    16. def __iter__(self):
    17. return iter(self.__storage__.items())
    18. def __release_local__(self):
    19. self.__storage__.pop(self.__ident_func__(), None)
    20. def __getattr__(self, name):
    21. try:
    22. return self.__storage__[self.__ident_func__()][name]
    23. except KeyError:
    24. raise AttributeError(name)
    25. def __setattr__(self, name, value):
    26. ident = self.__ident_func__() # 获取当前线程(协程)的唯一标识
    27. storage = self.__storage__ # {}
    28. try:
    29. storage[ident][name] = value # { 111:{'stack':[] },222:{'stack':[] } }
    30. except KeyError:
    31. storage[ident] = {name: value}
    32. def __delattr__(self, name):
    33. try:
    34. del self.__storage__[self.__ident_func__()][name]
    35. except KeyError:
    36. raise AttributeError(name)
    37. _local = Local()
    38. _local.stack = []

    使用 flask 中的 stack 和 local

    1. from functools import partial
    2. from flask.globals import LocalStack, LocalProxy
    3. _request_ctx_stack = LocalStack()
    4. class RequestContext(object):
    5. def __init__(self, environ):
    6. self.request = environ
    7. def _lookup_req_object(name):
    8. top = _request_ctx_stack.top
    9. if top is None:
    10. raise RuntimeError(_request_ctx_stack)
    11. return getattr(top, name)
    12. # 实例化了LocalProxy对象,_lookup_req_object参数传递
    13. session = LocalProxy(partial(_lookup_req_object, 'session'))
    14. """
    15. local = {
    16. “标识”: {'stack': [RequestContext(),]}
    17. }
    18. """
    19. _request_ctx_stack.push(RequestContext('c1')) # 当请求进来时,放入
    20. print(session) # 获取 RequestContext('c1'), top方法
    21. print(session) # 获取 RequestContext('c1'), top方法
    22. _request_ctx_stack.pop() # 请求结束pop

    示例:

    1. from functools import partial
    2. from flask.globals import LocalStack, LocalProxy
    3. ls = LocalStack()
    4. class RequestContext(object):
    5. def __init__(self, environ):
    6. self.request = environ
    7. def _lookup_req_object(name):
    8. top = ls.top
    9. if top is None:
    10. raise RuntimeError(ls)
    11. return getattr(top, name)
    12. session = LocalProxy(partial(_lookup_req_object, 'request'))
    13. ls.push(RequestContext('c1')) # 当请求进来时,放入
    14. print(session) # 视图函数使用
    15. print(session) # 视图函数使用
    16. ls.pop() # 请求结束pop
    17. ls.push(RequestContext('c2'))
    18. print(session)
    19. ls.push(RequestContext('c3'))
    20. print(session)

    Flask SQLAlchemy 使用 连接池

    Flask SQLAlchemy 提供了内置的连接池功能,可以方便地配置和使用。

    在Flask应用中,我们可以通过配置 SQLALCHEMY_POOL_SIZE 参数来设置连接池的大小。连接池的大小决定了同时打开的数据库连接的数量。例如,我们可以将连接池的大小设置为10:

    app.config['SQLALCHEMY_POOL_SIZE'] = 10

    1. from flask_sqlalchemy import SQLAlchemy
    2. app = Flask(__name__)
    3. app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:password@localhost/db_name'
    4. db = SQLAlchemy(app)
    5. # 创建模型类
    6. class User(db.Model):
    7. id = db.Column(db.Integer, primary_key=True)
    8. name = db.Column(db.String(50))
    9. # 添加数据到数据库
    10. user = User(name='John')
    11. db.session.add(user)
    12. db.session.commit()
    13. # 查询数据
    14. all_users = User.query.all()
    15. # 更新数据
    16. user = User.query.filter_by(name='John').first()
    17. user.name = 'Jane'
    18. db.session.commit()
    19. # 删除数据
    20. user = User.query.filter_by(name='Jane').first()
    21. db.session.delete(user)
    22. db.session.commit()

    3、flask http 连接池

    Flask 本身并不提供内置的 HTTP 连接池功能,但可以使用第三方库来实现在 Flask 中使用 HTTP 连接池。其中一个常用的库是 urllib3,它提供了高级的连接池管理功能。

    示例:在 Flask 中使用 urllib3 来创建和管理 HTTP 连接池:

    安装:pip install urllib3

    1. from flask import Flask
    2. import urllib3
    3. app = Flask(__name__)
    4. """
    5. 使用 urllib3.PoolManager() 创建了一个连接池管理器对象 http,然后使用 http.request() 方法发送了一个 GET 请求。
    6. 您可以根据需要进行配置和自定义,例如设置最大连接数、超时时间、重试策略等。以下是一个示例,展示了如何进行自定义设置:
    7. """
    8. @app.route('/index_1')
    9. def index_1():
    10. http = urllib3.PoolManager()
    11. response = http.request('GET', 'http://api.example.com')
    12. return response.data
    13. """
    14. 对连接池进行了一些自定义配置,包括最大连接数、每个连接的最大数量、连接和读取的超时时间以及重试策略。
    15. 使用 urllib3 可以更好地控制和管理 HTTP 连接,提高 Flask 应用程序的性能和效率。
    16. """
    17. @app.route('/index_2')
    18. def index_2():
    19. http = urllib3.PoolManager(
    20. num_pools=10, # 最大连接数
    21. maxsize=100, # 每个连接的最大数量
    22. timeout=urllib3.Timeout(connect=2.0, read=5.0), # 连接和读取的超时时间
    23. retries=urllib3.Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]) # 重试策略
    24. )
    25. response = http.request('GET', 'http://api.example.com')
    26. return response.data

  • 相关阅读:
    mybatis-plus填充功能,自动设置值
    基于非对称纳什谈判的多微网电能共享运行优化策略(附带MATLAB程序)
    数仓建设教程
    SB树,看这一篇就够了
    QIBOX1-014-栏目的调用2
    前端面试常见问题总结
    【C++】C++11——C++11介绍、初始化列表、声明、auto、decltype、nullptr、范围for循环
    博客网页制作基础大二dw作业 web课程设计网页制作 个人网页设计与实现 我的个人博客网页开发
    NameNode的image和edits里面到底存放了些啥
    docker 部署vue
  • 原文地址:https://blog.csdn.net/freeking101/article/details/132876026