• python 并发、并行处理、分布式处理


    learn from 《Python高性能(第2版)》

    • 减少CPU指令:
      加速python可以利用 CPython 获取 C 语言的性能
      Numba 加速 Numpy
      PyPy解释器

    • 减少 IO 等待:
      异步

    1. 异步编程

    阻塞、回调

    import time
    
    
    def wait_and_print(msg):
        time.sleep(1)  # 阻塞程序执行流
        print(msg)
    
    
    import threading
    
    
    def wait_and_print_async(msg):
        def callback():
            print(msg)
    
        timer = threading.Timer(1, callback)  # 不会阻塞程序执行流程,1秒以后执行 callback 函数
        timer.start() # 启动定时器, 实质:启动了新线程
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    if __name__ == '__main__':
        t0 = time.time()
        wait_and_print('第一次')
        wait_and_print('第二次')
        print(f'After call, takes: {time.time() - t0} seconds')
    
    • 1
    • 2
    • 3
    • 4
    • 5

    输出

    第一次
    第二次
    After call, takes: 2.017909049987793 seconds
    
    • 1
    • 2
    • 3
    	t0 = time.time()
        wait_and_print_async('第一次')
        wait_and_print_async('第二次')
        print(f'After call, takes: {time.time() - t0} seconds')
    
    • 1
    • 2
    • 3
    • 4

    输出

    After call, takes: 0.0020036697387695312 seconds
    第二次第一次
    
    • 1
    • 2

    把返回结果当参数传递给回调函数

    def network_request_async(num, on_done):
        def timer_done():
            on_done({'success': True, 'result': num**2})
        timer = threading.Timer(1, timer_done)
        timer.start()
    
    def on_done(result):
        print(result)
    
    network_request_async(2, on_done)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    异步代码需要层层编写回调函数,很麻烦

    future

    future 更便利,可用来跟踪异步调用的结果

    from concurrent.futures import Future
    fut = Future()
    print(fut)  # 
    
    • 1
    • 2
    • 3

    pending 表示还未确定

    可以使用 fut.set_result() 使结果可用

    fut.set_result("hello michael")
    print(fut, fut.result())
    #  hello michael
    
    • 1
    • 2
    • 3

    还可以通过 add_done_callback 指定回调函数,当结果可用时,调用它(第一参数为 future obj)

    fut1 = Future()
    fut1.add_done_callback(lambda future_obj: print(future_obj.result(), flush=True))
    fut1.set_result("hello michael")
    #  hello michael
    
    • 1
    • 2
    • 3
    • 4
    import threading
    from concurrent.futures import Future
    def network_request_async(number):
        future = Future()
        result = {
            'success': True,
            'result': number**2
        }
        timer = threading.Timer(1, lambda: future.set_result(result))
        timer.start()
        return future
    
    if __name__ == '__main__':
        fut = network_request_async(2)
        print(fut)
        #  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    上面的函数什么也没有返回,还处于 pending

    添加回调函数

    def fetch_square(number):
        fut = network_request_async(number)
        def on_done_future(future):
            response = future.result()
            if response['success']:
                print(f'result is {response["result"]}')
        fut.add_done_callback(on_done_future)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    事件循环

    不断监视各种资源的状态,并在事件发生时执行相应的回调函数

    事件循环:每个执行单元都不会与其他执行单元同时运行。(能规避同时写一个数据的风险?)

    import time
    
    
    class Timer:
        def __init__(self, timeout):
            self.timeout = timeout
            self.start_time = time.time()
    
        def done(self):
            return time.time() - self.start_time > self.timeout
    
    
    if __name__ == '__main__':
        timer = Timer(3)
        while True:
            if timer.done():
                print('Timer finished')
                break
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    流程不会被阻塞,可以在 while 循环中执行其他操作,通过循环不断轮询等待事件发生称为 busy-waiting

    import time
    
    
    class Timer:
        def __init__(self, timeout):
            self.timeout = timeout
            self.start_time = time.time()
    
        def done(self):
            return time.time() - self.start_time > self.timeout
    
        def on_timer_done(self, callback):
            self.callback = callback
    
    
    if __name__ == '__main__':
        timer = Timer(1)
        timer.on_timer_done(lambda: print('timer done from callback'))
        while True:
            if timer.done():
                timer.callback()
                break
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 扩展为多个定时器
    if __name__ == '__main__':
        timer = Timer(1)
        timer.on_timer_done(lambda: print('timer done from callback'))
        timer1 = Timer(2)
        timer1.on_timer_done(lambda: print('timer1 done from callback'))
        timers = [timer, timer1]
    
        while True:
            for timer in timers:
                if timer.done():
                    timer.callback()
                    timers.remove(timer)
            if len(timers) == 0:
                break
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2. asyncio 框架

    import asyncio
    loop = asyncio.get_event_loop() # 获取asyncio循环
    def callback():
        print("hello michael")
        loop.stop()
    
    loop.call_later(1, callback) # 1秒后调用回调函数
    loop.run_forever() # 启动循环
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    协程

    回调函数很繁琐,协程 像编写同步代码一样,来编写异步代码,更自然优雅(可将协程看做可停止和恢复执行的函数)

    使用 yield 定义一个生成器

    def range_gen(n):
        i = 0
        while i < n:
            print(f'generating value {i}')
            yield i
            i += 1
    
    range_gen(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    代码没有执行,只返回一个生成器对象
    使用 next(gen) 取结果

    gen = range_gen(5)
    next(gen) # generating value 0
    
    • 1
    • 2

    程序会停在 yield 处,并保持内部状态

    yield 接收值

    def parrot():
        while True:
            message = yield
            print(f'parrot says: {message}')
    
    generator = parrot()
    generator.send(None) # 必须写这句初始化, 否则 
    # TypeError: can't send non-None value to a just-started generator
    generator.send('hello')
    generator.send({'hello': 'world'})
    # parrot says: hello
    # parrot says: {'hello': 'world'}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    生成器可仅在相关资源就绪时才往前推进,不需要使用回调函数

    asyncio 定义协程

    async def hello():
        await asyncio.sleep(1)  # 等待1 s
        print("hello michael")
    
    coro = hello()
    print(coro)  # 
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(coro) # hello michael
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • await 给事件循环提供了一个断点,等待资源期间,事件循环可继续管理其他协程
    async def network_request(number):
        await asyncio.sleep(1)
        return {'success': True, 'result': number**2}
    
    async def fetch_square(number):
        response = await network_request(number)
        if response['success']:
            print(response['result'])
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch_square(5))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • asyncio.ensure_future() 调度协程和 future
    # 以下函数并发执行
    asyncio.ensure_future(fetch_square(2)) # 返回一个 Task 实例 (Future的子类),可以await
    asyncio.ensure_future(fetch_square(3))
    asyncio.ensure_future(fetch_square(4))
    loop.run_forever()
    
    • 1
    • 2
    • 3
    • 4
    • 5

    阻塞代码 -> 非阻塞 ThreadPoolExecutor

    • 将阻塞代码放在一个独立的线程(OS层级实现的,允许代码并行执行)中运行
    import time
    from concurrent.futures import ThreadPoolExecutor
    executor = ThreadPoolExecutor(max_workers=3)
    
    def wait_and_return(msg):
        time.sleep(1) # 阻塞代码
        return msg
    
    print(executor.submit(wait_and_return, "i am parameters: msg"))
    # executor.submit 调度函数,返回 future
    # 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    或者

    import asyncio
    loop = asyncio.get_event_loop()
    fut = loop.run_in_executor(executor, wait_and_return, "i am parameters: msg")
    print(fut)
    # ._call_check_cancel() at D:\ProgramData\Anaconda3\envs\cv\lib\asyncio\futures.py:360]>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 例子,requests 请求库是 阻塞的
    import requests
    
    
    async def fetch_urls(urls):
        responses = []
        for url in urls:
            responses.append(await loop.run_in_executor(executor, requests.get, url))
        return responses
    
    
    res = loop.run_until_complete(fetch_urls(["https://www.baidu.com",
                                              "https://www.csdn.net"]))
                                              # 不会并行获取 url
    print(res)
    
    
    def fetch_urls_1(urls):
        return asyncio.gather(*[loop.run_in_executor(executor, requests.get, url) for url in urls])
        # gather 一次性提交所有协程并收集结果
    
    res = loop.run_until_complete(fetch_urls_1(["https://www.baidu.com",
                                              "https://www.csdn.net"]))
                                              # 会并行但受制于 executor worker 数量
    print(res)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    为避免 executor worker 数量限制,应当使用 非阻塞aiohttp

    3. 响应式编程

    旨在打造出色的并发系统

    • 响应速度快
    • 伸缩性高,处理各种负载
    • 富有弹性,应对故障
    • 消息驱动,不阻塞

    ReactiveX 是一个项目,实现了用于众多语言的响应式编程工具,RxPy 是其中一个库

    https://reactivex.io/languages.html

    pip install reactivex  # 4.0.4 version
    
    • 1

    被观察者

    import reactivex as rx
    
    obs = rx.from_iterable(range(4))
    # Converts an iterable to an observable sequence  (被观察者)
    print(obs)
    # 
    obs.subscribe(print)  # 将数据源 emit 发射的每个值传入 print 函数
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    被观察者很像一个有序的迭代器

    c = [1,2,3,4,5]
    iterator = iter(c)
    print(next(iterator))
    print(next(iterator))
    for i in iterator:
        print(i)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • Observable.subscribe 注册回调函数
    c = [1,2,3,0,4,5]
    obs = rx.from_iterable(c)
    obs.subscribe(on_next=lambda x: print(f'next elem 1/{x}: {1/x}'),
                  on_error=lambda x: print(f'error: 1/{x} illegal'),
                  on_completed=lambda: print(f'completed calculation'))
    
    • 1
    • 2
    • 3
    • 4
    • 5

    输出

    next elem 1/1: 1.0
    next elem 1/2: 0.5
    next elem 1/3: 0.3333333333333333
    error: 1/division by zero illegal
    
    Process finished with exit code 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    c = [1,2,3,4,5]
    obs = rx.from_iterable(c)
    obs.subscribe(on_next=lambda x: print(f'next elem 1/{x}: {1/x}'),
                  on_completed=lambda: print(f'completed calculation'))
    
    • 1
    • 2
    • 3
    • 4

    输出

    next elem 1/1: 1.0
    next elem 1/2: 0.5
    next elem 1/3: 0.3333333333333333
    next elem 1/4: 0.25
    next elem 1/5: 0.2
    completed calculation
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    RxPy 提供了可用来创建、变换、过滤 被观察者,以及对其进行编组的运算符,这些操作返回 被观察者(可以继续串接、组合,威力所在)

    obs = rx.from_iterable(range(5))
    obs2 = obs[:3]
    obs2.subscribe(print)  # 0 1 2
    obs.subscribe(print)  # 0 1 2 3 4
    
    • 1
    • 2
    • 3
    • 4

    运算符

    • map
    from reactivex.operators import map as rx_map
    op = rx_map(lambda x: x**2)
    (rx.from_iterable(range(5))).pipe(op).subscribe(print)
    # 0
    # 1
    # 4
    # 9
    # 16
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • group by
    from reactivex.operators import group_by as rx_group_by
    op = rx_group_by(lambda x: x%3)
    obs = (rx.from_iterable(range(10))).pipe(op)
    obs.subscribe(lambda x: print(f"group key: {x.key}"))
    # group key: 0
    # group key: 1
    # group key: 2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    每个组都是一个 被观察者

    obs[0].subscribe(lambda x: x.subscribe(print))
    print('-'*10)
    obs[1].subscribe(lambda x: x.subscribe(print))
    print('-'*10)
    obs[2].subscribe(lambda x: x.subscribe(print))
    print('-'*10)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    0
    3
    6
    9
    ----------
    1
    4
    7
    ----------
    2
    5
    8
    ----------
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • merge_all
    from reactivex.operators import merge_all
    obs.pipe(merge_all()).subscribe(print)
    
    • 1
    • 2

    输出 0 - 9 ,合并了所有 group 且按原顺序输出

    4. 并行编程

    问题是独立的,或者高度独立的,可以使用多核进行计算

    如果子问题之间需要共享数据,实现起来不那么容器,有进程间通信开销的问题

    线程

    共享内存方式实现并行的一种常见方式是 线程

    由于 python 的 全局解释器锁 GIL ,线程执行 python 语句时,获取一个锁,执行完毕后,释放锁
    每次只有一个线程能够获得这个锁,其他线程就不能执行 python 语句了

    虽然有 GIL 的问题,但是遇到耗时操作(I/O) 时,依然可以使用线程来实现并发

    进程

    通过使用 进程 可以完全避开 GIL,进程 不共享内存,彼此独立,每个进程都有自己的解释器
    进程的缺点:

    • 启动新进程比新线程慢
    • 消耗更多内存
    • 进程间通信速度慢

    优点:分布在多台计算机中,可伸缩性更佳

    使用多个进程

    • multiprocessing.Process 派生子类
    • 实现 Process.run 编写子进程中要执行的代码,processor_obj.start() 调用
    import multiprocessing
    import time
    
    class MyProcess(multiprocessing.Process):
        def __init__(self, id):
            super(MyProcess, self).__init__()
            self.id = id
        def run(self):
            time.sleep(1)
            print(f'i am a process with id {self.id}')
    
    if __name__ == '__main__':
        p = MyProcess(1)
        p.start()  # 不能直接调用 run
        p.join() # Wait until child process terminates
        print('end') # 没有 join 的话,会先打印 end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
        t0 = time.time()
        processes = [MyProcess(1.1) for _ in range(4)]
        [p.start() for p in processes]
        [p.join() for p in processes]
        print(f'time: {time.time() - t0: .2f} s')
    
    • 1
    • 2
    • 3
    • 4
    • 5

    创建4个进程,执行并不需要 4倍的时间

    • 进程执行顺序是无法预测的,取决于操作系统

    • multiprocessing.Pool 类生成一组进程,可使用类方法 apply/apply_async map/map_async 提交任务

    import multiprocessing
    
    def square(x):
        return x * x
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=4)
        inputs = list(range(4))
        out = pool.map(square, inputs) # 对每个元素执行 square 函数
        print(out)
        print('end')
        # [0, 1, 4, 9]
    	# end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    调用 Pool.map 主程序将 停止执行,直到所有工作进程处理完毕
    使用 map_async 立即返回一个 AsyncResult 对象,在后台进行计算,不阻塞主程序,AsyncResult.get() 获取结果
    Pool.apply_async 将单个函数任务分配给一个进程,apply_async 使用 函数,函数的参数,作为参数,返回 AsyncResult 对象

    import multiprocessing
    import time
    
    
    def square(x):
        time.sleep(5)
        return x * x
    
    if __name__ == '__main__':
        t0 = time.time()
        pool = multiprocessing.Pool(processes=4)
        inputs = list(range(4))
        out = pool.map_async(square, inputs)
        print(out)
        print('end')
        print(f'{time.time() - t0} s')
        get_out = out.get()
        print(get_out)
        print(f'{time.time() - t0} s')
    # 
    # end
    # 0.07700085639953613 s
    # [0, 1, 4, 9]
    # 5.8083672523498535 s
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
        out = [pool.apply_async(square, (i,)) for i in range(4)] 
        # 传入 int 会报错,argument after * must be an iterable, not int
        # (i, ) 变成元组,可迭代
        get_out = [r.get() for r in out]
        print(get_out)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    接口 Executor ,ProcessPoolExecutor

    from concurrent.futures import ProcessPoolExecutor
    
    def square(x):
        return x*x
    if __name__ == '__main__':
        executor = ProcessPoolExecutor(max_workers=4)
        fut = executor.submit(square, 2)
        print(fut)  # 
        print(fut.result()) # 4
    
        res = executor.map(square, range(4))  # 返回 迭代器
        print(list(res)) # [0, 1, 4, 9]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    要从一个或多个 Future 中提取结果,可使用 concurrent.futures.wait concurrent.futures.as_completed

    from concurrent.futures import wait, as_completed, ProcessPoolExecutor
    
    
    def square(x):
        return x * x
    
    
    if __name__ == '__main__':
        executor = ProcessPoolExecutor()
        fut1 = executor.submit(square, 2)
        fut2 = executor.submit(square, 3)
        wait([fut1, fut2])  # 阻塞程序执行,直到所有 future 执行完
        res = as_completed([fut1, fut2])
        print(res)
        print(list(res))
        out = [f.result() for f in [fut1, fut2]]
        print(out)
    
    # 
    # [, ]
    # [4, 9]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    5. 锁

    防止多个进程同时执行受保护的代码,例如同时写同一个文件

    multiprocessing.Lock()
    
    • 1

    6. 分布式处理

    dask

    https://www.dask.org/

    pyspark

    用户提交任务,集群管理器自动将任务分派给空闲的执行器

    mpi4py 科学计算

    https://pypi.org/project/mpi4py/

    7. 开发部署

    travis-ci

    https://www.travis-ci.org/

    编写 yaml 配置文件,当有新代码push后,自动运行 配置文件中的 测试项

    docker

    提供隔离环境

  • 相关阅读:
    Ims通话流程分析
    QWidget核心属性(二)
    go 学习02 基础知识
    Git分支合并别的分支代码
    降噪耳机哪款比较舒适?比较舒适的降噪耳机盘点
    JavaScript获取DOM元素相关信息和属性
    SpringMVC入门宝典(六)SpringMVC文件上传(上)
    20240701给NanoPi R6C开发板编译友善之臂的Android12系统
    华为云云耀云服务器L实例评测|在Redis的Docker容器中安装BloomFilter & 在Spring中使用Redis插件版的布隆过滤器
    C语言实现排序介绍
  • 原文地址:https://blog.csdn.net/qq_21201267/article/details/126373687