• python 线程池ThreadPoolExecutor


    python 线程池ThreadPoolExecutor


    每个线程各分配一个任务,剩下的任务排队等待,当某个线程完成了任务的时候,排队任务就可以安排给这个线程继续执行,免于停的创建和销毁线程,这就是所谓的线程池 ThreadPoolExecutor 原理。
    参考链接(https://www.codersrc.com/archives/6732.html)

    1.submit

    from concurrent.futures import ThreadPoolExecutor
    import time
    import datetime
    
    # 参数times用来模拟下载的时间
    def down_video(times):
        time.sleep(times)
        print("down video {}s finished".format(times))
        return times
    
    executor = ThreadPoolExecutor(max_workers=2)
    # 通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞
    task1 = executor.submit(down_video, (3))
    task2 = executor.submit(down_video, (2))
    # done方法用于判定某个任务是否完成
    print("任务1是否已经完成:",task1.done())
    # cancel方法用于取消某个任务,该任务没有放入线程池中才能取消成功
    print("取消任务2:",task2.cancel())
    time.sleep(4)
    print("任务1是否已经完成:",task1.done())
    # result方法可以获取task的执行结果
    print(task1.result())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    1、ThreadPoolExecutor 构造实例的时候,传入 max_workers 参数来设置线程池中最多能同时运行的线程数目。
    2、通过 submit 函数返回的任务句柄,能够使用done 方法判断该任务是否结束。
    3、使用cancel 方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。
    4、submit 方法不是阻塞的,立即返回。result 方法可以获取任务的返回值,这个方法是阻塞的。

    result:

    任务1是否已经完成: False
    取消任务2: False
    down video 2s finished
    down video 3s finished
    任务1是否已经完成: True
    3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2. as_completed

    done 函数提供了判断任务是否结束的方法,但并不太实用,因为并不知道线程何时结束,需要一直判断。
    as_completed 方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,就能继续执行 for 循环后面的语句,然后继续阻塞住,循环到所有的任务结束。

    from concurrent.futures import ThreadPoolExecutor, as_completed
    import time
    import datetime
    
    
    # 参数times用来模拟网络请求的时间
    def download_video(index):
        time.sleep(2)
        print("download video {} finished at {}\n".format(index, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
        return index
    
    
    executor = ThreadPoolExecutor(max_workers=2)
    urls = [1, 2, 3, 4, 5]
    all_task = [executor.submit(download_video, (url)) for url in urls]
    
    for task in as_completed(all_task):
        data = task.result()
        print("任务{} down load success\n".format(data))
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    result:

    任务2 开始执行 2022-08-02 03:04:15
    任务1 开始执行 2022-08-02 03:04:15
    
    任务1 down load success
    任务2 down load success
    
    任务4 开始执行 2022-08-02 03:04:17
    任务3 开始执行 2022-08-02 03:04:17
    
    任务4 down load success
    任务3 down load success
    
    任务5 开始执行 2022-08-02 03:04:19
    
    任务5 down load success
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3.map

    as_completed 方法不同的是:map 方法能保证任务的顺序性,举个例子:如果同时下载 5 个视频,就算第二个视频比第一个视频先下载完成,也会阻塞等待第一个视频下载完成并通知主线程之后,第二个下载完成的视频才回通知主线程,保证按照顺序完成任务.

    from concurrent.futures import ThreadPoolExecutor, as_completed
    import time
    import datetime
    
    
    # 参数times用来模拟网络请求的时间
    def download_video(index):
        print("download video {} start at {}".format(index, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
        time.sleep(index)   # 模拟执行任务时间
        print("download video {} finished at {}".format(index, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
        return index
    
    
    executor = ThreadPoolExecutor(max_workers=5)
    urls = [3, 2, 1, 4, 5]
    
    for data in executor.map(download_video, urls):
        print("任务{} down load success".format(data))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    并发执行,按照任务输入的顺序,输出结果:
    result:

    download video 3 start at 2022-08-17 15:43:50
    download video 2 start at 2022-08-17 15:43:50
    download video 1 start at 2022-08-17 15:43:50
    download video 4 start at 2022-08-17 15:43:50
    download video 5 start at 2022-08-17 15:43:50
    download video 1 finished at 2022-08-17 15:43:51
    download video 2 finished at 2022-08-17 15:43:52
    download video 3 finished at 2022-08-17 15:43:53
    任务3 down load success
    任务2 down load success
    任务1 down load success
    download video 4 finished at 2022-08-17 15:43:54
    任务4 down load success
    download video 5 finished at 2022-08-17 15:43:55
    任务5 down load success
    
    Process finished with exit code 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    4. wait

    wait 方法有点类似线程的 join 方法,能阻塞主线程,直到线程池中的所有的线程都操作完成!

    from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
    import time
    
    
    # 参数times用来模拟网络请求的时间
    def download_video(index):
        time.sleep(2)
        print("download video {} finished at {}".format(index, time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())))
        return index
    
    
    executor = ThreadPoolExecutor(max_workers=2)
    urls = [1, 2, 3, 4, 5]
    all_task = [executor.submit(download_video, (url)) for url in urls]
    
    wait(all_task, return_when=ALL_COMPLETED)
    
    print("main ")
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    result:

    download video 2 finished at 2022-08-02 06:54:14
    download video 1 finished at 2022-08-02 06:54:14
    download video 4 finished at 2022-08-02 06:54:16download video 3 finished at 2022-08-02 06:54:16
    
    download video 5 finished at 2022-08-02 06:54:18
    main 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    wait 方法接收 3 个参数,等待的任务序列、超时时间以及等待条件。等待条件 return_when 默认为 ALL_COMPLETED ,表明要等待所有的任务都结束。可以看到运行结果中,确实是所有任务都完成了,主线程才打印出 main 。等待条件还可以设置为FIRST_COMPLETED ,表示第一个任务完成就停止等待。

  • 相关阅读:
    DRF-认证权限频率
    .NET Core多线程 (3) 异步 - 下
    java实现wav的重采样
    TXS0104EPWR芯片介绍(双向逻辑电平转换器,4通道,50 mA,4.6 ns,1.65 ~3.6V电平输入 至 2.3~5.5 V电平输出)
    【javaweb】javabean-四则运算
    【融合ChatGPT等AI模型】Python-GEE遥感云大数据分析、管理与可视化及多领域应用
    分析A股交易接口获取股票信息的整个过程
    k8s认证
    【JavaScript】Promise和async/await的区别
    go 中解析JSON的三种姿势
  • 原文地址:https://blog.csdn.net/weixin_39451323/article/details/126118696