• python多进程multiprocessing


    多进程

    • 概述

    多进程是指在同一时间内,同时执行多个程序或多个部分的程序。每个进程都拥有自己的地址空间、内存、文件描述符和其他系统资源。多进程的好处在于可以使程序并行执行,从而提高程序的运行效率。

    Python中的multiprocessing模块提供了一种创建和管理进程的方式,使得可以利用多个CPU来加速程序运行。

    • 例子

    multiprocessing模块提供了一个Process类,可以用来创建和管理进程。下面是一个简单的示例:

    import multiprocessing
    
    def worker():
    """该函数将在子进程中执行"""
    print('Worker')
    
    if __name__ == '__main__':
    # 创建子进程
    p = multiprocessing.Process(target=worker)
    # 启动子进程
    p.start()
    # 等待子进程结束
    p.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    ​ 在上面的代码中,worker函数将在子进程中执行。首先,创建了一个Process对象,指定target参数为worker函数。然后,通过调用start方法启动子进程,最后调用join方法等待子进程结束。

    • 带参调用

    这地方用到的是args和kwargs,其中args是使用元素组的方式给指定任务传参,kwargs是使用字典方式给指定任务传参。

    进程对象 = multiprocessing.Process(target=*,args=(*,))
    
    • 1

    此处注意,若只有一个元素,那个逗号也是不可以省略的。

    进程对象 = multiprocessing.Process(target=*,kwargs={"变量名": 变量值})
    
    • 1

    例子

    import time
    import multiprocessing
    
    
    def eat(num,name):
     for i in range(num):
         print(name+"吃一口……")
         time.sleep(1)
    def drink(num,name):
     for i in range(num):
         print(name+"喝一口……")
         time.sleep(1)
    if __name__ == '__main__':
     # target:指定执行的函数名
     # args:使用元组方式给指定任务传参
     # kwargs:使用字典方式给指定任务传参
     eat_process = multiprocessing.Process(target=eat,args=(3,"giao"))
     drink_process = multiprocessing.Process(target=drink,kwargs={"num": 4,"name":"giao"})
     eat_process.start()
     drink_process.start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 获取进程的编号

    • 获取当前进程的编号

    os.getpid()
    
    • 1
    • 获取当前进程的父进程的编号
    os.getppid()
    
    • 1
    • 设置子进程守护

    当参数daemon是False时,主进程结束了子进程还是会继续执行;否则,子进程和主进程一起结束,默认False。

    当主进程结束时,要让子进程也不再继续执行,直接结束,我们可以通过进程对象的参数daemon来进行控制。

    只需要在创建进程之后,加入这样一句代码

    进程名称.daemon = True
    
    • 1

    这样子进程就会守护主进程,主进程结束,子进程也会自动销毁。

    例子

    import multiprocessing
    import time
    def eat():
       for i in range(10):
           print("我吃我吃……")
           time.sleep(0.5)
    if __name__ == '__main__':
       eat_process = multiprocessing.Process(target=eat)
       # 设置进程守护
       eat_process.daemon = True
       eat_process.start()
    
       time.sleep(1)
       print("我吃饱了……")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 创建进程的方式

    • 方式1:使用Process直接创建

        # 创建子进程
        p = multiprocessing.Process(target=worker)
    
    • 1
    • 2
    • 方式2:进程池创建

    如果需要创建大量的进程,那么使用Process类可能会导致系统资源的浪费。此时,可以使用Pool类来创建进程池。下面是一个简单的示例:

    import multiprocessing
    import os
    import time
    def worker(num):
        """该函数将在子进程中执行"""
        time.sleep(4)
        print('Worker %d , 进程id为 %d' %(num,os.getpid()))
    
    if __name__ == '__main__':
        # 创建进程池
        pool = multiprocessing.Pool(4) # 设置进程池中进程最大数量是4
        # 启动进程池中的进程
        pool.map(worker, range(10)) # 需要开启10个进程
        # 关闭进程池
        pool.close()
        # 等待进程池中的进程结束
        pool.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在上面的代码中,Pool类的构造函数中指定了进程池的大小为4,然后通过调用map方法来启动进程池中的进程。map方法会将worker函数和range(10)序列中的每个元素一一对应,然后将它们作为参数传递给进程池中的进程。此外,需要的10个进程会从进程池中拿3次。

    Worker 0 , 进程id为 14948
    Worker 1 , 进程id为 28360
    Worker 2 , 进程id为 30592
    Worker 3 , 进程id为 22260
    ---上4组并行
    Worker 4 , 进程id为 14948
    Worker 5 , 进程id为 28360
    Worker 6 , 进程id为 30592
    Worker 7 , 进程id为 22260
    ---上4组并行
    Worker 8 , 进程id为 14948
    Worker 9 , 进程id为 28360
    ---上2组并行
    
    Process finished with exit code 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 进程池的进程最大数量最多不超过计算机的cpu核心数量,以下函数cpu_count()可以获取到当前计算机的cpu核心数量

      import multiprocessing
      
      cpus = multiprocessing.cpu_count()
      pool = multiprocessing.Pool(cpus)
      
      • 1
      • 2
      • 3
      • 4
    • 进程执行方法需要多个参数:

      https://blog.csdn.net/u013421629/article/details/100284962

      每个进程需要执行的方法有多个参数需要如何处理?比如上面的worker方法中有两个参数,那么pool.map调用的时候会报错,例子如下:

      data_list=[(1,1),(2,2),(3,3)]
      res = pool.map(worker,data_list)
      
      def worker(num,num2):
          time.sleep(4)
          print('Worker %d , 进程id为 %d' %(num,os.getpid()))
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

      会报出:参数个数不匹配的错误。

      解决方法1:多个参数合并成一个参数(list),需要重新定义一个方法worker,该方法只有一个形式参数,其内部再调用多参数的方法,如下:

      def worker2(s:list):
          worker(s[0],s[1])
          
      data_list=[(1,1),(2,2),(3,3)]
      res = pool.map(worker2,data_list)
      
      • 1
      • 2
      • 3
      • 4
      • 5

      此外,你可能会有使用lambda表达式的思路,生成一个匿名函数worker2,然后调用worker,就像这样

      worker2 = lambda x:worker(x[0],x[1])
      res = pool.map(worker2,data_list)
      
      • 1
      • 2

      然后就会发现,不行,报错,如下

        File "/usr/lib/python3.10/multiprocessing/connection.py", line 206, in send
          self._send_bytes(_ForkingPickler.dumps(obj))
        File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
          cls(buf, protocol).dump(obj)
      AttributeError: Can't pickle local object 'worker..'
      
      • 1
      • 2
      • 3
      • 4
      • 5

      其实,还有另外一种方法,让我们直接就可以把多参的函数穿传入进来。如下

      解决方法2:使用pathos.multiprocessing,可以传入多个参数的方法f,代码如下,不多此时,map的第二个参数就是f的第一个参数的list,map的第三个参数就是f的第二个参数的list。

      >>> from pathos.multiprocessing import ProcessingPool as Pool
      >>>
      >>> def add_and_subtract(x,y):
      ...   return x+y, x-y
      
      >>> res = Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1))
      >>> res
      [(-5, 5), (-2, 6), (1, 7), (4, 8), (7, 9), (10, 10), (13, 11), (16, 12), (19, 13), (22, 14)]
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8

      比如,Pool().map(add,[0,1,2],[3,4,5]),代表add(0,3),add(1,4),add(2,5)这三个函数并行

    参考:
    http://stack.itcast.cn/news/20230404/10534657760.shtml

    https://www.rstk.cn/news/45013.html?action=onClick

  • 相关阅读:
    json序列化时案例总结
    Techviz:XR协作工作流程,重塑远程电话会议新形式
    Unity BatchRendererGroup 在低端设备上也实现高帧率
    开发了一个Java库的Google Bard API,可以自动化与AI对话了
    探索树堆Treap和红黑树的优势和劣势
    计算机组成原理之定点除法
    Prometheus 监控指南:如何可靠地记录数字时间序列数据
    C Primer Plus(6) 中文版 第1章 初识C语言 1.8 编程机制 1.11 本章小结
    字节旗下火山引擎违规分发SkyWalking,更改所有包名、删除Apache基金会抬头
    2022-8-31 第六小组 瞒春 学习笔记
  • 原文地址:https://blog.csdn.net/Supreme7/article/details/132836839