• [python]十九、进程、线程和协程


    目录

    1、操作系统

    1.1、多任务系统

    1.2.1、CPU时间片

    1.3、linux操作系统:有五大子系统

    1.3.1、进程通信的方式

    1.3.2、进程通信

    1.3.3、所有的进程都是由另一个进程创建的,那么谁创建的第一个进程呢?

    1.3.4、进程的组成

    1.3.5、PCB用来存放一些管理信息

    1.4、内存空间

    1.5、线程定义

    1.5.1、为什么要使用多线程?

    1.5.2、多线程的缺点

    1.6、多进程定义

    1.7、进程与线程的共性和区别

    1.7.1、多进程和多线程,该选谁?

    1.7.2、多进程和多线程对比图

    1.8、进程的三态模型和五态模型

    1.8.1、linux的五种进程状态

    1.8.2、linux中关于暂停任务的相关命令

    1.9、 上下文

    1.9.1、上下文切换  ⭐⭐⭐

    1.9.2、系统调用

    1.9.3、进程的上下文切换

    1.10、load average

    1.11、linux中proc目录

    2、threading

    2.1、Thread构造方法

    2.2、Thread实例方法

    2.3、示例讲解

    2.4、使用自定义线程类创建

    3、互斥锁

    3.1、死锁

    4、信号锁

    4.1、GIL,全局解释器锁

    5、多进程

    5.1、os.fork

    5.1.1、os.fork的返回值

    5.1、僵尸进程

    5.2、孤儿进程

    6、Multiprocessing

    6.1、multiprocessing.Process

    6.2、数据共享

    6.2.1、Multiprocessing.Manager

    6.2.2、Queue队列

    6.3、进程池

    6.3.1、进程池的原理

    6.3.2、进程池主进程管理进程的机制

    6.3.3、进程池的应用场景

    6.3.4、构造方法和实例方法


    1、操作系统

    • 计算机系统抽象组成:CPU + 存储器 + IO(input/output)
    • 资源
      • 计算机资源:CPU
      • 存储资源:内存、磁盘等

    1.1、多任务系统

    物理CPU和逻辑CPU

    ① 物理CPU               
                  实际Server中插槽上的CPU个数
                  物理cpu数量,可以数不重复的 physical id 有几个           
    ② 逻辑CPU               
                  Linux用户对 /proc/cpuinfo 这个文件肯定不陌生. 它是用来存储cpu硬件信息的
                  信息内容分别列出了processor 0 – n 的规格。这里需要注意,如果你认为n就是真实的cpu数的话, 就大错特错了
                  一般情况,我们认为一颗cpu可以有多核,加上intel的超线程技术(HT), 可以在逻辑上再分一倍数量的cpu core出来
                  逻辑CPU数量=物理cpu数量 x cpu cores 这个规格值 x 2(如果支持并开启ht)
                  备注一下:Linux下top查看的CPU也是逻辑CPU个数              
    ③ CPU核数           
                  一块CPU上面能处理数据的芯片组的数量、比如现在的i5 760,是双核心四线程的CPU、而 i5 2250 是四核心四线程的CPU              
                  一般来说,物理CPU个数×每颗核数就应该等于逻辑CPU的个数,如果不相等的话,则表示服务器的CPU支持超线程技术     

    1.2.1、CPU时间片

    • 对于单核CPU同一时刻只能有一个任务执行

    • 并发:交替执行(某时间段内的处理能力)

    • 并行:同时执行(要有多核支撑)

    1.3、linux操作系统:有五大子系统

    • 进程调度,算法有:先进先出、短作业优先、优先级
    • 内存管理:虚拟内存、虚拟地址映射、段页机制、缺页终端、内存的分配管理、伙伴系统
    • 文件系统:虚拟文件系统、ext系列系统、xfs系统
    • 网络接口
    • 进程通信(本质-借助内核通信)

    1.3.1、进程通信的方式

    1.管道(本质使用环形队列来实现的,一侧写,一侧读)    

    • 匿名管道:亲缘进程之间才能够通信(父进程创建子进程时,子进程会拷贝父进程所有的内容。所以只有子进程才知道父进程是在哪里打开的管道。)

    • 命令管道:没有亲缘关系进程也能通信

    • 进程不起来的时候,管道就会关掉

    2.信号(由内核去做的)

    • 异步通信
    • 发送信号
      • 通过硬件。如:ctrl+c;通过软件  kill

      • 有些信号是可以拒绝的,比如说15号信号

      • 信号本质就是修改程序pcb

    • hup信号--hohup

      • 关闭子进程(发送了term信号)和reload(本质上是给进程发送了hup信号)

    1. [root@ngingx-kafka01 ~]# kill -l
    2. 1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP
    3. 6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR1
    4. 11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
    5. 16) SIGSTKFLT 17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP
    6. 21) SIGTTIN 22) SIGTTOU 23) SIGURG 24) SIGXCPU 25) SIGXFSZ
    7. 26) SIGVTALRM 27) SIGPROF 28) SIGWINCH 29) SIGIO 30) SIGPWR
    8. 31) SIGSYS 34) SIGRTMIN 35) SIGRTMIN+1 36) SIGRTMIN+2 37) SIGRTMIN+3
    9. 38) SIGRTMIN+4 39) SIGRTMIN+5 40) SIGRTMIN+6 41) SIGRTMIN+7 42) SIGRTMIN+8
    10. 43) SIGRTMIN+9 44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12 47) SIGRTMIN+13
    11. 48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14 51) SIGRTMAX-13 52) SIGRTMAX-12
    12. 53) SIGRTMAX-11 54) SIGRTMAX-10 55) SIGRTMAX-9 56) SIGRTMAX-8 57) SIGRTMAX-7
    13. 58) SIGRTMAX-6 59) SIGRTMAX-5 60) SIGRTMAX-4 61) SIGRTMAX-3 62) SIGRTMAX-2
    14. 63) SIGRTMAX-1 64) SIGRTMAX
    15. ExecReload= :/bin/sh -C "/bin/kill -s HUP $(/bin/cat /var/ run/nginx. pid) "
    16. ExecStop=/bin/sh -C "/bin/kill -S TERM $(/bin/cat /var/ run/nginx. pid) "

    3.信号量

    是一种锁,可以规定同一时刻有几个程序可以访问这个共享内存。信号量一般搭配共享内存使用。

    4.共享内存

    最快的通信方式

    1. [root@ngingx-kafka01 ~]# ipcs -m
    2. ------------ 共享内存段 --------------
    3. 键 shmid 拥有者 权限 字节 nattch 状态

    5.socket

    一般用于不同主机的不用进程通信。也可以用在同一种主机不用进程之间的通信。

    6.消息队列

    • 支持传输的类型多一点
    • 内核启动的时候创建好了的,不用的时候不会消失。(对比于管道)
    • 存储空间比较小(对比于管道)

    1.3.2、进程通信

    进程通信的前提:进程之间都是相互独立的

    定义:进程就是正在运行的程序,是计算机进行资源(计算资源和存储资源)分配的最小单位

    1.3.3、所有的进程都是由另一个进程创建的,那么谁创建的第一个进程呢?

    开机启动会启动两个进程:systemd(后续用来创建用户进程,pid号为1)和kthreadd(后续用来创建内核进程,pid号为2)

    1.3.4、进程的组成

    进程组成:PCB(是进程的唯一标识,是进程的组成核心,是一个数据结构)+数据段+代码段

    1.3.5、PCB用来存放一些管理信息

    1、pid 进程唯一标识符
    2、有效用户信息 -euid,egid(通常情况下就是uid,gid)
    3、程序的状态
    4、程序优先级
    5、程序的上下文切换

    1.4、内存空间

    内存空间分为用户空间和内核空间。

    cpu两种状态:用户态(运行普通程序)和内核态(运行操作系统程序)

    1.5、线程定义

    线程是运行在进程之上的。是操作系统进行调用的最小单位(是最小的运行单位)

    1.5.1、为什么要使用多线程?

    • 在一个程序中,有很多的操作是非常耗时的,如数据库读写操作,IO操作等,如果使用单线程,那么程序就必须等待这些操作执行完成之后才能执行其他操作。使用多线程,可以在将耗时任务放在后台继续执行的同时,同时执行其他操作。
    • 可以提高程序的效率。
    • 在一些等待的任务上,如用户输入,文件读取等,多线程就非常有用了。

    1.5.2、多线程的缺点

    • 使用太多线程,是很耗系统资源,因为线程需要开辟内存。更多线程需要更多内存。
    • 影响系统性能,因为操作系统需要在线程之间来回切换。
    • 需要考虑线程操作对程序的影响,如线程挂起,中止等操作对程序的影响。
    • 线程使用不当会发生很多问题。

    总结:多线程是异步的,但这不代表多线程真的是几个线程是在同时进行,实际上是系统不断地在各个线程之间来回的切换(因为系统切换的速度非常的快,所以给我们在同时运行的错觉)

    1.6、多进程定义

    进程是程序在计算机上的一次执行活动。当你运行一个程序,你就启动了一个进程。凡是用于完成操作系统的各种功能的进程就是系统进程,而所有由你启动的进程都是用户进程。 

    同理,多进程就是指计算机同时执行多个进程,一般是同时运行多个软件。

    1.7、进程与线程的共性和区别

    共性:多进程和多线程都是用来提高素速度的

    区别

    1、一个进程是可以有一个以上的线程,这些线程都是共享内存空间的。
    2、不同进程之间内存空间都是独立的
    3、创建新的线程很简单,但是创建一个新的进程需要对其父进程进行一次克隆。
    4、一个线程可以控制和操作同一个进程里的其他线程。进程只能操作子进程
    5、一个主线程改变,可能会影响其他线程,改变父进程不会影响子进程。
    6、线程是看不到的,进程能够看到。
    7、安全性来说:多进程会比多线程安全一些

    一般来说,多线程开销会比多进程少。

    1.7.1、多进程和多线程,该选谁?

    • 单进程单线程:一个人在一个桌子上吃菜。
    • 单进程多线程:多个人在同一个桌子上一起吃菜。
    • 多进程单线程:多个人每个人在自己的桌子上吃菜。

    多线程的问题是多个人同时吃一道菜的时候容易发生争抢,例如两个人同时夹一个菜,一个人刚伸出筷子,结果伸到的时候已经被夹走菜了。。。此时就必须等一个人夹一口之后,在还给另外一个人夹菜,也就是说资源共享就会发生冲突争抢。

    1。对于 Windows 系统来说,【开桌子】的开销很大,因此 Windows 鼓励大家在一个桌子上吃菜。因此 Windows 多线程学习重点是要大量面对资源争抢与同步方面的问题。

    2。对于 Linux 系统来说,【开桌子】的开销很小,因此 Linux 鼓励大家尽量每个人都开自己的桌子吃菜。这带来新的问题是:坐在两张不同的桌子上,说话不方便。因此,Linux 下的学习重点大家要学习进程间通讯的方法。

    开桌子的意思是指创建进程。开销这里主要指的是时间开销。
    可以做个实验:创建一个进程,在进程中往内存写若干数据,然后读出该数据,然后退出。此过程重复 1000 次,相当于创建/销毁进程 1000 次。在我机器上的测试结果是:
    UbuntuLinux:耗时 0.8 秒 Windows7:耗时 79.8 秒 两者开销大约相差一百倍。
    这意味着,在 Windows 中,进程创建的开销不容忽视。换句话说就是,Windows 编程中不建议你创建进程,如果你的程序架构需要大量创建进程,那么最好是切换到 Linux 系统。

    1.7.2、多进程和多线程对比图

     

    1.8、进程的三态模型和五态模型

    三态模型

    引起进程状态转换的具体原因如下:

    1. 运行态→等待态:等待使用资源;如等待外设传输;等待人工干预。
    2. 等待态→就绪态:资源得到满足;如外设传输结束;人工干预完成。
    3. 运行态→就绪态:运行时间片到;出现有更高优先权进程。
    4. 就绪态→运行态:CPU 空闲时选择一个就绪进程。

    五态模型

    引起进程状态转换的具体原因如下:

    NULL→新建态:执行一个程序,创建一个子进程。

    新建态→就绪态:当操作系统完成了进程创建的必要操作,并且当前系统的性能和虚拟内存的容量均允许。

    运行态→终止态:当一个进程到达了自然结束点,或是出现了无法克服的错误,或是被操作系统所终结,或是被其他有终止权的进程所终结。

    运行态→就绪态:运行时间片到;出现有更高优先权进程。

    运行态→等待态:等待使用资源;如等待外设传输;等待人工干预。

    就绪态→终止态:未在状态转换图中显示,但某些操作系统允许父进程终结子进程。

    等待态→终止态:未在状态转换图中显示,但某些操作系统允许父进程终结子进程。

    终止态→NULL:完成善后操作。

    1.8.1、linux的五种进程状态

    1、R (TASK_ RUNNING,是为状态标识state值), 可执行状态。

    • 是正在运行和即将运行的进程的状态;即将运行的进程:程序已被挂入运行队列,处于准备运行状态。一旦获得处理器使用权,即可进入运行状态;正在运行的进程:Linux会把一个专门用来指向当前运行任务的指针current指向它,以表示它是一个正在运行的进程。

    2、S (TASK_ INTERRUPTIBLE), 可中断的睡眠状态

    • 由于进程未获得它所申请的资源而处在等待状态。一旦资源有效或者有唤醒信号,进程会立即结束等待而进入就绪状态。

    3、D (TASK_ UNINTERRUPTIBLE), 不可中断的睡眠状态

    • 此时,进程也处于等待资源状态。一旦资源有效,进程会立即进入就绪状态。这个等待状态与可中断等待状态的区别在于:处于TASK_UNINTERRUPTIBL状态的进程不能被信号量或者中断所唤醒,只有当它申请的资源有效时才能被唤醒。

    4、T/t (TASK STOPPED or TASK TRACED),暂停状态或跟踪状态

    • 当进程收到一个SIGSTOP信号后,就由运行状态进入停止状态,当受到一个SIGCONT信号时,又会恢复运行状态。这种状态主要用于程序的调试,又被叫做“暂停状态”、“挂起状态”。

    5、Z (TASK_ DEAD - EXIT ZOMBIE),退出状态,进程成为僵尸进程。

    • 进程因某种原因而中止运行,进程占有的所有资源将被回收,除了task_struct结构(以及少数资源)以外,并且系统对它不再予以理睬,所以这种状态也叫做“僵死状态”,进程成为僵尸进程。

    1.8.2、linux中关于暂停任务的相关命令

    1. # 编辑一个死循环脚本,并执行
    2. #!/bin/bash
    3. while :
    4. do
    5. echo "hello"
    6. sleep 1
    7. done

    按ctrl + z  暂停任务 

    1. [root@nginx-kafaka03 ~]# jobs # 查看暂停的任务
    2. [1]+ 已停止 bash text.sh
    3. [root@nginx-kafaka03 ~]# fg # 运行停止的任务
    4. bash text.sh
    5. hello
    6. hello
    7. hello

    1.9、 上下文

    首先,清楚什么是上下文

    每个任务运行前,CPU 都需要知道任务从哪里加载、又从哪里开始运行,这就涉及到CPU 寄存器程序计数器(PC)

    • CPU 寄存器是 CPU 内置的容量小、但速度极快的内存;
    • 程序计数器会存储 CPU 正在执行的指令位置,或者即将执行的指令位置。

    这两个是 CPU 运行任何任务前都必须依赖的环境,因此叫做CPU上下文

    1.9.1、上下文切换  ⭐⭐⭐

    定义:(程序之间的互相切换,时间片到了之后,保存当前的上下文,进入下一个程序)

    需要履行的步骤:

    1. 将前一个 CPU 的上下文(也就是 CPU 寄存器和程序计数器里边的内容)保存起来;
    2. 然后加载新任务的上下文到寄存器和程序计数器;
    3. 最后跳转到程序计数器所指的新位置,运行新任务。

    被保存起来的上下文会存储到系统内核中,等待任务重新调度执行时再次加载进来。

    CPU 的上下文切换分三种:进程上下文切换、线程上下文切换、中断上下文切换

    1.9.2、系统调用

    Linux 按照特权等级,把进程的运行空间分为内核空间用户空间

    • 内核空间:具有最高权限,可以访问所有资源;
    • 用户空间:只能访问受限资源,不能直接访问内存等硬件设备,必须借助系统调用

    进程可以在用户空间运行(叫作:进程用户态),也可以在内核空间运行(叫作:进程内核态)。从用户态到内核态需要系统调用完成。

    系统调用过程中也会发生 CPU 上下文切换。CPU 寄存器会先保存用户态的状态,然后加载内核态相关内容。系统调用结束之后,CPU 寄存器要恢复原来保存的用户态,继续运行进程。所以,一次系统调用,发生两次 CPU 上下文切换

    需要注意的是,系统调用过程中,不涉及虚拟内存等进程用户态的资源也不会切换进程。与通常所说的进程上下文切换不同:

    • 进程上下文切换是指,从一个进程切换到另一个进程;
    • 系统调用过程中一直是同一个进程在运行。

    1.9.3、进程的上下文切换

    进程是由内核管理和调度的,进程的切换只能发生在内核态。因此,进程的上下文不但包括虚拟内存、栈、全局变量等用户空间资源,还包括内核堆栈、寄存器等内核空间状态。所以,进程的上下文切换比系统调用多一个步骤:保存当前进程的内核状态和 CPU 寄存器之前,先把该进程的虚拟内存、栈等保存起来;加载下一个进程的内核态后,还需要刷新进程的虚拟内存和用户栈。保存上下文和恢复上下文需要内核在 CPU 上运行才能完成

    进程切换时需要切换上下文,进程切换的场景有:

    • 进程时间片耗尽;
    • 系统资源不足(如内存不足);
    • 进程通过睡眠函数 sleep 把自己挂起来;
    • 当有优先级更高的进程运行时,为了去运行高优先级进程,当前进程会被挂起;
    • 发生硬中断,CPU 上的进程会被挂起,然后去执行内核中的中断服务进程。

    1.10、load average

    系统负载(System Load)是系统CPU繁忙程度的度量,即有多少进程在等待被CPU调度(进程等待队列的长度)。

    平均负载(Load Average)是一段时间内系统的平均负载,这个一段时间一般取1分钟、5分钟、15分钟。根据就绪+运行状态 线程队列的情况来反映出CPU一段时间内的繁忙程度

    1.11、linux中proc目录

    Linux 内核提供了一种通过 /proc 文件系统,在运行时访问内核内部数据结构、改变内核设置的机制。proc文件系统是一个伪文件系统,它只存在内存当中,而不占用外存空间。它以文件系统的方式为访问系统内核数据的操作提供接口。

    用户和应用程序可以通过proc得到系统的信息,并可以改变内核的某些参数。由于系统的信息,如进程,是动态改变的,所以用户或应用程序读取proc文件时,proc文件系统是动态从系统内核读出所需信息并提交的。

    1. [root@localhost ~]# ls /proc
    2. 1 1656 27 3 404 494 599 691 consoles kallsyms mtrr timer_list
    3. 10 1673 279 30 405 5 6 693 cpuinfo kcore net timer_stats
    4. 1003 1676 28 301 406 50 601 7 crypto keys pagetypeinfo tty
    5. 103 1678 281 302 407 51 610 8 devices key-users partitions uptime
    6. 11 1698 283 303 408 515 620 9 diskstats kmsg sched_debug version
    7. 12 18 284 35 409 518 621 957 dma kpagecount schedstat vmallocinfo
    8. 120 19 286 36 410 52 646 958 driver kpageflags scsi vmstat
    9. 1252 2 287 37 411 53 66 961 execdomains loadavg self zoneinfo
    10. 13 20 288 377 412 584 669 980 fb locks slabinfo
    11. 1352 21 289 378 413 585 670 acpi filesystems mdstat softirqs
    12. 1375 22 29 38 414 590 671 asound fs meminfo stat
    13. 1376 23 290 389 46 593 672 buddyinfo interrupts misc swaps
    14. 14 24 291 390 47 594 675 bus iomem modules sys
    15. 15 25 293 4 48 596 677 cgroups ioports mounts sysrq-trigger
    16. 16 26 296 403 49 597 679 cmdline irq mpt sysvipc

    这些以数字命名的目录,它们是进程目录。系统中当前运行的每一个进程都有对应的一个目录在/proc下,以进程的 PID号为目录名,它们是读取进程信息的接口。而self目录则是读取进程本身的信息接口,是一个link。

    2、threading

    线程

    线程:线程被称为轻量级进程(Lightweight Process,LWP),是cpu调度的基本单位

    组成:线程ID、当前指令指针(PC)、寄存器集合、堆栈组成

    在单个程序中同时运行多个线程完成不同的工作,称为多线程。

    threading的功能

    threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

    threading模块提供的常用类:

    • Thread:创建线程
    • Lock/Rlck:互斥锁

    2.1、Thread构造方法

    • 构造方法: Thread(group=None, target=None, name=None, args=(), kwargs={})
      • group: 线程组,目前还没有实现,库引用中提示必须是None;
      • target: 要执行的方法;
      • name: 线程名;
      • args/kwargs: 要传入方法的参数

    2.2、Thread实例方法

    • t.name:获取或设置线程的名称
    • t.getName()/setName(name): 获取/设置线程名。
    • t.is_alive()、t.isAlive():判断线程是否为激活状态。返回线程是否在运行。正在运行指启动后、终止前。
    • t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None
    • t.run() :线程被cpu调度后自动执行线程对象的run方法
    • t.start(): 线程准备就绪,等待CPU调度,start会自动调用t.run()
    • t.join([timeout]): 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout (可选参数)。
    • t.setDaemon(bool): 设置是后台线程(默认前台线程(False))。(在start之前设置)
      • 如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止
      • 如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
    • t.isDaemon:判断是否为后台线程

    2.3、示例讲解

    1. import requests
    2. import time
    3. import functools
    4. import threading
    5. def get_content(url):
    6. text = requests.get(url).content
    7. time.sleep(0.5)
    8. print("get content")
    9. def runtime(func):
    10. # 保留传递进来的函数的元数据,将他的元数据赋值给inner
    11. # 元数据:函数名字、注释……
    12. @functools.wraps(func)
    13. def inner(*args, **kwargs):
    14. start = time.time()
    15. result = func(*args, **kwargs)
    16. end = time.time()
    17. print(f"函数执行花了{end - start}s")
    18. return result
    19. return inner
    20. @runtime
    21. def main():
    22. t_list = []
    23. for i in range(5):
    24. # get_content("https://www.baidu.com")# 若是只是用这行代码,后面的代码不使用,那么就是一个人做五个事情
    25. # 创建线程
    26. # target --》指定传入的方法的名字,而不是调用的结果。
    27. # args --》指定方法需要传入的参数 元组类型
    28. t = threading.Thread(target=get_content, args=("https://www.baidu.com",))
    29. t_list.append(t) # 把创造的每个线程实例都加到列表中去
    30. # 默认是前台线程,主线程执行完了,等待子线程执行完再退出。
    31. t.setDaemon(True) # 设置后台线程,主线程退出子线程也退出
    32. t.start() # 启动线程 --> 自动执行run方法 线程准备就绪,等待CPU调度,start会自动调用t.run()
    33. # t.join() # 若是把阻塞放在这里,那么这个多线程就没有意义了。
    34. # 这部分代码是主线程来做
    35. for t in t_list:
    36. # 阻塞当前环境上下文,直到t的线程执行完成
    37. # 谁执行了这个join代码,谁就是当前环境
    38. # t.join() # 若是t运行完成之后,main会继续往下运行;若是t没有运行完成,main会阻塞
    39. t.join(timeout=3) # 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout
    40. print("ending")
    41. main()
    42. ####### 执行结果
    43. get content
    44. get content
    45. get content
    46. get content
    47. get content
    48. ending
    49. 函数执行花了1.3826136589050293s

    2.4、使用自定义线程类创建

    1. import threading
    2. class MyThread(threading.Thread):
    3. def __init__(self, num):
    4. super(MyThread, self).__init__()
    5. self.num = num
    6. def run(self):
    7. print(f"running on numbers:{self.num}")
    8. t1 = MyThread(1)
    9. t2 = MyThread(2)
    10. t1.start()
    11. t2.start()

    3、互斥锁

    作用:主要是解决资源争用、数据读取不一致等问题。

    加锁的前提:在线程需要时使用公共资源的时候

    类型:

    • LOCK  原始锁 ,获取锁之前不做判断,直接获取到锁的位置

    • RLOCK  重入锁  获取锁之间先判断,如果自己有了锁就立即返回

    示例使用公共资源没有使用锁的情况

    1. import threading
    2. import time
    3. # 这样会出现读脏数据
    4. num = 0
    5. def sum_num(i):
    6. global num
    7. time.sleep(0.5)
    8. num += i
    9. print(num)
    10. for i in range(5):
    11. t = threading.Thread(target=sum_num, args=(i,))
    12. t.start()
    13. ####### 执行结果,这个结果是随机的,这只是一种结果
    14. 269
    15. 1010

    使用锁的例子

    1. import threading
    2. import time
    3. #只需要在有公共资源的地方加锁
    4. num = 0
    5. def sum_num(i):
    6. lock.acquire() # 获取锁
    7. global num
    8. time.sleep(0.5)
    9. num += i
    10. print(num)
    11. lock.release() # 释放锁
    12. def sum_num(i):
    13. with lock: # 自动获取和释放锁
    14. global num
    15. time.sleep(0.5)
    16. num += i
    17. print(num)
    18. lock = threading.Lock() # 创建一个锁对象 也可以这样写:threading.RLock()
    19. for i in range(5):
    20. t = threading.Thread(target=sum_num, args=(i,))
    21. t.start()
    22. #### 执行结果
    23. 0
    24. 1
    25. 3
    26. 6
    27. 10

    3.1、死锁

    示例:使用原始锁

    1. import threading
    2. lock1 = threading.Lock() # 只会执行第一步"lock1 acquire"
    3. lock1.acquire()
    4. print("lock1 acquire")
    5. lock1.acquire()
    6. print("lock1 acquire 2")
    7. lock1.release()
    8. print("lock1 release")
    9. print("lock1 release 2")
    10. #### 执行结果
    11. lock1 acquire

    示例:使用重入锁

    1. import threading
    2. lock1 = threading.RLock() # 这个能够执行完毕
    3. lock1.acquire()
    4. print("lock1 acquire")
    5. lock1.acquire()
    6. print("lock1 acquire 2")
    7. lock1.release()
    8. print("lock1 release")
    9. print("lock1 release 2")
    10. ### 执行结果
    11. lock1 acquire
    12. lock1 acquire 2
    13. lock1 release
    14. lock1 release 2

    4、信号锁

    • 互斥锁:允许1个线程执行
    • 信号量:允许n个线程执行
    • 事件锁:条件变量 , 满足条件,全部线程都执行

    • 条件锁:信号量+事件锁,满足条件,允许n个线程访问执行

    linux线程通信:互斥锁+信号量+条件变量

    1. # 这个是信号量的例子
    2. import threading
    3. import time
    4. num = 0
    5. def sum_num(i):
    6. with lock: # 自动获取和释放锁
    7. global num
    8. print(f"this is thread {i}")
    9. time.sleep(5)
    10. num += i
    11. print(num)
    12. lock = threading.BoundedSemaphore(2) # 定义了两把钥匙,同时有两个线程去执行
    13. for i in range(5):
    14. t = threading.Thread(target=sum_num, args=(i,))
    15. t.start()

    执行结果

    1. this is thread 0
    2. this is thread 1
    3. 0
    4. this is thread 2
    5. 1
    6. this is thread 3
    7. 36
    8. this is thread 4
    9. 10

    4.1、GIL,全局解释器锁

    • GIL全称Global Interpreter Lock(全局解释器锁
    • GIL和Python语言没有任何关系,只是因为历史原因导致在官方推荐的解释器Cpython中遗留的问题(Jpython无此类问题)
    • 每个线程在执行的过程中都需要先获取GIL保证同一时刻只有一个线程可以执行代码

    Python中多线程

    • GIL最基本的行为只有下面两个:
      • 当前执行的线程持有GIL
      • 当线程遇到io阻塞时,会释放GIL
    • 由于GIL锁的限制,所以多线程不适合计算型任务,而更适合IO型任务
      • 计算密集型任务:用CPU、计算   =>多进程
      •  IO密集型任务:网络IO(抓取网页数据)、磁盘操作(读写文件)、键盘输入..... => 多线程+多进程

    5、多进程

    5.1、os.fork

    os.fork中就用来创建子进程的方法

    注意:这个os.fork()方法只有在unix系统中才会有,在window下没有。

    使用fork创建子进程后,操作系统会将当前的进程复制一份

    原来的进程称为父进程,新创建的进程称为子进程

    两个进程会各自互不干扰的执行下面的程序

    父进程与子进程的执行顺序与系统调度有关

    在子进程内,这个方法会返回0;在父进程内,这个方法会返回子进程的编号PID。

    5.1.1、os.fork的返回值

    返回值为大于0时,此进程为父进程,且返回的数字为子进程的PID;

    当返回值为0时,此进程为子进程。

    如果返回值为负数则表明创建子进程失败。

    父进程结束时,子进程并不会随父进程立刻结束。同样,父进程不会等待子进程执行完。

    • os.getpid():获取进程的进程号。

    os.getppid():获取父进程的进程号。

    1. # 这段代码要放到unix环境里执行,我们可以把它放到linux环境中
    2. import os, time
    3. print("start...")
    4. result = os.fork() # 系统调用让内核返回的值
    5. # 父进程运行时得到的pid为子进程的pid
    6. # 子进程运行时这个pid就是0
    7. print("outerside pid is:", result)
    8. if result == 0:
    9. print("child paocess")
    10. time.sleep(60)
    11. print("child pid is:", os.getpid())
    12. print("child-parent pid is :", os.getppid())
    13. else:
    14. print("parent process")
    15. time.sleep(60)
    16. print("parent pid is :", os.getpid())

    得到的结果:

    1. [root@nginx-kafaka03 22-08-10]# ps -ef|grep os-fork
    2. root 1997 1947 1 15:20 pts/0 00:00:00 python3 os-fork.py
    3. root 1998 1997 0 15:20 pts/0 00:00:00 python3 os-fork.py
    4. root 2000 1912 0 15:20 pts/3 00:00:00 grep --color=auto os-fork
    5. [root@nginx-kafaka03 22-08-10]# python3 os-fork.py
    6. start...
    7. outerside pid is: 1998
    8. parent process
    9. outerside pid is: 0
    10. child paocess
    11. parent pid is : 1997
    12. child pid is: 1998
    13. child-parent pid is : 1997

    5.1、僵尸进程

    定义:子进程退出,父进程没有响应,父进程没有去调用wait()或者waitpid()去获取子进程的状态。子进程的进程控制块就依然会保存在系统中,这种进程就称之为僵尸进程。僵尸进程只有会少量的元数据残留。

    制造僵尸进程

    1. # 这段代码要放到unix环境里执行,我们可以把它放到linux环境中
    2. import os, time
    3. print("start...")
    4. result = os.fork() # 系统调用让内核返回的值
    5. # 父进程运行时得到的pid为子进程的pid
    6. # 子进程运行时这个pid就是0
    7. print("outerside pid is:", result)
    8. if result == 0:
    9. print("child paocess")
    10. # time.sleep(60) 把这段注释掉
    11. print("child pid is:", os.getpid())
    12. print("child-parent pid is :", os.getpid())
    13. else:
    14. print("parent process")
    15. time.sleep(60)
    16. print("parent pid is :", os.getpid())
    1. [root@nginx-kafaka03 22-08-10]# ps aux|grep python3
    2. root 2087 0.0 0.3 125188 5752 pts/0 S+ 15:40 0:00 python3 os-fork.py
    3. root 2088 0.0 0.0 0 0 pts/0 Z+ 15:40 0:00 [python3]
    4. root 2095 0.0 0.0 112824 988 pts/3 S+ 15:40 0:00 grep --color=auto python
    5. [root@nginx-kafaka03 22-08-10]# python3 os-fork.py
    6. start...
    7. outerside pid is: 2088
    8. parent process
    9. outerside pid is: 0
    10. child paocess
    11. child pid is: 2088
    12. child-parent pid is : 2088
    13. parent pid is : 2087

    5.2、孤儿进程

    定义:一个父进程退出,子进程还在运行,那么这个子进程就会称为孤儿进程。孤儿进程会被pid为1的进程所收养。

    制造孤儿进程

    1. import os, time
    2. print("start...")
    3. pid = os.fork()
    4. print("outerside pid is:",pid)
    5. if pid == 0:
    6. print("child paocess")
    7. time.sleep(30)
    8. print("child pid is:", os.getpid())
    9. print("child-parent pid is :", os.getpid())
    10. else:
    11. print("parent process")
    12. print("parent pid is :", os.getpid())
    1. [root@nginx-kafaka03 22-08-10]# ps -ef|grep python3
    2. root 3355 1 0 17:05 pts/0 00:00:00 python3 os-fork.py # 这里说明这个进程是一个孤儿进程
    3. root 3357 1912 0 17:05 pts/3 00:00:00 grep --color=auto python3

    6、Multiprocessing

    • Process(用于创建进程模块)
    • Pool(用于创建管理进程池)
    • Queue(用于进程通信,资源共享)
    • Value,Array(用于进程通信,资源共享)
    • Pipe(用于管道通信)
    • Manager(用于资源共享)

    6.1、multiprocessing.Process

    语法

    构造方法:Process([group [, target [, name [, args [, kwargs]]]]])

    group: 线程组,目前还没有实现,库引用中提示必须是None;

    target: 要执行的方法;

    name: 进程名;

    args/kwargs: 要传入方法的参数

    实例方法

    p.start():启动进程,并调用该子进程中的p.run()

    p.run(): strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法

    p.terminate(): 不管任务是否完成,立即停止工作进程

    p.is_alive(): 如果p仍然运行,返回True

    p.join([timeout]): 阻塞当前上下文环境的进程,直到调用此方法的进程终止或到达指定的timeout

    1. from multiprocessing import Process, current_process
    2. import time
    3. lst = []
    4. def task(i):
    5. print(current_process().name, i, 'start.....')
    6. time.sleep(2)
    7. lst.append(i)
    8. print(lst)
    9. print(current_process().name, i, 'end...')
    10. if __name__ == "__main__": # 多进程必须有这个__name__= "__main__",必须是主程序运行多进程
    11. p_lst = []
    12. for i in range(4):
    13. p = Process(target=task, args=(i,))
    14. p_lst.append(p)
    15. p.start()
    16. for p in p_lst:
    17. p.join()
    18. print("main end .......")
    1. # 执行结果
    2. Process-2 1 start.....
    3. Process-1 0 start.....
    4. Process-3 2 start.....
    5. Process-4 3 start.....
    6. [1]
    7. Process-2 1 end...
    8. [0]
    9. Process-1 [2]0
    10. end...
    11. Process-3 2 end...
    12. [3]
    13. Process-4 3 end...
    14. main end .......
    15. Process finished with exit code 0

    自定义进程类创建多进程

    1. class MyMultip(Process):
    2. def __init__(self,num):
    3. super(MyMultip, self).__init__()
    4. self.num = num
    5. def run(self) -> None:
    6. print(f"running on numbers:{self.num}")
    7. if __name__ == "__main__": # 多进程必须有这个__name__= "__main__",必须是主程序运行多进程
    8. t1 = MyMultip(1)
    9. t2 = MyMultip(2)
    10. t1.start()
    11. t2.start()
    12. # 运行结果
    13. running on numbers:1running on numbers:2

    6.2、数据共享

    不同进程间内存是不共享的,multiprocessing中提供以下方式实现进程间的数据交换

    • Queue(用于进程通信,资源共享)
    • Value,Array(用于进程通信,资源共享)
    • Pipe(用于管道通信)
    • Manager(用于资源共享)

    6.2.1、Multiprocessing.Manager

    使用multiprocessing.Manager共享数据

    • 由Manager()返回的manager提供 list, dict, Namespace, Lock, RLock, Semaphore,
    • BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。
    • Manager比Array要好用一点,因为它可以同时保存多种类型的数据格式。
    1. from multiprocessing import Manager, Process,Lock
    2. import time
    3. def func(i, temp):
    4. with lock:
    5. temp[0] += 100
    6. print(i, "------------------>", temp[0])
    7. time.sleep(1)
    8. # 使用manager,父进程要等待子进程结束再退出
    9. lock = Lock()
    10. if __name__ == '__main__':
    11. manager = Manager()
    12. temp = manager.list([1, 2, 3]) # 要传递什么类型就在manager.后面放什么类型
    13. p_list = []
    14. for i in range(10):
    15. p = Process(target=func, args=(i, temp))
    16. p.start()
    17. p_list.append(p)
    18. for i in p_list:
    19. i.join()
    20. # 运行结果之一
    21. # """
    22. # 不加锁可能会出现脏数据
    23. # 0 ------------------> 101
    24. # 2 ------------------> 201
    25. # 4 ------------------> 201
    26. # 1 ------------------> 301
    27. # 3 ------------------> 401
    28. # 5 ------------------> 501
    29. # 6 ------------------> 601
    30. # 7 ------------------> 701
    31. # 8 ------------------> 801
    32. # 9 ------------------> 901
    33. # """
    1. # 加锁之后
    2. from multiprocessing import Manager, Process,Lock
    3. import time
    4. def func(i, temp):
    5. with lock:
    6. temp[0] += 100
    7. print(i, "------------------>", temp[0])
    8. time.sleep(1)
    9. # 使用manager,父进程要等待子进程结束再退出
    10. lock = Lock()
    11. if __name__ == '__main__':
    12. manager = Manager()
    13. temp = manager.list([1, 2, 3]) # 要传递什么类型就在manager.后面放什么类型
    14. p_list = []
    15. for i in range(10):
    16. p = Process(target=func, args=(i, temp))
    17. p.start()
    18. p_list.append(p)
    19. for i in p_list:
    20. i.join()
    21. # 运行结果
    22. 1 ------------------> 101
    23. 2 ------------------> 201
    24. 0 ------------------> 301
    25. 4 ------------------> 401
    26. 3 ------------------> 501
    27. 9 ------------------> 601
    28. 5 ------------------> 701
    29. 6 ------------------> 801
    30. 8 ------------------> 901
    31. 7 ------------------> 1001

    manager使用socket实现的数据共享,父进程结束,manager进程会先退出

    1. from multiprocessing import Manager, Process,Lock
    2. import time
    3. def func(i, temp):
    4. time.sleep(50)
    5. with lock:
    6. temp[0] += 100
    7. print(i, "------------------>", temp[0])
    8. # 使用manager,父进程要等待子进程结束再退出
    9. # 使用socket方式实现
    10. lock = Lock()
    11. if __name__ == '__main__':
    12. manager = Manager()
    13. temp = manager.list([1, 2, 3]) # 要传递什么类型就在manager.后面放什么类型
    14. p_list = []
    15. for i in range(10):
    16. p = Process(target=func, args=(i, temp))
    17. p.start()
    18. p_list.append(p)
    19. # for i in p_list:
    20. # i.join() # 不加join,manager会提前退出
    21. # 执行结果
    22. [root@nginx-kafaka03 22-08-10]# ps -ef|grep python3
    23. root 3605 1912 0 21:43 pts/3 00:00:00 python3 test_nojoin.py
    24. root 3611 3605 0 21:43 pts/3 00:00:00 python3 test_nojoin.py
    25. root 3612 3605 0 21:43 pts/3 00:00:00 python3 test_nojoin.py
    26. root 3613 3605 0 21:43 pts/3 00:00:00 python3 test_nojoin.py
    27. root 3615 3605 0 21:43 pts/3 00:00:00 python3 test_nojoin.py
    28. root 3616 3605 0 21:43 pts/3 00:00:00 python3 test_nojoin.py
    29. root 3617 3605 0 21:43 pts/3 00:00:00 python3 test_nojoin.py
    30. root 3618 3605 0 21:43 pts/3 00:00:00 python3 test_nojoin.py
    31. root 3620 3605 0 21:43 pts/3 00:00:00 python3 test_nojoin.py
    32. root 3621 3605 0 21:43 pts/3 00:00:00 python3 test_nojoin.py
    33. root 3622 3605 0 21:43 pts/3 00:00:00 python3 test_nojoin.py
    34. root 3634 1947 0 21:43 pts/0 00:00:00 grep --color=auto python3
    35. [root@nginx-kafaka03 22-08-10]# ps aux|grep python3|wc -l
    36. 12
    37. # 报错信息中可以在直到manager使用的是socket方式实现的数据共享
    38. File "/usr/lib64/python3.6/multiprocessing/connection.py", line 491, in Client
    39. c = SocketClient(address)

    6.2.2、Queue队列

    使用multiprocessing.Queue共享数据

    • 消息队列:multiprocessing.Queue
      • Queue是对进程安全的队列,可以使用Queue实现对进程之间的数据传输;还有一个重要作用是作为缓存使用。
      • Queue(maxzize = 0) 创建一个队列对象,maxsize 表示队列中最多存放消息的数量。
    • 实例方法:
      • put(obj [, block=True[, timeout]]):调用队列对象的put()方法将obj插入到队列中
      • get([block=True[, timeout]]):get方法可以将队列中读取并删除一个元素
      • full():判断队列是否为满
      • empty():判断队列是否为空
      • qsize():获取队列中消息数量

    Queue不能再Pool进程池中使用,使用Multiprocessing.Manager类可以适用Pool类

    • 不一致读
    • 为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock, Lock, Event, Condition, Semaphore,连用法 都是一样的!
    • 当创建进程时(非使用时),共享数据会被拿到子进程中,当进程中执行完毕后,再赋值给原值。
    1. from multiprocessing import Process, Queue
    2. import time
    3. def func(i, q):
    4. if not q.empty():
    5. print(i, "-->get value", q.get())
    6. time.sleep(2)
    7. # 队列先进先出
    8. if __name__ == '__main__':
    9. q = Queue() # 不能用在进程池
    10. for i in range(6):
    11. q.put(10 - i)
    12. p = Process(target=func, args=(i, q))
    13. p.start()
    14. # 执行结果
    15. 0 -->get value 10
    16. 4 -->get value 9
    17. 1 -->get value 8
    18. 3 -->get value 7
    19. 2 -->get value 6
    20. 5 -->get value 5

    6.3、进程池

    进程池的作用:有效的降低频繁创建销毁线程所带来的额外开销

    6.3.1、进程池的原理

    • 进程池都是采用预创建的技术,在应用启动之初便预先创建一定数目的进程。
    • 应用在运行的过程中,需要时可以从这些进程所组成的进程池里申请分配一个空闲的进程,来执行一定的任务,任务完成后,并不是将进程销毁,而是将它返还给进程,由线程池自行管理。
    • 如果进程池中预先分配的线程已经全部分配完毕,但此时又有新的任务请求,则进程池会动态的创建新的进程去适应这个请求。
    • 某些时段应用并不需要执行很多的任务,导致了进程池中的线程大多处于空闲的状态,为了节省系统资源,进程池就需要动态的销毁其中的一部分空闲进程
    • 进程需要一个管理者,按照一定的要求去动态的维护其中进程的数目。

    6.3.2、进程池主进程管理进程的机制

    最简单、最常用的算法是随机算法Round Robin(轮流算法)

    • 主进程和所有子进程通过一个共享的工作队列来实现同步:子进程都睡眠在该工作队列上,当有新的任务到来时,主进程将任务添加到工作队列中。这将唤醒正在等待任务的子进程,不过只有一个子进程将获得新任务的“接管权”,它可以从工作队列中取出任务并执行之,而其他子进程将继续睡眠在工作队列上。 
    • 当选择好子进程后,主线程程还需要使用某种通知机制来告诉目标子进程有新任务需要处理,并传递必要的数据。我们可以把这些数据定义为全局,那么它们本身就是被所有进程共享的。对于进程池而言,最简单的方式是,在父进程和子进程之间预先建立好一条管道,然后通过管道来实现所有的进程间通信

    6.3.3、进程池的应用场景

    • 需要大量的进程来完成任务,且完成任务的时间比较短。
    • 但对于长时间的任务,比如一个Telnet连接请求,进程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。

    6.3.4、构造方法和实例方法

    构造方法

    • Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
    • processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
    • initializer:如果initializer是None,那么每一个工作进程在开始的时候会调initializer(*initargs)。
    • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代 原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。

    实例方法

    • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞。
    • apply(func[, args[, kwds]])是阻塞的。
    • close() 关闭pool,使其不在接受新的任务。
    • terminate() 关闭pool,结束工作进程,不在处理未完成的任务。
    • join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程

    注意

    1. 使用Pool创建进程池对象,同时进程池中进程已经启动
    2. 向进程池对象中添加事件,事件排队执行
    3. 如果主进程退出,则进程池中所有进程都退出
  • 相关阅读:
    leetcode:254. 因子的组合【经典递归 + 新型递归 + 所有可能的因数分解】
    C语言期末复习题(上)
    大厂SQL题5-同时在线人数、不同类别的第一
    前端获取文件后缀名
    C语言--用二分法快速计算指定整数的整数平方根
    React - 01
    Mac OS 使用ScreenCaptureKit进行窗口抓取和系统声音抓取
    议题征集中|KCD 2023 杭州站
    【算法】三层嵌套循环的时间复杂度
    深入理解算法的时间复杂度
  • 原文地址:https://blog.csdn.net/m0_48638643/article/details/126179791