Practical Python Programming Tips (Multiprocessing)
As mentioned in the multithreading article, Python has a GIL (Global Interpreter Lock), which prevents a single process from using more than one CPU core at a time. For CPU-bound work (e.g. computation, image processing), multithreading therefore gains us little, so we turn to multiprocessing, which can run on all cores concurrently and improve throughput. For I/O-heavy work, prefer multithreading, which handles it without hurting performance.

Note: the code in this article is written for Python 3, but apart from the difference in print syntax it behaves the same under Python 2.
II. A direct comparison of multithreading and multiprocessing

1. We take computing the Fibonacci sequence as an example to compare the speed of the two approaches

1. Multithreaded version

Example code:
```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

with ThreadPoolExecutor(3) as executor:
    tasks = [executor.submit(fib, num) for num in range(25, 35)]
    start_time = time.time()
    for future in as_completed(tasks):
        data = future.result()
        print("exe result {num}".format(num=data))
    print("last time is {time}".format(time=time.time() - start_time))
```
Result:

```
exe result 75025
exe result 121393
exe result 196418
exe result 317811
exe result 514229
exe result 832040
exe result 1346269
exe result 2178309
exe result 3524578
exe result 5702887
last time is 3.65224289894104
```
2. Multiprocess version:

Example code:
```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

if __name__ == '__main__':
    with ProcessPoolExecutor(3) as executor:
        tasks = [executor.submit(fib, num) for num in range(25, 35)]
        start_time = time.time()
        for future in as_completed(tasks):
            data = future.result()
            print("exe result {num}".format(num=data))
        print("last time is {time}".format(time=time.time() - start_time))
```
Result:

```
exe result 75025
exe result 121393
exe result 196418
exe result 317811
exe result 514229
exe result 832040
exe result 1346269
exe result 2178309
exe result 3524578
exe result 5702887
last time is 2.270967960357666
```
Conclusion:

It is clear that for this CPU-bound task the multiprocess version is faster (about 2.27 s versus 3.65 s for threads).
2. We take a frequently I/O-bound task as an example to compare the speed of the two approaches

1. Multithreaded version

Example code:
```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    with ThreadPoolExecutor(3) as executor:
        tasks = [executor.submit(random_sleep, num) for num in [2] * 30]
        start_time = time.time()
        for future in as_completed(tasks):
            data = future.result()
            print("exe result {num}".format(num=data))
        print("last time is {time}".format(time=time.time() - start_time))
```
Result:

```
last time is 20.006227493286133
```
2. Multiprocess version:

Example code:
```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    with ProcessPoolExecutor(3) as executor:
        tasks = [executor.submit(random_sleep, num) for num in [2] * 30]
        start_time = time.time()
        for future in as_completed(tasks):
            data = future.result()
            print("exe result {num}".format(num=data))
        print("last time is {time}".format(time=time.time() - start_time))
```
Result:

```
last time is 20.205044746398926
```
Conclusion:

For I/O-bound work the two finish in nearly the same time (both around 20 s, dominated by the sleeps), but processes cost more to create and manage, so multithreading is the better choice for I/O-heavy tasks.
III. Multiprocess programming with multiprocessing

A process's data is completely isolated; unlike threads, processes cannot communicate through global variables.

For multiprocess programming the first choice is still the package we used in the tests above (concurrent.futures), because its interface is identical to the multithreading one and it is well designed; the multiprocessing module is lower-level.
Example code:
```python
import multiprocessing
import time

def get_html(n):
    time.sleep(n)
    print("sub process success")
    return n

if __name__ == '__main__':
    process = multiprocessing.Process(target=get_html, args=(2,))
    process.start()
    print(process.pid)
    process.join()
    print("main process end")
```

Result:

```
29244
sub process success
main process end
```
IV. Using a process pool

Example code:
```python
import multiprocessing
import time

def get_html(n):
    time.sleep(n)
    print("sub process success")
    return n

if __name__ == '__main__':
    pool = multiprocessing.Pool(multiprocessing.cpu_count())  # create a process pool
    result = pool.apply_async(get_html, args=(3,))  # submit a task to the pool
    pool.close()  # stop accepting new tasks
    pool.join()
    print(result.get())
```
Result:

```
sub process success
3
```
Of course, we may want to iterate over a pool's results; no problem, Python provides pool.imap() for exactly this.

Example code:
```python
import multiprocessing
import time

def get_html(n):
    time.sleep(n)
    print("sub process success")
    return n

if __name__ == '__main__':
    pool = multiprocessing.Pool(multiprocessing.cpu_count())  # create a process pool
    for result in pool.imap(get_html, [1, 3, 5, 7, 9]):
        print("sleep {time} success".format(time=result))
```
Result:

```
sub process success
sleep 1 success
sub process success
sleep 3 success
sub process success
sleep 5 success
sub process success
sleep 7 success
sub process success
sleep 9 success
```
V. Inter-process communication

Method 1: use a process Queue

Example code:
```python
from multiprocessing import Process, Queue
import time

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    res = queue.get()
    print(res)

if __name__ == '__main__':
    queue = Queue(10)
    task1 = Process(target=producer, args=(queue,))
    task2 = Process(target=consumer, args=(queue,))
    task1.start()
    task2.start()
    task1.join()
    task2.join()
```
Result:

```
a
```
Note:

1. Communicating through shared global variables does not work with multiple processes: when a process is forked, it receives its own copy of every variable, and the processes are isolated from each other.

2. Queue() cannot be used for communication between the processes of a process pool.
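Point 1 can be demonstrated directly: a child process works on its own copy of a global variable, so changes made in the child never reach the parent. A minimal sketch:

```python
import multiprocessing

counter = 0

def increment():
    global counter
    counter += 1  # only modifies the child process's copy

if __name__ == '__main__':
    p = multiprocessing.Process(target=increment)
    p.start()
    p.join()
    print(counter)  # still 0 in the parent
```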
Method 2: use the Queue from Manager for communication inside a process pool

Example code:
```python
from multiprocessing import Pool, Manager
import time

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    res = queue.get()
    print(res)

if __name__ == '__main__':
    queue = Manager().Queue(10)
    pool = Pool(2)
    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))
    pool.close()
    pool.join()
```
Result:

```
a
```
Method 3: use Pipe for communication between two processes

A pipe can only be used between two processes, but it is more efficient than a Queue.

Example code:
```python
from multiprocessing import Process, Pipe

def producer(pipe):
    pipe.send("a")

def consumer(pipe):
    print(pipe.recv())

if __name__ == '__main__':
    receive_pipe, send_pipe = Pipe()
    task1 = Process(target=producer, args=(send_pipe,))
    task2 = Process(target=consumer, args=(receive_pipe,))
    task1.start()
    task2.start()
    task1.join()
    task2.join()
```
Result:

```
a
```
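Worth noting: Pipe() is duplex by default, so both returned connection objects can send and receive, allowing two-way communication over a single pipe. A minimal sketch (the `worker` function is illustrative):

```python
from multiprocessing import Process, Pipe

def worker(conn):
    msg = conn.recv()       # receive the request from the parent
    conn.send(msg.upper())  # reply on the same connection
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # duplex by default
    p = Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send("ping")
    print(parent_conn.recv())  # PING
    p.join()
```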
VI. Sharing memory between processes

Example code:
```python
from multiprocessing import Process
import multiprocessing

def add_data(p_dict, key, value):
    p_dict[key] = value

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    process_dict = mgr.dict()
    first_process = Process(target=add_data, args=(process_dict, "Bob", 22))
    second_process = Process(target=add_data, args=(process_dict, "Alic", 20))
    first_process.start()
    second_process.start()
    first_process.join()
    second_process.join()
    print(process_dict)
```
Result:

```
{'Bob': 22, 'Alic': 20}
```
Explanation:

As you can see, although these are different processes, they both operated on the same variable.
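Besides Manager(), multiprocessing also provides true shared memory through Value and Array. Since `+=` on a shared Value is not atomic, the update should be guarded with the value's built-in lock; a minimal sketch:

```python
from multiprocessing import Process, Value

def add_100(total):
    for _ in range(100):
        with total.get_lock():  # += on a shared Value is not atomic
            total.value += 1

if __name__ == '__main__':
    total = Value('i', 0)  # a shared integer living in shared memory
    workers = [Process(target=add_100, args=(total,)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(total.value)  # 400
```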