# -*- coding: utf-8 -*-# @Time : 2022/6/25 11:50# @Author : ChowRunFa# @File : server.py.py# @Software: PyCharmimport pymysql
from dbConfig import*import time
import logging
import traceback
import pymysql.cursors
#mysql settingsclassMySQLConfig(object):
SQLALCHEMY_DATABASE_URI ="mysql+pymysql://{username}:{password}@{ipaddress}:{port}/{database}"\
.format(username="root",password="123456",ipaddress="127.0.0.1",port="3306",database="jx_db4")
SQLALCHEMY_TRACK_MODIFICATIONS =True#动态追踪修改设置
SQLALCHEMY_ECHO =True# WHITE_NAME_LIST = [""]classConnection(object):"""A lightweight wrapper around PyMySQL.
"""def__init__(self,max_idle_time=7*3600, connect_timeout=10,
time_zone="+0:00", charset ="utf8mb4", sql_mode="TRADITIONAL"):
self.host = MYSQL_HOST
self.database = MYSQL_DBNAME
self.max_idle_time =float(max_idle_time)
args =dict(use_unicode=True, charset=MYSQL_CHARSET,
database=MYSQL_DBNAME,
init_command=('SET time_zone = "%s"'% time_zone),
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=connect_timeout, sql_mode=sql_mode)
args["user"]= MYSQL_USER
args["passwd"]= MYSQL_PASSWORD
# We accept a path to a MySQL socket file or a host(:port) string
args["host"]= MYSQL_HOST
args["port"]= MYSQL_PORT
self._db =None
self._db_args = args
self._last_use_time = time.time()try:
self.reconnect()except Exception:
logging.error("Cannot connect to MySQL on %s", self.host,
exc_info=True)def_ensure_connected(self):# Mysql by default closes client connections that are idle for# 8 hours, but the client library does not report this fact until# you try to perform a query and it fails. Protect against this# case by preemptively closing and reopening the connection# if it has been idle for too long (7 hours by default).if(self._db isNoneor(time.time()- self._last_use_time > self.max_idle_time)):
self.reconnect()
self._last_use_time = time.time()def_cursor(self):
self._ensure_connected()return self._db.cursor()def__del__(self):
self.close()defclose(self):"""Closes this database connection."""ifgetattr(self,"_db",None)isnotNone:
self._db.close()
self._db =Nonedefreconnect(self):"""Closes the existing database connection and re-opens it."""
self.close()
self._db = pymysql.connect(**self._db_args)
self._db.autocommit(True)defquery(self, query,*parameters,**kwparameters):"""Returns a row list for the given query and parameters."""
cursor = self._cursor()try:
cursor.execute(query, kwparameters or parameters)
result = cursor.fetchall()return result
finally:
cursor.close()defget(self, query,*parameters,**kwparameters):"""Returns the (singular) row returned by the given query.
"""
cursor = self._cursor()try:
cursor.execute(query, kwparameters or parameters)return cursor.fetchone()finally:
cursor.close()defexecute(self, query,*parameters,**kwparameters):"""Executes the given query, returning the lastrowid from the query."""
cursor = self._cursor()try:
cursor.execute(query, kwparameters or parameters)return cursor.lastrowid
except Exception as e:if e.args[0]==1062:passelse:
traceback.print_exc()raise e
finally:
cursor.close()
insert = execute
## =============== high level method for table ===================deftable_has(self, table_name, field, value):ifisinstance(value,str):
value = value.encode('utf8')
sql ='SELECT %s FROM %s WHERE %s="%s"'%(
field,
table_name,
field,
value)
d = self.get(sql)return d
deftable_insert(self, table_name, item):'''item is a dict : key is mysql table field'''
fields =list(item.keys())
values =list(item.values())
fieldstr =','.join(fields)
valstr =','.join(['%s']*len(item))for i inrange(len(values)):ifisinstance(values[i],str):
values[i]= values[i].encode('utf8')
sql ='INSERT INTO %s (%s) VALUES(%s)'%(table_name, fieldstr, valstr)try:
last_id = self.execute(sql,*values)return last_id
except Exception as e:if e.args[0]==1062:# just skip duplicated itempasselse:
traceback.print_exc()print('sql:', sql)print('item:')for i inrange(len(fields)):
vs =str(values[i])iflen(vs)>300:print(fields[i],' : ',len(vs),type(values[i]))else:print(fields[i],' : ', vs,type(values[i]))raise e
deftable_update(self, table_name, updates,
field_where, value_where):'''updates is a dict of {field_update:value_update}'''
upsets =[]
values =[]for k, v in updates.items():
s ='%s=%%s'% k
upsets.append(s)
values.append(v)
upsets =','.join(upsets)
sql ='UPDATE %s SET %s WHERE %s="%s"'%(
table_name,
upsets,
field_where, value_where,)
self.execute(sql,*(values))