# For Python 3 and above, targeting MySQL (via pymysql). May contain minor bugs; it has not been tested thoroughly.
#!/usr/bin/python
######################
######################
from sqlite3 import Cursor
import pymysql
import string
import re
import datetime
import time
import traceback
import sys
class mysql:
    """Small convenience wrapper around a pymysql connection.

    The instance lazily opens a connection from ``conn_config`` and offers
    helpers to run raw SQL, build/execute INSERT, UPDATE and DELETE
    statements, and accumulate a WHERE clause (``where`` / ``where_or``)
    that the next ``update`` / ``delete`` call consumes.

    ``conn_config`` is a dict with the keys ``host``, ``user``, ``passwd``,
    ``db``, ``port`` and ``charset`` (see ``test_db`` for an example).
    """

    # Example configuration. "port" is stored as a string and converted with
    # int() when the connection is opened.
    test_db = {"host": "localhost", "user": "root",
               "passwd": "*******", "db": "adsys", "port": "3306", "charset": "utf8"}

    # Timestamp layout used whenever a datetime / struct_time is serialized.
    _DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
    # Statement classifiers used by exec_sql to decide what to return.
    _SELECT_RE = re.compile(r"^\s*SELECT", re.IGNORECASE)
    _INSERT_RE = re.compile(r"^\s*INSERT\s+INTO[\s\S]+VALUES", re.IGNORECASE)

    def __init__(self, conn_config):
        """Store the connection settings; no connection is opened yet."""
        self.conn_config = conn_config
        self.conn = None
        self.cur = None
        self.where_sql = ""
        self.where_params = []

    def __del__(self):
        """Close the connection, if any, when the object is finalized."""
        try:
            self._disconnect()
        except Exception:
            # Deliberate best effort: an error raised from a finalizer cannot
            # be handled by anyone and would only produce noise.
            pass

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _disconnect(self):
        """Close the current connection (if open) and forget it."""
        conn = getattr(self, "conn", None)
        if conn is not None:
            try:
                conn.close()
            finally:
                self.conn = None

    def _reset_where(self):
        """Drop the WHERE clause accumulated by where() / where_or()."""
        self.where_sql = ""
        self.where_params = []

    def _rollback(self):
        """Roll back the open transaction when a connection exists."""
        if self.conn is not None:
            self.conn.rollback()

    @classmethod
    def _format_time(cls, value):
        """Return ``value`` formatted as a timestamp string, or None.

        Only ``datetime.datetime`` and ``time.struct_time`` are formatted;
        every other value yields None.
        """
        if isinstance(value, datetime.datetime):
            return value.strftime(cls._DATETIME_FORMAT)
        if isinstance(value, time.struct_time):
            return time.strftime(cls._DATETIME_FORMAT, value)
        return None

    @classmethod
    def _to_param(cls, value):
        """Convert a value into the form passed as a query parameter."""
        formatted = cls._format_time(value)
        return value if formatted is None else formatted

    @classmethod
    def _to_literal(cls, value):
        """Render a value as a single-quoted, backslash-escaped SQL literal."""
        formatted = cls._format_time(value)
        text = str(value) if formatted is None else formatted
        # Escape backslashes first so the backslash added in front of a quote
        # is not itself doubled afterwards.
        return "'" + text.replace("\\", "\\\\").replace("'", "\\'") + "'"

    # ------------------------------------------------------------------
    # connection handling
    # ------------------------------------------------------------------
    def close(self):
        """Close the connection and clear any pending WHERE clause."""
        self._disconnect()
        self._reset_where()

    def connection(self):
        """Open the connection and a dict cursor if not already connected."""
        if self.conn is None:
            cfg = self.conn_config
            self.conn = pymysql.connect(
                host=cfg["host"], user=cfg["user"], passwd=cfg["passwd"],
                db=cfg["db"], port=int(cfg["port"]), charset=cfg["charset"])
            self.cur = self.conn.cursor(pymysql.cursors.DictCursor)

    # ------------------------------------------------------------------
    # statement execution
    # ------------------------------------------------------------------
    def exec_sql(self, sql, args=None, disconnect=True):
        """Execute ``sql`` and return a result depending on its kind.

        * SELECT statements return all fetched rows.
        * ``INSERT INTO ... VALUES`` statements return ``conn.insert_id()``.
        * Anything else returns None.

        The statement is committed on success and rolled back (then the
        exception re-raised) on failure. When ``disconnect`` is true the
        connection is closed afterwards.
        """
        try:
            self.connection()
            self.cur.execute(sql, args)
            result = None
            if self._SELECT_RE.search(sql) is not None:
                result = self.cur.fetchall()
            elif self._INSERT_RE.search(sql) is not None:
                result = self.conn.insert_id()
            self.conn.commit()
            return result
        except Exception:
            self._rollback()
            raise
        finally:
            if disconnect:
                self._disconnect()

    def insert(self, table, data, disconnect=True):
        """Insert one row.

        Args:
            table: table name.
            data: dict mapping column name to value. datetime and
                struct_time values are sent as formatted timestamp strings.
            disconnect: close the connection afterwards when true.

        Returns:
            The value of ``conn.insert_id()`` after the insert.
        """
        try:
            columns = ",".join(data)
            placeholders = ",".join(["%s"] * len(data))
            params = tuple(self._to_param(value) for value in data.values())
            sql = "INSERT INTO " + table + \
                " (" + columns + ") VALUES (" + placeholders + ")"
            self.connection()
            self.cur.execute(sql, params)
            insert_id = self.conn.insert_id()
            self.conn.commit()
            return insert_id
        except Exception:
            self._rollback()
            raise
        finally:
            if disconnect:
                self._disconnect()
            self._reset_where()

    def insert_batch(self, table, data, disconnect=True):
        """Insert several rows with one ``executemany`` call.

        Args:
            table: table name.
            data: non-empty sequence of dicts. The column list is taken from
                the first dict; every row must provide those same keys
                (``data[0]`` raises IndexError on an empty sequence).
            disconnect: close the connection afterwards when true.

        Returns:
            The value of ``conn.insert_id()`` after the batch.
        """
        try:
            columns = list(data[0])
            placeholders = ",".join(["%s"] * len(columns))
            sql = "INSERT INTO " + table + \
                " (" + ",".join(columns) + ") VALUES (" + placeholders + ")"
            # Read each row by column name so the values always line up with
            # the column list, regardless of the row dict's own key order.
            rows = [tuple(self._to_param(row[column]) for column in columns)
                    for row in data]
            self.connection()
            self.cur.executemany(sql, rows)
            insert_id = self.conn.insert_id()
            self.conn.commit()
            return insert_id
        except Exception:
            self._rollback()
            raise
        finally:
            if disconnect:
                self._disconnect()
            self._reset_where()

    # ------------------------------------------------------------------
    # SQL string builders (no database access)
    # ------------------------------------------------------------------
    def insert_string(self, table, data):
        """Return an INSERT statement with all values inlined as literals.

        Args:
            table: table name.
            data: dict mapping column name to value.
        """
        columns = ",".join(data)
        values = ",".join(self._to_literal(value) for value in data.values())
        return "INSERT INTO " + table + \
            " (" + columns + ") VALUES (" + values + ")"

    def insert_on_duplicate_key_update_string(self, table, data, update_data=None):
        """Return ``INSERT ... ON DUPLICATE KEY UPDATE ...`` as a string.

        Args:
            table: table name.
            data: dict mapping column name to value for the INSERT part.
            update_data: either a ready-made SQL fragment (str) appended
                verbatim, or a dict of column/value pairs. Defaults to
                ``data``.
        """
        sql = self.insert_string(table, data)
        if update_data is None:
            update_data = data
        if isinstance(update_data, str):
            return sql + " ON DUPLICATE KEY UPDATE " + update_data
        assignments = ", ".join(
            "%s = %s" % (column, self._to_literal(value))
            for column, value in update_data.items())
        return sql + " ON DUPLICATE KEY UPDATE " + assignments

    def update_string(self, table, data, where):
        """Return an UPDATE statement with all values inlined as literals.

        Args:
            table: table name.
            data: dict mapping column name to new value.
            where: dict mapping column name to the value it must equal;
                conditions are joined with AND. None or an empty dict
                produces no WHERE clause.
        """
        assignments = ", ".join(
            column + " = " + self._to_literal(value)
            for column, value in data.items())
        sql = "UPDATE " + table + " SET " + assignments
        conditions = [column + " = " + self._to_literal(value)
                      for column, value in (where or {}).items()]
        if conditions:
            sql += " WHERE " + " AND ".join(conditions)
        return sql

    # ------------------------------------------------------------------
    # WHERE clause accumulation
    # ------------------------------------------------------------------
    def where(self, data):
        """Append AND-ed conditions to the pending WHERE clause.

        ``data`` maps "column operator" to the compared value, e.g.
        ``{"age >": 18}``.
        """
        for condition in data:
            prefix = " AND " if self.where_sql else " WHERE "
            self.where_sql += prefix + condition + " %s"
        self.where_params.extend(data.values())

    def where_or(self, data):
        """Append OR conditions to the pending WHERE clause.

        ``data`` uses the same "column operator" -> value form as
        ``where``. Several items are treated as one group and added as
        ``AND ( a OR b )``; a single item is added as ``OR a``. When no
        clause exists yet, the conditions start the WHERE clause.
        """
        if not data:
            return
        conditions = [condition + " %s" for condition in data]
        if len(conditions) > 1:
            prefix = " AND " if self.where_sql else " WHERE "
            self.where_sql += prefix + "( " + " OR ".join(conditions) + " )"
        else:
            prefix = " OR " if self.where_sql else " WHERE "
            self.where_sql += prefix + conditions[0]
        self.where_params.extend(data.values())

    # ------------------------------------------------------------------
    # UPDATE / DELETE
    # ------------------------------------------------------------------
    def update(self, table, data, where=None, disconnect=True):
        """Update rows using the pending WHERE clause.

        Args:
            table: table name.
            data: dict mapping column name to new value.
            where: optional dict passed to ``where()`` before executing.
            disconnect: close the connection afterwards when true.

        The pending WHERE clause is cleared afterwards.
        """
        try:
            assignments = ", ".join(column + " = %s" for column in data)
            # A real list is required: dict views have no extend().
            params = list(data.values())
            sql = "UPDATE " + table + " SET " + assignments
            if where is not None:
                self.where(where)
            if self.where_sql:
                sql += self.where_sql
                params.extend(self.where_params)
            self.connection()
            self.cur.execute(sql, tuple(params))
            self.conn.commit()
        except Exception:
            self._rollback()
            raise
        finally:
            if disconnect:
                self._disconnect()
            self._reset_where()

    def delete(self, table, where=None, disconnect=True):
        """Delete rows using the pending WHERE clause.

        Args:
            table: table name.
            where: optional dict passed to ``where()`` before executing.
            disconnect: close the connection afterwards when true.

        With no WHERE clause at all the statement is a plain
        ``DELETE FROM table``. The pending WHERE clause is cleared
        afterwards.
        """
        try:
            sql = "DELETE FROM " + table
            params = None
            if where is not None:
                self.where(where)
            if self.where_sql:
                sql += self.where_sql
                if self.where_params:
                    params = tuple(self.where_params)
            self.connection()
            self.cur.execute(sql, params)
            self.conn.commit()
        except Exception:
            self._rollback()
            raise
        finally:
            if disconnect:
                self._disconnect()
            self._reset_where()