queue --- 一個同步的隊列類?

源代碼: Lib/queue.py


queue 模塊實現(xiàn)了多生產(chǎn)者、多消費者隊列。這特別適用于消息必須安全地在多線程間交換的線程編程。模塊中的 Queue 類實現(xiàn)了所有所需的鎖定語義。

模塊實現(xiàn)了三種類型的隊列,它們的區(qū)別僅僅是條目取回的順序。在 FIFO 隊列中,先添加的任務(wù)先取回。在 LIFO 隊列中,最近被添加的條目先取回(操作類似一個堆棧)。優(yōu)先級隊列中,條目將保持排序( 使用 heapq 模塊 ) 并且最小值的條目第一個返回。

在內(nèi)部,這三個類型的隊列使用鎖來臨時阻塞競爭線程;然而,它們并未被設(shè)計用于線程的重入性處理。

此外,模塊實現(xiàn)了一個 "簡單的" FIFO 隊列類型, SimpleQueue ,這個特殊實現(xiàn)為小功能在交換中提供額外的保障。

queue 模塊定義了下列類和異常:

class queue.Queue(maxsize=0)?

Constructor for a FIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

class queue.LifoQueue(maxsize=0)?

LIFO 隊列構(gòu)造函數(shù)。 maxsize 是個整數(shù),用于設(shè)置可以放入隊列中的項目數(shù)的上限。當達到這個大小的時候,插入操作將阻塞至隊列中的項目被消費掉。如果 maxsize 小于等于零,隊列尺寸為無限大。

class queue.PriorityQueue(maxsize=0)?

優(yōu)先級隊列構(gòu)造函數(shù)。 maxsize 是個整數(shù),用于設(shè)置可以放入隊列中的項目數(shù)的上限。當達到這個大小的時候,插入操作將阻塞至隊列中的項目被消費掉。如果 maxsize 小于等于零,隊列尺寸為無限大。

最小值先被取出( 最小值條目是由 sorted(list(entries))[0] 返回的條目)。條目的典型模式是一個以下形式的元組: (priority_number, data)

如果 data 元素沒有可比性,數(shù)據(jù)將被包裝在一個類中,忽略數(shù)據(jù)值,僅僅比較優(yōu)先級數(shù)字 :

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue?

無界的 FIFO 隊列構(gòu)造函數(shù)。簡單的隊列,缺少任務(wù)跟蹤等高級功能。

3.7 新版功能.

exception queue.Empty?

對空的 Queue 對象,調(diào)用非阻塞的 get() (or get_nowait()) 時,引發(fā)的異常。

exception queue.Full?

對滿的 Queue 對象,調(diào)用非阻塞的 put() (or put_nowait()) 時,引發(fā)的異常。

Queue對象?

隊列對象 (Queue, LifoQueue, 或者 PriorityQueue) 提供下列描述的公共方法。

Queue.qsize()?

返回隊列的大致大小。注意,qsize() > 0 不保證后續(xù)的 get() 不被阻塞,qsize() < maxsize 也不保證 put() 不被阻塞。

Queue.empty()?

如果隊列為空,返回 True ,否則返回 False 。如果 empty() 返回 True ,不保證后續(xù)調(diào)用的 put() 不被阻塞。類似的,如果 empty() 返回 False ,也不保證后續(xù)調(diào)用的 get() 不被阻塞。

Queue.full()?

如果隊列是滿的返回 True ,否則返回 False 。如果 full() 返回 True 不保證后續(xù)調(diào)用的 get() 不被阻塞。類似的,如果 full() 返回 False 也不保證后續(xù)調(diào)用的 put() 不被阻塞。

Queue.put(item, block=True, timeout=None)?

item 放入隊列。如果可選參數(shù) block 是 true 并且 timeoutNone (默認),則在必要時阻塞至有空閑插槽可用。如果 timeout 是個正數(shù),將最多阻塞 timeout 秒,如果在這段時間沒有可用的空閑插槽,將引發(fā) Full 異常。反之 (block 是 false),如果空閑插槽立即可用,則把 item 放入隊列,否則引發(fā) Full 異常 ( 在這種情況下,timeout 將被忽略)。

Queue.put_nowait(item)?

Equivalent to put(item, block=False).

Queue.get(block=True, timeout=None)?

從隊列中移除并返回一個項目。如果可選參數(shù) block 是 true 并且 timeoutNone (默認值),則在必要時阻塞至項目可得到。如果 timeout 是個正數(shù),將最多阻塞 timeout 秒,如果在這段時間內(nèi)項目不能得到,將引發(fā) Empty 異常。反之 (block 是 false) , 如果一個項目立即可得到,則返回一個項目,否則引發(fā) Empty 異常 (這種情況下,timeout 將被忽略)。

POSIX系統(tǒng)3.0之前,以及所有版本的Windows系統(tǒng)中,如果 block 是 true 并且 timeoutNone , 這個操作將進入基礎(chǔ)鎖的不間斷等待。這意味著,沒有異常能發(fā)生,尤其是 SIGINT 將不會觸發(fā) KeyboardInterrupt 異常。

Queue.get_nowait()?

相當于 get(False) 。

提供了兩個方法,用于支持跟蹤 排隊的任務(wù) 是否 被守護的消費者線程 完整的處理。

Queue.task_done()?

表示前面排隊的任務(wù)已經(jīng)被完成。被隊列的消費者線程使用。每個 get() 被用于獲取一個任務(wù), 后續(xù)調(diào)用 task_done() 告訴隊列,該任務(wù)的處理已經(jīng)完成。

如果 join() 當前正在阻塞,在所有條目都被處理后,將解除阻塞(意味著每個 put() 進隊列的條目的 task_done() 都被收到)。

如果被調(diào)用的次數(shù)多于放入隊列中的項目數(shù)量,將引發(fā) ValueError 異常 。

Queue.join()?

阻塞至隊列中所有的元素都被接收和處理完畢。

當條目添加到隊列的時候,未完成任務(wù)的計數(shù)就會增加。每當消費者線程調(diào)用 task_done() 表示這個條目已經(jīng)被回收,該條目所有工作已經(jīng)完成,未完成計數(shù)就會減少。當未完成計數(shù)降到零的時候, join() 阻塞被解除。

如何等待排隊的任務(wù)被完成的示例:

import threading, queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)

# Block until all tasks are done.
q.join()
print('All work completed')

SimpleQueue 對象?

SimpleQueue 對象提供下列描述的公共方法。

SimpleQueue.qsize()?

返回隊列的大致大小。注意,qsize() > 0 不保證后續(xù)的 get() 不被阻塞。

SimpleQueue.empty()?

如果隊列為空,返回 True ,否則返回 False 。如果 empty() 返回 False ,不保證后續(xù)調(diào)用的 get() 不被阻塞。

SimpleQueue.put(item, block=True, timeout=None)?

item 放入隊列。此方法永不阻塞,始終成功(除了潛在的低級錯誤,例如內(nèi)存分配失?。?蛇x參數(shù) blocktimeout 僅僅是為了保持 Queue.put() 的兼容性而提供,其值被忽略。

CPython implementation detail: This method has a C implementation which is reentrant. That is, a put() or get() call can be interrupted by another put() call in the same thread without deadlocking or corrupting internal state inside the queue. This makes it appropriate for use in destructors such as __del__ methods or weakref callbacks.

SimpleQueue.put_nowait(item)?

Equivalent to put(item, block=False), provided for compatibility with Queue.put_nowait().

SimpleQueue.get(block=True, timeout=None)?

從隊列中移除并返回一個項目。如果可選參數(shù) block 是 true 并且 timeoutNone (默認值),則在必要時阻塞至項目可得到。如果 timeout 是個正數(shù),將最多阻塞 timeout 秒,如果在這段時間內(nèi)項目不能得到,將引發(fā) Empty 異常。反之 (block 是 false) , 如果一個項目立即可得到,則返回一個項目,否則引發(fā) Empty 異常 (這種情況下,timeout 將被忽略)。

SimpleQueue.get_nowait()?

相當于 get(False) 。

參見

multiprocessing.Queue

一個用于多進程上下文的隊列類(而不是多線程)。

collections.deque 是無界隊列的一個替代實現(xiàn),具有快速的不需要鎖并且支持索引的原子化 append()popleft() 操作。