• Python多进程 multiprocessing


    在大数据时代,Python 已经成为最受追捧的语言。 在本文中,让我们专注于 Python 的一个特定方面,它使其成为最强大的编程语言之一——Multi-Processing
    在阅读本文之前,我建议您阅读我之前关于 Python 中的线程的文章,因为它可以为当前文章提供更好的上下文。

    多进程是什么?

    假设你是一名小学生,你的作业是让 1200 对数字相乘,这让你感到麻木。 假设您能够在 3 秒内将一对数字相乘。 那么总共需要 12003 = 3600 秒,也就是 1 小时来解决整个作业。 但是你必须在 20 分钟内赶上你最喜欢的电视节目。
    你会怎么做? 一个聪明的学生,虽然不诚实,但会召集另外三个能力相近的朋友并分配作业。
    因此,您需要完成 250 个乘法任务,您将在 250
    3 = 750 秒内完成,即 15 分钟。
    因此,您和您的其他 3 个朋友将在 15 分钟内完成任务,给您 5 分钟的时间来吃点零食并坐下来观看电视节目。
    当你们 4 人一起工作时,这项任务只需要 15 分钟,否则需要 1 小时。

    这是多进程的基本思想
    如果你有一个算法可以划分为不同的 workers(processors,处理器),那么你可以加速程序。 现在的机器有 4,8 和 16 核,然后可以并行部署。

    数据科学中的多进程

    多处理在数据科学中有两个关键应用。

    1.输入输出过程-

    任何数据密集型(data-intensive)管道(pipeline)都有输入、输出进程,其中数百万字节的数据在整个系统中流动。 通常,数据读取(输入)过程不会花费太多时间,但将数据写入数据仓库的过程会花费大量时间。 写入过程可以并行进行,节省大量时间。
    在这里插入图片描述

    在这里插入图片描述

    2.训练模型

    尽管并非所有模型都可以并行训练,但很少有模型具有允许它们使用并行处理进行训练的固有特征(inherent characteristic)。 例如,随机森林算法部署多个决策树来做出累积决策。 这些树可以并行构建。 实际上,sklearn API 带有一个名为 n_jobs 的参数,它提供了使用多个工作人员的选项。

    Python 的multiprocessing 类

    现在让我们掌握Python中的 multiprocessing 库。

    import time
    
    def sleepy_man():
        print('Starting to sleep')
        time.sleep(1)
        print('Done sleeping')
    
    tic = time.time()
    sleepy_man()
    sleepy_man()
    toc = time.time()
    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    上面的代码很简单。 函数 sleepy_man 休眠一秒钟,我们调用该函数两次。 我们记录两个函数调用所花费的时间并打印结果。 输出如下图所示。

    Starting to sleep
    Done sleeping
    Starting to sleep
    Done sleeping
    Done in 2.0098 seconds
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这是预期的,因为我们两次调用该函数并记录时间。 流程如下图所示。
    在这里插入图片描述
    现在让我们将 Multi-Processing 合并到代码中。

    import multiprocessing
    import time
    def sleepy_man():
        print('Starting to sleep')
        time.sleep(1)
        print('Done sleeping')
    
    tic = time.time()
    p1 =  multiprocessing.Process(target= sleepy_man)
    p2 =  multiprocessing.Process(target= sleepy_man)
    p1.start()
    p2.start()
    toc = time.time()
    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    这里 multiprocessing.Process(target= sleepy_man) 定义了一个多进程实例。 我们将需要执行的函数 sleepy_man 作为参数传递。 我们通过 p1.start() 触发这两个实例。

    输出如下 -

    Done in 0.0210 seconds
    Starting to sleep
    Starting to sleep
    Done sleeping
    Done sleeping
    
    • 1
    • 2
    • 3
    • 4
    • 5

    现在注意一件事。 时间日志打印语句首先执行。 这是因为与 sleepy_man 函数触发的多进程实例一起,该函数的主要代码被并行单独执行。 下面给出流程图,以便更加清晰地理解。
    在这里插入图片描述

    为了在多进程函数执行之后执行程序的其余部分,我们需要执行函数join()。

    关于 join()函数,在这做进一步的解释(借鉴自stack overflow):

    join()方法,当其与 threading 或者 multiprocessing 库时,它和 str.join()方法无关的——它实际上并没有把什么拼接在一起。然而,它仅仅意味着 “等待这个[thread/process]去完成”
    使用 join 这个名字是因为 multiprocessing 模块想和 threading 模块看起来比较像,而且 后者针对 Theread 使用 join 。
    使用术语 join 去表达 “等待一个线程去完成” 在许多编程语言中通用,所以 Python 也采用了。

    Remember also that non-daemonic processes will be automatically be joined.

    import multiprocessing
    import time
    
    def sleepy_man():
        print('Starting to sleep')
        time.sleep(1)
        print('Done sleeping')
    
    tic = time.time()
    p1 =  multiprocessing.Process(target= sleepy_man)
    p2 =  multiprocessing.Process(target= sleepy_man)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    toc = time.time()
    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    现在,代码块的其余部分只会在多处理任务完成后执行。输出如下所示。

    Starting to sleep
    Starting to sleep
    Done sleepingDone sleeping
    
    Done in 1.1837 seconds
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    流程图如下所示。
    在这里插入图片描述
    由于两个睡眠函数是并行执行的,因此该函数加起来大约需要 1 秒

    我们可以定义任意数量的多处理实例。 看看下面的代码。 它使用 for 循环定义了 10 个不同的多处理实例。

    import multiprocessing
    import time
    
    def sleepy_man():
        print('Starting to sleep')
        time.sleep(1)
        print('Done sleeping')
    
    tic = time.time()
    
    process_list = []
    for i in range(10):
        p =  multiprocessing.Process(target= sleepy_man)
        p.start()
        process_list.append(p)
    
    for process in process_list:
        process.join()
    
    toc = time.time()
    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    输出如下所示:

    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Done sleeping
    Done sleeping
    Done sleeping
    Done sleeping
    Done sleeping
    Done sleeping
    Done sleeping
    Done sleeping
    Done sleeping
    Done sleeping
    Done in 1.4964 seconds
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    这里十个函数的执行是并行处理的,因此整个程序大概只需要一秒钟。 现在我的机器没有 10 个处理器。 当我们定义的进程多于我们的机器时,multiprocessing 库具有调度作业的逻辑,但是在最后的例子在本人 windows 电脑上跑的时候并没有体现出来,留个疑惑。
    我们还可以使用 args 将参数传递给 Process 函数。

    import multiprocessing
    import time
    
    def sleepy_man(sec):
        print('Starting to sleep')
        time.sleep(sec)
        print('Done sleeping')
    
    tic = time.time()
    
    process_list = []
    for i in range(10):
        p =  multiprocessing.Process(target= sleepy_man, args = [2])
        p.start()
        process_list.append(p)
    
    for process in process_list:
        process.join()
    
    toc = time.time()
    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    输出如下:

    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Starting to sleep
    Done sleeping
    Done sleeping
    Done sleeping
    Done sleeping
    Done sleepingDone sleeping
    
    Done sleeping
    Done sleeping
    Done sleeping
    Done sleeping
    Done in 2.4574 seconds
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    由于我们传递了一个参数 2, sleepy_man 函数休眠了 2 秒而不是 1 秒。

    Multi-Processing 使用 Pool 类

    在最后一个代码片段中,我们使用 for 循环执行了 10 个不同的进程。 我们也可以使用 Pool 类来做同样的事情。

    import multiprocessing
    import time
    
    def sleepy_man(sec):
        print('Starting to sleep for {} seconds'.format(sec))
        time.sleep(sec)
        print('Done sleeping for {} seconds'.format(sec))
    
    tic = time.time()
    
    pool = multiprocessing.Pool(5)
    pool.map(sleepy_man, range(1,11))
    pool.close()
    
    toc = time.time()
    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    multiprocessing.Pool(5) 定义了 workers 的数量。 这里定义为 5
    pool.map() 是触发函数执行的方法。 我们调用 pool.map(sleepy_man, range(1,11))。
    在这里, sleepy_man 是函数, range(1,11)是参数(通常传递一个列表)。
    输出如下

    Starting to sleep for 1 seconds
    Starting to sleep for 2 seconds
    Starting to sleep for 3 seconds
    Starting to sleep for 4 seconds
    Starting to sleep for 5 seconds
    Done sleeping for 1 seconds
    Starting to sleep for 6 seconds
    Done sleeping for 2 seconds
    Starting to sleep for 7 seconds
    Done sleeping for 3 seconds
    Starting to sleep for 8 seconds
    Done sleeping for 4 seconds
    Starting to sleep for 9 seconds
    Done sleeping for 5 seconds
    Starting to sleep for 10 seconds
    Done sleeping for 6 seconds
    Done sleeping for 7 seconds
    Done sleeping for 8 seconds
    Done sleeping for 9 seconds
    Done sleeping for 10 seconds
    Done in 15.7404 seconds
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    Pool 类是部署多进程的更好方法,因为它使用 First In First Out 调度将任务分发给可用的处理器。它几乎类似于map-reduce架构——本质上,它将输入映射到不同的处理器,并将所有处理器的输出收集为一个列表。正在执行的进程存储在内存中,其他非正在执行的进程存储在内存外。

    在这里插入图片描述

    而在Process类中,所有进程都在内存中执行,并使用FIFO策略调度执行。

    比较计算完全数的时间性能

    到目前为止,我们在睡眠功能上使用了 multiprocessing。
    现在让我们使用一个函数来检查一个数字是否是完美数字。
    如果一个数的正除数之和等于该数本身,那么它就是一个完美数。
    我们将列出小于或等于 10_000 和 100_000 的完美数字。我们将以 3 种方式实现它
    ——

    • 常规 for 循环
    • multiprocess.Process()
    • multiprocess.Pool()

    下文先分别给出在3种方式下求解小于或等于 10_000的完美数字代码和在本文作者 winodws 机器下的代码输出。
    最后用表格列出求解小于或等于 10_000 和 100_000 的完美数字的求解时间,分别在 本文作者 windows 机器下和 ubantu 服务器机器下。

    使用常规 for 循环

    import time
    
    def is_perfect(n):
        sum_factors = 0
        for i in range(1, n):
            if (n % i == 0):
                sum_factors = sum_factors + i
        if (sum_factors == n):
            print('{} is a Perfect number'.format(n))
    
    tic = time.time()
    for n in range(1,10_000):
        is_perfect(n)
    toc = time.time()
    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    输出如下:

    6 is a Perfect number
    28 is a Perfect number
    496 is a Perfect number
    8128 is a Perfect number
    Done in 3.9316 seconds
    
    • 1
    • 2
    • 3
    • 4
    • 5

    使用 Process 类

    import time
    import multiprocessing
    
    def is_perfect(n):
        sum_factors = 0
        for i in range(1, n):
            if(n % i == 0):
                sum_factors = sum_factors + i
        if (sum_factors == n):
            print('{} is a Perfect number'.format(n))
    
    if __name__ == "__main__":
        tic = time.time()
    
        processes = []
        for i in range(1,10_000):
            p = multiprocessing.Process(target=is_perfect, args=(i,))
            processes.append(p)
            p.start()
    
        for process in processes:
            process.join()
    
        toc = time.time()
        print('Done in {:.4f} seconds'.format(toc-tic))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    输出如下

    6 is a Perfect number
    28 is a Perfect number
    496 is a Perfect number
    8128 is a Perfect number
    Done in 491.4885 seconds
    
    • 1
    • 2
    • 3
    • 4
    • 5

    使用 Pool 类

    import time
    import multiprocessing
    
    def is_perfect(n):
        sum_factors = 0
        for i in range(1, n):
            if(n % i == 0):
                sum_factors = sum_factors + i
        if (sum_factors == n):
            print('{} is a Perfect number'.format(n))
    
    if __name__ == "__main__":
        tic = time.time()
        pool = multiprocessing.Pool()
        pool.map(is_perfect, range(1,10_000))
        pool.close()
        toc = time.time()
    
        print('Done in {:.4f} seconds'.format(toc-tic))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    输出如下:

    6 is a Perfect number
    28 is a Perfect number
    496 is a Perfect number
    8128 is a Perfect number
    Done in 2.1971 seconds
    
    • 1
    • 2
    • 3
    • 4
    • 5

    求解时间对比

    • Ubantu 18.4: Intel® Xeon® Gold 6139M CPU、 主频 2.30GHz、18核
    • Windows 10: Intel(R)、i5-8250U、 CPU主频1.60GHz、4核、8进程

    1.求解 小于等于 10_000的完美数字时间

    ubantu 环境下,使用 pool 类的时间开销远远低于其他方法,时间开销是 时间开销第二低的 10% 左右。
    windows环境下,使用 pool 类的时间开销同样最低,时间开销是 时间开销第二低的 56% 左右。需要注意的是,在使用 process 类的情况下,时间开销竟然达到 491 多秒,大概8 分多!!!

    for循环process 类pool类
    ubantu2.3741s2.3650s0.2481s
    windows3.9316s491.4885s2.1971s

    2.求解 小于等于 100_000的完美数字时间

    由于在windows环境下,使用 process 类求解 小于等于 100_000的完美数字,本文作者等了很久都没出结果,于是就没在 windows 下对比这种情况。仅仅在 ubantu 环境下对比不同方法的时间开销。
    从下面的表格中可以发现,pool 类仍然没让我们失望,时间开销是第二名的 5%不到。

    for循环process 类pool类
    ubantu242.7352s243.2628s10.8428s

    疑惑:为什么在本文作者 windows下使用multiprocessing模块的 process 类需要491秒,正常for循环只要 接近4 秒?而且在 ubantu 环境下跑 100_000 的结果,使用 process 类的时间开销仍然是最高的。

    参考

    本文主要参考了A beginners guide to Multi-Processing in Python

  • 相关阅读:
    基于Docker构建MySQL主从复制数据库
    2023国赛数学建模C题思路代码 - 蔬菜类商品的自动定价与补货决策
    使用python对比两个json文件的不同并输出
    SQL Server教程 - SQL Server 压缩(Compression)
    Revit中墙体绘制的小技巧?CAD识别墙体快速生成
    第2章丨IRIS Global 使用多维存储
    对极几何-三角测量-知识点
    Python Turtle Graphics 绘制I Love You字符
    Vue+ElementUI技巧分享:自定义表单项label的文字提示
    BP神经网络能够做什么,bp神经网络的应用场景
  • 原文地址:https://blog.csdn.net/OrdinaryMatthew/article/details/125896905