python模組之threading
threading在低階的_thread
模組上構建了更高階的執行緒介面。
threading模組基於Java執行緒模型設計。不過Java中鎖和條件變數是每個物件的基本行為,在python中卻是單獨的物件。python的Thread
類行為是Java的Thread
類行為的子集,目前尚不支援優先順序、執行緒組,執行緒無法銷燬、停止、暫停、恢復或中斷。Java中Thread類的靜態方法在Python中對映為模組級的函式。
模組級函式
threading.active_count()
返回當前活動的Thread物件的數量,與enumerate()
函式返回的列表元素個數相同
threading.current_thread()
返回當前Thread物件,對應呼叫者的控制執行緒(thread of control
)。如果呼叫者的控制執行緒不是通過threading
模組建立,返回一個功能受限的啞執行緒物件(dummy thread object
)
threading.get_ident()
返回一個非零整數,代表當前執行緒的"執行緒識別符號"。這個值意在作為魔術cookie使用,例如作為索引從特定於執行緒的字典物件獲取資料。當一個執行緒退出,新的執行緒建立,執行緒識別符號可能被回收使用
threading.enumerate()
返回當前活動Thread物件的列表。該列表包含守護執行緒、current_thread()
函式建立的啞執行緒,以及主執行緒,不包含已終止的執行緒和未啟動的執行緒。
threading.main_thread()
返回主執行緒物件。通常來說,主執行緒就是啟動python直譯器的執行緒。
threading.settrace(func )
為啟動自threading
模組的所有執行緒設定一個trace函式。在每個執行緒的run()
方法呼叫前,傳遞func引數給sys.settrace()
threading.setprofile(func)
為啟動自threading
模組的所有執行緒設定一個profile函式。在每個執行緒的run()
方法呼叫前,傳遞func引數給sys.setprofile()
threading.stack_size([size ])
返回建立新執行緒使用的執行緒堆疊大小。
可選引數size指定後續建立的執行緒的堆疊大小,必須是0(表示使用平臺或配置的預設值)或大於等於32768(32KiB)的正整數。如果未指定,預設size為0.
如果不支援改動執行緒堆疊大小,丟擲RuntimeError
異常。如果size不合法,丟擲ValueError
異常,堆疊大小保持不變。
32KiB是目前能保證直譯器堆疊空間充足的最小值。某些平臺可能對堆疊大小做了特殊的限制,比如要求最小堆疊大小在32KiB以上,或要求以系統記憶體頁大小的倍數分配。
Windows系統及使用POSIX執行緒的系統可用
常量
threading.TIMEOUT_MAX
阻塞函式(Lock.acquire()
,RLock.acquire()
,Condition.wait()
等)的timeout引數可接受的最大值。超出該值將丟擲OverflowError
異常。
All of the methods described below are executed atomically.
Thread-Local Data
Thread-local資料的值是特定於執行緒的。管理Thread-local資料,只需要建立local
或其子類的例項並在該例項上儲存屬性:
mydata = threading.local() mydata.x = 1
不同的執行緒,例項的值也會不同。
class threading.local
表示thread-local資料的類。
Thread
Thread類代表在單獨的控制執行緒中執行的活動,有兩種方式指定:傳遞可呼叫物件到構造器的target引數,或重寫子類的run()
方法。除了__int__()
方法和run()
方法,Thread
子類不應該重寫除此之外的其他方法。
建立的執行緒物件,必須使用start()
方法啟動,start()
在一個單獨的控制執行緒呼叫run()
方法。這時該執行緒被認為是"活動的"。當run()
方法結束(正常執行完成或丟擲了未處理的異常)時,執行緒物件不再是"活動的"。is_alive()
方法可用於檢查執行緒是否處於活動狀態。
呼叫執行緒物件的join()
方法將導致執行緒阻塞,直到呼叫join()
方法的執行緒執行結束。
執行緒擁有名字,可以傳遞給構造器。通過name屬性讀取或修改。
主執行緒:對應python程式的初始控制執行緒。主執行緒不是守護執行緒。
守護執行緒:當沒有非守護執行緒處於活動狀態時,整個python程式將退出。通過daemon
屬性或構造器引數,可以標記一個執行緒為守護執行緒。daemon
屬性的初始值繼承自建立該執行緒的執行緒
啞執行緒:對應"外部執行緒"alien thread
,即在threading
模組之外(比如C程式碼)啟動的控制執行緒。啞執行緒具有有限的功能,總是認為是活動的和守護的,不能呼叫join()
方法。它們永遠不會被刪除,因為不能檢測外部執行緒的結束情況。
Note:守護執行緒將在程式關閉時直接停止。相關資源(比如開啟的檔案、資料庫事務等)可能不會被妥善地釋放。如果想要執行緒優雅地停止,將執行緒設定為非守護執行緒,並使用合適的訊號機制比如Event
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None )
ThreadGroup run() Thread-N
如果子類繼承Thread並重寫構造器,必須確保在執行執行緒的其他操作前在構造器中呼叫Thread.__init__()
start()
開啟執行緒。每個執行緒最多隻能呼叫一次,否則丟擲RuntimeError
異常。它將在一個單獨的控制執行緒呼叫執行緒物件的run()
方法。
run()
定義執行緒功能的方法,通常在子類中重寫。標準的run()
方法呼叫傳入構造器的可呼叫物件target(存在的話),並使用args和kwargs分別作為target的位置引數和關鍵字引數。
# 建立Thread的例項,傳給它一個函式 from threading import Thread from time import sleep, ctime sleep_time = [4, 2] def task(task_tag, sleep_tag): print("task", task_tag, "started at:", ctime()) sleep(sleep_tag) print("task", task_tag, "done at:", ctime()) def main(): print("Main thread started at:", ctime()) threads = [] nloops = range(len(sleep_time))# [0, 1] for i in nloops: t = Thread(target=task, args=(i, sleep_time[i])) threads.append(t) for i in nloops: threads[i].start()# 啟動執行緒 for i in nloops: threads[i].join()# 主執行緒阻塞,直至呼叫join()方法的執行緒終止 print("Main thread done at:", ctime()) if __name__ == '__main__': main()
# 派生Thread的子類,並建立子類的例項 from threading import Thread from time import sleep, ctime sleep_time = [4, 2] class MyThread(Thread): # 重寫run()方法 def run(self): print(self.name, "started at:", ctime()) self._target(self._args) print(self.name, "done at:", ctime()) def task(sleep_tag): sleep(sleep_tag) def main(): print("Main thread started at:", ctime()) threads = [] nloops = range(len(sleep_time)) for i in nloops: t = MyThread(target=task, args=sleep_time[i], name=task.__name__ + str(i)) threads.append(t) for i in nloops: threads[i].start() for i in nloops: threads[i].join() print("Main thread done at:", ctime()) if __name__ == '__main__': main()
join(timeout=None )
阻塞主執行緒直到呼叫join方法的執行緒終止(可能是正常執行完成,也可能是丟擲了未處理的異常)或達到timeout設定的時間。可多次呼叫。
-
timeout:阻塞時間(秒)。如果為None,表示一直阻塞直至呼叫join方法的執行緒終止;如果不為None,表示阻塞的時間,達到該時間後,不管呼叫
join()
方法的執行緒是否執行完成,繼續執行主執行緒或其他啟動的執行緒。
如果執行緒呼叫join()
方法可能導致死鎖,或在呼叫start()
之前呼叫join()
,丟擲RuntimeError
異常。
name
獲取或設定執行緒名稱。多個執行緒可能名稱相同,初始值由構造器設定。
ident
執行緒識別符號,如果為None說明該執行緒未啟動。當一個執行緒退出,新的執行緒建立,執行緒識別符號可能被回收使用。即使執行緒退出,該識別符號仍可用。
is_alive()
判斷執行緒是否處於活動狀態。
daemon
布林標誌,表示這個執行緒是否是守護執行緒。必須在呼叫start()
之前設定,否則丟擲RuntimeError
異常。初始值繼承自建立該執行緒的執行緒。主執行緒不是守護執行緒,因此在主執行緒中建立的執行緒daemon屬性預設值為False
CPython實現細節:在CPython中,由於GIL的原因,一次只有一個執行緒能夠執行python程式碼(即使某些面向效能的庫能克服這個限制???)。想要python程式更好地利用多核機器的計算機資源(計算密集型),建議使用multiprocessing
或concurrent.futures.ProcessPoolExecutor
。如果是同時執行多個I/O密集型任務,threading
仍然不失為一個合適的模組
Lock
原語鎖,是同步原語的一種,當它處於"locked"狀態時不屬於特定執行緒。在python中,這是目前可用的最低階的同步原語,實現自_thread
擴充套件模組。
原語鎖有兩種狀態:locked
(鎖定)或unlocked
(未鎖定)。建立時為未鎖定狀態。
原語鎖有兩種方法:acquire()
和release()
。當鎖處於未鎖定狀態時,acquire()
改變其為鎖定狀態。當鎖處於鎖定狀態時,呼叫acquire()
方法將導致執行緒阻塞,直到其他執行緒呼叫release()
釋放鎖。
class threading.Lock
acquire(blocking=True, timeout=-1 )
獲取鎖。成功返回True,獲取返回False。
- blocking:預設為True,在獲取到鎖之前阻塞執行緒;反之即使沒有獲取到鎖也不會阻塞執行緒。
- timeout:指定執行緒阻塞的最長時間,單位為秒;-1表示無限制等待。當blocking為False時,禁止指定timeout引數
release()
釋放鎖。任何執行緒都可以呼叫,不只是獲取了鎖的執行緒。
鎖更改為未上鎖狀態後,對於呼叫了acquire()
方法而導致阻塞的執行緒,將由系統決定哪個執行緒獲取到鎖。
release()
方法只能在上鎖狀態呼叫,否則將丟擲RuntimeError
異常。
RLock
重入鎖,同步原語的一種,可由同一執行緒多次獲取已持有的鎖。除了原語鎖的上鎖/解鎖狀態,重入鎖還使用了owning thread
和recursion level
的概念。在上鎖狀態,可能有多個執行緒擁有鎖;在解鎖狀態,沒有執行緒擁有鎖。
acquire()
/release()
必須成對出現,可以巢狀,只有最後一個release(即最外層的release)呼叫才會最終釋放鎖。
class threading.RLock
acquire(blocking=True, timeout=-1 )
使用預設引數呼叫時,如果當前執行緒已經擁有鎖,增加1次遞迴深度並立即返回;如果是其他執行緒擁有鎖,阻塞當前執行緒直到鎖被釋放。一旦鎖釋放(遞迴深度為0,此時鎖不屬於任何執行緒),各個執行緒爭奪鎖,並設定遞迴深度為1。
release()
釋放鎖且遞迴深度減1。如果呼叫後遞迴深度為0,重置鎖為未鎖定狀態(不屬於任何執行緒),由其他執行緒爭奪鎖。如果呼叫後遞迴深度非0,鎖仍為上鎖狀態,屬於當前執行緒。
只能由已經獲取了鎖的執行緒呼叫,否則丟擲RuntimeError
異常。
Condition
condition變數總是與某種鎖相聯絡:傳入或者預設建立的鎖物件。傳入鎖物件適用於多個condition變數需要共享同一個鎖的場景。鎖是condition物件的一部分,不需要對鎖單獨進行追蹤。
condition物件遵循上下文管理協議:使用with語句在封閉塊內獲取關聯的鎖物件,在condition物件上呼叫acquire和release實際上呼叫的是關聯鎖的對應方法。
class threading.Condition(lock=None )
條件變數允許一個或多個執行緒等待,直到接收到另一個執行緒的通知。
lock引數必須是Lock
或RLock
物件,作為底層的鎖使用。預設使用RLock
acquire(*args )
呼叫底層lock物件的acquire()方法獲取鎖
release()
呼叫底層lock物件的release()方法釋放鎖
wait(timeout=None )
釋放鎖並阻塞當前執行緒直到被另外一個執行緒呼叫notify()
或notify_all()
喚醒,或者達到設定的timeout時間,任意一種情況都將重新獲取鎖並返回。
只能由已獲取到鎖的執行緒呼叫,否則丟擲RuntimeError
異常。
3.2版本前該方法始終返回None,3.2版本開始除非超時會返回False,其他情況都返回True
wait_for(predicate, timeout=None )
阻塞當前執行緒直到可呼叫物件predicate返回值為True或bool()判斷為True。
wait_for方法將不斷呼叫wait()方法直到超時或滿足predicate返回值為True或bool()判斷為True。
返回值為最後一次執行predicate的返回值,如果超時返回False。
只能由已獲取到鎖的執行緒呼叫,否則丟擲RuntimeError
異常。
notify(n=1 )
喚醒wait()或wait_for()狀態下的某個執行緒。只能由已獲取到鎖的執行緒呼叫,否則丟擲RuntimeError
異常。
notify_all()
喚醒wait()或wait_for()狀態下的所有執行緒。只能由已獲取到鎖的執行緒呼叫,否則丟擲RuntimeError
異常。
notify()和notify_all()並不釋放鎖。意思是呼叫wait()方法的執行緒不會立即返回,需要等到呼叫notify()和notify_all()的執行緒釋放鎖之後才返回。
# 生產者-消費者模式中Condition的用法 # 消費者: with cv: while not an_item_is_available(): cv.wait() get_an_available_item() # 生產者: with cv: make_an_item_available() cv.notify() # 生產者(使用wait_for改進): with cv: cv.wait_for(an_item_is_available) get_an_available_item()
Semaphore Objects
訊號量物件管理一個內部計數器,隨著呼叫acquire()減1,release()呼叫加1,但一定不會小於0。當呼叫acquire()時如果計數器等於0將會阻塞執行緒直到某個執行緒呼叫release()方法。支援上下文管理器協議
class threading.Semaphore(value=1)
指定初始計數器的訊號量,每呼叫一次release()加1,每呼叫一次acquire()減1。
acquire(blocking=True, timeout=None)
獲取訊號量。
使用預設引數呼叫時:
1. 如果計數器大於0,減1並立即返回True 2. 如果計數器等於0,阻塞直到某個執行緒呼叫release()喚醒,喚醒後計數器減1並返回True
release()
釋放訊號量。
class threading.BoundedSemaphore(value=1)
邊界訊號量,計數器值不能超過設定的最大邊界。常用於限制資源佔用的場景比如資料庫連線。
Event Objects
事件是最簡單的執行緒間通訊機制。事件物件管理一個內部標誌,呼叫set()時該標誌為True,呼叫clear()時該標誌為False,呼叫wait()時執行緒阻塞直到標誌為True
class threading.Event
is_set()
如果事件標誌為True,返回True
set()
設定事件標誌為True。將喚醒所有呼叫了wait()而阻塞的執行緒。
clear()
重置事件標誌為False。將阻塞所有呼叫了wait()的執行緒。
wait(timeout=None)
阻塞執行緒直到事件標誌為True或超時。
Timer Objects
Timer繼承自Thread,表示經過一定時間後要執行的任務。
class threading.Timer(interval, function, args=None, kwargs=None)
建立定時器,在interval時間後執行function任務。
cancel()
終止定時器並結束任務(僅對待執行狀態中的任務有效)。