• python_多进程、多线程


    多进程

    1.创建进程

    import multiprocessing as mp
    
    def job(a):
        print('aaaa')
    
    if __name__ == '__main__':
        p1 = mp.Process(target=job, args=(1,))
        p1.start()
        p1.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.queue进程输出

    import multiprocessing as mp
    
    def job(q,a):
        res = 0
        for i in range(a):
            res += i
        q.put(res)
        
    if __name__ == '__main__':
        q = mp.Queue()
        p1 = mp.Process(target=job, args=(q,3,))
        p2 = mp.Process(target=job, args=(q,2))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        res1 = q.get()
        res2 = q.get()
        p1.close()
        p2.close()
        print(res1 + res2)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3.效率对比(多线程与多进程)

    import multiprocessing as mp
    import threading as td
    import time
    
    
    def job(q):
        res = 0
        for i in range(1000000):
            res += i + i**2 + i**3
        q.put(res) # queue
    
    def multicore():
        q = mp.Queue()
        p1 = mp.Process(target=job, args=(q,))
        p2 = mp.Process(target=job, args=(q,))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        p1.close()
        p2.close()
        res1 = q.get()
        res2 = q.get()
        print('multicore:',res1 + res2)
    
    def multithread():
        q = mp.Queue() # thread可放入process同样的queue中
        t1 = td.Thread(target=job, args=(q,))
        t2 = td.Thread(target=job, args=(q,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        res1 = q.get()
        res2 = q.get()
        print('multithread:', res1 + res2)
    
    def normal():
        res = 0
        for _ in range(2):
            for i in range(1000000):
                res += i + i**2 + i**3
        print('normal:', res)
    
    
    if __name__ == '__main__':
        st = time.time()
        normal()
        st1 = time.time()
        print('normal time:', st1 - st)
        multithread()
        st2 = time.time()
        print('multithread time:', st2 - st1)
        multicore()
        print('multicore time:', time.time() - st2)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    注意:其实这里跟数据规模和运算量有关系,如果将job中的1000000改成10,那么normal最快,multprocee最慢,因为创建进程也需要一定开销。如果把1000000改成10000000,那么multprocess就相对较快了,因为创建进程的开销相比于多进程节省的时间就可以忽略不记了。

    4.进程池pool

    import multiprocessing as mp
    
    def job(x):
        return x*x
    
    def main():
        pool = mp.Pool(processes=3) # 默认使用所有的核,可以用process指定核数
        res = pool.map(job,range(10))
        print(res)
    
        res = pool.apply_async(job,(2,))
        print(res.get())
    
        multi_res = [pool.apply_async(job,(i,)) for i in range(10)]
        print([res.get() for res in multi_res])
        
    
    if __name__ == '__main__':
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    mp.Pool(process=)会将所有的数据推给指定的核;而使用apply_async可以将指定的数据推给一个核。如果想达到多核运算,可以用迭代的方式。

    主要区别在于:直接使用mp.Pool指定process,那么一个进程可能会处理多次job,而如果用apply_async则是一个进程处理一个job。

    5.共享内存

    进程之间交流数据。(如果是多线程可以使用global)

    import multiprocessing as mp
    
    value = mp.Value('d', 0) # 'd'表示类型:double, 0表示数值
    array = mp.Array('i', [1,2,3]) # 这里的array只是list,不能使用多维
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    6.进程锁lock

    import multiprocessing as mp
    import time
    
    
    def job(v, num, l):
        l.acquire()
        for _ in range(10):
            time.sleep(0.1)
            v.value += num
            print(v.value)
        l.release()
    
    
    def main():
        l = mp.Lock()
        v = mp.Value('i', 0)
        p1 = mp.Process(target=job, args=(v, 1, l))
        p2 = mp.Process(target=job, args=(v, 3, l))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    if __name__ == '__main__':
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    使用进程锁可以使得进程之间不互相抢夺资源。一般如果我们想要一个进程处理完之后再去处理另一个进程时,就需要用到进程锁。

    多线程

    1.添加线程

    import threading
    
    def Myjob():
        print(f'this is an added Thread, number is {threading.current_thread()}')
    
    
    def main():
        added_thread = threading.Thread(target=Myjob) # 添加一个线程,并指定该线程的工作内容
        added_thread.start() # 执行该线程
        # print(threading.active_count()) # 打印有几个激活的线程
        # print(threading.enumerate()) # 打印激活线程
        # print(threading.current_thread()) # 打印运行该程序的线程
    
    
    if __name__ == '__main__':
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2.join功能

    join的意义:保证某个线程先执行完,再去执行主线程(如果有两个线程,它们都join了,表示当他们都执行完再去执行主线程,这两个线程不因执行join的先后而受影响

    import threading
    import time
    
    def Myjob():
        print('Thread1 start\n')
        for i in range(10):
            time.sleep(0.1)
        print('Thread1 finish\n')
    
    def main():
        added_thread = threading.Thread(target=Myjob, name='Thread1') # 添加一个线程,并指定该线程的工作内容
        added_thread.start() # 执行该线程
        print('all done\n')
    
    if __name__ == '__main__':
        main()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    结果:

    Thread1 start
    
    all done
    
    Thread1 finish
    
    • 1
    • 2
    • 3
    • 4
    • 5

    可以发现all done在Thread1 finish之前打印了。因为这里使用了多线程,Thread1的执行不影响主程序线程的执行。

    import threading
    import time
    
    def Myjob():
        print('Thread1 start\n')
        for i in range(10):
            time.sleep(0.1)
        print('Thread1 finish\n')
    
    def main():
        added_thread = threading.Thread(target=Myjob, name='Thread1') # 添加一个线程,并指定该线程的工作内容
        added_thread.start() # 执行该线程
        added_thread.join() # Thread1执行完之后,再执行下面的语句
        print('all done\n')
    
    if __name__ == '__main__':
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    结果:

    Thread1 start
    
    Thread1 finish
    
    all done
    
    • 1
    • 2
    • 3
    • 4
    • 5

    可以发现加了added_thread.join()后,all done最后才打印,因此join函数的作用是保证该线程执行完成后,才执行下面语句。

    import threading
    import time
    
    def Myjob():
        print('Thread1 start\n')
        for i in range(10):
            time.sleep(0.1)
        print('Thread1 finish\n')
    
    def T2_job():
        print('T2 start\n')
        for i in range(5):
            time.sleep(0.1)
        print('T2 finish\n')
    
    def main():
        added_thread = threading.Thread(target=Myjob, name='Thread1') # 添加一个线程,并指定该线程的工作内容
        thread2 = threading.Thread(target=T2_job, name='T2')
        thread2.start()
        added_thread.start() # 执行该线程
        thread2.join() # 这里保证thread2执行完成之后就可以执行后面语句了,不用管thread1是否执行完
        print('all done\n')
    
    if __name__ == '__main__':
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    结果:

    T2 start
    
    Thread1 start
    
    T2 finish
    
    all done
    
    Thread1 finish
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    发现all done在Thread1 finish前打印了,因为thread2.join()只保证thread2执行完成之后就可以执行后面语句了,不用管thread1是否执行完,因此只要执行完了thread2的工作(这里的标志是打印完T2 finish),就可以打印all done了。

    3.Queue

    Queue的意义:将所有进程的处理结果放到一个队列Queue中,再返回到主线程中做处理

    import threading
    import time
    from queue import Queue
    
    def job(l,q):
        # time.sleep(1)
        for i in range(len(l)):
            l[i] = l[i]**2
        q.put(l)
    
    
    def main():
        q = Queue()
        threads = []
        data = [[1,2,3],[2,3,4],[3,4,5],[5,6]]
        for i in range(4):
            added_thread = threading.Thread(target=job, args=(data[i],q))
            added_thread.start()
            threads.append(added_thread)
        for thread in threads:
            thread.join()
        results = []
        for _ in range(4):
            results.append(q.get())
        print(results)
    
    if __name__ == '__main__':
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    注意:q中获取的结果顺序不一定是[[1,4,9],[4,9,16],[9,16,25],[25,36]],如果将job()函数中的time.sleep(1)取消注释,就会发现结果并不是按data中的顺序计算的,他是按照哪个线程先完成的顺序排列的,即线程之间的结果顺序只有一个评判标准:哪个进程先完成(注意先开始的不一定先完成,只比较线程完成的时间点)

    4.GIL不一定有效率

    多线程看似是同时执行,其实是程序不停切换进程执行,让他看起来像是在同时进行。简单来说,多线程是并发,不是并行

    多线程的意义:当cpu处于空闲时,可以先执行别的任务,从而提高效率。

    就像前面代码中用到了time.sleep(1),如果只有一个线程,那么程序要先等休眠1秒后再继续执行任务。但如果是多线程的话, 程序可以在这休眠的1秒钟,先把其他线程的任务完成,这样就提高了效率。

    5.锁(lock)

    意义:如果线程1需要线程2的结果才能继续执行,那么可以锁住线程2,等线程2执行完再去执行线程1。

    import threading
    import time
    from queue import Queue
    
    def job1():
        global A, lock
        lock.acquire()
        for i in range(10):
            A = A + 1
            print('job1', A)
        lock.release()
    
    def job2():
        global A, lock
        lock.acquire()
        A = A * 10
        print('job2', A)
        lock.release()
    
    if __name__ == '__main__':
        lock = threading.Lock()
        A = 0
        t1 = threading.Thread(target=job1)
        t2 = threading.Thread(target=job2)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print(A)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    保证在thread1执行完之后,thread2再执行

  • 相关阅读:
    决策树(ID3,C4.5,C5.0,CART算法)以及条件推理决策树R语言实现
    【矩阵论】4. 矩阵运算——张量积
    操作系统存储管理
    软件设计师考试---标题、判定表、页式存储管理器、快速原型模型、三层C/S结构、耦合类型,
    c++编程实例
    Redis05:Redis高级部分
    taichi 和正常python 速度对比
    Dubbo-服务暴露
    docker学习-1CentOS安装Docker CE
    编写汽车零部件开发问题分析报告培训与报告内容检查单
  • 原文地址:https://blog.csdn.net/Dajian1040556534/article/details/126905195