同步原語?

源代碼: Lib/asyncio/locks.py


asyncio 同步原語被設(shè)計(jì)為與 threading 模塊的類似,但有兩個(gè)關(guān)鍵注意事項(xiàng):

  • asyncio 原語不是線程安全的,因此它們不應(yīng)被用于 OS 線程同步 (而應(yīng)當(dāng)使用 threading);

  • 這些同步原語的方法不接受 timeout 參數(shù);請使用 asyncio.wait_for() 函數(shù)來執(zhí)行帶有超時(shí)的操作。

asyncio 具有下列基本同步原語:


Lock?

class asyncio.Lock?

實(shí)現(xiàn)一個(gè)用于 asyncio 任務(wù)的互斥鎖。 非線程安全。

asyncio 鎖可被用來保證對共享資源的獨(dú)占訪問。

使用 Lock 的推薦方式是通過 async with 語句:

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

這等價(jià)于:

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

在 3.10 版更改: Removed the loop parameter.

coroutine acquire()?

獲取鎖。

此方法會(huì)等待直至鎖為 unlocked,將其設(shè)為 locked 并返回 True。

當(dāng)有一個(gè)以上的協(xié)程在 acquire() 中被阻塞則會(huì)等待解鎖,最終只有一個(gè)協(xié)程會(huì)被執(zhí)行。

鎖的獲取是 公平的: 被執(zhí)行的協(xié)程將是第一個(gè)開始等待鎖的協(xié)程。

release()?

釋放鎖。

當(dāng)鎖為 locked 時(shí),將其設(shè)為 unlocked 并返回。

如果鎖為 unlocked,則會(huì)引發(fā) RuntimeError

locked()?

如果鎖為 locked 則返回 True。

事件?

class asyncio.Event?

事件對象。 該對象不是線程安全的。

asyncio 事件可被用來通知多個(gè) asyncio 任務(wù)已經(jīng)有事件發(fā)生。

Event 對象會(huì)管理一個(gè)內(nèi)部旗標(biāo),可通過 set() 方法將其設(shè)為 true 并通過 clear() 方法將其重設(shè)為 false。 wait() 方法會(huì)阻塞直至該旗標(biāo)被設(shè)為 true。 該旗標(biāo)初始時(shí)會(huì)被設(shè)為 false。

在 3.10 版更改: Removed the loop parameter.

示例:

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
coroutine wait()?

等待直至事件被設(shè)置。

如果事件已被設(shè)置,則立即返回 True。 否則將阻塞直至另一個(gè)任務(wù)調(diào)用 set()。

set()?

設(shè)置事件。

所有等待事件被設(shè)置的任務(wù)將被立即喚醒。

clear()?

清空(取消設(shè)置)事件。

通過 wait() 進(jìn)行等待的任務(wù)現(xiàn)在將會(huì)阻塞直至 set() 方法被再次調(diào)用。

is_set()?

如果事件已被設(shè)置則返回 True。

Condition?

class asyncio.Condition(lock=None)?

條件對象。 該對象不是線程安全的。

asyncio 條件原語可被任務(wù)用于等待某個(gè)事件發(fā)生,然后獲取對共享資源的獨(dú)占訪問。

在本質(zhì)上,Condition 對象合并了 EventLock 的功能。 多個(gè) Condition 對象有可能共享一個(gè) Lock,這允許關(guān)注于共享資源的特定狀態(tài)的不同任務(wù)實(shí)現(xiàn)對共享資源的協(xié)同獨(dú)占訪問。

可選的 lock 參數(shù)必須為 Lock 對象或 None。 在后一種情況下會(huì)自動(dòng)創(chuàng)建一個(gè)新的 Lock 對象。

在 3.10 版更改: Removed the loop parameter.

使用 Condition 的推薦方式是通過 async with 語句:

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

這等價(jià)于:

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()?

獲取下層的鎖。

此方法會(huì)等待直至下層的鎖為 unlocked,將其設(shè)為 locked 并返回 returns True。

notify(n=1)?

喚醒最多 n 個(gè)正在等待此條件的任務(wù)(默認(rèn)為 1 個(gè))。 如果沒有任務(wù)正在等待則此方法為空操作。

鎖必須在此方法被調(diào)用前被獲取并在隨后被快速釋放。 如果通過一個(gè) unlocked 鎖調(diào)用則會(huì)引發(fā) RuntimeError。

locked()?

如果下層的鎖已被獲取則返回 True。

notify_all()?

喚醒所有正在等待此條件的任務(wù)。

此方法的行為類似于 notify(),但會(huì)喚醒所有正在等待的任務(wù)。

鎖必須在此方法被調(diào)用前被獲取并在隨后被快速釋放。 如果通過一個(gè) unlocked 鎖調(diào)用則會(huì)引發(fā) RuntimeError。

release()?

釋放下層的鎖。

當(dāng)在未鎖定的鎖上發(fā)起調(diào)用時(shí),會(huì)引發(fā) RuntimeError

coroutine wait()?

等待直至收到通知。

當(dāng)此方法被調(diào)用時(shí)如果調(diào)用方任務(wù)未獲得鎖,則會(huì)引發(fā) RuntimeError。

這個(gè)方法會(huì)釋放下層的鎖,然后保持阻塞直到被 notify()notify_all() 調(diào)用所喚醒。 一旦被喚醒,Condition 會(huì)重新獲取它的鎖并且此方法將返回 True

coroutine wait_for(predicate)?

等待直到目標(biāo)值變?yōu)?true。

目標(biāo)必須為一個(gè)可調(diào)用對象,其結(jié)果將被解讀為一個(gè)布爾值。 最終的值將為返回值。

Semaphore?

class asyncio.Semaphore(value=1)?

信號(hào)量對象。 該對象不是線程安全的。

信號(hào)量會(huì)管理一個(gè)內(nèi)部計(jì)數(shù)器,該計(jì)數(shù)器會(huì)隨每次 acquire() 調(diào)用遞減并隨每次 release() 調(diào)用遞增。 計(jì)數(shù)器的值永遠(yuǎn)不會(huì)降到零以下;當(dāng) acquire() 發(fā)現(xiàn)其值為零時(shí),它將保持阻塞直到有某個(gè)任務(wù)調(diào)用了 release()

可選的 value 參數(shù)用來為內(nèi)部計(jì)數(shù)器賦初始值 (默認(rèn)值為 1)。 如果給定的值小于 0 則會(huì)引發(fā) ValueError。

在 3.10 版更改: Removed the loop parameter.

使用 Semaphore 的推薦方式是通過 async with 語句。:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

這等價(jià)于:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
coroutine acquire()?

獲取一個(gè)信號(hào)量。

如果內(nèi)部計(jì)數(shù)器的值大于零,則將其減一并立即返回 True。 如果其值為零,則會(huì)等待直到 release() 并調(diào)用并返回 True。

locked()?

如果信號(hào)量對象無法被立即獲取則返回 True。

release()?

釋放一個(gè)信號(hào)量對象,將內(nèi)部計(jì)數(shù)器的值加一。 可以喚醒一個(gè)正在等待獲取信號(hào)量對象的任務(wù)。

不同于 BoundedSemaphore,Semaphore 允許執(zhí)行的 release() 調(diào)用多于 acquire() 調(diào)用。

BoundedSemaphore?

class asyncio.BoundedSemaphore(value=1)?

綁定的信號(hào)量對象。 該對象不是線程安全的。

BoundedSemaphore 是特殊版本的 Semaphore,如果在 release() 中內(nèi)部計(jì)數(shù)器值增加到初始 value 以上它將引發(fā)一個(gè) ValueError。

在 3.10 版更改: Removed the loop parameter.

Barrier?

class asyncio.Barrier(parties, action=None)?

A barrier object. Not thread-safe.

A barrier is a simple synchronization primitive that allows to block until parties number of tasks are waiting on it. Tasks can wait on the wait() method and would be blocked until the specified number of tasks end up waiting on wait(). At that point all of the waiting tasks would unblock simultaneously.

async with can be used as an alternative to awaiting on wait().

The barrier can be reused any number of times.

示例:

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Result of this example is:

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

3.11 新版功能.

coroutine wait()?

Pass the barrier. When all the tasks party to the barrier have called this function, they are all unblocked simultaneously.

When a waiting or blocked task in the barrier is cancelled, this task exits the barrier which stays in the same state. If the state of the barrier is "filling", the number of waiting task decreases by 1.

The return value is an integer in the range of 0 to parties-1, different for each task. This can be used to select a task to do some special housekeeping, e.g.:

...
async with barrier as position:
   if position == 0:
      # Only one task print this
      print('End of *draining phasis*')

This method may raise a BrokenBarrierError exception if the barrier is broken or reset while a task is waiting. It could raise a CancelledError if a task is cancelled.

coroutine reset()?

Return the barrier to the default, empty state. Any tasks waiting on it will receive the BrokenBarrierError exception.

If a barrier is broken it may be better to just leave it and create a new one.

coroutine abort()?

Put the barrier into a broken state. This causes any active or future calls to wait() to fail with the BrokenBarrierError. Use this for example if one of the taks needs to abort, to avoid infinite waiting tasks.

parties?

The number of tasks required to pass the barrier.

n_waiting?

The number of tasks currently waiting in the barrier while filling.

broken?

A boolean that is True if the barrier is in the broken state.

exception asyncio.BrokenBarrierError?

This exception, a subclass of RuntimeError, is raised when the Barrier object is reset or broken.


在 3.9 版更改: 使用 await lockyield from lock 以及/或者 with 語句 (with await lock, with (yield from lock)) 來獲取鎖的操作已被移除。 請改用 async with lock。