• Python多进程开发


    Python多线程开发

    在这里插入图片描述

    1.1 创建一个多线程任务

    下面我们创建两子线程任务,并查看输出。

    import os
    import time
    import threading
    from threading import Thread
    
    def asyncSay(words):
      for i in range(5):
        print(f"{words} :: {i}")
    
    def say(words):
      print(f"process ID : {os.getpid()}")
      print(f"{words} :: hi")
      time.sleep(2)
    
    if __name__ == '__main__':
      # target 指定子线程执行的函数 args指定函数执行所需要的参数
      target1 = Thread(target=asyncSay, args=(["asyncSay"]))
      target2 = Thread(target=say, args=(["say"]))
      target1.start()
      target2.start()
      print(threading.current_thread())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    Output

    每次运行时下面的输出结果不一定一样,不过通过输出我们可以看到target2中传递的函数并没有在执行target1中指定的函数执行结束后再执行。

    asyncSay :: 0
    asyncSay :: 1
    asyncSay :: 2<_MainThread(MainThread, started 27396)>process ID : 24312
    
    
    asyncSay :: 3say :: hi
    
    asyncSay :: 4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    1.2 引入线程锁

    观察最后结果的输出,我们可以看到本该有多行输出的但是却出现了两个不改在一起的内容在同一行中输出。这是两个子线程同时进行输出的结果,那么我们可以通过线程锁来解决这种问题(保证线程在执行该任务时其它子线程任务暂停)。

    import os
    import time
    import threading
    from threading import Thread
    
    lock = threading.Lock()
    
    def asyncSay(words):
      for i in range(5):
        lock.acquire()
        print(f"{words} :: {i}")
        lock.release()
    
    def say(words):
      lock.acquire()
      print(f"process ID : {os.getpid()}")  # 获得线程ID
      print(f"{words} :: hi")
      lock.release()
      time.sleep(2)
    
    if __name__ == '__main__':
      # target 指定子线程执行的函数 args指定函数执行所需要的参数
      target1 = Thread(target=asyncSay, args=(["asyncSay"]))
      target2 = Thread(target=say, args=(["say"]))
      target1.start()
      target2.start()
      print(threading.current_thread())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    Output

    后面的输出结果顺序可能不同,但是子线程中的输出都会独立一行,但是与主线程的输出任会同行。

    asyncSay :: 0
    <_MainThread(MainThread, started 16176)>asyncSay :: 1
    
    asyncSay :: 2
    asyncSay :: 3
    process ID : 15808
    say :: hi
    asyncSay :: 4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    1.3 将子线程加入到主线程(不保证子线程依然是多线程任务

    如下图,这个案例的代码只是对前面案例进行了一些修改。通过这个案例的输出现象我们可以知道在多个线程的任务同时 join到主线程上,此时其实这些子线程的执行已经不是多线程了。也就是说右侧的写法保证了两个子线程的任务执行完之后再输出finish,但是这种写法让两个多线程的任务变成了单线程的任务。下一小节的中我们将进行改进,保证了多个子线程的同时让主线程等待多个子线程结束。

    image-20220906092442066

    1.4 将子线程加入到主线程(保证子线程依然是多线程任务

    import os
    import time
    import threading
    from threading import Thread
    lock = threading.Lock()
    def asyncSay(words):
      for i in range(10):
        lock.acquire()
        print(f"{words} :: {i} {os.getpid()}")
        lock.release()
    
    def say(words):
      lock.acquire()
      print(f"process ID : {os.getpid()}")  # 获得线程ID
      print(f"{words} :: hi")
      lock.release()
      time.sleep(2)
      print(f"process ID : {os.getpid()}")  # 获得线程ID
    
    if __name__ == '__main__':
      # target 指定子线程执行的函数 args指定函数执行所需要的参数
      target1 = Thread(target=asyncSay, args=(["asyncSay"]))
      target2 = Thread(target=say, args=(["say"]))
      target1.start()
      target2.start()
      # 当有存活的子线程的时候进入死循环
      while target1.is_alive() or target2.is_alive():
        pass
      print("finish")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    Output

    image-20220906101047320

    可以看到两个子线程中的内容交替输出。同时,finish在所有子线程执行结束后再输出

    1.5 获得子线程任务的返回值

    在下面的任务中我们重写了Thread中的run方法,将运行的结果进行保存

    import types
    import time
    import random
    import threading
    from threading import Thread
    class MyThread(Thread):
      """重写多线程类"""
      def __init__(self, func, args, **kwargs):
        super(MyThread, self).__init__()
        self.func = func
        self.args = args
        self.kwargs = kwargs
      def run(self):
        try:
          if self.func and isinstance(self.func, types.FunctionType):
            self.__result = self.func(*self.args, **self.kwargs)
        finally:
          del self.args, self.kwargs
      @property
      def result(self):
        try:
          return self.__result
        except Exception:
          return None
    def getRandom(maxNum):
      time.sleep(2)
      return random.randint(1, maxNum)
    
    if __name__ == '__main__':
      t1 = MyThread(getRandom,args=([100]))
      t1.start()
      print(t1.result)
      while t1.is_alive():
        pass
      print(f"random Number :: {t1.result}")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    Output

    None
    random Number :: 60
    
    • 1
    • 2

    1.6 延时执行

    from threading import Thread, Timer
    
    def worker():
      print("延时执行")
    
    if __name__ == '__main__':
      t1 = Timer(2, worker)
      # t1.cancel()   # 取消定时器的执行
      t1.start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Output

    延时执行
    
    • 1

    1.7 线程间进行协作

    在这个案例中我们通过threading.Thread进行线程间的协作,子线程二收到子线程一发送的信号后再继续执行。

    import time
    from threading import Thread, Timer, Event
    def student(event):
      works = [
        "do math",
        "do English",
        "do history",
      ]
      for item in works:
        time.sleep(1)
        print(item)
      event.set()
    def teacher(event):
      print("please do home work")
      event.wait()
      print("do good work")
    
    if __name__ == '__main__':
      event = Event()
      t1 = Thread(target=student, args=(event,))
      t2 = Thread(target=teacher, args=(event,))
      t1.start()
      t2.start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    Output

    please do home work
    do math
    do English
    do history
    do good work
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这里我们可以看一下实现的源码,重点关注下面两部分框起来的内容。内部其实就是通过一个状态变量和条件锁实现的。

    image-20220906111242074

    1.8 条件锁进行广播

    import time
    from threading import Thread, Lock, Event, Condition
    
    def produce(cond):
      for i in range(5):
        with cond:
          print(f"制作 {i}")
          cond.notify(1)
        time.sleep(0.5)
    def consumer(cond, count):
      with cond:
        cond.wait()
        print(f"消费 {count}")
    if __name__ == '__main__':
      cond = Condition(lock=Lock())
      t1 = Thread(target=produce, args=(cond, ))
      for i in range(5):
        t2 = Thread(target=consumer, args=(cond, i))
        t2.start()
      t1.start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    Output

    制作 0
    消费 0
    制作 1
    消费 1
    制作 2
    消费 2
    制作 3
    消费 3
    制作 4
    消费 4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这里我们用条件锁让两种类型的子线程交替执行。执行**produce****的子线程每完成一个任务时就通知执行 consumer子线程。

    1.9 Semaphore 信号量

    与Lock很像,通过信号量内部有一个倒计数器。每一次acquire都会减1,当acquire方法发现计数为0就阻塞请求 的线程,直到其它线程对信号量release后,计数大于0,恢复阻塞的线程。

    名称含义
    Semaphore(value=1)构造方法。value为初始信号量。value小于0,抛出ValueError异常
    Semaphore.acquire(self,blocking=True,timeout=None)获取信号量,技术器减1,即_value的值减少1。如果_value的值为0会变成阻塞状态。获取成功返回True
    Semaphore.release(self)释放信号量,计数器加1。即_value的值加1
    Semaphore._value信号量,当前信号量

    下面我们通过信号量控制可以同时执行的线程数

    import time
    from threading import Thread, Lock, Semaphore, Condition
    
    def readFileData(semaphore, num):
      semaphore.acquire()
      time.sleep(2)
      print(f"read {num}")
      semaphore.release()
    if __name__ == '__main__':
      s = Semaphore(3)
      for i in range(9):
        Thread(target=readFileData, args=(s, i)).start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    Output

    可以观察输出的时间间隔,几乎是三组同时输出的。

    read 2read 0read 1
    
    
    read 4read 3read 5
    
    
    read 6read 7read 8
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    1.10 BoundedSemaphore 有界信号量

    有界信号量其实与上一小节的信号量类似。不同的是每当我们使用release,信号量就会加一,有界信号量保证其信号量不大于初始的范围,若超过则抛出异常。

    import time
    from threading import BoundedSemaphore
    
    if __name__ == '__main__':
      s = BoundedSemaphore(2)
      print(s._value)
      s.acquire()
      s.acquire()
      print(s.release())
      print(s._value)
      print(s.release())
      print(s._value)
      print(s.release())
      print(s._value)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    Output

    image-20220906120627962

    异常。

    import time
    from threading import BoundedSemaphore
    
    if __name__ == '__main__':
      s = BoundedSemaphore(2)
      print(s._value)
      s.acquire()
      s.acquire()
      print(s.release())
      print(s._value)
      print(s.release())
      print(s._value)
      print(s.release())
      print(s._value)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    Output

    在这里插入图片描述

  • 相关阅读:
    springboot摄影器材设备租赁系统java
    C#异步和多线程
    低代码平台简介(10家国产化低代码平台详细介绍)
    GameOff2022参与有感
    1.pytorch学习:安装pytorch
    PySimpleGUI小试牛刀之Tomcat项目部署工具
    某电商网站的数据库设计(8)——创建花费信息查询视图
    NIO Netty(四)
    React歌词滚动效果(跟随音乐播放时间滚动)
    萌新卷妹带你逃出算法无名岛第二站
  • 原文地址:https://blog.csdn.net/qq_52785898/article/details/126722431