c++ 網路程式設計(八)TCP/IP LINUX-epoll/windows-IOCP下 socket opoll函式用法 優於select方法的ep...
原文作者:aircraft
原文連結:https://www.cnblogs.com/DOMLX/p/9622548.html
鍥子:關於併發伺服器中的I/O複用實現方式,前面在網路程式設計系列四還是五來著????我們講過select的方式,但select的效能比較低,當連線數量超過幾百個的時候就很慢了,並不適合以Web伺服器端開發為主流的現代開發環境。因此就有了Linux下的epoll,BSD的kqueue,Solaris的/dev/poll和Windows的IOCP等複用技術。本章就來講講Linux下的epoll技術和Windows下的IOCP模型。
一:IOCP和Epoll之間的異同。
異:
1:IOCP是WINDOWS系統下使用。Epoll是Linux系統下使用。
2:IOCP是IO操作完畢之後,通過Get函式獲得一個完成的事件通知。
Epoll是當你希望進行一個IO操作時,向Epoll查詢是否可讀或者可寫,若處於可讀或可寫狀態後,Epoll會通過epoll_wait進行通知。
3:IOCP封裝了非同步的訊息事件的通知機制,同時封裝了部分IO操作。但Epoll僅僅封裝了一個非同步事件的通知機制,並不負責IO讀寫操作。Epoll保持了事件通知和IO操作間的獨立性,更加簡單靈活。
4:基於上面的描述,我們可以知道Epoll不負責IO操作,所以它只告訴你當前可讀可寫了,並且將協議讀寫緩衝填充,由使用者去讀寫控制,此時我們可以做出額外的許多操作。IOCP則直接將IO通道里的讀寫操作都做完了才通知使用者,當IO通道里發生了堵塞等狀況我們是無法控制的。
同:
1:它們都是非同步的事件驅動的網路模型。
2:它們都可以向底層進行指標資料傳遞,當返回事件時,除可通知事件型別外,還可以通知事件相關資料。
二:Epoll理解與應用。
1、epoll 是什麼?
epoll 是當前在 Linux 下開發大規模併發網路程式的熱門人選, epoll 在 Linux2.6 核心中正式引入,和 select 相似,都是 I/O 多路複用 (IO multiplexing)技術 。
Linux下設計併發網路程式,常用的模型有:
Apache 模型( Process Per Connection ,簡稱 PPC)
TPC ( Thread PerConnection)模型
select 模型和 poll模型。
epoll模型
2、epoll
與select對比優化:
-
基於select的I/O複用技術速度慢的原因:
1,呼叫select函式後常見的針對所有檔案描述符的迴圈語句。它每次事件發生需要遍歷所有檔案描述符,找出發生變化的檔案描述符。(以前寫的示例沒加迴圈)
2,每次呼叫select函式時都需要向該函式傳遞監視物件資訊。即每次呼叫select函式時向作業系統傳遞監視物件資訊,至於為什麼要傳?是因為我們監視的套接字變化的函式,而套接字是作業系統管理的。(這個才是最耗效率的)
註釋:基於這樣的原因並不是說select就沒用了,在這樣的情況下就適合選用select:1,服務端接入者少2,程式應具有相容性。
-
epoll是怎麼優化select問題的:
1,每次發生事件它不需要迴圈遍歷所有檔案描述符,它把發生變化的檔案描述符單獨集中到了一起。
2,僅向作業系統傳遞1次監視物件資訊,監視範圍或內容發生變化時只通知發生變化的事項。
-
實現epoll時必要的函式和結構體
函式:
epoll_create:建立儲存epoll檔案描述符的空間,該函式也會返回檔案描述符,所以終止時,也要呼叫close函式。(建立記憶體空間)
epoll_ctl:向空間註冊,新增或修改檔案描述符。(註冊監聽事件)
epoll_wait:與select函式類似,等待檔案描述符發生變化。(監聽事件回撥)
結構體:
struct epoll_event
{
__uint32_t events;
epoll_data_t data;
}
typedef union epoll_data
{
void *ptr;
int fd;
__uinit32_t u32;
__uint64_t u64;
} epoll_data_t;
epoll的幾個函式的介紹:
1、epoll_create函式:
/** * @brief該函式生成一個epoll專用的檔案描述符。它其實是在核心申請一空間,用來存放你想關注的socket fd上是否發生以及發生了什麼事件。 * * @paramsizesize就是你在這個epoll fd上能關注的最大socket fd數 * * @return生成的檔案描述符 */ int epoll_create(int size);
2、epoll_ctl函式:
/** * @brief該函式用於控制某個epoll檔案描述符上的事件,可以註冊事件,修改事件,刪除事件。 * * @paramepfd由 epoll_create 生成的epoll專用的檔案描述符 * @paramop要進行的操作例如註冊事件,可能的取值EPOLL_CTL_ADD 註冊、EPOLL_CTL_MOD 修 改、EPOLL_CTL_DEL 刪除 * @paramfd關聯的檔案描述符 * @paramevent指向epoll_event的指標 * * @return0succ *-1fail */ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
其中用到的資料結構結構如下:
op值:
EPOLL_CTL_ADD:註冊新的fd到epfd中;
EPOLL_CTL_MOD:修改已經註冊的fd的監聽事件;
EPOLL_CTL_DEL:從epfd中刪除一個fd;
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
常用的事件型別:
EPOLLIN :表示對應的檔案描述符可以讀;
EPOLLOUT:表示對應的檔案描述符可以寫;
EPOLLPRI:表示對應的檔案描述符有緊急的資料可讀
EPOLLERR:表示對應的檔案描述符發生錯誤;
EPOLLHUP:表示對應的檔案描述符被結束通話;
EPOLLET: 表示對應的檔案描述符有事件發生;
例:
<code class="language-cpp">struct epoll_event ev; //設定與要處理的事件相關的檔案描述符 ev.data.fd=listenfd; //設定要處理的事件型別 ev.events=EPOLLIN|EPOLLET; //註冊epoll事件 epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev); </code>
3、epoll_wait函式:
/** * @brief該函式用於輪詢I/O事件的發生 * * @paramepfd由epoll_create 生成的epoll專用的檔案描述符 * @paramevents用於回傳代處理事件的陣列 * @parammaxevents每次能處理的事件數 * @paramtimeout等待I/O事件發生的超時值;-1相當於阻塞,0相當於非阻塞。一般用-1即可 * * @return>=0返回發生事件數 *-1錯誤 */ int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout);
用改良的epoll實現回聲服務端程式碼:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/socket.h> #include <sys/epoll.h> #define BUF_SIZE 100 #define EPOLL_SIZE 50 void error_handling(char *buf); int main(int argc, const char * argv[]) { int serv_sock, clnt_sock; struct sockaddr_in serv_adr, clnt_adr; socklen_t adr_sz; int str_len, i; char buf[BUF_SIZE]; //類似select的fd_set變數檢視監視物件的狀態變化,epoll_event結構體將發生變化的檔案描述符單獨集中到一起 struct epoll_event *ep_events; struct epoll_event event; int epfd, event_cnt; if(argc != 2) { printf("Usage: %s <port> \n", argv[0]); exit(1); } serv_sock = socket(PF_INET, SOCK_STREAM, 0); if(serv_sock == -1) error_handling("socket() error"); memset(&serv_adr, 0, sizeof(serv_adr)); serv_adr.sin_family = AF_INET; serv_adr.sin_addr.s_addr = htonl(INADDR_ANY); serv_adr.sin_port = htons(atoi(argv[1])); if(bind(serv_sock, (struct sockaddr *) &serv_adr, sizeof(serv_adr)) == -1) error_handling("bind() error"); if(listen(serv_sock, 5) == -1) error_handling("listen() error"); //建立檔案描述符的儲存空間稱為“epoll例程” epfd = epoll_create(EPOLL_SIZE); ep_events = malloc(sizeof(struct epoll_event) *EPOLL_SIZE); //新增讀取事件的監視(註冊事件) event.events = EPOLLIN;//讀取資料事件 event.data.fd = serv_sock; epoll_ctl(epdf, EPOLL_CTL_ADD, serv_sock, &event); while (1) { //響應事件,返回發生事件的檔案描述符數 event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);//傳-1時,一直等待直到事件發生 if(event_cnt == -1) { puts("epoll_wait() error"); break; } //服務端套接字和客服端套接字 for (i = 0; i < event_cnt; i++) { if(ep_events[i].data.fd == serv_sock)//服務端與客服端建立連線 { adr_sz = sizeof(clnt_adr); clnt_sock = accept(serv_sock, (struct sockaddr *)&clnt_adr, &adr_sz); event.events = EPOLLIN; event.data.fd = clnt_sock; epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event); printf("connected client: %d \n", clnt_sock); } else//連線之後傳遞資料 { str_len = read(ep_events[i].data.fd, buf, BUF_SIZE); if(str_len == 0) { //刪除事件 epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL); close(ep_events[i].data.fd); printf("closed client: %d \n", ep_events[i].data.fd); } else { write(ep_events[i].data.fd, buf, str_len); } } } } close(serv_sock); close(epfd); return 0; } void error_handling(char *message) { fputs(message, stderr); fputc('\n', stderr); exit(1); }
epoll客戶端程式碼:
#define _GNU_SOURCE #include "sysutil.h" #include "buffer.h" #include <sys/epoll.h> int main(int argc, char const *argv[]) { //建立client套接字 int sockfd = tcp_client(0); //呼叫非阻塞connect函式 int ret = nonblocking_connect(sockfd, "localhost", 9981, 5000); if(ret == -1) { perror("Connect Timeout ."); exit(EXIT_FAILURE); } //將三個fd設定為Non-Blocking activate_nonblock(sockfd); activate_nonblock(STDIN_FILENO); activate_nonblock(STDOUT_FILENO); buffer_t recvbuf; //sockfd -> Buffer -> stdout buffer_t sendbuf; //stdin -> Buffer -> sockfd //初始化緩衝區 buffer_init(&recvbuf); buffer_init(&sendbuf); //建立epoll int epollfd = epoll_create1(0); if(epollfd == -1) ERR_EXIT("create epoll"); struct epoll_event events[1024]; uint32_t sockfd_event = 0; uint32_t stdin_event = 0; uint32_t stdout_event = 0; epoll_add_fd(epollfd, sockfd, sockfd_event); epoll_add_fd(epollfd, STDIN_FILENO, stdin_event); epoll_add_fd(epollfd, STDOUT_FILENO, stdout_event); while(1) { //重新裝填epoll事件 sockfd_event = 0; stdin_event = 0; stdout_event = 0; //epoll無法每次都重新裝填,所以給每個fd新增一個空事件 if(buffer_is_readable(&sendbuf)) { sockfd_event |= kWriteEvent; } if(buffer_is_writeable(&sendbuf)) { stdin_event |= kReadEvent; } if(buffer_is_readable(&recvbuf)) { stdout_event |= kWriteEvent; } if(buffer_is_writeable(&recvbuf)) { sockfd_event |= kReadEvent; } epoll_mod_fd(epollfd, sockfd, sockfd_event); epoll_mod_fd(epollfd, STDIN_FILENO, stdin_event); epoll_mod_fd(epollfd, STDOUT_FILENO, stdout_event); //監聽fd陣列 int nready = epoll_wait(epollfd, events, 1024, 5000); if(nready == -1) ERR_EXIT("epoll wait"); else if(nready == 0) { printf("epoll timeout.\n"); continue; } else { int i; for(i = 0; i < nready; ++i) { int peerfd = events[i].data.fd; int revents = events[i].events; if(peerfd == sockfd && revents & kReadREvent) { //從sockfd接收資料到recvbuf if(buffer_read(&recvbuf, peerfd) == 0) { fprintf(stderr, "server close.\n"); exit(EXIT_SUCCESS); } } if(peerfd == sockfd && revents & kWriteREvent) { buffer_write(&sendbuf, peerfd); //將sendbuf中的資料寫入sockfd } if(peerfd == STDIN_FILENO && revents & kReadREvent) { //從stdin接收資料寫入sendbuf if(buffer_read(&sendbuf, peerfd) == 0) { fprintf(stderr, "exit.\n"); exit(EXIT_SUCCESS); } } if(peerfd == STDOUT_FILENO && revents & kWriteREvent) { buffer_write(&recvbuf, peerfd); //將recvbuf中的資料輸出至stdout } } } } }
條件觸發和邊緣觸發
-
什麼是條件觸發和邊緣觸發?它們是指事件響應的方式,epoll預設是條件觸發的方式。條件觸發是指:只要輸入緩衝中有資料就會一直通知該事件,迴圈響應epoll_wait。而邊緣觸發是指:輸入緩衝收到資料時僅註冊1次該事件,即使輸入緩衝中還留有資料,也不會再進行註冊,只響應一次。
-
邊緣觸發相對條件觸發的優點:可以分離接收資料和處理資料的時間點,從實現模型的角度看,邊緣觸發更有可能帶來高效能。
-
將上面epoll例項改為邊緣觸發:
1,首先改寫 event.events = EPOLLIN | EPOLLET; (EPOLLIN:讀取資料事件EPOLLET:邊緣觸發方式)
2,邊緣觸發只響應一次接收資料事件,所以要一次性全部讀取輸入緩衝中的資料,那麼就需要判斷什麼時候資料讀取完了?Linux聲明瞭一個全域性的變數:int errno; (error.h中),它能記錄發生錯誤時提供額外的資訊。這裡就可以用它來判斷是否讀取完資料:
str_len = read(...); if(str_len < 0) { if(errno == EAGAIN) //讀取輸入緩衝中的全部資料的標誌 break; }
3,邊緣觸發方式下,以阻塞方式工作的read&write有可能會引起服務端的長時間停頓。所以邊緣觸發一定要採用非阻塞的套接字資料傳輸形式。那麼怎麼將套接字的read,write資料傳輸形式修改為非阻塞模式呢?
//fd套接字檔案描述符,將此套接字資料傳輸模式修改為非阻塞 void setnonblockingmode(int fd) { int flag = fcntl(fd, F_GETFL,0); //得到套接字原來屬性 fcntl(fd, F_SETFL, flag | O_NONBLOCK);//在原有屬性基礎上設定新增非阻塞模式 }
三.IOCP理解與應用。
扯遠點。首先傳統伺服器的網路IO流程如下:
接到一個客戶端連線->建立一個執行緒負責這個連線的IO操作->持續對新執行緒進行資料處理->全部資料處理完畢->終止執行緒。
但是這樣的設計代價是:
-
1:每個連線建立一個執行緒,將導致過多的執行緒。
-
2:維護執行緒所消耗的堆疊記憶體過大。
-
3:作業系統建立和銷燬執行緒過大。
- 4:執行緒之間切換的上下文代價過大。
-
3:作業系統建立和銷燬執行緒過大。
-
2:維護執行緒所消耗的堆疊記憶體過大。
此時我們可以考慮使用執行緒池解決其中3和4的問題。這種傳統的伺服器網路結構稱之為會話模型。
後來我們為防止大量執行緒的維護,建立了I/O模型,它被希望要求可以:
1:允許一個執行緒在不同時刻給多個客戶端進行服務。
2:允許一個客戶端在不同時間被多個執行緒服務。
這樣做的話,我們的執行緒則會大幅度減少,這就要求以下兩點:
1:客戶端狀態的分離,之前會話模式我們可以通過執行緒狀態得知客戶端狀態,但現在客戶端狀態要通過其他方式獲取。
2:I/O請求的分離。一個執行緒不再服務於一個客戶端會話,則要求客戶端對這個執行緒提交I/O處理請求。
那麼就產生了這樣一個模式,分為三部分:
-
1:會話狀態管理模組。它負責接收到一個客戶端連線,就建立一個會話狀態。
-
2:當會話狀態發生改變,例如斷掉連線,接收到網路訊息,就傳送一個I/O請求給 I/O工作模組進行處理。
- 3:I/O工作模組接收到一個I/O請求後,從執行緒池裡喚醒一個工作執行緒,讓該工作執行緒處理這個I/O請求,處理完畢後,該工作執行緒繼續掛起。
-
2:當會話狀態發生改變,例如斷掉連線,接收到網路訊息,就傳送一個I/O請求給 I/O工作模組進行處理。
上面的做法,則將網路連線 和I/O工作執行緒分離為三個部分,相互通訊僅依靠 I/O請求。此時可知有以下一些建議:
-
1:在進行I/O請求處理的工作執行緒是被喚醒的工作執行緒,一個CPU對應一個的話,可以最大化利用CPU。所以 活躍執行緒的個數 建議等於 硬體CPU個數。
-
2:工作執行緒我們開始建立了執行緒池,免除建立和銷燬執行緒的代價。因為執行緒是對I/O進行操作的,且一一對應,那麼當I/O全部並行時,工作執行緒必須滿足I/O並行操作需求,所以 執行緒池內最大工作執行緒個數 建議大於或者等於 I/O並行個數。
- 3:但是我們可知CPU個數又限制了活躍的執行緒個數,那麼執行緒池過大意義很低,所以按常規建議 執行緒池大小 等於 CPU個數*2 左右為佳。例如,8核伺服器建議建立16個工作執行緒的執行緒池。 上面描述的依然是I/O模型並非IOCP,那麼IOCP是什麼呢,全稱 IO完成埠。
-
2:工作執行緒我們開始建立了執行緒池,免除建立和銷燬執行緒的代價。因為執行緒是對I/O進行操作的,且一一對應,那麼當I/O全部並行時,工作執行緒必須滿足I/O並行操作需求,所以 執行緒池內最大工作執行緒個數 建議大於或者等於 I/O並行個數。
它是一種WIN32的網路I/O模型,既包括了網路連線部分,也負責了部分的I/O操作功能,用於方便我們控制有併發性的網路I/O操作。它有如下特點:
-
1:它是一個WIN32核心物件,所以無法運行於Linux.
-
2:它自己負責維護了工作執行緒池,同時也負責了I/O通道的記憶體池。
-
3:它自己實現了執行緒的管理以及I/O請求通知,最小化的做到了執行緒的上下文切換。
- 4:它自己實現了執行緒的優化排程,提高了CPU和記憶體緩衝的使用率。
-
3:它自己實現了執行緒的管理以及I/O請求通知,最小化的做到了執行緒的上下文切換。
-
2:它自己負責維護了工作執行緒池,同時也負責了I/O通道的記憶體池。
使用IOCP的基本步驟很簡單:
-
1:建立IOCP物件,由它負責管理多個Socket和I/O請求。CreateIoCompletionPort需要將IOCP物件和IOCP控制代碼繫結。
-
2:建立一個工作執行緒池,以便Socket傳送I/O請求給IOCP物件後,由這些工作執行緒進行I/O操作。注意,建立這些執行緒的時候,將這些執行緒繫結到IOCP上。
-
3:建立一個監聽的socket。
-
4:輪詢,當接收到了新的連線後,將socket和完成埠進行關聯並且投遞給IOCP一個I/O請求。注意:將Socket和IOCP進行關聯的函式和建立IOCP的函式一樣,都是CreateIoCompletionPort,不過注意傳參必然是不同的。
-
5:因為是非同步的,我們可以去做其他,等待IOCP將I/O操作完成會回饋我們一個訊息,我們再進行處理。
-
其中需要知道的是:I/O請求被放在一個I/O請求佇列裡面,對,是佇列,LIFO機制。當一個裝置處理完I/O請求後,將會將這個完成後的I/O請求丟回IOCP的I/O完成佇列。
-
我們應用程式則需要在GetQueuedCompletionStatus去詢問IOCP,該I/O請求是否完成。
- 其中有一些特殊的事情要說明一下,我們有時有需要人工的去投遞一些I/O請求,則需要使用PostQueuedCompletionStatus函式向IOCP投遞一個I/O請求到它的請求佇列中。
-
我們應用程式則需要在GetQueuedCompletionStatus去詢問IOCP,該I/O請求是否完成。
-
其中需要知道的是:I/O請求被放在一個I/O請求佇列裡面,對,是佇列,LIFO機制。當一個裝置處理完I/O請求後,將會將這個完成後的I/O請求丟回IOCP的I/O完成佇列。
-
5:因為是非同步的,我們可以去做其他,等待IOCP將I/O操作完成會回饋我們一個訊息,我們再進行處理。
-
4:輪詢,當接收到了新的連線後,將socket和完成埠進行關聯並且投遞給IOCP一個I/O請求。注意:將Socket和IOCP進行關聯的函式和建立IOCP的函式一樣,都是CreateIoCompletionPort,不過注意傳參必然是不同的。
-
3:建立一個監聽的socket。
-
2:建立一個工作執行緒池,以便Socket傳送I/O請求給IOCP物件後,由這些工作執行緒進行I/O操作。注意,建立這些執行緒的時候,將這些執行緒繫結到IOCP上。
參考部落格:https://blog.csdn.net/educast/article/details/15500349
參考部落格:https://blog.csdn.net/u010223072/article/details/49276415
參考書籍:《TCP/IP網路程式設計--尹聖雨》