• Python 多进程和多线程【彻底搞懂篇】


    一、进程与线程概念理解

    1、计算机中的进程与线程

    在这里插入图片描述
    (1)进程的概念

    • [1] 程序执行过程中分配和管理资源的基本单位,是竟争计算机系统资源的基本单位;
    • [2] 每一个进程都有一个自己的地址空间,即进程空间;
    • [3] 进程至少有 5 种基本状态分别是:初始态,执行态,等待状态,就绪状态,终止状态;
    • [4] 启动app会启动1个或者多个进程【进程个数:取决你使用的app开发设计】–例如:上图中的飞书;
    • [5] 每一个进程对应一个计算机端口;
    • [6] 每个进程中包含着1个或者多个线程;

    (2)线程的概念

    • [1] 线程是进程的一部分,一个没有线程的进程可以被看作是单线程的。
    • [2] 线程又被称为轻量级进程,也是CPU 调度的一个基本单位。

    2、生活中的进程与线程

    在这里插入图片描述
    类比理解:

    • 一个工厂建立了m个车间,每个车间雇佣n个工人;{m>=1 & n>=1}
    • 一个程序包含了m个进程,每个进程创建n个线程;{m>=1 & n>=1}

    注意事项:

    1. python语言为例,运行一个py文件就默认启动一个进程;
    2. 每一个进程创建会默认自动启动一个线程,一般称为主线程;
    3. 进程拥有一个完整的虚拟地址空间,不依赖于线程而独立存在;
    4. 线程是进程的一部分,没有自己的地址空间,与进程内的其他线程一起共享分配给该进程的所有资源;
    5. 对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程;
    6. 一个 “没有线程的进程” ,可以被看作是单线程; “没有线程的进程” ,不是真正意义上的没有创建任何线程,而是说在你没有学习多线程之前,写的那种“一条龙”脚本。由于每个进程至少要干一件事,所以一个进程至少有一个线程。本质上,你运行脚本就会默认分别创建一个进程和线程;

    二、多线程实现-Python语言

    1、多线程

    • 主线程一般情况下会优先执行完成,主线程执行完成,也就意味着从main程序执行结束。
    • 主线程执行完成后会等待子线程执行完成【可通过setDaemon调整参数】。

    案例2.1 :多线程逻辑体验

    import threading
    def sing():
        song=["《一千年以后》","《演员》","《天外来物》","《刚刚好》","《动物世界》"]
        for i in range(5):
            print("我在唱"+str(song[i]))
            if i == len(song)-1:
                print("【子线程sing】:执行完成")
    
    def dance():
        dance = ["《Bohemian Rhapsody》", "《Needles in the Camel's Eye》", "《Quiet Life》", "《Hot One》", "《Fox On The Run》"]
        for i in range(5):
            print("我在跳"+str(dance[i]))
            if i == len(dance)-1:
                print("【子线程dance】:执行完成")
    
    if __name__ == '__main__':
        threading.Thread(target=sing).start()
        threading.Thread(target=dance).start()
        print("【主线程代码】:执行完成")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    代码执行结果如下:【截图仅供参考,打印结果与个人电脑CPU有关系】

    在这里插入图片描述

    2、start、join、setDaemon介绍

    函数说明
    start表示当前进程准备就绪,等待CPU调度,具体执行时间由CPU来决定
    join等待当前线程执行完成再继续往下执行
    setDaemon守护线程只能放在start之前才能生效,其为布尔值
    (1)start 函数

    方法说明:表示当前进程准备就绪,等待CPU调度,具体执行时间由CPU来决定;

    import threading
    
    result = 0
    count_num = 100000000
    
    def add(count_num):
        global result
        for i in range(count_num):
            result +=1
    
    
    def sub(count_num):
        global result
        for i in range(count_num):
            result -= 1
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=add, args=[count_num])
        t2 = threading.Thread(target=sub, args=[count_num])
        # 同时开启两个子线程:加法线程和减法线程
        t1.start()
        t2.start()
        print(result)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 同时启动两个子线程,t1和t2都在等待过程中,但是两者的执行顺序有CPU决定不受代码顺序影响
    • 本案例中启动加和减两个子线程,每个线程都是累加(减)到一亿,因为执行顺序有CPU调度决定,因此,每次运行脚本输出的result都是不一样的。
    (2)join 函数

    方法说明:等待当前线程执行完成再继续往下执行

    import threading
    
    result = 0
    count_num = 100000000
    
    def add(count_num):
        global result
        for i in range(count_num):
            result +=1
    
    
    def sub(count_num):
        global result
        for i in range(count_num):
            result -= 1
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=add, args=[count_num])
        t2 = threading.Thread(target=sub, args=[count_num])
        # 同时开启两个子线程:加法线程和减法线程
        print("【子线程t1】:开启  t1.start()")
        t1.start()
        print("【子线程t1】:执行中")
        # 主线程等待子线程t1执行后再继续运行
        t1.join()
        print("【子线程t2】:开启  t2.start()")
        t2.start()
        # 主线程等待子线程t2执行后再继续运行
        print("【子线程t2】:执行中")
        t2.join()
        print("【主线程】:执行完成 \n result = " + str(result))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    代码执行结果如下:

    在这里插入图片描述

    • 子线程t1添加join()后,主程序会等待其执行完成后再执行后续代码
    • 因此本案例中,先从0加到1亿,后再做减法到0,result=0
    (3)setDaemon 函数

    方法说明:守护线程只能放在start之前才能生效,其为布尔值;

    • t.setDaemon(True),设置为守护线程,主线程执行完毕后,子线程也自动关闭;
    • t.setDaemon(False),设置为非守护线程,主线程等待子线程,子线程执行完毕后,主线程才结束;【默认】
    import threading
    def sing():
        song=["《一千年以后》","《演员》","《天外来物》","《刚刚好》","《动物世界》"]
        for i in range(5):
            print("我在唱"+str(song[i]))
            if i == len(song)-1:
                print("【子线程sing】:执行完成")
    
    if __name__ == '__main__':
        t1=threading.Thread(target=sing)
        # t1.setDaemon(True) 不推荐使用setDaemon,推荐使用直接修改aemon参数
        t1.daemon=True
        t1.start()
        print("【主线程代码】:执行完成")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    代码执行结果如下:类比与你【子线程】KTV点了6首歌,唱完第一首之后,包间【主线程】到期啦。

    在这里插入图片描述

    3、线程锁

    (1)线程锁

    步骤一:创建线程锁

    lock_object=threading.RLock()
    
    • 1

    步骤二:申请锁

    lock_object.acquire()
    
    • 1

    步骤三:释放锁

    lock_object.acquire()
    
    • 1

    案例2.4.1:线程锁

    import threading
    
    lock_object=threading.RLock()
    
    def sing():
        lock_object.acquire()
        song=["《一千年以后》","《演员》","《天外来物》","《刚刚好》","《动物世界》"]
        for i in range(5):
            print("我在唱"+str(song[i]))
            # time.sleep(1)
        lock_object.release()
    
    
    def dance():
        lock_object.acquire()
        dance = ["《Bohemian Rhapsody》", "《Needles in the Camel's Eye》", "《Quiet Life》", "《Hot One》", "《Fox On The Run》"]
        for i in range(5):
            print("我在跳"+str(dance[i]))
            # time.sleep(1)
        lock_object.release()
    
    if __name__ == '__main__':
        threading.Thread(target=sing).start()
        threading.Thread(target=dance).start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    代码执行结果如下:sing 线程和 dance线程共用同一把锁🔒,只有等待sing线程执行结束释放锁🔒才会继续执行dance线程。

    在这里插入图片描述

    (2)自动锁
    • with lock 概念:模块内自动执行acquire()和release()操作
    lock_object=threading.RLock()
    with lock_object:
    	pass
    
    • 1
    • 2
    • 3
    import threading
    import time
    lock_object=threading.RLock()
    
    def sing():
        with lock_object:# 模块内自动执行acquire()和release()操作
            song=["《一千年以后》","《演员》","《天外来物》","《刚刚好》","《动物世界》"]
            for i in range(5):
                print("我在唱"+str(song[i]))
                time.sleep(1)
    
    
    if __name__ == '__main__':
        t1=threading.Thread(target=sing)
        t1.start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    (3)死锁
    • 死锁概念:在程序执行过程中加一把锁🔒后,再其没有释放之前又上了相同的object锁🔒,导致死锁。

    • 案例解析:模块1加了一个锁🔒,但是在模块1的锁还没有释放之前,模块2中加了一个相同的object锁,导致程序锁死,第一首歌唱完就一直卡住了。

    在这里插入图片描述
    案例4-4:死锁

    import threading
    import time
    lock_object=threading.Lock()
    
    def drink(num):
        print("唱完"+str(num+1)+"首歌,喝"+"【"+str(num+1)+"】口水")
    
    def sing():
        with lock_object:# 模块内自动执行acquire()和release()操作
            song=["《一千年以后》","《演员》","《天外来物》","《刚刚好》","《动物世界》"]
            for i in range(5):
                print("我在唱"+str(song[i]))
                time.sleep(1)
                with lock_object:
                    drink(i)
    
    if __name__ == '__main__':
        t1=threading.Thread(target=sing)
        t1.start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    (4)lock 和 Rlock 的区别
    锁分类名称特点
    lock同步锁lock执行效率比Rlock,但是容易死锁
    Rlock递归锁执行效率低,但是灵活性高【推荐使用】

    与(3)对比,(3)中所用的锁为lock🔒改成Rlock🔒,程序就可以继续运行,因为生产中考虑到自己编写的代码可能被其他同事调用,别的同事,在其本身的函数中考虑到线程安全也会加锁,因此推荐使用Rlock

    在这里插入图片描述

    4、线程池

    • python3才支持线程池

    (1)导入线程池模块

    from concurrent.futures import ThreadPoolExecutor
    
    • 1

    (2)生成线程池

    # 允许最大的线程数
    pool=ProcessPoolExecutor(2)
    
    • 1
    • 2

    (3)提交线程

    # submit(函数,参数1,参数2)
    pool.submit(get_image,name,url)
    
    • 1
    • 2

    (4)等待线程池内所有线程池执行完毕

    pool.shutdown(True)
    
    • 1

    案例2.5 线程池

    import time
    import requests
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    
    url_list = [("陈钰琪.jpg","https://wx3.sinaimg.cn/mw2000/006mwaFngy1h2zkvf40e4j621u32qqv602.jpg"),
                ("王鹤棣.jpg", "https://wx4.sinaimg.cn/mw2000/a2d2ddc4ly1h5s5ju99g1j241n2p34qv.jpg"),
                ("虞书欣.jpg","https://wx1.sinaimg.cn/mw2000/0068mQp7gy1h5nhpezy36j61hf27q4qr02.jpg")]
    
    def get_image(name,url):
        res = requests.get(url)
        with open(name, mode="wb") as f:
            f.write(res.content)
        print("正在下载"+str(name))
        time.sleep(2)
    
    if __name__ == '__main__':
        pool=ThreadPoolExecutor(2)
        for name,url in url_list:
            pool.submit(get_image,name,url)
        pool.shutdown()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    5、单线程和多线程的执行效率

    (1)单线程执行案例
    import datetime
    import requests
    url_list=[  "https://wx3.sinaimg.cn/mw2000/006mwaFngy1h2zkvf40e4j621u32qqv602.jpg",
                "https://wx4.sinaimg.cn/mw2000/a2d2ddc4ly1h5s5ju99g1j241n2p34qv.jpg",
                "https://wx1.sinaimg.cn/mw2000/0068mQp7gy1h5nhpezy36j61hf27q4qr02.jpg"]
    image_content=["陈钰琪.jpg","王鹤棣.jpg","虞书欣.jpg"]
    
    def get_image():
        for i in range(len(url_list)):
            res=requests.get(url_list[i])
            with open(image_content[i],mode="wb") as f:
                f.write(res.content)
    
    if __name__ == '__main__':
        begin_time = datetime.datetime.now()
        print("开始时间:"+str(begin_time))
        get_image()
        end_time = datetime.datetime.now()
        print("结束时间:"+str(end_time))
        consume_time = end_time-begin_time
        print("消耗时间:" + str(consume_time))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    代码执行结果如下:
    在这里插入图片描述
    本案例执行消耗时间为:0:00:02.264276秒

    (2)多线程执行案例
    import datetime
    import requests
    import threading
    
    url_list = [("陈钰琪.jpg","https://wx3.sinaimg.cn/mw2000/006mwaFngy1h2zkvf40e4j621u32qqv602.jpg"),
                ("王鹤棣.jpg", "https://wx4.sinaimg.cn/mw2000/a2d2ddc4ly1h5s5ju99g1j241n2p34qv.jpg"),
                ("虞书欣.jpg","https://wx1.sinaimg.cn/mw2000/0068mQp7gy1h5nhpezy36j61hf27q4qr02.jpg")]
    
    def get_image(name,url):
        res=requests.get(url)
        with open(name,mode="wb") as f:
            f.write(res.content)
        end_time = datetime.datetime.now()
        consume_time = end_time - begin_time
        print("消耗时间:" +str(consume_time))
    
    
    if __name__ == '__main__':
        begin_time = datetime.datetime.now()
        print("开始时间:" + str(begin_time))
        for name,url in url_list:
            t=threading.Thread(target=get_image,args=(name,url))
            t.start()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    代码执行结果如下:
    在这里插入图片描述
    本案例执行消耗时间为:0:00:00.426551秒

    三、多进程实现- Python语言

    1、多进程

    • 新建一个进程,分配一个完整的虚拟地址空间,同时也会新增一个主线程;
     p=multiprocessing.Process(target=get_image,args=(name,url))
    
    • 1
    • start()、join()方法与多线程类似,这里不再继续展开讲解;
     # 开启进程
     p.start()
     # 等待进程执行完成
     p.join()
    
    • 1
    • 2
    • 3
    • 4

    案例3.1 多进程

    import multiprocessing
    import requests
    import datetime
    url_list = [("陈钰琪.jpg","https://wx3.sinaimg.cn/mw2000/006mwaFngy1h2zkvf40e4j621u32qqv602.jpg"),
                ("王鹤棣.jpg", "https://wx4.sinaimg.cn/mw2000/a2d2ddc4ly1h5s5ju99g1j241n2p34qv.jpg"),
                ("虞书欣.jpg","https://wx1.sinaimg.cn/mw2000/0068mQp7gy1h5nhpezy36j61hf27q4qr02.jpg")]
    
    def get_image(name,url):
        res = requests.get(url)
        with open(name, mode="wb") as f:
            f.write(res.content)
    
    if __name__ == '__main__':
        begin_time = datetime.datetime.now()
        print("开始时间:" + str(begin_time))
        for name,url in url_list:
        	# 新增一个进程;
            p=multiprocessing.Process(target=get_image,args=(name,url))
            p.start()
        end_time = datetime.datetime.now()
        print("结束时间:" + str(end_time))
        consume_time = end_time - begin_time
        print("消耗时间:" + str(consume_time))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    代码执行结果如下:
    在这里插入图片描述

    2、进程锁

    步骤一:修改系统运行模式【fork】,详解请阅读本文第三.4

    multiprocessing.set_start_method("fork")
    
    • 1

    步骤二:生成进程锁对象

    lock=multiprocessing.RLock()
    
    • 1

    步骤三:上锁释放锁

    p=multiprocessing.Process(target=函数名称,args=(lock,参数2,参数3))
    def 函数:
    	lock.acquire()#加锁
    	lock.release()#释放锁
    
    • 1
    • 2
    • 3
    • 4

    案例3.2 进程锁

    import multiprocessing
    import time
    import requests
    
    multiprocessing.set_start_method("fork")
    
    url_list = [("陈钰琪.jpg","https://wx3.sinaimg.cn/mw2000/006mwaFngy1h2zkvf40e4j621u32qqv602.jpg"),
                ("王鹤棣.jpg", "https://wx4.sinaimg.cn/mw2000/a2d2ddc4ly1h5s5ju99g1j241n2p34qv.jpg"),
                ("虞书欣.jpg","https://wx1.sinaimg.cn/mw2000/0068mQp7gy1h5nhpezy36j61hf27q4qr02.jpg")]
    
    def get_image(name,url,lock):
        lock.acquire()
        res=requests.get(url)
        with open(name,mode="wb") as f:
            f.write(res.content)
        time.sleep(3)
        lock.release()
        print(str(name) + "下载完成")
    
    
    if __name__ == '__main__':
        lock=multiprocessing.RLock()
        for name,url in url_list:
            p=multiprocessing.Process(target=get_image,args=(name,url,lock))
            p.start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    3、进程池

    (1)导入进程池模块

    from concurrent.futures import ProcessPoolExecutor
    
    • 1

    (2)生成进程池

    # 允许最大化进程数据
    pool=ProcessPoolExecutor(2)
    
    • 1
    • 2

    (3)提交进程

    # submit(函数,参数1,参数2)
    pool.submit(get_image,name,url)
    
    • 1
    • 2

    (4)等待进程池内所有进程执行完毕

    pool.shutdown(True)
    
    • 1

    案例3.3 进程池

    import time
    import requests
    from concurrent.futures import ProcessPoolExecutor
    
    url_list = [("陈钰琪.jpg","https://wx3.sinaimg.cn/mw2000/006mwaFngy1h2zkvf40e4j621u32qqv602.jpg"),
                ("王鹤棣.jpg", "https://wx4.sinaimg.cn/mw2000/a2d2ddc4ly1h5s5ju99g1j241n2p34qv.jpg"),
                ("虞书欣.jpg","https://wx1.sinaimg.cn/mw2000/0068mQp7gy1h5nhpezy36j61hf27q4qr02.jpg")]
    
    def get_image(name,url):
        res = requests.get(url)
        with open(name, mode="wb") as f:
            f.write(res.content)
        print("正在下载"+str(name))
        time.sleep(2)
    
    if __name__ == '__main__':
        pool=ProcessPoolExecutor(2)
        for name,url in url_list:
            pool.submit(get_image,name,url)
        pool.shutdown()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    4、fork和spawn

    构建子进程的不同方式,有forkspawn两种模式

    • fork:除了必要的启动资源外,其他变量,包,数据等都继承自父进程,并且是copy-on-write的,也就是共享了父进程的一些内存页,因此启动较快,但是由于大部分都用的父进程数据,所以是不安全的进程
    • spawn:从头构建一个子进程,父进程的数据等拷贝到子进程空间内,拥有自己的Python解释器,所以需要重新加载一遍父进程的包,因此启动较慢,由于数据都是自己的,安全性较高
    系统构建子进程模式
    Macfork 和 spawn 【3.8+默认】
    Windowspawn
    Liunxfork
    import multiprocessing
    import time
    
    def sing():
        song=["《一千年以后》","《演员》","《天外来物》","《刚刚好》","《动物世界》"]
        for i in range(5):
            print("我在唱"+str(song[i]))
            time.sleep(1)
    
    # if __name__ == '__main__':
        # multiprocessing.set_start_method("spawn")
    p=multiprocessing.Process(target=sing)
    p.start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    如图所示,主进程未放置main方法下面会报以下错误:

    在这里插入图片描述
    接近的方法就是添加multiprocessing.set_start_method(“fork”)参数或者放置在main方法中:

    • 放置在main方法中
    import multiprocessing
    import time
    
    def sing():
        song=["《一千年以后》","《演员》","《天外来物》","《刚刚好》","《动物世界》"]
        for i in range(5):
            print("我在唱"+str(song[i]))
            time.sleep(1)
    
    if __name__ == '__main__':
        # multiprocessing.set_start_method("spawn")
        p=multiprocessing.Process(target=sing)
        p.start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 添加multiprocessing.set_start_method(“fork”)参数
    import multiprocessing
    import time
    
    def sing():
        song=["《一千年以后》","《演员》","《天外来物》","《刚刚好》","《动物世界》"]
        for i in range(5):
            print("我在唱"+str(song[i]))
            time.sleep(1)
    
    # if __name__ == '__main__':
    multiprocessing.set_start_method("fork")
    p=multiprocessing.Process(target=sing)
    p.start()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    SSM框架整合详细教程
    黑马课程 Mybatis依赖出现报红
    网络防火墙入门
    【西瓜书】9.聚类
    Linux ARM平台开发系列讲解(platform平台子系统) 2.10.1 platform平台子系统介绍
    深度学习——day21(外 Q1 2021) 具有动态正则化的卷积神经网络
    SCI投稿经验(三) 回复审稿人
    VUE后台管理系统模板
    初步了解 docker
    WordPress 6.1 “Misha“
  • 原文地址:https://blog.csdn.net/weixin_44894162/article/details/126679233