asyncio 同步原语被设计为与 threading 模块的类似,但有两个关键注意事项:
asyncio 原语不是线程安全的,因此它们不应被用于 OS 线程同步 (而应当使用 threading);
这些同步原语的方法不接受 timeout 参数;请使用 asyncio.wait_for() 函数来执行带有超时的操作。
class asyncio.Lock
实现一个用于 asyncio 任务的互斥锁。 非线程安全。
asyncio 锁可被用来保证对共享资源的独占访问。
使用 Lock 的推荐方式是通过 async with 语句:
- lock = asyncio.Lock()
-
- # ... later
- async with lock:
- # access shared state
这等价于:
- lock = asyncio.Lock()
-
- # ... later
- await lock.acquire()
- try:
- # access shared state
- finally:
- lock.release()
主要方法和属性:
方法和属性名 | 说明 |
coroutine acquire() | 获取锁。 此方法会等待直至锁为 unlocked,将其设为 locked 并返回 True。 当有一个以上的协程在 acquire() 中被阻塞则会等待解锁,最终只有一个协程会被执行。 锁的获取是 公平的: 被执行的协程将是第一个开始等待锁的协程。 |
release() | 释放锁。 当锁为 locked 时,将其设为 unlocked 并返回。 如果锁为 unlocked,则会引发 RuntimeError。 |
locked() | 如果锁为 locked 则返回 True 。 |
class asyncio.Event
事件对象。 该对象不是线程安全的。
asyncio 事件可被用来通知多个 asyncio 任务已经有事件发生。
Event 对象会管理一个内部旗标,可通过 set() 方法将其设为 true 并通过 clear() 方法将其重设为 false。 wait() 方法会阻塞直至该旗标被设为 true。 该旗标初始时会被设为 false。
- async def waiter(event):
- print('waiting for it ...')
- await event.wait()
- print('... got it!')
-
- async def main():
- # Create an Event object.
- event = asyncio.Event()
-
- # Spawn a Task to wait until 'event' is set.
- waiter_task = asyncio.create_task(waiter(event))
-
- # Sleep for 1 second and set the event.
- await asyncio.sleep(1)
- event.set()
-
- # Wait until the waiter task is finished.
- await waiter_task
-
- asyncio.run(main())
主要方法和属性:
方法和属性名 | 说明 |
coroutine wait() | 等待直至事件被设置。 如果事件已被设置,则立即返回 True。 否则将阻塞直至另一个任务调用 set()。 |
set() | 设置事件。 所有等待事件被设置的任务将被立即唤醒。 |
clear() | 清空(取消设置)事件。 通过 wait() 进行等待的任务现在将会阻塞直至 set() 方法被再次调用。 |
is_set() | 如果事件已被设置则返回 True 。 |
class asyncio.Condition(lock=None)
条件对象。 该对象不是线程安全的。
asyncio 条件原语可被任务用于等待某个事件发生,然后获取对共享资源的独占访问。
在本质上,Condition 对象合并了 Event 和 Lock 的功能。 多个 Condition 对象有可能共享一个 Lock,这允许关注于共享资源的特定状态的不同任务实现对共享资源的协同独占访问。
可选的 lock 参数必须为 Lock 对象或 None。 在后一种情况下会自动创建一个新的 Lock 对象。
使用 Condition 的推荐方式是通过 async with 语句:
- cond = asyncio.Condition()
-
- # ... later
- async with cond:
- await cond.wait()
这等价于:
- cond = asyncio.Condition()
-
- # ... later
- await cond.acquire()
- try:
- await cond.wait()
- finally:
- cond.release()
主要方法和属性:
方法和属性名 | 说明 |
coroutine acquire() | 获取下层的锁。 此方法会等待直至下层的锁为 unlocked,将其设为 locked 并返回 returns |
notify(n=1) | 唤醒最多 n 个正在等待此条件的任务(默认为 1 个)。 如果没有任务正在等待则此方法为空操作。 锁必须在此方法被调用前被获取并在随后被快速释放。 如果通过一个 unlocked 锁调用则会引发 RuntimeError。 |
locked() | 如果下层的锁已被获取则返回 True 。 |
notify_all() | 唤醒所有正在等待此条件的任务。 |
release() | 释放下层的锁。 当在未锁定的锁上发起调用时,会引发 RuntimeError。 |
coroutine wait() | 等待直至收到通知。 当此方法被调用时如果调用方任务未获得锁,则会引发 RuntimeError。 这个方法会释放下层的锁,然后保持阻塞直到被 notify() 或 notify_all() 调用所唤醒。 一旦被唤醒,Condition 会重新获取它的锁并且此方法将返回 True。 |
coroutine wait_for(predicate) | 等待直到目标值变为 true。 目标必须为一个可调用对象,其结果将被解读为一个布尔值。 最终的值将为返回值。 |
class asyncio.Semaphore(value=1)
信号量对象。 该对象不是线程安全的。
信号量会管理一个内部计数器,该计数器会随每次 acquire() 调用递减并随每次 release() 调用递增。 计数器的值永远不会降到零以下;当 acquire() 发现其值为零时,它将保持阻塞直到有某个任务调用了 release()。
可选的 value 参数用来为内部计数器赋初始值 (默认值为 1)。 如果给定的值小于 0 则会引发 ValueError。
使用 Semaphore 的推荐方式是通过 async with 语句。:
- sem = asyncio.Semaphore(10)
-
- # ... later
- async with sem:
- # work with shared resource
这等价于:
- sem = asyncio.Semaphore(10)
-
- # ... later
- await sem.acquire()
- try:
- # work with shared resource
- finally:
- sem.release()
主要方法和属性:
方法和属性名 | 说明 |
coroutine acquire() | 获取一个信号量。 如果内部计数器的值大于零,则将其减一并立即返回 True。 如果其值为零,则会等待直到 release() 并调用并返回 True。 |
release() | 释放一个信号量对象,将内部计数器的值加一。 可以唤醒一个正在等待获取信号量对象的任务。 不同于 BoundedSemaphore,Semaphore 允许执行的 release() 调用多于 acquire() 调用。 |
locked() | 如果信号量对象无法被立即获取则返回 True 。 |
class asyncio.BoundedSemaphore(value=1)
绑定的信号量对象。 该对象不是线程安全的。
BoundedSemaphore 是特殊版本的 Semaphore,如果在 release() 中内部计数器值增加到初始 value 以上它将引发一个 ValueError。
class asyncio.Barrier(parties)
栅栏是一个简单的同步原语,它允许阻塞,直到有大量任务在等待它。任务可以在wait()方法上等待,并且会被阻塞,直到指定数量的任务最终达到等待的数量parties。此时,所有等待的任务都将同时解除锁定。
栅栏最好通过async with来使用。
- async def example_barrier():
- # barrier with 3 parties
- b = asyncio.Barrier(3)
-
- # create 2 new waiting tasks
- asyncio.create_task(b.wait())
- asyncio.create_task(b.wait())
-
- await asyncio.sleep(0)
- print(b)
-
- # The third .wait() call passes the barrier
- await b.wait()
- print(b)
- print("barrier passed")
-
- await asyncio.sleep(0)
- print(b)
-
- asyncio.run(example_barrier())
主要方法和属性:
方法和属性名 | 说明 |
coroutine wait() | 等待在栅栏前 |
coroutine reset() | 重置栅栏为空 |
coroutine abort() | 设置栅栏为破损,会引起所有的wait()调用引发BrokenBarrierError异常 |
parties | 栅栏要达到的等待数量 |
n_waiting | 等待在栅栏前的任务数量 |
broken | 返回栅栏是否处于断开的状态 |
- ...
- async with barrier as position:
- if position == 0:
- # Only one task prints this
- print('End of *draining phase*')