目录
- from queue import Queue
- q =Queue(maxsize=0) # 初始化,maxsize=0表示队列的消息个数不受限制;maxsize>0表示存放限制
- q.get() # 提取消息,如果队列为空会阻塞程序,等待队列消息
- q.get(timeout=1) # 阻塞程序,设置超时时间
- q.put() # 发送消息,将消息放入队列
- from queue import Queue
- import threading
- import time
- def product(q): # 生产者
- kind = ('猪肉','白菜','豆沙')
- for i in range(3):
- print(threading.current_thread().name,"生产者开始生产包子")
- time.sleep(1)
- q.put(kind[i%3]) # 放入包子
- print(threading.current_thread().name,"生产者的包子做完了")
- def consumer(q): # 消费者
- while True:
- print(threading.current_thread().name,"消费者准备吃包子")
- time.sleep(1)
- t=q.get() # 拿出包子
- print("消费者吃了一个{}包子".format(t))
- if __name__=='__main__':
- q=Queue(maxsize=1)
- # 启动两个生产者线程
- threading.Thread(target=product,args=(q, )).start()
- threading.Thread(target=product,args=(q, )).start()
- # 启动一个消费者线程
- threading.Thread(target=consumer,args=(q, )).start()

- import threading # 使用多线程必要的模块
- event=threading.Event() # 创建一个evnet对象
- event.clear() # 重置代码中的event对象,使得所有该event事件都处于待命状态
- event.wait() # 阻塞线程,等待event指令
- event.set() # 发送event指令,使得所有设置该event事件的线程执行
- import threading,time
- # 自定义的线程类
- class MyThread(threading.Thread): # 继承threading.Thread类
- # 初始化
- def __init__(self,event):
- super().__init__() # 调用父类的初始化方法,super()代表父类对象
- self.event=event # 将传入的事件对象event绑定到当前线程实例上
- # 运行
- def run(self):
- print(f"线程{self.name}已经初始化完成,随时准备启动...")
- self.event.wait() # 阻塞线程,等待event触发
- print(f"{self.name}开始执行...")
- if __name__=='__main__':
- event=threading.Event()
- threads=[]
- # 创建10个MyThread线程对象,并传入event。这样每个线程都与这个事件相关联
- [threads.append(MyThread(event)) for i in range(1,11)]
- event.clear() # 使得所有该event事件都处于待命状态
- [t.start() for t in threads] # 启动所有线程,由于事件未触发,线程都被锁定
- time.sleep(5)
- event.set() # 使得所有设置该event事件的线程执行
- [t.join for t in threads] # 等待所有线程结束后再继续执行主线程

- import threading
- cond=threading.Condition() # 新建一个条件对象
- self.cond.acquire() # 获取锁
- self.cond.wait() # 线程阻塞,等待通知
- self.cond.notify() # 唤醒其他wait状态的线程
- self.cond.release() # 释放锁

- import threading
- # 新建一个条件对象
- cond=threading.Condition()
- class thread1(threading.Thread):
- def __init__(self,cond,name):
- threading.Thread.__init__(self,name=name)
- self.cond=cond
- def run(self):
- self.cond.acquire() # 获取锁
- print(self.name+":一支穿云箭") # 线程1说的第1句话
- self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
- self.cond.wait() # 线程阻塞,等待通知
- print(self.name+":山无楞,天地合,乃敢与君决") # 线程1说的第2句话
- self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
- self.cond.wait() # 线程阻塞,等待通知
- print(self.name+":紫薇") # 线程1说的第3句话
- self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
- self.cond.wait() # 线程阻塞,等待通知
- print(self.name+":是你") # 线程1说的第4句话
- self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
- self.cond.wait() # 线程阻塞,等待通知
- print(self.name+":有钱吗,借点?") # 线程1说的第5句话
- self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
- self.cond.release() # 释放锁
- class thread2(threading.Thread):
- def __init__(self,cond,name):
- threading.Thread.__init__(self,name=name)
- self.cond=cond
- def run(self):
- self.cond.acquire() # 获取锁
- self.cond.wait() # 线程阻塞,等待通知
- print(self.name+":千军万马来相见") # 线程2说的第1句话
- self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
- self.cond.wait() # 线程阻塞,等待通知
- print(self.name+":海可枯,石可烂,激情永不散") # 线程2说的第3句话
- self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
- self.cond.wait() # 线程阻塞,等待通知
- print(self.name+":尔康") # 线程2说的第3句话
- self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
- self.cond.wait() # 线程阻塞,等待通知
- print(self.name+":是我") # 线程2说的第4句话
- self.cond.notify() # 唤醒其他wait状态的线程(通知线程2说话)
- self.cond.wait() # 线程阻塞,等待通知
- print(self.name+":滚!") # 线程2说的第5句话
- self.cond.release()
- if __name__=='__main__':
- thread1=thread1(cond,'线程1')
- thread2=thread2(cond,'线程2')
- # 虽然是线程1先说话,但是不能让它先启动。因为线程1先启动的话,发出notify指令,而线程2可能还未启动,导致notify指令无法接收。
- thread2.start() # 线程2先执行
- thread1.start()

- import threading
- local_data=threading.local() # 定义线程内全局变量
- local_data.name='local_data' # 初始名称(主线程)
- class MyThread(threading.Thread):
- def run(self):
- print("赋值前-子线程:",threading.current_thread(),local_data.__dict__) # local_data.__dict__:打印对象所有属性
- # 在子线程中修改local_data.name的值
- local_data.name=self.name
- print("赋值后-子线程:",threading.current_thread(),local_data.__dict__)
- if __name__=='__main__':
- print("开始前-主线程:",local_data.__dict__)
- # 启动两个线程
- t1=MyThread()
- t1.start()
- t1.join()
- t2=MyThread()
- t2.start()
- t2.join()
- print("开始后-主线程:",local_data.__dict__)

- import threading,time
- # 模拟爬取网站内容的线程
- class HtmlSpider(threading.Thread):
- def __init__(self,url,sem):
- super().__init__()
- self.url=url
- self.sem=sem
- def run(self):
- time.sleep(2) # 模拟网络等待
- print("获取网页内容成功!")
- self.sem.release() # 释放信号量
- # 模拟爬取网站链接的线程
- class UrlProducer(threading.Thread):
- def __init__(self,sem):
- super().__init__()
- self.sem=sem
- def run(self):
- for i in range(20):
- self.sem.acquire() # 获取信号量
- html_thread=HtmlSpider(f'https://www.baidu.com/{i}',self.sem) # 模拟20个网址,并爬取内容
- html_thread.start()
- if __name__=='__main__':
- sem=threading.Semaphore(value=3) # 同步信号量
- url_producer=UrlProducer(sem) # 创建获取链接的线程对象
- url_producer.start() # 启动线程



- from concurrent.futures import ThreadPoolExecutor # 线程池模块
- executor = ThreadPoolExecutor(max_workers=3) # 创建线程池对象,max_worker为最大线程数
- task1=executor.submit() # 提交需要线程完成的任务
- from concurrent.futures import ThreadPoolExecutor
- import time
- # 创建一个新的线程池对象,并且指定线程池中最大的线程数为3
- executor = ThreadPoolExecutor(max_workers=3)
- def get_html(times):
- time.sleep(times)
- print(f'获取网页信息{times}完毕')
- return times
- # 通过submit方法提交执行的函数到线程池中,submit函数会立刻返回,不阻塞主线程
- # 只要线程池中有可用线程,就会自动分配线程去完成对应的任务
- task1=executor.submit(get_html,1)
- task2=executor.submit(get_html,2)
- task3=executor.submit(get_html,3)
- task4=executor.submit(get_html,2) # 多余的任务需要等待线程池中有空闲的线程

- from concurrent.futures import ThreadPoolExecutor
- import time
- # 创建一个新的线程池对象,并且指定线程池中最大的线程数为3
- executor = ThreadPoolExecutor(max_workers=3)
- def get_html(times):
- time.sleep(times)
- print(f'获取网页信息{times}完毕')
- return times
- # 通过submit方法提交执行的函数到线程池中,submit函数会立刻返回,不阻塞主线程
- # 只要线程池中有可用线程,就会自动分配线程去完成对应的任务
- task1=executor.submit(get_html,1)
- task2=executor.submit(get_html,2)
- task3=executor.submit(get_html,3)
- task4=executor.submit(get_html,2) # 多余的任务需要等待线程池中有空闲的线程
- print(task1.done()) # 检查任务是否完成,并返回结果
- print(task4.cancel()) # 取消任务的执行,该任务没有放入线程池中才能取消
- print(task3.result()) # 拿到任务的执行结果,该方法是一个阻塞方法

- from concurrent.futures import ThreadPoolExecutor,as_completed
- import time
- # 创建一个新的线程池对象,并且指定线程池中最大的线程数为3
- executor = ThreadPoolExecutor(max_workers=3)
- def get_html(times):
- time.sleep(times)
- print(f'获取网页信息{times}完毕')
- return times
- urls=[4,2,3] # 通过urls列表模拟要抓取的url
- # 通过列表推导式改造多线程任务
- all_task=[executor.submit(get_html,url) for url in urls]
- # 按照任务完成顺序返回结果
- for item in as_completed(all_task): # as_completed是一个生成器,在任务没有完成之前是阻塞的
- data=item.result()
- print(f"主线程中获取任务的返回值是:{data}")

- from concurrent.futures import ThreadPoolExecutor
- import time
- # 创建一个新的线程池对象,并且指定线程池中最大的线程数为3
- executor = ThreadPoolExecutor(max_workers=3)
- def get_html(times):
- time.sleep(times)
- print(f'获取网页信息{times}完毕')
- return times
- urls=[4,2,3] # 通过urls列表模拟要抓取的url
- # map是一个生成器,不需要submit,直接将任务分配给线程池
- for data in executor.map(get_html,urls):
- print(f"主线程中获取任务的返回值是:{data}")

- from concurrent.futures import ThreadPoolExecutor,wait,ALL_COMPLETED
- import time
- # 创建一个新的线程池对象,并且指定线程池中最大的线程数为3
- executor = ThreadPoolExecutor(max_workers=3)
- def get_html(times):
- time.sleep(times)
- print(f'获取网页信息{times}完毕')
- return times
- urls=[4,2,3] # 通过urls列表模拟要抓取的url
- all_task=[executor.submit(get_html,url) for url in urls] # 任务列表
- wait(all_task,return_when=ALL_COMPLETED) # 让主线程阻塞,ALL_COMPLETED直到线程池中的所有线程任务执行完毕
- print('代码执行完毕')

- # 使用concurrent futures模块提供的ProcessPoolExecutor来实现进程池
- from concurrent.futures import ProcessPoolExecutor # 使用方法与线程池的完全一致
- # 使用Pool类来实现进程池
- from multiprocessing
- multiprocessing.Pool()
- import multiprocessing
- import time
- def get_html(n):
- time.sleep(n)
- print(f"子进程{n}获取内容成功")
- return n
- if __name__=='__main__':
- pool=multiprocessing.Pool(multiprocessing.cpu_count()) # 设置进程池,进程数量是电脑cpu的核心数
- result=pool.apply_async(get_html,args=(3,)) # 异步方式调用
- pool.close() # 这个方法必须在join前调用
- pool.join() # 子进程未执行完前,会阻塞主进程代码
- print(result.get()) # 拿到子进程执行的结果
- print("end")

- import multiprocessing
- import time
- def get_html(n):
- time.sleep(n)
- print(f"子进程{n}获取内容成功")
- return n
- if __name__=='__main__':
- pool=multiprocessing.Pool(multiprocessing.cpu_count()) # 设置进程池,进程数量是电脑cpu的核心数
- for result in pool.imap(get_html,[4,2,3]):
- print(f"{result}休眠执行成功!")
