• Python进阶教学——多线程高级应用


    目录

    一、线程间的通讯机制

    二、线程中的消息隔离机制

    三、线程同步信号量

    四、线程池和进程池


    一、线程间的通讯机制

    1、Queue消息队列
    • 消息队列是在消息的传输过程中保存消息的容器,主要用于不同线程间任意类型数据的共享。
    • 消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不同的线程。生产者往管道中写消息,消费者从管道中读消息,且一次只允许一个线程访问管道。
    1.1、常用接口
      1. from queue import Queue
      2. q =Queue(maxsize=0) # 初始化,maxsize=0表示队列的消息个数不受限制;maxsize>0表示存放限制
      3. q.get() # 提取消息,如果队列为空会阻塞程序,等待队列消息
      4. q.get(timeout=1) # 阻塞程序,设置超时时间
      5. q.put() # 发送消息,将消息放入队列
    1.2、演示
    • 使用生产者和消费者的案例进行演示。 
        1. from queue import Queue
        2. import threading
        3. import time
        4. def product(q): # 生产者
        5. kind = ('猪肉','白菜','豆沙')
        6. for i in range(3):
        7. print(threading.current_thread().name,"生产者开始生产包子")
        8. time.sleep(1)
        9. q.put(kind[i%3]) # 放入包子
        10. print(threading.current_thread().name,"生产者的包子做完了")
        11. def consumer(q): # 消费者
        12. while True:
        13. print(threading.current_thread().name,"消费者准备吃包子")
        14. time.sleep(1)
        15. t=q.get() # 拿出包子
        16. print("消费者吃了一个{}包子".format(t))
        17. if __name__=='__main__':
        18. q=Queue(maxsize=1)
        19. # 启动两个生产者线程
        20. threading.Thread(target=product,args=(q, )).start()
        21. threading.Thread(target=product,args=(q, )).start()
        22. # 启动一个消费者线程
        23. threading.Thread(target=consumer,args=(q, )).start()
    • 运行结果:
    2、Event事件对象
    • 事件对象主要用于通过事件通知机制实现线程的大规模并发。
    • 事件对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,事件对象中的信号标志被设置为假。如果有线程等待一个事件对象,而这个事件对象的标志为假,那么这个线程将会被一直阻塞直到该标志为真。如果一个事件对象的信号标志被设置为真,它将唤醒所有等待该事件对象的线程。如果一个线程等待一个已经被设置为真的事件对象,那么它将忽略这个事件,继续执行。
    • 应用场景:多个线程逐步开始运行时,由于某个条件未满足,则它们都会被阻塞。直到条件满足后,才全部继续执行。
    2.1、常用接口
      1. import threading # 使用多线程必要的模块
      2. event=threading.Event() # 创建一个evnet对象
      3. event.clear() # 重置代码中的event对象,使得所有该event事件都处于待命状态
      4. event.wait() # 阻塞线程,等待event指令
      5. event.set() # 发送event指令,使得所有设置该event事件的线程执行
    2.2、演示 
    •  创建10个线程对象,使用event事件将其全部关联起来。先把它们全部阻塞,再同时运行。
        1. import threading,time
        2. # 自定义的线程类
        3. class MyThread(threading.Thread): # 继承threading.Thread类
        4. # 初始化
        5. def __init__(self,event):
        6. super().__init__() # 调用父类的初始化方法,super()代表父类对象
        7. self.event=event # 将传入的事件对象event绑定到当前线程实例上
        8. # 运行
        9. def run(self):
        10. print(f"线程{self.name}已经初始化完成,随时准备启动...")
        11. self.event.wait() # 阻塞线程,等待event触发
        12. print(f"{self.name}开始执行...")
        13. if __name__=='__main__':
        14. event=threading.Event()
        15. threads=[]
        16. # 创建10个MyThread线程对象,并传入event。这样每个线程都与这个事件相关联
        17. [threads.append(MyThread(event)) for i in range(1,11)]
        18. event.clear() # 使得所有该event事件都处于待命状态
        19. [t.start() for t in threads] # 启动所有线程,由于事件未触发,线程都被锁定
        20. time.sleep(5)
        21. event.set() # 使得所有设置该event事件的线程执行
        22. [t.join for t in threads] # 等待所有线程结束后再继续执行主线程
    •  运行结果:
    • 多个线程可以绑定同一个事件,在事件触发时统一并发执行。
    3、Condition条件对象
    • 条件对象主要用于多个线程间轮流交替执行任务。
    3.1、常用接口
      1. import threading
      2. cond=threading.Condition() # 新建一个条件对象
      3. self.cond.acquire() # 获取锁
      4. self.cond.wait() # 线程阻塞,等待通知
      5. self.cond.notify() # 唤醒其他wait状态的线程
      6. self.cond.release() # 释放锁
    3.2、演示
    • 创建两个线程,轮流执行完成对话,如下图。
        1. import threading
        2. # 新建一个条件对象
        3. cond=threading.Condition()
        4. class thread1(threading.Thread):
        5. def __init__(self,cond,name):
        6. threading.Thread.__init__(self,name=name)
        7. self.cond=cond
        8. def run(self):
        9. self.cond.acquire() # 获取锁
        10. print(self.name+":一支穿云箭") # 线程1说的第1句话
        11. self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
        12. self.cond.wait() # 线程阻塞,等待通知
        13. print(self.name+":山无楞,天地合,乃敢与君决") # 线程1说的第2句话
        14. self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
        15. self.cond.wait() # 线程阻塞,等待通知
        16. print(self.name+":紫薇") # 线程1说的第3句话
        17. self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
        18. self.cond.wait() # 线程阻塞,等待通知
        19. print(self.name+":是你") # 线程1说的第4句话
        20. self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
        21. self.cond.wait() # 线程阻塞,等待通知
        22. print(self.name+":有钱吗,借点?") # 线程1说的第5句话
        23. self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
        24. self.cond.release() # 释放锁
        25. class thread2(threading.Thread):
        26. def __init__(self,cond,name):
        27. threading.Thread.__init__(self,name=name)
        28. self.cond=cond
        29. def run(self):
        30. self.cond.acquire() # 获取锁
        31. self.cond.wait() # 线程阻塞,等待通知
        32. print(self.name+":千军万马来相见") # 线程2说的第1句话
        33. self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
        34. self.cond.wait() # 线程阻塞,等待通知
        35. print(self.name+":海可枯,石可烂,激情永不散") # 线程2说的第3句话
        36. self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
        37. self.cond.wait() # 线程阻塞,等待通知
        38. print(self.name+":尔康") # 线程2说的第3句话
        39. self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
        40. self.cond.wait() # 线程阻塞,等待通知
        41. print(self.name+":是我") # 线程2说的第4句话
        42. self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
        43. self.cond.wait() # 线程阻塞,等待通知
        44. print(self.name+":滚!") # 线程2说的第5句话
        45. self.cond.release()
        46. if __name__=='__main__':
        47. thread1=thread1(cond,'线程1')
        48. thread2=thread2(cond,'线程2')
        49. # 虽然是线程1先说话,但是不能让它先启动。因为线程1先启动的话,发出notify指令,而线程2可能还未启动,导致notify指令无法接收。
        50. thread2.start() # 线程2先执行
        51. thread1.start()
    • 运行结果:

    二、线程中的消息隔离机制

    1、消息隔离
    • 假设有两个线程,线程A种的变量和线程B中的变量值不能共享,这就是消息隔离
    • 那变量名取不一样不就好啦?的确可以。但如果所有的线程都是由一个class实例化出来的对象呢?这样要给每个线程添加不同的变量就显得很麻烦了。
    • 基于上述场景,python提供了threading.local()这个类,可以很方便的控制变量的隔离,即使是同一个变量,在不同的线程中,其值也是不能共享的。
    2、演示
    • 设置一个threading.local共享线程内全局变量,然后新建2个线程,分别设置这个threading.local的值,然后再分别打印这两个threading.local的值,确认是否每个线程打印出来的threading.local的值都是不同的。
        1. import threading
        2. local_data=threading.local() # 定义线程内全局变量
        3. local_data.name='local_data' # 初始名称(主线程)
        4. class MyThread(threading.Thread):
        5. def run(self):
        6. print("赋值前-子线程:",threading.current_thread(),local_data.__dict__) # local_data.__dict__:打印对象所有属性
        7. # 在子线程中修改local_data.name的值
        8. local_data.name=self.name
        9. print("赋值后-子线程:",threading.current_thread(),local_data.__dict__)
        10. if __name__=='__main__':
        11. print("开始前-主线程:",local_data.__dict__)
        12. # 启动两个线程
        13. t1=MyThread()
        14. t1.start()
        15. t1.join()
        16. t2=MyThread()
        17. t2.start()
        18. t2.join()
        19. print("开始后-主线程:",local_data.__dict__)
    • 运行结果:
      • 主线程中的local_data被赋值,而子线程开始前的local_data并未赋值,故为空。

    三、线程同步信号量

    1、简介
    • semaphore信号量是用于控制线程工作数量的一种锁。
    • 我们知道,在访问文件时,一次只能有一个线程写,而可以有多个线程同时读。如果我们要控制同时读取文件的线程个数,就需要使用到同步信号量。
    • 当信号量不为0时,其他线程可以获取该信号量执行任务。每增加一个线程执行任务,信号量就会减一;每减少一个线程执行任务,信号量加一。当信号量为0时,其他线程全部阻塞,直到有线程完成任务释放信号量。
    2、演示
    • 建立10个模拟爬取网站内容的线程,利用同步信号量控制每次只能由3个线程执行任务。
        1. import threading,time
        2. # 模拟爬取网站内容的线程
        3. class HtmlSpider(threading.Thread):
        4. def __init__(self,url,sem):
        5. super().__init__()
        6. self.url=url
        7. self.sem=sem
        8. def run(self):
        9. time.sleep(2) # 模拟网络等待
        10. print("获取网页内容成功!")
        11. self.sem.release() # 释放信号量
        12. # 模拟爬取网站链接的线程
        13. class UrlProducer(threading.Thread):
        14. def __init__(self,sem):
        15. super().__init__()
        16. self.sem=sem
        17. def run(self):
        18. for i in range(20):
        19. self.sem.acquire() # 获取信号量
        20. html_thread=HtmlSpider(f'https://www.baidu.com/{i}',self.sem) # 模拟20个网址,并爬取内容
        21. html_thread.start()
        22. if __name__=='__main__':
        23. sem=threading.Semaphore(value=3) # 同步信号量
        24. url_producer=UrlProducer(sem) # 创建获取链接的线程对象
        25. url_producer.start() # 启动线程
    • 运行结果:
      • 每三个一组完成任务。

    四、线程池和进程池

    1、线程池
    • 线程池是一种多线程处理的形式。线程池维护着多个线程,等待着管理者分配可并发执行的任务,这些任务会被分配给空闲的线程。这避免了在处理短时间任务时创建与销毁线程的代价,让创建好的线程得到重复利用。线程池不仅能够保证内核的充分利用,还能防止过分调度。
    • 不使用线程池:
      • 每个线程在创建并且执行完任务后就会被销毁。
    • 使用线程池:
      • 线程池中的线程会一直保留,直到程序结束。
    1.1、线程池模块
      1. from concurrent.futures import ThreadPoolExecutor # 线程池模块
      2. executor = ThreadPoolExecutor(max_workers=3) # 创建线程池对象,max_worker为最大线程数
      3. task1=executor.submit() # 提交需要线程完成的任务
    • 线程池模块的特性:
      • 主线程可以获取某一个线程或任务的状态,以及返回值。
      • 当一个线程完成的时候,主线程能够立即知道。
      • 让多线程和多进程的编码接口一致。
    1.2、演示
    • 创建一个线程池,其中包含3个线程,并为该线程池分配4个任务。 
        1. from concurrent.futures import ThreadPoolExecutor
        2. import time
        3. # 创建一个新的线程池对象,并且指定线程池中最大的线程数为3
        4. executor = ThreadPoolExecutor(max_workers=3)
        5. def get_html(times):
        6. time.sleep(times)
        7. print(f'获取网页信息{times}完毕')
        8. return times
        9. # 通过submit方法提交执行的函数到线程池中,submit函数会立刻返回,不阻塞主线程
        10. # 只要线程池中有可用线程,就会自动分配线程去完成对应的任务
        11. task1=executor.submit(get_html,1)
        12. task2=executor.submit(get_html,2)
        13. task3=executor.submit(get_html,3)
        14. task4=executor.submit(get_html,2) # 多余的任务需要等待线程池中有空闲的线程
    •  运行结果:
    1.3、基本方法
    1.3.1、done、cancel、result
    • done():检查任务是否完成,并返回结果。但并不知道线程什么时候完成的。
    • cancel():取消任务的执行,该任务没有放入线程池中才能取消。
    • result():拿到任务的执行结果,该方法是一个阻塞方法。
    • 演示
        1. from concurrent.futures import ThreadPoolExecutor
        2. import time
        3. # 创建一个新的线程池对象,并且指定线程池中最大的线程数为3
        4. executor = ThreadPoolExecutor(max_workers=3)
        5. def get_html(times):
        6. time.sleep(times)
        7. print(f'获取网页信息{times}完毕')
        8. return times
        9. # 通过submit方法提交执行的函数到线程池中,submit函数会立刻返回,不阻塞主线程
        10. # 只要线程池中有可用线程,就会自动分配线程去完成对应的任务
        11. task1=executor.submit(get_html,1)
        12. task2=executor.submit(get_html,2)
        13. task3=executor.submit(get_html,3)
        14. task4=executor.submit(get_html,2) # 多余的任务需要等待线程池中有空闲的线程
        15. print(task1.done()) # 检查任务是否完成,并返回结果
        16. print(task4.cancel()) # 取消任务的执行,该任务没有放入线程池中才能取消
        17. print(task3.result()) # 拿到任务的执行结果,该方法是一个阻塞方法
      • 运行结果:
    1.3.2、as_completed
    • as_completed用来检查任务是否完成,并根据任务完成的时间返回结果。
    • 演示
        1. from concurrent.futures import ThreadPoolExecutor,as_completed
        2. import time
        3. # 创建一个新的线程池对象,并且指定线程池中最大的线程数为3
        4. executor = ThreadPoolExecutor(max_workers=3)
        5. def get_html(times):
        6. time.sleep(times)
        7. print(f'获取网页信息{times}完毕')
        8. return times
        9. urls=[4,2,3] # 通过urls列表模拟要抓取的url
        10. # 通过列表推导式改造多线程任务
        11. all_task=[executor.submit(get_html,url) for url in urls]
        12. # 按照任务完成顺序返回结果
        13. for item in as_completed(all_task): # as_completed是一个生成器,在任务没有完成之前是阻塞的
        14. data=item.result()
        15. print(f"主线程中获取任务的返回值是:{data}")
      • 运行结果:
    1.3.3、map
    • map和as_completed都可以拿到线程执行的结果,但是map也只是按照输入顺序拿到结果,而as_completed是根据任务结束顺序拿到结果。
    • 演示
        1. from concurrent.futures import ThreadPoolExecutor
        2. import time
        3. # 创建一个新的线程池对象,并且指定线程池中最大的线程数为3
        4. executor = ThreadPoolExecutor(max_workers=3)
        5. def get_html(times):
        6. time.sleep(times)
        7. print(f'获取网页信息{times}完毕')
        8. return times
        9. urls=[4,2,3] # 通过urls列表模拟要抓取的url
        10. # map是一个生成器,不需要submit,直接将任务分配给线程池
        11. for data in executor.map(get_html,urls):
        12. print(f"主线程中获取任务的返回值是:{data}")
      • 运行结果:
    1.3.4、wait
    • wait可以阻塞主线程,直到满足指定的条件。 return_when指定了需要满足的条件。
    • 演示
        1. from concurrent.futures import ThreadPoolExecutor,wait,ALL_COMPLETED
        2. import time
        3. # 创建一个新的线程池对象,并且指定线程池中最大的线程数为3
        4. executor = ThreadPoolExecutor(max_workers=3)
        5. def get_html(times):
        6. time.sleep(times)
        7. print(f'获取网页信息{times}完毕')
        8. return times
        9. urls=[4,2,3] # 通过urls列表模拟要抓取的url
        10. all_task=[executor.submit(get_html,url) for url in urls] # 任务列表
        11. wait(all_task,return_when=ALL_COMPLETED) # 让主线程阻塞,ALL_COMPLETED直到线程池中的所有线程任务执行完毕
        12. print('代码执行完毕')
      • 运行结果:        
    2、进程池
    • 与线程池类似,进程池是一种多进程处理的形式。进程池维护着多个进程,等待着管理者分配可并发执行的任务,这些任务会被分配给空闲的进程。
    2.1、进程池模块
    • 可使用的进程池模块有两种,如下所示。 
      1. # 使用concurrent futures模块提供的ProcessPoolExecutor来实现进程池
      2. from concurrent.futures import ProcessPoolExecutor # 使用方法与线程池的完全一致
      3. # 使用Pool类来实现进程池
      4. from multiprocessing
      5. multiprocessing.Pool()
    •  下面讲解使用Pool类来实现进程池。
    2.2、演示
    • 单个进程执行
        1. import multiprocessing
        2. import time
        3. def get_html(n):
        4. time.sleep(n)
        5. print(f"子进程{n}获取内容成功")
        6. return n
        7. if __name__=='__main__':
        8. pool=multiprocessing.Pool(multiprocessing.cpu_count()) # 设置进程池,进程数量是电脑cpu的核心数
        9. result=pool.apply_async(get_html,args=(3,)) # 异步方式调用
        10. pool.close() # 这个方法必须在join前调用
        11. pool.join() # 子进程未执行完前,会阻塞主进程代码
        12. print(result.get()) # 拿到子进程执行的结果
        13. print("end")
      • 运行结果:
    • 【注】进程数量最好与电脑CPU的核心数相同。
      • join前必须调用close方法。
    • 多个进程执行
        1. import multiprocessing
        2. import time
        3. def get_html(n):
        4. time.sleep(n)
        5. print(f"子进程{n}获取内容成功")
        6. return n
        7. if __name__=='__main__':
        8. pool=multiprocessing.Pool(multiprocessing.cpu_count()) # 设置进程池,进程数量是电脑cpu的核心数
        9. for result in pool.imap(get_html,[4,2,3]):
        10. print(f"{result}休眠执行成功!")
      • 运行结果:
        • 返回的结果是按顺序输出的。

  • 相关阅读:
    使用LIME解释CNN
    04 【函数的扩展】
    移远BG95的AT指令流程汇总
    nginx的重定向
    玩转MyBatis-Plus分页插件一:分页基本使用+方法解释+解析Page对象
    Java 数据结构与算法 插入类排序:直接插入排序、希尔排序
    使用requests库解决Session对象设置超时的问题
    业务指标采集影响系统性能问题排查
    UE5 ChaosVehicles载具 增加方向盘动画 (连载三)
    猿创征文|信息抽取(2)——pytorch实现Bert-BiLSTM-CRF、Bert-CRF模型进行实体抽取
  • 原文地址:https://blog.csdn.net/weixin_45100742/article/details/132824377