執行緒threading
<python的執行緒與threading模組>
一 ,執行緒的兩種呼叫方式
threading 模組建立在thread 模組之上。thread模組以低階、原始的方式來處理和控制執行緒,而threading 模組通過對thread進行二次封裝,
提供了更方便的api來處理執行緒。
直接呼叫:
1 import threading 2 import time 3 4 def sayhi(num): #定義每個執行緒要執行的函式 5 6print("running on number:%s" %num) 7 8time.sleep(3) 9 10 if __name__ == '__main__': 11 12t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個執行緒例項 13t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一個執行緒例項 14 15t1.start() #啟動執行緒 16t2.start() #啟動另一個執行緒 17 18print(t1.getName()) #獲取執行緒名 19print(t2.getName())
繼承式呼叫:
1 import threading 2 import time 3 4 5 class MyThread(threading.Thread): 6def __init__(self,num): 7threading.Thread.__init__(self) 8self.num = num 9 10def run(self):#定義每個執行緒要執行的函式 11 12print("running on number:%s" %self.num) 13 14time.sleep(3) 15 16 if __name__ == '__main__': 17 18t1 = MyThread(1) 19t2 = MyThread(2) 20t1.start() 21t2.start() 22 23print("ending......")
二 ,threading.thread的例項方法
join&Daemon方法:
1 import threading 2 from time import ctime,sleep 3 import time 4 5 def ListenMusic(name): 6 7print ("Begin listening to %s. %s" %(name,ctime())) 8sleep(3) 9print("end listening %s"%ctime()) 10 11 def RecordBlog(title): 12 13print ("Begin recording the %s! %s" %(title,ctime())) 14sleep(5) 15print('end recording %s'%ctime()) 16 17 18 threads = [] 19 20 21 t1 = threading.Thread(target=ListenMusic,args=('水手',)) 22 t2 = threading.Thread(target=RecordBlog,args=('python執行緒',)) 23 24 threads.append(t1) 25 threads.append(t2) 26 27 if __name__ == '__main__': 28 29for t in threads: 30#t.setDaemon(True) #注意:一定在start之前設定 31t.start() 32# t.join() 33# t1.join() 34t1.setDaemon(True) 35 36#t2.join()########考慮這三種join位置下的結果? 37print ("all over %s" %ctime())
join():在子執行緒完成執行之前,這個子執行緒的父執行緒將一直被阻塞。
setDaemon(True):
將執行緒宣告為守護執行緒,必須在start() 方法呼叫之前設定, 如果不設定為守護執行緒程式會被無限掛起。這個方法基本和join是相反的。
當我們 在程式執行中,執行一個主執行緒,如果主執行緒又建立一個子執行緒,主執行緒和子執行緒 就分兵兩路,分別執行,那麼當主執行緒完成
想退出時,會檢驗子執行緒是否完成。如 果子執行緒未完成,則主執行緒會等待子執行緒完成後再退出。但是有時候我們需要的是 只要主執行緒
完成了,不管子執行緒是否完成,都要和主執行緒一起退出,這時就可以 用setDaemon方法啦
其它方法:
1 # run():執行緒被cpu排程後自動執行執行緒物件的run方法 2 # start():啟動執行緒活動。 3 # isAlive(): 返回執行緒是否活動的。 4 # getName(): 返回執行緒名。 5 # setName(): 設定執行緒名。 6 7 threading模組提供的一些方法: 8 # threading.currentThread(): 返回當前的執行緒變數。 9 # threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。 10 # threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。
三, 同步鎖(Lock)
1 import time 2 import threading 3 4 def addNum(): 5global num #在每個執行緒中都獲取這個全域性變數 6#num-=1 7 8temp=num 9#print('--get num:',num ) 10time.sleep(0.1) 11num =temp-1 #對此公共變數進行-1操作 12 13 num = 100#設定一個共享變數 14 thread_list = [] 15 for i in range(100): 16t = threading.Thread(target=addNum) 17t.start() 18thread_list.append(t) 19 20 for t in thread_list: #等待所有執行緒執行完畢 21t.join() 22 23 print('final num:', num )
觀察:time.sleep(0.1) /0.001/0.0000001 結果分別是多少?
多個執行緒都在同時操作同一個共享資源,所以造成了資源破壞,怎麼辦呢?(join會造成序列,失去所執行緒的意義)
我們可以通過 同步鎖 來解決這種問題
1 R=threading.Lock() 2 3 #### 4 def sub(): 5global num 6R.acquire() 7temp=num-1 8time.sleep(0.1) 9num=temp 10R.release()
四, 執行緒死鎖和遞迴鎖
線上程間共享多個資源的時候,如果兩個執行緒分別佔有一部分資源並且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所有這兩個執行緒在無外力作用下將一直等待下去。下面是一個死鎖的例子:
1 import threading,time 2 3 class myThread(threading.Thread): 4def doA(self): 5lockA.acquire() 6print(self.name,"gotlockA",time.ctime()) 7time.sleep(3) 8lockB.acquire() 9print(self.name,"gotlockB",time.ctime()) 10lockB.release() 11lockA.release() 12 13def doB(self): 14lockB.acquire() 15print(self.name,"gotlockB",time.ctime()) 16time.sleep(2) 17lockA.acquire() 18print(self.name,"gotlockA",time.ctime()) 19lockA.release() 20lockB.release() 21 22def run(self): 23self.doA() 24self.doB() 25 if __name__=="__main__": 26 27lockA=threading.Lock() 28lockB=threading.Lock() 29threads=[] 30for i in range(5): 31threads.append(myThread()) 32for t in threads: 33t.start() 34for t in threads: 35t.join()#等待執行緒結束,後面再講。
解決辦法:使用遞迴鎖,將
1 lockA=threading.Lock() 2 lockB=threading.Lock()<br>#--------------<br>lock=threading.RLock()
為了支援在同一執行緒中多次請求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。
應用:
1 import time 2 3 import threading 4 5 class Account: 6def __init__(self, _id, balance): 7self.id = _id 8self.balance = balance 9self.lock = threading.RLock() 10 11def withdraw(self, amount): 12 13with self.lock: 14self.balance -= amount 15 16def deposit(self, amount): 17with self.lock: 18self.balance += amount 19 20 21def drawcash(self, amount):#lock.acquire中巢狀lock.acquire的場景 22 23with self.lock: 24interest=0.05 25count=amount+amount*interest 26 27self.withdraw(count) 28 29 30 def transfer(_from, to, amount): 31 32#鎖不可以加在這裡 因為其他的其它執行緒執行的其它方法在不加鎖的情況下資料同樣是不安全的 33_from.withdraw(amount) 34 35to.deposit(amount) 36 37 38 39 alex = Account('kelvin',1000) 40 yuan = Account('bob',1000) 41 42 t1=threading.Thread(target = transfer, args = (kelvin,bob, 100)) 43 t1.start() 44 45 t2=threading.Thread(target = transfer, args = (bob,kelvin, 200)) 46 t2.start() 47 48 t1.join() 49 t2.join() 50 51 print('>>>',kelvin.balance) 52 print('>>>',bob.balance)
五,同步條件(Event):
事件是一個簡單的同步物件;該事件表示一個內部標誌,
執行緒可以等待設定標誌,或者自己設定或清除標誌。
event = threading.Event()
#客戶端執行緒可以等待設定標誌
event.wait()
#伺服器執行緒可以設定或重置它
event.set()
event.clear()
如果設定了標誌,則wait方法不執行任何操作。
如果該標誌被清除,等待將被阻止,直到它再次設定。
任何數量的執行緒都可以等待相同的事件。
1 import threading,time 2 class Boss(threading.Thread): 3def run(self): 4print("BOSS:今晚大家都要加班到22:00。") 5print(event.isSet()) 6event.set() 7time.sleep(5) 8print("BOSS:<22:00>可以下班了。") 9print(event.isSet()) 10event.set() 11 class Worker(threading.Thread): 12def run(self): 13event.wait() 14print("Worker:哎……命苦啊!") 15time.sleep(1) 16event.clear() 17event.wait() 18print("Worker:OhYeah!") 19 if __name__=="__main__": 20event=threading.Event() 21threads=[] 22for i in range(5): 23threads.append(Worker()) 24threads.append(Boss()) 25for t in threads: 26t.start() 27for t in threads: 28t.join()
六,訊號量(Semaphore)
訊號量用來控制執行緒併發數的,BoundedSemaphore或Semaphore管理一個內建的計數 器,每當呼叫acquire()時-1,呼叫release()時+1。
計數器不能小於0,當計數器為 0時,acquire()將阻塞執行緒至同步鎖定狀態,直到其他執行緒呼叫release()。( 類似於停車位的概念)
BoundedSemaphore與Semaphore的唯一區別在於前者將在呼叫release()時檢查計數 器的值是否超過了計數器的初始值,如果超過了將丟擲一個異常。
1 import threading,time 2 class myThread(threading.Thread): 3def run(self): 4if semaphore.acquire(): 5print(self.name) 6time.sleep(5) 7semaphore.release() 8 if __name__=="__main__": 9semaphore=threading.Semaphore(5) 10thrs=[] 11for i in range(100): 12thrs.append(myThread()) 13for t in thrs: 14t.start()
七,多執行緒利器---佇列(queue)
列表是不安全的資料結構:
1 import threading,time 2 3 li=[1,2,3,4,5] 4 5 def pri(): 6while li: 7a=li[-1] 8print(a) 9time.sleep(1) 10try: 11li.remove(a) 12except Exception as e: 13print('----',a,e) 14 15 t1=threading.Thread(target=pri,args=()) 16 t1.start() 17 t2=threading.Thread(target=pri,args=()) 18 t2.start()
思考:如何通過對列來完成上述功能?
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
queue列隊類的方法:
1 建立一個“佇列”物件 2 import Queue 3 q = Queue.Queue(maxsize = 10) 4 Queue.Queue類即是一個佇列的同步實現。佇列長度可為無限或者有限。可通過Queue的建構函式的可選引數maxsize來設定佇列長度。如果maxsize小於1就表示佇列長度無限。 5 6 將一個值放入佇列中 7 q.put(10) 8 呼叫佇列物件的put()方法在隊尾插入一個專案。put()有兩個引數,第一個item為必需的,為插入專案的值;第二個block為可選引數,預設為 9 1。如果隊列當前為空且block為1,put()方法就使呼叫執行緒暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。 10 11 將一個值從佇列中取出 12 q.get() 13 呼叫佇列物件的get()方法從隊頭刪除並返回一個專案。可選引數為block,預設為True。如果佇列為空且block為True, 14 get()就使呼叫執行緒暫停,直至有專案可用。如果佇列為空且block為False,佇列將引發Empty異常。 15 16 Python Queue模組有三種佇列及建構函式: 17 1、Python Queue模組的FIFO佇列先進先出。class queue.Queue(maxsize) 18 2、LIFO類似於堆,即先進後出。class queue.LifoQueue(maxsize) 19 3、還有一種是優先順序佇列級別越低越先出來。class queue.PriorityQueue(maxsize) 20 21 此包中的常用方法(q = Queue.Queue()): 22 q.qsize() 返回佇列的大小 23 q.empty() 如果佇列為空,返回True,反之False 24 q.full() 如果佇列滿了,返回True,反之False 25 q.full 與 maxsize 大小對應 26 q.get([block[, timeout]]) 獲取佇列,timeout等待時間 27 q.get_nowait() 相當q.get(False) 28 非阻塞 q.put(item) 寫入佇列,timeout等待時間 29 q.put_nowait(item) 相當q.put(item, False) 30 q.task_done() 在完成一項工作之後,q.task_done() 函式向任務已經完成的佇列傳送一個訊號 31 q.join() 實際上意味著等到佇列為空,再執行別的操作
other mode:
1 import queue 2 3 #先進後出 4 5 q=queue.LifoQueue() 6 7 q.put(34) 8 q.put(56) 9 q.put(12) 10 11 #優先順序 12 # q=queue.PriorityQueue() 13 # q.put([5,100]) 14 # q.put([7,200]) 15 # q.put([3,"hello"]) 16 # q.put([4,{"name":"alex"}]) 17 18 while 1: 19 20data=q.get() 21print(data)
生產者消費者模型:
為什麼要使用生產者和消費者模式?
線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
什麼是生產者消費者模式?
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。
這就像,在餐廳,廚師做好菜,不需要直接和客戶交流,而是交給前臺,而客戶去飯菜也不需要不找廚師,直接去前臺領取即可,這也是一個結耦的過程。
1 import time,random 2 import queue,threading 3 4 q = queue.Queue() 5 6 def Producer(name): 7count = 0 8while count <10: 9print("making........") 10time.sleep(random.randrange(3)) 11q.put(count) 12print('Producer %s has produced %s baozi..' %(name, count)) 13count +=1 14#q.task_done() 15#q.join() 16print("ok......") 17 def Consumer(name): 18count = 0 19while count <10: 20time.sleep(random.randrange(4)) 21if not q.empty(): 22data = q.get() 23#q.task_done() 24#q.join() 25print(data) 26print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) 27else: 28print("-----no baozi anymore----") 29count +=1 30 31 p1 = threading.Thread(target=Producer, args=('A',)) 32 c1 = threading.Thread(target=Consumer, args=('B',)) 33 # c2 = threading.Thread(target=Consumer, args=('C',)) 34 # c3 = threading.Thread(target=Consumer, args=('D',)) 35 p1.start() 36 c1.start() 37 # c2.start() 38 # c3.start()
<多程序模組 multiprocessing>
multiprocessing>
M
is a package that supports spawning processes using an API similar to the threading module. The ultiprocessing
package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing
module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows. multiprocessing
由於GIL的存在,python中的多執行緒其實並不是真正的多執行緒,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多程序。
multiprocessing包是Python中的多程序管理包。與threading.Thread類似,它可以利用multiprocessing.Process物件來建立一個程序。該程序可以執行在Python程式內部編寫的函式。該Process物件與Thread物件的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些物件可以像多執行緒那樣,通過引數傳遞給各個程序),用以同步程序,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多程序的情境。
一 ,程序的呼叫
呼叫方式1
1 from multiprocessing import Process 2 import time 3 def f(name): 4time.sleep(1) 5print('hello', name,time.ctime()) 6 7 if __name__ == '__main__': 8p_list=[] 9for i in range(3): 10p = Process(target=f, args=('alvin',)) 11p_list.append(p) 12p.start() 13for i in p_list: 14p.join() 15print('end')
呼叫方式2
1 from multiprocessing import Process 2 import time 3 4 class MyProcess(Process): 5def __init__(self): 6super(MyProcess, self).__init__() 7#self.name = name 8 9def run(self): 10time.sleep(1) 11print ('hello', self.name,time.ctime()) 12 13 14 if __name__ == '__main__': 15p_list=[] 16for i in range(3): 17p = MyProcess() 18p.start() 19p_list.append(p) 20 21for p in p_list: 22p.join() 23 24print('end')
要顯示涉及的各個程序ID,下面是一個擴充套件的示例:
1 from multiprocessing import Process 2 import os 3 import time 4 def info(title): 5 6print("title:",title) 7print('parent process:', os.getppid()) 8print('process id:', os.getpid()) 9 10 def f(name): 11info('function f') 12print('hello', name) 13 14 if __name__ == '__main__': 15info('main process line') 16time.sleep(1) 17print("------------------") 18p = Process(target=info, args=('yuan',)) 19p.start() 20p.join()
二, Process類
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 執行緒組,目前還沒有實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 程序名;
args/kwargs: 要傳入方法的引數。
例項方法:
is_alive():返回程序是否在執行。
join([timeout]):阻塞當前上下文環境的程序程,直到呼叫此方法的程序終止或到達指定的timeout(可選引數)。
start():程序準備就緒,等待CPU排程
run():strat()呼叫run方法,如果例項程序時未制定傳入target,這star執行t預設run()方法。
terminate():不管任務是否完成,立即停止工作程序
屬性:
daemon:和執行緒的setDeamon功能一樣
name:程序名字。
pid:程序號。
1 import time 2 frommultiprocessing import Process 3 4 def foo(i): 5time.sleep(1) 6print (p.is_alive(),i,p.pid) 7time.sleep(1) 8 9 if __name__ == '__main__': 10p_list=[] 11for i in range(10): 12p = Process(target=foo, args=(i,)) 13#p.daemon=True 14p_list.append(p) 15 16for p in p_list: 17p.start() 18# for p in p_list: 19#p.join() 20 21print('main process end')
三 ,程序間通訊
1 程序對列Queue
1 from multiprocessing import Process, Queue 2 import queue 3 4 def f(q,n): 5#q.put([123, 456, 'hello']) 6q.put(n*n+1) 7print("son process",id(q)) 8 9 if __name__ == '__main__': 10q = Queue()#try: q=queue.Queue() 11print("main process",id(q)) 12 13for i in range(3): 14p = Process(target=f, args=(q,i)) 15p.start() 16 17print(q.get()) 18print(q.get()) 19print(q.get())
2 管道
函式的作用是:返回一對由管道連線的連線物件,預設情況下,管道是雙向的。例如:
1 from multiprocessing import Process, Pipe 2 3 def f(conn): 4conn.send([12, {"name":"yuan"}, 'hello']) 5response=conn.recv() 6print("response",response) 7conn.close() 8print("q_ID2:",id(child_conn)) 9 10 if __name__ == '__main__': 11 12parent_conn, child_conn = Pipe() 13print("q_ID1:",id(child_conn)) 14p = Process(target=f, args=(child_conn,)) 15p.start() 16print(parent_conn.recv())# prints "[42, None, 'hello']" 17parent_conn.send("兒子你好!") 18p.join()
pipe()返回的兩個連線物件表示管道的兩端。每個連線物件都有send()和recv()方法(以及其他方法)。請注意,如果兩個程序(或執行緒)同時嘗試從管道的同一端讀取或寫入資料,則管道中的資料可能會損壞。當然,同時使用不同管端的過程不會有損壞的風險。
3 Managers
Queue和pipe只是實現了資料互動,並沒實現資料共享,即一個程序去更改另一個程序的資料。
manager()返回的manager物件控制包含python物件的伺服器程序,並允許其他程序使用代理來操作這些物件。
manager()返回的管理器將支援型別列表、dict、名稱空間、鎖、rlock、訊號量、有界訊號量、條件、事件、屏障、佇列、值和陣列。例如:
1 from multiprocessing import Process, Manager 2 3 def f(d, l,n): 4d[n] = '1' 5d['2'] = 2 6d[0.25] = None 7l.append(n) 8#print(l) 9 10print("son process:",id(d),id(l)) 11 12 if __name__ == '__main__': 13 14with Manager() as manager: 15 16d = manager.dict() 17 18l = manager.list(range(5)) 19 20print("main process:",id(d),id(l)) 21 22p_list = [] 23 24for i in range(10): 25p = Process(target=f, args=(d,l,i)) 26p.start() 27p_list.append(p) 28 29for res in p_list: 30res.join() 31 32print(d) 33print(l)
四 ,程序同步
如果不使用來自不同程序的鎖輸出,則很容易混淆所有內容。
1 from multiprocessing import Process, Lock 2 3 def f(l, i): 4 5with l.acquire(): 6print('hello world %s'%i) 7 8 if __name__ == '__main__': 9lock = Lock() 10 11for num in range(10): 12Process(target=f, args=(lock, num)).start()
五 ,程序池
程序池內部維護一個程序序列,當使用時,則去程序池中獲取一個程序,如果程序池序列中沒有可供使用的進程序,那麼程式就會等待,直到程序池中有可用程序為止。
程序池中有兩個方法:
- apply
- apply_async
1 frommultiprocessing import Process,Pool 2 import time,os 3 4 def Foo(i): 5time.sleep(1) 6print(i) 7return i+100 8 9 def Bar(arg): 10 11print(os.getpid()) 12print(os.getppid()) 13print('logger:',arg) 14 15 pool = Pool(5) 16 17 Bar(1) 18 print("----------------") 19 20 for i in range(10): 21#pool.apply(func=Foo, args=(i,)) 22#pool.apply_async(func=Foo, args=(i,)) 23pool.apply_async(func=Foo, args=(i,),callback=Bar) 24 25 pool.close() 26 pool.join() 27 print('end')
<Python中的上下文管理器(contextlib模組)>
上下文管理器的任務是:程式碼塊執行前準備,程式碼塊執行後收拾
1 如何使用上下文管理器:
如何開啟一個檔案,並寫入"hello world":
1 filename="my.txt" 2 mode="w" 3 f=open(filename,mode) 4 f.write("hello world") 5 f.close()
當發生異常時(如磁碟寫滿),就沒有機會執行第5行。當然,我們可以採用try-finally語句塊進行包裝:
1 writer=open(filename,mode) 2 try: 3writer.write("hello world") 4 finally: 5writer.close()
當我們進行復雜的操作時,try-finally語句就會變得醜陋,採用with語句重寫:
1 with open(filename,mode) as writer: 2writer.write("hello world")
as指代了從open()函式返回的內容,並把它賦給了新值。with完成了try-finally的任務。
2 自定義上下文管理器
with語句的作用類似於try-finally,提供一種上下文機制。要應用with語句的類,其內部必須提供兩個內建函式__enter__和__exit__。前者在主體程式碼執行前執行,後者在主體程式碼執行後執行。as後面的變數,是在__enter__函式中返回的。
1 class echo(): 2def output(self): 3print "hello world" 4def __enter__(self): 5print "enter" 6return self#可以返回任何希望返回的東西 7def __exit__(self,exception_type,value,trackback): 8print "exit" 9if exception_type==ValueError: 10return True 11else: 12return Flase 13 14 >>>with echo as e: 15e.output() 16 17 輸出: 18 enter 19 hello world 20 exit
完備的__exit__函式如下:
1 def __exit__(self,exc_type,exc_value,exc_tb)
其中,exc_type:異常型別;exc_value:異常值;exc_tb:異常追蹤資訊
當__exit__返回True時,異常不傳播
3 ,contextlib模組
contextlib模組的作用是提供更易用的上下文管理器,它是通過Generator實現的。contextlib中的contextmanager作為裝飾器來提供一種針對函式級別的上下文管理機制,常用框架如下:
1 from contextlib import contextmanager 2 @contextmanager 3 def make_context(): 4print 'enter' 5try: 6yield "ok" 7except RuntimeError,err: 8print 'error',err 9finally: 10print 'exit' 11 12 >>>with make_context() as value: 13print value 14 15 輸出為: 16enter 17ok 18exit
其中,yield寫入try-finally中是為了保證異常安全(能處理異常)as後的變數的值是由yield返回。 yield前面的語句可看作程式碼塊執行前操作,yield之後的操作可以看作在__exit__函式中的操作。
以執行緒鎖為例:
1 @contextlib.contextmanager 2 def loudLock(): 3print 'Locking' 4lock.acquire() 5yield 6print 'Releasing' 7lock.release() 8 9 with loudLock(): 10print 'Lock is locked: %s' % lock.locked() 11print 'Doing something that needs locking' 12 13 #Output: 14 #Locking 15 #Lock is locked: True 16 #Doing something that needs locking 17 #Releasing
4 contextlib.nested:減少巢狀
對於:
1 with open(filename,mode) as reader: 2with open(filename1,mode1) as writer: 3writer.write(reader.read())
可以通過contextlib.nested進行簡化:
1 with contextlib.nested(open(filename,mode),open(filename1,mode1)) as (reader,writer): 2writer.write(reader.read())
在python 2.7及以後,被一種新的語法取代:
1 with open(filename,mode) as reader,open(filename1,mode1) as writer: 2writer.write(reader.read())
5 contextlib.closing()
file類直接支援上下文管理器API,但有些表示開啟控制代碼的物件並不支援,如urllib.urlopen()返回的物件。還有些遺留類,使用close()方法而不支援上下文管理器API。為了確保關閉控制代碼,需要使用closing()為它建立一個上下文管理器(呼叫類的close方法)。
<協程>
協程,又稱微執行緒,纖程。英文名Coroutine。
優點1: 協程極高的執行效率。因為子程式切換不是執行緒切換,而是由程式自身控制,因此,沒有執行緒切換的開銷,和多執行緒比,執行緒數量越多,協程的效能優勢就越明顯。
優點2: 不需要多執行緒的鎖機制,因為只有一個執行緒,也不存在同時寫變數衝突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多執行緒高很多。
因為協程是一個執行緒執行,那怎麼利用多核CPU呢?最簡單的方法是多程序+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的效能。
yield的簡單實現
1 import time 2 import queue 3 4 def consumer(name): 5print("--->ready to eat baozi...") 6while True: 7new_baozi = yield 8print("[%s] is eating baozi %s" % (name,new_baozi)) 9#time.sleep(1) 10 11 def producer(): 12 13r = con.__next__() 14r = con2.__next__() 15n = 0 16while 1: 17time.sleep(1) 18print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) ) 19con.send(n) 20con2.send(n+1) 21 22n +=2 23 24 25 if __name__ == '__main__': 26con = consumer("c1") 27con2 = consumer("c2") 28p = producer()
Greenlet
greenlet是一個用C實現的協程模組,相比與python自帶的yield,它可以使你在任意函式之間隨意切換,而不需把這個函式先宣告為generator
1 from greenlet import greenlet 2 3 4 def test1(): 5print(12) 6gr2.switch() 7print(34) 8gr2.switch() 9 10 11 def test2(): 12print(56) 13gr1.switch() 14print(78) 15 16 17 gr1 = greenlet(test1) 18 gr2 = greenlet(test2) 19 gr1.switch()
Gevent
1 import gevent 2 3 import requests,time 4 5 6 start=time.time() 7 8 def f(url): 9print('GET: %s' % url) 10resp =requests.get(url) 11data = resp.text 12print('%d bytes received from %s.' % (len(data), url)) 13 14 gevent.joinall([ 15 16gevent.spawn(f, 'https://www.python.org/'), 17gevent.spawn(f, 'https://www.yahoo.com/'), 18gevent.spawn(f, 'https://www.baidu.com/'), 19gevent.spawn(f, 'https://www.sina.com.cn/'), 20 21 ]) 22 23 # f('https://www.python.org/') 24 # 25 # f('https://www.yahoo.com/') 26 # 27 # f('https://baidu.com/') 28 # 29 # f('https://www.sina.com.cn/') 30 31 print("cost time:",time.time()-start)