• python多线程学


    import csv
    import threading
    import queue
    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed,wait
    from dbutils.persistent_db import PersistentDB
    import sqlite3
     
     
    class Pool(object):       # 数据库连接池
        __pool = None     # 记录第一个被创建的对象引用
        config = {
            'database': 'lac_ci.db'  # 数据库文件路径
        }
     
        def __new__(cls, *args, **kwargs):
            """创建连接池对象  单例设计模式(每个线程中只创建一个连接池对象)  PersistentDB为每个线程提供专用的连接池"""
            if cls.__pool is None:    # 如果__pool为空,说明创建的是第一个连接池对象
                cls.__pool = PersistentDB(sqlite3, maxusage=None, closeable=False, **cls.config)
            return cls.__pool
     
     
    class Connect:
        def __enter__(self):
            """自动从连接池中取出一个连接"""
            db_pool = Pool()
            self.conn = db_pool.connection()
            self.cur = self.conn.cursor()
            return self
     
        def __exit__(self, exc_type, exc_val, exc_tb):
            """自动释放当前连接资源 归还给连接池"""
            self.cur.close()
            self.conn.close()
     
    
     
     
    """
    SQLite 由于线程安全机制 不支持 PooledDB 线程共享连接模式   故使用PersistentDB 线程专用连接模式 为每个线程单独开连接池
    SQLite 只支持读并发 即:写独占,读共享,所以要在修改操作时使用互斥锁。 为了体现精简性,这里就不演示了
    PooledDB()中的参数解释自行查找
    """
    # 定义一个队列,用于在读取线程和写入线程之间传递数据
    data_queue = queue.Queue()
    mutex = threading.Lock()
    reader = None
    
    # 创建数据库连接池
    pool = None
    
    # 定义一个函数,用于处理每一行数据
    def process_row():
        
        while True:
        
            mutex.acquire()
            row = next(reader, "end")
            mutex.release()
            
         
            if(row == "end"):
                break;
    
            # 获取第四列的数值并加一
            lac = int(row[0])
            ci = int(row[1])
    
            # 查询数据库是否已经存在
            with Connect() as conn:
                cursor = conn.cur.execute("SELECT lon,lat,radius FROM LacCiT WHERE lac = ? AND ci = ?",(lac, ci))
                result = cursor.fetchone()
            
            if(result != None): 
                lat = result[0]
                lon = result[1]
                radius = result[2]
                # 将结果写入
                row.extend([str(lon),str(lat),str(radius)])
            else:
                pass
            
            
            # 将处理后的行放入队列
            data_queue.put(row)
    
    # 定义一个函数,用于写入数据到CSV文件
    def write_csv(filename):
        with open(filename, 'w', newline='') as file:
            writer = csv.writer(file)
            while True:
                try:
                    # 从队列中获取处理后的行并写入文件
                    row = data_queue.get(timeout=1)  # 设置超时以避免无限等待
                    writer.writerow(row)
                    data_queue.task_done()
                except queue.Empty:
                    # 队列为空,说明数据已经全部处理完毕
                    break
    
    
    if __name__ == "__main__":
        # 指定你的CSV文件路径
        csv_file = 'lac_tt.csv'
        start_time = time.time()  # 记录程序开始时间
        
        obj_list = []
        file = open(csv_file, 'r', newline='')
        reader = csv.reader(file)
        
        with ThreadPoolExecutor(max_workers=10) as executor:
            executor.submit(process_row)
            executor.submit(process_row)
            executor.submit(process_row)
                
        write_csv('new_file.csv')
    
        print("CSV文件处理完成。")
        end_time = time.time()  # 记录程序结束时间
        
        elapsed_time = end_time - start_time  # 计算运行时间
        print(f"程序运行时间:{elapsed_time:.2f} 秒")
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
  • 相关阅读:
    微服务配置配置中心和链路追踪
    OpenCV For Unity Mat容器的创建与矩阵操作基础
    JavaScript小技能:事件
    vue面试题7
    【2024最新华为OD-C/D卷试题汇总】[支持在线评测] 最小配对和(100分) - 三语言AC题解(Python/Java/Cpp)
    API 服务器健康状态自检
    微机原理与技术接口 实验五 基本IO操作温度控制实验
    亚马逊云科技Build On学习心得
    I.MX6U-ALPHA开发板(高精度定时器)
    SQL数据分析之窗口排序函数rank、dense_rank、raw_number与lag、lead窗口偏移函数【用法整理】
  • 原文地址:https://blog.csdn.net/GreatSimulation/article/details/132835751