不同于IO堆疊對象在IO完成時通過觸發事件或觸發CompletionRoutine回調函數,IOCP模型將socket和完成端口對象(CompletionPort,簡稱CP對象)綁定,當IO操作完成時,會改變該對象的狀態,而我們通過完成端口對象,即可以確認IO操作是不是完成。
創建完成端口對象的API以下:
HANDLE WINAPI CreateIoCompletionPort( __in HANDLE FileHandle, __in_opt HANDLE ExistingCompletionPort, __in ULONG_PTR CompletionKey, __in DWORD NumberOfConcurrentThreads );
CreateIoCompletionPort既可以創建完成端口對象,也能夠用來將socket和完成對象綁定,通過對其賦予不同的參數,可以實現不同的功能:
FileHandle ——創建CP對象時傳入INVALID_HANDLE_VALUE;綁定socket和CP對象時傳入socket描寫符。
ExistingCompletionPort——創建CP對象時傳入NULL;綁定socket和CP對象傳入完成端口對象的句柄;
CompletionKey —— 創建CP對象是傳入0;綁定socket和CP對象時作為參數傳遞給GetQueuedCompletionStatus
NumberOfConcurrentThreads—— 分配給CP對象用于處理IO的線程數。如果參數是0,系統中的CPU個數就是最大的線程數。
返回值 —— 返回CP對象的句柄。
以下,我們創建了1個完成端口對象:
hComPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
我們依然用CreateIoCompletionPort將socket和CP對象進行綁定:
CreateIoCompletionPort((HANDLE)clntSock,hComPort,(DWORD)handleInfo,0);
clntSock是我們要進行綁定的socket描寫符,hComPort使我們之前創建的CP對象,而handleInfo是我們自定義的結構體,作為參數可以傳給GetQueuedCompletionStatus函數。
GetQueuedCompletionStatus在我們定義的線程函數中調用,用于獲得CP對象的狀態。當IO操作未完成時,該函數會產生阻塞;若IO操作完成,函數產生返回。其函數原型以下:
BOOL WINAPI GetQueuedCompletionStatus( __in HANDLE CompletionPort, __out LPDWORD lpNumberOfBytes, __out PULONG_PTR lpCompletionKey, __out LPOVERLAPPED *lpOverlapped, __in DWORD dwMilliseconds );
CompletionPort —— 進行注冊過的完成對象的句柄。
lpNumberOfBytes —— 完成IO對象時傳遞或接受的字節數。
lpCompletionKey —— 使用CreateIoCompletionPort注冊時傳遞的參數,參數可以傳遞我們自定義的結構信息。
lpOverlapped —— 調用WSARecv或WSASend時傳遞的OVERLAPPED對象指針。
dwMilliseconds —— GetQueuedCompletionStatus阻塞的時間,如果設置成INFINITE時無窮期等待。
#include "stdafx.h" #include "stdio.h" #include "process.h" #include "stdlib.h" #include "WinSock2.h" #include "Windows.h" #pragma comment(lib,"ws2_32.lib") #define BUF_SIZE 100 #define READ 3 #define WRITE 5 typedef struct { SOCKEThClntSock; SOCKADDRclntAdr; }HANDLE_DATA,*LPHANDLE_DATA; typedef struct { OVERLAPPEDoverlapped; WSABUFwsaBuf; char buffer[BUF_SIZE]; int rwMode; }IO_DATA,*LPIO_DATA; unsigned WINAPI ThreadMain(LPVOID CompletionPortIO); void ErrorHandler(char* message); int _tmain(int argc, _TCHAR* argv[]) { WSADatawsaData; HANDLEhComPort; SYSTEM_INFOsysInfo; LPIO_DATAioInfo; LPHANDLE_DATAhandleInfo; SOCKETservSock; SOCKADDR_INservAddr; DWORDrecvBytes,i,flags=0; WSAStartup(MAKEWORD(2,2),&wsaData); hComPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0); GetSystemInfo(&sysInfo); for(i=0;i<sysInfo.dwNumberOfProcessors;i++) _beginthreadex(NULL,0,ThreadMain,(LPVOID)hComPort,0,NULL); servSock=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED); if(servSock==INVALID_SOCKET) ErrorHandler("WSASocket Error"); memset(&servAddr,0,sizeof(servAddr)); servAddr.sin_family=AF_INET; servAddr.sin_addr.s_addr=htonl(INADDR_ANY); servAddr.sin_port=htons(atoi("8888")); if(bind(servSock,(SOCKADDR*)&servAddr,sizeof(servAddr))==SOCKET_ERROR) ErrorHandler("bind error"); if(listen(servSock,5)==SOCKET_ERROR) ErrorHandler("listen error"); while(1) { SOCKETclntSock; SOCKADDR_INclntAddr; int clntAddrSz; clntSock=accept(servSock,(SOCKADDR*)&clntAddr,&clntAddrSz); handleInfo=(LPHANDLE_DATA)malloc(sizeof(HANDLE_DATA)); handleInfo->hClntSock=clntSock; memcpy(&(handleInfo->clntAdr),&clntAddr,sizeof(clntAddr)); CreateIoCompletionPort((HANDLE)clntSock,hComPort,(DWORD)handleInfo,0); ioInfo=(LPIO_DATA)malloc(sizeof(IO_DATA)); memset(&(ioInfo->overlapped),0,sizeof(OVERLAPPED)); ioInfo->wsaBuf.len=BUF_SIZE; ioInfo->wsaBuf.buf=ioInfo->buffer; ioInfo->rwMode=READ; WSARecv(handleInfo->hClntSock,&(ioInfo->wsaBuf),1,&recvBytes,&flags,&(ioInfo->overlapped),NULL); } WSACleanup(); return 0; } unsigned WINAPI ThreadMain(LPVOID CompletionPortIO) { HANDLEhComPort=(HANDLE)CompletionPortIO; SOCKET sock; DWORDbytesTrans; LPHANDLE_DATAhandleInfo; LPIO_DATAioInfo; DWORD flags; while(1) { GetQueuedCompletionStatus(hComPort,&bytesTrans,(PULONG_PTR)&handleInfo,(LPOVERLAPPED*)&ioInfo,INFINITE); sock=handleInfo->hClntSock; if(ioInfo->rwMode==READ) { puts("message received!"); if(bytesTrans==0) { closesocket(sock); free(handleInfo); free(ioInfo); continue; } memset(&(ioInfo->overlapped),0,sizeof(OVERLAPPED)); ioInfo->wsaBuf.len=bytesTrans; ioInfo->rwMode=WRITE; WSASend(sock,&(ioInfo->wsaBuf),1,NULL,0,&(ioInfo->overlapped),NULL); ioInfo=(LPIO_DATA)malloc(sizeof(IO_DATA)); memset(&(ioInfo->overlapped),0,sizeof(OVERLAPPED)); ioInfo->wsaBuf.len=BUF_SIZE; ioInfo->wsaBuf.buf=ioInfo->buffer; ioInfo->rwMode=READ; WSARecv(sock,&(ioInfo->wsaBuf),1,NULL,&flags,&(ioInfo->overlapped),NULL); } else { puts("message sent"); free(ioInfo); } } return 0; } void ErrorHandler(char* message) { fputs(message,stderr); fputc('\n',stderr); exit(1); }
Github位置:
https://github.com/HymanLiuTS/NetDevelopment
克隆本項目:
git clone git@github.com:HymanLiuTS/NetDevelopment.git
獲得本文源代碼:
git checkout NL56
上一篇 Java 反射機制淺析
下一篇 python-初始篇(一)