Python Threads and ThreadPools


    A previous article covered threads and locks. This article kicks off a set of three introducing, in turn, threads and thread pools, processes and process pools, and coroutines.

    This article focuses on threads and thread pools. Every Python program is a process with one main thread that executes it; sometimes you need to create extra threads inside the Python process to run tasks concurrently. You create a thread object from the Thread class to run a target function. The steps are:

    1. Create a Thread instance, specifying the target function and its args
    2. Start it with start()
    3. Wait for the task to complete with join()

    1. Running a function without arguments in a thread

    thread = Thread(target=task)
    

    Once the thread object has been created, start it with the start() method:

    thread.start()
    

    You can then join the thread to wait for the task to complete:

    thread.join()
    

    The following complete example walks through the steps above:

    from time import sleep
    from threading import Thread
    
    
    # a simple task that blocks for a moment and prints a message
    def task():
        # block for a moment
        sleep(1)
        # display a message
        print('This is coming from another thread')
    
    
    # create and configure a new thread to run a function
    thread = Thread(target=task)
    # start the task in a new thread
    thread.start()
    # display a message
    print('Waiting for the new thread to finish...')
    # wait for the task to complete
    thread.join()
    

    Running the example creates a thread object to run the task() function.

    The thread starts and task() executes in the new thread. The task sleeps for a moment; meanwhile, the main thread prints a message saying it is waiting and then joins the new thread.
    Finally, the new thread finishes sleeping, prints its message, and exits; the main thread then resumes and exits once it has no more instructions to execute. The output is:

    Waiting for the new thread to finish...
    This is coming from another thread
    

    2. Running a function with arguments in a thread

    Building on the previous example, add parameters to task as follows:

    from time import sleep
    from threading import Thread
     
    # a custom function that blocks for a moment
    def task(sleep_time, message):
        # block for a moment
        sleep(sleep_time)
        # display a message
        print(message)
     
    # create a thread
    thread = Thread(target=task, args=(1.5, 'New message from another thread'))
    # run the thread
    thread.start()
    # wait for the thread to finish
    print('Waiting for the thread...')
    thread.join()
    

    The output is:

    Waiting for the thread...
    New message from another thread
    

    3. Thread pools

    You can also override the Thread class's run() method to execute the target function; see Extend the Thread Class.
    With that primer on threads, let's move on to thread pools. If plain threads already give us concurrency, why do we need a pool? Section 6 below compares a serial and a concurrent version of the same requests to make the difference concrete; the main reasons are:

    • Starting a new thread is relatively expensive, since it involves interaction with the operating system, so reusing pooled threads is cheaper.
    • A pool caps the number of threads, controls when threads are created, and limits the resources idle threads consume.
    • Most visibly, a thread pool frees you from manually starting, managing, and releasing threads.
      The following passage sums it up:

    A thread pool is a programming pattern for automatically managing a pool of worker threads.
    The pool is responsible for a fixed number of threads.

    It controls when the threads are created, such as just-in-time when they are needed.

    It also controls what threads should do when they are not being used, such as making them wait without consuming computational resources.
    Each thread in the pool is called a worker or a worker thread. Each worker is agnostic to the type of tasks that are executed, allowing
    the user of the thread pool to execute a suite of similar (homogeneous) or dissimilar tasks (heterogeneous) in terms of the function called, function arguments, task duration, and more.

    Worker threads are designed to be re-used once the task is completed and provide protection against the unexpected failure of the task,
    such as raising an exception, without impacting the worker thread itself.

    This is unlike a single thread that is configured for the single execution of one specific task.
    The pool may provide some facility to configure the worker threads, such as running an initialization function and naming each worker
    thread using a specific naming convention.

    Thread pools can provide a generic interface for executing ad hoc tasks with a variable number of arguments, but do not require that we
    choose a thread to run the task, start the thread, or wait for the task to complete.

    It can be significantly more efficient to use a thread pool instead of manually starting, managing, and closing threads, especially with a
    large number of tasks.

    The base class for pools is Executor in the concurrent.futures module. Executor has two subclasses, ThreadPoolExecutor and ProcessPoolExecutor, which, as the names suggest, implement the thread pool and the process pool respectively. The ThreadPoolExecutor class in concurrent.futures is what creates and manages thread pools; concurrent.futures has been available since Python 3.2.
    ThreadPoolExecutor extends Executor, and submitting work to it returns Future objects:

    1. Executor, the parent class of ThreadPoolExecutor, defines the basic lifecycle operations of the "resource" pool.
    2. Future represents a task submitted to the pool.

    So when managing concurrency with a thread or process pool, you only need to submit the task function; the pool handles the rest of the lifecycle.
    Executor provides three methods for managing the pool (a combined sketch follows the list):

    1. submit(fn, *args, **kwargs): submits the fn function to the pool; *args are the positional arguments and **kwargs the keyword arguments passed to fn. It is usually paired with as_completed().
    2. map(func, *iterables, timeout=None, chunksize=1): like the built-in map(func, *iterables), except that func runs asynchronously on multiple threads and the iterables are dispatched immediately.
    3. shutdown(wait=True): shuts the pool down.
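
    As a minimal combined sketch of these three methods (the square function is a stand-in, not from the original article):

    from concurrent.futures import ThreadPoolExecutor

    def square(x):
        return x * x

    executor = ThreadPoolExecutor(max_workers=2)
    # submit one task and read its result via the Future
    future = executor.submit(square, 3)
    print(future.result())                      # 9
    # map the function over an iterable
    print(list(executor.map(square, [1, 2])))   # [1, 4]
    # wait for outstanding tasks and release the threads
    executor.shutdown(wait=True)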

    Note that map() does not accept a dict of keyword arguments. In my own code I usually pass a dict, since some parameters have defaults and some do not; in that case you can wrap the dict in a tuple to get an iterable that map() accepts, as shown below:

    def testcommand(self, CommandId='', InvocationName='', InvocationDescription='', RepeatMode='', Timeout='',
                    Frequency='', Parameters='', Username='', WorkingDir='', InstanceIds='', expect_fail=True):
        ...
    

    Calling testcommand concurrently then looks like this:

    # params is the kwargs dict for each call (a dict instance; the original
    # snippet mistakenly passed the dict builtin here)
    kwargs = tuple([params] * multiRequestNum)
    
    # execute concurrently
    with concurrent.futures.ThreadPoolExecutor() as executor:
        result = executor.map(lambda f: as_client.testcommand(**f), kwargs)
        logger.info(f'concurrent cloud-assistant results: {result}')
    
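    A self-contained sketch of the same idiom, with a toy testcommand standing in for the real API call:

    from concurrent.futures import ThreadPoolExecutor

    def testcommand(CommandId='', Timeout=60, expect_fail=True):
        return (CommandId, Timeout, expect_fail)

    # one kwargs dict per request, wrapped in a tuple to form an iterable
    params = {'CommandId': 'cmd-1', 'Timeout': 30}
    kwargs_list = tuple([params] * 3)

    with ThreadPoolExecutor() as executor:
        # the lambda unpacks each dict into keyword arguments
        for result in executor.map(lambda kw: testcommand(**kw), kwargs_list):
            print(result)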

    After a task function is submitted with submit(), the method returns a Future object. The Future class is mainly used to retrieve the task's return value. Because the task runs asynchronously in another thread, it is a computation that will "complete in the future", hence the name. Future provides the following methods (a short demo follows the list):

    1. cancel(): attempts to cancel the task this Future represents. If the task is running and cannot be cancelled, it returns False; otherwise the task is cancelled and it returns True.
    2. cancelled(): returns whether the task was successfully cancelled.
    3. running(): returns True if the task is currently executing and cannot be cancelled.
    4. done(): returns True if the task was successfully cancelled or has finished executing.
    5. result(timeout=None): returns the task's result. If the task has not completed yet, this blocks the calling thread; timeout caps how many seconds to block.
    6. exception(timeout=None): returns the exception raised by the task, or None if it completed without raising.
    7. add_done_callback(fn): registers a callback fn that is invoked automatically when the task completes.
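
    A short demo of several of these methods (slow_square and the timings are made up for illustration):

    from time import sleep
    from concurrent.futures import ThreadPoolExecutor

    def slow_square(x):
        sleep(0.5)
        return x * x

    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(slow_square, 3)
        print(future.done())       # almost certainly False: still sleeping
        print(future.result())     # blocks, then prints 9
        print(future.done())       # True
        print(future.exception())  # None: the task raised no exception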

    When you are done with a pool, call its shutdown() method, which starts the pool's shutdown sequence. After the call, the pool accepts no new tasks, but all previously submitted tasks run to completion; once they have all finished, every thread in the pool dies.

    In short, running tasks on a thread pool takes four steps:

    1. Create: call the ThreadPoolExecutor constructor to create the pool.
    2. Submit: call submit() or map() to submit task functions and obtain Future objects.
    3. Wait: wait for the tasks to finish and collect results (optional).
    4. Shut down: when no more tasks will be submitted, call the executor's shutdown() method to close the pool.

    By default the pool creates one thread per CPU in the system, plus four more:

     Default Total Threads = (Total CPUs) + 4
    

    For example, on a machine with 4 CPUs that each support hyper-threading, Python sees 8 CPUs and allocates 8 + 4 = 12 threads to the pool by default. On my own machine, 6 physical cores with hyper-threading enabled give 12 logical CPUs.

    Next, create the ThreadPoolExecutor:

    executor = ThreadPoolExecutor()
    

    That said, the best advice is to benchmark your application to find the thread count that gives the best performance. Spinning up hundreds or thousands of threads at once is not necessarily a good idea: it can strain available RAM, and heavy switching between threads can hurt performance. You can set the number of pool threads with the max_workers parameter:

    executor = ThreadPoolExecutor(max_workers=14)
    

    With the pool created, submit target task functions to it in one of two ways: map() or submit().

    3.1 map()

    The map() function is an asynchronous version of the built-in map(), used to apply a function to every element of an iterable such as a list. Call map() on the pool, passing the function name and the iterable: my_task is the function you want to run concurrently, and each element of my_items is processed by my_task. Tasks are queued and picked up by the pool's worker threads as they become available. If you set max_workers=14 as above while the machine runs only 12 threads at a time, two tasks will sit waiting to be scheduled. map() returns immediately with an iterable that yields the target function's results:

    results = executor.map(my_task, my_items)

    Iterating over the returned iterable yields the results in task order:

    for result in executor.map(my_task, my_items):
    	print(result)

    If you want to cap how long each task may take while you iterate over the results, set the timeout parameter (in seconds):

    for result in executor.map(my_task, my_items, timeout=5):
    	# wait for task to complete or timeout expires
    	print(result)
    

    Putting the pieces together, here is a full example of submitting tasks with map():

    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor
     
    # custom task that will sleep for a variable amount of time
    def task(name):
        # sleep for less than a second
        sleep(random())
        return f'Task: {name} done.'
     
    # start the thread pool
    with ThreadPoolExecutor(10) as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10)):
            # report the result
            print(result)
    

    The output is:

    Task: 0 done.
    Task: 1 done.
    Task: 2 done.
    Task: 3 done.
    Task: 4 done.
    Task: 5 done.
    Task: 6 done.
    Task: 7 done.
    Task: 8 done.
    Task: 9 done.
    

    3.2 submit()

    The submit() function pushes a single task into the pool. It takes the function to call along with all of its arguments and immediately returns a Future object through which the task's result can be retrieved later.

    future = executor.submit(my_task, arg1, arg2) 
    

    my_task is the function to execute and arg1, arg2 are the arguments passed to it. You can also submit a task that takes no arguments:

    future = executor.submit(my_task) 
    

    Call result() on the Future to obtain the task's return value; this call blocks:

    result = future.result() 
    

    result() also accepts a timeout in seconds and raises concurrent.futures.TimeoutError once it expires:

    result = future.result(timeout=5) 
    
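    For example, a minimal sketch of catching the timeout (my_task is a stand-in that deliberately overruns):

    from time import sleep
    from concurrent.futures import ThreadPoolExecutor, TimeoutError

    def my_task():
        sleep(10)
        return 'done'

    with ThreadPoolExecutor() as executor:
        future = executor.submit(my_task)
        try:
            result = future.result(timeout=5)
        except TimeoutError:
            print('Task did not finish within 5 seconds')
        # note: the with-block still waits for the task itself on exit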

    3.3 Waiting for tasks with wait

    The concurrent.futures module provides two module-level functions for working with Future objects. Future objects are only created when tasks are submitted with submit(). As noted earlier, this waiting step is optional: you can simply consume results after calling map() or submit(), or wait for all tasks in the pool to finish. The two functions are:

    • wait(): waits on one or more Future objects until they complete.
    • as_completed(): yields Future objects from a collection as they finish executing.

    Both functions work with Future objects created by one or more thread pools; they are not tied to any particular pool in the application, which is useful if you want to wait across several pools running different kinds of tasks.
    Both also fit the common idiom of dispatching many tasks to the pool with a list comprehension of submit() calls:

    futures = [executor.submit(my_task, my_data) for my_data in my_datalist]
    

    my_task is the target function to run concurrently, and each my_data element is passed to it as an argument.
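
    A compact sketch of wait() over such a futures list (task here is a stand-in):

    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor, wait

    def task(value):
        sleep(random())
        return value

    with ThreadPoolExecutor(5) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        # blocks until every future has completed
        done, not_done = wait(futures)
        print(f'{len(done)} tasks done, {len(not_done)} pending')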

    3.4 Waiting for tasks with as_completed

    The wait() function accepts one or more Future objects and returns when a specified condition occurs, such as all tasks completing, any task completing, or a task raising an exception. It returns two sets of Future objects: one with the futures that met the condition and one with those that did not. This is handy when, with many requests in flight, you want to stop as soon as one satisfactory result arrives; pass the FIRST_COMPLETED constant as return_when:

    done, not_done = wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
    

    You can also wait for every task to finish with ALL_COMPLETED:

    done, not_done = wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
    

    Likewise, to stop as soon as any task raises an exception, use the FIRST_EXCEPTION constant:

    done, not_done = wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION)
    

    The point of running tasks concurrently is that results can be consumed as they become available rather than after everything finishes. The as_completed() function yields each task's Future object as it completes in the pool: call it with a list of Future objects created by submit(), and it hands back Future objects in whatever order they finish. It is typically used in a loop over the futures list, for example:

    for future in as_completed(futures):
    	# get the result for the next completed task
    	result = future.result() # blocks
    

    4. Shutting down the pool

    Once all tasks are finished and the pool is no longer needed, it is good practice to shut it down promptly and release the threads' stack space:

    executor.shutdown() 
    

    By default shutdown() returns only after every task in the pool has completed. You can change this by passing wait=False, in which case the call returns immediately; the pool's resources are still freed only once all current and queued tasks have finished executing.

    executor.shutdown(wait=False)
    
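    A small sketch of the difference; the sleep is only there to make the ordering visible:

    from time import sleep
    from concurrent.futures import ThreadPoolExecutor

    def task():
        sleep(1)
        print('task finished')

    executor = ThreadPoolExecutor()
    executor.submit(task)
    executor.shutdown(wait=False)  # returns immediately
    print('shutdown returned before the task finished')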

    After all of the above, this still looks like a chore: start the pool, submit, shut it down... Wouldn't it be nice to have a context manager, as with file operations, so we only worry about the work and not the cleanup? ThreadPoolExecutor has one.

    5. ThreadPoolExecutor as a context manager

    Create a block with the with statement; when the block exits, the pool is wound down automatically. On exit, the context manager calls shutdown() with its default arguments, waiting for all queued and running tasks to complete before returning.

    ...
    # create a thread pool
    with ThreadPoolExecutor(max_workers=10) as pool:
    	# submit tasks and get results
    	# ...
    	# automatically shutdown the thread pool...
    # the pool is shutdown at this point
    

    6. Concurrent request examples

    6.1 Plain serial requests

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # This module is ***
    # @Time : 2022/12/3 14:52
    # @Author : zhiyu
    # @Site : 
    
    # SuperFastPython.com
    # download document files and save to local files serially
    import time
    from os import makedirs
    from os.path import basename
    from os.path import join
    from urllib.request import urlopen
    
    
    # download a url and return the raw data, or None on error
    def download_url(url):
        try:
            # open a connection to the server
            with urlopen(url, timeout=3) as connection:
                # read the contents of the html doc
                return connection.read()
        except:
            # bad url, socket timeout, http forbidden, etc.
            return None
    
    
    # save data to a local file
    def save_file(url, data, path):
        # get the name of the file from the url
        filename = basename(url)
        # construct a local path for saving the file
        outpath = join(path, filename)
        # save to file
        with open(outpath, 'wb') as file:
            file.write(data)
        return outpath
    
    
    # download and save a url as a local file
    def download_and_save(url, path):
        # download the url
        data = download_url(url)
        # check for no data
        if data is None:
            print(f'>Error downloading {url}')
            return
        # save the data to a local file
        outpath = save_file(url, data, path)
        # report progress
        print(f'>Saved {url} to {outpath}')
    
    
    # download a list of URLs to local files
    def download_docs(urls, path):
        # create the local directory, if needed
        makedirs(path, exist_ok=True)
        # download each url and save as a local file
        for url in urls:
            download_and_save(url, path)
    
    # python concurrency API docs
    URLS = ['https://docs.python.org/3/library/concurrency.html',
            'https://docs.python.org/3/library/concurrent.html',
            'https://docs.python.org/3/library/concurrent.futures.html',
            'https://docs.python.org/3/library/threading.html',
            'https://docs.python.org/3/library/multiprocessing.html',
            'https://docs.python.org/3/library/multiprocessing.shared_memory.html',
            'https://docs.python.org/3/library/subprocess.html',
            'https://docs.python.org/3/library/queue.html',
            'https://docs.python.org/3/library/sched.html',
            'https://docs.python.org/3/library/contextvars.html']
    # local path for saving the files
    PATH = 'docs'
    
    start = time.time()
    # download all docs
    download_docs(URLS, PATH)
    end = time.time()
    print(end-start)
    

    The output, shown as a screenshot in the original post, lists each saved file followed by the total elapsed time.

    6.2 Concurrent requests with submit

    Now download the same files concurrently using submit():

    from os import makedirs
    from os.path import basename
    from os.path import join
    from urllib.request import urlopen
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    # download a url and return the raw data, or None on error
    def download_url(url):
        try:
            # open a connection to the server
            with urlopen(url, timeout=3) as connection:
                # read the contents of the html doc
                return connection.read()
        except:
            # bad url, socket timeout, http forbidden, etc.
            return None
    
    
    # save data to a local file
    def save_file(url, data, path):
        # get the name of the file from the url
        filename = basename(url)
        # construct a local path for saving the file
        outpath = join(path, filename)
        # save to file
        with open(outpath, 'wb') as file:
            file.write(data)
        return outpath
    
    
    # download and save a url as a local file
    def download_and_save(url, path):
        # download the url
        data = download_url(url)
        # check for no data
        if data is None:
            print(f'>Error downloading {url}')
            return
        # save the data to a local file
        outpath = save_file(url, data, path)
        # report progress
        print(f'>Saved {url} to {outpath}')
    
    
    # download a list of URLs to local files
    def download_docs(urls, path):
        # create the local directory, if needed
        makedirs(path, exist_ok=True)
        # create the thread pool
        n_threads = len(urls)
        with ThreadPoolExecutor(max_workers=n_threads) as executor:
            # download each url and save as a local file
            _ = [executor.submit(download_and_save, url, path) for url in urls]
    
    
    # python concurrency API docs
    URLS = ['https://docs.python.org/3/library/concurrency.html',
            'https://docs.python.org/3/library/concurrent.html',
            'https://docs.python.org/3/library/concurrent.futures.html',
            'https://docs.python.org/3/library/threading.html',
            'https://docs.python.org/3/library/multiprocessing.html',
            'https://docs.python.org/3/library/multiprocessing.shared_memory.html',
            'https://docs.python.org/3/library/subprocess.html',
            'https://docs.python.org/3/library/queue.html',
            'https://docs.python.org/3/library/sched.html',
            'https://docs.python.org/3/library/contextvars.html']
    # local path for saving the files
    PATH = 'docs'
    start = time.time()
    # download all docs
    download_docs(URLS, PATH)
    end = time.time()
    print(end-start)
    

    The output, again a screenshot in the original post, shows the files saved in completion order with a much shorter total time than the serial version.

    6.3 Concurrent requests with as_completed

    from os import makedirs
    import time
    from os.path import basename
    from os.path import join
    from urllib.request import urlopen
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import as_completed
    
    
    # download a url and return the raw data, or None on error
    def download_url(url):
        try:
            # open a connection to the server
            with urlopen(url, timeout=3) as connection:
                # read the contents of the html doc
                return (connection.read(), url)
        except:
            # bad url, socket timeout, http forbidden, etc.
            return (None, url)
    
    
    # save data to a local file
    def save_file(url, data, path):
        # get the name of the file from the url
        filename = basename(url)
        # construct a local path for saving the file
        outpath = join(path, filename)
        # save to file
        with open(outpath, 'wb') as file:
            file.write(data)
        return outpath
    
    
    # download a list of URLs to local files
    def download_docs(urls, path):
        # create the local directory, if needed
        makedirs(path, exist_ok=True)
        # create the thread pool
        n_threads = len(urls)
        with ThreadPoolExecutor(n_threads) as executor:
            # download each url and save as a local file
            futures = [executor.submit(download_url, url) for url in urls]
            # process each result as it is available
            for future in as_completed(futures):
                # get the downloaded url data
                data, url = future.result()
                # check for no data
                if data is None:
                    print(f'>Error downloading {url}')
                    continue
                # save the data to a local file
                outpath = save_file(url, data, path)
                # report progress
                print(f'>Saved {url} to {outpath}')
    
    
    # python concurrency API docs
    URLS = ['https://docs.python.org/3/library/concurrency.html',
            'https://docs.python.org/3/library/concurrent.html',
            'https://docs.python.org/3/library/concurrent.futures.html',
            'https://docs.python.org/3/library/threading.html',
            'https://docs.python.org/3/library/multiprocessing.html',
            'https://docs.python.org/3/library/multiprocessing.shared_memory.html',
            'https://docs.python.org/3/library/subprocess.html',
            'https://docs.python.org/3/library/queue.html',
            'https://docs.python.org/3/library/sched.html',
            'https://docs.python.org/3/library/contextvars.html']
    # local path for saving the files
    PATH = 'docs'
    start = time.time()
    # download all docs
    download_docs(URLS, PATH)
    end = time.time()
    print(end-start)
    

    The output, a screenshot in the original post, shows each file being saved as its download completes.

    7. Common concurrency patterns

    The above is not everything ThreadPoolExecutor can do. Here are the usage patterns that come up most often in day-to-day work:

    • Map and Wait Pattern
    • Submit and Use as Completed Pattern
    • Submit and Use Sequentially Pattern
    • Submit and Use Callback Pattern
    • Submit and Wait for All Pattern
    • Submit and Wait for First Pattern

    7.1 Map and wait

    Probably the most common ThreadPoolExecutor scenario is applying a function to every item of a collection, which is traditionally done with a for loop:

    ...
    # apply a function to each element in a collection
    for item in mylist:
    	result = task(item)
    

    A cleaner way is the map() function:

    ...
    # apply the function to each element in the collection
    results = map(task, mylist)
    

    The thread-pool analogue looks like this:

    for result in executor.map(task, mylist):
    	print(result)
    
    7.1.1 Concurrent calls without extra arguments

    # SuperFastPython.com
    # example of the map and wait pattern for the ThreadPoolExecutor
    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor
     
    # custom task that will sleep for a variable amount of time
    def task(name):
        # sleep for less than a second
        sleep(random())
        return name
     
    # start the thread pool
    with ThreadPoolExecutor(10) as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10)):
            # retrieve the result
            print(result)
    

    The output is the same as before; compare it with the example in chapter 1.
    For another example that issues concurrent requests with requests.get(), see the map() concurrent-request article linked in the original post; a sketch of the idea follows.
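
    For reference, a minimal sketch of that pattern with the third-party requests package (the URLs are arbitrary examples):

    import requests
    from concurrent.futures import ThreadPoolExecutor

    URLS = ['https://docs.python.org/3/', 'https://pypi.org/']

    def fetch(url):
        # return the status code for each page
        return url, requests.get(url, timeout=3).status_code

    with ThreadPoolExecutor() as executor:
        for url, status in executor.map(fetch, URLS):
            print(url, status)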

    7.1.2 Concurrent calls with arguments

    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor
     
    # custom task that will sleep for a variable amount of time
    def task(value1, value2):
        # sleep for less than a second
        sleep(random())
        return (value1, value2)
     
    # start the thread pool
    with ThreadPoolExecutor() as executor:
        # submit all tasks
        for result in executor.map(task, ['1', '2', '3'], ['a', 'b', 'c']):
            print(result)
    

    A call to map() submits every task to the pool immediately, even if you never process or iterate over the results. The returned iterator is lazy only in how results are consumed; the tasks themselves run regardless, as the following example shows:

    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor
    
    
    # custom task that will sleep for a variable amount of time
    def task(value):
        # sleep for less than a second
        sleep(random())
        print(f'Done: {value}')
        return value
    
    
    # start the thread pool
    with ThreadPoolExecutor() as executor:
        # submit all tasks
        executor.map(task, range(5))
    print('All done!')
    

    7.2 Submit and use as completed

    With the background above, here is the as_completed pattern directly:

    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import as_completed
    
    # custom task that will sleep for a variable amount of time
    def task(name):
        # sleep for less than a second
        sleep(random())
        return name
    
    # start the thread pool
    with ThreadPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # process task results as they are available
        for future in as_completed(futures):
            # retrieve the result
            print(future.result())
    

    Running this code, you can see that results print in the order the tasks complete, not the order they were submitted to the pool.

    9
    8
    2
    4
    3
    1
    5
    6
    0
    7
    

    7.3 Submit and use sequentially

    Sometimes you need the results in the order the tasks were submitted, because the tasks carry some logical ordering. Call submit() for each task to build a list of Future objects; the difference from as_completed is that you iterate over the list directly instead of calling as_completed():

    ...
    # process task results in the order they were submitted
    for future in futures:
    	# retrieve the result
    	print(future.result())
    

    Here is a complete example; the results come out in submission order, i.e. submit and use sequentially:

    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor
     
    # custom task that will sleep for a variable amount of time
    def task(name):
        # sleep for less than a second
        sleep(random())
        return name
     
    # start the thread pool
    with ThreadPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # process task results in the order they were submitted
        for future in futures:
            # retrieve the result
            print(future.result())
    

    The output is:

    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    

    7.4 Submit and use callbacks

    In a real project, dumping results to the console is not always convenient for tracking down problems. Instead, you can apply a callback to each submitted task's result and let the pool invoke it for you: register the callback on each Future with add_done_callback(). Once registered, the pool calls the callback as each task completes, passing in the task's Future object.

    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor
    
    # custom task that will sleep for a variable amount of time
    def task(name):
        # sleep for less than a second
        sleep(random())
        return name
    
    # custom callback function called on tasks when they complete
    def custom_callback(fut):
        # retrieve the result
        print(fut.result())
    
    # start the thread pool
    with ThreadPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # register the callback on all tasks
        for future in futures:
            future.add_done_callback(custom_callback)
        # wait for tasks to complete...
    

    The output is:

    7
    6
    3
    0
    2
    8
    9
    5
    4
    1
    

    You can also register multiple callbacks on the same futures; just add them in the loop:

    ...
    with ThreadPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # register the callbacks on all tasks
        for future in futures:
            future.add_done_callback(custom_callback1)
            future.add_done_callback(custom_callback2)
    

    7.5 Submit and wait for all tasks

    A common pattern is to submit all tasks and then wait for all of them to complete. This is useful when tasks don't need to return results directly, for instance when each task writes its result to a resource such as a file. There are two ways to wait: call wait(), or call shutdown(). Most often you want to wait explicitly on a group or subset of tasks, which you do by passing the futures list to wait(); by default wait() blocks until all of them complete:

    ...
    # wait for all tasks to complete
    wait(futures)
    

    You can also pass return_when=ALL_COMPLETED explicitly, which is the same as the default. An example:

    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import wait
     
    # custom task that will sleep for a variable amount of time
    def task(name):
        # sleep for less than a second
        sleep(random())
        # display the result
        print(name)
     
    # start the thread pool
    with ThreadPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait for all tasks to complete
        wait(futures)
        print('All tasks are done!')
    

    The output is:

    5
    4
    0
    8
    9
    1
    2
    6
    7
    3
    All tasks are done!
    

    The alternative is to shut the pool down and let it wait for all running and queued tasks to finish before continuing. This is the preferred approach when you don't keep a list of Future objects or you only use the pool once for a single batch of tasks.

    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor
    
    
    # custom task that will sleep for a variable amount of time
    def task(name):
        # sleep for less than a second
        sleep(random())
        # display the result
        print(name)
    
    
    # start the thread pool
    with ThreadPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait for all tasks to complete
    print('All tasks are done!')
    

    The output is:

    3
    9
    0
    2
    7
    6
    5
    1
    4
    8
    All tasks are done!
    

    Note that the main thread prints All tasks are done! only after every task has finished: the with context manager shuts the pool down automatically, and the main thread does not continue past it until all tasks are complete.

    7.6 Submit and wait for the first task

    Sometimes you submit several tasks but only care about the first one to return, for example when several tasks race to access the same resource. Implement this with wait(), setting return_when to the FIRST_COMPLETED constant:

    ...
    # wait until any task completes
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    

    Here the pool must be managed manually, constructing it and calling shutdown() ourselves, so the main thread can continue without waiting for the remaining tasks to finish:

    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import wait
    from concurrent.futures import FIRST_COMPLETED
    
    
    # custom task that will sleep for a variable amount of time
    def task(name):
        # sleep for less than a second
        sleep(random())
        return name
    
    
    # start the thread pool
    executor = ThreadPoolExecutor(10)
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # wait until any task completes
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    # get the result from the first task to complete
    print(done.pop().result())
    # shutdown without waiting (cancel_futures requires Python 3.9+)
    executor.shutdown(wait=False, cancel_futures=True)
    

    The output is:

    0
    

    8. Configuring the thread pool

    Setting the number of concurrent threads with max_workers has already come up; it is one of the pool's configuration options, but what are the tricks to configuring a pool well? A pool can be customized in three ways: the number of worker threads, the names of the threads in the pool, and the initialization each worker performs.

    8.1 Setting the number of worker threads

    As mentioned earlier, the default thread count is the number of CPUs (logical CPUs, i.e. counting hyper-threading) plus 4; on this machine, 6 physical cores give 12 logical CPUs, so the default is 12 + 4 = 16 workers. There is also a cap: the default never exceeds 32, even when the machine has more than 32 logical CPUs. The reasoning is that pool threads are meant for I/O-bound work, not CPU-bound work: they spend their time waiting on relatively slow devices such as hard drives, DVD drives, printers, and network connections. It is therefore not unusual for an application to want hundreds or even thousands of such threads; if you really need that many, consider AsyncIO, which a later chapter covers. All of this can be confirmed in the CPython source.
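
    The logic (paraphrased from Lib/concurrent/futures/thread.py in CPython 3.8+) is equivalent to:

    import os

    # default max_workers used by ThreadPoolExecutor, capped at 32
    max_workers = min(32, (os.cpu_count() or 1) + 4)
    print(max_workers)

    Checking the default that the pool actually picks on this machine: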

    from concurrent.futures import ThreadPoolExecutor
    # create a thread pool with the default number of worker threads
    pool = ThreadPoolExecutor()
    # report the number of worker threads chosen by default
    print(pool._max_workers)
    

    The output is:

    16
    

    which matches the analysis above. You can of course set max_workers by hand:

    from concurrent.futures import ThreadPoolExecutor
    # create a thread pool with a large number of worker threads
    pool = ThreadPoolExecutor(500)
    # report the number of worker threads
    print(pool._max_workers)
    

    The output is:

    500
    

    8.2 Setting thread names

    Every thread in the pool has a name. The main thread is called MainThread; fetch it with main_thread() and read its name attribute:

    # access the name of the main thread
    from threading import main_thread
    # access the main thread
    thread = main_thread()
    # report the thread name
    print(thread.name)
    

    Threads created inside a pool are named ThreadPoolExecutor-%d_%d, where the first number identifies the pool and the second the thread, both assigned in creation order. You can list them with threading.enumerate(), as follows:

    import threading
    from concurrent.futures import ThreadPoolExecutor
    
    # a mock task that does nothing
    def task(name):
        pass
    
    # create a thread pool
    executor = ThreadPoolExecutor()
    # execute tasks
    executor.map(task, range(10))
    # report all thread names
    for thread in threading.enumerate():
        print(thread.name)
    # shutdown the thread pool
    executor.shutdown()
    

    The output is:

    ThreadPoolExecutor-0_0
    ThreadPoolExecutor-0_1
    ThreadPoolExecutor-0_2
    ThreadPoolExecutor-0_3
    

    You can set the pool's name, i.e. the ThreadPoolExecutor-%d prefix, via the thread_name_prefix constructor argument; the workers are then named TaskPool_0, TaskPool_1, and so on:

    import threading
    from concurrent.futures import ThreadPoolExecutor
     
    # a mock task that does nothing
    def task(name):
        pass
     
    # create a thread pool with a custom name prefix
    executor = ThreadPoolExecutor(thread_name_prefix='TaskPool')
    # execute tasks
    executor.map(task, range(10))
    # report all thread names
    for thread in threading.enumerate():
        print(thread.name)
    # shutdown the thread pool
    executor.shutdown()
    

    8.3 Initializing worker threads

    Each worker thread can run a function of your choosing before it starts executing tasks: pass the function via the initializer parameter (and its arguments, if any, via initargs).

    from time import sleep
    from random import random
    from threading import current_thread
    from concurrent.futures import ThreadPoolExecutor
    
    
    # function for initializing the worker thread
    def initializer_worker():
        # get the unique name for this thread
        name = current_thread().name
        # store the unique worker name in a thread local variable
        print(f'Initializing worker thread {name}')
    
    
    # a mock task that sleeps for a random amount of time less than one second
    def task(identifier):
        sleep(random())
        # get the unique name
        return identifier
    
    
    # create a thread pool
    with ThreadPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
    # execute tasks
        for result in executor.map(task, range(10)):
            print(result)
    

    The output is:

    Initializing worker thread ThreadPoolExecutor-0_0
    Initializing worker thread ThreadPoolExecutor-0_1
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    

    The pool initialized two worker threads, and ten tasks in total were scheduled onto them.
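
    If the initializer itself needs arguments, pass them as a tuple via initargs; a minimal sketch:

    from concurrent.futures import ThreadPoolExecutor

    def initializer_worker(prefix):
        # runs once in each worker thread before it executes tasks
        print(f'{prefix}: worker starting')

    with ThreadPoolExecutor(max_workers=2, initializer=initializer_worker,
                            initargs=('TaskPool',)) as executor:
        list(executor.map(abs, [-1, -2, -3]))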

    Choosing an approach

    We have now seen several ways to use map and submit; how do you choose? Compare:

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(f, iterable))
    

    with the lambda-based submit variant:

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        results = list(map(lambda x: executor.submit(f, x), iterable))
    

    The two produce different results: the first yields a list of whatever type f returns, while the second yields a list of concurrent.futures.Future objects that you must then evaluate with their result() method to get f's return values. So with the second form you would write:

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        result_futures = list(map(lambda x: executor.submit(f, x), iterable))
        results = [fut.result() for fut in concurrent.futures.as_completed(result_futures)]
    

    Returning to the earlier scenario: when testing RESTful APIs, a GET request typically returns both a response body and a status_code, so the pattern looks like this:

    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = []

        for index in [1, 2, 3]:
            asg_kwargs = {'key1': id1, 'key2': [id2]}
            # flag is the switch passed in by the caller (see above)
            if flag:
                futures.append(executor.submit(client.openapi_ap1, **asg_kwargs))
            else:
                futures.append(executor.submit(client.openapi_ap2, **asg_kwargs))
        # wait for the tasks to finish; raises on timeout
        for future in as_completed(futures):
            data = future.result()
            if data[1] != 200:
                assert data[1] == 400
                # ... further assertions on the response body (elided in the original)
    
    Original article: https://blog.csdn.net/rhx_qiuzhi/article/details/128158912