目录
12.案例:多进程写入同一个txt
进程是资源分配的最小单位,也是线程的容器,线程(python 线程 (概念+示例代码))是CPU调度的基本单位,一个进程包括多个线程。
程序:例如xxx.py是一个程序
进程:一个程序运行起来后,代码+用到的资源称为进程,它是系统分配资源的基本单位。
在计算机工作中,其任务数往往大于CPU核数,即一定有一些任务正在执行,而另外一些任务在等待CPU执行,因此进程有了不同的状态。
就绪态:运行的条件都已经满足,正在等待CPU执行
执行态:CPU正在执行其功能
等待态:等待某些条件满足,例如一个程序sleep了,此时处于等待态
multiprocessing模块时夸平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情。
子进程的创建:
- import time,multiprocessing
-
- def work1():
- for i in range(5):
- print("正在运行 work1...")
- time.sleep(0.5)
-
- if __name__ == '__main__':
- process_obj = multiprocessing.Process(target=work1) # 创建子进程对象
- process_obj.start() # 启动进程
-
- print("主进程同时在运行...")
结果:
子进程的语法结构、常用方法和常用属性:
- if __name__ == '__main__':
- print(multiprocessing.current_process())
结果:
- import time,multiprocessing
-
- def work1():
- print(multiprocessing.current_process()) # 获取子进程名称
- time.sleep(1)
-
- if __name__ == '__main__':
- process_obj = multiprocessing.Process(target=work1) # 创建子进程对象
- process_obj.start() # 启动进程
结果:
- import time,multiprocessing
-
- def work1():
- print(multiprocessing.current_process()) # 获取子进程名称
- time.sleep(1)
-
- if __name__ == '__main__':
- process_obj = multiprocessing.Process(target=work1, name="MyProcess") # 创建子进程对象
- process_obj.start() # 启动进程
结果:
方式一,通过multiprocessing模块获取
获取主进程pid:
- if __name__ == '__main__':
- print(multiprocessing.current_process().pid) # 获取主进程pid
获取子进程pid:
- import time,multiprocessing
-
- def work1():
- print(multiprocessing.current_process().pid) # 获取子进程pid
- time.sleep(1)
-
- if __name__ == '__main__':
- process_obj = multiprocessing.Process(target=work1, name="MyProcess") # 创建子进程对象
- process_obj.start() # 启动进程
方式二,通过os模块获取
- import os
-
-
- if __name__ == '__main__':
- print(os.getpid()) # 获取主进程pid
- import time,multiprocessing,os
-
- def work1():
- print("该子进程的父id是:%s" % str(os.getppid())) # 获取子进程父id
- time.sleep(1)
-
- if __name__ == '__main__':
- process_obj = multiprocessing.Process(target=work1, name="MyProcess") # 创建子进程对象
- process_obj.start() # 启动进程
- print(os.getpid())
结果:
- import time,multiprocessing,os
-
- def work1():
- for i in range(10):
- print("正在运行work1...",i,"子进程pid:",multiprocessing.current_process().pid)
- time.sleep(1)
-
- if __name__ == '__main__':
- process_obj = multiprocessing.Process(target=work1, name="MyProcess") # 创建子进程对象
- process_obj.start() # 启动进程
- print("主进程pid:", multiprocessing.current_process().pid)
-
- time.sleep(2)
- os.popen("taskkill /f /t /im " + str(multiprocessing.current_process().pid)) # 杀死主进程
结果:
也可以采用terminate()方法来中止子进程执行:
- import time,multiprocessing
-
-
- def work1():
- for i in range(5):
- print("正在运行work1...")
- time.sleep(0.5)
-
- if __name__ == '__main__':
- process_obj1 = multiprocessing.Process(target=work1)
- process_obj1.start()
-
- time.sleep(1)
- process_obj1.terminate() # 关闭子进程
- exit() # 关闭主进程
-
- print("123456")
结果:
方式一、使用 args 传递元组:
- import time,multiprocessing
-
- def work1(a, b, c):
- for i in range(5):
- print("a=%d, b=%d, c=%d" % (a,b,c))
- time.sleep(1)
-
- if __name__ == '__main__':
- process_obj = multiprocessing.Process(target=work1, args=(10,20,30)) # 创建子进程对象
- process_obj.start() # 启动进程
方式二、使用 kwargs 传递字典:
- import time,multiprocessing
-
- def work1(a, b, c):
- for i in range(5):
- print("a=%d, b=%d, c=%d" % (a,b,c))
- time.sleep(1)
-
- if __name__ == '__main__':
- process_obj = multiprocessing.Process(target=work1, kwargs={"c":30,"a":10,"b":20}) # 创建子进程对象
- process_obj.start() # 启动进程
方式三、混合使用 args 和 kwargs:
- import time,multiprocessing
-
- def work1(a, b, c):
- for i in range(5):
- print("a=%d, b=%d, c=%d" % (a,b,c))
- time.sleep(1)
-
- if __name__ == '__main__':
- process_obj = multiprocessing.Process(target=work1, args=(10,),kwargs={"c":30, "b":20}) # 创建子进程对象
- process_obj.start() # 启动进程
三种方式的结果均如下图所示
- import time,multiprocessing
-
- g_num = 10
-
- def work1():
- global g_num
- for i in range(5):
- g_num += 1
- print("---work1---",g_num)
-
- def work2():
- time.sleep(2)
- print("---work2---",g_num)
-
- if __name__ == '__main__':
- process_obj1 = multiprocessing.Process(target=work1)
- process_obj2 = multiprocessing.Process(target=work2)
-
- process_obj1.start()
- process_obj2.start()
-
- time.sleep(2)
- print("---mian---",g_num)
结果:
原因是每个子进程会把主进程中的部分资源(如:变量g_num的值)分别复制到各自的子进程内,子进程内部改变的是复制的全局变量的值,不影响主进程和其它子进程的全局变量的值。
- import time,multiprocessing
-
-
- def work1():
- for i in range(5):
- print("正在运行work1...")
- time.sleep(0.5)
-
- if __name__ == '__main__':
- process_obj1 = multiprocessing.Process(target=work1)
- process_obj1.start()
-
- time.sleep(1)
- print("结束主进程")
- exit()
-
- print("123456")
结果:
可以看出主进程结束后,子进程依然在执行。
- import time,multiprocessing
-
-
- def work1():
- for i in range(5):
- print("正在运行work1...")
- time.sleep(0.5)
-
- if __name__ == '__main__':
- process_obj1 = multiprocessing.Process(target=work1)
- process_obj1.daemon = True # 设置子进程守护主进程
- process_obj1.start()
-
- time.sleep(1)
- print("结束主进程")
- exit()
-
- print("123456")
结果:
可以看到当主进程结束后,子进程也结束了。
可以使用multprocessing模块的Queue实现多进程之间的是数据传递
Queue本身是一个消息队列程序
- import multiprocessing
-
- # 创建队列
- queue = multiprocessing.Queue(5) # 5表示队列长度为5
方式一、使用put()方法
- import multiprocessing
-
- # 创建队列
- queue = multiprocessing.Queue(5) # 5表示队列长度为5
-
- # 向队列放值
- queue.put(1)
- queue.put("hello")
- queue.put([1,2,3])
- queue.put((4,5,6))
- queue.put({"a":10,"b":20})
- # queue.put(2) # 由于队列长度为5,当准备向队列放入第6个值时,队列就会处于阻塞状态,默认等待直到队列取出值后有空余位置
方式二、使用put_nowait()方法
- import multiprocessing
-
- # 创建队列
- queue = multiprocessing.Queue(5) # 5表示队列长度为5
-
- # 向队列放值
- queue.put_nowait(1)
- queue.put_nowait("hello")
- queue.put_nowait([1,2,3])
- queue.put_nowait((4,5,6))
- queue.put_nowait({"a":10,"b":20})
- # queue.put_nowait(2) # 超出队列长度直接报错
方式一 、使用get()方法
- import multiprocessing
-
- # 创建队列
- queue = multiprocessing.Queue(5) # 5表示队列长度为5
-
- # 向队列放值
- queue.put_nowait(1)
- queue.put_nowait("hello")
- queue.put_nowait([1,2,3])
- queue.put_nowait((4,5,6))
- queue.put_nowait({"a":10,"b":20})
-
- # 取值
- for i in range(6):
- value = queue.get()
- print(i,value)
当取第6个值时,由于队列已经空了,此时队列会处于阻塞状态,直到有新的值进入队列
方式二、使用get_nowait()方法
- import multiprocessing
-
- # 创建队列
- queue = multiprocessing.Queue(5) # 5表示队列长度为5
-
- # 向队列放值
- queue.put_nowait(1)
- queue.put_nowait("hello")
- queue.put_nowait([1,2,3])
- queue.put_nowait((4,5,6))
- queue.put_nowait({"a":10,"b":20})
-
- # 取值
- for i in range(6):
- value = queue.get_nowait()
- print(i,value)
当队列已经空后,再取值会报错
- import multiprocessing
-
- # 创建队列
- queue = multiprocessing.Queue(3) # 5表示队列长度为5
-
- # 向队列放值
- queue.put_nowait(1)
- queue.put_nowait("hello")
- queue.put_nowait([1,2,3])
-
- # 判断队列是否已满
- print(queue.full())
结果:
print(queue.qsize())
print(queue.empty())
有一定的概率会打印相反的结果,因此在调用empty()方法前,通常可以sleep 0.00001秒
或使用
if queue.qsize() == 0:
来判断队列是都为空
在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据,看看子进程read_queue能否读取到子进程write_queue写入到队列中的数据。
- import multiprocessing,time
-
- def write_queue(queue):
- """写入数据到队列"""
- for i in range(10):
- if queue.full():
- print("队列已满")
- break
- else:
- queue.put(i)
- print("已经写入:%d" % i)
- time.sleep(0.5)
-
- def read_queue(queue):
- """读取队列数据并显示"""
- while True:
- if queue.qsize() == 0:
- print("队列已空")
- break
- else:
- value = queue.get()
- print("获取数据:%d" % value)
-
-
- if __name__ == '__main__':
- # 创建空队列
- queue = multiprocessing.Queue(5)
-
- # 创建两个子进程
- write_q = multiprocessing.Process(target=write_queue, args=(queue,))
- read_q = multiprocessing.Process(target=read_queue, args=(queue,))
-
- write_q.start()
-
- # 让写入队列的子进程先执行
- write_q.join()
-
- read_q.start()
结果:
当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果是成百上千个进程,用手动方式创建就十分麻烦,此时就可以用到multiprocessing模块提供的Pool方法。
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求,但如果池中的进程数已经达到最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。
1.apply()
进程池中进程以同步方式执行任务
- import multiprocessing,time
-
- def copy_work(a,b):
- """用于模拟文件拷贝的函数"""
- print("正在拷贝文件...",multiprocessing.current_process(),a,b)
- time.sleep(0.5)
-
- if __name__ == '__main__':
- # 创建进程池
- pool = multiprocessing.Pool(3) # 最大允许创建3个进程
- for i in range(10):
- # 让进程池以同步方式执行copy_work
- pool.apply(copy_work,(10,20))
结果:
2.apply_async()
进程池中进程以异步方式执行任务
如果使用apply_async方式,需要做以下两点:
(1)pool.close() 表示不再接收新的任务
(2)pool.join() 让主进程等待进程池执行结束后再退出
- import multiprocessing,time
-
- def copy_work(a,b):
- """用于模拟文件拷贝的函数"""
- print("正在拷贝文件...",multiprocessing.current_process(),a,b)
- time.sleep(0.5)
-
- if __name__ == '__main__':
- # 创建进程池
- pool = multiprocessing.Pool(3) # 最大允许创建3个进程
- for i in range(10):
- # 让进程池以同步方式执行copy_work
- pool.apply_async(copy_work,(10,20))
- pool.close()
- pool.join()
专门用于进程池中的进程间的数据共享
(1)同步方式
- import multiprocessing,time
-
- def write_queue(queue):
- """写入数据到队列"""
- for i in range(10):
- if queue.full():
- print("队列已满")
- break
- else:
- queue.put(i)
- print("已经写入:%d" % i)
- time.sleep(0.5)
-
- def read_queue(queue):
- """读取队列数据并显示"""
- while True:
- if queue.qsize() == 0:
- print("队列已空")
- break
- else:
- value = queue.get()
- print("获取数据:%d" % value)
-
-
- if __name__ == '__main__':
- # 创建进程池
- pool = multiprocessing.Pool(2)
-
- # 创建进程池中的队列
- queue = multiprocessing.Manager().Queue(5)
-
- # 使用进程池执行任务(同步方式)
- pool.apply(write_queue, (queue,))
- pool.apply(read_queue, (queue,))
结果:
(2)异步方式
- import multiprocessing,time
-
- def write_queue(queue):
- """写入数据到队列"""
- for i in range(10):
- if queue.full():
- print("队列已满")
- break
- else:
- queue.put(i)
- print("已经写入:%d" % i)
- time.sleep(0.5)
-
- def read_queue(queue):
- """读取队列数据并显示"""
- while True:
- if queue.qsize() == 0:
- print("队列已空")
- break
- else:
- value = queue.get()
- print("获取数据:%d" % value)
-
-
- if __name__ == '__main__':
- # 创建进程池
- pool = multiprocessing.Pool(2)
-
- # 创建进程池中的队列
- queue = multiprocessing.Manager().Queue(5)
-
- # 使用进程池执行任务(异步方式)
- pool.apply_async(write_queue, (queue,))
- pool.apply_async(read_queue, (queue,))
-
- pool.close() # 表示不再接收新的任务
- pool.join() # 主进程会等待进程池执行结束后再退出
结果:
目标:使用进程池实现文件夹整体拷贝到另外一个目录
思路:
- import multiprocessing,time,os
-
- def copy_file(source_dir, target_dir, file):
- print(multiprocessing.current_process())
- source_path = source_dir + "/" + file
- target_path = target_dir + "/" + file
- # print("%s --> %s" % (source_path, target_path))
-
- # 读取源文件内容
- with open(source_path,"rb") as source_file:
-
- # 创建目标文件
- with open(target_path, "wb") as target_file:
- while True:
- # 读源文件保存到目标文件
- source_file_data = source_file.read(1024) # 每次读1024个字节
- if source_file_data: # 判断是否完成读取源文件
- target_file.write(source_file_data)
- else:
- break
-
-
- if __name__ == '__main__':
- source_dir = "C:/Users/DOUH/Desktop/pythonCode" # 源文件路径
- target_dir = "C:/Users/DOUH/Desktop/test" # 目标文件路径
-
- # 在指定位置创建test文件夹
- try:
- os.mkdir(target_dir)
- except:
- pass
-
- # 获取源文件夹中的所有的文件
- file_list = os.listdir(source_dir)
-
- # 创建进程池
- pool = multiprocessing.Pool(3)
-
-
- for file in file_list:
- # 拷贝文件
- pool.apply_async(copy_file, (source_dir, target_dir, file))
-
- pool.close()
- pool.join()
结果:
- import multiprocessing
-
- # 设置回调函数
- def setcallback(x):
- with open('result.txt', 'a+') as f:
- line = str(x) + "\n"
- f.write(line)
-
- def multiplication(num):
- return num
-
- if __name__ == '__main__':
- pool = multiprocessing.Pool(6)
- for i in range(1000):
- pool.apply_async(func=multiplication, args=(i,), callback=setcallback)
- pool.close()
- pool.join()
了解 python 线程请参考: