• python读取excel数据写入mysql


    概述

    业务中有时会需要解析excel中的数据,按照要求处理后,写入到db中;
    python处理这个正好简便快捷

    demo

    没有依赖就 pip install pymysql一下

    import pymysql
    from pymysql.converters import escape_string
    from openpyxl import load_workbook
    from Snowflake import Snowflake
    
    
    def load_excel_data(snowflake):
        # 连接到MySQL数据库
        mydb = pymysql.connect(
            host="xxx.xxx.xxx.xxx",
            port=3306,
            user="xxx",
            passwd="xxx",
            db="xxxx"
        )
    
        # 打开Excel文件
        wb = load_workbook(filename=r'D:\xx\test.xlsx')
        sheet = wb.active
    
        # 获取表头
        header = [cell.value for cell in sheet[1]]
    
        column_header = []
    	# 表头转换列名
        for excel_head_name in header:
            if '11' == excel_head_name:
                column_header.append("xx")
            elif '22' == excel_head_name:
                column_header.append("xx")
            elif '33' == excel_head_name:
                column_header.append("xx")
            elif '1122' == excel_head_name:
                column_header.append("xx")
    
    
        # 遍历每一行数据,并将其插入到数据库中
        cursor = mydb.cursor()
        count = 0
    
        defaultUser = "'xxx'"
    
        for row in sheet.iter_rows(min_row=2, values_only=True):
            cId = snowflake.next_id()
    
            date = row[0]
            # datetime 转 date
            date = date.date()
    
            a2 = row[1]
            reason = row[2]
            detail = row[3]
    		
    		# \'%s\' 将含有特殊内容的字符串整个塞进去
            sql = f"INSERT INTO test_table (id, store_id, num, handler, create_by, update_by, date, a2, reason, detail) VALUES ({cId}, 3, 0, 43, {defaultUser}, {defaultUser}, \'%s\', \'%s\', \'%s\', \'%s\')" % (date, self_escape_string(a2), self_escape_string(reason), self_escape_string(detail))
    
            print(sql)
    
            # cursor.execute(sql, row)
            cursor.execute(sql)
            count += 1
            print(f"正在插入{count}条数据")
    
        # 提交更改并关闭数据库连接
        mydb.commit()
        cursor.close()
        mydb.close()
    
    # 将字符串中的特殊字符转义
    # python中没有null只有None
    def self_escape_string(data):
        if data is None:
            return ""
        return escape_string(data)
    
    
    
    if __name__ == '__main__':
        worker_id = 1
        data_center_id = 1
        snowflake = Snowflake(worker_id, data_center_id)
    
        load_excel_data(snowflake)
    

    雪花id生成主键

    import time
    import random
    
    
    class Snowflake:
        def __init__(self, worker_id, data_center_id):
            ### 机器标识ID
            self.worker_id = worker_id
            ### 数据中心ID
            self.data_center_id = data_center_id
            ### 计数序列号
            self.sequence = 0
            ### 时间戳
            self.last_timestamp = -1
    
        def next_id(self):
            timestamp = int(time.time() * 1000)
            if timestamp < self.last_timestamp:
                raise Exception(
                    "Clock moved backwards. Refusing to generate id for %d milliseconds" % abs(timestamp - self.last_timestamp))
            if timestamp == self.last_timestamp:
                self.sequence = (self.sequence + 1) & 4095
                if self.sequence == 0:
                    timestamp = self.wait_for_next_millis(self.last_timestamp)
            else:
                self.sequence = 0
            self.last_timestamp = timestamp
            return ((timestamp - 1288834974657) << 22) | (self.data_center_id << 17) | (self.worker_id << 12) | self.sequence
    
    
    
        def next_id(self):
            timestamp = int(time.time() * 1000)
            if timestamp < self.last_timestamp:
                raise Exception("Clock moved backwards. Refusing to generate id for %d milliseconds" % abs(timestamp - self.last_timestamp))
            if timestamp == self.last_timestamp:
                self.sequence = (self.sequence + 1) & 4095
                if self.sequence == 0:
                    timestamp = self.wait_for_next_millis(self.last_timestamp)
            else:
                self.sequence = 0
            self.last_timestamp = timestamp
            return ((timestamp - 1288834974657) << 22) | (self.data_center_id << 17) | (self.worker_id << 12) | self.sequence
    
        def wait_for_next_millis(self, last_timestamp):
            timestamp = int(time.time() * 1000)
            while timestamp <= last_timestamp:
                timestamp = int(time.time() * 1000)
            return timestamp
    
  • 相关阅读:
    drf重写authenticate方法实现多条件登录(源码分析)
    数说故事×北拓资本:AI技术与数据产业的未来展望
    语义分割在线标注思路
    海绵城市解决方案-最新全套文件
    菜鸟 CPaaS 平台微服务治理实践
    Java 写个注解并使用
    ChatGPT的前世今生:从概念到现实的AI之旅
    Spring学习(2) Spring的IOC底层实现
    借助实例轻松掌握 Makefile --开花结果
    【Java系列】深入解析 Lambda表达式
  • 原文地址:https://blog.csdn.net/weixin_43944305/article/details/138541579