业务中有时会需要解析excel中的数据,按照要求处理后,写入到db
中;
用python
处理这个正好简便快捷
没有依赖就 pip install pymysql
一下
import pymysql
from pymysql.converters import escape_string
from openpyxl import load_workbook
from Snowflake import Snowflake
def load_excel_data(snowflake):
# 连接到MySQL数据库
mydb = pymysql.connect(
host="xxx.xxx.xxx.xxx",
port=3306,
user="xxx",
passwd="xxx",
db="xxxx"
)
# 打开Excel文件
wb = load_workbook(filename=r'D:\xx\test.xlsx')
sheet = wb.active
# 获取表头
header = [cell.value for cell in sheet[1]]
column_header = []
# 表头转换列名
for excel_head_name in header:
if '11' == excel_head_name:
column_header.append("xx")
elif '22' == excel_head_name:
column_header.append("xx")
elif '33' == excel_head_name:
column_header.append("xx")
elif '1122' == excel_head_name:
column_header.append("xx")
# 遍历每一行数据,并将其插入到数据库中
cursor = mydb.cursor()
count = 0
defaultUser = "'xxx'"
for row in sheet.iter_rows(min_row=2, values_only=True):
cId = snowflake.next_id()
date = row[0]
# datetime 转 date
date = date.date()
a2 = row[1]
reason = row[2]
detail = row[3]
# \'%s\' 将含有特殊内容的字符串整个塞进去
sql = f"INSERT INTO test_table (id, store_id, num, handler, create_by, update_by, date, a2, reason, detail) VALUES ({cId}, 3, 0, 43, {defaultUser}, {defaultUser}, \'%s\', \'%s\', \'%s\', \'%s\')" % (date, self_escape_string(a2), self_escape_string(reason), self_escape_string(detail))
print(sql)
# cursor.execute(sql, row)
cursor.execute(sql)
count += 1
print(f"正在插入{count}条数据")
# 提交更改并关闭数据库连接
mydb.commit()
cursor.close()
mydb.close()
# 将字符串中的特殊字符转义
# python中没有null只有None
def self_escape_string(data):
if data is None:
return ""
return escape_string(data)
if __name__ == '__main__':
worker_id = 1
data_center_id = 1
snowflake = Snowflake(worker_id, data_center_id)
load_excel_data(snowflake)
import time
import random
class Snowflake:
def __init__(self, worker_id, data_center_id):
### 机器标识ID
self.worker_id = worker_id
### 数据中心ID
self.data_center_id = data_center_id
### 计数序列号
self.sequence = 0
### 时间戳
self.last_timestamp = -1
def next_id(self):
timestamp = int(time.time() * 1000)
if timestamp < self.last_timestamp:
raise Exception(
"Clock moved backwards. Refusing to generate id for %d milliseconds" % abs(timestamp - self.last_timestamp))
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & 4095
if self.sequence == 0:
timestamp = self.wait_for_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
return ((timestamp - 1288834974657) << 22) | (self.data_center_id << 17) | (self.worker_id << 12) | self.sequence
def next_id(self):
timestamp = int(time.time() * 1000)
if timestamp < self.last_timestamp:
raise Exception("Clock moved backwards. Refusing to generate id for %d milliseconds" % abs(timestamp - self.last_timestamp))
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & 4095
if self.sequence == 0:
timestamp = self.wait_for_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
return ((timestamp - 1288834974657) << 22) | (self.data_center_id << 17) | (self.worker_id << 12) | self.sequence
def wait_for_next_millis(self, last_timestamp):
timestamp = int(time.time() * 1000)
while timestamp <= last_timestamp:
timestamp = int(time.time() * 1000)
return timestamp