多进程实现方式一
- from multiprocessing import Process
-
- def func1(name):
- print("测试 %s 多进程" %name)
-
- if __name__ == "__main__":
- process_list = []
-
- for i in range(5):
- p = Process(target = func1, args = ('Python', ))
- p.start()
- process_list.append(p)
-
- for i in process_list:
- p.join()
-
- print("结束测试")
多进程实现方式二
- from multiprocessing import Process
-
- class MyProcess(Process): # 继承Process类
- def __init__(self, name):
- super(MyProcess, self).__init__()
- self.name = name
-
- def run(self):
- print("测试%s多进程" % self.name)
-
- if __name__ == '__main__':
- process_list = []
-
- for i in range(5):
- p = MyProcess('Python {}'.format(i))
- p.start()
- process_list.append(p)
-
- for i in process_list:
- p.join()
-
- print('测试结束')
管道(命名管道、匿名管道)
消息队列
共享内存
信号量
信号
Socket
调用Pipe()返回管道两端的Connection;Pipe的读写效率高于Queue,进程间的Pipe基于fork机制建立。
Python官方文档的描述:
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.
新建Pipe(duplex)的时候
Pipe的方法包含 send()、recv()、close();当pipe的输入端被关闭,且无法接受接收到输入端的值,则会抛出EOFError。
- from multiprocessing import Process, Pipe
- import time
-
- def func1(conn):
- print("子进程发送消息:")
- conn.send("你好主进程")
- print("子进程接收消息:")
- print(conn.recv())
- conn.close()
-
- if __name__ == '__main__':
- conn1, conn2 = Pipe()
-
- p = Process(target = func1, args = (conn2, ))
- p.start()
-
- time.sleep(1)
- print('主进程接受消息:')
- print(conn1.recv())
- print('主进程发送消息:')
- conn1.send("你好子进程")
- p.join()
- print('结束测试')
Queue是基于Pipe实现的
Queue的使用主要是一边put(),一边get();但是Queue可以是多个Process进行put操作,也可以是多个Process进行get操作。
- from multiprocessing import Process, Queue
- import time
-
- def func1(queue, index):
- print("子进程 {} 开始put数据".format(index))
- queue.put("子进程 {}, 通过Queue开始通信".format(index))
-
- if __name__ == "__main__":
- queue = Queue()
-
- process_list = []
- for i in range(3):
- p = Process(target = func1, args = (queue, i, ))
- p.start()
- process_list.append(p)
-
- print("开始join")
- for i in process_list:
- p.join()
-
- print('主进程获取Queue数据')
-
- for i in range(3):
- print(queue.get())
-
- print("结束测试")
Queue,Pipe仅实现数据交互, 并未实现数据共享。进程间交互使用到managers
- from concurrent.futures import ThreadPoolExecutor
- import time
-
- # 设置最大的线程个数
- executor = ThreadPoolExecutor(max_workers = 8)
- all_num = 100
-
- def worker(index):
- time.sleep( (int)(index) / 5)
- print(index)
-
- def run():
- time1 = time.time()
- for id_ in range(all_num):
- executor.submit(worker,id_)
-
- executor.shutdown(wait = True)
- time2 = time.time()
- print(time2 - time1)
-
- if __name__ == "__main__":
- run()