• python3多进程与进程池


    使用进程并发主要依赖于Python的multiprocessing 和 mpi4py 的两个模块。

    1.多进程

    multiprocessing主要包括如下方法和属性:

    方法介绍:

    •  p.start():启动进程,并调用该子进程中的p.run() 
    • p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
    • p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,(使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 )
    • p.is_alive():如果p仍然运行,返回True 
    • p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。 
    • timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

    属性介绍

    •  p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置守护进程:跟随着父进程的代码执行结束,守护进程就结束
    • p.name:进程的名称
    • p.pid:进程的pid
    • p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
    • p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功

    1.创建一个进程

    Python中的multiprocessing库创建进程的步骤如下:

    1. 创建进程对象
    2. 调用 start() 方法,开启进程的活动
    3. 调用 join() 方法,在进程结束之前主进程一直等待

    示例

    1. import multiprocessing
    2. def foo(i):
    3. print ('called function in process: %s' %i)
    4. return
    5. if __name__ == '__main__':
    6. Process_jobs = []
    7. for i in range(5):
    8. p = multiprocessing.Process(target=foo, args=(i,))
    9. Process_jobs.append(p)
    10. p.start()
    11. p.join()

    结果如下:

    1. called function in process: 0
    2. called function in process: 1
    3. called function in process: 2
    4. called function in process: 3
    5. called function in process: 4

    进程对象创建时需要分配一个函数,作为进程的执行任务。如示例中的foo(),可以使用元组的形式给函数传递一些参数

    上面的代码中,主进程是指运行整个脚本的进程,也就是执行if __name__ == '__main__':之后的代码的进程。主进程负责创建和管理子进程

    2.给进程起个名字

    前面创建了一个进程,分配目标函数和函数变量。给进程分配一个名字,有助于debug。

    命名进程需要为进程对象提供 name 参数

    进程的默认名字是Process-N这种方式

    代码如下

    1. # 命名一个进程
    2. import multiprocessing
    3. import time
    4. def foo():
    5. name = multiprocessing.current_process().name
    6. print("Starting %s \n" % name)
    7. #time.sleep(3)
    8. print("Exiting %s \n" % name)
    9. if __name__ == '__main__':
    10. process_with_name = multiprocessing.Process(name='foo_process', target=foo)
    11. process_with_name.daemon = True # 注意原代码有这一行,但是译者发现删掉这一行才能得到正确输出
    12. process_with_default_name = multiprocessing.Process(target=foo)
    13. process_with_name.start()
    14. process_with_default_name.start()
    15. 》》》
    16. 开始运行...
    17. Starting foo_process
    18. Exiting foo_process
    19. Starting Process-2
    20. Exiting Process-2
    21. 运行结束。

    3.后台运行进程

    在处理较大任务时,可以将 进程作为后台进程,multiprocessing模块提供了后台进程选项

    可以使用daemon选项设置进程后台运行。

    代码如下

    1. # 命名一个进程
    2. import multiprocessing
    3. import time
    4. def foo():
    5. name = multiprocessing.current_process().name
    6. print("Starting %s \n" % name)
    7. time.sleep(3)
    8. print("Exiting %s \n" % name)
    9. if __name__ == '__main__':
    10. process_with_name = multiprocessing.Process(name='foo_process', target=foo)
    11. process_with_name.daemon = True # foo_process进程将会后台运行
    12. process_with_default_name = multiprocessing.Process(target=foo)
    13. process_with_name.start()
    14. process_with_default_name.start()

    代码的输出如下:

    1. 开始运行...
    2. Starting foo_process
    3. Starting Process-2
    4. Exiting Process-2
    5. 运行结束。

     这段代码中if __name__ == '__main__'主进程并没有等待子进程执行完成。使用join才会等待。若要main主进程等待子进程执行完成再退出,则添加如下两行代码

    1. process_with_name.join()
    2. process_with_default_name.join()

    4.杀死进程

    可以使用 terminate() 方法立即杀死一个进程,可以使用 is_alive() 方法来判断一个进程是否还存活

    代码如下

    1. # 杀死一个进程
    2. import multiprocessing
    3. import time
    4. def foo():
    5. print('Starting function')
    6. time.sleep(0.1)
    7. print('Finished function')
    8. if __name__ == '__main__':
    9. p = multiprocessing.Process(target=foo)
    10. print('Process before execution:', p, p.is_alive())
    11. p.start()
    12. print('Process running:', p, p.is_alive())
    13. p.terminate()
    14. print('Process terminated:', p, p.is_alive())
    15. p.join()
    16. print('Process joined:', p, p.is_alive())
    17. print('Process exit code:', p.exitcode)

    输出为

    1. Process before execution: <Process name='Process-1' parent=11242 initial> False
    2. Process running: <Process name='Process-1' pid=11244 parent=11242 started> True
    3. Process terminated: <Process name='Process-1' pid=11244 parent=11242 started> True
    4. Process joined: <Process name='Process-1' pid=11244 parent=11242 stopped exitcode=-SIGTERM> False
    5. Process exit code: -15

    上面的代码用 is_alive() 方法监控进程生命周期。然后通过调用 terminate() 方法结束进程。

    通过读进程的 ExitCode 状态码(status code)验证进程已经结束, ExitCode 可能的值如下:

    • == 0: 没有错误正常退出
    • > 0: 进程有错误,并以此状态码退出
    • < 0: 进程被 -1 * 的信号杀死并以此作为 ExitCode 退出

    输出的 ExitCode 是 -15 。负数表示子进程被数字为15的信号杀死。

    5.子类中使用进程

    实现一个自定义的进程子类,需要以下三步:

    • 定义 Process 的子类
    • 覆盖 __init__(self [,args]) 方法来添加额外的参数
    • 覆盖 run(self, [.args]) 方法来实现 Process 启动的时候执行的任务

    创建 Porcess 子类之后,你可以创建它的实例并通过 start() 方法启动它,启动之后会运行 run() 方法。

    1. # -*- coding: utf-8 -*-
    2. # 自定义子类进程
    3. import multiprocessing
    4. class MyProcess(multiprocessing.Process):
    5. def run(self):
    6. print ('called run method in process: %s' % self.name)
    7. return
    8. if __name__ == '__main__':
    9. for i in range(5):
    10. p = MyProcess()
    11. p.start()
    12. p.join()

    输出为

    1. called run method in process: MyProcess-1
    2. called run method in process: MyProcess-2
    3. called run method in process: MyProcess-3
    4. called run method in process: MyProcess-4
    5. called run method in process: MyProcess-5

    由于没有指定进程名字,且是自定义了类,故进程的名字默认是MyProcess-N的方式

    6.在进程间交换对象

    Multiprocessing有两种方式可以交换对象:队列和管道

    1.使用队列交换对象

    可以通过队列数据结构来共享对象。Queue 返回一个进程共享的队列,是线程安全的,也是进程安全的。任何可序列化的对象(Python通过 pickable 模块序列化对象)都可以通过它进行交换。

    使用队列来实现生产者-消费者问题。 Producer 类生产item放到队列中,然后 Consumer 类从队列中移除它们。代码如下:

    1. import multiprocessing
    2. import random
    3. import time
    4. class Producer(multiprocessing.Process):
    5. def __init__(self, queue):
    6. multiprocessing.Process.__init__(self)
    7. self.queue = queue
    8. def run(self):
    9. for i in range(10):
    10. item = random.randint(0, 256)
    11. self.queue.put(item)
    12. print("Process Producer : item %d appended to queue %s" % (item, self.name))
    13. time.sleep(1)
    14. print("The size of queue is %s" % self.queue.qsize())
    15. class Consumer(multiprocessing.Process):
    16. def __init__(self, queue):
    17. multiprocessing.Process.__init__(self)
    18. self.queue = queue
    19. def run(self):
    20. while True:
    21. if self.queue.empty():
    22. print("the queue is empty")
    23. break
    24. else:
    25. time.sleep(2)
    26. item = self.queue.get()
    27. print('Process Consumer : item %d popped from by %s \n' % (item, self.name))
    28. time.sleep(1)
    29. if __name__ == '__main__':
    30. queue = multiprocessing.Queue()
    31. process_producer = Producer(queue)
    32. process_consumer = Consumer(queue)
    33. process_producer.start()
    34. process_consumer.start()
    35. process_producer.join()
    36. process_consumer.join()

    其中一次运行可能的输出为

    1. the queue is empty
    2. Process Producer : item 59 appended to queue Producer-1
    3. The size of queue is 1
    4. Process Producer : item 41 appended to queue Producer-1
    5. The size of queue is 2
    6. Process Producer : item 184 appended to queue Producer-1
    7. The size of queue is 3
    8. Process Producer : item 158 appended to queue Producer-1
    9. The size of queue is 4
    10. Process Producer : item 114 appended to queue Producer-1
    11. The size of queue is 5
    12. Process Producer : item 118 appended to queue Producer-1
    13. The size of queue is 6
    14. Process Producer : item 157 appended to queue Producer-1
    15. The size of queue is 7
    16. Process Producer : item 201 appended to queue Producer-1
    17. The size of queue is 8
    18. Process Producer : item 99 appended to queue Producer-1
    19. The size of queue is 9
    20. Process Producer : item 81 appended to queue Producer-1
    21. The size of queue is 10

    如果在macos系统上会出现错误,  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 126, in qsize
        return self._maxsize - self._sem._semlock._get_value()
    NotImplementedError
    Process Consumer : item 0 popped from by Consumer-2 

    解决参考Queue.qsize() 可能会在未实现 sem_getvalue() 的 Unix 平台上引发 NotImplementedError 异常的解决办法

    队列还有一个 JoinableQueue 子类,它有以下两个额外的方法:

    • task_done(): 此方法意味着之前入队的一个任务已经完成,比如, get() 方法从队列取回item之后调用。所以此方法只能被队列的消费者调用。
    • join(): 此方法将进程阻塞,直到队列中的item全部被取出并执行。

    ( Microndgt 注:因为使用队列进行通信是一个单向的,不确定的过程,所以你不知道什么时候队列的元素被取出来了,所以使用task_done来表示队列里的一个任务已经完成。

    这个方法一般和join一起使用,当队列的所有任务都处理之后,也就是说put到队列的每个任务都调用了task_done方法后,join才会完成阻塞。)

    2.使用管道交换对象

    一个管道可以做以下事情:

    • 返回一对被管道连接的连接对象
    • 然后对象就有了 send/receive 方法可以在进程之间通信

    下面是管道用法的一个简单示例。这里有一个进程管道从0到9发出数字,另一个进程接收数字并进行平方计算。

    1. import multiprocessing
    2. def create_items(pipe):
    3. output_pipe, _ = pipe
    4. for item in range(10):
    5. output_pipe.send(item)
    6. output_pipe.close()
    7. def multiply_items(pipe_1, pipe_2):
    8. close, input_pipe = pipe_1
    9. close.close()
    10. output_pipe, _ = pipe_2
    11. try:
    12. while True:
    13. item = input_pipe.recv()
    14. output_pipe.send(item * item)
    15. except EOFError:
    16. output_pipe.close()
    17. if __name__== '__main__':
    18. # 第一个进程管道发出数字
    19. pipe_1 = multiprocessing.Pipe(True)
    20. process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
    21. process_pipe_1.start()
    22. # 第二个进程管道接收数字并计算
    23. pipe_2 = multiprocessing.Pipe(True)
    24. process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
    25. process_pipe_2.start()
    26. pipe_1[0].close()
    27. pipe_2[0].close()
    28. try:
    29. while True:
    30. print(pipe_2[1].recv())
    31. except EOFError:
    32. print("End")

    结果为

    1. 0
    2. 1
    3. 4
    4. 9
    5. 16
    6. 25
    7. 36
    8. 49
    9. 64
    10. 81
    11. End

    Pipe() 函数返回一对通过双向管道连接起来的对象。在本例中, out_pipe 包含数字0-9,通过目标函数 create_items() 产生:

    在第二个进程中,我们有两个管道,输入管道和包含结果的输出管道:

    7.进程同步

    多个进程可以协同工作来完成一项任务。通常需要共享数据。所以在多进程之间保持数据的一致性就很重要了。需要共享数据协同的进程必须以适当的策略来读写数据。相关的同步原语和线程的库很类似。

    进程的同步原语如下:

    • Lock: 这个对象可以有两种状态:锁住(locked)和未锁住(unlocked)。一个Lock对象有两个方法, acquire() 和 release() ,来控制共享数据的读写权限。
    • Event: 实现了进程间的简单通讯,一个进程发事件的信号,另一个进程等待事件的信号。 Event 对象有两个方法, set() 和 clear() ,来管理自己内部的变量。
    • Condition: 此对象用来同步部分工作流程,在并行的进程中,有两个基本的方法: wait() 用来等待进程, notify_all() 用来通知所有等待此条件的进程。
    • Semaphore: 用来共享资源,例如,支持固定数量的共享连接。
    • Rlock: 递归锁对象。其用途和方法同 Threading 模块一样。
    • Barrier: 将程序分成几个阶段,适用于有些进程必须在某些特定进程之后执行。处于障碍(Barrier)之后的代码不能同处于障碍之前的代码并行。

    下面的代码展示了如何使用 barrier() 函数来同步两个进程。我们有4个进程,进程1和进程2由barrier语句管理,进程3和进程4没有同步策略。

    1. import multiprocessing
    2. from multiprocessing import Barrier, Lock, Process
    3. from time import time
    4. from datetime import datetime
    5. def test_with_barrier(synchronizer, serializer):
    6. name = multiprocessing.current_process().name
    7. synchronizer.wait()
    8. now = time()
    9. with serializer:
    10. print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))
    11. def test_without_barrier():
    12. name = multiprocessing.current_process().name
    13. now = time()
    14. print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))
    15. if __name__ == '__main__':
    16. synchronizer = Barrier(2)
    17. serializer = Lock()
    18. Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start()
    19. Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start()
    20. Process(name='p3 - test_without_barrier', target=test_without_barrier).start()
    21. Process(name='p4 - test_without_barrier', target=test_without_barrier).start()

    在主程序中,我们创建了四个进程,然后我们需要一个锁和一个barrier来进程同步。barrier声明的第二个参数代表要管理的进程总数

    test_with_barrier 函数调用了barrier的 wait() 方法

    当两个进程都调用 wait() 方法的时候,它们会一起继续执行

    barrier同步两个进程如下所示:

    结果为

    1. process p2 - test_with_barrier ----> 2023-09-18 16:22:20.998469
    2. process p1 - test_with_barrier ----> 2023-09-18 16:22:20.998507
    3. process p3 - test_without_barrier ----> 2023-09-18 16:22:20.999103
    4. process p4 - test_without_barrier ----> 2023-09-18 16:22:20.999650

    只能看到 with_barrier 的进程1和2比 without_barrier 的进程3和4时间差的小很多。偶尔进程1、2和进程3、4之间的时间是相同的

    需要注意的是在macos时运行上述代码报错。

    8.进程之间管理状态

    Python的多进程模块提供了管理共享信息的管理者(Manager)。一个Manager对象控制着持有Python对象的服务进程,并允许其它进程操作共享对象。

    Manager有以下特性:

    • 它控制着管理共享对象的服务进程
    • 它确保当某一进程修改了共享对象之后,所有的进程拿到额共享对象都得到了更新

    代码如下

    1. import multiprocessing
    2. def worker(dictionary, key, item):
    3. dictionary[key] = item
    4. print("key = %d value = %d" % (key, item))
    5. if __name__ == '__main__':
    6. mgr = multiprocessing.Manager()
    7. dictionary = mgr.dict()
    8. jobs = [multiprocessing.Process(target=worker, args=(dictionary, i, i*2)) for i in range(10)]
    9. for j in jobs:
    10. j.start()
    11. for j in jobs:
    12. j.join()
    13. print('Results:', dictionary)
    1. 首先,声明了一个manager

    2. 其次,创建了 dictionary 类型的一个数据结构,在 n 个 taskWorkers 之间共享,每个worker更新字典的某一个index。

    3. 所有的worker完成之后,新的列表打印到 stdout :

    结果为

    1. key = 0 value = 0
    2. key = 1 value = 2
    3. key = 2 value = 4
    4. key = 3 value = 6
    5. key = 4 value = 8
    6. key = 5 value = 10
    7. key = 6 value = 12
    8. key = 8 value = 16
    9. key = 7 value = 14
    10. key = 9 value = 18
    11. Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 8: 16, 7: 14, 9: 18}

    9.进程池

    多进程库提供了 Pool 类来实现简单的多进程任务。 Pool 类有以下方法:

    • apply(): 直到得到结果之前一直阻塞。
    • apply_async(): 这是 apply() 方法的一个变体,返回的是一个result对象。这是一个异步的操作,在所有的子类执行之前不会锁住主进程。
    • map(): 这是内置的 map() 函数的并行版本。在得到结果之前一直阻塞,此方法将可迭代的数据的每一个元素作为进程池的一个任务来执行。
    • map_async(): 这是 map() 方法的一个变体,返回一个result对象。如果指定了回调函数,回调函数应该是callable的,并且只接受一个参数。当result准备好时会自动调用回调函数(除非调用失败)。回调函数应该立即完成,否则,持有result的进程将被阻塞。

    下面的例子展示了如果通过进程池来执行一个并行应用。我们创建了有4个进程的进程池,然后使用 map() 方法进行一个简单的计算。

    1. import multiprocessing
    2. def function_square(data):
    3. result = data*data
    4. return result
    5. if __name__ == '__main__':
    6. inputs = list(range(100))
    7. pool = multiprocessing.Pool(processes=4)
    8. pool_outputs = pool.map(function_square, inputs)
    9. pool.close()
    10. pool.join()
    11. print ('Pool :', pool_outputs)
    1. multiprocessing.Pool 方法在输入元素上应用 function_square 方法来执行简单的计算。并行的进程数量是4
    2. pool.map 方法将一些独立的任务提交给进程池
    3. 计算的结果存储在 pool_outputs 中。最后的结果打印出来

    结果为

    Pool    : [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]

    需要注意的是, pool.map() 方法的结果和Python内置的 map() 结果是相同的,不同的是 pool.map() 是通过多个并行进程计算的。

    在Python中,多个进程之间默认是无法直接共享变量的。每个进程都有自己独立的内存空间,变量在一个进程中的修改不会影响其他进程中的变量。

    如果需要在多个进程之间共享变量,可以使用multiprocessing模块中的ValueArray来创建共享内存的变量。

    进程池

    1. import multiprocessing
    2. def process_func(text, qq_status, index_data):
    3. # 处理文本段的逻辑
    4. # ...
    5. # 修改共享变量
    6. if qq_status.value == 0:
    7. try:
    8. # 处理文本段的逻辑
    9. # 如果处理失败,将qq_status赋值为-1
    10. # 如果处理成功,将文本段内容放入index_data
    11. index_data.append(text)
    12. if (text == "中国最美丽的地方一定是新"):
    13. qq_status.value = -1
    14. print(text)
    15. except:
    16. qq_status.value = -1
    17. def main():
    18. texts = ["中","中国","中国最","中国最美","中国最美丽","中国最美丽的","中国最美丽的地","中国最美丽的地方","中国最美丽的地方一","中国最美丽的地方一定",
    19. "中国最美丽的地方一定是","中国最美丽的地方一定是新","中国最美丽的地方一定是新疆","中国最美丽的地方是新疆省","中国最美丽的地方是新疆省阿"] # 100个文本段
    20. manager = multiprocessing.Manager()
    21. qq_status = manager.Value('i', 0)
    22. index_data = manager.list()
    23. pool = multiprocessing.Pool()
    24. for text in texts:
    25. pool.apply_async(process_func, args=(text, qq_status, index_data))
    26. pool.close()
    27. pool.join()
    28. print(qq_status.value)
    29. print(index_data)
    30. if __name__ == '__main__':
    31. main()

  • 相关阅读:
    IB经济与商业可以一起选吗?
    fplan-Powerplan实例
    Flutter 实战:构建跨平台应用
    不使用oh-my-zsh配置轻量级zsh环境
    clickhouse——clickhouse单节点部署及基础命令介绍
    Mac怎么删除文件和软件?苹果电脑删除第三方软件方法
    多聚体/壳聚糖修饰白蛋白纳米球/mPEG-HSA聚乙二醇人血清白蛋白纳米球的制备与研究
    第二十四章《学生信息管理系统》第1节:学生信息管理系统简介
    CF603E Pastoral Oddities
    短URL服务的设计以及实现
  • 原文地址:https://blog.csdn.net/qq_38196982/article/details/132846537