• 【python】ThreadPoolExecutor线程池


    python的Demo

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    class RejectableThreadPool:
        def __init__(self, max_workers, max_pending_tasks):
            self.max_pending_tasks = max_pending_tasks
            self.executor = ThreadPoolExecutor(max_workers=max_workers)
            self.pending_tasks = 0
    
        def submit_task(self, func, *args, **kwargs):
            if self.pending_tasks >= self.max_pending_tasks:
                return "Task limit reached. Try again later."
    
            future = self.executor.submit(self._task_wrapper, func, *args, **kwargs)
            self.pending_tasks += 1
            return future
    
        def _task_wrapper(self, func, *args, **kwargs):
            try:
                result = func(*args, **kwargs)
            finally:
                self.pending_tasks -= 1
    
        def shutdown(self):
            self.executor.shutdown()
    
        def get_pending_tasks(self):
            return self.pending_tasks
    
    
    def task_func(task_id):
        print(f"Task {task_id} started")
        time.sleep(1)
        print(f"Task {task_id} completed")
    
    thread_pool = RejectableThreadPool(max_workers=5, max_pending_tasks=2)
    
    # 提交任务
    for i in range(5):
        result = thread_pool.submit_task(task_func, i)
        if isinstance(result, str):
            print(result)  # 输出任务被拒绝的提示信息
    
    # 等待所有任务完成
    thread_pool.shutdown()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    解释

    让我们逐行解释代码的功能和作用:

    首先,我们引入了ThreadPoolExecutor类和time模块,ThreadPoolExecutor是Python标准库concurrent.futures模块提供的一个线程池实现。

    RejectableThreadPool类的定义开始。在构造函数__init__中,我们传入了两个参数max_workers和max_pending_tasks,分别代表线程池的最大工作线程数和最大挂起任务数。我们还初始化了executor成员变量为一个ThreadPoolExecutor对象,并设置了max_workers为传入的max_workers值,pending_tasks初始化为0。

    submit_task方法用于提交任务。首先,它会检查当前挂起任务数是否已达到最大限制,如果是,则直接返回一个字符串"Task limit reached. Try again later."表示任务被拒绝。否则,它会使用executor.submit方法来提交任务,并调用_task_wrapper方法包装任务,最后增加挂起任务数并返回任务的Future对象。

    _task_wrapper方法用于执行任务函数。它接收任务函数func和其它参数,首先调用func函数执行任务,并在finally块中减少挂起任务数。

    shutdown方法用于等待所有任务执行完毕并关闭线程池。

    get_pending_tasks方法返回当前挂起任务数。

    下面是一个示例使用该线程池的代码。我们创建了一个RejectableThreadPool对象thread_pool,设置最大工作线程数为5,最大挂起任务数为2。

    之后,我们使用submit_task方法提交了5个任务给线程池。在每次提交任务后,我们检查返回结果的类型,如果是字符串类型,则说明任务被拒绝,我们将其打印出来。

    最后,我们调用shutdown方法等待所有任务执行完毕并关闭线程池。

    通过以上的代码示例,我们可以实现一个具有任务拒绝机制的自定义线程池,用于任务调度和执行。

  • 相关阅读:
    css 10-13
    Kafka高性能
    Git知识汇总
    DRF纯净版项目搭建和配置
    Java 线程池调度周期性任务“异常“探究
    如何手写一个js工具库?同时发布到npm上
    websocket基础
    支持JDK19虚拟线程的web框架,之一:体验
    JavaScript -- 07. 面向对象编程
    微信小程序控制元素显示隐藏
  • 原文地址:https://blog.csdn.net/hh1357102/article/details/133828139