在大数据时代,Python 已经成为最受追捧的语言。 在本文中,让我们专注于 Python 的一个特定方面,它使其成为最强大的编程语言之一——Multi-Processing。
在阅读本文之前,我建议您阅读我之前关于 Python 中的线程的文章,因为它可以为当前文章提供更好的上下文。
假设你是一名小学生,你的作业是让 1200 对数字相乘,这让你感到麻木。 假设您能够在 3 秒内将一对数字相乘。 那么总共需要 12003 = 3600 秒,也就是 1 小时来解决整个作业。 但是你必须在 20 分钟内赶上你最喜欢的电视节目。
你会怎么做? 一个聪明的学生,虽然不诚实,但会召集另外三个能力相近的朋友并分配作业。
因此,您需要完成 250 个乘法任务,您将在 2503 = 750 秒内完成,即 15 分钟。
因此,您和您的其他 3 个朋友将在 15 分钟内完成任务,给您 5 分钟的时间来吃点零食并坐下来观看电视节目。
当你们 4 人一起工作时,这项任务只需要 15 分钟,否则需要 1 小时。
这是多进程的基本思想。
如果你有一个算法可以划分为不同的 workers(processors,处理器),那么你可以加速程序。 现在的机器有 4,8 和 16 核,然后可以并行部署。
多处理在数据科学中有两个关键应用。
任何数据密集型(data-intensive)管道(pipeline)都有输入、输出进程,其中数百万字节的数据在整个系统中流动。 通常,数据读取(输入)过程不会花费太多时间,但将数据写入数据仓库的过程会花费大量时间。 写入过程可以并行进行,节省大量时间。
尽管并非所有模型都可以并行训练,但很少有模型具有允许它们使用并行处理进行训练的固有特征(inherent characteristic)。 例如,随机森林算法部署多个决策树来做出累积决策。 这些树可以并行构建。 实际上,sklearn API 带有一个名为 n_jobs 的参数,它提供了使用多个工作人员的选项。
现在让我们掌握Python中的 multiprocessing 库。
import time
def sleepy_man():
print('Starting to sleep')
time.sleep(1)
print('Done sleeping')
tic = time.time()
sleepy_man()
sleepy_man()
toc = time.time()
print('Done in {:.4f} seconds'.format(toc-tic))
上面的代码很简单。 函数 sleepy_man 休眠一秒钟,我们调用该函数两次。 我们记录两个函数调用所花费的时间并打印结果。 输出如下图所示。
Starting to sleep
Done sleeping
Starting to sleep
Done sleeping
Done in 2.0098 seconds
这是预期的,因为我们两次调用该函数并记录时间。 流程如下图所示。
现在让我们将 Multi-Processing 合并到代码中。
import multiprocessing
import time
def sleepy_man():
print('Starting to sleep')
time.sleep(1)
print('Done sleeping')
tic = time.time()
p1 = multiprocessing.Process(target= sleepy_man)
p2 = multiprocessing.Process(target= sleepy_man)
p1.start()
p2.start()
toc = time.time()
print('Done in {:.4f} seconds'.format(toc-tic))
这里 multiprocessing.Process(target= sleepy_man) 定义了一个多进程实例。 我们将需要执行的函数 sleepy_man 作为参数传递。 我们通过 p1.start() 触发这两个实例。
输出如下 -
Done in 0.0210 seconds
Starting to sleep
Starting to sleep
Done sleeping
Done sleeping
现在注意一件事。 时间日志打印语句首先执行。 这是因为与 sleepy_man 函数触发的多进程实例一起,该函数的主要代码被并行单独执行。 下面给出流程图,以便更加清晰地理解。
join()方法,当其与 threading 或者 multiprocessing 库时,它和 str.join()方法无关的——它实际上并没有把什么拼接在一起。然而,它仅仅意味着 “等待这个[thread/process]去完成” 。
使用 join 这个名字是因为 multiprocessing 模块想和 threading 模块看起来比较像,而且 后者针对 Theread 使用 join 。
使用术语 join 去表达 “等待一个线程去完成” 在许多编程语言中通用,所以 Python 也采用了。
Remember also that non-daemonic processes will be automatically be joined.
import multiprocessing
import time
def sleepy_man():
print('Starting to sleep')
time.sleep(1)
print('Done sleeping')
tic = time.time()
p1 = multiprocessing.Process(target= sleepy_man)
p2 = multiprocessing.Process(target= sleepy_man)
p1.start()
p2.start()
p1.join()
p2.join()
toc = time.time()
print('Done in {:.4f} seconds'.format(toc-tic))
现在,代码块的其余部分只会在多处理任务完成后执行。输出如下所示。
Starting to sleep
Starting to sleep
Done sleepingDone sleeping
Done in 1.1837 seconds
流程图如下所示。
由于两个睡眠函数是并行执行的,因此该函数加起来大约需要 1 秒。
我们可以定义任意数量的多处理实例。 看看下面的代码。 它使用 for 循环定义了 10 个不同的多处理实例。
import multiprocessing
import time
def sleepy_man():
print('Starting to sleep')
time.sleep(1)
print('Done sleeping')
tic = time.time()
process_list = []
for i in range(10):
p = multiprocessing.Process(target= sleepy_man)
p.start()
process_list.append(p)
for process in process_list:
process.join()
toc = time.time()
print('Done in {:.4f} seconds'.format(toc-tic))
输出如下所示:
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done in 1.4964 seconds
这里十个函数的执行是并行处理的,因此整个程序大概只需要一秒钟。 现在我的机器没有 10 个处理器。 当我们定义的进程多于我们的机器时,multiprocessing 库具有调度作业的逻辑,但是在最后的例子在本人 windows 电脑上跑的时候并没有体现出来,留个疑惑。
我们还可以使用 args 将参数传递给 Process 函数。
import multiprocessing
import time
def sleepy_man(sec):
print('Starting to sleep')
time.sleep(sec)
print('Done sleeping')
tic = time.time()
process_list = []
for i in range(10):
p = multiprocessing.Process(target= sleepy_man, args = [2])
p.start()
process_list.append(p)
for process in process_list:
process.join()
toc = time.time()
print('Done in {:.4f} seconds'.format(toc-tic))
输出如下:
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Starting to sleep
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleepingDone sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done in 2.4574 seconds
由于我们传递了一个参数 2, sleepy_man 函数休眠了 2 秒而不是 1 秒。
在最后一个代码片段中,我们使用 for 循环执行了 10 个不同的进程。 我们也可以使用 Pool 类来做同样的事情。
import multiprocessing
import time
def sleepy_man(sec):
print('Starting to sleep for {} seconds'.format(sec))
time.sleep(sec)
print('Done sleeping for {} seconds'.format(sec))
tic = time.time()
pool = multiprocessing.Pool(5)
pool.map(sleepy_man, range(1,11))
pool.close()
toc = time.time()
print('Done in {:.4f} seconds'.format(toc-tic))
multiprocessing.Pool(5) 定义了 workers 的数量。 这里定义为 5。
pool.map() 是触发函数执行的方法。 我们调用 pool.map(sleepy_man, range(1,11))。
在这里, sleepy_man 是函数, range(1,11)是参数(通常传递一个列表)。
输出如下
Starting to sleep for 1 seconds
Starting to sleep for 2 seconds
Starting to sleep for 3 seconds
Starting to sleep for 4 seconds
Starting to sleep for 5 seconds
Done sleeping for 1 seconds
Starting to sleep for 6 seconds
Done sleeping for 2 seconds
Starting to sleep for 7 seconds
Done sleeping for 3 seconds
Starting to sleep for 8 seconds
Done sleeping for 4 seconds
Starting to sleep for 9 seconds
Done sleeping for 5 seconds
Starting to sleep for 10 seconds
Done sleeping for 6 seconds
Done sleeping for 7 seconds
Done sleeping for 8 seconds
Done sleeping for 9 seconds
Done sleeping for 10 seconds
Done in 15.7404 seconds
到目前为止,我们在睡眠功能上使用了 multiprocessing。
现在让我们使用一个函数来检查一个数字是否是完美数字。
如果一个数的正除数之和等于该数本身,那么它就是一个完美数。
我们将列出小于或等于 10_000 和 100_000 的完美数字。我们将以 3 种方式实现它——
下文先分别给出在3种方式下求解小于或等于 10_000的完美数字代码和在本文作者 winodws 机器下的代码输出。
最后用表格列出求解小于或等于 10_000 和 100_000 的完美数字的求解时间,分别在 本文作者 windows 机器下和 ubantu 服务器机器下。
import time
def is_perfect(n):
sum_factors = 0
for i in range(1, n):
if (n % i == 0):
sum_factors = sum_factors + i
if (sum_factors == n):
print('{} is a Perfect number'.format(n))
tic = time.time()
for n in range(1,10_000):
is_perfect(n)
toc = time.time()
print('Done in {:.4f} seconds'.format(toc-tic))
输出如下:
6 is a Perfect number
28 is a Perfect number
496 is a Perfect number
8128 is a Perfect number
Done in 3.9316 seconds
import time
import multiprocessing
def is_perfect(n):
sum_factors = 0
for i in range(1, n):
if(n % i == 0):
sum_factors = sum_factors + i
if (sum_factors == n):
print('{} is a Perfect number'.format(n))
if __name__ == "__main__":
tic = time.time()
processes = []
for i in range(1,10_000):
p = multiprocessing.Process(target=is_perfect, args=(i,))
processes.append(p)
p.start()
for process in processes:
process.join()
toc = time.time()
print('Done in {:.4f} seconds'.format(toc-tic))
输出如下
6 is a Perfect number
28 is a Perfect number
496 is a Perfect number
8128 is a Perfect number
Done in 491.4885 seconds
import time
import multiprocessing
def is_perfect(n):
sum_factors = 0
for i in range(1, n):
if(n % i == 0):
sum_factors = sum_factors + i
if (sum_factors == n):
print('{} is a Perfect number'.format(n))
if __name__ == "__main__":
tic = time.time()
pool = multiprocessing.Pool()
pool.map(is_perfect, range(1,10_000))
pool.close()
toc = time.time()
print('Done in {:.4f} seconds'.format(toc-tic))
输出如下:
6 is a Perfect number
28 is a Perfect number
496 is a Perfect number
8128 is a Perfect number
Done in 2.1971 seconds
在ubantu 环境下,使用 pool 类的时间开销远远低于其他方法,时间开销是 时间开销第二低的 10% 左右。
在windows环境下,使用 pool 类的时间开销同样最低,时间开销是 时间开销第二低的 56% 左右。需要注意的是,在使用 process 类的情况下,时间开销竟然达到 491 多秒,大概8 分多!!!
for循环 | process 类 | pool类 | |
---|---|---|---|
ubantu | 2.3741s | 2.3650s | 0.2481s |
windows | 3.9316s | 491.4885s | 2.1971s |
由于在windows环境下,使用 process 类求解 小于等于 100_000的完美数字,本文作者等了很久都没出结果,于是就没在 windows 下对比这种情况。仅仅在 ubantu 环境下对比不同方法的时间开销。
从下面的表格中可以发现,pool 类仍然没让我们失望,时间开销是第二名的 5%不到。
for循环 | process 类 | pool类 | |
---|---|---|---|
ubantu | 242.7352s | 243.2628s | 10.8428s |