从源代码修改
- from threading import Thread
- from queue import Queue
-
- class WithThread:
-
- def __init__(self, thread_num):
- self.thread_num = thread_num
- self.task_queue = Queue()
- self.threads = []
- self.is_running = False
- self.thread_status = [True] * thread_num # 新增一个列表记录每个线程的状态
-
- def put(self, func, args=()):
- if not self.is_running:
- self.start()
- self.task_queue.put((func, args))
-
- def start(self):
- self.is_running = True
- for i in range(self.thread_num):
- t = Thread(target=self._run, args=(i,)) # 将线程ID作为参数传入方法中
- t.start()
- self.threads.append(t)
-
- def stop(self):
- self.is_running = False
- for t in self.threads:
- t.join()
-
- def _run(self, i):
- while self.is_running:
- try:
- func, args = self.task_queue.get(timeout=1)
- except Exception as e:
- # 捕获Empty异常
- continue
- try:
- func(*args)
- except Exception as e:
- print("任务执行失败: ", e)
- finally:
- self.task_queue.task_done()
- self.thread_status[i] = True # 在任务完成后将状态标记为True
-
- def get_thread_status(self):
- return self.thread_status # 返回线程状态列表
新增了一个thread_status列表来记录每个线程的状态,并在每个线程的_run方法中更新状态。另外,我们还新增了一个get_thread_status方法,用于获取所有线程的状态。
- from threading import Thread
- from queue import Queue
-
- class WithThread:
-
- def __init__(self, thread_num, max_queue_size=100):
- self.thread_num = thread_num
- self.task_queue = Queue()
- self.max_queue_size = max_queue_size
- self.threads = []
- self.is_running = False
-
- def put(self, func, args=()):
- if not self.is_running:
- self.start()
- self.task_queue.put((func, args))
-
- def start(self):
- self.is_running = True
- for i in range(self.thread_num):
- t = Thread(target=self._run)
- t.start()
- self.threads.append(t)
-
- def stop(self):
- self.is_running = False
- for t in self.threads:
- t.join()
-
- def _run(self):
- while self.is_running:
- try:
- func, args = self.task_queue.get(timeout=1)
- except Exception as e:
- # 捕获Empty异常
- continue
- try:
- func(*args)
- except Exception as e:
- print("任务执行失败: ", e)
- finally:
- self.task_queue.task_done()
- # 判断队列长度是否超出阈值
- if self.task_queue.qsize() > self.max_queue_size:
- print("当前线程受阻,队列长度:", self.task_queue.qsize())
新增了一个max_queue_size参数,并在线程的_run方法中判断队列长度是否超过该阈值。如果队列长度超过阈值,就认为该线程受阻。