• Python、Rust中的协程


    协程

    协程在不同的堆栈上同时运行,但每次只有一个协程运行,而其调用者则等待:

    1. F启动G,但G并不会立即运行,F必须显式的恢复G,然后 G 开始运行。
    2. 在任何时候,G 都可能转身并让步返回到 F。这会暂停 G 并继续 F 的恢复操作。
    3. F再次调用resume,这会暂停F并继续G的yield。它们不断地来回移动,直到 G 的return,这会清理 G 并从最近的恢复中继续 F,并向 F 发出一些信号,表明 G 已完成并且 F 不应再尝试恢复 G。
    4. 在这种模式中,一次只有一个协程运行,而其调用者则在不同的堆栈上等待。

    归根结底,协程的产生是为了非常快速地切换每个线程上当前运行的任务,这样所有的任务都有机会运行

    从阻塞(blocking)说起

    PythonRustasync/await是通过协作型调度(cooperative scheduling)来完成的。
    GolangGoruntine则是抢占式调度(Preemptive multitasking)。

    运行时(Runtime)

    在写异步Rust和Python的时候,Block意味着阻止运行时切换当前任务

    运行时(Runtime),也称为执行时或运行阶段,是指计算机程序在实际运行时执行的阶段,与编译时相对应。在程序的运行时阶段,计算机程序被加载到内存中,操作系统控制程序的执行,处理输入和输出,以及管理计算机的资源。

    在常规多线程编程中,每个线程都有自己的运行时(Runtime)。由于GIL,进程级别以下的python只有一个运行时,无论启动多少个线程,他们都共享相同的Runtime

    Notice:

    CPython 是 Python 的标准实现,它是用C语言编写的,是最常用的 Python 解释器。CPython解释器在运行Python程序时,将Python源代码翻译成字节码,并在Python虚拟机(Python Virtual Machine,简称PVM)上执行。因此,Python程序在CPython下运行时,实际上是在Python虚拟机中运行的,这个虚拟机叫做Python运行时。

    await

    为了防止上述情况,我们需要在异步编程的时候,注意一点: 避免长时间不使用await

    coroutine in Python

    Python的协程通常是通过事件循环(Event Loop)来调度的,事件循环是一个轮询机制,它负责管理协程的执行、挂起、恢复和调度,通过

    await关键字来挂起和恢复, 通过异步生成器来保存函数的状态。

    事件循环的原理如下:

    1. 单线程执行: 事件循环运行在一个单线程环境中,这个线程负责执行所有任务,包括异步任务。
    2. 任务队列: 事件循环维护一个任务队列,其中包含等待执行的任务,包括异步任务和事件处理程序。
    3. 事件驱动: 事件循环是事件驱动的,它会监听各种事件,如I/O事件、定时器事件、信号等。
    4. 挂起和恢复: 当任务需要等待某些条件满足时,它会被挂起,释放CPU资源,允许其他任务继续执行。

    源码如下:

        def _run_once(self):
            """Run one full iteration of the event loop.
    
            This calls all currently ready callbacks, polls for I/O,
            schedules the resulting callbacks, and finally schedules
            'call_later' callbacks.
            """
    
            sched_count = len(self._scheduled)
            if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
                self._timer_cancelled_count / sched_count >
                    _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
                # Remove delayed calls that were cancelled if their number
                # is too high
                new_scheduled = []
                for handle in self._scheduled:
                    if handle._cancelled:
                        handle._scheduled = False
                    else:
                        new_scheduled.append(handle)
    
                heapq.heapify(new_scheduled)
                self._scheduled = new_scheduled
                self._timer_cancelled_count = 0
            else:
                # Remove delayed calls that were cancelled from head of queue.
                while self._scheduled and self._scheduled[0]._cancelled:
                    self._timer_cancelled_count -= 1
                    handle = heapq.heappop(self._scheduled)
                    handle._scheduled = False
    
            timeout = None
            if self._ready or self._stopping:
                timeout = 0
            elif self._scheduled:
                # Compute the desired timeout.
                when = self._scheduled[0]._when
                timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
    
            event_list = self._selector.select(timeout)
            self._process_events(event_list)
            # Needed to break cycles when an exception occurs.
            event_list = None
    
            # Handle 'later' callbacks that are ready.
            end_time = self.time() + self._clock_resolution
            while self._scheduled:
                handle = self._scheduled[0]
                if handle._when >= end_time:
                    break
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False
                self._ready.append(handle)
    
            # This is the only place where callbacks are actually *called*.
            # All other places just add them to ready.
            # Note: We run all currently scheduled callbacks, but not any
            # callbacks scheduled by callbacks run this time around --
            # they will be run the next time (after another I/O poll).
            # Use an idiom that is thread-safe without using locks.
            ntodo = len(self._ready)
            for i in range(ntodo):
                handle = self._ready.popleft()
                if handle._cancelled:
                    continue
                if self._debug:
                    try:
                        self._current_handle = handle
                        t0 = self.time()
                        handle._run()
                        dt = self.time() - t0
                        if dt >= self.slow_callback_duration:
                            logger.warning('Executing %s took %.3f seconds',
                                           _format_handle(handle), dt)
                    finally:
                        self._current_handle = None
                else:
                    handle._run()
            handle = None  # Needed to break cycles when an exception occurs.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    1. 通过_selector.select(timeout)返回一个任务状态列表
    2. 使用_process_events处理就绪的I/O任务
    3. 多次运行_run_once,直到所有任务处理完毕,事件循环中没有待执行的任务。
  • 相关阅读:
    FasterNet(PConv)paper笔记(CVPR2023)
    代码随想录算法训练营第五十九天 | 647. 回文子串、516.最长回文子序列
    AI大模型-流式处理 (以百度接口为例)
    flask-socketio实现websocket通信
    曲折的tensorflow安装过程(Tensorflow 安装问题的解决)
    模型融合——stacking原理与实现
    复盘:智能座舱系列文一,他到底是什么
    Oracle中含替换变量的查询(二)
    上位机与plc写心跳定时扫描连接状态
    业务线程池阻塞分析
  • 原文地址:https://blog.csdn.net/majiayu000/article/details/132737294