Python中的多程序(multiprocessing)
Python中的多執行緒、包括協程,由於CPython的sary.html#term-global-interpreter-lock" rel="nofollow,noindex" target="_blank">GIL (Global interpreter Lock ,全域性解釋鎖)問題,只能實現併發(Concurrency),不能實現並行(Parallelism)。 因此,在平行計算場景,多程序是Python最簡單的選擇。
Python多程序概念
概念,就是class
。
瞭解概念,就會了解class
之間的關係。
Process
與普通的多程序類似,Python多程序的核心概念是Process (程序)。 一個簡單的程序使用示例如下:
from multiprocessing import Process def f(name): print('hello', name) if __name__ == '__main__': p0 = Process(target=f, args=('alice',)) p1 = Process(target=f, args=('bob',)) p0.start() p1.start() p0.join() p1.join()
程序使用三部曲:
- 建立Process 。
-
start
,開始執行。 -
join
,等待執行完畢。
Pipe
Pipe
即管道,是Bash中最常見的跨程序通訊手段。echo hello | tee stdout.log
,中間的|
就是管道,把前一個程序的stdout傳遞給下一個程序。
Pipe 建立時,返回兩個Connection ,前者負責send 而後者負責recv 。 兩個程序各執一端,就可以實現單向通訊。
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': receiver, sender = Pipe() p = Process(target=f, args=(sender,)) p.start() print(receiver.recv())# prints "[42, None, 'hello']" p.join()
如果在建立Pipe
時,指定duplex=True
,比如Pipe(True)
,兩個Connection
即可實現雙向通訊。
預設duplex=False
。
Queue
Queue 是一個基於標準模組queue 、包裝了Pipe 的類。 它不僅具有先進先出(FIFO)的特性,還能實現跨程序通訊。
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get())# prints "[42, None, 'hello']" p.join()
在實際使用中,除了少數簡單場景外,都不會直接使用Process 、Pipe 、Queue 來實現多程序。 這種低層級(low level,無貶義)的API,可讀性差,容易出錯。 常用的是高層級API——程序池。
Pool
由於Process 建立、銷燬有較大開銷,並且並行數受機器CPU數量的限制,過多無益。 一個Pool (程序池)會統一建立並維持一定數量的Process ,並行地執行Task。 在所有Task執行完畢後,再統一地關閉Process 。
這裡Task(任務)的概念,並未被實現為一個class
,而是一個callable
,比如下面的f
、g
。
#!/usr/bin/env python3 # -*- coding:utf-8 -*- from multiprocessing.pool import Pool def f(x): return x * x def g(x, y): return x**y def main(): with Pool(4) as pool: result = pool.map(f, [1, 2, 3, 4, 5]) print(type(result)) print(result) with Pool(4) as pool: result = pool.starmap(g, [(1, 3), (2, 4), (3, 5), (4, 6), (5, 7)]) print(type(result)) print(result) if __name__ == '__main__': main()
以上程式碼儲存為multi.py
檔案,執行結果如下:
$ python3 multi.py <class 'list'> [1, 4, 9, 16, 25] <class 'list'> [1, 16, 243, 4096, 78125]
map
的用法類似內建函式map
,專門處理單個引數的callable
;
而starmap
則是用來處理多個引數的callable
。
此外,還有利用Pool 執行單個Task的apply 。 除非Task本身就是一個個來的,否則使用apply 的效率不高。
比起apply
,更值得關注的是imap
和imap_unordered
。imap
和map
非常類似,而這個多出來的i
,則是Iterable
。map
使用的是list
而imap
則是Iterable
,前者效率略高,而後者記憶體消耗顯著的小。
在處理結果上,imap
可以儘快返回一個Iterable
的結果,而map
則需要等待全部Task執行完畢,返回list
。
無論map
還是imap
,都需要按順序等待Task執行完成,而imap_unordered
則不必。imap_unordered
返回的Iterable
,會優先迭代到先執行完成的Task。
三者各有特點,要按需使用。
AsyncResult
以上為程序池的同步使用方案。 同步方案會卡在map 或starmap 這一行,直到所有任務都執行完畢。 有時,我們會需要一個非同步方案,這時就需要用到map_async 或starmap_async 。 它們返回的結果,就是AsyncResult 。
#!/usr/bin/env python3 # -*- coding:utf-8 -*- from multiprocessing.pool import Pool def f(x): return x * x def g(x, y): return x**y def main(): with Pool(4) as pool: result = pool.map_async(f, [1, 2, 3, 4, 5]) print(type(result)) print(result.get()) with Pool(4) as pool: result = pool.starmap_async(g, [(1, 3), (2, 4), (3, 5), (4, 6), (5, 7)]) print(type(result)) print(result.get()) if __name__ == '__main__': main()
以上程式碼儲存為multi_async.py
檔案,執行結果如下:
$ python3 multi_async.py <class 'multiprocessing.pool.MapResult'> [1, 4, 9, 16, 25] <class 'multiprocessing.pool.MapResult'> [1, 16, 243, 4096, 78125]
以上程式碼中,實際等待位置是result.get()
那一行。
Timeout
以上多程序程式碼,其實是不完善的。
除非Task非常簡單,並無IO、網路等資源依賴,否則多程序也好、多執行緒也好,都有可能執行不完。
為了避免未知原因的掛起,及時止損,通常需要設定timeout
。AsyncResult
在阻塞時,可以用wait
或get
,設定timeout
引數。
#!/usr/bin/env python3 # -*- coding:utf-8 -*- import time from multiprocessing.pool import Pool, TimeoutError def sleep(duration): time.sleep(duration) with open('result.log', 'a') as file: file.write(str(duration)) file.write('\n') return duration def main(): with Pool(4) as pool: result = pool.map_async(sleep, range(8)) try: print(result.get(timeout=5)) except TimeoutError: print(TimeoutError.__name__) if __name__ == '__main__': main()
以上程式碼儲存為timeout.py
檔案,執行結果如下:
$ python3 timeout.py TimeoutError $ cat result.log 0 1 2 3 4
可以看到,由於timeout=5
,4秒以前的Task都成功了,而大於(等於)5秒的Task都失敗了。
當get
需要等待所有程序結束時,需要在Pool
關閉以前。
因此,需要在with
作用域中執行,否則將超時或(沒設timeout
)掛死。
如果使用wait
,則get
可以在with
以外獲取結果。
因此,更推薦使用
wait
配合get
。
def main(): with Pool(4) as pool: result = pool.map_async(sleep, range(8)) result.wait() try: print(result.get(9)) except TimeoutError: print(TimeoutError.__name__)
替換main()
,執行結果如下:
$ python3 timeout.py [0, 1, 2, 3, 4, 5, 6, 7]
總結
前面提到Python做平行計算的選擇,多程序multiprocessing 只是最簡單的一個選擇。 另外還有兩個常見選擇: 一是使用其它直譯器實現的Python,比如PyPy、Jython等; 二是使用C語言優化需要並行的程式碼,在Native層繞過GIL的限制; 三是使用協程(或執行緒)加subprocess ,這也算是多程序的一個方案。 此外,確認程式碼是否真的會被GIL所影響,是首要工作。 如果程式碼中真正耗時的計算是在Native層執行——這在Python中非常常見,比如OpenCV——那麼用多執行緒也沒問題。
另外,要注意多程序的測試覆蓋問題。 在另一個程序執行的程式碼,是無法被coverage 確認為已覆蓋的。 需要對執行內容進行單獨測試,或者在程式中預留未用多程序優化的原始方案。
其實,多程序帶來的額外通訊、切換開銷,有時候也是很明顯的。 還有個問題是,主程序被殺掉後,子程序會仍然存活,這在某些場景下會產生未知問題。 所以,在機器不是很強大的場景下,用原始的單執行緒序列方案,是最經濟實用的選擇。