程序、執行緒和協程
一、程序
1、多工原理
多工是指作業系統同時可以執行多個任務。
- 單核CPU實現多工原理:作業系統輪流讓各個任務交替執行;
- 多核CPU實現多工原理:真正的執行多工只能在多核CPU上實現,多出來的任務輪流排程到每個核心上執行。
- 併發:看上去一起執行,任務數多於CPU核心數;
- 並行:真正的一起執行,任務數小於等於CPU核心數。
實現多工的方式:
1、多程序模式
2、多執行緒模式
3、協程模式
4、多程序+多執行緒模式
2、程序
對於作業系統而言,一個任務就是一個程序;
程序是系統中程式執行和資源分配的基本單元,每個程序都有自己的資料段、程式碼段、堆疊段。
下面是一小段程式,一個單任務的例子。在其中,有兩個輸出語句分別在在兩個不同的迴圈當中,單任務的執行方式,也就是最初學習時,當一個迴圈沒有結束的時候,無法執行到下面的程式當中。如果想要讓兩個迴圈可以同時在執行,就是在實現多工,當然不是說同時輸出,而是兩個迴圈都在執行著。
1 from time import sleep 2 # 只能執行到那一個迴圈,執行不了run,所以叫單任務 3 def run(): 4while True: 5print("&&&&&&&&&&&&&&&") 6sleep(1.2) 7 8 if __name__ == "__main__": 9while True: 10print("**********") 11sleep(1) 12run()
接下來啟用多工,通過程序來實現。
multiprocessing庫:跨平臺版本的多程序模組,提供了一個Process類來代表一個程序物件(fork僅適用於Linux)。
下面的程式是在一個父程序中建立一個子程序,讓父程序和子程序可以都在執行,建立方式程式中已經很簡潔了。可以自己把這兩段程式複製下來執行一下,看看輸出的效果。
1 from multiprocessing import Process 2 from time import sleep 3 import os 4 5 def run(str): 6# os.getpid()獲取當前程序id號 7# os.getppid()獲取當前程序的父程序id號 8while True: 9print("&&&&&&&&&&&&&&&%s--%s--%s" % (str, os.getpid(), os.getppid())) 10sleep(0.5) 11 12 if __name__ == "__main__": 13print("主(父)程序啟動 %s" % (os.getpid())) 14# 建立子程序 15# target說明程序執行的任務 16p = Process(target=run, args=("nice",)) 17# 啟動程序 18p.start() 19 20while True: 21print("**********") 22sleep(1)
我想第一個單任務的程式就不必說了吧,就是一個死迴圈,一直沒有執行到下面的run函式。第二段程式是通過多程序實現的多工,兩個迴圈都能執行到,我把結果截圖放下面,最好自己去試一下。
3、父子程序的先後順序
上面的多程序的例子中輸出了那麼多,我們使用 的時候究竟是先執行哪個後執行哪個呢?根據我們的一般思維來說,我們寫的主函式其實就是父程序,在主函式中間,要呼叫的也就是子程序。
1 from multiprocessing import Process 2 from time import sleep 3 import os 4 5 def run(): 6print("啟動子程序") 7print("子程序結束") 8sleep(3) 9 10 if __name__ == "__main__": 11print("父程序啟動") 12p = Process(target=run) 13p.start() 14 15# 父程序的結束不能影響子程序,讓程序等待子程序結束再執行父程序 16p.join() 17 18print("父程序結束")
4、全域性變數在多個程序中不能共享
在多程序的程式當中定義的全域性變數在多個程序中是不能共享的,篇幅較長在這裡就不舉例子了,可以自己試一下。這個也是和稍後要說的執行緒的一個區別,線上程中,變數是可以共享的,也因此衍生出一些問題,稍後再說。
5、啟動多個程序
在正常工作使用的時候,當然不止有有個一個兩個程序,畢竟這一兩個也起不到想要的效果。那麼就需要採用更多的程序,這時候需要通過程序池來實現,就是在程序池中放好你要建立的程序,然後執行的時候,把他們都啟動起來,就可以同時進行了,在一定的環境下可以大大的提高效率。當然這個也和起初提到的有關,如果你的CPU是單核的,那麼多程序也只是起到了讓幾個任務同時在執行著,並沒有提高效率,而且啟動程序的時候還要花費一些時間,因此在多核CPU當中更能發揮優勢。
在multiprocessing中有個Pool方法,可以實現程序池。在利用程序池時可以設定要啟動幾個程序,一般情況下,它預設和你電腦的CPU核數一致,也可以自己設定,如果設定的程序數多於CPU核數,那多出來的程序會輪流排程到每個核心上執行。下面是啟動多個程序的過程。
1 from multiprocessing import Pool 2 import os 3 import time 4 import random 5 6 7 def run(name): 8print("子程序%s啟動--%s" % (name, os.getpid())) 9start = time.time() 10time.sleep(random.choice([1,2,3,4,5])) 11end = time.time() 12print("子程序%s結束--%s--耗時%.2f" % (name, os.getpid(), end-start)) 13 14 if __name__ == "__main__": 15print("啟動父程序") 16 17# 建立多個程序 18# Pool 程序池 :括號裡的數表示可以同時執行的程序數量 19# Pool()預設大小是CPU核心數 20pp = Pool(4) 21for i in range(5): 22# 建立程序,放入程序池,統一管理 23pp.apply_async(run, args=(i,)) 24 25# 在呼叫join之前必須先呼叫close,呼叫close之後就不能再繼續新增新的程序了 26pp.close() 27# 程序池物件呼叫join還等待程序池中所有的子程序結束 28pp.join() 29 30print("結束父程序")
6、檔案拷貝(單程序與多程序對比)
(1)單程序實現
1 from multiprocessing import Pool 2 import time 3 import os 4 5 # 實現檔案的拷貝 6 def copyFile(rPath, wPath): 7fr = open(rPath, 'rb') 8fw = open(wPath, 'wb') 9context = fr.read() 10fw.write(context) 11fr.close() 12fw.close() 13 14 path = r'F:\python_note\執行緒、協程' 15 toPath = r'F:\python_note\test' 16 17 # 讀取path下的所有檔案 18 filesList = os.listdir(path) 19 20 # 啟動for迴圈處理每一個檔案 21 start = time.time() 22 for fileName in filesList: 23copyFile(os.path.join(path,fileName), os.path.join(toPath,fileName)) 24 25 end = time.time() 26 print('總耗時:%.2f' % (end-start)) View Code
(2)多程序實現
1 from multiprocessing import Pool 2 import time 3 import os 4 5 # 實現檔案的拷貝 6 def copyFile(rPath, wPath): 7fr = open(rPath, 'rb') 8fw = open(wPath, 'wb') 9context = fr.read() 10fw.write(context) 11fr.close() 12fw.close() 13 14 path = r'F:\python_note\執行緒、協程' 15 toPath = r'F:\python_note\test' 16 17 18 if __name__ == "__main__": 19# 讀取path下的所有檔案 20filesList = os.listdir(path) 21 22start = time.time() 23pp = Pool(4) 24for fileName in filesList: 25pp.apply_async(copyFile, args=(os.path.join( 26path, fileName), os.path.join(toPath, fileName))) 27pp.close() 28pp.join() 29end = time.time() 30print("總耗時:%.2f" % (end - start)) View Code
上面兩個程式是兩種方法實現同一個目標的程式,可以將其中的檔案路徑更換為你自己的路徑,可以看到最後計算出的耗時是多少。也許有人發現並不是多程序的效率就高,說的的確沒錯,因為建立程序也要花費時間,沒準啟動程序的時間遠多讓這一個核心執行所有核心用的時間要多。這個例子也只是演示一下如何使用,在大資料的任務下會有更深刻的體驗。
7、程序物件
我們知道Python是一個面向物件的語言。而且Python中萬物皆物件,程序也可以封裝成物件,來方便以後自己使用,只要把他封裝的足夠豐富,提供清晰的介面,以後使用時會快捷很多,這個就根據自己的需求自己可以試一下,不寫了。
8、程序間通訊
上面提到過程序間的變數是不能共享的,那麼如果有需要該怎麼辦?通過佇列的方式進行傳遞。在父程序中建立佇列,然後把佇列傳到每個子程序當中,他們就可以共同對其進行操作。
1 from multiprocessing import Process, Queue 2 import os 3 import time 4 5 6 def write(q): 7print("啟動寫子程序%s" % (os.getpid())) 8for chr in ['A', 'B', 'C', 'D']: 9q.put(chr) 10time.sleep(1) 11print("結束寫子程序%s" % (os.getpid())) 12 13 def read(q): 14print("啟動讀子程序%s" % (os.getpid())) 15while True: 16value = q.get() 17print("value = "+value) 18print("結束讀子程序%s" % (os.getpid())) 19 20 if __name__ == "__main__": 21# 父程序建立佇列,並傳遞給子程序 22q = Queue() 23pw = Process(target=write, args=(q,)) 24pr = Process(target=read, args=(q,)) 25 26pw.start() 27pr.start() 28# 寫程序結束 29pw.join() 30# pr程序裡是個死迴圈,無法等待期結束,只能強行結束 31pr.terminate() 32print("父程序結束")
二、執行緒
1、執行緒
- 在一個程序內部,要同時幹多件事,就需要執行多個"子任務",我們把程序內的多個"子任務"叫做執行緒
- 執行緒通常叫做輕型的程序,執行緒是共享記憶體空間,併發執行的多工,每一個執行緒都共享一個程序的資源
- 執行緒是最小的執行單元而程序由至少一個執行緒組成。如何排程程序和執行緒,完全由作業系統來決定,程式自己不能決定什麼時候執行,執行多長時間
模組:
1、_thread模組 低階模組(更接近底層)
2、threading模組 高階模組,對_thread進行了封裝
2、啟動一個執行緒
同樣,先給一個多執行緒的例子,其中,仍然使用run函式作為其中的一個子執行緒,主函式為父執行緒。通過threading的Thread方法建立執行緒並開啟,join來等待子執行緒。
1 import threading 2 import time 3 4 5 def run(): 6print("子執行緒(%s)啟動" % (threading.current_thread().name)) 7 8# 實現執行緒的功能 9time.sleep(1) 10print("列印") 11time.sleep(2) 12 13print("子執行緒(%s)結束" % (threading.current_thread().name)) 14 15 16 if __name__ == "__main__": 17# 任何程序都預設會啟動一個執行緒,稱為主執行緒,主執行緒可以啟動新的子執行緒 18# current_thread():返回執行緒的例項 19print("主執行緒(%s)啟動" % (threading.current_thread().name)) 20 21# 建立子執行緒 22t = threading.Thread(target=run, name="runThread") 23t.start() 24 25# 等待執行緒結束 26t.join() 27 28print("主執行緒(%s)結束" % (threading.current_thread().name))
3、執行緒間資料共享
多執行緒和多程序最大的不同在於,多程序中,同一個變數,各自有一份拷貝存在每個程序中,互不影響。
而多執行緒所有變數都由所有執行緒共享。所以任何一個變數都可以被任何一個執行緒修改,因此,執行緒之間共享資料最大的危險在於多個執行緒同時修改一個變數,容易把內容改亂了。
1 import threading 2 3 4 num = 10 5 6 def run(n): 7global num 8for i in range(10000000): 9num = num + n 10num = num - n 11 12 if __name__ == "__main__": 13t1 = threading.Thread(target=run, args=(6,)) 14t2 = threading.Thread(target=run, args=(9,)) 15 16t1.start() 17t2.start() 18t1.join() 19t2.join() 20 21print("num = ",num)
4、執行緒鎖
在第三小點中已經提到了,多執行緒的一個缺點就是資料是共享的,如果有兩個執行緒正同時在修改這個資料,就會出現混亂,它自己也不知道該聽誰的了,尤其是在運算比較複雜,次數較多的時候,這種錯誤的機會會更大。
當然,解決辦法也是有的,那就是利用執行緒鎖。加鎖的意思就是在其中一個執行緒正在對資料進行操作時,讓其他執行緒不得介入。這個加鎖和釋放鎖是由人來確定的。
- 確保了這段程式碼只能由一個執行緒從頭到尾的完整執行
- 阻止了多執行緒的併發執行,要比不加鎖時候效率低。包含鎖的程式碼段只能以單執行緒模式執行
- 由於可以存在多個鎖,不同執行緒持有不同的鎖,並試圖獲取其他的鎖,可能造成死鎖導致多個執行緒掛起,只能靠作業系統強制終止
1 def run(n): 2global num 3for i in range(10000000): 4lock.acquire() 5try: 6num = num + n 7num = num - n 8finally: 9# 修改完釋放鎖 10lock.release() 11 12 if __name__ == "__main__": 13t1 = threading.Thread(target=run, args=(6,)) 14t2 = threading.Thread(target=run, args=(9,)) 15 16t1.start() 17t2.start() 18t1.join() 19t2.join() 20 21print("num = ",num)
上面這段程式是迴圈多次num+n-n+n-n的過程,變數n分別設為6和9是在兩個不同的執行緒當中,程式中已經加了鎖,你可以先去掉試一下,當迴圈次數較小的時候也許還能正確,但次數一旦取的較高就會出現混亂。
加鎖是在迴圈體當中,依次執行加減法,定義中說到確保一個執行緒從頭到尾的完整執行,也就是在計算途中,不會有其他的執行緒打擾。你可以想一下,如果一個執行緒執行完加法,正在執行減法,另一個執行緒進來了,它要先進行加法時的初始sum值該是多少呢,執行緒二不一定線上程一的什麼時候進來,萬一剛進來時候,執行緒一恰好給sum賦值了,而執行緒二仍然用的是正準備進來時候的sum值,那從這裡開始豈不已經分道揚鑣了。所以,運算的次數越多,結果會越離譜。
這個說完了,還有一個小小的改進。你是否記得讀寫檔案時候書寫的一種簡便形式,通過with來實現,可以避免我們忘記關閉檔案,自動幫我們關閉。當然還有一些其他地方也用到了這個方法。這裡也同樣適用。
1 # 與上面程式碼功能相同,with lock可以自動上鎖與解鎖 2 with lock: 3num = num + n 4num = num - n
5、ThreadLocal
- 建立一個全域性的ThreadLocal物件
- 每個執行緒有獨立的儲存空間
- 每個執行緒對ThreadLocal物件都可以讀寫,但是互不影響
根據名字也可以看出,也就是在本地建個連線,所有的操作在本地進行,每個執行緒之間沒有資料的影響。
1 import threading 2 3 4 num = 0 5 local = threading.local() 6 7 def run(x, n): 8x = x + n 9x = x - n 10 11 def func(n): 12# 每個執行緒都有local.x 13local.x = num 14for i in range(10000000): 15run(local.x, n) 16print("%s-%d" % (threading.current_thread().name, local.x)) 17 18 19 if __name__ == "__main__": 20t1 = threading.Thread(target=func, args=(6,)) 21t2 = threading.Thread(target=func, args=(9,)) 22 23t1.start() 24t2.start() 25t1.join() 26t2.join() 27 28print("num = ",num)
6、控制執行緒數量
1 ''' 2 控制執行緒數量是指控制執行緒同時觸發的數量,可以拿下來這段程式碼執行一下,下面啟動了5個執行緒,但是他們會兩個兩個的進行 3 ''' 4 import threading 5 import time 6 7 # 控制併發執行執行緒的數量 8 sem = threading.Semaphore(2) 9 10 def run(): 11with sem: 12for i in range(10): 13print("%s---%d" % (threading.current_thread().name, i)) 14time.sleep(1) 15 16 17 if __name__ == "__main__": 18for i in range(5): 19threading.Thread(target=run).start()
上面的程式是有多個執行緒,但是每次限制同時執行的執行緒,通俗點說就是限制併發執行緒的上限;除此之外,也可以限制執行緒數量的下限,也就是至少達到多少個執行緒才能觸發。
1 import threading 2 import time 3 4 5 # 湊夠一定數量的執行緒才會執行,否則一直等著 6 bar = threading.Barrier(4) 7 8 def run(): 9print("%s--start" % (threading.current_thread().name)) 10time.sleep(1) 11bar.wait() 12print("%s--end" % (threading.current_thread().name)) 13 14 15 if __name__ == "__main__": 16for i in range(5): 17threading.Thread(target=run).start()
7、定時執行緒
1 import threading 2 3 4 def run(): 5print("***********************") 6 7 # 延時執行執行緒 8 t = threading.Timer(5, run) 9 t.start() 10 11 t.join() 12 print("父執行緒結束")
8、執行緒通訊
1 import threading 2 import time 3 4 5 def func(): 6# 事件物件 7event = threading.Event() 8def run(): 9for i in range(5): 10# 阻塞,等待事件的觸發 11event.wait() 12# 重置阻塞,使後面繼續阻塞 13event.clear() 14print("**************") 15t = threading.Thread(target=run).start() 16return event 17 18 e = func() 19 20 # 觸發事件 21 for i in range(5): 22time.sleep(2) 23e.set()
9、一個小栗子
這個例子是用了生產者和消費者來模擬,要進行資料通訊,還引入了佇列。先來理解一下。
1 import threading 2 import queue 3 import time 4 import random 5 6 7 # 生產者 8 def product(id, q): 9while True: 10num = random.randint(0, 10000) 11q.put(num) 12print("生產者%d生產了%d資料放入了佇列" % (id, num)) 13time.sleep(3) 14# 任務完成 15q.task_done() 16 17 # 消費者 18 def customer(id, q): 19while True: 20item = q.get() 21if item is None: 22break 23print("消費者%d消費了%d資料" % (id, item)) 24time.sleep(2) 25# 任務完成 26q.task_done() 27 28 29 if __name__ == "__main__": 30# 訊息佇列 31q = queue.Queue() 32 33# 啟動生產者 34for i in range(4): 35threading.Thread(target=product, args=(i, q)).start() 36 37# 啟動消費者 38for i in range(3): 39threading.Thread(target=customer, args=(i, q)).start()
10、執行緒排程
1 import threading 2 import time 3 4 5 # 執行緒條件變數 6 cond = threading.Condition() 7 8 9 def run(): 10with cond: 11for i in range(0, 10, 2): 12print(threading.current_thread().name, i) 13time.sleep(1) 14cond.wait()# 阻塞 15cond.notify()# 告訴另一個執行緒可以執行 16 17 18 def run2(): 19with cond: 20for i in range(1, 10, 2): 21print(threading.current_thread().name, i) 22time.sleep(1) 23cond.notify() 24cond.wait() 25 26 27 threading.Thread(target=run).start() 28 threading.Thread(target=run2).start()
三、協程
1、協程
- 子程式/子函式:在所有語言中都是層級呼叫,比如A呼叫B,在B執行的工程中又可以呼叫C,C執行完畢返回,B執行完畢返回最後是A執行完畢。是通過棧實現的,一個執行緒就是一個子程式,子程式呼叫總是一個入口,一次返回,呼叫的順序是明確的
- 協程:看上去也是子程式,但執行過程中,在子程式的內部可中斷,然後轉而執行別的子程式,不是函式呼叫,有點類似CPU中斷
1 # 這是一個子程式的呼叫 2 def C(): 3print("C--start") 4print("C--end") 5 6 def B(): 7print("B--start") 8C() 9print("B--end") 10 11 def A(): 12print("A--start") 13B() 14print("A--end") 15 16 A()
- 協程與子程式呼叫的結果類似,但不是通過在函式中呼叫另一個函式
- 協程執行起來有點像執行緒,但協程的特點在於是一個執行緒
- 與執行緒相比的優點:協程的執行效率極高,因為只有一個執行緒,也不存在同時寫變數的衝突,在協程中共享資源不加鎖,只需要判斷狀態
2、協程的原理
1 # python對協程的支援是通過generator實現的 2 def run(): 3print(1) 4yield 10 5print(2) 6yield 20 7print(3) 8yield 30 9 10 # 協程的最簡單風格,控制函式的階段執行,節約執行緒或者程序的切換 11 # 返回值是一個生成器 12 m = run() 13 print(next(m)) 14 print(next(m)) 15 print(next(m))
3、資料傳輸
1 # python對協程的支援是通過generator實現的 2 def run(): 3print(1) 4yield 10 5print(2) 6yield 20 7print(3) 8yield 30 9 10 # 協程的最簡單風格,控制函式的階段執行,節約執行緒或者程序的切換 11 # 返回值是一個生成器 12 m = run() 13 print(next(m)) 14 print(next(m)) 15 print(next(m))
4、小栗子
1 def product(c): 2c.send(None) 3for i in range(5): 4print("生產者產生資料%d" % (i)) 5r = c.send(str(i)) 6print("消費者消費了資料%s" % (r)) 7c.close() 8 9 10 def customer(): 11data = "" 12while True: 13n = yield data 14if not n: 15return 16print("消費者消費了%s" % (n)) 17data = "200" 18 19 20 c = customer() 21 product(c)