• python 进程 (概念+示例代码)


    目录

    1. 进程的概念

    2. 进程的状态

    3. 进程的基本使用

    4.进程的名称和PID

            4.1 获取主进程的名称

            4.2 获取子进程的名称

            4.3 设置子进程的名称

            4.4 获取进程PID (process id)

            4.5 获取子进程的父id

            4.6 杀掉进程

    5. 进程参数、全局变量问题

            5.1 进程的参数传递

            5.2  进程间不共享全局变量

    6 守护主进程

            6.1没有守护主进程

            6.2 守护主进程

    7. 消息队列的基本操作

            7.1 Queue介绍

            7.2 创建Queue

            7.3 向Queue中放值

            7.4 从Queue中取值

    8. 消息队列的常见判断

            8.1 判断队列是否已满

            8.2 判断队列的消息数量

            8.3 判断队列是否为空

    9. Queue实现进程间数据共享

    10. 进程池

            10.1 进程池概述

            10.2 multiprocessing.Pool常用函数

            10.3 进程池中的Queue

    11. 案例:进程池实现文件夹拷贝器

    12.案例:多进程写入同一个txt


    1. 进程的概念

            进程是资源分配的最小单位,也是线程的容器,线程(python 线程 (概念+示例代码))是CPU调度的基本单位,一个进程包括多个线程。

    程序:例如xxx.py是一个程序

    进程:一个程序运行起来后,代码+用到的资源称为进程,它是系统分配资源的基本单位。

    2. 进程的状态

            在计算机工作中,其任务数往往大于CPU核数,即一定有一些任务正在执行,而另外一些任务在等待CPU执行,因此进程有了不同的状态。

            就绪态:运行的条件都已经满足,正在等待CPU执行

            执行态:CPU正在执行其功能

            等待态:等待某些条件满足,例如一个程序sleep了,此时处于等待态

    3. 进程的基本使用

            multiprocessing模块时夸平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情。

    子进程的创建

    1. import time,multiprocessing
    2. def work1():
    3. for i in range(5):
    4. print("正在运行 work1...")
    5. time.sleep(0.5)
    6. if __name__ == '__main__':
    7. process_obj = multiprocessing.Process(target=work1) # 创建子进程对象
    8. process_obj.start() # 启动进程
    9. print("主进程同时在运行...")

    结果:

    子进程的语法结构、常用方法和常用属性:

     4.进程的名称和PID

            4.1 获取主进程的名称

    1. if __name__ == '__main__':
    2. print(multiprocessing.current_process())

    结果:

             4.2 获取子进程的名称

    1. import time,multiprocessing
    2. def work1():
    3. print(multiprocessing.current_process()) # 获取子进程名称
    4. time.sleep(1)
    5. if __name__ == '__main__':
    6. process_obj = multiprocessing.Process(target=work1) # 创建子进程对象
    7. process_obj.start() # 启动进程

    结果: 

             4.3 设置子进程的名称

    1. import time,multiprocessing
    2. def work1():
    3. print(multiprocessing.current_process()) # 获取子进程名称
    4. time.sleep(1)
    5. if __name__ == '__main__':
    6. process_obj = multiprocessing.Process(target=work1, name="MyProcess") # 创建子进程对象
    7. process_obj.start() # 启动进程

    结果:

             4.4 获取进程PID (process id)

      方式一,通过multiprocessing模块获取

             获取主进程pid:

    1. if __name__ == '__main__':
    2. print(multiprocessing.current_process().pid) # 获取主进程pid

            获取子进程pid:

    1. import time,multiprocessing
    2. def work1():
    3. print(multiprocessing.current_process().pid) # 获取子进程pid
    4. time.sleep(1)
    5. if __name__ == '__main__':
    6. process_obj = multiprocessing.Process(target=work1, name="MyProcess") # 创建子进程对象
    7. process_obj.start() # 启动进程

    方式二,通过os模块获取

    1. import os
    2. if __name__ == '__main__':
    3. print(os.getpid()) # 获取主进程pid

            4.5 获取子进程的父id

    1. import time,multiprocessing,os
    2. def work1():
    3. print("该子进程的父id是:%s" % str(os.getppid())) # 获取子进程父id
    4. time.sleep(1)
    5. if __name__ == '__main__':
    6. process_obj = multiprocessing.Process(target=work1, name="MyProcess") # 创建子进程对象
    7. process_obj.start() # 启动进程
    8. print(os.getpid())

    结果:

             4.6 杀掉进程

    1. import time,multiprocessing,os
    2. def work1():
    3. for i in range(10):
    4. print("正在运行work1...",i,"子进程pid:",multiprocessing.current_process().pid)
    5. time.sleep(1)
    6. if __name__ == '__main__':
    7. process_obj = multiprocessing.Process(target=work1, name="MyProcess") # 创建子进程对象
    8. process_obj.start() # 启动进程
    9. print("主进程pid:", multiprocessing.current_process().pid)
    10. time.sleep(2)
    11. os.popen("taskkill /f /t /im " + str(multiprocessing.current_process().pid)) # 杀死主进程

    结果:

    也可以采用terminate()方法来中止子进程执行:

    1. import time,multiprocessing
    2. def work1():
    3. for i in range(5):
    4. print("正在运行work1...")
    5. time.sleep(0.5)
    6. if __name__ == '__main__':
    7. process_obj1 = multiprocessing.Process(target=work1)
    8. process_obj1.start()
    9. time.sleep(1)
    10. process_obj1.terminate() # 关闭子进程
    11. exit() # 关闭主进程
    12. print("123456")

     结果:

    5. 进程参数、全局变量问题

            5.1 进程的参数传递

    方式一、使用 args 传递元组:

    1. import time,multiprocessing
    2. def work1(a, b, c):
    3. for i in range(5):
    4. print("a=%d, b=%d, c=%d" % (a,b,c))
    5. time.sleep(1)
    6. if __name__ == '__main__':
    7. process_obj = multiprocessing.Process(target=work1, args=(10,20,30)) # 创建子进程对象
    8. process_obj.start() # 启动进程

    方式二、使用 kwargs 传递字典:

    1. import time,multiprocessing
    2. def work1(a, b, c):
    3. for i in range(5):
    4. print("a=%d, b=%d, c=%d" % (a,b,c))
    5. time.sleep(1)
    6. if __name__ == '__main__':
    7. process_obj = multiprocessing.Process(target=work1, kwargs={"c":30,"a":10,"b":20}) # 创建子进程对象
    8. process_obj.start() # 启动进程

    方式三、混合使用 args 和 kwargs:

    1. import time,multiprocessing
    2. def work1(a, b, c):
    3. for i in range(5):
    4. print("a=%d, b=%d, c=%d" % (a,b,c))
    5. time.sleep(1)
    6. if __name__ == '__main__':
    7. process_obj = multiprocessing.Process(target=work1, args=(10,),kwargs={"c":30, "b":20}) # 创建子进程对象
    8. process_obj.start() # 启动进程

    三种方式的结果均如下图所示

            5.2  进程间不共享全局变量

    1. import time,multiprocessing
    2. g_num = 10
    3. def work1():
    4. global g_num
    5. for i in range(5):
    6. g_num += 1
    7. print("---work1---",g_num)
    8. def work2():
    9. time.sleep(2)
    10. print("---work2---",g_num)
    11. if __name__ == '__main__':
    12. process_obj1 = multiprocessing.Process(target=work1)
    13. process_obj2 = multiprocessing.Process(target=work2)
    14. process_obj1.start()
    15. process_obj2.start()
    16. time.sleep(2)
    17. print("---mian---",g_num)

    结果:

             原因是每个子进程会把主进程中的部分资源(如:变量g_num的值)分别复制到各自的子进程内,子进程内部改变的是复制的全局变量的值,不影响主进程和其它子进程的全局变量的值。

    6 守护主进程

            6.1没有守护主进程

    1. import time,multiprocessing
    2. def work1():
    3. for i in range(5):
    4. print("正在运行work1...")
    5. time.sleep(0.5)
    6. if __name__ == '__main__':
    7. process_obj1 = multiprocessing.Process(target=work1)
    8. process_obj1.start()
    9. time.sleep(1)
    10. print("结束主进程")
    11. exit()
    12. print("123456")

    结果:

     可以看出主进程结束后,子进程依然在执行。

            

            6.2 守护主进程

    1. import time,multiprocessing
    2. def work1():
    3. for i in range(5):
    4. print("正在运行work1...")
    5. time.sleep(0.5)
    6. if __name__ == '__main__':
    7. process_obj1 = multiprocessing.Process(target=work1)
    8. process_obj1.daemon = True # 设置子进程守护主进程
    9. process_obj1.start()
    10. time.sleep(1)
    11. print("结束主进程")
    12. exit()
    13. print("123456")

    结果:

    可以看到当主进程结束后,子进程也结束了。 

    7. 消息队列的基本操作

            7.1 Queue介绍

            可以使用multprocessing模块的Queue实现多进程之间的是数据传递

            Queue本身是一个消息队列程序

            7.2 创建Queue

    1. import multiprocessing
    2. # 创建队列
    3. queue = multiprocessing.Queue(5) # 5表示队列长度为5

            7.3 向Queue中放值

            方式一、使用put()方法

    1. import multiprocessing
    2. # 创建队列
    3. queue = multiprocessing.Queue(5) # 5表示队列长度为5
    4. # 向队列放值
    5. queue.put(1)
    6. queue.put("hello")
    7. queue.put([1,2,3])
    8. queue.put((4,5,6))
    9. queue.put({"a":10,"b":20})
    10. # queue.put(2) # 由于队列长度为5,当准备向队列放入第6个值时,队列就会处于阻塞状态,默认等待直到队列取出值后有空余位置

            方式二、使用put_nowait()方法

    1. import multiprocessing
    2. # 创建队列
    3. queue = multiprocessing.Queue(5) # 5表示队列长度为5
    4. # 向队列放值
    5. queue.put_nowait(1)
    6. queue.put_nowait("hello")
    7. queue.put_nowait([1,2,3])
    8. queue.put_nowait((4,5,6))
    9. queue.put_nowait({"a":10,"b":20})
    10. # queue.put_nowait(2) # 超出队列长度直接报错

            7.4 从Queue中取值

            方式一 、使用get()方法

    1. import multiprocessing
    2. # 创建队列
    3. queue = multiprocessing.Queue(5) # 5表示队列长度为5
    4. # 向队列放值
    5. queue.put_nowait(1)
    6. queue.put_nowait("hello")
    7. queue.put_nowait([1,2,3])
    8. queue.put_nowait((4,5,6))
    9. queue.put_nowait({"a":10,"b":20})
    10. # 取值
    11. for i in range(6):
    12. value = queue.get()
    13. print(i,value)

            当取第6个值时,由于队列已经空了,此时队列会处于阻塞状态,直到有新的值进入队列

            方式二、使用get_nowait()方法

    1. import multiprocessing
    2. # 创建队列
    3. queue = multiprocessing.Queue(5) # 5表示队列长度为5
    4. # 向队列放值
    5. queue.put_nowait(1)
    6. queue.put_nowait("hello")
    7. queue.put_nowait([1,2,3])
    8. queue.put_nowait((4,5,6))
    9. queue.put_nowait({"a":10,"b":20})
    10. # 取值
    11. for i in range(6):
    12. value = queue.get_nowait()
    13. print(i,value)

            当队列已经空后,再取值会报错

    8. 消息队列的常见判断

            8.1 判断队列是否已满

    1. import multiprocessing
    2. # 创建队列
    3. queue = multiprocessing.Queue(3) # 5表示队列长度为5
    4. # 向队列放值
    5. queue.put_nowait(1)
    6. queue.put_nowait("hello")
    7. queue.put_nowait([1,2,3])
    8. # 判断队列是否已满
    9. print(queue.full())

    结果:

             8.2 判断队列的消息数量

    print(queue.qsize())

            8.3 判断队列是否为空

    print(queue.empty())

    有一定的概率会打印相反的结果,因此在调用empty()方法前,通常可以sleep 0.00001秒

    或使用

    if queue.qsize() == 0:

    来判断队列是都为空

    9. Queue实现进程间数据共享

            在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据,看看子进程read_queue能否读取到子进程write_queue写入到队列中的数据。

    1. import multiprocessing,time
    2. def write_queue(queue):
    3. """写入数据到队列"""
    4. for i in range(10):
    5. if queue.full():
    6. print("队列已满")
    7. break
    8. else:
    9. queue.put(i)
    10. print("已经写入:%d" % i)
    11. time.sleep(0.5)
    12. def read_queue(queue):
    13. """读取队列数据并显示"""
    14. while True:
    15. if queue.qsize() == 0:
    16. print("队列已空")
    17. break
    18. else:
    19. value = queue.get()
    20. print("获取数据:%d" % value)
    21. if __name__ == '__main__':
    22. # 创建空队列
    23. queue = multiprocessing.Queue(5)
    24. # 创建两个子进程
    25. write_q = multiprocessing.Process(target=write_queue, args=(queue,))
    26. read_q = multiprocessing.Process(target=read_queue, args=(queue,))
    27. write_q.start()
    28. # 让写入队列的子进程先执行
    29. write_q.join()
    30. read_q.start()

    结果:

    10. 进程池

            10.1 进程池概述

            当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果是成百上千个进程,用手动方式创建就十分麻烦,此时就可以用到multiprocessing模块提供的Pool方法。

            初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求,但如果池中的进程数已经达到最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。

            10.2 multiprocessing.Pool常用函数

            1.apply()

            进程池中进程以同步方式执行任务

    1. import multiprocessing,time
    2. def copy_work(a,b):
    3. """用于模拟文件拷贝的函数"""
    4. print("正在拷贝文件...",multiprocessing.current_process(),a,b)
    5. time.sleep(0.5)
    6. if __name__ == '__main__':
    7. # 创建进程池
    8. pool = multiprocessing.Pool(3) # 最大允许创建3个进程
    9. for i in range(10):
    10. # 让进程池以同步方式执行copy_work
    11. pool.apply(copy_work,(10,20))

    结果:

            2.apply_async()

            进程池中进程以异步方式执行任务

            如果使用apply_async方式,需要做以下两点:

            (1)pool.close() 表示不再接收新的任务

            (2)pool.join() 让主进程等待进程池执行结束后再退出

    1. import multiprocessing,time
    2. def copy_work(a,b):
    3. """用于模拟文件拷贝的函数"""
    4. print("正在拷贝文件...",multiprocessing.current_process(),a,b)
    5. time.sleep(0.5)
    6. if __name__ == '__main__':
    7. # 创建进程池
    8. pool = multiprocessing.Pool(3) # 最大允许创建3个进程
    9. for i in range(10):
    10. # 让进程池以同步方式执行copy_work
    11. pool.apply_async(copy_work,(10,20))
    12. pool.close()
    13. pool.join()

            10.3 进程池中的Queue

            专门用于进程池中的进程间的数据共享

            (1)同步方式

    1. import multiprocessing,time
    2. def write_queue(queue):
    3. """写入数据到队列"""
    4. for i in range(10):
    5. if queue.full():
    6. print("队列已满")
    7. break
    8. else:
    9. queue.put(i)
    10. print("已经写入:%d" % i)
    11. time.sleep(0.5)
    12. def read_queue(queue):
    13. """读取队列数据并显示"""
    14. while True:
    15. if queue.qsize() == 0:
    16. print("队列已空")
    17. break
    18. else:
    19. value = queue.get()
    20. print("获取数据:%d" % value)
    21. if __name__ == '__main__':
    22. # 创建进程池
    23. pool = multiprocessing.Pool(2)
    24. # 创建进程池中的队列
    25. queue = multiprocessing.Manager().Queue(5)
    26. # 使用进程池执行任务(同步方式)
    27. pool.apply(write_queue, (queue,))
    28. pool.apply(read_queue, (queue,))

    结果:

            (2)异步方式 

    1. import multiprocessing,time
    2. def write_queue(queue):
    3. """写入数据到队列"""
    4. for i in range(10):
    5. if queue.full():
    6. print("队列已满")
    7. break
    8. else:
    9. queue.put(i)
    10. print("已经写入:%d" % i)
    11. time.sleep(0.5)
    12. def read_queue(queue):
    13. """读取队列数据并显示"""
    14. while True:
    15. if queue.qsize() == 0:
    16. print("队列已空")
    17. break
    18. else:
    19. value = queue.get()
    20. print("获取数据:%d" % value)
    21. if __name__ == '__main__':
    22. # 创建进程池
    23. pool = multiprocessing.Pool(2)
    24. # 创建进程池中的队列
    25. queue = multiprocessing.Manager().Queue(5)
    26. # 使用进程池执行任务(异步方式)
    27. pool.apply_async(write_queue, (queue,))
    28. pool.apply_async(read_queue, (queue,))
    29. pool.close() # 表示不再接收新的任务
    30. pool.join() # 主进程会等待进程池执行结束后再退出

    结果:

    11. 案例:进程池实现文件夹拷贝器

    目标:使用进程池实现文件夹整体拷贝到另外一个目录

     思路:

    1. import multiprocessing,time,os
    2. def copy_file(source_dir, target_dir, file):
    3. print(multiprocessing.current_process())
    4. source_path = source_dir + "/" + file
    5. target_path = target_dir + "/" + file
    6. # print("%s --> %s" % (source_path, target_path))
    7. # 读取源文件内容
    8. with open(source_path,"rb") as source_file:
    9. # 创建目标文件
    10. with open(target_path, "wb") as target_file:
    11. while True:
    12. # 读源文件保存到目标文件
    13. source_file_data = source_file.read(1024) # 每次读1024个字节
    14. if source_file_data: # 判断是否完成读取源文件
    15. target_file.write(source_file_data)
    16. else:
    17. break
    18. if __name__ == '__main__':
    19. source_dir = "C:/Users/DOUH/Desktop/pythonCode" # 源文件路径
    20. target_dir = "C:/Users/DOUH/Desktop/test" # 目标文件路径
    21. # 在指定位置创建test文件夹
    22. try:
    23. os.mkdir(target_dir)
    24. except:
    25. pass
    26. # 获取源文件夹中的所有的文件
    27. file_list = os.listdir(source_dir)
    28. # 创建进程池
    29. pool = multiprocessing.Pool(3)
    30. for file in file_list:
    31. # 拷贝文件
    32. pool.apply_async(copy_file, (source_dir, target_dir, file))
    33. pool.close()
    34. pool.join()

     结果:

    12. 案例:使用多进程向同一文件写入数据

    1. import multiprocessing
    2. # 设置回调函数
    3. def setcallback(x):
    4. with open('result.txt', 'a+') as f:
    5. line = str(x) + "\n"
    6. f.write(line)
    7. def multiplication(num):
    8. return num
    9. if __name__ == '__main__':
    10. pool = multiprocessing.Pool(6)
    11. for i in range(1000):
    12. pool.apply_async(func=multiplication, args=(i,), callback=setcallback)
    13. pool.close()
    14. pool.join()

    了解 python 线程请参考:

    python 线程 (概念+示例代码)

  • 相关阅读:
    OpenCV入门(C++/Python)- 使用OpenCV标注图像(六)
    采集侠-免费采集侠-免费采集侠插件
    力扣 -- 132. 分割回文串 II
    MPC入门
    Flask 用户登录,表单提交
    【MyBatis源码分析】四、XML解析与核心对象的构建
    linux systemd start stop enable disable命令区别
    【LeetCode:86. 分隔链表 | 链表】
    Mybatis-Plus从入门到入土
    RabbitMQ 知识点解读
  • 原文地址:https://blog.csdn.net/ChaoChao66666/article/details/126829595