• python多进程中常用方法用法详解


    1、进程间的通信

            全局变量在多个进程中不共享资源,进程之间的数据是独立的,默认情况下是互不影响的。

    示例代码

    1. from multiprocessing import Process
    2. num = 1
    3. def task1():
    4. global num
    5. num += 5
    6. print("子进程1运行,num:", num)
    7. def task2():
    8. global num
    9. num += 10
    10. print("子进程2运行,num:", num)
    11. if __name__ == '__main__':
    12. print("父进程开始运行...")
    13. p1 = Process(target=task1)
    14. p2 = Process(target=task2)
    15. p1.start()
    16. p2.start()
    17. p1.join()
    18. p2.join()

    运行结果:

    2、用 Queue() 实现多进程之间的数据传递

            Queue 是多进程安全的队列,可以使用 Queue 实现多进程之间的数据传递。

            put 方法用以插入数据到队列中, put 方法还有两个可选参数: blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,该方法会阻塞 timeout 指定的时间,直到该队列有剩余的空间。如果超时,会抛出 Queue.full 异常。如果 blocked 为 False,但该 Queue 已满,会立即抛出 Queue.full 异常。

            get 方法可以从队列读取并且删除一个元素。同样, get 方法有两个可选参数: blocked和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,那么在等待时间内没有取到任何元素,会抛出 Queue.Empty 异常。如果 blocked 为 False,有两种情况存在,如果Queue 有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty 异常

    示例代码:

    1. from multiprocessing import Queue
    2. q = Queue(3)
    3. q.put('msg1')
    4. q.put('msg2')
    5. print('消息队列是否已满:', q.full())
    6. q.put('msg3')
    7. print('消息队列是否已满:', q.full())
    8. # q.put('msg4') # 以为消息队列已经满了,需要直接写入需要等待,如果超时会抛出异常
    9. # 写入数据时先判断,判断队列是否已满
    10. if not q.full():
    11. q.put('msg4')
    12. # 同理,取消息时可以先判断队列是否有数据
    13. if not q.empty():
    14. for _ in range(q.qsize()):
    15. print(q.get())

    运行结果:

    示例代码:

    1. from multiprocessing import Process
    2. from multiprocessing import Queue
    3. import random
    4. import os
    5. # 向queue中输入数据的函数
    6. def inputQ(queue):
    7. info = random.randint(1, 100)
    8. queue.put(info)
    9. print('进程{}往队列中存了一个数据:{}'.format(os.getpid(), info))
    10. # 向queue中输出数据的函数
    11. def outputQ(queue):
    12. info = queue.get()
    13. print('进程{}从队列中取出一个数据:{}'.format(os.getpid(), info))
    14. if __name__ == '__main__':
    15. queue = Queue(5)
    16. lst_1 = []
    17. lst_2 = []
    18. for i in range(3):
    19. process = Process(target=inputQ, args=(queue,))
    20. process.start()
    21. lst_1.append(process)
    22. # 输出进程
    23. for i in range(2):
    24. process = Process(target=outputQ, args=(queue,))
    25. process.start()
    26. lst_2.append(process)
    27. for p in lst_1:
    28. p.join()
    29. for p in lst_2:
    30. p.join()

    运行结果:

    3、Queue 队列实现进程间通信

    示例代码:

    1. import time
    2. from multiprocessing import Queue, Process
    3. def write_data(q):
    4. # 将列表元素写入到队列中
    5. for i in ['aa', 'bb', 'cc', 'dd']:
    6. print('开始写入值%s' %i)
    7. q.put(i)
    8. time.sleep(1)
    9. def read_data(q):
    10. print("开始读取数据...")
    11. while True:
    12. if not q.empty():
    13. print("读取到数据:", q.get())
    14. time.sleep(1)
    15. else:
    16. break
    17. if __name__ == '__main__':
    18. # 创建队列
    19. q = Queue()
    20. # 创建进程
    21. qw = Process(target=write_data, args=(q, ))
    22. qr = Process(target=read_data, args=(q, ))
    23. # 启动进程
    24. qw.start()
    25. qr.start()
    26. qw.join()
    27. qr.join()

    运行结果:

    4、Manage()的使用

            如果使用 Pool 创建进程,就需要使用 multiprocessing.Manager()中的 Queue()来完成进程间的通信,而不是 multiprocessing.Queue(),否则会抛出异常。

    示例代码:

    1. import time
    2. from multiprocessing import Manager, Pool, Queue
    3. def write_data(q):
    4. # 将列表元素写入到队列中
    5. for i in ['aa', 'bb', 'cc', 'dd']:
    6. print('开始写入值%s' %i)
    7. q.put(i)
    8. time.sleep(1)
    9. def read_data(q):
    10. print("开始读取数据...")
    11. while True:
    12. if not q.empty():
    13. print("读取到数据:", q.get())
    14. time.sleep(1)
    15. else:
    16. break
    17. if __name__ == '__main__':
    18. # 创建队列
    19. q = Manager().Queue()
    20. # q = Queue() # 直接这样使用Queue()会报错
    21. # 创建进程池
    22. p = Pool(3)
    23. # 使用apply阻塞模式创建进程
    24. p.apply(write_data, (q, ))
    25. p.apply(read_data, (q, ))
    26. p.close()
    27. p.join()

    运行结果:

    示例代码:

    1. import os
    2. from multiprocessing import Manager, Process
    3. # 定义了一个foo函数,接收一个字典和一个列表
    4. def foo(dic, lst):
    5. # 字典和列表都放进程ID
    6. dic[os.getpid()] = os.getpid()
    7. lst.append(os.getpid())
    8. if __name__ == '__main__':
    9. # 生成Manager对象
    10. manager = Manager()
    11. dic = manager.dict()
    12. print(dic)
    13. lst = manager.list(range(3))
    14. print(lst)
    15. # 10个进程分别join
    16. p_list = []
    17. for i in range(10):
    18. p = Process(target=foo, args=(dic, lst))
    19. p.start()
    20. p_list.append(p)
    21. for res in p_list:
    22. res.join()
    23. # 打印字典和列表
    24. print(dic)
    25. print(lst)

    运行结果:

    示例代码:

    1. from multiprocessing import Pool, current_process, Manager
    2. import time
    3. def produce_data(queue):
    4. for i in range(10):
    5. queue.put(i)
    6. def consume_data(queue):
    7. while queue.qsize() > 0:
    8. data = queue.get() # 注意:当get()拿不到数据时,会一直处于等待状态
    9. print(f"当前进程为:{current_process().name}, 队列获取数据为:{data},队列剩余数据为:{queue.qsize()}个!")
    10. time.sleep(0.01)
    11. if __name__ == '__main__':
    12. print(f"主进程{current_process().name}开始执行!")
    13. p = Pool(processes=6, maxtasksperchild=6)
    14. queue = Manager().Queue(maxsize=20)
    15. p.apply_async(produce_data, args=(queue, ))
    16. time.sleep(1)
    17. for i in range(5):
    18. p.apply_async(consume_data, args=(queue, ))
    19. p.close() # 关闭进程池,防止将任何其他任务提交到池中。需要在join之前调用,否则会报ValueError: Pool is still running错误
    20. p.join() # 等待进程池中的所有进程执行完毕
    21. print(f"主进程{current_process().name}结束!")

    运行结果:

    注意:

    • p.close()  # 关闭进程池,防止将任何其他任务提交到池中。需要在join之前调用,否则会报ValueError: Pool is still running错误
    • p.join()  # 等待进程池中的所有进程执行完毕
    • p.close()是关掉进程池子,是不再向里面添加进程了,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

    5、current_process()的使用

    示例代码:

    1. from multiprocessing import Process, current_process
    2. import time
    3. import random
    4. lst = []
    5. def task(i):
    6. print(current_process().name, i, 'start...') # current_process().name输出进程的名字
    7. time.sleep(random.randint(1, 4))
    8. lst.append(i)
    9. print(lst)
    10. print(current_process().name, i, 'end.....')
    11. if __name__ == "__main__":
    12. p_lst = []
    13. for i in range(4):
    14. p = Process(target=task, args=(i, ))
    15. p_lst.append(p)
    16. p.start()
    17. for p in p_lst:
    18. p.join() # 阻塞当前进程,直到子进程全部退出
    19. print("main end.......")

    运行结果:

    6、进程池

            进程池里有固定数量的进程,每次执行任务时都从进程池中取出一个空闲进程来执行,如果任务数量超过进程池中进程数量,那么就等待已经在执行的任务结束之后,有进程空闲之后再执行,也就是说,同一时间,只有固定数量的进程在执行,这样对操作系统得压力也不会太大,效率也得到保证。

    示例代码:

    1. from multiprocessing import Pool, current_process
    2. import time
    3. import random
    4. lst = []
    5. def task(i):
    6. print(current_process().name, i, 'start...')
    7. time.sleep(random.randint(1, 5))
    8. lst.append(i)
    9. print(lst)
    10. print(current_process().name, i, 'end.....')
    11. if __name__ == "__main__":
    12. p = Pool(processes=3, maxtasksperchild=3)
    13. for i in range(10):
    14. p.apply_async(func=task, args=(i,)) # 进程池接收任务
    15. p.close() # 关闭进程池 ==》 不接受任务
    16. p.join() # 等待子进程执行完毕,父进程再执行
    17. print("end.............")

    运行结果:

    示例代码:   【同步执行】

    1. import os
    2. import time
    3. import random
    4. from multiprocessing import Pool
    5. def func1(n):
    6. print('任务{}开始执行,进程为:{}'.format(n, os.getpid()))
    7. time.sleep(random.randint(1, 4))
    8. print('任务{}结束执行,进程为:{}'.format(n, os.getpid()))
    9. if __name__ == '__main__':
    10. # c创建一个进程池,里面有三个进程
    11. p = Pool(3)
    12. for i in range(10):
    13. res = p.apply(func1, args=(i,))

    运行结果:

    示例代码:   【异步执行】

    1. import os
    2. import time
    3. import random
    4. from multiprocessing import Pool
    5. def func1(n):
    6. print('任务{}开始执行,进程为:{}'.format(n, os.getpid()))
    7. time.sleep(random.randint(1, 4))
    8. print('任务{}结束执行,进程为:{}'.format(n, os.getpid()))
    9. if __name__ == '__main__':
    10. # c创建一个进程池,里面有三个进程
    11. p = Pool(3)
    12. for i in range(5):
    13. res = p.apply_async(func1, args=(i,))
    14. p.close() # 一定要关闭
    15. p.join() # 一定要使用join,不然进程池里的进程没来得及执行,主进程结束了,子进程也都跟着结束。

    运行结果:

    7、进程共享变量

            共享变量不适用于多进程,进程间的变量是互相隔离的,子进程的全局变量是完全复制一份父进程的数据,对子进程的全局变量修改完全影响不到其他进程的全局变量。

    示例代码:

    1. import time
    2. from multiprocessing import Process
    3. def producer(a):
    4. a += 1
    5. time.sleep(2)
    6. def consumer(a):
    7. time.sleep(3)
    8. data = a
    9. print(data)
    10. if __name__ == "__main__":
    11. a = 1
    12. my_producer = Process(target=producer, args=(a, ))
    13. my_consumer = Process(target=consumer, args=(a, ))
    14. my_producer.start()
    15. my_consumer.start()
    16. my_producer.join()
    17. my_consumer.join()
    18. # 输出结果为1

    运行结果:

    示例代码:   【进程之间的变量是无法共享的,即使是全局变量也是不能共享的】

    1. from multiprocessing import Process
    2. import os
    3. def func():
    4. global n
    5. n = 10
    6. print('子进程pid:{},n:{}'.format(os.getppid(), n))
    7. if __name__ == '__main__':
    8. n = 100
    9. print('主进程pid:{},n:{}'.format(os.getppid(), n))
    10. p = Process(target=func)
    11. p.start()
    12. p.join()
    13. print('主进程中输出n:{}'.format(n))

    运行结果:

    8、管道Pipe(两进程间的通信优先考虑)

            Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

    • conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
    • conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
    • conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
    • conn1.fileno():返回连接使用的整数文件描述符
    • conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
    • conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
    • conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
    • conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

    示例代码:

    1. import time
    2. from multiprocessing import Process, Queue, Pool, Manager, Pipe
    3. def producer(pipe):
    4. pipe.send("a")
    5. time.sleep(3)
    6. print(pipe.recv())
    7. def consumer(pipe):
    8. time.sleep(2)
    9. data = pipe.recv()
    10. pipe.send("b")
    11. print(data)
    12. if __name__ == "__main__":
    13. # Pipe实现两进程间通信
    14. s_pipe, r_pipe = Pipe()
    15. pool = Pool()
    16. pool.apply_async(producer, args=(s_pipe, ))
    17. pool.apply_async(consumer, args=(r_pipe, ))
    18. pool.close()
    19. pool.join()

    运行结果:

    示例代码:

    1. from multiprocessing import Process, Pipe
    2. def f(conn):
    3. conn.send('主进程,你好呀!') # 发送数据给主进程
    4. print('子进程收到主进程发来的数据:{}'.format(conn.recv()))
    5. conn.close() # 关闭
    6. if __name__ == '__main__':
    7. # Pipe是一个函数,返回的是一个元组
    8. parent_conn, child_conn = Pipe()
    9. # 创建一个子进程
    10. p = Process(target=f, args=(child_conn,))
    11. p.start()
    12. print("主进程收到子进程发来的数据:{}".format(parent_conn.recv()))
    13. parent_conn.send('子进程,你好啊!') # 发送数据给子进程
    14. p.join()

    运行结果:

    9、进程之间的同步控制

    9.1 进程锁:Lock()

            当多个进程对同一资源进行IO操作时,需要对资源“上锁”,否则会出现意外结果。上锁之后,同一件就只能有一个进程运行上锁的代码块。例如有一个txt文件,里面内容是一个数字10,我们要用多进程去读取这个文件的值,然后每读一次,让txt中的这个数字减1,不加锁时代码如下:

    1. import time
    2. import os
    3. from multiprocessing import Process
    4. from multiprocessing import Lock
    5. def func():
    6. if os.path.exists('num.txt'):
    7. with open('num.txt', 'r') as rf:
    8. num = int(rf.read())
    9. num -= 1
    10. time.sleep(1)
    11. with open('num.txt', 'w') as wf:
    12. wf.write(str(num))
    13. else:
    14. with open('num.txt', 'w') as wf:
    15. wf.write('10')
    16. if __name__ == '__main__':
    17. print("主进程开始运行……")
    18. p_list = []
    19. for i in range(10):
    20. p = Process(target=func)
    21. p_list.append(p)
    22. p.start()
    23. for p in p_list:
    24. p.join()
    25. with open('num.txt', 'r') as f:
    26. num = int(f.read())
    27. print('最后结果为:{}'.format(num))
    28. print("主进程结束运行……" )

    运行结果:

            虽然用了10个进程读取并修改txt文件,但最后的值却不是1。这正是多进程共同访问资源造成混乱造成的。要达到预期结果,就要给资源上锁:

    1. import time
    2. import os
    3. from multiprocessing import Process
    4. from multiprocessing import Lock
    5. def func(lock):
    6. if os.path.exists('num.txt'):
    7. lock.acquire()
    8. with open('num.txt', 'r') as f:
    9. num = int(f.read())
    10. num -= 1
    11. time.sleep(1)
    12. with open('num.txt', 'w') as f:
    13. f.write(str(num))
    14. lock.release()
    15. else:
    16. with open('num.txt', 'w') as f:
    17. f.write('10')
    18. if __name__ == '__main__':
    19. print("主进程开始运行……")
    20. lock = Lock()
    21. p_list = []
    22. for i in range(10):
    23. p = Process(target=func, args=(lock,))
    24. p_list.append(p)
    25. p.start()
    26. for p in p_list:
    27. p.join()
    28. with open('num.txt', 'r') as f:
    29. num = int(f.read())
    30. print('最后结果为:{}'.format(num))
    31. print("主进程结束运行……")

    运行结果:

            果然,用了进程锁之后获得了预料中的结果。但是,如果你运行了上面两块代码你就会发现,加了锁之后,程序明显变慢了很多,因为程序成了串行的了,当然好处是数据安全有保证。

    9.2 信号量:Semaphore

            锁同时只允许一个线程更改数据,而信号量是同时允许一定数量的进程更改数据 。假如有一下应用场景:有10个人吃饭,但只有一张餐桌,只允许做3个人,没上桌的人不允许吃饭,已上桌吃完饭离座之后,下面的人才能抢占桌子继续吃饭,如果不用信号量,肯定是10人一窝蜂一起吃饭:

    1. from multiprocessing import Process
    2. import time
    3. import random
    4. def fun(i):
    5. print('{}号顾客上座,开始吃饭'.format(i))
    6. time.sleep(random.randint(3, 8))
    7. print('{}号顾客吃完饭了,离座'.format(i))
    8. if __name__ == '__main__':
    9. for i in range(10):
    10. p = Process(target=fun, args=(i,))
    11. p.start()

    运行结果:

    用了信号量,实现了轮流吃饭,每次只有3个人吃饭:

    示例代码:

    1. from multiprocessing import Process
    2. import time
    3. import random
    4. from multiprocessing import Semaphore
    5. def fun(i , sem):
    6. sem.acquire()
    7. print('{}号顾客上座,开始吃饭'.format(i))
    8. time.sleep(random.randint(3, 8))
    9. print('{}号顾客吃完饭了,离座'.format(i))
    10. sem.release()
    11. if __name__ == '__main__':
    12. sem = Semaphore(3)
    13. for i in range(10):
    14. p = Process(target=fun, args=(i,sem))
    15. p.start()

    运行结果:

            事实上,Semaphore的作用也类似于锁,只不过在锁机制上添加了一个计数器,允许多个人拥有“钥匙”。

    9.3 事件:Event

    python进程的事件用于主进程控制其他子进程的执行,Event类有如下几个主要方法:

      1)wait()    插入在进程中插入一个标记(flag)默认为 False,当 flag为False时,程序会停止运行进入阻塞状态;

      2)set()     使flag为True,程序会进入非阻塞状态

      3)clear()      使flag为False,程序会停止运行,进入阻塞状态 

      4)is_set()   判断flag  是否为True,是的话返回True,不是则返回False

      有如下需求:获取当前时间的秒数的个位数,如果小于5,则设置子进程阻塞,如果大于5则设置子进程非阻塞。代码如下:

    1. from multiprocessing import Event, Process
    2. import time
    3. from datetime import datetime
    4. def func(e):
    5. print('子进程:开始运行……')
    6. while True:
    7. print('子进程:现在事件秒数是{}'.format(datetime.now().second))
    8. e.wait() # 阻塞等待信号 这里插入了一个flag 默认为 False
    9. time.sleep(1)
    10. if __name__ == '__main__':
    11. e = Event()
    12. p = Process(target=func, args=(e,))
    13. p.start()
    14. for i in range(10):
    15. s = int(str(datetime.now().second)[-1]) # 获取当前秒数的个位数
    16. if s < 5:
    17. print('子进程进入阻塞状态')
    18. e.clear() # 使插入的flag为False 进程进入阻塞状态
    19. else:
    20. print('子进程取消阻塞状态')
    21. e.set() # 使插入的flag为True,进程进入非阻塞状态
    22. time.sleep(1)
    23. e.set()
    24. time.sleep(3)
    25. p.terminate()
    26. print("主进程运行结束……")

    运行结果:

    10、内置线程池

    示例代码:

    1. import time
    2. import os
    3. import random
    4. from multiprocessing.pool import ThreadPool
    5. def task():
    6. print(f'开始执行任务:{os.getpid()}')
    7. time.sleep(random.randint(0, 5))
    8. print(f"执行任务结束:{os.getpid()}")
    9. if __name__ == '__main__':
    10. pool = ThreadPool(2)
    11. for i in range(5):
    12. pool.apply_async(task)
    13. pool.close()
    14. pool.join()

    运行结果:

    参考博文:

    Python并发编程系列之多进程(multiprocessing) - 走看看

    线程池的极简用法——内置线程池multiprocessing - 皓敐 - 博客园

    python多进程总结 - fengf233 - 博客园

  • 相关阅读:
    相机HAL
    小样本学习导论
    xxl-job做集群的时候,用F5做负载均衡效率高还是直接写死几个服务器地址的效率高?
    js 去除字符串空格
    研发团队数字化转型实践
    【数据结构】Java实现数据结构的前置知识,时间复杂度空间复杂度,泛型类的讲解
    Java注解详解和自定义注解实战,用代码讲解
    TGK-Planner无人机运动规划算法解读
    “论软件系统架构评估”精选范文,软考高级论文,系统架构设计师论文
    财务对账-财务收发存-业务收发存
  • 原文地址:https://blog.csdn.net/weixin_44799217/article/details/127624211