IO多路複用
一、IO模型介紹
1,主要的五種模型:阻塞IO、非阻塞IO、IO多路複用、訊號驅動IO(不常用)、非同步IO
對於一個network IO,它會涉及到兩個系統物件,一個是呼叫這個IO的process(Thread),另一個是系統核心。當一個read/recv讀資料的操作發生時,該操作會經歷兩個階段:
(1)等待資料準備
(2)將資料從核心拷貝到程序中
2,注意
#1、輸入操作:read、readv、recv、recvfrom、recvmsg共5個函式,如果會阻塞狀態,則會經理wait data和copy data兩個階段,如果設定為非阻塞則在wait 不到data時丟擲異常 #2、輸出操作:write、writev、send、sendto、sendmsg共5個函式,在傳送緩衝區滿了會阻塞在原地,如果設定為非阻塞,則會丟擲異常 #3、接收外來連結:accept,與輸入操作類似 #4、發起外出連結:connect,與輸出操作類似
二、阻塞IO
當用戶程序呼叫了recvfrom,然後系統呼叫,kernel就開始了IO的第一階段:準備資料。對於network IO來說,很多時候資料在一開始還沒到達,這個時候kernel就要等待資料到來,而在使用者程序這邊,整個程序會被阻塞。當kernel一直等到資料準備好了,它就會將資料從kernel中拷貝到使用者記憶體,然後kernel返回結果,使用者程序才接觸block的狀態,重新執行起來。
三、非阻塞IO
當用戶程序發出read操作時,如果kernel中的資料還沒準備好,那麼它並不會block使用者程序,而是立刻返回一個error。當用戶程序接收到一個error,就會知道資料還沒準備,於是使用者就可以做點其他的,過一段時間,再次傳送read操作,一旦kernel的資料準備好了,他就會立馬把資料拷貝到了使用者記憶體,然後返回。
在這種非阻塞IO模式下,使用者程序就不斷的詢問kernel的資料準備好了沒,若沒有,返回給 使用者一個error,在兩次詢問期間,使用者程序可以乾點其他的,若資料準備好了,就直接拷貝,這個詢問過程叫輪詢。在拷貝資料整個過程,程序任仍然處於阻塞狀態
’
socket服務端 import socket,time server=socket.socket() ip_port=('127.0.0.1',8888) server.bind(ip_port) server.listen() server.setblocking(False) while 1: try: conn,addr=server.accept() break except Exception: print('等待連線。。。。。') time.sleep(1) while 1: try: msg=conn.recv(1024) print(msg.decode('utf-8')) except Exception: print('等待接收訊息。。。。') time.sleep(1) socket客戶端 import socket client=socket.socket() ip_port=('127.0.0.1',8888) client.connect(ip_port) while 1: msg=input('請輸入:') client.send(msg.encode('utf-8'))
四、多路複用IO(IO multiplexing)
相當於select\epoll,這種IO方式也稱為事件驅動IO,select\epoll的好處在於單個process就可以同時處理多個網路連線的IO。它的基本原理就是select\epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有資料響應了,就通知使用者程序。
當用戶程序呼叫了select,那麼整個程序會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的資料準備好了,select就會返回。這個時候使用者程序再呼叫read操作,將資料從kernel拷貝到使用者程序。
這個圖和blocking IO的圖其實並沒有太大的不同,事實上還更差一些。因為它不僅阻塞了還多需要使用兩個系統呼叫(select和recvfrom),而blocking IO只調用了一個系統呼叫(recvfrom),當只有一個連線請求的時候,這個模型還不如阻塞IO效率高。但是,用select的優勢在於它可以同時處理多個connection,而阻塞IO那裡不能,我不管阻塞不阻塞,你所有的連線包括recv等操作,我都幫你監聽著(以什麼形式監聽的呢?先不要考慮,下面會講的~~),其中任何一個有變動(有連結,有資料),我就告訴你使用者,那麼你就可以去呼叫這個資料了,這就是他的NB之處。這個IO多路複用模型機制是作業系統幫我們提供的,在windows上有這麼個機制叫做select,那麼如果我們想通過自己寫程式碼來控制這個機制或者自己寫這麼個機制,我們可以使用python中的select模組來完成上面這一系列代理的行為。在一切皆檔案的unix下,這些可以接收資料的物件或者連線,都叫做檔案描述符fd
select模組及使用方法: select的優勢在於處理多個連線,不適用於單個連線
import select fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout]) 引數: 可接受四個引數(前三個必須) rlist: wait until ready for reading#等待讀的物件,你需要監聽的需要獲取資料的物件列表 wlist: wait until ready for writing#等待寫的物件,你需要寫一些內容的時候,input等等,也就是說我會迴圈他看看是否有需要傳送的訊息,如果有我取出這個物件的訊息併發送出去,一般用不到,這裡我們也給一個[]。 xlist: wait for an “exceptional condition”#等待異常的物件,一些額外的情況,一般用不到,但是必須傳,那麼我們就給他一個[]。 timeout: 超時時間 當超時時間 = n(正整數)時,那麼如果監聽的控制代碼均無任何變化,則select會阻塞n秒,之後返回三個空列表,如果監聽的控制代碼有變化,則直接執行。 返回值:三個列表與上面的三個引數列表是對應的 select方法用來監視檔案描述符(當檔案描述符條件不滿足時,select會阻塞),當某個檔案描述符狀態改變後,會返回三個列表 1、當引數1 序列中的fd滿足“可讀”條件時,則獲取發生變化的fd並新增到fd_r_list中 2、當引數2 序列中含有fd時,則將該序列中所有的fd新增到 fd_w_list中 3、當引數3 序列中的fd發生錯誤時,則將該發生錯誤的fd新增到 fd_e_list中 4、當超時時間為空,則select會一直阻塞,直到監聽的控制代碼發生變化
#服務端 from socket import * import select server = socket(AF_INET, SOCK_STREAM) server.bind(('127.0.0.1',8093)) server.listen(5) # 設定為非阻塞 server.setblocking(False) # 初始化將服務端socket物件加入監聽列表,後面還要動態新增一些conn連線物件,當accept的時候sk就有感應,當recv的時候conn就有動靜 rlist=[server,] rdata = {}#存放客戶端傳送過來的訊息 wlist=[]#等待寫物件 wdata={}#存放要返回給客戶端的訊息 print('預備!監聽!!!') count = 0 #寫著計數用的,為了看實驗效果用的,沒用 while True: # 開始 select 監聽,對rlist中的服務端server進行監聽,select函式阻塞程序,直到rlist中的套接字被觸發(在此例中,套接字接收到客戶端發來的握手訊號,從而變得可讀,滿足select函式的“可讀”條件),被觸發的(有動靜的)套接字(伺服器套接字)返回給了rl這個返回值裡面; rl,wl,xl=select.select(rlist,wlist,[],0.5) print('%s 次數>>'%(count),wl) count = count + 1 # 對rl進行迴圈判斷是否有客戶端連線進來,當有客戶端連線進來時select將觸發 for sock in rl: # 判斷當前觸發的是不是socket物件, 當觸發的物件是socket物件時,說明有新客戶端accept連線進來了 if sock == server: # 接收客戶端的連線, 獲取客戶端物件和客戶端地址資訊 conn,addr=sock.accept() #把新的客戶端連線加入到監聽列表中,當客戶端的連線有接收訊息的時候,select將被觸發,會知道這個連線有動靜,有訊息,那麼返回給rl這個返回值列表裡面。 rlist.append(conn) else: # 由於客戶端連線進來時socket接收客戶端連線請求,將客戶端連線加入到了監聽列表中(rlist),客戶端傳送訊息的時候這個連線將觸發 # 所以判斷是否是客戶端連線物件觸發 try: data=sock.recv(1024) #沒有資料的時候,我們將這個連線關閉掉,並從監聽列表中移除 if not data: sock.close() rlist.remove(sock) continue print("received {0} from client {1}".format(data.decode(), sock)) #將接受到的客戶端的訊息儲存下來 rdata[sock] = data.decode() #將客戶端連線物件和這個物件接收到的訊息加工成返回訊息,並新增到wdata這個字典裡面 wdata[sock]=data.upper() #需要給這個客戶端回覆訊息的時候,我們將這個連線新增到wlist寫監聽列表中 wlist.append(sock) #如果這個連接出錯了,客戶端暴力斷開了(注意,我還沒有接收他的訊息,或者接收他的訊息的過程中出錯了) except Exception: #關閉這個連線 sock.close() #在監聽列表中將他移除,因為不管什麼原因,它畢竟是斷開了,沒必要再監聽它了 rlist.remove(sock) # 如果現在沒有客戶端請求連線,也沒有客戶端傳送訊息時,開始對傳送訊息列表進行處理,是否需要傳送訊息 for sock in wl: sock.send(wdata[sock]) wlist.remove(sock) wdata.pop(sock) # #將一次select監聽列表中有接收資料的conn物件所接收到的訊息列印一下 # for k,v in rdata.items(): #print(k,'發來的訊息是:',v) # #清空接收到的訊息 # rdata.clear() --------------------------------------- #客戶端 from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8093)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8')) client.close() select網路IO模型的示例程式碼
IO多路複用的機制:
poll機制: Linux #和lselect監聽機制一樣,但是對監聽列表裡面的數量沒有限制,select預設限制是1024個,但是他們兩個都是作業系統輪詢每一個被監聽的檔案描述符(如果數量很大,其實效率不太好),看是否有可讀操作。
epoll機制: Linux #它的監聽機制和上面兩個不同,他給每一個監聽的物件綁定了一個回撥函式,你這個物件有訊息,那麼觸發回撥函式給使用者,使用者就進行系統呼叫來拷貝資料,並不是輪詢監聽所有的被監聽物件,這樣的效率高很多。
五、非同步IO
使用者程序發起read操作之後,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之後,首先它會立刻返回,所以不會對使用者程序產生任何block。然後,kernel作業系統會等待資料(阻塞)準備完成,然後將資料拷貝到使用者記憶體,當這一切都完成之後,kernel會給使用者程序傳送一個signal,告訴它read操作完成了。
貌似非同步IO這個模型很牛~~但是你發現沒有,這不是我們自己程式碼控制的,都是作業系統完成的,而python在copy資料這個階段沒有提供操縱作業系統的介面,所以用python沒法實現這套非同步IO機制,其他幾個IO模型都沒有解決第二階段的阻塞(使用者態和核心態之間copy資料),但是C語言是可以實現的,因為大家都知道C語言是最接近底層的,雖然我們用python實現不了,但是python仍然有非同步的模組和框架(tornado、twstied,高併發需求的時候用),這些模組和框架很多都是用底層的C語言實現的,它幫我們實現了非同步,你只要使用就可以了,但是你要知道這個非同步是不是很好呀,不需要你自己等待了,作業系統幫你做了所有的事情,你就直接收資料就行了,就像你有一張銀行卡,銀行定期給你打錢一樣。
六、selectors模組
IO複用:為了解釋這個名詞,首先來理解下複用這個概念,複用也就是共用的意思,這樣理解還是有些抽象,為此,咱們來理解下複用在通訊領域的使用,在通訊領域中為了充分利用網路連線的物理介質,往往在同一條網路鏈路上採用時分複用或頻分複用的技術使其在同一鏈路上傳輸多路訊號,到這裡我們就基本上理解了複用的含義,即公用某個“介質”來儘可能多的做同一類(性質)的事,那IO複用的“介質”是什麼呢?為此我們首先來看看伺服器程式設計的模型,客戶端發來的請求服務端會產生一個程序來對其進行服務,每當來一個客戶請求就產生一個程序來服務,然而程序不可能無限制的產生,因此為了解決大量客戶端訪問的問題,引入了IO複用技術,即:一個程序可以同時對多個客戶請求進行服務。也就是說IO複用的“介質”是程序(準確的說複用的是select和poll,因為程序也是靠呼叫select和poll來實現的),複用一個程序(select和poll)來對多個IO進行服務,雖然客戶端發來的IO是併發的但是IO所需的讀寫資料多數情況下是沒有準備好的,因此就可以利用一個函式(select和poll)來監聽IO所需的這些資料的狀態,一旦IO有資料可以進行讀寫了,程序就來對這樣的IO進行服務。 理解完IO複用後,我們在來看下實現IO複用中的三個API(select、poll和epoll)的區別和聯絡 select,poll,epoll都是IO多路複用的機制,I/O多路複用就是通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知應用程式進行相應的讀寫操作。但select,poll,epoll本質上都是同步I/O,因為他們都需要在讀寫事件就緒後自己負責進行讀寫,也就是說這個讀寫過程是阻塞的,而非同步I/O則無需自己負責進行讀寫,非同步I/O的實現會負責把資料從核心拷貝到使用者空間。三者的原型如下所示: int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); int poll(struct pollfd *fds, nfds_t nfds, int timeout); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); 1.select的第一個引數nfds為fdset集合中最大描述符值加1,fdset是一個位數組,其大小限制為__FD_SETSIZE(1024),位陣列的每一位代表其對應的描述符是否需要被檢查。第二三四引數表示需要關注讀、寫、錯誤事件的檔案描述符位陣列,這些引數既是輸入引數也是輸出引數,可能會被核心修改用於標示哪些描述符上發生了關注的事件,所以每次呼叫select前都需要重新初始化fdset。timeout引數為超時時間,該結構會被核心修改,其值為超時剩餘的時間。 select的呼叫步驟如下: (1)使用copy_from_user從使用者空間拷貝fdset到核心空間 (2)註冊回撥函式__pollwait (3)遍歷所有fd,呼叫其對應的poll方法(對於socket,這個poll方法是sock_poll,sock_poll根據情況會呼叫到tcp_poll,udp_poll或者datagram_poll) (4)以tcp_poll為例,其核心實現就是__pollwait,也就是上面註冊的回撥函式。 (5)__pollwait的主要工作就是把current(當前程序)掛到裝置的等待佇列中,不同的裝置有不同的等待佇列,對於tcp_poll 來說,其等待佇列是sk->sk_sleep(注意把程序掛到等待佇列中並不代表程序已經睡眠了)。在裝置收到一條訊息(網路裝置)或填寫完檔案數 據(磁碟裝置)後,會喚醒裝置等待佇列上睡眠的程序,這時current便被喚醒了。 (6)poll方法返回時會返回一個描述讀寫操作是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。 (7)如果遍歷完所有的fd,還沒有返回一個可讀寫的mask掩碼,則會呼叫schedule_timeout是呼叫select的程序(也就是 current)進入睡眠。當裝置驅動發生自身資源可讀寫後,會喚醒其等待佇列上睡眠的程序。如果超過一定的超時時間(schedule_timeout 指定),還是沒人喚醒,則呼叫select的程序會重新被喚醒獲得CPU,進而重新遍歷fd,判斷有沒有就緒的fd。 (8)把fd_set從核心空間拷貝到使用者空間。 總結下select的幾大缺點: (1)每次呼叫select,都需要把fd集合從使用者態拷貝到核心態,這個開銷在fd很多時會很大 (2)同時每次呼叫select都需要在核心遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大 (3)select支援的檔案描述符數量太小了,預設是1024 2.poll與select不同,通過一個pollfd陣列向核心傳遞需要關注的事件,故沒有描述符個數的限制,pollfd中的events欄位和revents分別用於標示關注的事件和發生的事件,故pollfd陣列只需要被初始化一次。 poll的實現機制與select類似,其對應核心中的sys_poll,只不過poll向核心傳遞pollfd陣列,然後對pollfd中的每個描述符進行poll,相比處理fdset來說,poll效率更高。poll返回後,需要對pollfd中的每個元素檢查其revents值,來得指事件是否發生。 3.直到Linux2.6才出現了由核心直接支援的實現方法,那就是epoll,被公認為Linux2.6下效能最好的多路I/O就緒通知方法。epoll可以同時支援水平觸發和邊緣觸發(Edge Triggered,只告訴程序哪些檔案描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那麼它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的效能要更高一些,但是程式碼實現相當複雜。epoll同樣只告知那些就緒的檔案描述符,而且當我們呼叫epoll_wait()獲得就緒檔案描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的檔案描述符即可,這裡也使用了記憶體對映(mmap)技術,這樣便徹底省掉了這些檔案描述符在系統呼叫時複製的開銷。另一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,程序只有在呼叫一定的方法後,核心才對所有監視的檔案描述符進行掃描,而epoll事先通過epoll_ctl()來註冊一個檔案描述符,一旦基於某個檔案描述符就緒時,核心會採用類似callback的回撥機制,迅速啟用這個檔案描述符,當程序呼叫epoll_wait()時便得到通知。 epoll既然是對select和poll的改進,就應該能避免上述的三個缺點。那epoll都是怎麼解決的呢?在此之前,我們先看一下epoll 和select和poll的呼叫介面上的不同,select和poll都只提供了一個函式——select或者poll函式。而epoll提供了三個函 數,epoll_create,epoll_ctl和epoll_wait,epoll_create是建立一個epoll控制代碼;epoll_ctl是注 冊要監聽的事件型別;epoll_wait則是等待事件的產生。 對於第一個缺點,epoll的解決方案在epoll_ctl函式中。每次註冊新的事件到epoll控制代碼中時(在epoll_ctl中指定 EPOLL_CTL_ADD),會把所有的fd拷貝進核心,而不是在epoll_wait的時候重複拷貝。epoll保證了每個fd在整個過程中只會拷貝 一次。 對於第二個缺點,epoll的解決方案不像select或poll一樣每次都把current輪流加入fd對應的裝置等待佇列中,而只在 epoll_ctl時把current掛一遍(這一遍必不可少)併為每個fd指定一個回撥函式,當裝置就緒,喚醒等待佇列上的等待者時,就會呼叫這個回撥 函式,而這個回撥函式會把就緒的fd加入一個就緒連結串列)。epoll_wait的工作實際上就是在這個就緒連結串列中檢視有沒有就緒的fd(利用 schedule_timeout()實現睡一會,判斷一會的效果,和select實現中的第7步是類似的)。 對於第三個缺點,epoll沒有這個限制,它所支援的FD上限是最大可以開啟檔案的數目,這個數字一般遠大於2048,舉個例子, 在1GB記憶體的機器上大約是10萬左右,具體數目可以cat /proc/sys/fs/file-max察看,一般來說這個數目和系統記憶體關係很大。 總結: (1)select,poll實現需要自己不斷輪詢所有fd集合,直到裝置就緒,期間可能要睡眠和喚醒多次交替。而epoll其實也需要呼叫 epoll_wait不斷輪詢就緒連結串列,期間也可能多次睡眠和喚醒交替,但是它是裝置就緒時,呼叫回撥函式,把就緒fd放入就緒連結串列中,並喚醒在 epoll_wait中進入睡眠的程序。雖然都要睡眠和交替,但是select和poll在“醒著”的時候要遍歷整個fd集合,而epoll在“醒著”的 時候只要判斷一下就緒連結串列是否為空就行了,這節省了大量的CPU時間,這就是回撥機制帶來的效能提升。 (2)select,poll每次呼叫都要把fd集合從使用者態往核心態拷貝一次,並且要把current往裝置等待佇列中掛一次,而epoll只要 一次拷貝,而且把current往等待佇列上掛也只掛一次(在epoll_wait的開始,注意這裡的等待佇列並不是裝置等待佇列,只是一個epoll內 部定義的等待佇列),這也能節省不少的開銷。 select,poll,epoll select,poll,epoll
這三種IO多路複用模型在不同的平臺有著不同的支援,而epoll在windows下就不支援,好在我們有selectors模組,幫我們預設選擇當前平臺下最合適的,我們只需要寫監聽誰,然後怎麼傳送訊息接收訊息,但是具體怎麼監聽的,選擇的是select還是poll還是epoll,這是selector幫我們自動選擇的。
#服務端 from socket import * import selectors sel=selectors.DefaultSelector() def accept(server_fileobj,mask): conn,addr=server_fileobj.accept() sel.register(conn,selectors.EVENT_READ,read) def read(conn,mask): try: data=conn.recv(1024) if not data: print('closing',conn) sel.unregister(conn) conn.close() return conn.send(data.upper()+b'_SB') except Exception: print('closing', conn) sel.unregister(conn) conn.close() server_fileobj=socket(AF_INET,SOCK_STREAM) server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server_fileobj.bind(('127.0.0.1',8088)) server_fileobj.listen(5) server_fileobj.setblocking(False) #設定socket的介面為非阻塞 sel.register(server_fileobj,selectors.EVENT_READ,accept) #相當於網select的讀列表裡append了一個檔案控制代碼server_fileobj,並且綁定了一個回撥函式accept while True: events=sel.select() #檢測所有的fileobj,是否有完成wait data的 for sel_obj,mask in events: callback=sel_obj.data #callback=accpet callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1) #客戶端 from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect(('127.0.0.1',8088)) while True: msg=input('>>: ') if not msg:continue c.send(msg.encode('utf-8')) data=c.recv(1024) print(data.decode('utf-8')) selector程式碼示例 selectors