• Python01:数据库操作


    写在前面

    这里记录一下如何在python中实现数据库操作。

    一、数据库连接池

    • 数据库连接池可以满足高并发的数据库处理,比原始的JDBC连接更加稳健;
    • 需要引入from dbutils.pooled_db import PooledDB
    • 数据库连接池配置含义如下:

    creator:使用链接数据库的模块;
    maxconnections:连接池允许的最大连接数,0和None表示没有限制;
    mincached:初始化时,连接池至少创建的空闲的连接,0表示不创建;
    maxcached:连接池空闲的最多连接数,0和None表示没有限制;
    maxshared:连接池中最多共享的连接数量,0和None表示全部共享,ps:其实并没有什么用,因为pymsql和MySQLDB等模块中的threadsafety都为1,所有值无论设置多少,_maxcahed永远为0,所以永远是所有链接共享;
    blocking:链接池中如果没有可用共享连接后,是否阻塞等待,True表示等待,False表示不等待然后报错;
    setsession:开始会话前执行的命令列表;
    ping:ping Mysql 服务端,检查服务是否可用;

    • DB_pool.connection()可以获取多线程安全的数据库连接对象,一般一个线程获取一个即可;
    • 使用完毕的时候要释放conn对象;
    import sys
    import pymssql
    from dbutils.pooled_db import PooledDB
    import json
    import os
    
    from utils.database_op import DBUtils
    
    
    if __name__ == '__main__':
        '''
        启动配置
        '''
        if len(sys.argv) < 3:
            print("error: init argv missed.")
        else:
            doc_uid = sys.argv[1]
            doc_rpath = sys.argv[2]
            config_path = sys.argv[3]
        print(sys.argv)
    
        # 数据库连接池
        DB_pool = PooledDB(
            creator=pymssql,
            maxconnections=50,
            mincached=0,
            maxcached=20,
            maxshared=0,
            blocking=True,
            setsession=[],
            ping=5,
            host=xxx,
            port=xxx,
            user=xxx,
            password=xxx,
            database=xxx)
        # conn可以从连接池中获取连接
        DB_conn = DB_pool.connection()
        try:
            DBUtils.update_start_process_at(DB_conn, doc_uid)
            DBUtils.update_phase(DB_conn, doc_uid, 300)
            DBUtils.update_process_id(DB_conn, doc_uid, os.getpid())           
            DBUtils.update_json_data(DB_conn, doc_uid, pattern_json_data)
            DBUtils.update_finish_process_at(DB_conn, doc_uid)        
        except BaseException as e:
            # 打印错误,repr将对象转换成字符串
            print('error: ' + repr(e))
            print(e)
        finally:
            # 释放数据库连接
            DB_conn.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    二、封装数据库操作工具类

    • 将数据库操作封装成工具类可以更加简洁地操作数据库;
    • 类内函数均定义为静态函数;
    • 先获取游标对象cursor,然后执行SQL,如果是增删改操作,均需要commit()才能使得数据库生效;
    • commit()可以一次性提交多个未commit的增删改操作;
    • 数据库的datetime类型更新时也是使用字符串的写法来更新,获取当前时间的写法是str(datetime.datetime.now())[0:-3]
    • json数据也转换成字符串来更新,对应的数据库类型是varchar(MAX),意为变长字符串,容量最大为2GB;
    • 如果数据类型是字符串,则在SQL中要用单引号引起来;
    • 注意cursor并不是线程安全的,不能在多线程中同时调用,否则会引发数据库死锁,导致所有的数据库操作均堵塞;
    • 数据库操作的写法可以参考博客:Python数据库操作【三】—— SQLServer
    import pymssql
    from dbutils.pooled_db import PooledDB
    import datetime
    import json
    
    
    class DBUtils:
        @staticmethod
        def update_status(DB_conn, doc_uid, status):
            # 使用 cursor() 方法创建一个游标对象 cursor
            DB_cursor = DB_conn.cursor()
            try:
                DB_sql = "update xxx" + \
                         " set status = " + str(status) + \
                         " where uuid = \'" + doc_uid + '\''
                print(DB_sql)
                # 执行SQL语句
                DB_cursor.execute(DB_sql)
                DB_conn.commit()
                print("Update successfully.")
            except Exception as e:
            	DB_conn.rollback()
                # 打印错误,repr将对象转换成字符串
                print('error: ' + repr(e))
            finally:
                DB_cursor.close()
    
        @staticmethod
        def update_json_data(DB_conn, doc_uid, json_data):
            # 使用 cursor() 方法创建一个游标对象 cursor
            DB_cursor = DB_conn.cursor()
    
            # json转string
            json_str = json.dumps(json_data, ensure_ascii=False)
    
            try:
                DB_sql = "update xxx" + \
                         " set json_data = \'" + json_str + '\'' + \
                         " where uuid = \'" + doc_uid + '\''
                print(DB_sql)
                # 执行SQL语句
                DB_cursor.execute(DB_sql)
                DB_conn.commit()
                print("Update successfully.")
            except Exception as e:
            	DB_conn.rollback()
                # 打印错误,repr将对象转换成字符串
                print('error: ' + repr(e))
            finally:
                DB_cursor.close()
    
        @staticmethod
        def update_start_process_at(DB_conn, doc_uid):
            # 使用 cursor() 方法创建一个游标对象 cursor
            DB_cursor = DB_conn.cursor()
            try:
                DB_sql = "update xxx" + \
                         " set start_process_at = \'" + str(datetime.datetime.now())[0:-3] + '\'' + \
                         " where uuid = \'" + doc_uid + '\''
                print(DB_sql)
                # 执行SQL语句
                DB_cursor.execute(DB_sql)
                DB_conn.commit()
                print("Update successfully.")
            except Exception as e:
            	DB_conn.rollback()
                # 打印错误,repr将对象转换成字符串
                print('error: ' + repr(e))
            finally:
                DB_cursor.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    补充1:使用Snowflake生成唯一id

    • 下载python库:
    pip install pysnowflake
    
    • 1
    • 下面启动snowflake服务器端
    • 首先找到pip安装目录,命令如下:
    pip show pysnowflake
    
    • 1

    查看安装路径

    • 进入路径c:\users\dell\appdata\roaming\python\python38\Scripts,双击启动snowflake_start_server.exe
    • 然后在程序中引入库:
    from snowflake import client
    
    • 1
    • 在程序中使用如下:
    client.get_guid()
    
    • 1
    • 另外,如果是使用conda环境的话,似乎不用像pip那么麻烦找snowflake_start_server.exe来启动,直接在cmd中执行如下命令即可启动服务器端:
    snowflake_start_server
    
    • 1
  • 相关阅读:
    圣杯布局和双飞翼布局
    VoLTE端到端业务详解 | 网元标识类
    3天精通Postman---动态参数&amp;断言&amp;CSV数据驱动&amp;Mock Server
    Cloud
    react 使用 valtio
    【Codeforces】 CF1762E Tree Sum
    Domino服务器SSL证书安装指南
    13 MySQL-约束
    会议OA项目之我的审批
    什么是 JVM ?
  • 原文地址:https://blog.csdn.net/weixin_43992162/article/details/126443946