• python并发编程笔记


    笔记参考B站python面向对象——并发编程详解

    进程与程序的概念
    进程:一个由CPU正在执行的过程
    程序:QQ程序仅仅是一堆代码,不是一个进程,当它启动起来以后,在任务管理器里看到的才是进程

    操作系统介绍
    计算机系统包含3层,硬件、操作系统、应用程序,
    硬件:硬盘、cpu、内存等,除此以外的都称为软件,
    应用程序:QQ、word等软件,
    操作系统:连接硬件和应用程序的特殊软件。

    操作系统是电脑开机的第一个启动的进程,它首先被写在硬盘中,然后代码被读入内存中由CPU执行,然后就出现了电脑桌面。启动QQ程序启动,会告诉操作系统,操作系统将从硬盘中读取QQ代码放入内存中,然后调用cpu运算。操作系统会管理多个应用程序。操作系统管理CPU硬件。
    1.封装硬件接口,给应用程序调用。
    2.操作系统负责管理、调度机器之上的多个进程。

    串行:CPU在内存中一次只能执行一个应用程序
    空间复用:将多个程序存入内存中
    时间复用:CPU来回切换运行的程序
    并发:(看起来是同时运行的)在单个CPU下利用多道技术,在内存中的多个程序之间来回切换(遇到内存和硬盘之间的IO,需要切换保持CPU一直运行),可能会提高效率,但是当一个程序运行时间过长,会因为切换完成所有程序的时间增加了。
    并行多个CPU同时计算(真正的同时运行)

    开启进程

    方式1

    from multiprocessing import Process
    import time
    
    def task(name):
        print('%s is running'  %name)
        time.sleep(3)
        print('%s is done '%name)
    
    if __name__ == '__main__':
        p = Process(target=task,args=('子进程1',))#加逗号变成元组
        p.start()#仅仅只是给操作系统发送了一个信号
    
        print('主')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    方式2

    #方式2
    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
        def __init__(self, name):
            super().__init__()
            self.name = name
    
        def run(self):
            print('%s is runing'%self.name)
            time.sleep(3)
            print('%s is done'%self.name)
    
    if __name__ == '__main__':
        p = MyProcess('子进程')
        p.start()
        print('主')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    查看PID

    from multiprocessing import Process
    import time,os
    
    def task(name):
        print('%s is running,parent id is %s ' %(os.getpid(),os.getppid()))
        time.sleep(3)
        print('%s is done ,parent id is %s' %(os.getpid(),os.getppid()))
    
    if __name__ == '__main__':
        p = Process(target=task,args=('子进程1',))#加逗号变成元组
        p.start()#仅仅只是给操作系统发送了一个信号
    
        print('主',os.getpid(),os.getppid())19080 9064
    22352 is running,parent id is 19080 
    22352 is done ,parent id is 19080
    补充:子进程运行完了,PID依然被占用,变成僵尸进程,父进程结束,才会被完全释放。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    join方法

    from multiprocessing import Process
    import time,os
    
    def task(name):
        print('%s is running ' %name)
        time.sleep(3)
    
    if __name__ == '__main__':
        p1 = Process(target=task,args=('子进程1',))#加逗号变成元组
        p2 = Process(target=task,args=('子进程2',))
        p3 = Process(target=task,args=('子进程3',))
        p1.start()#仅仅只是给操作系统发送了一个信号
        p2.start()
        p3.start()
        # p1.join()
        # p2.join()
        # p3.join()
    
        print('主',os.getpid(),os.getppid())
    
    补充:不加join()多次运行可能出现如下情况,因为start()函数只是发送一个开始进程的信号,子进程还没走完,其余进程就已经开始走了。
    子进程1 is running 
    主 16096 9064
    子进程3 is running 
    子进程2 is running 
    or9836 9064
    子进程2 is running
    子进程1 is running
    子进程3 is running
    
    加入join()后,子进程都走完了才会到主进程的最好,join()表示等待进程结束。
    子进程1 is running 
    子进程3 is running 
    子进程2 is running
    主 5788 9064
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    其他方法

    p.is_alive()#判断是活进程还是死进程
    p.terminate()#发送一个杀死进程的信号,但不能立即杀死,要等待操作系统杀死。
    p=Process(target=task,name='子进程1')
    print(p.name)#打印进程名字
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    守护进程

    #开启守护进程
    p.daemon = True #主进程死掉自动kill掉子进程,守护进程中不能有再开子进程的函数,可能导致子进程未运行完强行结束
    p.start()
    
    • 1
    • 2
    • 3

    互斥锁

    from multiprocessing import Process,Lock
    import time
    
    def task(name,mutex):
        mutex.acquire()#子进程获得锁
        print('%s 1 ' %name)
        time.sleep(1)
        print('%s 2 ' %name)
        time.sleep(1)
        print('%s 3 ' %name)
        mutex.release()#子进程释放锁
    if __name__ == '__main__':
        mutex=Lock()#实例化一个锁
        for i in range(3):
            p1 = Process(target=task,args=('子进程%s' %i,mutex))#每个进程公用一把锁
            p1.start()#仅仅只是给操作系统发送了一个信号
            
    #补充:谁先抢到锁谁运行,与串行的区别在于,子进程2比子进程1先抢到锁,但函数内部会按序运行。
    子进程0 1 
    子进程0 2 
    子进程0 3 
    子进程2 1 
    子进程2 2 
    子进程2 3 
    子进程1 1 
    子进程1 2 
    子进程1 3 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    多个进程操作同一个数据

    需要利用json,操作一个公共的文件。

    队列,多个进程在内存中操作同一个数据

    from multiprocessing import Process,Queue
    import time
    
    def producer(q):
        for i in range(10):
            res = '包子%s'%i
            time.sleep(0.5)
            print('生产者生产了%s'%res)
            q.put(res)
    
    def consumer(q):
        while True:
            res=q.get()
            if res == None :break
            time.sleep(1)
            print('消费者吃了%s'%res)
    
    if __name__ == '__main__':
        q = Queue()
        p1 = Process(target=producer,args=(q,))
        p2 = Process(target=producer,args=(q,))
        p3 = Process(target=producer,args=(q,))
    
        c1 = Process(target=consumer,args=(q,))
        c2 = Process(target=consumer,args=(q,))
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        p3.join()
        q.put(None)
        q.put(None)
        print('zhu')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    JoinableQueue的解释

    from multiprocessing import Process,JoinableQueue,Value
    import time,os
    
    def producer(q):
        for i in range(3):
            res = '包子%s'%i
            time.sleep(3)
            print('生产者生产了%s'%res)
            q.put(res)#(4)开始存数
        q.join()#(4)子进程实例开始等待
        print("test 等待在哪里%s"%str(os.getpid()))#(5)测试q.join()的释放
    
    def consumer(q,cout):
    
        while True:
            res=q.get()#(4)同步取数
            if res == None :break
            time.sleep(1)
            print('消费者吃了%s'%res)
            q.task_done()#(4)每从q中取出一个数据就发送一个task_done,当发送完最后一个task_done后,每个实例中的q.join()都会被释放。
            cout.value += 1#测试最后发送了多少个task_done
    
    if __name__ == '__main__':
        cout = Value('i', 0)#打开一个值共享,测试q.task_done()的功能
        q = JoinableQueue()#(1)定义一个JoinableQueue
        p1 = Process(target=producer,args=(q,))#(2)实例化子进程
        p2 = Process(target=producer,args=(q,))
        p3 = Process(target=producer,args=(q,))
    
        c1 = Process(target=consumer,args=(q,cout))
        c2 = Process(target=consumer,args=(q,cout))
        c1.daemon = True#开启守护进程,while True:会无限循环,开启后保证父进程结束,子进程也被kill
        c2.daemon = True
        p1.start()#(3)开启
        p2.start()
        p3.start()
        c1.start()
        c2.start()
        p1.join()#确保子进程执行完,因为q.join()后还有print函数,如果是其他耗时的函数,防止出错需要确保都执行完。
        p2.join()
        p3.join()
        print(cout.value)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    开启线程

    #同一进程内的多个线程共享该进程的地址空间
    from threading import Thread
    
    n = 100
    def task():
        global n
        n=0
    if __name__ == '__main__':
        p = Thread(target=task)
        p.start()#仅仅只是给操作系统发送了一个信号
        print('主线程',n)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    #线程pid相等
    from threading import Thread
    #from multiprocessing import Process,current_process
    import os
    n = 100
    
    def task():
        print(os.getpid())
    
    if __name__ == '__main__':
        t1 = Thread(target=task)
        t1.start()
        print('主线程',os.getpid())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    守护线程

    #开启守护线程
    #txt.py文件被执行,会开启一个主进程,主进程下自动开启一个主线程
    t.daemon = True #主线程死掉,其他开启守护线程的线程都会被Kill,否则主线程将等待其他线程执行完毕。
    t.start()
    
    • 1
    • 2
    • 3
    • 4

    GIL锁同一个进程中的多线程不能利用多核优势

    参考https://www.cnblogs.com/mingerlcm/p/9026090.html
    由于python文件的执行,是由CPython解释器进行的,相当于把python文件中的代码当做参数传递到CPython解释器中,为了防止多个线程同时修改同一个数据造成冲突,因此需要GIL锁的存在,GIL锁导致一个进程中的多线程在某一时间内只能运行一个线程,无法利用多核优势,因此需要开启多进程隔离GIL锁利用多核
    在这里插入图片描述

    进程和线程的选择

    IO密集型选多线程
    计算密集型选多进程

    https://blog.csdn.net/weixin_38314865/article/details/118277644
    IO密集型可以多线程。比如有一个任务,执行10万次循环,每次都打印hello world,然后休眠1秒,如果单线程,需要10万秒完成,如果10个线程,就只需要1万秒。

    CPU密集型尽量少点线程。还是上面那个任务,不同的是取消休眠,如果是单线程,几乎一下完成,如果是多线程会慢很多,而且随着线程数越多,速度会越慢,因为线程的切换是要时间的。所以要不要多线程就看IO要不要很花时间。

    #测试两种方案耗时
    from threading import Thread
    from multiprocessing import Process
    import os,time
    
    def task():
        time.sleep(2)
    
    if __name__ == '__main__':
        l = []
        print(os.cpu_count())
        start = time.time()
        for i in range(4):
            # p = Process(target=task)
            p = Thread(target=task)#加逗号变成元组
            l.append(p)
            p.start()#仅仅只是给操作系统发送了一个信号
        for p in l:
            p.join()
        stop = time.time()
        print('run time is %s'%(stop-start))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    协程

    单线程下实现并发,开发者自己控制切换。

  • 相关阅读:
    一段代码实现WordPress彩色标签云
    【Stable Diffusion】(基础篇三)—— 关键词和参数设置
    开利网络项目动态 | 八月重点上线项目集锦
    【C++入门】(纯)虚函数和多态、抽象类、接口
    MyBatis多条件查询、动态SQL、多表操作、注解开发详细教程
    MySQL运维实战(7.2) MySQL复制server_id相关问题
    vue watch监听
    services资源+pod详解
    SpringCloud:Gateway服务网关
    2022快手电商短视频运营白皮书:Q2对比Q1GMV总值增长率达12%
  • 原文地址:https://blog.csdn.net/qq_42472662/article/details/127402061