全局变量在多个进程中不共享资源,进程之间的数据是独立的,默认情况下是互不影响的。
示例代码:
- from multiprocessing import Process
-
- num = 1
-
-
- def task1():
- global num
- num += 5
- print("子进程1运行,num:", num)
-
-
- def task2():
- global num
- num += 10
- print("子进程2运行,num:", num)
-
-
- if __name__ == '__main__':
- print("父进程开始运行...")
- p1 = Process(target=task1)
- p2 = Process(target=task2)
- p1.start()
- p2.start()
- p1.join()
- p2.join()
运行结果:
Queue 是多进程安全的队列,可以使用 Queue 实现多进程之间的数据传递。
put 方法用以插入数据到队列中, put 方法还有两个可选参数: blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,该方法会阻塞 timeout 指定的时间,直到该队列有剩余的空间。如果超时,会抛出 Queue.full 异常。如果 blocked 为 False,但该 Queue 已满,会立即抛出 Queue.full 异常。
get 方法可以从队列读取并且删除一个元素。同样, get 方法有两个可选参数: blocked和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,那么在等待时间内没有取到任何元素,会抛出 Queue.Empty 异常。如果 blocked 为 False,有两种情况存在,如果Queue 有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty 异常
示例代码:
- from multiprocessing import Queue
-
-
- q = Queue(3)
- q.put('msg1')
- q.put('msg2')
- print('消息队列是否已满:', q.full())
- q.put('msg3')
- print('消息队列是否已满:', q.full())
- # q.put('msg4') # 以为消息队列已经满了,需要直接写入需要等待,如果超时会抛出异常
-
- # 写入数据时先判断,判断队列是否已满
- if not q.full():
- q.put('msg4')
-
- # 同理,取消息时可以先判断队列是否有数据
- if not q.empty():
- for _ in range(q.qsize()):
- print(q.get())
运行结果:
示例代码:
- from multiprocessing import Process
- from multiprocessing import Queue
- import random
- import os
-
-
- # 向queue中输入数据的函数
- def inputQ(queue):
- info = random.randint(1, 100)
- queue.put(info)
- print('进程{}往队列中存了一个数据:{}'.format(os.getpid(), info))
-
-
- # 向queue中输出数据的函数
- def outputQ(queue):
- info = queue.get()
- print('进程{}从队列中取出一个数据:{}'.format(os.getpid(), info))
-
-
- if __name__ == '__main__':
-
- queue = Queue(5)
- lst_1 = []
- lst_2 = []
- for i in range(3):
- process = Process(target=inputQ, args=(queue,))
- process.start()
- lst_1.append(process)
-
- # 输出进程
- for i in range(2):
- process = Process(target=outputQ, args=(queue,))
- process.start()
- lst_2.append(process)
-
- for p in lst_1:
- p.join()
-
- for p in lst_2:
- p.join()
运行结果:
示例代码:
- import time
- from multiprocessing import Queue, Process
-
-
- def write_data(q):
- # 将列表元素写入到队列中
- for i in ['aa', 'bb', 'cc', 'dd']:
- print('开始写入值%s' %i)
- q.put(i)
- time.sleep(1)
-
-
- def read_data(q):
- print("开始读取数据...")
- while True:
- if not q.empty():
- print("读取到数据:", q.get())
- time.sleep(1)
- else:
- break
-
-
- if __name__ == '__main__':
- # 创建队列
- q = Queue()
- # 创建进程
- qw = Process(target=write_data, args=(q, ))
- qr = Process(target=read_data, args=(q, ))
-
- # 启动进程
- qw.start()
- qr.start()
- qw.join()
- qr.join()
运行结果:
如果使用 Pool 创建进程,就需要使用 multiprocessing.Manager()中的 Queue()来完成进程间的通信,而不是 multiprocessing.Queue(),否则会抛出异常。
示例代码:
- import time
- from multiprocessing import Manager, Pool, Queue
-
-
- def write_data(q):
- # 将列表元素写入到队列中
- for i in ['aa', 'bb', 'cc', 'dd']:
- print('开始写入值%s' %i)
- q.put(i)
- time.sleep(1)
-
-
- def read_data(q):
- print("开始读取数据...")
- while True:
- if not q.empty():
- print("读取到数据:", q.get())
- time.sleep(1)
- else:
- break
-
-
- if __name__ == '__main__':
- # 创建队列
- q = Manager().Queue()
- # q = Queue() # 直接这样使用Queue()会报错
- # 创建进程池
- p = Pool(3)
- # 使用apply阻塞模式创建进程
- p.apply(write_data, (q, ))
- p.apply(read_data, (q, ))
- p.close()
- p.join()
运行结果:
示例代码:
- import os
- from multiprocessing import Manager, Process
-
-
- # 定义了一个foo函数,接收一个字典和一个列表
- def foo(dic, lst):
- # 字典和列表都放进程ID
- dic[os.getpid()] = os.getpid()
- lst.append(os.getpid())
-
-
- if __name__ == '__main__':
-
- # 生成Manager对象
- manager = Manager()
- dic = manager.dict()
- print(dic)
- lst = manager.list(range(3))
- print(lst)
-
- # 10个进程分别join
- p_list = []
- for i in range(10):
- p = Process(target=foo, args=(dic, lst))
- p.start()
- p_list.append(p)
-
- for res in p_list:
- res.join()
-
- # 打印字典和列表
- print(dic)
- print(lst)
运行结果:
示例代码:
- from multiprocessing import Pool, current_process, Manager
- import time
-
-
- def produce_data(queue):
- for i in range(10):
- queue.put(i)
-
-
- def consume_data(queue):
- while queue.qsize() > 0:
- data = queue.get() # 注意:当get()拿不到数据时,会一直处于等待状态
- print(f"当前进程为:{current_process().name}, 队列获取数据为:{data},队列剩余数据为:{queue.qsize()}个!")
- time.sleep(0.01)
-
-
- if __name__ == '__main__':
- print(f"主进程{current_process().name}开始执行!")
- p = Pool(processes=6, maxtasksperchild=6)
- queue = Manager().Queue(maxsize=20)
-
- p.apply_async(produce_data, args=(queue, ))
- time.sleep(1)
- for i in range(5):
- p.apply_async(consume_data, args=(queue, ))
-
- p.close() # 关闭进程池,防止将任何其他任务提交到池中。需要在join之前调用,否则会报ValueError: Pool is still running错误
- p.join() # 等待进程池中的所有进程执行完毕
- print(f"主进程{current_process().name}结束!")
运行结果:
注意:
示例代码:
- from multiprocessing import Process, current_process
- import time
- import random
-
- lst = []
-
-
- def task(i):
- print(current_process().name, i, 'start...') # current_process().name输出进程的名字
- time.sleep(random.randint(1, 4))
- lst.append(i)
- print(lst)
- print(current_process().name, i, 'end.....')
-
-
- if __name__ == "__main__":
- p_lst = []
- for i in range(4):
- p = Process(target=task, args=(i, ))
- p_lst.append(p)
- p.start()
- for p in p_lst:
- p.join() # 阻塞当前进程,直到子进程全部退出
- print("main end.......")
运行结果:
进程池里有固定数量的进程,每次执行任务时都从进程池中取出一个空闲进程来执行,如果任务数量超过进程池中进程数量,那么就等待已经在执行的任务结束之后,有进程空闲之后再执行,也就是说,同一时间,只有固定数量的进程在执行,这样对操作系统得压力也不会太大,效率也得到保证。
示例代码:
- from multiprocessing import Pool, current_process
- import time
- import random
-
- lst = []
-
-
- def task(i):
- print(current_process().name, i, 'start...')
- time.sleep(random.randint(1, 5))
- lst.append(i)
- print(lst)
- print(current_process().name, i, 'end.....')
-
-
- if __name__ == "__main__":
- p = Pool(processes=3, maxtasksperchild=3)
- for i in range(10):
- p.apply_async(func=task, args=(i,)) # 进程池接收任务
- p.close() # 关闭进程池 ==》 不接受任务
- p.join() # 等待子进程执行完毕,父进程再执行
- print("end.............")
运行结果:
示例代码: 【同步执行】
- import os
- import time
- import random
- from multiprocessing import Pool
-
-
- def func1(n):
- print('任务{}开始执行,进程为:{}'.format(n, os.getpid()))
-
- time.sleep(random.randint(1, 4))
-
- print('任务{}结束执行,进程为:{}'.format(n, os.getpid()))
-
-
- if __name__ == '__main__':
- # c创建一个进程池,里面有三个进程
- p = Pool(3)
-
- for i in range(10):
- res = p.apply(func1, args=(i,))
运行结果:
示例代码: 【异步执行】
- import os
- import time
- import random
- from multiprocessing import Pool
-
-
- def func1(n):
- print('任务{}开始执行,进程为:{}'.format(n, os.getpid()))
-
- time.sleep(random.randint(1, 4))
-
- print('任务{}结束执行,进程为:{}'.format(n, os.getpid()))
-
-
- if __name__ == '__main__':
- # c创建一个进程池,里面有三个进程
- p = Pool(3)
-
- for i in range(5):
- res = p.apply_async(func1, args=(i,))
-
- p.close() # 一定要关闭
-
- p.join() # 一定要使用join,不然进程池里的进程没来得及执行,主进程结束了,子进程也都跟着结束。
运行结果:
共享变量不适用于多进程,进程间的变量是互相隔离的,子进程的全局变量是完全复制一份父进程的数据,对子进程的全局变量修改完全影响不到其他进程的全局变量。
示例代码:
- import time
- from multiprocessing import Process
-
-
- def producer(a):
- a += 1
- time.sleep(2)
-
-
- def consumer(a):
- time.sleep(3)
- data = a
- print(data)
-
-
- if __name__ == "__main__":
- a = 1
- my_producer = Process(target=producer, args=(a, ))
- my_consumer = Process(target=consumer, args=(a, ))
- my_producer.start()
- my_consumer.start()
- my_producer.join()
- my_consumer.join()
-
- # 输出结果为1
运行结果:
示例代码: 【进程之间的变量是无法共享的,即使是全局变量也是不能共享的】
- from multiprocessing import Process
- import os
-
-
- def func():
- global n
- n = 10
- print('子进程pid:{},n:{}'.format(os.getppid(), n))
-
-
- if __name__ == '__main__':
- n = 100
- print('主进程pid:{},n:{}'.format(os.getppid(), n))
- p = Process(target=func)
- p.start()
- p.join()
- print('主进程中输出n:{}'.format(n))
运行结果:
Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
示例代码:
- import time
- from multiprocessing import Process, Queue, Pool, Manager, Pipe
-
-
- def producer(pipe):
- pipe.send("a")
- time.sleep(3)
- print(pipe.recv())
-
-
- def consumer(pipe):
- time.sleep(2)
- data = pipe.recv()
- pipe.send("b")
- print(data)
-
-
- if __name__ == "__main__":
- # Pipe实现两进程间通信
- s_pipe, r_pipe = Pipe()
- pool = Pool()
- pool.apply_async(producer, args=(s_pipe, ))
- pool.apply_async(consumer, args=(r_pipe, ))
- pool.close()
- pool.join()
运行结果:
示例代码:
- from multiprocessing import Process, Pipe
-
-
- def f(conn):
- conn.send('主进程,你好呀!') # 发送数据给主进程
- print('子进程收到主进程发来的数据:{}'.format(conn.recv()))
- conn.close() # 关闭
-
-
- if __name__ == '__main__':
- # Pipe是一个函数,返回的是一个元组
- parent_conn, child_conn = Pipe()
- # 创建一个子进程
- p = Process(target=f, args=(child_conn,))
- p.start()
- print("主进程收到子进程发来的数据:{}".format(parent_conn.recv()))
- parent_conn.send('子进程,你好啊!') # 发送数据给子进程
-
- p.join()
运行结果:
当多个进程对同一资源进行IO操作时,需要对资源“上锁”,否则会出现意外结果。上锁之后,同一件就只能有一个进程运行上锁的代码块。例如有一个txt文件,里面内容是一个数字10,我们要用多进程去读取这个文件的值,然后每读一次,让txt中的这个数字减1,不加锁时代码如下:
- import time
- import os
- from multiprocessing import Process
- from multiprocessing import Lock
-
-
- def func():
- if os.path.exists('num.txt'):
- with open('num.txt', 'r') as rf:
- num = int(rf.read())
- num -= 1
- time.sleep(1)
- with open('num.txt', 'w') as wf:
- wf.write(str(num))
- else:
- with open('num.txt', 'w') as wf:
- wf.write('10')
-
-
- if __name__ == '__main__':
-
- print("主进程开始运行……")
- p_list = []
- for i in range(10):
- p = Process(target=func)
- p_list.append(p)
- p.start()
-
- for p in p_list:
- p.join()
-
- with open('num.txt', 'r') as f:
- num = int(f.read())
-
- print('最后结果为:{}'.format(num))
- print("主进程结束运行……" )
运行结果:
虽然用了10个进程读取并修改txt文件,但最后的值却不是1。这正是多进程共同访问资源造成混乱造成的。要达到预期结果,就要给资源上锁:
- import time
- import os
- from multiprocessing import Process
- from multiprocessing import Lock
-
-
- def func(lock):
- if os.path.exists('num.txt'):
- lock.acquire()
- with open('num.txt', 'r') as f:
- num = int(f.read())
- num -= 1
- time.sleep(1)
-
- with open('num.txt', 'w') as f:
- f.write(str(num))
- lock.release()
- else:
- with open('num.txt', 'w') as f:
- f.write('10')
-
-
- if __name__ == '__main__':
-
- print("主进程开始运行……")
- lock = Lock()
- p_list = []
- for i in range(10):
- p = Process(target=func, args=(lock,))
- p_list.append(p)
- p.start()
-
- for p in p_list:
- p.join()
-
- with open('num.txt', 'r') as f:
- num = int(f.read())
-
- print('最后结果为:{}'.format(num))
- print("主进程结束运行……")
运行结果:
果然,用了进程锁之后获得了预料中的结果。但是,如果你运行了上面两块代码你就会发现,加了锁之后,程序明显变慢了很多,因为程序成了串行的了,当然好处是数据安全有保证。
锁同时只允许一个线程更改数据,而信号量是同时允许一定数量的进程更改数据 。假如有一下应用场景:有10个人吃饭,但只有一张餐桌,只允许做3个人,没上桌的人不允许吃饭,已上桌吃完饭离座之后,下面的人才能抢占桌子继续吃饭,如果不用信号量,肯定是10人一窝蜂一起吃饭:
- from multiprocessing import Process
- import time
- import random
-
-
- def fun(i):
- print('{}号顾客上座,开始吃饭'.format(i))
- time.sleep(random.randint(3, 8))
- print('{}号顾客吃完饭了,离座'.format(i))
-
-
- if __name__ == '__main__':
-
- for i in range(10):
- p = Process(target=fun, args=(i,))
- p.start()
运行结果:
用了信号量,实现了轮流吃饭,每次只有3个人吃饭:
示例代码:
- from multiprocessing import Process
- import time
- import random
- from multiprocessing import Semaphore
-
-
- def fun(i , sem):
- sem.acquire()
- print('{}号顾客上座,开始吃饭'.format(i))
- time.sleep(random.randint(3, 8))
- print('{}号顾客吃完饭了,离座'.format(i))
- sem.release()
-
-
- if __name__ == '__main__':
-
- sem = Semaphore(3)
- for i in range(10):
- p = Process(target=fun, args=(i,sem))
- p.start()
运行结果:
事实上,Semaphore的作用也类似于锁,只不过在锁机制上添加了一个计数器,允许多个人拥有“钥匙”。
python进程的事件用于主进程控制其他子进程的执行,Event类有如下几个主要方法:
1)wait() 插入在进程中插入一个标记(flag)默认为 False,当 flag为False时,程序会停止运行进入阻塞状态;
2)set() 使flag为True,程序会进入非阻塞状态
3)clear() 使flag为False,程序会停止运行,进入阻塞状态
4)is_set() 判断flag 是否为True,是的话返回True,不是则返回False
有如下需求:获取当前时间的秒数的个位数,如果小于5,则设置子进程阻塞,如果大于5则设置子进程非阻塞。代码如下:
- from multiprocessing import Event, Process
- import time
- from datetime import datetime
-
-
- def func(e):
- print('子进程:开始运行……')
- while True:
- print('子进程:现在事件秒数是{}'.format(datetime.now().second))
- e.wait() # 阻塞等待信号 这里插入了一个flag 默认为 False
- time.sleep(1)
-
-
- if __name__ == '__main__':
-
- e = Event()
- p = Process(target=func, args=(e,))
- p.start()
- for i in range(10):
- s = int(str(datetime.now().second)[-1]) # 获取当前秒数的个位数
- if s < 5:
- print('子进程进入阻塞状态')
- e.clear() # 使插入的flag为False 进程进入阻塞状态
- else:
- print('子进程取消阻塞状态')
- e.set() # 使插入的flag为True,进程进入非阻塞状态
- time.sleep(1)
- e.set()
- time.sleep(3)
- p.terminate()
- print("主进程运行结束……")
运行结果:
示例代码:
- import time
- import os
- import random
- from multiprocessing.pool import ThreadPool
-
-
- def task():
- print(f'开始执行任务:{os.getpid()}')
- time.sleep(random.randint(0, 5))
- print(f"执行任务结束:{os.getpid()}")
-
-
- if __name__ == '__main__':
- pool = ThreadPool(2)
- for i in range(5):
- pool.apply_async(task)
- pool.close()
- pool.join()
运行结果:
参考博文:
Python并发编程系列之多进程(multiprocessing) - 走看看