• python进程与线程


    1. 前言        

            最近总结了python中如何创建并使用多进程和多线程的主要方法,所有代码都自测成功,用于以后编写代码时复制修改。

    2. 各种代码

    2.1 使用multiprocessing模块创建进程

            演示process类的方法和属性的使用,创建2个字进程,分别使用os模块和time模块输出父进程和子进程的ID以及子进程的时间,并调用process类的name和pid属性,代码如下:

    1. # -*- coding:utf-8 -*-
    2. from multiprocessing import Process
    3. import time
    4. import os
    5. # 两个子进程将会调用的两个方法
    6. def child_1(interval):
    7. print("子进程(%s)开始执行,父进程为(%s)" % (os.getpid(), os.getppid()))
    8. t_start = time.time() # 计时开始
    9. time.sleep(interval) # 程序将会被挂起interval秒
    10. t_end = time.time() # 计时结束
    11. print("子进程(%s)执行时间为'%0.2f'秒" % (os.getpid(), t_end - t_start))
    12. def child_2(interval):
    13. print("子进程(%s)开始执行,父进程为(%s)" % (os.getpid(), os.getppid()))
    14. t_start = time.time() # 计时开始
    15. time.sleep(interval) # 程序将会被挂起interval秒
    16. t_end = time.time() # 计时结束
    17. print("子进程(%s)执行时间为'%0.2f'秒" % (os.getpid(), t_end - t_start))
    18. if __name__ == '__main__':
    19. print("------父进程开始执行-------")
    20. print("父进程PID:%s" % os.getpid()) # 输出当前程序的ID
    21. p1 = Process(target=child_1, name="process1", args=(1,)) # 实例化进程p1
    22. p2 = Process(target=child_2, name="process2", args=(2,)) # 实例化进程p2
    23. p1.start() # 启动进程p1
    24. p2.start() # 启动进程p2
    25. # 同时父进程仍然往下执行,如果p2进程还在执行,将会返回True
    26. print("p1.is_alive=%s" % p1.is_alive())
    27. print("p2.is_alive=%s" % p2.is_alive())
    28. # 输出p1和p2进程的别名和PID
    29. print("p1.name=%s" % p1.name)
    30. print("p1.pid=%s" % p1.pid)
    31. print("p2.name=%s" % p2.name)
    32. print("p2.pid=%s" % p2.pid)
    33. print("------等待子进程-------")
    34. p1.join() # 等待p1进程结束
    35. p2.join() # 等待p2进程结束
    36. print("------父进程执行结束-------")

    运行结果:

    1. ------父进程开始执行-------
    2. 父进程PID:616
    3. p1.is_alive=True
    4. p2.is_alive=True
    5. p1.name=process1
    6. p1.pid=12256
    7. p2.name=process2
    8. p2.pid=14976
    9. ------等待子进程-------
    10. 子进程(12256)开始执行,父进程为(616
    11. 子进程(14976)开始执行,父进程为(616
    12. 子进程(12256)执行时间为'1.00'
    13. 子进程(14976)执行时间为'2.00'
    14. ------父进程执行结束-------
    15. Process finished with exit code 0

    2.2 使用process子类创建进程

    对于简单的任务,通常使用上一节方式实现多进程,但是如果要处理复杂任务,通常定义一个类,使其继承Process类,每次实例化这个类的时候,等同于实例化一个进程对象。代码如下:

    1. # -*- coding:utf-8 -*-
    2. from multiprocessing import Process
    3. import time
    4. import os
    5. # 继承Process类
    6. class SubProcess(Process):
    7. # 由于Process类本身也有__init__初识化方法,这个子类相当于重写了父类的这个方法
    8. def __init__(self, interval, name=''):
    9. Process.__init__(self) # 调用Process父类的初始化方法
    10. self.interval = interval # 接收参数interval
    11. if name: # 判断传递的参数name是否存在
    12. self.name = name # 如果传递参数name,则为子进程创建name属性,否则使用默认属性
    13. # 重写了Process类的run()方法
    14. def run(self):
    15. print("子进程(%s) 开始执行,父进程为(%s)" % (os.getpid(), os.getppid()))
    16. t_start = time.time()
    17. time.sleep(self.interval)
    18. t_stop = time.time()
    19. print("子进程(%s)执行结束,耗时%0.2f秒" % (os.getpid(), t_stop - t_start))
    20. if __name__ == "__main__":
    21. print("------父进程开始执行-------")
    22. print("父进程PID:%s" % os.getpid()) # 输出当前程序的ID
    23. p1 = SubProcess(interval=1, name='subprocess1')
    24. p2 = SubProcess(interval=2, name='subprocess2')
    25. # 对一个不包含target属性的Process类执行start()方法,就会运行这个类中的run()方法,
    26. # 所以这里会执行p1.run()
    27. p1.start() # 启动进程p1
    28. p2.start() # 启动进程p2
    29. # 输出p1和p2进程的执行状态,如果真正进行,返回True,否则返回False
    30. print("p1.is_alive=%s" % p1.is_alive())
    31. print("p2.is_alive=%s" % p2.is_alive())
    32. # 输出p1和p2进程的别名和PID
    33. print("p1.name=%s" % p1.name)
    34. print("p1.pid=%s" % p1.pid)
    35. print("p2.name=%s" % p2.name)
    36. print("p2.pid=%s" % p2.pid)
    37. print("------等待子进程-------")
    38. p1.join() # 等待p1进程结束
    39. p2.join() # 等待p2进程结束
    40. print("------父进程执行结束-------")

    运行结果:

    1. ------父进程开始执行-------
    2. 父进程PID:8292
    3. p1.is_alive=True
    4. p2.is_alive=True
    5. p1.name=subprocess1
    6. p1.pid=7376
    7. p2.name=subprocess2
    8. p2.pid=8904
    9. ------等待子进程-------
    10. 子进程(8904) 开始执行,父进程为(8292)子进程(7376) 开始执行,父进程为(8292
    11. 子进程(7376)执行结束,耗时1.00
    12. 子进程(8904)执行结束,耗时2.02
    13. ------父进程执行结束-------
    14. Process finished with exit code 0

    2.3 使用进程池Pool创建进程

    定义一个进程池,设置最大进程数为3,然后使用非阻塞方式执行10个任务,查看每个进程执行的任务,代码如下:

    1. # -*- coding=utf-8 -*-
    2. from multiprocessing import Pool
    3. import os, time
    4. def task(name):
    5. print('子进程(%s)执行task %s ...' % (os.getpid(), name))
    6. time.sleep(1) # 休眠1秒
    7. if __name__ == '__main__':
    8. print('父进程(%s).' % os.getpid())
    9. p = Pool(3) # 定义一个进程池,最大进程数3
    10. for i in range(10): # 从0开始循环10次
    11. p.apply_async(task, args=(i,)) # 使用非阻塞方式调用task()函数
    12. print('等待所有子进程结束...')
    13. p.close() # 关闭进程池,关闭后p不再接收新的请求
    14. p.join() # 等待子进程结束
    15. print('所有子进程结束.')

    执行结果:

    1. 父进程(2992).
    2. 等待所有子进程结束...
    3. 子进程(3772)执行task 0 ...
    4. 子进程(11708)执行task 1 ...
    5. 子进程(5084)执行task 2 ...
    6. 子进程(11708)执行task 3 ...
    7. 子进程(5084)执行task 4 ...
    8. 子进程(3772)执行task 5 ...
    9. 子进程(3772)执行task 6 ...
    10. 子进程(5084)执行task 7 ...
    11. 子进程(11708)执行task 8 ...
    12. 子进程(3772)执行task 9 ...
    13. 所有子进程结束.
    14. Process finished with exit code 0

    2.4 队列的基本功能

    代码如下:

    1. # coding=utf-8
    2. from multiprocessing import Queue
    3. if __name__ == '__main__':
    4. q = Queue(3) # 初始化一个Queue对象,最多可接收三条put消息
    5. q.put("消息1")
    6. q.put("消息2")
    7. print(q.full()) # 返回False
    8. q.put("消息3")
    9. print(q.full()) # 返回True
    10. # 因为消息列队已满,下面的try都会抛出异常,
    11. # 第一个try会等待2秒后再抛出异常,第二个try会立刻抛出异常
    12. try:
    13. q.put("消息4", True, 2)
    14. except:
    15. print("消息列队已满,现有消息数量:%s" % q.qsize())
    16. try:
    17. q.put_nowait("消息4")
    18. except:
    19. print("消息列队已满,现有消息数量:%s" % q.qsize())
    20. # 读取消息时,先判断消息列队是否为空,再读取
    21. if not q.empty():
    22. print('----从队列中获取消息---')
    23. for i in range(q.qsize()):
    24. print(q.get_nowait())
    25. # 先判断消息列队是否已满,再写入
    26. if not q.full():
    27. q.put_nowait("消息4")

    执行结果:

    1. False
    2. True
    3. 消息列队已满,现有消息数量:3
    4. 消息列队已满,现有消息数量:3
    5. ----从队列中获取消息---
    6. 消息1
    7. 消息2
    8. 消息3
    9. Process finished with exit code 0

    2.5 使用队列再进程间通信

    创建两个子进程,一个子进程负责向队列中写入数据,另外一个子进程负责从队列中读取数据。代码如下:

    1. # -*- coding: utf-8 -*-
    2. from multiprocessing import Process, Queue
    3. import time
    4. # 向队列中写入数据
    5. def write_task(q):
    6. if not q.full():
    7. for i in range(5):
    8. message = "消息" + str(i)
    9. q.put(message)
    10. print("写入:%s" % message)
    11. # 从队列读取数据
    12. def read_task(q):
    13. time.sleep(1) # 休眠1秒
    14. while not q.empty():
    15. print("读取:%s" % q.get(True, 2)) # 等待2秒,如果还没读取到任何消息,
    16. # 则抛出"Queue.Empty"异常
    17. if __name__ == "__main__":
    18. print("-----父进程开始-----")
    19. q = Queue() # 父进程创建Queue,并传给各个子进程
    20. pw = Process(target=write_task, args=(q,)) # 实例化写入队列的子进程,并且传递队列
    21. pr = Process(target=read_task, args=(q,)) # 实例化读取队列的子进程,并且传递队列
    22. pw.start() # 启动子进程 pw,写入
    23. pr.start() # 启动子进程 pr,读取
    24. pw.join() # 等待 pw 结束
    25. pr.join() # 等待 pr 结束
    26. print("-----父进程结束-----")

    执行结果:

    1. -----父进程开始-----
    2. 写入:消息0
    3. 写入:消息1
    4. 写入:消息2
    5. 写入:消息3
    6. 写入:消息4
    7. 读取:消息0
    8. 读取:消息1
    9. 读取:消息2
    10. 读取:消息3
    11. 读取:消息4
    12. -----父进程结束-----
    13. Process finished with exit code 0

    2.6 使用theading模块创建线程

    使用theading模块创建线程,代码如下:

    1. # -*- coding:utf-8 -*-
    2. import threading, time
    3. def process():
    4. for i in range(3):
    5. time.sleep(1)
    6. print("thread name is %s" % threading.current_thread().name)
    7. if __name__ == '__main__':
    8. print("-----主线程开始-----")
    9. threads = [threading.Thread(target=process) for i in range(4)] # 创建4个线程,存入列表
    10. for t in threads:
    11. t.start() # 开启线程
    12. for t in threads:
    13. t.join() # 等待子线程结束
    14. print("-----主线程结束-----")

    执行结果:

    1. -----主线程开始-----
    2. thread name is Thread-2
    3. thread name is Thread-4
    4. thread name is Thread-3
    5. thread name is Thread-1
    6. thread name is Thread-3
    7. thread name is Thread-1
    8. thread name is Thread-4
    9. thread name is Thread-2
    10. thread name is Thread-2thread name is Thread-3
    11. thread name is Thread-1thread name is Thread-4
    12. -----主线程结束-----
    13. Process finished with exit code 0

    2.7 使用Thead子类创建线程

    创建一个子类SubThread,继承threading.Thread线程类,并定义一个run()方法。实例化SubThread类创建2个线程,并且调用start()方法开启线程,程序会自动调用run()方法。代码如下:

    1. # -*- coding: utf-8 -*-
    2. import threading
    3. import time
    4. class SubThread(threading.Thread):
    5. def run(self):
    6. for i in range(3):
    7. time.sleep(1)
    8. msg = "子线程" + self.name + '执行,i=' + str(i) # name属性中保存的是当前线程的名字
    9. print(msg)
    10. if __name__ == '__main__':
    11. print('-----主线程开始-----')
    12. t1 = SubThread() # 创建子线程t1
    13. t2 = SubThread() # 创建子线程t2
    14. t1.start() # 启动子线程t1
    15. t2.start() # 启动子线程t2
    16. t1.join() # 等待子线程t1
    17. t2.join() # 等待子线程t2
    18. print('-----主线程结束-----')

    执行结果:

    1. -----主线程开始-----
    2. 子线程Thread-2执行,i=0
    3. 子线程Thread-1执行,i=0
    4. 子线程Thread-1执行,i=1
    5. 子线程Thread-2执行,i=1
    6. 子线程Thread-2执行,i=2子线程Thread-1执行,i=2
    7. -----主线程结束-----
    8. Process finished with exit code 0

    2.8 线程间通信

    进程间不能共享信息,但是线程间可以通过全局变量共享信息。代码如下

    1. # -*- coding:utf-8 -*-
    2. from threading import Thread
    3. import time
    4. def plus():
    5. print('-------子线程1开始------')
    6. global g_num
    7. g_num += 50
    8. print('g_num is %d' % g_num)
    9. print('-------子线程1结束------')
    10. def minus():
    11. time.sleep(1)
    12. print('-------子线程2开始------')
    13. global g_num
    14. g_num -= 50
    15. print('g_num is %d' % g_num)
    16. print('-------子线程2结束------')
    17. g_num = 100 # 定义一个全局变量
    18. if __name__ == '__main__':
    19. print('-------主线程开始------')
    20. print('g_num is %d' % g_num)
    21. t1 = Thread(target=plus) # 实例化线程p1
    22. t2 = Thread(target=minus) # 实例化线程p2
    23. t1.start() # 开启线程p1
    24. t2.start() # 开启线程p2
    25. t1.join() # 等待p1线程结束
    26. t2.join() # 等待p2线程结束
    27. print('-------主线程结束------')

    执行结果:

    1. -------主线程开始------
    2. g_num is 100
    3. -------子线程1开始------
    4. g_num is 150
    5. -------子线程1结束------
    6. -------子线程2开始------
    7. g_num is 100
    8. -------子线程2结束------
    9. -------主线程结束------
    10. Process finished with exit code 0

    2.9 使用互斥锁

    互斥锁可以防止多个线程同事读写某一块内存区域。代码如下:

    1. from threading import Thread, Lock
    2. import time
    3. n = 100 # 共100张票
    4. def task():
    5. global n
    6. mutex.acquire() # 上锁
    7. temp = n # 赋值给临时变量
    8. time.sleep(0.1) # 休眠0.1秒
    9. n = temp - 1 # 数量减1
    10. print('购买成功,剩余%d张电影票' % n)
    11. mutex.release() # 释放锁
    12. if __name__ == '__main__':
    13. mutex = Lock() # 实例化Lock类
    14. t_l = [] # 初始化一个列表
    15. for i in range(10):
    16. t = Thread(target=task) # 实例化线程类
    17. t_l.append(t) # 将线程实例存入列表中
    18. t.start() # 创建线程
    19. for t in t_l:
    20. t.join() # 等待子线程结束

    执行结果:

    1. 购买成功,剩余99张电影票
    2. 购买成功,剩余98张电影票
    3. 购买成功,剩余97张电影票
    4. 购买成功,剩余96张电影票
    5. 购买成功,剩余95张电影票
    6. 购买成功,剩余94张电影票
    7. 购买成功,剩余93张电影票
    8. 购买成功,剩余92张电影票
    9. 购买成功,剩余91张电影票
    10. 购买成功,剩余90张电影票
    11. Process finished with exit code 0

    2.10 使用队列在线程间通信

    进程间通信可以用multiprocessing模块的Queue队列,线程间也可以使用Queue队列实现线程间通信。使用Queue在线程间通信通常应用于生产者消费者模式。代码如下:

    1. from queue import Queue
    2. import random, threading, time
    3. # 生产者类
    4. class Producer(threading.Thread):
    5. def __init__(self, name, queue):
    6. threading.Thread.__init__(self, name=name)
    7. self.data = queue
    8. def run(self):
    9. for i in range(5):
    10. print("生成者%s将产品%d加入队列!" % (self.getName(), i))
    11. self.data.put(i)
    12. time.sleep(random.random())
    13. print("生成者%s完成!" % self.getName())
    14. # 消费者类
    15. class Consumer(threading.Thread):
    16. def __init__(self, name, queue):
    17. threading.Thread.__init__(self, name=name)
    18. self.data = queue
    19. def run(self):
    20. for i in range(5):
    21. val = self.data.get()
    22. print("消费者%s将产品%d从队列中取出!" % (self.getName(), val))
    23. time.sleep(random.random())
    24. print("消费者%s完成!" % self.getName())
    25. if __name__ == '__main__':
    26. print('-----主线程开始-----')
    27. queue = Queue() # 实例化队列
    28. producer = Producer('Producer', queue) # 实例化线程Producer,并传入队列作为参数
    29. consumer = Consumer('Consumer', queue) # 实例化线程Consumer,并传入队列作为参数
    30. producer.start() # 启动线程Producer
    31. consumer.start() # 启动线程Consumer
    32. producer.join() # 等待线程Producer结束
    33. consumer.join() # 等待线程Consumer结束
    34. print('-----主线程结束-----')

    执行结果:

    1. -----主线程开始-----
    2. 生成者Producer将产品0加入队列!
    3. 消费者Consumer将产品0从队列中取出!
    4. 生成者Producer将产品1加入队列!
    5. 消费者Consumer将产品1从队列中取出!
    6. 生成者Producer将产品2加入队列!
    7. 消费者Consumer将产品2从队列中取出!
    8. 生成者Producer将产品3加入队列!
    9. 消费者Consumer将产品3从队列中取出!
    10. 生成者Producer将产品4加入队列!
    11. 生成者Producer完成!
    12. 消费者Consumer将产品4从队列中取出!
    13. 消费者Consumer完成!
    14. -----主线程结束-----
    15. Process finished with exit code 0

    3.最后

    这是我总结的python代码编写常的有关进程线程的代码。基本功能应该不外呼这10组代码。

  • 相关阅读:
    学习记忆——图像篇——记忆古诗词
    Java实现扫雷小游戏【优化版】
    [附源码]Java计算机毕业设计SSM儿童成长记录与分享系统
    18、Java的类变量、类方法;static 关键字;静态导入;初始化块;静态初始化块;单例模式
    【Node.js】querystring 模块
    Linux快速上手6:常用命令之压缩解压命令
    JAX XLA 还没开始
    LayerNorm的图是不是画错了
    单例模式。
    【LeetCode】经典的环形链表
  • 原文地址:https://blog.csdn.net/qq_33163046/article/details/127659269