• Using Thread Pools in Python


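    Using Thread Pools

    1. Introduction to thread pools

    A thread pool creates a specified number of threads when it is initialized. When a task arrives, the pool hands it to an idle thread; when the task finishes, the thread is not destroyed but returns to the idle state and waits for the next task. As more tasks arrive, the number of idle threads shrinks, and once it reaches zero, new tasks have to wait in a queue until a thread becomes free.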
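The reuse described above can be observed directly. The following is a minimal sketch using the standard-library `concurrent.futures.ThreadPoolExecutor`; the function name `work`, the pool size of 3, and the task count of 10 are illustrative assumptions. Note that this particular executor starts its threads on demand, up to `max_workers`, rather than all at construction time.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def work(i):
    """Hypothetical task: sleep briefly, then report which thread ran it."""
    time.sleep(0.05)
    return threading.current_thread().name

# Ten tasks are handed to a pool that may use at most three threads,
# so the same worker threads have to be reused for several tasks.
with ThreadPoolExecutor(max_workers=3) as pool:
    names = list(pool.map(work, range(10)))

unique_workers = set(names)
print(len(names), "tasks ran on", len(unique_workers), "thread(s)")
```

The number of distinct thread names never exceeds `max_workers`, no matter how many tasks are queued.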

    There are two ways to use a thread pool in Python:
    1. The third-party library threadpool
    2. concurrent.futures.ThreadPoolExecutor, part of the standard library since Python 3.2

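    2. Usage

    2.1 threadpool

    Installation:

    pip install threadpool

    Usage:

    pool.putRequest() hands a task to the thread pool for execution
    pool.wait() blocks until all queued tasks have finished
    A callback function can be defined to receive each task's return value

    Example: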

    # -*- coding:utf-8 -*-
    import time
    import threadpool
    import threading

    def process(data):
        """Consumer: handle one item of work."""
        print("%s process to %s" % (threading.current_thread().name, data))
        time.sleep(1)
        return data

    def callback(request, result):
        """Callback used to collect each task's return value."""
        print(f"callback result = {result}")

    def main():
        start_time = time.time()
        # Create a pool with 3 worker threads
        pool = threadpool.ThreadPool(3)
        # Create one request per item in range(10)
        requests_ = threadpool.makeRequests(process, range(10), callback)
        for req in requests_:
            pool.putRequest(req)
        # Block until every request has been processed
        pool.wait()
        print(f"{threading.current_thread().name} elapsed: {time.time() - start_time}")


    if __name__ == '__main__':
        main()
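For comparison, the same callback pattern can be approximated with the standard library alone. This is only a sketch, not a drop-in replacement for threadpool: it assumes `Future.add_done_callback` stands in for the third-party callback argument, and the names `results` and `results_lock` are illustrative. Unlike the threadpool callback, the callback here receives the Future itself rather than a `(request, result)` pair.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

results = []
results_lock = threading.Lock()

def process(data):
    """Consumer: simulate a short piece of work."""
    time.sleep(0.05)
    return data

def callback(future):
    """Runs once the task is done; fetches and stores its return value."""
    with results_lock:
        results.append(future.result())

# Leaving the with block waits for all submitted tasks (and their callbacks).
with ThreadPoolExecutor(max_workers=3) as pool:
    for item in range(10):
        pool.submit(process, item).add_done_callback(callback)

print(sorted(results))
```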
    
    2.2 ThreadPoolExecutor

    from concurrent.futures import ThreadPoolExecutor

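    import os
    import json
    from concurrent.futures import ThreadPoolExecutor

    def parse_get_license(url):
        """Placeholder for the real per-URL work."""
        pass

    def process():
        map_file = "demo.txt"
        if not os.path.exists(map_file):
            return
        # Leaving the with block closes the file and waits for all submitted tasks
        with ThreadPoolExecutor(max_workers=20) as t, open(map_file, encoding="utf8") as f:
            for line in f:
                line_str = line.strip()
                if not line_str:
                    continue
                line_obj = json.loads(line_str)
                url = line_obj["url"]
                print(url)
                # submit() returns a Future immediately, without blocking
                task = t.submit(parse_get_license, url)
                # done() only reports whether the task has finished; it does not wait
                task.done()

    def main():
        process()

    if __name__ == '__main__':
        main()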
    
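    1. When constructing a ThreadPoolExecutor, pass max_workers to set the maximum number of threads that can run at the same time in the pool.
    2. Use submit() to hand the pool a task (a function and its arguments). It returns a handle to the task, a Future object (comparable to a file handle or a drawing handle). Note that submit() does not block; it returns immediately.
    3. The call has the form submit(fn, *args, **kwargs).
    4. The done() method reports whether the task has finished.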
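The four points above can be sketched in a few lines. The function `slow_square` and its `delay` parameter are hypothetical, chosen only to make the non-blocking behaviour of `submit()` visible.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x, delay=0.2):
    """Hypothetical task: wait a little, then return x squared."""
    time.sleep(delay)
    return x * x

with ThreadPoolExecutor(max_workers=2) as executor:
    # Positional and keyword arguments are forwarded to the function.
    future = executor.submit(slow_square, 4, delay=0.2)
    # submit() has already returned; the task is most likely still running.
    done_right_after_submit = future.done()
    # result() blocks until the task has finished and returns its value.
    value = future.result()
    done_after_result = future.done()

print(done_right_after_submit, value, done_after_result)
```

`done()` never waits; to block until a value is available, call `result()`, optionally with a `timeout`.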

    submit

    def submit(self, fn, *args, **kwargs):
        pass
    
    as_completed

    from concurrent.futures import ThreadPoolExecutor, as_completed
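    as_completed takes a collection of futures and yields each one as soon as it finishes, so results are retrieved in completion order rather than submission order.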

    def as_completed(fs, timeout=None):
        """An iterator over the given futures that yields each as it completes.
        Args:
            fs: The sequence of Futures (possibly created by different Executors) to
                iterate over.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
        Returns:
            An iterator that yields the given Futures as they complete (finished or
            cancelled). If any given Futures are duplicated, they will be returned
            once.
        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
        """
        pass
    
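    Example:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def get_html(url):
        """Placeholder: replace with the real download logic."""
        return url

    # URLs to request
    urls = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        all_task = [executor.submit(get_html, url) for url in urls]
        # Futures are yielded in completion order
        for future in as_completed(all_task):
            data = future.result()
            print(data)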
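To make the completion-order behaviour visible, here is a small sketch with tasks of different durations. The helper `sleep_then_return` and the delay values are assumptions for illustration; keeping a dict from each Future to its input is a common way to recover which task a result belongs to.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def sleep_then_return(delay):
    """Hypothetical task whose duration equals its argument (in seconds)."""
    time.sleep(delay)
    return delay

delays = [0.3, 0.1, 0.2]
with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each Future back to the input that produced it.
    future_to_delay = {executor.submit(sleep_then_return, d): d for d in delays}
    completion_order = [future.result() for future in as_completed(future_to_delay)]

print(completion_order)
```

With enough workers for all tasks to start at once, the shortest task is typically reported first, regardless of submission order.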
    
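    executor.map is similar to as_completed, except that it yields the return values directly (not Future objects) and in the order of the input iterable rather than in completion order.

    from concurrent.futures import ThreadPoolExecutor

    def get_html(url):
        """Placeholder: replace with the real download logic."""
        return url

    # URLs to request
    urls = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Results come back in the same order as urls
        for data in executor.map(get_html, urls):
            print(data)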
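The ordering guarantee of `executor.map` can be checked with tasks that finish out of order. As before, `sleep_then_return` and the delay values are illustrative assumptions.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def sleep_then_return(delay):
    """Hypothetical task whose duration equals its argument (in seconds)."""
    time.sleep(delay)
    return delay

delays = [0.3, 0.1, 0.2]
with ThreadPoolExecutor(max_workers=3) as executor:
    # The 0.1 s task finishes first, but map still reports results in input order.
    results = list(executor.map(sleep_then_return, delays))

print(results)
```

The trade-off is that a slow first task delays access to results that are already finished behind it; when that matters, `submit()` combined with `as_completed` is usually the better fit.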
    
    wait()

    wait() blocks the calling thread until the condition specified by return_when is met or the timeout expires.
    def wait(fs, timeout=None, return_when=ALL_COMPLETED):
        """
        Wait for the futures in the given sequence to complete.
        Args:
            fs: The sequence of Futures (possibly created by different Executors) to
                wait upon.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            return_when: Indicates when this function should return. The options
                are:
                FIRST_COMPLETED - Return when any future finishes or is
                                  cancelled.
                FIRST_EXCEPTION - Return when any future finishes by raising an
                                  exception. If no future raises an exception
                                  then it is equivalent to ALL_COMPLETED.
                ALL_COMPLETED -   Return when all futures finish or are cancelled.
        Returns:
            A named 2-tuple of sets. The first set, named 'done', contains the
            futures that completed (is finished or cancelled) before the wait
            completed. The second set, named 'not_done', contains uncompleted
            futures.
        """
    
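A short sketch of the two most common `return_when` modes follows. The helper `sleep_then_return` and the delays are assumptions for illustration.

```python
import time
from concurrent.futures import (
    ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
)

def sleep_then_return(delay):
    """Hypothetical task whose duration equals its argument (in seconds)."""
    time.sleep(delay)
    return delay

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(sleep_then_return, d) for d in (0.1, 0.4, 0.5)]

    # Return as soon as at least one future has finished.
    first = wait(futures, return_when=FIRST_COMPLETED)
    n_done_first = len(first.done)

    # Return only after every future has finished.
    everything = wait(futures, return_when=ALL_COMPLETED)

print(n_done_first, len(everything.done), len(everything.not_done))
```

The return value is a named tuple, so the two sets can be read as `.done` and `.not_done` or unpacked as `done, not_done = wait(...)`.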
  • Original article: https://blog.csdn.net/weixin_45459224/article/details/126182031