• Python多进程和多线程对比总结


    总结

    场景API通信机制
    多线程IO 密集型- joblib.threading
    - threading.Thread
    - concurrent.futures.ThreadPoolExecutor
    可以共享内存, 可以共享变量,但需要加锁
    多进程计算密集型- joblib.multiprocessing
    - multiprocessing.Pool
    - multiprocessing.apply_async
    - concurrent.futures.ProcessPoolExecutor, from concurrent.futures import ThreadPoolExecutor
    队列,无需对queue加锁

    Python中,线程池的使用可以通过两个库实现:concurrent.futures和threading。其中,concurrent.futures是Python3.2版本新增的标准库,提供了高层次的异步执行模型,适用于大量I/O密集型任务;而threading库则适用于CPU密集型任务。

    关于GIL

    GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once — Python Wiki

    为何需要GIL? 因为Python解释器不是线程安全的。

    代码示例

    多进程用法:
    # create a process
    process = multiprocessing.Process(target=task)
    # create a process
    process = multiprocessing.Process(target=task, args=(arg1, arg2))
    # run the new process
    process.start()
    
    # SuperFastPython.com
    # example of running a function in another process
    from time import sleep
    from multiprocessing import Process
     
    # a custom function that blocks for a moment
    def task():
        # block for a moment
        sleep(1)
        # display a message
        print('This is from another process')
     
    # entry point
    if __name__ == '__main__':
        # create a process
        process = Process(target=task)
        # run the process
        process.start()
        # wait for the process to finish
        print('Waiting for the process...')
        process.join()
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    多进程之间采用队列通信:
    from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, as_completed
    if __name__ == '__main__':
    	print('main thread is {}'.format(threading.current_thread().name))
        start_time = time.time()
        ### multithread queue
        from queue import Queue
        queue = Queue()
        tw = threading.Thread(target=write_to_queue, args=(queue,))
        tr = threading.Thread(target=read_from_queue, args=(queue,))
        tr.setDaemon(True)
        tw.start()
        tr.start()
        tw.join()
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    多线程之间通信:
    def increase(var, lock):
        global total_increase_times
        for i in range(1000000):
            if lock.acquire():
                var[0] += 1
                lock.release()
                total_increase_times += 1
    
    
    def decrease(var, lock):
        global total_decrease_times
        for i in range(1000000):
            if lock.acquire():
                var[0] -= 1
                lock.release()
                total_decrease_times += 1
                
                
    if __name__ == '__main__':
        print('main thread is {}'.format(threading.current_thread().name))
        start_time = time.time()
        lock = threading.Lock()
        var = [5]
        total_increase_times = 0
        total_decrease_times = 0
        t1 = threading.Thread(target=increase, args=(var, lock))
        t2 = threading.Thread(target=decrease, args=(var, lock))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print(var)
        print('total increase times: {}'.format(str(total_increase_times)))
        print('total decrease times: {}'.format(str(total_decrease_times)))
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    多线程同步队列

    场景:需要保证结果有序,同时还需要解决执行效率问题。例如引擎测试中的大规模的数据对比,数据字典中的正确性验证等等。在这种场景下,单线程串行执行虽然能保证有序,但是效率却是难以恭维,而简单的多线程执行,有序性又不好保证。此时,多线程同步队列处理这类问题就是一个很不错的选择。

       对类似的场景进行抽象,输入文件为仅有的一个生产者,同步队列作为容器,每个工作线程作为一个消费者,就形成了一个简单的生产者-消费者模型(一个生产者,多个消费者)。
    
    • 1

    alt

       等等,在上面的模型中,我们可以保证进入同步队列的时候是有序的,不能保证消费者输出是有序的。这个时候,可以引入一个buffer,用于存储结果,同时在进入同步队列的时候加入一个idx用于标识顺序。输出的时候,交由生产者进行输出,生产者记录一个已处理的idx,当要输出的idx大于生产者记录的idx的时候,仅仅把数据放入buffer;在要输出的idx和生产者记录的idx一致的时候,从buffer中逐次取出idx连续的结果,逐次输出,并更新生产者记录的idx。流程如下图所示。
    
    • 1

    在这里插入图片描述

       python中有Queue和threading等module可以供我们使用,可以方便的帮助我们实现多线程同步队列,实现代码如下。注意,根据场景需要,会有不同的处理方法,下面代码里的处理函数只是简单的对数据进行输出。
    
    • 1
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from time import sleep
    from threading import Thread, Lock
    from Queue import Queue
    
    class Consumer (Thread) :
            def __init__ (self, producer) :
                    Thread.__init__(self)
                    self.p = producer
    
            def run (self) :
                    while True :
                            idx, data = self.p.get()
                            #Here to add processing code
                            sleep (1)
                            #Here output
                            p.output (idx, data)
                            self.p.task_done()
    
    class Producer (Queue) :
            def __init__(self) :
                    Queue.__init__(self)
                    self.lock = Lock()
                    self.idx = 0
                    self.buf = {}
    
            def output (self, idx, data) :
                    self.lock.acquire()
                    if idx > self.idx : #only store the result
                            self.buf[idx] = data
                    elif idx == self.idx :
                            self._output(data) #output one result
                            self.idx += 1
                            while self.idx in self.buf: #output serial result until break
                                    data = self.buf[self.idx]
                                    self._output(data)
                                    self.idx += 1
    
                    self.lock.release()
    
            def _output (self, data) :
                    print data
    
    
    if __name__ == '__main__' :
            p = Producer ()
            for i in xrange(4) :
                    c = Consumer(p)
                    c.daemon = True
                    c.start()
    
            num = 0
            for line in open('data', 'r') :
                    p.put ( (num, line.strip()) )
                    num += 1
    
            p.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from time import sleep
    from threading import Thread, Lock
    from Queue import Queue
     
    class Consumer (Thread) :
            def __init__ (self, producer) :
                    Thread.__init__(self)
                    self.p = producer
     
            def run (self) :
                    while True :
                            idx, data = self.p.get()
                            #Here to add processing code
                            sleep (1)
                            #Here output
                            p.output (idx, data)
                            self.p.task_done()
     
    class Producer (Queue) :
            def __init__(self) :
                    Queue.__init__(self)
                    self.lock = Lock()
                    self.idx = 0
                    self.buf = {}
     
            def output (self, idx, data) :
                    self.lock.acquire()
                    if idx > self.idx : #only store the result
                            self.buf[idx] = data
                    elif idx == self.idx :
                            self._output(data) #output one result
                            self.idx += 1
                            while self.idx in self.buf: #output serial result until break
                                    data = self.buf[self.idx]
                                    self._output(data)
                                    self.idx += 1
     
                    self.lock.release()
     
            def _output (self, data) :
                    print data
     
     
    if __name__ == '__main__' :
            p = Producer ()
            for i in xrange(4) :
                    c = Consumer(p)
                    c.daemon = True
                    c.start()
     
            num = 0
            for line in open('data', 'r') :
                    p.put ( (num, line.strip()) )
                    num += 1
     
            p.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    高并发库

    结论就是 3.11 下面,asyncio 比 gevent 快 50%,加上 uvloop 可以快一倍。纯用 asyncio 性能可以做到 redis 的 68%,而加上 uvloop 后可以做到 redis 的 88%,当然程序比较简单,没有复杂的数据处理,主要就是测评网络 I/O 性能

    参考链接

  • 相关阅读:
    Kernel: network:问题分析一例,包从二层到了三层,却没有到四层
    【笔记】samba shell 脚本 离线安装 - Ubuntu 20.04
    记一次 .NET某管理局检测系统 内存暴涨分析
    Java关于由子类构造器生成的父类对象的反射问题
    大规模语言模型--训练成本
    清华大学YOLOv10公版目标检测算法在地平线Bayes架构神经网络加速单元BPU上部署参考—— 以RDK Ultra为例
    投票礼物打赏流量主小程序开发
    PlayWright(十七)- 参数化
    【类、抽象与继承】
    美团一面,面试官让介绍AQS原理并手写一个同步器,直接凉了
  • 原文地址:https://blog.csdn.net/jgku/article/details/128020494