• 【python百宝箱】抛开GIL束缚:线程、进程、异步实现高效编程


    Python并发编程大揭秘:线程、进程、异步

    前言

    在当今计算机科学领域,处理大规模任务并提高程序性能的需求越来越迫切。Python作为一种流行而灵活的编程语言,提供了多种处理并发的工具和库。本文将深入探讨Python中的并发编程,主要聚焦于concurrent.futuresthreadingmultiprocessing以及asyncio等关键库,通过实例和详细介绍,帮助读者更好地理解并发编程的核心概念和应用场景。

    1. concurrent.futures

    1.1 简介

    Python的concurrent.futures模块为异步执行函数提供了一个高层次的接口,使并发编程更加便捷。这个模块主要关注于任务的并行执行,通过提供线程池和进程池的支持,使得开发者能够更轻松地实现并发操作。

    1.2 主要特性
    • 线程池和进程池的支持: concurrent.futures通过ThreadPoolExecutorProcessPoolExecutor两个类实现线程池和进程池。这使得开发者能够根据任务的性质选择适当的并发方式,充分利用多核资源。

    • 异步执行任务: 使用submit函数可以异步提交任务,该函数返回一个Future对象,代表将来会返回结果的承诺。

    • 获取结果和处理异常: Future对象提供了一种轻松获取任务执行结果的方式,同时也能捕获任务中可能出现的异常。通过result方法,我们能够同步等待任务的完成并获取最终结果。

    1.3 应用场景

    concurrent.futures适用于需要并发执行独立任务的场景,从而提高程序的性能。以下是一个简单的示例,展示了如何使用线程池计算数字的平方:

    from concurrent.futures import ThreadPoolExecutor
    
    def square(n):
        return n * n
    
    with ThreadPoolExecutor() as executor:
        # 异步提交任务
        future = executor.submit(square, 5)
        # 获取结果
        result = future.result()
        print(result)  # 输出:25
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这个例子展示了如何使用concurrent.futures创建一个线程池,并通过submit方法提交任务,然后通过result方法获取任务执行的结果。这种模式特别适用于需要并行执行多个相互独立任务的情景。

    1.4 批量提交任务

    除了单个任务的提交,concurrent.futures还支持批量提交任务,这在需要同时处理多个任务的情况下非常实用。使用map方法,可以方便地将一个函数应用于可迭代的多个参数,实现并行处理。

    from concurrent.futures import ThreadPoolExecutor
    
    def square(n):
        return n * n
    
    numbers = [1, 2, 3, 4, 5]
    
    with ThreadPoolExecutor() as executor:
        # 批量异步提交任务
        results = executor.map(square, numbers)
    
        # 获取结果
        for result in results:
            print(result)
    # 输出:
    # 1
    # 4
    # 9
    # 16
    # 25
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    这个例子中,我们使用map方法一次性提交了多个任务,每个任务计算一个数字的平方。通过map返回的生成器,我们可以轻松地迭代并获取每个任务的结果。

    1.5 Timeout 控制

    在实际应用中,有时我们希望对任务的执行时间进行控制,以防止某个任务因为执行时间过长而影响整体程序的性能。concurrent.futures允许我们设置timeout参数,限定任务的最大执行时间。

    from concurrent.futures import ThreadPoolExecutor, TimeoutError
    import time
    
    def long_running_task():
        time.sleep(5)
        return "Task completed"
    
    with ThreadPoolExecutor() as executor:
        # 异步提交任务并设置 timeout 为 3 秒
        future = executor.submit(long_running_task)
        try:
            result = future.result(timeout=3)
            print(result)
        except TimeoutError:
            print("Task timed out")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这个例子中,long_running_task函数模拟一个耗时的任务,但我们通过设置timeout参数为3秒,使得即便任务未完成,也会在指定时间内抛出TimeoutError异常。

    这些用法展示了concurrent.futures模块的灵活性和实用性,为开发者提供了强大的工具来处理并发任务。

    1.6 处理异常

    concurrent.futures模块提供了方便的异常处理机制。通过add_done_callback方法,我们能够注册一个回调函数,用于处理任务执行完成时的结果或异常。

    from concurrent.futures import ThreadPoolExecutor
    
    def task_with_exception():
        raise ValueError("Simulated error in the task")
    
    def handle_result(future):
        try:
            result = future.result()
            print("Task result:", result)
        except Exception as e:
            print("Task raised an exception:", e)
    
    with ThreadPoolExecutor() as executor:
        # 异步提交任务
        future = executor.submit(task_with_exception)
        # 注册异常处理回调函数
        future.add_done_callback(handle_result)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在这个例子中,task_with_exception函数模拟了一个会抛出异常的任务。我们使用add_done_callback方法注册了一个回调函数handle_result,该函数会在任务执行完成时被调用,以处理任务的结果或异常。

    1.7 使用 as_completed

    concurrent.futures还提供了as_completed函数,它接受一组Future对象,并在它们完成时生成这些Future对象的迭代器,使得我们能够按照完成的顺序获取结果。

    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    def square(n):
        return n * n
    
    numbers = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5]
    
    with ThreadPoolExecutor() as executor:
        # 异步提交任务
        futures = [executor.submit(square, num) for num in numbers]
    
        # 按照完成顺序获取结果
        for future in as_completed(futures):
            result = future.result()
            print(result)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这个例子中,我们使用as_completed函数,它返回一个迭代器,按照任务完成的顺序产生Future对象。这样,我们可以立即处理已完成任务的结果,而不必等待所有任务都完成。

    1.8 取消任务

    concurrent.futures模块允许我们取消尚未开始的任务或正在运行的任务。通过cancel方法,我们可以在提交任务后取消它,但需要注意,已经开始执行的任务无法取消。

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    def long_running_task():
        time.sleep(5)
        return "Task completed"
    
    with ThreadPoolExecutor() as executor:
        # 异步提交任务
        future = executor.submit(long_running_task)
    
        # 取消任务
        cancelled = future.cancel()
    
        if cancelled:
            print("Task successfully cancelled")
        else:
            print("Task could not be cancelled as it is already running")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这个例子中,我们通过cancel方法尝试取消一个正在休眠的任务。由于任务已经开始执行,所以取消操作无法生效。在实际应用中,需要根据任务的性质谨慎使用任务取消功能。

    这些进一步拓展了对 concurrent.futures 模块的理解,为读者提供了更多处理异步任务的技巧和手段。

    2. threading

    2.1 概述

    threading模块是Python标准库提供的模块,专用于创建和管理线程。线程是程序中执行的最小单位,threading模块允许程序在同一进程中的多个线程中运行,实现并发执行。在处理I/O密集型任务时,使用线程可以有效提高程序的响应性,使得在等待外部资源时,其他线程仍能执行。

    2.2 主要功能
    • 创建和启动线程: 使用Thread类可以轻松创建线程对象,然后调用start方法启动线程。下面是一个简单的例子:

      import threading
      
      def my_function():
          print("Thread is running!")
      
      # 创建线程对象
      my_thread = threading.Thread(target=my_function)
      
      # 启动线程
      my_thread.start()
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
    • 线程同步和通信: 在多线程环境下,为了避免竞争条件和确保数据一致性,线程同步非常重要。threading模块提供了多种同步机制,包括锁(Lock)、条件变量(Condition)、信号量(Semaphore)等。这些工具可以用于线程间的同步和通信,确保线程安全。

    2.3 适用场景

    threading适用于处理I/O密集型任务,其中线程可以在等待外部资源时继续执行其他任务,提高整体程序性能。然而,由于Python的全局解释器锁(GIL)限制,threading对于CPU密集型任务并不是一个理想的选择。以下是一个简单的线程同步的例子:

    import threading
    
    shared_variable = 0
    lock = threading.Lock()
    
    def increment():
        global shared_variable
        for _ in range(100000):
            with lock:
                shared_variable += 1
    
    # 创建两个线程并启动
    thread1 = threading.Thread(target=increment)
    thread2 = threading.Thread(target=increment)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    
    print(shared_variable)  # 输出:200000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这个例子中,我们通过使用锁(Lock)确保了shared_variable的线程安全访问。两个线程并发执行 increment 函数,通过锁的机制保证了对共享变量的互斥访问。这是一个典型的线程同步例子,适用于需要保证数据一致性的场景。

    2.4 线程间通信与队列

    线程间通信是多线程编程中常见的需求之一,而threading模块通过提供Queue类,简化了线程之间的通信。Queue是一个线程安全的队列实现,可以用于在不同线程之间传递数据。

    import threading
    import queue
    import time
    
    def producer(q):
        for i in range(5):
            time.sleep(1)
            item = f"Item {i}"
            q.put(item)
            print(f"Produced: {item}")
    
    def consumer(q):
        while True:
            item = q.get()
            if item is None:
                break
            print(f"Consumed: {item}")
    
    # 创建线程安全的队列
    my_queue = queue.Queue()
    
    # 创建生产者线程和消费者线程
    producer_thread = threading.Thread(target=producer, args=(my_queue,))
    consumer_thread = threading.Thread(target=consumer, args=(my_queue,))
    
    # 启动线程
    producer_thread.start()
    consumer_thread.start()
    
    # 等待生产者线程完成
    producer_thread.join()
    
    # 等待队列被消费完(放入 None 表示结束消费)
    my_queue.put(None)
    consumer_thread.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    在这个例子中,producer函数模拟了生产者,每秒向队列中放入一个项目。consumer函数模拟了消费者,不断从队列中取出项目并处理。通过Queue的机制,我们实现了线程之间的通信和数据共享。

    2.5 定时线程

    threading模块还支持定时线程,即可以在指定的时间间隔内重复执行某个任务。这通过使用Timer类实现。

    import threading
    import time
    
    def print_message():
        print("Timer expired!")
    
    # 创建定时线程,每 2 秒执行一次 print_message 函数
    timer_thread = threading.Timer(2, print_message)
    
    # 启动定时线程
    timer_thread.start()
    
    # 主线程等待定时线程完成
    timer_thread.join()
    
    print("Main thread continues...")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在这个例子中,Timer类被用于创建一个在2秒后执行的定时线程。通过这种方式,我们可以在多线程环境下实现周期性的任务调度。

    这些用法展示了threading模块在多线程编程中的灵活性和强大功能,为读者提供了更多处理线程相关任务的工具和技巧。

    2.6 定时线程与 Timer

    threading模块中的 Timer 类提供了一种在指定时间间隔后执行某个任务的方式。这对于需要周期性执行任务的场景非常有用。

    import threading
    
    def print_message():
        print("Timer expired!")
    
    # 创建定时线程,每 2 秒执行一次 print_message 函数
    timer_thread = threading.Timer(2, print_message)
    
    # 启动定时线程
    timer_thread.start()
    
    # 主线程等待定时线程完成
    timer_thread.join()
    
    print("Main thread continues...")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这个例子中,Timer 类被用于创建一个在2秒后执行的定时线程。通过这种方式,我们可以在多线程环境下实现周期性的任务调度。

    2.7 线程安全的数据结构

    threading模块还提供了一些线程安全的数据结构,如 LockSemaphoreCondition,用于协调多个线程对共享资源的访问。这些数据结构可以确保在多线程环境中对共享数据的安全访问。

    import threading
    
    # 使用 Lock 实现线程安全的计数器
    class Counter:
        def __init__(self):
            self.value = 0
            self.lock = threading.Lock()
    
        def increment(self):
            with self.lock:
                self.value += 1
    
    # 创建 Counter 实例
    counter = Counter()
    
    # 创建多个线程并启动
    threads = []
    for _ in range(5):
        thread = threading.Thread(target=counter.increment)
        threads.append(thread)
        thread.start()
    
    # 主线程等待所有线程完成
    for thread in threads:
        thread.join()
    
    print("Counter value:", counter.value)  # 输出:Counter value: 5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    在这个例子中,我们使用 Lock 确保了 Counter 类中的 increment 方法的线程安全。多个线程并发执行 increment 方法,但由于使用了锁,共享数据 value 的访问是互斥的,从而避免了竞争条件。

    2.8 threading 中的事件

    threading 模块中的 Event 类提供了一种线程间通信的方式,用于在多个线程之间设置信号。一个线程可以等待事件的发生,而另一个线程可以触发事件的发生。

    import threading
    import time
    
    def worker(event):
        print("Worker is waiting for the event.")
        event.wait()
        print("Worker got the event and is now working.")
    
    # 创建事件对象
    my_event = threading.Event()
    
    # 创建并启动线程
    my_thread = threading.Thread(target=worker, args=(my_event,))
    my_thread.start()
    
    # 主线程等待一段时间后设置事件
    time.sleep(2)
    print("Main thread is setting the event.")
    my_event.set()
    
    # 主线程等待子线程完成
    my_thread.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    在这个例子中,主线程创建了一个 Event 对象 my_event,并传递给子线程 worker。子线程在 wait 方法上阻塞等待事件的发生,而主线程通过 set 方法设置了事件。一旦事件被设置,子线程将收到通知并执行相应的任务。

    这些进一步拓展了对 threading 模块的理解,为读者提供了更多处理线程相关任务的工具和技巧。

    3. multiprocessing

    3.1 概述

    multiprocessing模块处理多进程,与threading相似,但使用多个进程。这使得它适用于CPU密集型任务。

    3.2 关键特性
    • 充分利用多核CPU: 通过创建多个进程,每个进程在独立的CPU核心上运行。
    • 进程间通信机制: 使用Queue等机制实现进程间通信。
    3.3 应用场景

    multiprocessing适用于并行处理独立的计算密集型任务。以下是一个简单的多进程示例:

    from multiprocessing import Process, Value, Lock
    
    shared_variable = Value('i', 0)
    lock = Lock()
    
    def increment():
        global shared_variable
        for _ in range(100000):
            with lock:
                shared_variable.value += 1
    
    # 创建两个进程并启动
    process1 = Process(target=increment)
    process2 = Process(target=increment)
    process1.start()
    process2.start()
    process1.join()
    process2.join()
    
    print(shared_variable.value)  # 输出:200000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    3.4 进程间通信

    在多进程编程中,multiprocessing模块提供了多种进程间通信的机制,其中最常用的是 QueueQueue是一个进程安全的队列,可以在多个进程之间安全地传递数据。

    from multiprocessing import Process, Queue
    
    def producer(q):
        for i in range(5):
            item = f"Item {i}"
            q.put(item)
            print(f"Produced: {item}")
    
    def consumer(q):
        while True:
            item = q.get()
            if item is None:
                break
            print(f"Consumed: {item}")
    
    if __name__ == "__main__":
        # 创建进程安全的队列
        my_queue = Queue()
    
        # 创建生产者进程和消费者进程
        producer_process = Process(target=producer, args=(my_queue,))
        consumer_process = Process(target=consumer, args=(my_queue,))
    
        # 启动进程
        producer_process.start()
        consumer_process.start()
    
        # 主进程等待生产者进程完成
        producer_process.join()
    
        # 向队列放入 None 表示结束消费
        my_queue.put(None)
    
        # 主进程等待消费者进程完成
        consumer_process.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    在这个例子中,producer 进程负责往队列中放入数据,而 consumer 进程负责从队列中取出数据进行消费。通过 Queue 的机制,我们实现了多进程之间的通信。

    3.5 进程池

    multiprocessing模块提供了 Pool 类,用于创建进程池,进一步提高了多进程编程的便利性。通过进程池,可以重用已创建的进程,降低了进程创建和销毁的开销。

    from multiprocessing import Pool
    
    def square(n):
        return n * n
    
    if __name__ == "__main__":
        with Pool(processes=4) as pool:
            # 使用 map 方法将 square 函数应用于一组参数
            results = pool.map(square, [1, 2, 3, 4, 5])
    
        print(results)  # 输出:[1, 4, 9, 16, 25]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这个例子中,我们使用 Pool 类创建了一个包含4个进程的进程池,并通过 map 方法将 square 函数应用于一组参数。这样,square 函数将并行执行在不同进程中,提高了整体计算速度。

    3.6 ValueArray 实现共享内存

    在多进程编程中,有时需要在不同进程之间共享数据。multiprocessing模块提供了 ValueArray 两种方式实现共享内存。

    from multiprocessing import Process, Value, Array
    
    def modify_shared_data(shared_value, shared_array):
        with shared_value.get_lock():
            shared_value.value += 1
    
        for i in range(len(shared_array)):
            shared_array[i] *= 2
    
    if __name__ == "__main__":
        shared_counter = Value("i", 0)  # 共享整数
        shared_numbers = Array("i", [1, 2, 3, 4, 5])  # 共享数组
    
        # 创建两个进程
        process1 = Process(target=modify_shared_data, args=(shared_counter, shared_numbers))
        process2 = Process(target=modify_shared_data, args=(shared_counter, shared_numbers))
    
        # 启动进程
        process1.start()
        process2.start()
    
        # 等待进程结束
        process1.join()
        process2.join()
    
        print("Shared counter:", shared_counter.value)  # 输出:2
        print("Shared array:", shared_numbers[:])  # 输出:[2, 4, 6, 8, 10]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    在这个例子中,通过 Value 创建了一个共享的整数 shared_counter,通过 Array 创建了一个共享的整数数组 shared_numbers。两个进程并行执行 modify_shared_data 函数,通过 get_lock 方法确保了对共享数据的互斥访问。

    这些进一步拓展了对 multiprocessing 模块的理解,为读者提供了更多在多进程编程中的实际应用场景和技巧。

    3.7 异步执行任务

    multiprocessing模块的 map_async 方法允许异步地并行执行函数。这种方式适用于对结果的获取没有先后顺序要求的情况。

    from multiprocessing import Pool
    
    def square(n):
        return n * n
    
    if __name__ == "__main__":
        with Pool(processes=4) as pool:
            # 使用 map_async 异步执行 square 函数
            result = pool.map_async(square, [1, 2, 3, 4, 5])
    
            # 等待所有进程完成
            pool.close()
            pool.join()
    
            print(result.get())  # 输出:[1, 4, 9, 16, 25]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这个例子中,通过 map_async 异步执行了 square 函数,然后通过 get 方法获取结果。这种方式适用于对结果的获取没有先后顺序要求的情况。

    3.8 进程间事件通知

    multiprocessing模块的 Event 类与 threading 中的 Event 类相似,用于实现进程间的事件通知。

    from multiprocessing import Process, Event
    import time
    
    def worker(event):
        print("Worker is waiting for the event.")
        event.wait()
        print("Worker got the event and is now working.")
    
    if __name__ == "__main__":
        # 创建事件对象
        my_event = Event()
    
        # 创建并启动进程
        my_process = Process(target=worker, args=(my_event,))
        my_process.start()
    
        # 主进程等待一段时间后设置事件
        time.sleep(2)
        print("Main process is setting the event.")
        my_event.set()
    
        # 主进程等待子进程完成
        my_process.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    在这个例子中,主进程创建了一个 Event 对象 my_event,并传递给子进程 worker。子进程在 wait 方法上阻塞等待事件的发生,而主进程通过 set 方法设置了事件。一旦事件被设置,子进程将收到通知并执行相应的任务。

    3.9 进程间共享资源

    在多进程编程中,有时候需要在不同进程之间共享资源。multiprocessing模块的 Manager 类提供了一种简便的方式来管理进程间共享的数据。

    from multiprocessing import Process, Manager
    
    def modify_shared_list(shared_list):
        shared_list.append(4)
        shared_list.extend([5, 6, 7])
    
    if __name__ == "__main__":
        with Manager() as manager:
            # 创建进程共享的列表
            shared_list = manager.list([1, 2, 3])
    
            # 创建并启动进程
            my_process = Process(target=modify_shared_list, args=(shared_list,))
            my_process.start()
    
            # 主进程等待子进程完成
            my_process.join()
    
            print("Shared list:", shared_list)  # 输出:Shared list: [1, 2, 3, 4, 5, 6, 7]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在这个例子中,通过 Manager 类创建了一个进程共享的列表 shared_list,子进程执行了 modify_shared_list 函数,往列表中添加了新元素。由于使用了 Manager,确保了进程间共享资源的安全访问。

    这些进一步拓展了对 multiprocessing 模块的理解,为读者提供了更多在多进程编程中的实际应用场景和技巧。

    4. asyncio

    4.1 简介

    asyncio是Python的异步I/O和事件循环库,用于编写高效的异步代码。它基于协程和事件循环的概念。

    4.2 主要组件
    • 事件循环(Event Loop): 负责调度和执行异步任务。
    • 协程: 通过async def定义的异步函数。
    4.3 优势与应用

    asyncio适用于高效处理大量I/O密集型任务,例如网络应用。以下是一个简单的异步I/O的例子:

    import asyncio
    
    async def main():
        print("Start")
    
        async def foo():
            print("Foo")
            await asyncio.sleep(2)
            print("End Foo")
    
        await asyncio.gather(foo(), foo(), foo())
    
    asyncio.run(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    4.4 异步I/O与协程

    asyncio 的核心概念之一是协程(coroutine)。通过 async def 关键字定义的函数可以被视为协程,它可以在遇到异步操作时挂起自身的执行,让事件循环去执行其他任务。

    import asyncio
    
    async def example_coroutine():
        print("Start Coroutine")
        await asyncio.sleep(2)
        print("End Coroutine")
    
    async def main():
        await example_coroutine()
    
    asyncio.run(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这个例子中,example_coroutine 函数中的 await asyncio.sleep(2) 是一个异步操作,它会挂起协程的执行,然后让事件循环去执行其他任务。2秒后,协程恢复执行,打印 “End Coroutine”。

    4.5 异步任务与gather

    asyncio.gather 函数用于并行运行多个异步任务,并等待它们全部完成。

    import asyncio
    
    async def foo():
        print("Foo")
        await asyncio.sleep(2)
        print("End Foo")
    
    async def bar():
        print("Bar")
        await asyncio.sleep(1)
        print("End Bar")
    
    async def main():
        await asyncio.gather(foo(), bar(), foo())
    
    asyncio.run(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在这个例子中,foo()bar() 是两个异步任务,通过 asyncio.gather(foo(), bar(), foo()) 并行执行。由于 asyncio.sleep 是异步操作,整个过程将在几秒内完成。

    4.6 异步I/O与async with

    asyncio 还提供了 async with 语法,用于在协程中使用异步上下文管理器。这在需要在异步代码中进行资源管理时非常有用。

    import asyncio
    
    class AsyncResource:
        async def __aenter__(self):
            print("Enter AsyncResource")
            await asyncio.sleep(1)
            return self
    
        async def __aexit__(self, exc_type, exc, tb):
            print("Exit AsyncResource")
    
    async def main():
        async with AsyncResource():
            print("Inside async with block")
    
    asyncio.run(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在这个例子中,AsyncResource 是一个异步上下文管理器,通过 async with AsyncResource(): 进行使用。在 __aenter__ 中进行资源的初始化,而在 __aexit__ 中进行资源的清理。await asyncio.sleep(1) 模拟了异步操作。

    4.7 异步网络请求

    asyncio 在处理异步网络请求时非常强大。以下是一个使用 aiohttp 库进行异步网络请求的简单例子。

    首先,确保你已经安装了 aiohttp

    pip install aiohttp
    
    • 1

    然后,可以使用以下代码进行异步网络请求:

    import asyncio
    import aiohttp
    
    async def fetch(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()
    
    async def main():
        url = "https://www.example.com"
        response = await fetch(url)
        print(response)
    
    asyncio.run(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在这个例子中,fetch 函数使用 aiohttp 发起异步的GET请求,而 main 函数则等待这个请求的完成。这允许在异步代码中高效地处理多个并发的网络请求。

    这些例子展示了 asyncio 在异步I/O编程中的应用,为读者提供了基础和实际应用的示例。

    5. 其他相关库

    5.1 queue
    • queue模块提供了线程安全的队列数据结构,用于线程间通信。以下是一个简单的队列使用例子:
    from queue import Queue
    import threading
    
    def worker(queue):
        while True:
            item = queue.get()
            if item is None:
                break
            # 处理任务
            print(item)
    
    # 创建队列和线程
    my_queue = Queue()
    my_thread = threading.Thread(target=worker, args=(my_queue,))
    my_thread.start()
    
    # 向队列中添加任务
    for item in range(5):
        my_queue.put(item)
    
    # 关闭队列
    my_queue.put(None)
    my_thread.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    5.2 async/await
    • async/await是Python3.5之后引入的关键字,用于定义异步函数。以下是一个简单的异步函数的例子:
    async def async_example():
        print("Start")
        await asyncio.sleep(2)
        print("End")
    
    asyncio.run(async_example())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    5.3 threadingmultiprocessing 的替代品
    • ray: 用于构建分布式应用程序的通用框架。
    • dask: 提供并行计算和分布式计算的灵活框架。
    5.4 asyncpg
    • asyncpg是一个异步的PostgreSQL数据库驱动,适用于asyncio环境。以下是一个简单的异步数据库查询的例子:
    import asyncpg
    import asyncio
    
    async def query_example():
        # 连接到数据库
        connection = await asyncpg.connect(user='user', password='password',
                                           database='mydatabase', host='localhost')
    
        # 执行查询
        result = await connection.fetch('SELECT * FROM mytable')
    
        # 处理查询结果
        for row in result:
            print(row)
    
        # 关闭数据库连接
        await connection.close()
    
    # 运行异步查询
    asyncio.run(query_example())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    通过了解这些库和模块,你可以更深入地了解Python中并发和多线程编程的不同方面,以及如何选择适当的工具来处理特定类型的并发任务。

    6. 并发和多线程的最佳实践

    在进行并发和多线程编程时,一些最佳实践可以帮助确保代码的可维护性、稳定性和性能。以下是一些建议:

    6.1 尽量使用线程池和进程池

    使用 concurrent.futures 模块中的线程池和进程池,而不是手动创建线程和进程。线程池和进程池提供了高级别的接口,简化了并发编程的复杂性,同时有效地管理资源。

    from concurrent.futures import ThreadPoolExecutor
    
    def square(n):
        return n * n
    
    with ThreadPoolExecutor() as executor:
        # 异步提交任务
        future = executor.submit(square, 5)
        # 获取结果
        result = future.result()
        print(result)  # 输出:25
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    6.2 注意线程同步

    在多线程编程中,确保对共享数据的访问是线程安全的至关重要。使用锁 (Lock) 或其他同步机制确保多个线程之间的数据一致性。

    import threading
    
    shared_variable = 0
    lock = threading.Lock()
    
    def increment():
        global shared_variable
        for _ in range(100000):
            with lock:
                shared_variable += 1
    
    # 创建两个线程并启动
    thread1 = threading.Thread(target=increment)
    thread2 = threading.Thread(target=increment)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    
    print(shared_variable)  # 输出:200000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    6.3 使用 async/await 进行异步编程

    在处理异步编程时,使用 async/await 关键字定义异步函数,并使用 asyncio 模块进行任务调度。这可以有效地处理大量 I/O 密集型任务,提高程序性能。

    import asyncio
    
    async def async_example():
        print("Start")
        await asyncio.sleep(2)
        print("End")
    
    asyncio.run(async_example())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    6.4 谨慎处理共享资源

    在多进程编程中,共享资源的处理需要特别小心。使用 multiprocessing 模块提供的 ValueArray 等数据结构,以及 Manager 类来实现共享资源的安全访问。

    from multiprocessing import Process, Value, Lock
    
    shared_variable = Value('i', 0)
    lock = Lock()
    
    def increment():
        global shared_variable
        for _ in range(100000):
            with lock:
                shared_variable.value += 1
    
    # 创建两个进程并启动
    process1 = Process(target=increment)
    process2 = Process(target=increment)
    process1.start()
    process2.start()
    process1.join()
    process2.join()
    
    print(shared_variable.value)  # 输出:200000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    6.5 选择适当的工具和库

    根据项目的需求和复杂性,选择适当的工具和库。例如,对于分布式应用程序,考虑使用 raydask;对于异步数据库操作,考虑使用像 asyncpg 这样的异步数据库驱动。

    这些建议可以帮助你在并发和多线程编程中更好地组织和管理代码,确保代码的可维护性和性能。

    总结

    通过本文的阅读,读者将获得全面的Python并发编程知识,从concurrent.futures的任务异步执行,到threadingmultiprocessing的多线程与多进程处理,再到asyncio的异步编程范式,深入理解并发编程的核心思想和实际应用。通过实例演示和详细介绍,读者将能够更自信地选择适当的工具来处理各种并发任务,提升程序性能,迎接复杂应用场景的挑战。

  • 相关阅读:
    Echarts折线图数据过小重叠,被x轴刻度顶到最上面解决办法
    第19章-IPv6基础
    js使用闭包封装防抖函数
    Android自定义控件(五) 自定义View实现Android Loading效果
    ubuntu永久修改mac地址
    2022-07-01 Dubbo&&Zookeeper
    【老生谈算法】matlab实现图像增强算法源码——图像增强算法
    Leetcode 816. 模糊坐标
    MySQL 索引
    SQL之mysql到hive批量生成建表语句
  • 原文地址:https://blog.csdn.net/qq_42531954/article/details/134524322