这里记录一下如何在python中实现数据库操作。
from dbutils.pooled_db import PooledDB
;creator:使用链接数据库的模块;
maxconnections:连接池允许的最大连接数,0和None表示没有限制;
mincached:初始化时,连接池至少创建的空闲的连接,0表示不创建;
maxcached:连接池空闲的最多连接数,0和None表示没有限制;
maxshared:连接池中最多共享的连接数量,0和None表示全部共享,ps:其实并没有什么用,因为pymsql和MySQLDB等模块中的threadsafety都为1,所有值无论设置多少,_maxcahed永远为0,所以永远是所有链接共享;
blocking:链接池中如果没有可用共享连接后,是否阻塞等待,True表示等待,False表示不等待然后报错;
setsession:开始会话前执行的命令列表;
ping:ping Mysql 服务端,检查服务是否可用;
DB_pool.connection()
可以获取多线程安全的数据库连接对象,一般一个线程获取一个即可;conn
对象;import sys
import pymssql
from dbutils.pooled_db import PooledDB
import json
import os
from utils.database_op import DBUtils
if __name__ == '__main__':
'''
启动配置
'''
if len(sys.argv) < 3:
print("error: init argv missed.")
else:
doc_uid = sys.argv[1]
doc_rpath = sys.argv[2]
config_path = sys.argv[3]
print(sys.argv)
# 数据库连接池
DB_pool = PooledDB(
creator=pymssql,
maxconnections=50,
mincached=0,
maxcached=20,
maxshared=0,
blocking=True,
setsession=[],
ping=5,
host=xxx,
port=xxx,
user=xxx,
password=xxx,
database=xxx)
# conn可以从连接池中获取连接
DB_conn = DB_pool.connection()
try:
DBUtils.update_start_process_at(DB_conn, doc_uid)
DBUtils.update_phase(DB_conn, doc_uid, 300)
DBUtils.update_process_id(DB_conn, doc_uid, os.getpid())
DBUtils.update_json_data(DB_conn, doc_uid, pattern_json_data)
DBUtils.update_finish_process_at(DB_conn, doc_uid)
except BaseException as e:
# 打印错误,repr将对象转换成字符串
print('error: ' + repr(e))
print(e)
finally:
# 释放数据库连接
DB_conn.close()
cursor
,然后执行SQL,如果是增删改操作,均需要commit()
才能使得数据库生效;commit()
可以一次性提交多个未commit的增删改操作;datetime
类型更新时也是使用字符串的写法来更新,获取当前时间的写法是str(datetime.datetime.now())[0:-3]
;varchar(MAX)
,意为变长字符串,容量最大为2GB;cursor
并不是线程安全的,不能在多线程中同时调用,否则会引发数据库死锁,导致所有的数据库操作均堵塞;import pymssql
from dbutils.pooled_db import PooledDB
import datetime
import json
class DBUtils:
@staticmethod
def update_status(DB_conn, doc_uid, status):
# 使用 cursor() 方法创建一个游标对象 cursor
DB_cursor = DB_conn.cursor()
try:
DB_sql = "update xxx" + \
" set status = " + str(status) + \
" where uuid = \'" + doc_uid + '\''
print(DB_sql)
# 执行SQL语句
DB_cursor.execute(DB_sql)
DB_conn.commit()
print("Update successfully.")
except Exception as e:
DB_conn.rollback()
# 打印错误,repr将对象转换成字符串
print('error: ' + repr(e))
finally:
DB_cursor.close()
@staticmethod
def update_json_data(DB_conn, doc_uid, json_data):
# 使用 cursor() 方法创建一个游标对象 cursor
DB_cursor = DB_conn.cursor()
# json转string
json_str = json.dumps(json_data, ensure_ascii=False)
try:
DB_sql = "update xxx" + \
" set json_data = \'" + json_str + '\'' + \
" where uuid = \'" + doc_uid + '\''
print(DB_sql)
# 执行SQL语句
DB_cursor.execute(DB_sql)
DB_conn.commit()
print("Update successfully.")
except Exception as e:
DB_conn.rollback()
# 打印错误,repr将对象转换成字符串
print('error: ' + repr(e))
finally:
DB_cursor.close()
@staticmethod
def update_start_process_at(DB_conn, doc_uid):
# 使用 cursor() 方法创建一个游标对象 cursor
DB_cursor = DB_conn.cursor()
try:
DB_sql = "update xxx" + \
" set start_process_at = \'" + str(datetime.datetime.now())[0:-3] + '\'' + \
" where uuid = \'" + doc_uid + '\''
print(DB_sql)
# 执行SQL语句
DB_cursor.execute(DB_sql)
DB_conn.commit()
print("Update successfully.")
except Exception as e:
DB_conn.rollback()
# 打印错误,repr将对象转换成字符串
print('error: ' + repr(e))
finally:
DB_cursor.close()
pip install pysnowflake
pip show pysnowflake
c:\users\dell\appdata\roaming\python\python38\Scripts
,双击启动snowflake_start_server.exe
;from snowflake import client
client.get_guid()
snowflake_start_server.exe
来启动,直接在cmd中执行如下命令即可启动服务器端:snowflake_start_server