您的位置:首页 > 理论基础 > 计算机网络

7种网络编程I/O模型代码实现实例

2014-01-20 14:37 666 查看

部分代码参考《[WINDOWS网络与通信程序设计].王艳平》,网络中一些I/O模型的代码都没有对socket是否可写做过深入研究,我这边会提供一些解决方法。

阻塞模式下,send会发生阻塞(非阻塞模式下send返回WSAEWOULDBLOCK错误,重叠I/O下表现为投递的发送请求一直无法完成)的情况一般可以分为3种 :

1, 服务器虽然发送了大量数据,但客户端并未调用recv函数去接。

2,网络状况不佳,发送缓冲区中的数据一直发不出去。

3,发送数据量很大,如下载功能,协议发送数据的速度比不上send函数将数据拷贝到发送缓冲区的速度。

对于1,2情况,我们似乎可以直接关闭套接字,让客户端重新请求。但对于3,却不行。而且实际操作过程中,我们无法区分是1,2,还是3,我们能做的是尽量去保证发送的正确性。当然防止1情况或者2情况中长时间网络不畅,可以设定超时。若socket一直处于不可写状态超过1分钟,那么就关闭套接字。在最后的IOCP模型中就加入了这种超时机制。其他模型若要加入,可参考它来做。

一,基本的阻塞模型

[cpp]
view plaincopyprint?

#include <WinSock2.h> #include <Windows.h> #include <stdio.h> #pragma comment(lib,"Ws2_32.lib") DWORD WINAPI WorkThread(void* param) { SOCKET* psClient = (SOCKET*)param; char buf[4096]; while(true) { int len = recv(*psClient,buf,4096,0); if(len <= 0) { printf("recv失败!%d\n",WSAGetLastError()); Sleep(5000); break; } buf[len] = '\0'; printf("收到数据:%s\n",buf); } closesocket(*psClient); delete psClient; return 0; } int main() { WSAData wsaData; if(0 != WSAStartup(MAKEWORD(2,2),&wsaData)) { printf("WSAStartup失败!\n",WSAGetLastError()); Sleep(5000); return 0; } USHORT nPort = 3456; SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_port = htons(nPort); sin.sin_addr.S_un.S_addr = INADDR_ANY; if(SOCKET_ERROR == ::bind(sListen,(sockaddr*)&sin,sizeof(sin))) { printf("bind失败!%d\n",WSAGetLastError()); Sleep(5000); return -1; } ::listen(sListen,5); while(true) { sockaddr_in addrRemote; int nAddrLen = sizeof(addrRemote); SOCKET *psClient = new SOCKET; *psClient = accept(sListen,(sockaddr*)&addrRemote,&nAddrLen); HANDLE hThread = CreateThread(NULL,0,WorkThread,psClient,0,NULL); CloseHandle(hThread); } closesocket(sListen); WSACleanup(); }

#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>

#pragma comment(lib,"Ws2_32.lib")

DWORD WINAPI WorkThread(void* param)
{
SOCKET* psClient = (SOCKET*)param;
char buf[4096];
while(true)
{
int len = recv(*psClient,buf,4096,0);
if(len <= 0)
{
printf("recv失败!%d\n",WSAGetLastError());
Sleep(5000);
break;
}
buf[len] = '\0';
printf("收到数据:%s\n",buf);
}
closesocket(*psClient);
delete psClient;
return 0;
}

int main()
{
WSAData wsaData;
if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
{
printf("WSAStartup失败!\n",WSAGetLastError());
Sleep(5000);
return 0;
}
USHORT nPort = 3456;
SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nPort);
sin.sin_addr.S_un.S_addr = INADDR_ANY;

if(SOCKET_ERROR == ::bind(sListen,(sockaddr*)&sin,sizeof(sin)))
{
printf("bind失败!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}

::listen(sListen,5);

while(true)
{
sockaddr_in addrRemote;
int nAddrLen = sizeof(addrRemote);
SOCKET *psClient = new SOCKET;
*psClient = accept(sListen,(sockaddr*)&addrRemote,&nAddrLen);
HANDLE hThread = CreateThread(NULL,0,WorkThread,psClient,0,NULL);
CloseHandle(hThread);
}
closesocket(sListen);
WSACleanup();
}


二,无任何优化的非阻塞模型

[cpp]
view plaincopyprint?

#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>
#include <vector>
using namespace std;

#pragma comment(lib,"Ws2_32.lib")

CRITICAL_SECTION g_cs;
HANDLE g_StartEvent;
vector<SOCKET> g_vecClients;
int g_iVecSize = 0;
DWORD WINAPI WorkThread(void* param)
{
char buf[4096];
while(1)
{
if(g_vecClients.empty())
{
ResetEvent(g_StartEvent);
WaitForSingleObject(g_StartEvent,INFINITE);
}

EnterCriticalSection(&g_cs);
for(vector<SOCKET>::iterator it = g_vecClients.begin();it != g_vecClients.end();)
{
int len = recv(*it,buf,4096,0);
if(len == SOCKET_ERROR)
{
if(WSAEWOULDBLOCK != WSAGetLastError())
{
printf("recv Error:%d\n",WSAGetLastError());
closesocket(*it);
it = g_vecClients.erase(it);
}
else
{
printf("%d.",*it);
++it;
}
}
else
{
buf[len] = 0;
printf("收到数据: %s\n",buf);
++it;
}
}
LeaveCriticalSection(&g_cs);
Sleep(100);

}
return 0;
}

int main()
{
InitializeCriticalSectionAndSpinCount(&g_cs,4000);
g_StartEvent = CreateEvent(NULL,FALSE,FALSE,NULL);

WSAData wsaDate;
WSAStartup(MAKEWORD(2,2),&wsaDate);
USHORT nport = 3456;
u_long ul = 1;
SOCKET s = socket(AF_INET,SOCK_STREAM,0);
ioctlsocket(s,FIONBIO,&ul);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nport);
sin.sin_addr.S_un.S_addr = ADDR_ANY;

if(SOCKET_ERROR == ::bind(s,(sockaddr*)&sin,sizeof(sin)))
{
return -1;
}

::listen(s,5);

HANDLE hThread = CreateThread(NULL,0,WorkThread,NULL,0,NULL);
CloseHandle(hThread);

while(true)
{
sockaddr_in addrRemote;
int nAddrLen = sizeof(addrRemote);
SOCKET sClient = accept(s,(sockaddr*)&addrRemote,&nAddrLen);
if(sClient != SOCKET_ERROR)
{
EnterCriticalSection(&g_cs);
g_vecClients.push_back(sClient);
LeaveCriticalSection(&g_cs);
if(g_vecClients.size() == 1)
SetEvent(g_StartEvent);
}
else if(WSAEWOULDBLOCK == WSAGetLastError())
{
printf(".");
Sleep(100);
}
else
{
printf("accept failed! %d\n",WSAGetLastError());
}
}
closesocket(s);
WSACleanup();
CloseHandle(g_StartEvent);
DeleteCriticalSection(&g_cs);
}

#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>
#include <vector>
using namespace std;

#pragma comment(lib,"Ws2_32.lib")

CRITICAL_SECTION g_cs;
HANDLE           g_StartEvent;
vector<SOCKET> g_vecClients;
int g_iVecSize = 0;
DWORD WINAPI WorkThread(void* param)
{
char buf[4096];
while(1)
{
if(g_vecClients.empty())
{
ResetEvent(g_StartEvent);
WaitForSingleObject(g_StartEvent,INFINITE);
}

EnterCriticalSection(&g_cs);
for(vector<SOCKET>::iterator it = g_vecClients.begin();it != g_vecClients.end();)
{
int len = recv(*it,buf,4096,0);
if(len == SOCKET_ERROR)
{
if(WSAEWOULDBLOCK != WSAGetLastError())
{
printf("recv Error:%d\n",WSAGetLastError());
closesocket(*it);
it = g_vecClients.erase(it);
}
else
{
printf("%d.",*it);
++it;
}
}
else
{
buf[len] = 0;
printf("收到数据: %s\n",buf);
++it;
}
}
LeaveCriticalSection(&g_cs);
Sleep(100);

}
return 0;
}

int main()
{
InitializeCriticalSectionAndSpinCount(&g_cs,4000);
g_StartEvent = CreateEvent(NULL,FALSE,FALSE,NULL);

WSAData wsaDate;
WSAStartup(MAKEWORD(2,2),&wsaDate);
USHORT nport = 3456;
u_long ul = 1;
SOCKET s = socket(AF_INET,SOCK_STREAM,0);
ioctlsocket(s,FIONBIO,&ul);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nport);
sin.sin_addr.S_un.S_addr = ADDR_ANY;

if(SOCKET_ERROR == ::bind(s,(sockaddr*)&sin,sizeof(sin)))
{
return -1;
}

::listen(s,5);

HANDLE hThread = CreateThread(NULL,0,WorkThread,NULL,0,NULL);
CloseHandle(hThread);

while(true)
{
sockaddr_in addrRemote;
int nAddrLen = sizeof(addrRemote);
SOCKET sClient = accept(s,(sockaddr*)&addrRemote,&nAddrLen);
if(sClient != SOCKET_ERROR)
{
EnterCriticalSection(&g_cs);
g_vecClients.push_back(sClient);
LeaveCriticalSection(&g_cs);
if(g_vecClients.size() == 1)
SetEvent(g_StartEvent);
}
else if(WSAEWOULDBLOCK == WSAGetLastError())
{
printf(".");
Sleep(100);
}
else
{
printf("accept failed! %d\n",WSAGetLastError());
}
}
closesocket(s);
WSACleanup();
CloseHandle(g_StartEvent);
DeleteCriticalSection(&g_cs);
}


三,select模型

[cpp]
view plaincopyprint?

#include <WinSock2.h>
#include <Windows.h>
#include <MSWSock.h>
#include <stdio.h>
#include <map>
using namespace std;

#pragma comment(lib,"Ws2_32.lib")
#pragma comment(lib,"Mswsock.lib")

struct ThreadObj{
OVERLAPPED *pOl;
HANDLE s;
};

int g_iIndex = 0;
map<SOCKET,char*> g_map;

int main()
{
WSAData wsaData;
if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
{
printf("初始化失败!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport = 3456;
SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
u_long ul = 1;
ioctlsocket(sListen,FIONBIO,&ul);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nport);
sin.sin_addr.S_un.S_addr = ADDR_ANY;

if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
{
printf("bind failed!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}

listen(sListen,5);

//1)初始化一个套接字集合fdSocket,并将监听套接字放入
fd_set fdSocket;
FD_ZERO(&fdSocket);
FD_SET(sListen,&fdSocket);
TIMEVAL time={1,0};
char buf[4096];
fd_set fdWrite;
FD_ZERO(&fdWrite);
while(true)
{
//2)将fdSocket的一个拷贝fdRead传给select函数
fd_set fdRead = fdSocket;
fd_set fdTmp = fdWrite;
int nRetAll = 0;
if(fdTmp.fd_count > 0)
nRetAll = select(0,&fdRead,&fdTmp,NULL,NULL/*&time*/);//若不设置超时则select为阻塞
else
nRetAll = select(0,&fdRead,NULL,NULL,NULL/*&time*/);
if(nRetAll > 0)
{
//3)通过将原来的fdSocket和被select处理过的fdRead进行比较,决定由哪些socket有数据可以读取
for(int i=0;i<fdSocket.fd_count;i++)
{
if(FD_ISSET(fdSocket.fd_array[i],&fdRead))
{
if(fdSocket.fd_array[i] == sListen)
{
if(fdSocket.fd_count < FD_SETSIZE)
{
sockaddr_in addrRemote;
int nAddrLen = sizeof(addrRemote);
SOCKET sClient = accept(sListen,(sockaddr*)&addrRemote,&nAddrLen);
FD_SET(sClient,&fdSocket);
printf("接收到连接:(%s)\n",inet_ntoa(addrRemote.sin_addr));
}
else
{
printf("连接数量已达上限!\n");
continue;
}
}
else
{
int nRecv = recv(fdSocket.fd_array[i],buf,4096,0);
if(nRecv > 0)
{
buf[nRecv] = 0;

printf("收到数据:%s\n",buf);

int nRet = send(fdSocket.fd_array[i],buf,nRecv,0);
if(nRet <= 0)
{
SOCKET s = fdSocket.fd_array[i];
if(GetLastError() == WSAEWOULDBLOCK)
{
if(g_map.find(s) == g_map.end())
{
char* szTmp = new char[nRecv + 1];
strncpy(szTmp,buf,nRecv);
szTmp[nRecv] = 0;
g_map[s] = szTmp;
}
else
{
char* szOld = g_map[s];
char* szTmp2 = new char[strlen(szOld) + nRecv + 1];
strncpy(szTmp2,szOld,strlen(szOld));
strncpy(szTmp2 + strlen(szOld),buf,nRecv);
szTmp2[strlen(szOld) + nRecv] = 0;
delete [] szOld;
g_map[s] = szTmp2;
}
FD_SET(fdSocket.fd_array[i],&fdWrite);
}
else
{
closesocket(fdSocket.fd_array[i]);
if(g_map.find(s) != g_map.end())
{
if(g_map[s] != NULL)
delete [] g_map[s];
g_map.erase(s);
}
FD_CLR(fdSocket.fd_array[i],&fdSocket);
}
}
printf("发送了%d\n",nRet);
}
else
{
printf("1个Client已断开\n");
closesocket(fdSocket.fd_array[i]);
FD_CLR(fdSocket.fd_array[i],&fdSocket);
}
}
}
if(FD_ISSET(fdSocket.fd_array[i],&fdTmp))
{
SOCKET s = fdSocket.fd_array[i];
if(g_map.find(s) != g_map.end())
{
char* szToSend = g_map[s];
int nToSend = strlen(szToSend);
int nRet = send(fdSocket.fd_array[i],szToSend,nToSend,0);
if(nRet <= 0)
{
if(GetLastError() == WSAEWOULDBLOCK)
{
//do nothing
}
else
{
closesocket(fdSocket.fd_array[i]);
if(g_map.find(s) != g_map.end())
{
if(g_map[s] != NULL)
delete [] g_map[s];
g_map.erase(s);
}
FD_CLR(fdSocket.fd_array[i],&fdSocket);
}
}
else if(nRet < nToSend)
{
printf("发送了%d/%d\n",nRet,nToSend);
nToSend -= nRet;
char* szTmp = new char[nToSend + 1];
strncpy(szTmp,szToSend + nRet,nToSend);
szTmp[nToSend] = 0;
delete [] szToSend;
g_map[s] = szTmp;
}
else
{
if(g_map[s] != NULL)
delete [] g_map[s];
g_map.erase(s);
FD_CLR(fdSocket.fd_array[i],&fdWrite);
}
printf("============================================发送了%d\n",nRet);
}
}
}
}
else if(nRetAll == 0)
{
printf("time out!\n");
}
else
{
printf("select error!%d\n",WSAGetLastError());
Sleep(5000);
break;
}
}
closesocket(sListen);
WSACleanup();
}

#include <WinSock2.h>
#include <Windows.h>
#include <MSWSock.h>
#include <stdio.h>
#include <map>
using namespace std;

#pragma comment(lib,"Ws2_32.lib")
#pragma comment(lib,"Mswsock.lib")

struct ThreadObj{
OVERLAPPED *pOl;
HANDLE s;
};

int g_iIndex = 0;
map<SOCKET,char*> g_map;

int main()
{
WSAData wsaData;
if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
{
printf("初始化失败!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport = 3456;
SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
u_long ul = 1;
ioctlsocket(sListen,FIONBIO,&ul);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nport);
sin.sin_addr.S_un.S_addr = ADDR_ANY;

if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
{
printf("bind failed!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}

listen(sListen,5);

//1)初始化一个套接字集合fdSocket,并将监听套接字放入
fd_set fdSocket;
FD_ZERO(&fdSocket);
FD_SET(sListen,&fdSocket);
TIMEVAL time={1,0};
char buf[4096];
fd_set fdWrite;
FD_ZERO(&fdWrite);
while(true)
{
//2)将fdSocket的一个拷贝fdRead传给select函数
fd_set fdRead = fdSocket;
fd_set fdTmp = fdWrite;
int nRetAll = 0;
if(fdTmp.fd_count > 0)
nRetAll = select(0,&fdRead,&fdTmp,NULL,NULL/*&time*/);//若不设置超时则select为阻塞
else
nRetAll = select(0,&fdRead,NULL,NULL,NULL/*&time*/);
if(nRetAll > 0)
{
//3)通过将原来的fdSocket和被select处理过的fdRead进行比较,决定由哪些socket有数据可以读取
for(int i=0;i<fdSocket.fd_count;i++)
{
if(FD_ISSET(fdSocket.fd_array[i],&fdRead))
{
if(fdSocket.fd_array[i] == sListen)
{
if(fdSocket.fd_count < FD_SETSIZE)
{
sockaddr_in addrRemote;
int nAddrLen = sizeof(addrRemote);
SOCKET sClient = accept(sListen,(sockaddr*)&addrRemote,&nAddrLen);
FD_SET(sClient,&fdSocket);
printf("接收到连接:(%s)\n",inet_ntoa(addrRemote.sin_addr));
}
else
{
printf("连接数量已达上限!\n");
continue;
}
}
else
{
int nRecv = recv(fdSocket.fd_array[i],buf,4096,0);
if(nRecv > 0)
{
buf[nRecv] = 0;

printf("收到数据:%s\n",buf);

int nRet = send(fdSocket.fd_array[i],buf,nRecv,0);
if(nRet <= 0)
{
SOCKET s = fdSocket.fd_array[i];
if(GetLastError() == WSAEWOULDBLOCK)
{
if(g_map.find(s) == g_map.end())
{
char* szTmp = new char[nRecv + 1];
strncpy(szTmp,buf,nRecv);
szTmp[nRecv] = 0;
g_map[s] = szTmp;
}
else
{
char* szOld = g_map[s];
char* szTmp2 = new char[strlen(szOld) + nRecv + 1];
strncpy(szTmp2,szOld,strlen(szOld));
strncpy(szTmp2 + strlen(szOld),buf,nRecv);
szTmp2[strlen(szOld) + nRecv] = 0;
delete [] szOld;
g_map[s] = szTmp2;
}
FD_SET(fdSocket.fd_array[i],&fdWrite);
}
else
{
closesocket(fdSocket.fd_array[i]);
if(g_map.find(s) != g_map.end())
{
if(g_map[s] != NULL)
delete [] g_map[s];
g_map.erase(s);
}
FD_CLR(fdSocket.fd_array[i],&fdSocket);
}
}
printf("发送了%d\n",nRet);
}
else
{
printf("1个Client已断开\n");
closesocket(fdSocket.fd_array[i]);
FD_CLR(fdSocket.fd_array[i],&fdSocket);
}
}
}
if(FD_ISSET(fdSocket.fd_array[i],&fdTmp))
{
SOCKET s = fdSocket.fd_array[i];
if(g_map.find(s) != g_map.end())
{
char* szToSend = g_map[s];
int nToSend = strlen(szToSend);
int nRet = send(fdSocket.fd_array[i],szToSend,nToSend,0);
if(nRet <= 0)
{
if(GetLastError() == WSAEWOULDBLOCK)
{
//do nothing
}
else
{
closesocket(fdSocket.fd_array[i]);
if(g_map.find(s) != g_map.end())
{
if(g_map[s] != NULL)
delete [] g_map[s];
g_map.erase(s);
}
FD_CLR(fdSocket.fd_array[i],&fdSocket);
}
}
else if(nRet < nToSend)
{
printf("发送了%d/%d\n",nRet,nToSend);
nToSend -= nRet;
char* szTmp = new char[nToSend + 1];
strncpy(szTmp,szToSend + nRet,nToSend);
szTmp[nToSend] = 0;
delete [] szToSend;
g_map[s] = szTmp;
}
else
{
if(g_map[s] != NULL)
delete [] g_map[s];
g_map.erase(s);
FD_CLR(fdSocket.fd_array[i],&fdWrite);
}
printf("============================================发送了%d\n",nRet);
}
}
}
}
else if(nRetAll == 0)
{
printf("time out!\n");
}
else
{
printf("select error!%d\n",WSAGetLastError());
Sleep(5000);
break;
}
}
closesocket(sListen);
WSACleanup();
}


四,异步选择模型
注意:收到FD_Write消息有2种情况:1,在socket第一次和窗口句柄绑定后。2,socket从不可写状态变成可写状态。下面的事件选择模型也是同理。

[cpp]
view plaincopyprint?

#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>
#include <map>
using namespace std;

#pragma comment(lib,"Ws2_32.lib")

#define WM_SOCKET (WM_USER + 100)

map<SOCKET,char*> g_map;
LRESULT WINAPI WindowProc(HWND hwnd,UINT uMsg,WPARAM wParam,LPARAM lParam)
{
switch(uMsg)
{
case WM_SOCKET:
{
SOCKET s = wParam;
if(WSAGETSELECTERROR(lParam))
{
printf("消息错误!\n");
closesocket(s);
return 0;
}

switch(WSAGETSELECTEVENT(lParam))
{
case FD_ACCEPT:
{
sockaddr_in addrRemote;
int nAddrLen = sizeof(addrRemote);
SOCKET sClient = accept(s,(sockaddr*)&addrRemote,&nAddrLen);
WSAAsyncSelect(sClient,hwnd,WM_SOCKET,FD_READ | FD_WRITE | FD_CLOSE);
}break;
case FD_WRITE:
{
printf("write====================\n");
if(!g_map.empty())
{
char* buf = g_map[s];
int nLenth = strlen(buf);
while(nLenth > 0)
{
int nRet = send(s,buf,nLenth,0);
if(nRet > 0)
{
buf += nRet;
nLenth -= nRet;
}
else if(10035 == GetLastError())
{
char* newBuf = new char[nLenth + 1];
strncpy(newBuf,buf,nLenth);
newBuf[nLenth] = 0;
delete [] g_map[s];
g_map[s] = newBuf;
break;
}
else
{
delete [] g_map[s];
g_map.erase(s);
closesocket(s);
}
}
if(nLenth == 0)
{
g_map.erase(s);
}
}
}break;
case FD_READ:
{
char buf[4096];
int nRet = recv(s,buf,4096,0);
if(nRet > 0)
{
buf[nRet] = 0;
//printf("收到数据:%s\n",buf);
int x = send(s,buf,nRet,0);
printf("已发送字节数:%d , 线程号:%d\n",x,GetCurrentThreadId());
if(x < 0)
{
int iError = GetLastError();
printf("数据:%s ,错误:%d\n",buf,iError);
if(10035 == iError)
{
if(g_map.end() != g_map.find(s))
{
int newLength = strlen(g_map[s]) + strlen(buf);
char* newBuf = new char[newLength + 1];
strncpy(newBuf,g_map[s],strlen(g_map[s]));
strncpy(newBuf+strlen(g_map[s]),buf,strlen(buf));
newBuf[newLength] = 0;
delete [] g_map[s];
g_map[s] = newBuf;
}
else
{
char* newBuf = new char[strlen(buf) + 1];
strncpy(newBuf,buf,strlen(buf));
newBuf[strlen(buf)] = 0;
g_map[s] = newBuf;
}
}
else
{
if(g_map.end() != g_map.find(s))
{
delete [] g_map[s];
g_map.erase(s);
}
closesocket(s);
}
}
}
else
{
printf("1个Client已经断开1111!\n");
if(g_map.end() != g_map.find(s))
{
delete [] g_map[s];
g_map.erase(s);
}
closesocket(s);
}
}break;
case FD_CLOSE:
{
printf("1个Client已经断开222!\n");
if(g_map.end() != g_map.find(s))
{
delete [] g_map[s];
g_map.erase(s);
}
closesocket(s);
}break;
}
}break;
case WM_DESTROY:
{
printf("窗口已关闭!\n");
PostQuitMessage(0);
}
}
return DefWindowProc(hwnd,uMsg,wParam,lParam);
}

int main()
{
char szClassName[] = "WSAAsyncSelect Test";
static WNDCLASSEX wndClass;
wndClass.cbSize = sizeof(wndClass);
wndClass.style = CS_HREDRAW | CS_VREDRAW;
wndClass.lpfnWndProc = WindowProc;
wndClass.cbClsExtra = 0;
wndClass.cbWndExtra = 0;
wndClass.hInstance = GetModuleHandle(0);
wndClass.hIcon = LoadIcon(NULL,IDI_APPLICATION);
wndClass.hCursor = LoadCursor(NULL,IDC_ARROW);
wndClass.hbrBackground = (HBRUSH)GetStockObject(WHITE_BRUSH);
wndClass.lpszMenuName = NULL;
wndClass.lpszClassName = szClassName;
wndClass.hIconSm = NULL;

ATOM atom = RegisterClassEx(&wndClass);
if(0 == atom)
{
char error[256];
sprintf(error,"RegisterClassEx错误!%d",GetLastError());
MessageBox(NULL,error,"error",MB_OK);
return -1;
}
HWND hwnd = CreateWindowEx(0,(char *)atom,"",WS_OVERLAPPEDWINDOW,CW_USEDEFAULT,CW_USEDEFAULT,
CW_USEDEFAULT,CW_USEDEFAULT,HWND_MESSAGE,NULL,NULL,NULL);
if(hwnd == NULL)
{
char error[256];
sprintf(error,"创建窗口错误!%d",GetLastError());
MessageBox(NULL,error,"error",MB_OK);
return -1;
}

WSAData wsaData;
if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
{
printf("初始化失败!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport = 3456;
SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nport);
sin.sin_addr.S_un.S_addr = ADDR_ANY;

if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
{
printf("bind failed!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}

WSAAsyncSelect(sListen,hwnd,WM_SOCKET,FD_ACCEPT | FD_CLOSE);
listen(sListen,5);

MSG msg;
while(GetMessage(&msg,NULL,0,0))
{
TranslateMessage(&msg);
DispatchMessage(&msg);
}
closesocket(sListen);
WSACleanup();
return msg.wParam;
}

#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>
#include <map>
using namespace std;

#pragma comment(lib,"Ws2_32.lib")

#define WM_SOCKET (WM_USER + 100)

map<SOCKET,char*> g_map;
LRESULT WINAPI WindowProc(HWND hwnd,UINT uMsg,WPARAM wParam,LPARAM lParam)
{
switch(uMsg)
{
case WM_SOCKET:
{
SOCKET s = wParam;
if(WSAGETSELECTERROR(lParam))
{
printf("消息错误!\n");
closesocket(s);
return 0;
}

switch(WSAGETSELECTEVENT(lParam))
{
case FD_ACCEPT:
{
sockaddr_in addrRemote;
int nAddrLen = sizeof(addrRemote);
SOCKET sClient = accept(s,(sockaddr*)&addrRemote,&nAddrLen);
WSAAsyncSelect(sClient,hwnd,WM_SOCKET,FD_READ | FD_WRITE | FD_CLOSE);
}break;
case FD_WRITE:
{
printf("write====================\n");
if(!g_map.empty())
{
char* buf = g_map[s];
int nLenth = strlen(buf);
while(nLenth > 0)
{
int nRet = send(s,buf,nLenth,0);
if(nRet > 0)
{
buf += nRet;
nLenth -= nRet;
}
else if(10035 == GetLastError())
{
char* newBuf = new char[nLenth + 1];
strncpy(newBuf,buf,nLenth);
newBuf[nLenth] = 0;
delete [] g_map[s];
g_map[s] = newBuf;
break;
}
else
{
delete [] g_map[s];
g_map.erase(s);
closesocket(s);
}
}
if(nLenth == 0)
{
g_map.erase(s);
}
}
}break;
case FD_READ:
{
char buf[4096];
int nRet = recv(s,buf,4096,0);
if(nRet > 0)
{
buf[nRet] = 0;
//printf("收到数据:%s\n",buf);
int x = send(s,buf,nRet,0);
printf("已发送字节数:%d , 线程号:%d\n",x,GetCurrentThreadId());
if(x < 0)
{
int iError = GetLastError();
printf("数据:%s ,错误:%d\n",buf,iError);
if(10035 == iError)
{
if(g_map.end() != g_map.find(s))
{
int newLength = strlen(g_map[s]) + strlen(buf);
char* newBuf = new char[newLength + 1];
strncpy(newBuf,g_map[s],strlen(g_map[s]));
strncpy(newBuf+strlen(g_map[s]),buf,strlen(buf));
newBuf[newLength] = 0;
delete [] g_map[s];
g_map[s] = newBuf;
}
else
{
char* newBuf = new char[strlen(buf) + 1];
strncpy(newBuf,buf,strlen(buf));
newBuf[strlen(buf)] = 0;
g_map[s] = newBuf;
}
}
else
{
if(g_map.end() != g_map.find(s))
{
delete [] g_map[s];
g_map.erase(s);
}
closesocket(s);
}
}
}
else
{
printf("1个Client已经断开1111!\n");
if(g_map.end() != g_map.find(s))
{
delete [] g_map[s];
g_map.erase(s);
}
closesocket(s);
}
}break;
case FD_CLOSE:
{
printf("1个Client已经断开222!\n");
if(g_map.end() != g_map.find(s))
{
delete [] g_map[s];
g_map.erase(s);
}
closesocket(s);
}break;
}
}break;
case WM_DESTROY:
{
printf("窗口已关闭!\n");
PostQuitMessage(0);
}
}
return DefWindowProc(hwnd,uMsg,wParam,lParam);
}

int main()
{
char szClassName[] = "WSAAsyncSelect Test";
static WNDCLASSEX wndClass;
wndClass.cbSize = sizeof(wndClass);
wndClass.style = CS_HREDRAW | CS_VREDRAW;
wndClass.lpfnWndProc = WindowProc;
wndClass.cbClsExtra = 0;
wndClass.cbWndExtra = 0;
wndClass.hInstance = GetModuleHandle(0);
wndClass.hIcon = LoadIcon(NULL,IDI_APPLICATION);
wndClass.hCursor = LoadCursor(NULL,IDC_ARROW);
wndClass.hbrBackground = (HBRUSH)GetStockObject(WHITE_BRUSH);
wndClass.lpszMenuName = NULL;
wndClass.lpszClassName = szClassName;
wndClass.hIconSm = NULL;

ATOM atom = RegisterClassEx(&wndClass);
if(0 == atom)
{
char error[256];
sprintf(error,"RegisterClassEx错误!%d",GetLastError());
MessageBox(NULL,error,"error",MB_OK);
return -1;
}
HWND hwnd = CreateWindowEx(0,(char *)atom,"",WS_OVERLAPPEDWINDOW,CW_USEDEFAULT,CW_USEDEFAULT,
CW_USEDEFAULT,CW_USEDEFAULT,HWND_MESSAGE,NULL,NULL,NULL);
if(hwnd == NULL)
{
char error[256];
sprintf(error,"创建窗口错误!%d",GetLastError());
MessageBox(NULL,error,"error",MB_OK);
return -1;
}

WSAData wsaData;
if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
{
printf("初始化失败!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport = 3456;
SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nport);
sin.sin_addr.S_un.S_addr = ADDR_ANY;

if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
{
printf("bind failed!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}

WSAAsyncSelect(sListen,hwnd,WM_SOCKET,FD_ACCEPT | FD_CLOSE);
listen(sListen,5);

MSG msg;
while(GetMessage(&msg,NULL,0,0))
{
TranslateMessage(&msg);
DispatchMessage(&msg);
}
closesocket(sListen);
WSACleanup();
return msg.wParam;
}


五,事件选择模型
事件选择模型主要难点是对线程池的使用,send操作可以参考异步选择模型。

[cpp]
view plaincopyprint?

#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>
#include <vector>
using namespace std;

#pragma comment(lib,"Ws2_32.lib")

typedef struct _THREAD_OBJ
{
HANDLE events[WSA_MAXIMUM_WAIT_EVENTS];
SOCKET sockets[WSA_MAXIMUM_WAIT_EVENTS];
int nSocksUsed;
CRITICAL_SECTION cs;
_THREAD_OBJ *pNext;
}THREAD_OBJ,*PTHREAD_OBJ;

PTHREAD_OBJ g_pThreadList = NULL;
CRITICAL_SECTION g_cs;
BOOL g_bServerRunning = FALSE;
HANDLE g_hThreads[1000] = {0};
int g_nThreadsCount = 0;

PTHREAD_OBJ CreateThreadObj()
{
PTHREAD_OBJ pThread = new THREAD_OBJ();
if(pThread != NULL)
{
InitializeCriticalSectionAndSpinCount(&pThread->cs,4000);
pThread->events[0] = WSACreateEvent();
pThread->nSocksUsed = 1;
EnterCriticalSection(&g_cs);
pThread->pNext = g_pThreadList;
g_pThreadList = pThread;
LeaveCriticalSection(&g_cs);
}
return pThread;
}

void FreeThreadObj(PTHREAD_OBJ pThread)
{
if(pThread == NULL)
return;
EnterCriticalSection(&g_cs);
PTHREAD_OBJ p = g_pThreadList;
if(p == pThread)
{
g_pThreadList = p->pNext;
}
else
{
while(p != NULL && p->pNext != pThread)
{
p = p->pNext;
}
if(p != NULL)
{
p->pNext = pThread->pNext;
}
}
LeaveCriticalSection(&g_cs);

DeleteCriticalSection(&pThread->cs);
WSACloseEvent(pThread->events[0]);
delete pThread;
}

LONG g_nTotalConnections;
LONG g_nCurrentConnections;

BOOL InsertSocket(PTHREAD_OBJ pThread,SOCKET s)
{
if(pThread == NULL || s == INVALID_SOCKET)
return FALSE;

BOOL bRet = FALSE;
EnterCriticalSection(&pThread->cs);
if(pThread->nSocksUsed < WSA_MAXIMUM_WAIT_EVENTS)
{
pThread->events[pThread->nSocksUsed] = WSACreateEvent();
pThread->sockets[pThread->nSocksUsed] = s;
WSAEventSelect(s,pThread->events[pThread->nSocksUsed],FD_READ | FD_CLOSE | FD_WRITE);
pThread->nSocksUsed++;
bRet = TRUE;
WSASetEvent(pThread->events[0]);//通知线程,有新的事件加入了,需要重新调用WSAWaitFormultipleEvents
}
LeaveCriticalSection(&pThread->cs);

if(bRet)
{
InterlockedIncrement(&g_nTotalConnections);
InterlockedIncrement(&g_nCurrentConnections);
}
return bRet;
}

void RemoveSocket(PTHREAD_OBJ pThread,SOCKET s)
{
if(pThread == NULL || s == INVALID_SOCKET)
return;
EnterCriticalSection(&pThread->cs);
for(int i=1;i<pThread->nSocksUsed;i++)
{
if(pThread->sockets[i] == s)
{
WSACloseEvent(pThread->events[i]);
closesocket(s);
for(int j=i;j<pThread->nSocksUsed - 1;j++)
{
pThread->events[j] = pThread->events[j+1];
pThread->sockets[j] = pThread->sockets[j+1];
}
pThread->nSocksUsed--;
break;
}
}
LeaveCriticalSection(&pThread->cs);
InterlockedDecrement(&g_nCurrentConnections);
}

BOOL HandleIo(PTHREAD_OBJ pThread,int nIndex)
{
WSANETWORKEVENTS event;
SOCKET s = pThread->sockets[nIndex];
HANDLE sEvent = pThread->events[nIndex];
if(0 != WSAEnumNetworkEvents(s,sEvent,&event))
{
printf("socket error!\n");
RemoveSocket(pThread,s);
return FALSE;
}
do
{
if(event.lNetworkEvents & FD_READ)
{
if(event.iErrorCode[FD_READ_BIT] == 0)
{
char szText[256];
int nRecv = recv(s,szText,strlen(szText),0);
if(nRecv > 0)
{
szText[nRecv] = '\0';
printf("接收到数据:%s\n",szText);
}
else
{
break;
}
}
else
break;
}
else if(event.lNetworkEvents & FD_CLOSE)
{
break;
}
else if(event.lNetworkEvents & FD_WRITE)
{
printf("FD_WRITE==========================\n");
}
return TRUE;
} while (FALSE);
printf("socket error2!\n");
RemoveSocket(pThread,s);
return FALSE;
}

DWORD WINAPI ServerThread(LPVOID lpParam)
{
PTHREAD_OBJ pThread = (PTHREAD_OBJ)lpParam;

while(TRUE)
{
int nIndex = WSAWaitForMultipleEvents(
pThread->nSocksUsed,pThread->events,FALSE,WSA_INFINITE,FALSE);
nIndex = nIndex - WSA_WAIT_EVENT_0;

if(nIndex == WSA_WAIT_FAILED || nIndex == WSA_WAIT_TIMEOUT)
{
printf("WSAWaitForMultipleEvents error!\n");
continue;
}
else if(nIndex == 0)
{
ResetEvent(pThread->events[0]);
}
else
{
HandleIo(pThread,nIndex);
}
if(!g_bServerRunning && pThread->nSocksUsed == 1)
break;
}
FreeThreadObj(pThread);
return 0;
}

BOOL AssignToFreeThread(SOCKET s)
{
if(s == INVALID_SOCKET)
return FALSE;
BOOL bAllSucceed = TRUE;
EnterCriticalSection(&g_cs);
PTHREAD_OBJ pThread = g_pThreadList;
while(pThread != NULL)
{
if(InsertSocket(pThread,s))
{
break;
}
pThread = pThread->pNext;
}
if(pThread == NULL)
{
if(g_nThreadsCount < 1000)
{
pThread = CreateThreadObj();
HANDLE hThread = CreateThread(NULL,0,ServerThread,pThread,0,NULL);
if(!hThread)
{
bAllSucceed = FALSE;
FreeThreadObj(pThread);
}
else
{
g_hThreads[g_nThreadsCount++] = hThread;
InsertSocket(pThread,s);
}
}
else
bAllSucceed = FALSE;
}
LeaveCriticalSection(&g_cs);
return bAllSucceed;
}

DWORD WINAPI ControlThread(LPVOID lpParma)
{
HANDLE wsaEvent = (HANDLE)lpParma;
char cmd[128];
while(scanf("%s",cmd))
{
if(cmd[0] == 's')
{
g_bServerRunning = FALSE;
EnterCriticalSection(&g_cs);
PTHREAD_OBJ pThread = g_pThreadList;
while(pThread != NULL)
{
EnterCriticalSection(&pThread->cs);
for(int i=0;i<pThread->nSocksUsed;i++)
{
closesocket(pThread->sockets[i]);
}
WSASetEvent(pThread->events[0]);
LeaveCriticalSection(&pThread->cs);
pThread = pThread->pNext;
}
LeaveCriticalSection(&g_cs);
WSASetEvent(wsaEvent);
break;
}
}
return 0;
}

int main()
{
WSAData wsaData;
if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
{
printf("初始化失败!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport = 3456;
SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nport);
sin.sin_addr.S_un.S_addr = ADDR_ANY;

if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
{
printf("bind failed!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}

listen(sListen,200);

WSAEVENT wsaEvent = WSACreateEvent();
WSAEventSelect(sListen,wsaEvent,FD_ACCEPT | FD_CLOSE);
InitializeCriticalSectionAndSpinCount(&g_cs,4000);
g_bServerRunning = TRUE;
HANDLE hThread = CreateThread(NULL,0,ControlThread,wsaEvent,0,NULL);
CloseHandle(hThread);
while(TRUE)
{
int nRet = WaitForSingleObject(wsaEvent,5*1000);
if(!g_bServerRunning)
{
closesocket(sListen);
WSACloseEvent(wsaEvent);
WSAWaitForMultipleEvents(g_nThreadsCount,g_hThreads,TRUE,INFINITE,FALSE);
for(int i=0;i<g_nThreadsCount;i++)
{
CloseHandle(g_hThreads[i]);
}
break;
}
if(nRet == WAIT_FAILED)
{
printf("WaitForSingleObject Failed!\n");
break;
}
else if(nRet == WAIT_TIMEOUT)
{
printf("\nTotalConnections: %d\nCurrentConnections: %d\nThreads:%d\n",
g_nTotalConnections,g_nCurrentConnections,g_nThreadsCount);
continue;
}
else
{
ResetEvent(wsaEvent);
while(TRUE)
{
sockaddr_in addrRemote;
int nLen = sizeof(addrRemote);
SOCKET sNew = accept(sListen,(sockaddr*)&addrRemote,&nLen);
if(sNew == SOCKET_ERROR)
break;

if(!AssignToFreeThread(sNew))
{
closesocket(sNew);
printf("AssignToFreeThread Failed!\n");
}
}
}
}
DeleteCriticalSection(&g_cs);
return 0;
}

#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>
#include <vector>
using namespace std;

#pragma comment(lib,"Ws2_32.lib")

typedef struct _THREAD_OBJ
{
HANDLE events[WSA_MAXIMUM_WAIT_EVENTS];
SOCKET sockets[WSA_MAXIMUM_WAIT_EVENTS];
int nSocksUsed;
CRITICAL_SECTION cs;
_THREAD_OBJ *pNext;
}THREAD_OBJ,*PTHREAD_OBJ;

PTHREAD_OBJ g_pThreadList = NULL;
CRITICAL_SECTION g_cs;
BOOL g_bServerRunning = FALSE;
HANDLE g_hThreads[1000] = {0};
int g_nThreadsCount = 0;

PTHREAD_OBJ CreateThreadObj()
{
PTHREAD_OBJ pThread = new THREAD_OBJ();
if(pThread != NULL)
{
InitializeCriticalSectionAndSpinCount(&pThread->cs,4000);
pThread->events[0] = WSACreateEvent();
pThread->nSocksUsed = 1;
EnterCriticalSection(&g_cs);
pThread->pNext = g_pThreadList;
g_pThreadList = pThread;
LeaveCriticalSection(&g_cs);
}
return pThread;
}

void FreeThreadObj(PTHREAD_OBJ pThread)
{
if(pThread == NULL)
return;
EnterCriticalSection(&g_cs);
PTHREAD_OBJ p = g_pThreadList;
if(p == pThread)
{
g_pThreadList = p->pNext;
}
else
{
while(p != NULL && p->pNext != pThread)
{
p = p->pNext;
}
if(p != NULL)
{
p->pNext = pThread->pNext;
}
}
LeaveCriticalSection(&g_cs);

DeleteCriticalSection(&pThread->cs);
WSACloseEvent(pThread->events[0]);
delete pThread;
}

LONG g_nTotalConnections;
LONG g_nCurrentConnections;

BOOL InsertSocket(PTHREAD_OBJ pThread,SOCKET s)
{
if(pThread == NULL || s == INVALID_SOCKET)
return FALSE;

BOOL bRet = FALSE;
EnterCriticalSection(&pThread->cs);
if(pThread->nSocksUsed < WSA_MAXIMUM_WAIT_EVENTS)
{
pThread->events[pThread->nSocksUsed] = WSACreateEvent();
pThread->sockets[pThread->nSocksUsed] = s;
WSAEventSelect(s,pThread->events[pThread->nSocksUsed],FD_READ | FD_CLOSE | FD_WRITE);
pThread->nSocksUsed++;
bRet = TRUE;
WSASetEvent(pThread->events[0]);//通知线程,有新的事件加入了,需要重新调用WSAWaitFormultipleEvents
}
LeaveCriticalSection(&pThread->cs);

if(bRet)
{
InterlockedIncrement(&g_nTotalConnections);
InterlockedIncrement(&g_nCurrentConnections);
}
return bRet;
}

void RemoveSocket(PTHREAD_OBJ pThread,SOCKET s)
{
if(pThread == NULL || s == INVALID_SOCKET)
return;
EnterCriticalSection(&pThread->cs);
for(int i=1;i<pThread->nSocksUsed;i++)
{
if(pThread->sockets[i] == s)
{
WSACloseEvent(pThread->events[i]);
closesocket(s);
for(int j=i;j<pThread->nSocksUsed - 1;j++)
{
pThread->events[j] = pThread->events[j+1];
pThread->sockets[j] = pThread->sockets[j+1];
}
pThread->nSocksUsed--;
break;
}
}
LeaveCriticalSection(&pThread->cs);
InterlockedDecrement(&g_nCurrentConnections);
}

BOOL HandleIo(PTHREAD_OBJ pThread,int nIndex)
{
WSANETWORKEVENTS event;
SOCKET s = pThread->sockets[nIndex];
HANDLE sEvent = pThread->events[nIndex];
if(0 != WSAEnumNetworkEvents(s,sEvent,&event))
{
printf("socket error!\n");
RemoveSocket(pThread,s);
return FALSE;
}
do
{
if(event.lNetworkEvents & FD_READ)
{
if(event.iErrorCode[FD_READ_BIT] == 0)
{
char szText[256];
int nRecv = recv(s,szText,strlen(szText),0);
if(nRecv > 0)
{
szText[nRecv] = '\0';
printf("接收到数据:%s\n",szText);
}
else
{
break;
}
}
else
break;
}
else if(event.lNetworkEvents & FD_CLOSE)
{
break;
}
else if(event.lNetworkEvents & FD_WRITE)
{
printf("FD_WRITE==========================\n");
}
return TRUE;
} while (FALSE);
printf("socket error2!\n");
RemoveSocket(pThread,s);
return FALSE;
}

DWORD WINAPI ServerThread(LPVOID lpParam)
{
PTHREAD_OBJ pThread = (PTHREAD_OBJ)lpParam;

while(TRUE)
{
int nIndex = WSAWaitForMultipleEvents(
pThread->nSocksUsed,pThread->events,FALSE,WSA_INFINITE,FALSE);
nIndex = nIndex - WSA_WAIT_EVENT_0;

if(nIndex == WSA_WAIT_FAILED || nIndex == WSA_WAIT_TIMEOUT)
{
printf("WSAWaitForMultipleEvents error!\n");
continue;
}
else if(nIndex == 0)
{
ResetEvent(pThread->events[0]);
}
else
{
HandleIo(pThread,nIndex);
}
if(!g_bServerRunning && pThread->nSocksUsed == 1)
break;
}
FreeThreadObj(pThread);
return 0;
}

BOOL AssignToFreeThread(SOCKET s)
{
if(s == INVALID_SOCKET)
return FALSE;
BOOL bAllSucceed = TRUE;
EnterCriticalSection(&g_cs);
PTHREAD_OBJ pThread = g_pThreadList;
while(pThread != NULL)
{
if(InsertSocket(pThread,s))
{
break;
}
pThread = pThread->pNext;
}
if(pThread == NULL)
{
if(g_nThreadsCount < 1000)
{
pThread = CreateThreadObj();
HANDLE hThread = CreateThread(NULL,0,ServerThread,pThread,0,NULL);
if(!hThread)
{
bAllSucceed = FALSE;
FreeThreadObj(pThread);
}
else
{
g_hThreads[g_nThreadsCount++] = hThread;
InsertSocket(pThread,s);
}
}
else
bAllSucceed = FALSE;
}
LeaveCriticalSection(&g_cs);
return bAllSucceed;
}

DWORD WINAPI ControlThread(LPVOID lpParma)
{
HANDLE wsaEvent = (HANDLE)lpParma;
char cmd[128];
while(scanf("%s",cmd))
{
if(cmd[0] == 's')
{
g_bServerRunning = FALSE;
EnterCriticalSection(&g_cs);
PTHREAD_OBJ pThread = g_pThreadList;
while(pThread != NULL)
{
EnterCriticalSection(&pThread->cs);
for(int i=0;i<pThread->nSocksUsed;i++)
{
closesocket(pThread->sockets[i]);
}
WSASetEvent(pThread->events[0]);
LeaveCriticalSection(&pThread->cs);
pThread = pThread->pNext;
}
LeaveCriticalSection(&g_cs);
WSASetEvent(wsaEvent);
break;
}
}
return 0;
}

int main()
{
WSAData wsaData;
if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
{
printf("初始化失败!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport = 3456;
SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nport);
sin.sin_addr.S_un.S_addr = ADDR_ANY;

if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
{
printf("bind failed!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}

listen(sListen,200);

WSAEVENT wsaEvent = WSACreateEvent();
WSAEventSelect(sListen,wsaEvent,FD_ACCEPT | FD_CLOSE);
InitializeCriticalSectionAndSpinCount(&g_cs,4000);
g_bServerRunning = TRUE;
HANDLE hThread = CreateThread(NULL,0,ControlThread,wsaEvent,0,NULL);
CloseHandle(hThread);
while(TRUE)
{
int nRet = WaitForSingleObject(wsaEvent,5*1000);
if(!g_bServerRunning)
{
closesocket(sListen);
WSACloseEvent(wsaEvent);
WSAWaitForMultipleEvents(g_nThreadsCount,g_hThreads,TRUE,INFINITE,FALSE);
for(int i=0;i<g_nThreadsCount;i++)
{
CloseHandle(g_hThreads[i]);
}
break;
}
if(nRet == WAIT_FAILED)
{
printf("WaitForSingleObject Failed!\n");
break;
}
else if(nRet == WAIT_TIMEOUT)
{
printf("\nTotalConnections: %d\nCurrentConnections: %d\nThreads:%d\n",
g_nTotalConnections,g_nCurrentConnections,g_nThreadsCount);
continue;
}
else
{
ResetEvent(wsaEvent);
while(TRUE)
{
sockaddr_in addrRemote;
int nLen = sizeof(addrRemote);
SOCKET sNew = accept(sListen,(sockaddr*)&addrRemote,&nLen);
if(sNew == SOCKET_ERROR)
break;

if(!AssignToFreeThread(sNew))
{
closesocket(sNew);
printf("AssignToFreeThread Failed!\n");
}
}
}
}
DeleteCriticalSection(&g_cs);
return 0;
}


六,重叠I/O模型。

若需要建线程池,可参考事件选择模型。若纠结于send,可参考下面的IOCP。

[cpp]
view plaincopyprint?

#include <WinSock2.h>
#include <Windows.h>
#include <MSWSock.h>
#include <stdio.h>

#pragma comment(lib,"Ws2_32.lib")

#define BUFFER_SIZE 4096

typedef struct _SOCKET_OBJ
{
SOCKET s;
int nOutstandingOps;
LPFN_ACCEPTEX lpfnAcceptEx;
}SOCKET_OBJ,*PSOCKET_OBJ;

PSOCKET_OBJ CreateSocketObj(SOCKET s)
{
PSOCKET_OBJ pSocket = new SOCKET_OBJ();
if(pSocket != NULL)
pSocket->s = s;
return pSocket;
}

void FreeSocketObj(PSOCKET_OBJ pSocket)
{
if(pSocket == NULL)
return;
if(pSocket->s != INVALID_SOCKET)
closesocket(pSocket->s);
delete pSocket;
}

typedef struct _BUFFER_OBJ
{
OVERLAPPED ol;
char* buff;
int nLen;
PSOCKET_OBJ pSocket;
int nOperation;
#define OP_ACCEPT 1
#define OP_READ 2
#define OP_WRITE 3
SOCKET sAccept;
_BUFFER_OBJ* pNext;
}BUFFER_OBJ,*PBUFFER_OBJ;

HANDLE g_events[WSA_MAXIMUM_WAIT_EVENTS];
int g_nBufferCount;
PBUFFER_OBJ g_pBufferHeader,g_pBufferTail;
BOOL g_bServerRunning;
CRITICAL_SECTION g_cs;

PBUFFER_OBJ CreateBufferObj(PSOCKET_OBJ pSocket,ULONG nLen)
{
if(g_nBufferCount > WSA_MAXIMUM_WAIT_EVENTS - 1)
return NULL;
PBUFFER_OBJ pBuffer = new BUFFER_OBJ();
if(pBuffer != NULL)
{
pBuffer->buff = new char[nLen];
pBuffer->nLen = nLen;
pBuffer->ol.hEvent = WSACreateEvent();
pBuffer->pSocket = pSocket;
pBuffer->sAccept = INVALID_SOCKET;
pBuffer->pNext = NULL;
EnterCriticalSection(&g_cs);
if(g_pBufferHeader == NULL)
{
g_pBufferHeader = g_pBufferTail = pBuffer;
}
else
{
g_pBufferTail->pNext = pBuffer;
g_pBufferTail = pBuffer;
}
LeaveCriticalSection(&g_cs);
g_events[++g_nBufferCount] = pBuffer->ol.hEvent;
}
return pBuffer;
}

void FreeBufferObj(PBUFFER_OBJ pBuffer)
{
EnterCriticalSection(&g_cs);
PBUFFER_OBJ pTest = g_pBufferHeader;
BOOL bFind = FALSE;
if(pTest == pBuffer)
{
if(g_pBufferHeader == g_pBufferTail)
g_pBufferHeader = g_pBufferTail = NULL;
else
g_pBufferHeader = g_pBufferHeader->pNext;
bFind = TRUE;
}
else
{
while(pTest != NULL && pTest->pNext != pBuffer)
pTest = pTest->pNext;
if(pTest != NULL)
{
pTest->pNext = pBuffer->pNext;
if(pTest->pNext == NULL)
g_pBufferTail = pTest;
bFind = TRUE;
}
}

if(bFind)
{
g_nBufferCount--;
WSACloseEvent(pBuffer->ol.hEvent);
delete [] pBuffer->buff;
delete pBuffer;
}
LeaveCriticalSection(&g_cs);
}

PBUFFER_OBJ FindBufferObj(HANDLE hEvent)
{
if(hEvent == NULL || hEvent == INVALID_HANDLE_VALUE)
return NULL;
EnterCriticalSection(&g_cs);
PBUFFER_OBJ pTest = g_pBufferHeader;
while(pTest != NULL && pTest->ol.hEvent != hEvent)
pTest = pTest->pNext;
LeaveCriticalSection(&g_cs);
return pTest;
}

void RebuildArray()
{
EnterCriticalSection(&g_cs);
PBUFFER_OBJ pBuffer = g_pBufferHeader;
int i=1;
while(pBuffer != NULL)
{
g_events[i++] = pBuffer->ol.hEvent;
pBuffer = pBuffer->pNext;
}
LeaveCriticalSection(&g_cs);
}

BOOL PostAccept(PBUFFER_OBJ pBuffer)
{
PSOCKET_OBJ pSocket = pBuffer->pSocket;
if(pSocket->lpfnAcceptEx != NULL)
{
pBuffer->nOperation = OP_ACCEPT;
pSocket->nOutstandingOps++;

DWORD dwBytes;
pBuffer->sAccept = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
BOOL b = pSocket->lpfnAcceptEx(pSocket->s,
pBuffer->sAccept,pBuffer->buff,BUFFER_SIZE - ((sizeof(sockaddr_in) + 16)*2),
sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16,&dwBytes,&pBuffer->ol);
if(!b)
{
if(WSAGetLastError() != WSA_IO_PENDING)
return FALSE;
}
return TRUE;
}
return FALSE;
}

BOOL PostRecv(PBUFFER_OBJ pBuffer)
{
pBuffer->nOperation = OP_READ;
pBuffer->pSocket->nOutstandingOps++;

DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(WSARecv(pBuffer->pSocket->s,&buf,1,&dwBytes,&dwFlags,&pBuffer->ol,NULL))
{
if(WSAGetLastError() != WSA_IO_PENDING)
return FALSE;
}
return TRUE;
}

BOOL PostSend(PBUFFER_OBJ pBuffer)
{
pBuffer->nOperation = OP_WRITE;
pBuffer->pSocket->nOutstandingOps++;
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(WSASend(pBuffer->pSocket->s,&buf,1,&dwBytes,dwFlags,&pBuffer->ol,NULL))
{
if(WSAGetLastError() != WSA_IO_PENDING)
return FALSE;
}
return TRUE;
}

BOOL HandleIo(PBUFFER_OBJ pBuffer)
{
if(pBuffer == NULL)
return FALSE;

PSOCKET_OBJ pSocket = pBuffer->pSocket;
pSocket->nOutstandingOps--;

DWORD dwTrans;
DWORD dwFlags;
BOOL bRet = WSAGetOverlappedResult(pSocket->s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags);
if(!bRet)
{
if(pSocket->s != INVALID_SOCKET)
{
closesocket(pSocket->s);
pSocket->s = INVALID_SOCKET;
}
if(pBuffer->nOperation == OP_ACCEPT && pBuffer->sAccept != INVALID_SOCKET)
{
closesocket(pBuffer->sAccept);
pBuffer->sAccept = INVALID_SOCKET;
}
if(pSocket->nOutstandingOps == 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
return FALSE;
}

switch(pBuffer->nOperation)
{
case OP_ACCEPT:
{
if(dwTrans > 0)
{
pBuffer->buff[dwTrans] = 0;
printf("Accept收到数据:%s\n",pBuffer->buff);

PSOCKET_OBJ pClient = CreateSocketObj(pBuffer->sAccept);
PBUFFER_OBJ pRecv = CreateBufferObj(pClient,BUFFER_SIZE);
if(pRecv == NULL)
{
printf("Too much connections!\n");
FreeSocketObj(pClient);
return FALSE;
}
RebuildArray();
if(!PostRecv(pRecv))
{
FreeSocketObj(pClient);
FreeBufferObj(pBuffer);
return FALSE;
}
}
else
{
if(pSocket->s != INVALID_SOCKET)
{
closesocket(pSocket->s);
pSocket->s = INVALID_SOCKET;
}
if(pBuffer->sAccept != INVALID_SOCKET)
{
closesocket(pBuffer->sAccept);
pBuffer->sAccept = INVALID_SOCKET;
}
if(pSocket->nOutstandingOps == 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
}
// PBUFFER_OBJ pSend = CreateBufferObj(pClient,BUFFER_SIZE);
//if(pSend == NULL)
//{
// printf("Too much connections!\n");
// FreeSocketObj(pClient);
// return FALSE;
//}
//RebuildArray();
//pSend->nLen = dwTrans;
//memcpy(pSend->buff,pBuffer->buff,dwTrans);

//if(!PostSend(pSend))
//{
// FreeSocketObj(pSocket);
// FreeBufferObj(pBuffer);
// return FALSE;
//}

PostAccept(pBuffer);
}break;
case OP_READ:
{
if(dwTrans > 0)
{
pBuffer->buff[dwTrans] = 0;
printf("Recv收到数据:%s\n",pBuffer->buff);
PostRecv(pBuffer);
}
else
{
if(pSocket->s != INVALID_SOCKET)
{
closesocket(pSocket->s);
pSocket->s = INVALID_SOCKET;
}
if(pSocket->nOutstandingOps == 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
}
}break;
case OP_WRITE:
{
if(dwTrans > 0)
{
pBuffer->buff[dwTrans] = 0;
printf("发送数据: %s 成功!\n",pBuffer->buff);
FreeBufferObj(pBuffer);
}
else
{
if(pSocket->s != INVALID_SOCKET)
{
closesocket(pSocket->s);
pSocket->s = INVALID_SOCKET;
}
if(pSocket->nOutstandingOps == 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
}
}break;
}
}

DWORD WINAPI ControlThread(LPVOID lpParma)
{
char cmd[128];
while(scanf("%s",cmd))
{
if(cmd[0] == 's')
{
g_bServerRunning = FALSE;
EnterCriticalSection(&g_cs);
PBUFFER_OBJ pBuffer = g_pBufferHeader;
while(pBuffer != NULL)
{
if(pBuffer->pSocket != NULL && pBuffer->pSocket->s != INVALID_SOCKET)
closesocket(pBuffer->pSocket->s);
pBuffer = pBuffer->pNext;
}
LeaveCriticalSection(&g_cs);
break;
}
}
return 0;
}

int main()
{
InitializeCriticalSectionAndSpinCount(&g_cs,4000);
WSAData wsaData;
if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
{
printf("初始化失败!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport = 3456;
SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nport);
sin.sin_addr.S_un.S_addr = ADDR_ANY;

if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
{
printf("bind failed!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}

listen(sListen,200);

g_bServerRunning = TRUE;
PSOCKET_OBJ pListen = CreateSocketObj(sListen);
GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes;
WSAIoctl(pListen->s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&pListen->lpfnAcceptEx,
sizeof(pListen->lpfnAcceptEx),
&dwBytes,
NULL,
NULL);
g_events[0] = WSACreateEvent();

for(int i=0;i<5;++i)
{
PostAccept(CreateBufferObj(pListen,BUFFER_SIZE));
}

HANDLE hThread = CreateThread(NULL,0,ControlThread,NULL,0,NULL);
while(TRUE)
{
int nIndex = WSAWaitForMultipleEvents(g_nBufferCount+1,g_events,FALSE,WSA_INFINITE,FALSE);
if(nIndex == WSA_WAIT_FAILED)
{
printf("WSAWaitForMultipleEvents Failed!\n");
break;
}
nIndex = nIndex - WSA_WAIT_EVENT_0;
for(int i=nIndex;i<= g_nBufferCount;i++)
{
int nRet = WSAWaitForMultipleEvents(1,&g_events[i],FALSE,0,FALSE);
if(nRet == WSA_WAIT_TIMEOUT)
continue;

if(i == 0)
{
RebuildArray();
continue;
}

PBUFFER_OBJ pBuffer = FindBufferObj(g_events[i]);
if(pBuffer != NULL)
{
if(!HandleIo(pBuffer))
RebuildArray();
}
}
if(!g_bServerRunning && g_nBufferCount == 0)
break;
}
WSACloseEvent(g_events[0]);
WaitForSingleObject(hThread,INFINITE);
CloseHandle(hThread);
closesocket(sListen);
WSACleanup();
DeleteCriticalSection(&g_cs);
return 0;
}

#include <WinSock2.h>
#include <Windows.h>
#include <MSWSock.h>
#include <stdio.h>

#pragma comment(lib,"Ws2_32.lib")

#define BUFFER_SIZE 4096

typedef struct _SOCKET_OBJ
{
SOCKET s;
int nOutstandingOps;
LPFN_ACCEPTEX lpfnAcceptEx;
}SOCKET_OBJ,*PSOCKET_OBJ;

PSOCKET_OBJ CreateSocketObj(SOCKET s)
{
PSOCKET_OBJ pSocket = new SOCKET_OBJ();
if(pSocket != NULL)
pSocket->s = s;
return pSocket;
}

void FreeSocketObj(PSOCKET_OBJ pSocket)
{
if(pSocket == NULL)
return;
if(pSocket->s != INVALID_SOCKET)
closesocket(pSocket->s);
delete pSocket;
}

typedef struct _BUFFER_OBJ
{
OVERLAPPED ol;
char* buff;
int nLen;
PSOCKET_OBJ pSocket;
int nOperation;
#define OP_ACCEPT 1
#define OP_READ 2
#define OP_WRITE 3
SOCKET sAccept;
_BUFFER_OBJ* pNext;
}BUFFER_OBJ,*PBUFFER_OBJ;

HANDLE g_events[WSA_MAXIMUM_WAIT_EVENTS];
int g_nBufferCount;
PBUFFER_OBJ g_pBufferHeader,g_pBufferTail;
BOOL g_bServerRunning;
CRITICAL_SECTION g_cs;

PBUFFER_OBJ CreateBufferObj(PSOCKET_OBJ pSocket,ULONG nLen)
{
if(g_nBufferCount > WSA_MAXIMUM_WAIT_EVENTS - 1)
return NULL;
PBUFFER_OBJ pBuffer = new BUFFER_OBJ();
if(pBuffer != NULL)
{
pBuffer->buff = new char[nLen];
pBuffer->nLen = nLen;
pBuffer->ol.hEvent = WSACreateEvent();
pBuffer->pSocket = pSocket;
pBuffer->sAccept = INVALID_SOCKET;
pBuffer->pNext = NULL;
EnterCriticalSection(&g_cs);
if(g_pBufferHeader == NULL)
{
g_pBufferHeader = g_pBufferTail = pBuffer;
}
else
{
g_pBufferTail->pNext = pBuffer;
g_pBufferTail = pBuffer;
}
LeaveCriticalSection(&g_cs);
g_events[++g_nBufferCount] = pBuffer->ol.hEvent;
}
return pBuffer;
}

void FreeBufferObj(PBUFFER_OBJ pBuffer)
{
EnterCriticalSection(&g_cs);
PBUFFER_OBJ pTest = g_pBufferHeader;
BOOL bFind = FALSE;
if(pTest == pBuffer)
{
if(g_pBufferHeader == g_pBufferTail)
g_pBufferHeader = g_pBufferTail = NULL;
else
g_pBufferHeader = g_pBufferHeader->pNext;
bFind = TRUE;
}
else
{
while(pTest != NULL && pTest->pNext != pBuffer)
pTest = pTest->pNext;
if(pTest != NULL)
{
pTest->pNext = pBuffer->pNext;
if(pTest->pNext == NULL)
g_pBufferTail = pTest;
bFind = TRUE;
}
}

if(bFind)
{
g_nBufferCount--;
WSACloseEvent(pBuffer->ol.hEvent);
delete [] pBuffer->buff;
delete pBuffer;
}
LeaveCriticalSection(&g_cs);
}

PBUFFER_OBJ FindBufferObj(HANDLE hEvent)
{
if(hEvent == NULL || hEvent == INVALID_HANDLE_VALUE)
return NULL;
EnterCriticalSection(&g_cs);
PBUFFER_OBJ pTest = g_pBufferHeader;
while(pTest != NULL && pTest->ol.hEvent != hEvent)
pTest = pTest->pNext;
LeaveCriticalSection(&g_cs);
return pTest;
}

void RebuildArray()
{
EnterCriticalSection(&g_cs);
PBUFFER_OBJ pBuffer = g_pBufferHeader;
int i=1;
while(pBuffer != NULL)
{
g_events[i++] = pBuffer->ol.hEvent;
pBuffer = pBuffer->pNext;
}
LeaveCriticalSection(&g_cs);
}

BOOL PostAccept(PBUFFER_OBJ pBuffer)
{
PSOCKET_OBJ pSocket = pBuffer->pSocket;
if(pSocket->lpfnAcceptEx != NULL)
{
pBuffer->nOperation = OP_ACCEPT;
pSocket->nOutstandingOps++;

DWORD dwBytes;
pBuffer->sAccept = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
BOOL b = pSocket->lpfnAcceptEx(pSocket->s,
pBuffer->sAccept,pBuffer->buff,BUFFER_SIZE - ((sizeof(sockaddr_in) + 16)*2),
sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16,&dwBytes,&pBuffer->ol);
if(!b)
{
if(WSAGetLastError() != WSA_IO_PENDING)
return FALSE;
}
return TRUE;
}
return FALSE;
}

BOOL PostRecv(PBUFFER_OBJ pBuffer)
{
pBuffer->nOperation = OP_READ;
pBuffer->pSocket->nOutstandingOps++;

DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(WSARecv(pBuffer->pSocket->s,&buf,1,&dwBytes,&dwFlags,&pBuffer->ol,NULL))
{
if(WSAGetLastError() != WSA_IO_PENDING)
return FALSE;
}
return TRUE;
}

BOOL PostSend(PBUFFER_OBJ pBuffer)
{
pBuffer->nOperation = OP_WRITE;
pBuffer->pSocket->nOutstandingOps++;
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(WSASend(pBuffer->pSocket->s,&buf,1,&dwBytes,dwFlags,&pBuffer->ol,NULL))
{
if(WSAGetLastError() != WSA_IO_PENDING)
return FALSE;
}
return TRUE;
}

BOOL HandleIo(PBUFFER_OBJ pBuffer)
{
if(pBuffer == NULL)
return FALSE;

PSOCKET_OBJ pSocket = pBuffer->pSocket;
pSocket->nOutstandingOps--;

DWORD dwTrans;
DWORD dwFlags;
BOOL bRet = WSAGetOverlappedResult(pSocket->s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags);
if(!bRet)
{
if(pSocket->s != INVALID_SOCKET)
{
closesocket(pSocket->s);
pSocket->s = INVALID_SOCKET;
}
if(pBuffer->nOperation == OP_ACCEPT && pBuffer->sAccept != INVALID_SOCKET)
{
closesocket(pBuffer->sAccept);
pBuffer->sAccept = INVALID_SOCKET;
}
if(pSocket->nOutstandingOps == 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
return FALSE;
}

switch(pBuffer->nOperation)
{
case OP_ACCEPT:
{
if(dwTrans > 0)
{
pBuffer->buff[dwTrans] = 0;
printf("Accept收到数据:%s\n",pBuffer->buff);

PSOCKET_OBJ pClient = CreateSocketObj(pBuffer->sAccept);
PBUFFER_OBJ pRecv = CreateBufferObj(pClient,BUFFER_SIZE);
if(pRecv == NULL)
{
printf("Too much connections!\n");
FreeSocketObj(pClient);
return FALSE;
}
RebuildArray();
if(!PostRecv(pRecv))
{
FreeSocketObj(pClient);
FreeBufferObj(pBuffer);
return FALSE;
}
}
else
{
if(pSocket->s != INVALID_SOCKET)
{
closesocket(pSocket->s);
pSocket->s = INVALID_SOCKET;
}
if(pBuffer->sAccept != INVALID_SOCKET)
{
closesocket(pBuffer->sAccept);
pBuffer->sAccept = INVALID_SOCKET;
}
if(pSocket->nOutstandingOps == 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
}
//			PBUFFER_OBJ pSend = CreateBufferObj(pClient,BUFFER_SIZE);
//if(pSend == NULL)
//{
//	printf("Too much connections!\n");
//	FreeSocketObj(pClient);
//	return FALSE;
//}
//RebuildArray();
//pSend->nLen = dwTrans;
//memcpy(pSend->buff,pBuffer->buff,dwTrans);

//if(!PostSend(pSend))
//{
//	FreeSocketObj(pSocket);
//	FreeBufferObj(pBuffer);
//	return FALSE;
//}

PostAccept(pBuffer);
}break;
case OP_READ:
{
if(dwTrans > 0)
{
pBuffer->buff[dwTrans] = 0;
printf("Recv收到数据:%s\n",pBuffer->buff);
PostRecv(pBuffer);
}
else
{
if(pSocket->s != INVALID_SOCKET)
{
closesocket(pSocket->s);
pSocket->s = INVALID_SOCKET;
}
if(pSocket->nOutstandingOps == 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
}
}break;
case OP_WRITE:
{
if(dwTrans > 0)
{
pBuffer->buff[dwTrans] = 0;
printf("发送数据: %s 成功!\n",pBuffer->buff);
FreeBufferObj(pBuffer);
}
else
{
if(pSocket->s != INVALID_SOCKET)
{
closesocket(pSocket->s);
pSocket->s = INVALID_SOCKET;
}
if(pSocket->nOutstandingOps == 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
}
}break;
}
}

DWORD WINAPI ControlThread(LPVOID lpParma)
{
char cmd[128];
while(scanf("%s",cmd))
{
if(cmd[0] == 's')
{
g_bServerRunning = FALSE;
EnterCriticalSection(&g_cs);
PBUFFER_OBJ pBuffer = g_pBufferHeader;
while(pBuffer != NULL)
{
if(pBuffer->pSocket != NULL && pBuffer->pSocket->s != INVALID_SOCKET)
closesocket(pBuffer->pSocket->s);
pBuffer = pBuffer->pNext;
}
LeaveCriticalSection(&g_cs);
break;
}
}
return 0;
}

int main()
{
InitializeCriticalSectionAndSpinCount(&g_cs,4000);
WSAData wsaData;
if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
{
printf("初始化失败!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport = 3456;
SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nport);
sin.sin_addr.S_un.S_addr = ADDR_ANY;

if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
{
printf("bind failed!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}

listen(sListen,200);

g_bServerRunning = TRUE;
PSOCKET_OBJ pListen = CreateSocketObj(sListen);
GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes;
WSAIoctl(pListen->s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&pListen->lpfnAcceptEx,
sizeof(pListen->lpfnAcceptEx),
&dwBytes,
NULL,
NULL);
g_events[0] = WSACreateEvent();

for(int i=0;i<5;++i)
{
PostAccept(CreateBufferObj(pListen,BUFFER_SIZE));
}

HANDLE hThread = CreateThread(NULL,0,ControlThread,NULL,0,NULL);
while(TRUE)
{
int nIndex = WSAWaitForMultipleEvents(g_nBufferCount+1,g_events,FALSE,WSA_INFINITE,FALSE);
if(nIndex == WSA_WAIT_FAILED)
{
printf("WSAWaitForMultipleEvents Failed!\n");
break;
}
nIndex = nIndex - WSA_WAIT_EVENT_0;
for(int i=nIndex;i<= g_nBufferCount;i++)
{
int nRet = WSAWaitForMultipleEvents(1,&g_events[i],FALSE,0,FALSE);
if(nRet == WSA_WAIT_TIMEOUT)
continue;

if(i == 0)
{
RebuildArray();
continue;
}

PBUFFER_OBJ pBuffer = FindBufferObj(g_events[i]);
if(pBuffer != NULL)
{
if(!HandleIo(pBuffer))
RebuildArray();
}
}
if(!g_bServerRunning && g_nBufferCount == 0)
break;
}
WSACloseEvent(g_events[0]);
WaitForSingleObject(hThread,INFINITE);
CloseHandle(hThread);
closesocket(sListen);
WSACleanup();
DeleteCriticalSection(&g_cs);
return 0;
}


七,IOCP。

大框架为书中例子,对强化了发送操作,部分异常处理,且加入了连接超时处理。

注意:当一个投递完成,且对应socket上已经没有未决的投递,必须要再投递一个请求或者关闭连接,否则socket对应的数据结构无法被释放,对应socket连接断开时也无法被

检测到。所以如果业务逻辑结束,要关闭连接。或者你需要等客户端来断开连接,那么你可以在业务逻辑结束后,再投递一个接收请求(客户端断开时,接收请求返回且接收的字节数为0,则此类中的异常处理逻辑便会将资源清理掉)。

头文件

[cpp]
view plaincopyprint?

////////////////////////////////////////
// IOCP.h文件

#ifndef __IOCP_H__
#define __IOCP_H__

#include <winsock2.h>
#include <windows.h>
#include <Mswsock.h>

#define BUFFER_SIZE 1024*4 // I/O请求的缓冲区大小
#define MAX_THREAD 1 // I/O服务线程的数量

// 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息
struct CIOCPBuffer
{
CIOCPBuffer()
{
memset(&ol,0,sizeof(WSAOVERLAPPED));
sClient = INVALID_SOCKET;
memset(buff,0,BUFFER_SIZE);
nLen = 0;
nSequenceNumber = 0;
bIsReleased = FALSE;
nOperation = 0;
pNext = NULL;
}
WSAOVERLAPPED ol;

SOCKET sClient; // AcceptEx接收的客户方套节字

char buff[BUFFER_SIZE]; // I/O操作使用的缓冲区
int nLen; // buff缓冲区(使用的)大小

ULONG nSequenceNumber; // 此I/O的序列号
BOOL bIsReleased;

int nOperation; // 操作类型
#define OP_ACCEPT 1
#define OP_WRITE 2
#define OP_READ 3

CIOCPBuffer *pNext;
};
struct CIOCPNextToSend;
struct CIOCPTimerData;
// 这是per-Handle数据。它包含了一个套节字的信息
struct CIOCPContext
{
CIOCPContext()
{
s = INVALID_SOCKET;
memset(&addrLocal,0,sizeof(SOCKADDR_IN));
memset(&addrRemote,0,sizeof(SOCKADDR_IN));
bClosing = FALSE;
nOutstandingRecv = 0;
nOutstandingSend = 0;
nReadSequence = 0;
nCurrentReadSequence = 0;
nCurrentStep = 0;
pOutOfOrderReads = NULL;
pNextToSend = NULL;
bIsReleased = FALSE;
pNext = NULL;
pPreData = NULL;
strcpy(szClientName,"");
hTimer = NULL;
hCompletion = NULL;
}
CIOCPBuffer m_pBuffer;
SOCKET s; // 套节字句柄

SOCKADDR_IN addrLocal; // 连接的本地地址
SOCKADDR_IN addrRemote; // 连接的远程地址

BOOL bClosing; // 套节字是否关闭

int nOutstandingRecv; // 此套节字上抛出的重叠操作的数量
int nOutstandingSend;

ULONG nReadSequence; // 安排给接收的下一个序列号
ULONG nCurrentReadSequence; // 当前要读的序列号

CIOCPBuffer *pOutOfOrderReads; // 记录没有按顺序完成的读I/O
CIOCPNextToSend *pNextToSend; //xss,按顺序发送的下一个要发送的。

LPVOID pPreData; //xss,用于2个过程之间的数据交流。
ULONG nCurrentStep;//xss,用于记录当前处于的过程步骤数。
BOOL bIsReleased;

CRITICAL_SECTION Lock; // 保护这个结构

CIOCPContext *pNext;

char szClientName[256];//xss
HANDLE hTimer;//xss
HANDLE hCompletion;//xss

};

struct CIOCPNextToSend//xss
{
CIOCPBuffer * pBuffer;
CIOCPNextToSend * pNext;
};

struct CIOCPTimerData
{
CIOCPContext* pContext;
HANDLE hCompletion;
};

class CIOCPServer // 处理线程
{
public:
CIOCPServer();
~CIOCPServer();

// 开始服务
BOOL Start(int nPort = 3456, int nMaxConnections = 2000,
int nMaxFreeBuffers = 200, int nMaxFreeContexts = 100, int nInitialReads = 4);
// 停止服务
void Shutdown();

// 关闭一个连接和关闭所有连接
void CloseAConnection(CIOCPContext *pContext);
void CloseAllConnections();

// 取得当前的连接数量
ULONG GetCurrentConnection() { return m_nCurrentConnection; }

// 向指定客户发送文本
BOOL SendText(CIOCPContext *pContext, char *pszText, int nLen);

protected:

// 申请和释放缓冲区对象
CIOCPBuffer *AllocateBuffer(int nLen);
void ReleaseBuffer(CIOCPBuffer *pBuffer);

// 申请和释放套节字上下文
CIOCPContext *AllocateContext(SOCKET s);
void ReleaseContext(CIOCPContext *pContext);

// 释放空闲缓冲区对象列表和空闲上下文对象列表
void FreeBuffers();
void FreeContexts();

// 向连接列表中添加一个连接
BOOL AddAConnection(CIOCPContext *pContext);

// 插入和移除未决的接受请求
BOOL InsertPendingAccept(CIOCPBuffer *pBuffer);
BOOL RemovePendingAccept(CIOCPBuffer *pBuffer);

//xss,把要发送的数据加入队列,按顺序发送
BOOL PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
//xss,发送下一个需要发送的
BOOL PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 取得下一个要读取的
CIOCPBuffer *GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

void ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer);//xss,错误集中处理
// 投递接受I/O、发送I/O、接收I/O
BOOL PostAccept(CIOCPBuffer *pBuffer);
BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
BOOL PostRecv2(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

void HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError);

// 事件通知函数
// 建立了一个新的连接
virtual void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 一个连接关闭
virtual void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 在一个连接上发生了错误
virtual void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError);
// 一个连接上的读操作完成
virtual void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 一个连接上的写操作完成
virtual void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

protected:

// 记录空闲结构信息
CIOCPBuffer *m_pFreeBufferList;
CIOCPContext *m_pFreeContextList;
int m_nFreeBufferCount;
int m_nFreeContextCount;
CRITICAL_SECTION m_FreeBufferListLock;
CRITICAL_SECTION m_FreeContextListLock;

CRITICAL_SECTION m_HeapLock;
CRITICAL_SECTION m_RepostLock;

// 记录抛出的Accept请求
CIOCPBuffer *m_pPendingAccepts; // 抛出请求列表。
long m_nPendingAcceptCount;
CRITICAL_SECTION m_PendingAcceptsLock;

// 记录连接列表
CIOCPContext *m_pConnectionList;
int m_nCurrentConnection;
CRITICAL_SECTION m_ConnectionListLock;

// 用于投递Accept请求
HANDLE m_hAcceptEvent;
HANDLE m_hRepostEvent;
LONG m_nRepostCount;

int m_nPort; // 服务器监听的端口

int m_nInitialAccepts;
int m_nInitialReads;
int m_nMaxAccepts;
int m_nMaxSends;
int m_nMaxFreeBuffers;
int m_nMaxFreeContexts;
int m_nMaxConnections;

HANDLE m_hListenThread; // 监听线程
HANDLE m_hCompletion; // 完成端口句柄
SOCKET m_sListen; // 监听套节字句柄
LPFN_ACCEPTEX m_lpfnAcceptEx; // AcceptEx函数地址
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址

BOOL m_bShutDown; // 用于通知监听线程退出
BOOL m_bServerStarted; // 记录服务是否启动

HANDLE m_hTimerQueue;//xss

private: // 线程函数
static DWORD WINAPI _ListenThreadProc(LPVOID lpParam);
static DWORD WINAPI _WorkerThreadProc(LPVOID lpParam);
};

#endif // __IOCP_H__

////////////////////////////////////////
// IOCP.h文件

#ifndef __IOCP_H__
#define __IOCP_H__

#include <winsock2.h>
#include <windows.h>
#include <Mswsock.h>

#define BUFFER_SIZE 1024*4		// I/O请求的缓冲区大小
#define MAX_THREAD	1			// I/O服务线程的数量

// 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息
struct CIOCPBuffer
{
CIOCPBuffer()
{
memset(&ol,0,sizeof(WSAOVERLAPPED));
sClient = INVALID_SOCKET;
memset(buff,0,BUFFER_SIZE);
nLen = 0;
nSequenceNumber = 0;
bIsReleased = FALSE;
nOperation = 0;
pNext = NULL;
}
WSAOVERLAPPED ol;

SOCKET sClient;			// AcceptEx接收的客户方套节字

char buff[BUFFER_SIZE];				// I/O操作使用的缓冲区
int nLen;				// buff缓冲区(使用的)大小

ULONG nSequenceNumber;	// 此I/O的序列号
BOOL  bIsReleased;

int nOperation;			// 操作类型
#define OP_ACCEPT	1
#define OP_WRITE	2
#define OP_READ		3

CIOCPBuffer *pNext;
};
struct CIOCPNextToSend;
struct CIOCPTimerData;
// 这是per-Handle数据。它包含了一个套节字的信息
struct CIOCPContext
{
CIOCPContext()
{
s = INVALID_SOCKET;
memset(&addrLocal,0,sizeof(SOCKADDR_IN));
memset(&addrRemote,0,sizeof(SOCKADDR_IN));
bClosing = FALSE;
nOutstandingRecv = 0;
nOutstandingSend = 0;
nReadSequence = 0;
nCurrentReadSequence = 0;
nCurrentStep = 0;
pOutOfOrderReads = NULL;
pNextToSend = NULL;
bIsReleased = FALSE;
pNext = NULL;
pPreData = NULL;
strcpy(szClientName,"");
hTimer = NULL;
hCompletion = NULL;
}
CIOCPBuffer m_pBuffer;
SOCKET s;						// 套节字句柄

SOCKADDR_IN addrLocal;			// 连接的本地地址
SOCKADDR_IN addrRemote;			// 连接的远程地址

BOOL bClosing;					// 套节字是否关闭

int nOutstandingRecv;			// 此套节字上抛出的重叠操作的数量
int nOutstandingSend;

ULONG nReadSequence;			// 安排给接收的下一个序列号
ULONG nCurrentReadSequence;		// 当前要读的序列号

CIOCPBuffer *pOutOfOrderReads;	// 记录没有按顺序完成的读I/O
CIOCPNextToSend *pNextToSend;       //xss,按顺序发送的下一个要发送的。

LPVOID pPreData; //xss,用于2个过程之间的数据交流。
ULONG  nCurrentStep;//xss,用于记录当前处于的过程步骤数。
BOOL   bIsReleased;

CRITICAL_SECTION Lock;			// 保护这个结构

CIOCPContext *pNext;

char szClientName[256];//xss
HANDLE hTimer;//xss
HANDLE hCompletion;//xss

};

struct CIOCPNextToSend//xss
{
CIOCPBuffer * pBuffer;
CIOCPNextToSend * pNext;
};

struct CIOCPTimerData
{
CIOCPContext* pContext;
HANDLE hCompletion;
};

class CIOCPServer   // 处理线程
{
public:
CIOCPServer();
~CIOCPServer();

// 开始服务
BOOL Start(int nPort = 3456, int nMaxConnections = 2000,
int nMaxFreeBuffers = 200, int nMaxFreeContexts = 100, int nInitialReads = 4);
// 停止服务
void Shutdown();

// 关闭一个连接和关闭所有连接
void CloseAConnection(CIOCPContext *pContext);
void CloseAllConnections();

// 取得当前的连接数量
ULONG GetCurrentConnection() { return m_nCurrentConnection; }

// 向指定客户发送文本
BOOL SendText(CIOCPContext *pContext, char *pszText, int nLen);

protected:

// 申请和释放缓冲区对象
CIOCPBuffer *AllocateBuffer(int nLen);
void ReleaseBuffer(CIOCPBuffer *pBuffer);

// 申请和释放套节字上下文
CIOCPContext *AllocateContext(SOCKET s);
void ReleaseContext(CIOCPContext *pContext);

// 释放空闲缓冲区对象列表和空闲上下文对象列表
void FreeBuffers();
void FreeContexts();

// 向连接列表中添加一个连接
BOOL AddAConnection(CIOCPContext *pContext);

// 插入和移除未决的接受请求
BOOL InsertPendingAccept(CIOCPBuffer *pBuffer);
BOOL RemovePendingAccept(CIOCPBuffer *pBuffer);

//xss,把要发送的数据加入队列,按顺序发送
BOOL PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
//xss,发送下一个需要发送的
BOOL PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 取得下一个要读取的
CIOCPBuffer *GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

void ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer);//xss,错误集中处理
// 投递接受I/O、发送I/O、接收I/O
BOOL PostAccept(CIOCPBuffer *pBuffer);
BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
BOOL PostRecv2(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

void HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError);

// 事件通知函数
// 建立了一个新的连接
virtual void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 一个连接关闭
virtual void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 在一个连接上发生了错误
virtual void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError);
// 一个连接上的读操作完成
virtual void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 一个连接上的写操作完成
virtual void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

protected:

// 记录空闲结构信息
CIOCPBuffer *m_pFreeBufferList;
CIOCPContext *m_pFreeContextList;
int m_nFreeBufferCount;
int m_nFreeContextCount;
CRITICAL_SECTION m_FreeBufferListLock;
CRITICAL_SECTION m_FreeContextListLock;

CRITICAL_SECTION m_HeapLock;
CRITICAL_SECTION m_RepostLock;

// 记录抛出的Accept请求
CIOCPBuffer *m_pPendingAccepts;   // 抛出请求列表。
long m_nPendingAcceptCount;
CRITICAL_SECTION m_PendingAcceptsLock;

// 记录连接列表
CIOCPContext *m_pConnectionList;
int m_nCurrentConnection;
CRITICAL_SECTION m_ConnectionListLock;

// 用于投递Accept请求
HANDLE m_hAcceptEvent;
HANDLE m_hRepostEvent;
LONG m_nRepostCount;

int m_nPort;				// 服务器监听的端口

int m_nInitialAccepts;
int m_nInitialReads;
int m_nMaxAccepts;
int m_nMaxSends;
int m_nMaxFreeBuffers;
int m_nMaxFreeContexts;
int m_nMaxConnections;

HANDLE m_hListenThread;			// 监听线程
HANDLE m_hCompletion;			// 完成端口句柄
SOCKET m_sListen;				// 监听套节字句柄
LPFN_ACCEPTEX m_lpfnAcceptEx;	// AcceptEx函数地址
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址

BOOL m_bShutDown;		// 用于通知监听线程退出
BOOL m_bServerStarted;	// 记录服务是否启动

HANDLE m_hTimerQueue;//xss

private:	// 线程函数
static DWORD WINAPI _ListenThreadProc(LPVOID lpParam);
static DWORD WINAPI _WorkerThreadProc(LPVOID lpParam);
};

#endif // __IOCP_H__


cpp文件

[cpp]
view plaincopyprint?

//////////////////////////////////////////////////
// IOCP.cpp文件
#define _WIN32_WINNT 0x0500 //xss

#include "iocp.h"
#pragma comment(lib, "WS2_32.lib")

#include <stdio.h>
#include "httpFun.h"

static int iBufferCount = 0;
static int iContextCount = 0;
CIOCPServer::CIOCPServer()
{
// 列表
m_pFreeBufferList = NULL;
m_pFreeContextList = NULL;
m_pPendingAccepts = NULL;
m_pConnectionList = NULL;

m_nFreeBufferCount = 0;
m_nFreeContextCount = 0;
m_nPendingAcceptCount = 0;
m_nCurrentConnection = 0;

::InitializeCriticalSection(&m_FreeBufferListLock);
::InitializeCriticalSection(&m_FreeContextListLock);
::InitializeCriticalSection(&m_PendingAcceptsLock);
::InitializeCriticalSection(&m_ConnectionListLock);

::InitializeCriticalSection(&m_HeapLock);
::InitializeCriticalSection(&m_RepostLock);

// Accept请求
m_hAcceptEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
m_hRepostEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
m_nRepostCount = 0;

m_nPort = 8888;

m_nInitialAccepts = 10;
m_nInitialReads = 4;
m_nMaxAccepts = 100;
m_nMaxSends = 20;
m_nMaxFreeBuffers = 200;
m_nMaxFreeContexts = 100;
m_nMaxConnections = 2000;

m_hListenThread = NULL;
m_hCompletion = NULL;
m_sListen = INVALID_SOCKET;
m_lpfnAcceptEx = NULL;
m_lpfnGetAcceptExSockaddrs = NULL;

m_bShutDown = FALSE;
m_bServerStarted = FALSE;

m_hTimerQueue = ::CreateTimerQueue();

// 初始化WS2_32.dll
WSADATA wsaData;
WORD sockVersion = MAKEWORD(2, 2);
::WSAStartup(sockVersion, &wsaData);
}

CIOCPServer::~CIOCPServer()
{
Shutdown();

if(m_sListen != INVALID_SOCKET)
::closesocket(m_sListen);
if(m_hListenThread != NULL)
::CloseHandle(m_hListenThread);

::CloseHandle(m_hRepostEvent);
::CloseHandle(m_hAcceptEvent);

::DeleteCriticalSection(&m_FreeBufferListLock);
::DeleteCriticalSection(&m_FreeContextListLock);
::DeleteCriticalSection(&m_PendingAcceptsLock);
::DeleteCriticalSection(&m_ConnectionListLock);

::DeleteCriticalSection(&m_HeapLock);
::DeleteCriticalSection(&m_RepostLock);

::DeleteTimerQueue(m_hTimerQueue);//xss
::WSACleanup();
}

///////////////////////////////////////
static VOID CALLBACK TimerRoutine(PVOID lpParam, BOOLEAN TimerOrWaitFired)
{
CIOCPContext* pContext = (CIOCPContext*)lpParam;
if(pContext != NULL && pContext->bClosing == FALSE)
{
EnterCriticalSection(&pContext->Lock);
if(pContext->hCompletion != NULL)
{
PostQueuedCompletionStatus(pContext->hCompletion,-2,(ULONG_PTR)pContext,NULL);
}
LeaveCriticalSection(&pContext->Lock);
}
}

///////////////////////////////////
// 自定义帮助函数

CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen)
{
CIOCPBuffer *pBuffer = NULL;
if(nLen > BUFFER_SIZE)
return NULL;

// 为缓冲区对象申请内存
::EnterCriticalSection(&m_FreeBufferListLock);
if(m_pFreeBufferList == NULL) // 内存池为空,申请新的内存
{
// pBuffer = (CIOCPBuffer *)::HeapAlloc(GetProcessHeap(),
// HEAP_ZERO_MEMORY, sizeof(CIOCPBuffer) + BUFFER_SIZE);
pBuffer = new CIOCPBuffer();
}
else // 从内存池中取一块来使用
{
pBuffer = m_pFreeBufferList;
m_pFreeBufferList = m_pFreeBufferList->pNext;
pBuffer->pNext = NULL;
m_nFreeBufferCount --;
}
::LeaveCriticalSection(&m_FreeBufferListLock);

EnterCriticalSection(&m_HeapLock);
iBufferCount++;
LeaveCriticalSection(&m_HeapLock);

// 初始化新的缓冲区对象
if(pBuffer != NULL)
{
//pBuffer->buff = (char*)(pBuffer + sizeof(CIOCPBuffer)/*1*/);//xss,个人以为应该+sizeof(CIOCPBuffer);
pBuffer->nLen = nLen;
pBuffer->bIsReleased = FALSE;
}
return pBuffer;
}

void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer)
{
if(pBuffer == NULL || pBuffer->bIsReleased)
return;

::EnterCriticalSection(&m_FreeBufferListLock);

if(m_nFreeBufferCount <= m_nMaxFreeBuffers) // 将要释放的内存添加到空闲列表中
{
memset(pBuffer, 0, sizeof(CIOCPBuffer) /*+ BUFFER_SIZE*/);
pBuffer->pNext = m_pFreeBufferList;
m_pFreeBufferList = pBuffer;

m_nFreeBufferCount ++ ;

pBuffer->bIsReleased = TRUE;
}
else // 已经达到最大值,真正的释放内存
{
//::HeapFree(::GetProcessHeap(), 0, pBuffer);
delete pBuffer;
}

::LeaveCriticalSection(&m_FreeBufferListLock);

EnterCriticalSection(&m_HeapLock);
iBufferCount--;
LeaveCriticalSection(&m_HeapLock);
}

CIOCPContext *CIOCPServer::AllocateContext(SOCKET s)
{
CIOCPContext *pContext;

// 申请一个CIOCPContext对象
::EnterCriticalSection(&m_FreeContextListLock);

if(m_pFreeContextList == NULL)
{
//pContext = (CIOCPContext *)::HeapAlloc(::GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(CIOCPContext));
pContext = new CIOCPContext();

::InitializeCriticalSection(&pContext->Lock);
}
else
{
// 在空闲列表中申请
pContext = m_pFreeContextList;
m_pFreeContextList = m_pFreeContextList->pNext;
pContext->pNext = NULL;
m_nFreeBufferCount --;
}
::LeaveCriticalSection(&m_FreeContextListLock);

EnterCriticalSection(&m_HeapLock);
iContextCount++;
LeaveCriticalSection(&m_HeapLock);

// 初始化对象成员
if(pContext != NULL)
{
pContext->s = s;
pContext->bIsReleased = FALSE;
}
return pContext;
}

void CIOCPServer::ReleaseContext(CIOCPContext *pContext)
{
if(pContext == NULL || pContext->bIsReleased)
return;

printf("\n%s释放了Context\n\n",pContext->szClientName);
if(pContext->s != INVALID_SOCKET)
::closesocket(pContext->s);

// 首先释放(如果有的话)此套节字上的没有按顺序完成的读I/O的缓冲区
CIOCPBuffer *pNext;
while(pContext->pOutOfOrderReads != NULL)
{
pNext = pContext->pOutOfOrderReads->pNext;
ReleaseBuffer(pContext->pOutOfOrderReads);
pContext->pOutOfOrderReads = pNext;
}

//xss,再释放(如果有的话)此套接字上未完成的写I/O缓冲区
CIOCPNextToSend* pSend = NULL;
while(pContext->pNextToSend != NULL)
{
pSend = pContext->pNextToSend->pNext;
if(pContext->pNextToSend->pBuffer != NULL && pContext->pNextToSend->pBuffer->bIsReleased == FALSE)
{
ReleaseBuffer(pContext->pNextToSend->pBuffer);
}
delete pContext->pNextToSend;
pContext->pNextToSend = pSend;
}

if(pContext->hTimer != NULL)
{
DeleteTimerQueueTimer(m_hTimerQueue,pContext->hTimer,NULL);
pContext->hTimer = NULL;
}

::EnterCriticalSection(&m_FreeContextListLock);

if(m_nFreeContextCount <= m_nMaxFreeContexts) // 添加到空闲列表
{
// 先将关键代码段变量保存到一个临时变量中
CRITICAL_SECTION cstmp = pContext->Lock;
// 将要释放的上下文对象初始化为0
memset(pContext, 0, sizeof(CIOCPContext));

// 再放会关键代码段变量,将要释放的上下文对象添加到空闲列表的表头
pContext->Lock = cstmp;
pContext->pNext = m_pFreeContextList;
m_pFreeContextList = pContext;

// 更新计数
m_nFreeContextCount ++;

pContext->bIsReleased = TRUE;
}
else
{
::DeleteCriticalSection(&pContext->Lock);
//::HeapFree(::GetProcessHeap(), 0, pContext);
delete pContext;
}
::LeaveCriticalSection(&m_FreeContextListLock);

EnterCriticalSection(&m_HeapLock);
iContextCount--;
LeaveCriticalSection(&m_HeapLock);
}

void CIOCPServer::FreeBuffers()
{
// 遍历m_pFreeBufferList空闲列表,释放缓冲区池内存
::EnterCriticalSection(&m_FreeBufferListLock);

CIOCPBuffer *pFreeBuffer = m_pFreeBufferList;
CIOCPBuffer *pNextBuffer;
while(pFreeBuffer != NULL)
{
pNextBuffer = pFreeBuffer->pNext;

delete pFreeBuffer;
// if(!::HeapFree(::GetProcessHeap(), 0, pFreeBuffer))
// {
// #ifdef _DEBUG
// ::OutputDebugString(" FreeBuffers释放内存出错!");
// #endif // _DEBUG
// break;
// }
pFreeBuffer = pNextBuffer;
}
m_pFreeBufferList = NULL;
m_nFreeBufferCount = 0;

::LeaveCriticalSection(&m_FreeBufferListLock);
}

void CIOCPServer::FreeContexts()
{
// 遍历m_pFreeContextList空闲列表,释放缓冲区池内存
::EnterCriticalSection(&m_FreeContextListLock);

CIOCPContext *pFreeContext = m_pFreeContextList;
CIOCPContext *pNextContext;
while(pFreeContext != NULL)
{
pNextContext = pFreeContext->pNext;

::DeleteCriticalSection(&pFreeContext->Lock);
delete pFreeContext;
// if(!::HeapFree(::GetProcessHeap(), 0, pFreeContext))
// {
// #ifdef _DEBUG
// ::OutputDebugString(" FreeBuffers释放内存出错!");
// #endif // _DEBUG
// break;
// }
pFreeContext = pNextContext;
}
m_pFreeContextList = NULL;
m_nFreeContextCount = 0;

::LeaveCriticalSection(&m_FreeContextListLock);
}

BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext)
{
// 向客户连接列表添加一个CIOCPContext对象

::EnterCriticalSection(&m_ConnectionListLock);
if(m_nCurrentConnection <= m_nMaxConnections)
{
// 添加到表头
pContext->pNext = m_pConnectionList;
m_pConnectionList = pContext;
// 更新计数
m_nCurrentConnection ++;

::LeaveCriticalSection(&m_ConnectionListLock);
return TRUE;
}
::LeaveCriticalSection(&m_ConnectionListLock);

return FALSE;
}

void CIOCPServer::CloseAConnection(CIOCPContext *pContext)
{
if(pContext == NULL || pContext->bClosing == TRUE)
return;

// 首先从列表中移除要关闭的连接
::EnterCriticalSection(&m_ConnectionListLock);

CIOCPContext* pTest = m_pConnectionList;
if(pTest == pContext)
{
m_pConnectionList = pContext->pNext;
m_nCurrentConnection --;
}
else
{
while(pTest != NULL && pTest->pNext != pContext)
pTest = pTest->pNext;
if(pTest != NULL)
{
pTest->pNext = pContext->pNext;
m_nCurrentConnection --;
}
}

::LeaveCriticalSection(&m_ConnectionListLock);

// 然后关闭客户套节字
::EnterCriticalSection(&pContext->Lock);

if(pContext->s != INVALID_SOCKET)
{
::closesocket(pContext->s);
pContext->s = INVALID_SOCKET;
}
pContext->bClosing = TRUE;

::LeaveCriticalSection(&pContext->Lock);
}

void CIOCPServer::CloseAllConnections()
{
// 遍历整个连接列表,关闭所有的客户套节字

::EnterCriticalSection(&m_ConnectionListLock);

CIOCPContext *pContext = m_pConnectionList;
while(pContext != NULL)
{
::EnterCriticalSection(&pContext->Lock);

if(pContext->s != INVALID_SOCKET)
{
::closesocket(pContext->s);
pContext->s = INVALID_SOCKET;
}

pContext->bClosing = TRUE;

::LeaveCriticalSection(&pContext->Lock);

pContext = pContext->pNext;
}

m_pConnectionList = NULL;
m_nCurrentConnection = 0;

::LeaveCriticalSection(&m_ConnectionListLock);
}

BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer)
{
// 将一个I/O缓冲区对象插入到m_pPendingAccepts表中

::EnterCriticalSection(&m_PendingAcceptsLock);

if(m_pPendingAccepts == NULL)
m_pPendingAccepts = pBuffer;
else
{
pBuffer->pNext = m_pPendingAccepts;
m_pPendingAccepts = pBuffer;
}
m_nPendingAcceptCount ++;

::LeaveCriticalSection(&m_PendingAcceptsLock);

return TRUE;
}

BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer *pBuffer)
{
BOOL bResult = FALSE;

// 遍历m_pPendingAccepts表,从中移除pBuffer所指向的缓冲区对象
::EnterCriticalSection(&m_PendingAcceptsLock);

CIOCPBuffer *pTest = m_pPendingAccepts;
if(pTest == pBuffer) // 如果是表头元素
{
m_pPendingAccepts = pBuffer->pNext;
bResult = TRUE;
}
else // 不是表头元素的话,就要遍历这个表来查找了
{
while(pTest != NULL && pTest->pNext != pBuffer)
pTest = pTest->pNext;
if(pTest != NULL)
{
pTest->pNext = pBuffer->pNext;
bResult = TRUE;
}
}
// 更新计数
if(bResult)
m_nPendingAcceptCount --;

::LeaveCriticalSection(&m_PendingAcceptsLock);

return bResult;
}

void CIOCPServer::ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
CloseAConnection(pContext);
}

BOOL CIOCPServer::PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss
{
::EnterCriticalSection(&pContext->Lock);
CIOCPNextToSend *ptr = pContext->pNextToSend;

CIOCPNextToSend * pSend = new CIOCPNextToSend();
pSend->pBuffer = pBuffer;
pSend->pNext = NULL;
if(ptr == NULL)
{
printf("数据:%10.10s ...,被直接发送。\n",pBuffer->buff);
//::EnterCriticalSection(&pContext->Lock);
pContext->pNextToSend = pSend;
//::LeaveCriticalSection(&pContext->Lock);
if(!PostSend(pContext,pBuffer))//如果没有需要等待的send就直接发送
{
::LeaveCriticalSection(&pContext->Lock);
return FALSE;
}
}
else
{
printf("数据:%10.10s ...,被放入链表结尾。\n",pBuffer->buff);
while(ptr->pNext != NULL)
{
ptr = ptr->pNext;
}
ptr->pNext = pSend;//新的发送请求放在链表结尾
}
::LeaveCriticalSection(&pContext->Lock);
return TRUE;
}

BOOL CIOCPServer::PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss
{
::EnterCriticalSection(&pContext->Lock);
CIOCPNextToSend* pSend = pContext->pNextToSend;
CIOCPNextToSend* pNextSend = NULL;
if(pSend != NULL && pSend->pNext != NULL)//发送成功的pBuffer是队列的第一个,发送下一个,pNextToSend指向下一个,pBuffer由外面释放。
{
pNextSend = pSend->pNext;
if(pNextSend->pBuffer != NULL)
{
printf("数据:%10.10s ...从链表中弹出被发送。\n",pNextSend->pBuffer->buff);
if(!PostSend(pContext,pNextSend->pBuffer))
{
delete pSend;
pContext->pNextToSend = pNextSend;
::LeaveCriticalSection(&pContext->Lock);
return FALSE;
}
}
}
if(pSend != NULL)
{
pNextSend = pSend->pNext;
delete pSend;
pContext->pNextToSend = pNextSend;
}
::LeaveCriticalSection(&pContext->Lock);
return TRUE;
}

CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
if(pBuffer != NULL)
{
// 如果与要读的下一个序列号相等,则读这块缓冲区
if(pBuffer->nSequenceNumber == pContext->nCurrentReadSequence)
{
return pBuffer;
}

// 如果不相等,则说明没有按顺序接收数据,将这块缓冲区保存到连接的pOutOfOrderReads列表中

// 列表中的缓冲区是按照其序列号从小到大的顺序排列的

pBuffer->pNext = NULL;

CIOCPBuffer *ptr = pContext->pOutOfOrderReads;
CIOCPBuffer *pPre = NULL;
while(ptr != NULL)
{
if(pBuffer->nSequenceNumber < ptr->nSequenceNumber)
break;

pPre = ptr;
ptr = ptr->pNext;
}

if(pPre == NULL) // 应该插入到表头
{
pBuffer->pNext = pContext->pOutOfOrderReads;
pContext->pOutOfOrderReads = pBuffer;
}
else // 应该插入到表的中间
{
pBuffer->pNext = pPre->pNext;
pPre->pNext = pBuffer/*->pNext*/;//xss,个人觉得应该是pPre->pNext = pBuffer;
}
}

// 检查表头元素的序列号,如果与要读的序列号一致,就将它从表中移除,返回给用户
CIOCPBuffer *ptr = pContext->pOutOfOrderReads;
if(ptr != NULL && (ptr->nSequenceNumber == pContext->nCurrentReadSequence))
{
pContext->pOutOfOrderReads = ptr->pNext;
return ptr;
}
return NULL;
}

BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer) // 在监听套节字上投递Accept请求
{
// 设置I/O类型
pBuffer->nOperation = OP_ACCEPT;

// 投递此重叠I/O
DWORD dwBytes;
pBuffer->sClient = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
BOOL b = m_lpfnAcceptEx(m_sListen,
pBuffer->sClient,
pBuffer->buff,
pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2),//xss,第一次都是收一个cmd_header
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
&dwBytes,
&pBuffer->ol);
if(!b && ::WSAGetLastError() != WSA_IO_PENDING)
{
return FALSE;
}
if(pBuffer->nOperation == 0)
{
int x = 0;
}
return TRUE;
};

BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
// 设置I/O类型
pBuffer->nOperation = OP_READ;

::EnterCriticalSection(&pContext->Lock);

// 设置序列号
pBuffer->nSequenceNumber = pContext->nReadSequence;

// 投递此重叠I/O
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
{
if(::WSAGetLastError() != WSA_IO_PENDING)
{
printf("WSARecv出错:%d\n",WSAGetLastError());
::LeaveCriticalSection(&pContext->Lock);
return FALSE;
}
}

// 增加套节字上的重叠I/O计数和读序列号计数

pContext->nOutstandingRecv ++;
pContext->nReadSequence ++;

::LeaveCriticalSection(&pContext->Lock);

return TRUE;
}

BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
// 跟踪投递的发送的数量,防止用户仅发送数据而不接收,导致服务器抛出大量发送操作
if(pContext->nOutstandingSend > m_nMaxSends)
return FALSE;

// 设置I/O类型,增加套节字上的重叠I/O计数
pBuffer->nOperation = OP_WRITE;

// 投递此重叠I/O
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(::WSASend(pContext->s,
&buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
{
int x;
if((x=::WSAGetLastError()) != WSA_IO_PENDING)
{
printf("发送失败!错误码:%d",x);
return FALSE;
}
}
// 增加套节字上的重叠I/O计数

::EnterCriticalSection(&pContext->Lock);
pContext->nOutstandingSend ++;
::LeaveCriticalSection(&pContext->Lock);

if(pBuffer->nOperation == 0)
{
int x = 0;
}
return TRUE;
}

BOOL CIOCPServer::Start(int nPort, int nMaxConnections,
int nMaxFreeBuffers, int nMaxFreeContexts, int nInitialReads)
{
// 检查服务是否已经启动
if(m_bServerStarted)
return FALSE;

// 保存用户参数
m_nPort = nPort;
m_nMaxConnections = nMaxConnections;
m_nMaxFreeBuffers = nMaxFreeBuffers;
m_nMaxFreeContexts = nMaxFreeContexts;
m_nInitialReads = nInitialReads;

// 初始化状态变量
m_bShutDown = FALSE;
m_bServerStarted = TRUE;

// 创建监听套节字,绑定到本地端口,进入监听模式
m_sListen = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
SOCKADDR_IN si;
si.sin_family = AF_INET;
si.sin_port = ::ntohs(m_nPort);
si.sin_addr.S_un.S_addr = INADDR_ANY;
if(::bind(m_sListen, (sockaddr*)&si, sizeof(si)) == SOCKET_ERROR)
{
m_bServerStarted = FALSE;
return FALSE;
}
::listen(m_sListen, 200);

// 创建完成端口对象
m_hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

// 加载扩展函数AcceptEx
GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes;
::WSAIoctl(m_sListen,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&m_lpfnAcceptEx,
sizeof(m_lpfnAcceptEx),
&dwBytes,
NULL,
NULL);

// 加载扩展函数GetAcceptExSockaddrs
GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
::WSAIoctl(m_sListen,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockaddrs,
sizeof(GuidGetAcceptExSockaddrs),
&m_lpfnGetAcceptExSockaddrs,
sizeof(m_lpfnGetAcceptExSockaddrs),
&dwBytes,
NULL,
NULL
);

// 将监听套节字关联到完成端口,注意,这里为它传递的CompletionKey为0
::CreateIoCompletionPort((HANDLE)m_sListen, m_hCompletion, (DWORD)0, 0);

// 注册FD_ACCEPT事件。
// 如果投递的AcceptEx I/O不够,线程会接收到FD_ACCEPT网络事件,说明应该投递更多的AcceptEx I/O
WSAEventSelect(m_sListen, m_hAcceptEvent, FD_ACCEPT);

// 创建监听线程
m_hListenThread = ::CreateThread(NULL, 0, _ListenThreadProc, this, 0, NULL);

return TRUE;
}

void CIOCPServer::Shutdown()
{
if(!m_bServerStarted)
return;

// 通知监听线程,马上停止服务
m_bShutDown = TRUE;
::SetEvent(m_hAcceptEvent);
// 等待监听线程退出
::WaitForSingleObject(m_hListenThread, INFINITE);
::CloseHandle(m_hListenThread);
m_hListenThread = NULL;

m_bServerStarted = FALSE;
}

DWORD WINAPI CIOCPServer::_ListenThreadProc(LPVOID lpParam)
{
CIOCPServer *pThis = (CIOCPServer*)lpParam;

// 先在监听套节字上投递几个Accept I/O
CIOCPBuffer *pBuffer;
for(int i=0; i<pThis->m_nInitialAccepts; i++)
{
pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);//xss,BUFFER_SIZE
if(pBuffer == NULL)
return -1;
pThis->InsertPendingAccept(pBuffer);
pThis->PostAccept(pBuffer);
}

// 构建事件对象数组,以便在上面调用WSAWaitForMultipleEvents函数
HANDLE hWaitEvents[2 + MAX_THREAD];
int nEventCount = 0;
hWaitEvents[nEventCount ++] = pThis->m_hAcceptEvent;
hWaitEvents[nEventCount ++] = pThis->m_hRepostEvent;

// 创建指定数量的工作线程在完成端口上处理I/O
for(int i=0; i<MAX_THREAD; i++)
{
hWaitEvents[nEventCount ++] = ::CreateThread(NULL, 0, _WorkerThreadProc, pThis, 0, NULL);
}

// 下面进入无限循环,处理事件对象数组中的事件
while(TRUE)
{
int nIndex = ::WSAWaitForMultipleEvents(nEventCount, hWaitEvents, FALSE, 60*1000, FALSE);

// 首先检查是否要停止服务
if(pThis->m_bShutDown || nIndex == WSA_WAIT_FAILED)
{
// 关闭所有连接
pThis->CloseAllConnections();
::Sleep(0); // 给I/O工作线程一个执行的机会
// 关闭监听套节字
::closesocket(pThis->m_sListen);
pThis->m_sListen = INVALID_SOCKET;
::Sleep(0); // 给I/O工作线程一个执行的机会

// 通知所有I/O处理线程退出
for(int i=2; i<MAX_THREAD + 2; i++)
{
::PostQueuedCompletionStatus(pThis->m_hCompletion, -1, 0, NULL);
}

// 等待I/O处理线程退出
::WaitForMultipleObjects(MAX_THREAD, &hWaitEvents[2], TRUE, 5*1000);

for(int i=2; i<MAX_THREAD + 2; i++)
{
::CloseHandle(hWaitEvents[i]);
}

::CloseHandle(pThis->m_hCompletion);

pThis->FreeBuffers();
pThis->FreeContexts();
::ExitThread(0);
}

// 1)定时检查所有未返回的AcceptEx I/O的连接建立了多长时间
if(nIndex == WSA_WAIT_TIMEOUT)
{
pBuffer = pThis->m_pPendingAccepts;
while(pBuffer != NULL)
{
int nSeconds;
int nLen = sizeof(nSeconds);
// 取得连接建立的时间
::getsockopt(pBuffer->sClient,
SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSeconds, &nLen);
// 如果超过2分钟客户还不发送初始数据,就让这个客户go away
if(nSeconds != -1 && nSeconds > /*2*60*/50)
{
closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}

pBuffer = pBuffer->pNext;
}
}
else
{
nIndex = nIndex - WAIT_OBJECT_0;
WSANETWORKEVENTS ne;
int nLimit=0;
if(nIndex == 0) // 2)m_hAcceptEvent事件对象受信,说明投递的Accept请求不够,需要增加
{
::WSAEnumNetworkEvents(pThis->m_sListen, hWaitEvents[nIndex], &ne);
if(ne.lNetworkEvents & FD_ACCEPT)
{
nLimit = 50; // 增加的个数,这里设为50个
}
}
else if(nIndex == 1) // 3)m_hRepostEvent事件对象受信,说明处理I/O的线程接受到新的客户
{
nLimit = InterlockedExchange(&pThis->m_nRepostCount, 0);
}
else if(nIndex > 1) // I/O服务线程退出,说明有错误发生,关闭服务器
{
pThis->m_bShutDown = TRUE;
continue;
}

// 投递nLimit个AcceptEx I/O请求
int i = 0;
while(i++ < nLimit && pThis->m_nPendingAcceptCount < pThis->m_nMaxAccepts)
{
pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);
if(pBuffer != NULL)
{
pThis->InsertPendingAccept(pBuffer);
pThis->PostAccept(pBuffer);
}
}
}
}
return 0;
}

DWORD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)
{
#ifdef _DEBUG
::OutputDebugString(" WorkerThread 启动... \n");
#endif // _DEBUG

CIOCPServer *pThis = (CIOCPServer*)lpParam;

CIOCPBuffer *pBuffer = NULL;
DWORD dwKey;
DWORD dwTrans;
LPOVERLAPPED lpol;

while(TRUE)
{
// 在关联到此完成端口的所有套节字上等待I/O完成
BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,
&dwTrans, (LPDWORD)&dwKey, (LPOVERLAPPED*)&lpol, WSA_INFINITE);

if(dwTrans == -1) // 用户通知退出
{
#ifdef _DEBUG
::OutputDebugString(" WorkerThread 退出 \n");
#endif // _DEBUG
::ExitThread(0);
}
if(dwTrans != -2)
pBuffer = CONTAINING_RECORD(lpol, CIOCPBuffer, ol);
int nError = NO_ERROR;
if(!bOK) // 在此套节字上有错误发生
{
printf("完成端口套接字上有错误:%d\n",GetLastError());
SOCKET s;
if(pBuffer->nOperation == OP_ACCEPT)
{
s = pThis->m_sListen;
}
else
{
if(dwKey == 0)
break;
s = ((CIOCPContext*)dwKey)->s;
}
DWORD dwFlags = 0;
if(!::WSAGetOverlappedResult(s, &pBuffer->ol, &dwTrans, FALSE, &dwFlags))
{
nError = ::WSAGetLastError();
}
}
pThis->HandleIO(dwKey, pBuffer, dwTrans, nError);
printf("Buffer:%d Context:%d\n",iBufferCount,iContextCount);
}

#ifdef _DEBUG
::OutputDebugString(" WorkerThread 退出 \n");
#endif // _DEBUG
return 0;
}

int g_x = 0;
void CIOCPServer::HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError)
{
CIOCPContext *pContext = (CIOCPContext *)dwKey;

#ifdef _DEBUG
::OutputDebugString(" HandleIO... \n");
#endif // _DEBUG

// 1)首先减少套节字上的未决I/O计数
if(dwTrans == -2)
{
CloseAConnection(pContext);
return;
}
if(pContext != NULL)
{
::EnterCriticalSection(&pContext->Lock);

if(pBuffer->nOperation == OP_READ)
pContext->nOutstandingRecv --;
else if(pBuffer->nOperation == OP_WRITE)
pContext->nOutstandingSend --;

::LeaveCriticalSection(&pContext->Lock);

// 2)检查套节字是否已经被我们关闭
if(pContext->bClosing)
{
#ifdef _DEBUG
::OutputDebugString(" 检查到套节字已经被我们关闭 \n");
#endif // _DEBUG
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
// 释放已关闭套节字的未决I/O
ReleaseBuffer(pBuffer);
pBuffer = NULL;
return;
}
}
else
{
RemovePendingAccept(pBuffer);
}

// 3)检查套节字上发生的错误,如果有的话,通知用户,然后关闭套节字
if(nError != NO_ERROR)
{
if(pBuffer->nOperation != OP_ACCEPT)
{
OnConnectionError(pContext, pBuffer, nError);
CloseAConnection(pContext);
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
#ifdef _DEBUG
::OutputDebugString(" 检查到客户套节字上发生错误 \n");
#endif // _DEBUG
}
else // 在监听套节字上发生错误,也就是监听套节字处理的客户出错了
{
// 客户端出错,释放I/O缓冲区
if(pBuffer->sClient != INVALID_SOCKET)
{
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
#ifdef _DEBUG
::OutputDebugString(" 检查到监听套节字上发生错误 \n");
#endif // _DEBUG
}

ReleaseBuffer(pBuffer);
pBuffer = NULL;
return;
}

// 开始处理
if(pBuffer->nOperation == OP_ACCEPT)
{
if(dwTrans == 0)
{
#ifdef _DEBUG
::OutputDebugString(" 监听套节字上客户端关闭 \n");
#endif // _DEBUG

if(pBuffer->sClient != INVALID_SOCKET)
{
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
}
else
{
// 为新接受的连接申请客户上下文对象
CIOCPContext *pClient = AllocateContext(pBuffer->sClient);
if(pClient != NULL)
{
if(AddAConnection(pClient))
{
// 取得客户地址
int nLocalLen, nRmoteLen;
LPSOCKADDR pLocalAddr, pRemoteAddr;
m_lpfnGetAcceptExSockaddrs(
pBuffer->buff,
pBuffer->nLen - (sizeof(sockaddr_in) + 16) * 2/*sizeof(cmd_header)*/,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
(SOCKADDR **)&pLocalAddr,
&nLocalLen,
(SOCKADDR **)&pRemoteAddr,
&nRmoteLen);
memcpy(&pClient->addrLocal, pLocalAddr, nLocalLen);
memcpy(&pClient->addrRemote, pRemoteAddr, nRmoteLen);

// 关联新连接到完成端口对象
::CreateIoCompletionPort((HANDLE)pClient->s, m_hCompletion, (DWORD)pClient, 0);

// 通知用户
pBuffer->nLen = dwTrans;
OnConnectionEstablished(pClient, pBuffer);

if(pClient->bClosing && pClient->nOutstandingRecv == 0 && pClient->nOutstandingSend == 0)
{
ReleaseContext(pClient);
pContext = NULL;
}
else if(pClient->hTimer == NULL)//接收一个客户端的同时创建一个检测I/O超时的Timer
{
pClient->hCompletion = m_hCompletion;
CreateTimerQueueTimer(&pClient->hTimer,m_hTimerQueue,(WAITORTIMERCALLBACK)TimerRoutine,(PVOID)pClient,60*1000,0,0);
}

// 向新连接投递Read请求或者Write请求,直接关闭这些空间在套节字关闭或出错时释放
// CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
// if(p != NULL)
// {
// if(!PostRecv(pClient, p))
// {
// CloseAConnection(pClient);
// }
// }

}
else // 连接数量已满,关闭连接
{
CloseAConnection(pClient);
ReleaseContext(pClient);
pContext = NULL;
}
}
else
{
// 资源不足,关闭与客户的连接即可
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
}

// Accept请求完成,释放I/O缓冲区
ReleaseBuffer(pBuffer);
pBuffer = NULL;

// 通知监听线程继续再投递一个Accept请求
::InterlockedIncrement(&m_nRepostCount);

::SetEvent(m_hRepostEvent);
}
else if(pBuffer->nOperation == OP_READ)
{
if(dwTrans == 0) // 对方关闭套节字
{
// 先通知用户
pBuffer->nLen = 0;
OnConnectionClosing(pContext, pBuffer);
// 再关闭连接
CloseAConnection(pContext);
// 释放客户上下文和缓冲区对象
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
ReleaseBuffer(pBuffer);
pBuffer = NULL;
}
else
{
pBuffer->nLen = dwTrans;
// 按照I/O投递的顺序读取接收到的数据
CIOCPBuffer *p = GetNextReadBuffer(pContext, pBuffer);
while(p != NULL)
{
// 通知用户
OnReadCompleted(pContext, p);
// 增加要读的序列号的值
::InterlockedIncrement((LONG*)&pContext->nCurrentReadSequence);
// 释放这个已完成的I/O
ReleaseBuffer(p);
p = GetNextReadBuffer(pContext, NULL);
}

if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
else if(pContext->hTimer != NULL)
{
ChangeTimerQueueTimer(m_hTimerQueue,pContext->hTimer,60*1000,0);//重置监视时间,当一个投递完成后,60s内无任何交互则断开。
}

// 继续投递一个新的接收请求
// pBuffer = AllocateBuffer(BUFFER_SIZE);
//if(pBuffer == NULL || !PostRecv(pContext, pBuffer))
//{
// CloseAConnection(pContext);
//}
}
}
else if(pBuffer->nOperation == OP_WRITE)
{

if(dwTrans == 0) // 对方关闭套节字
{
// 先通知用户
pBuffer->nLen = 0;
OnConnectionClosing(pContext, pBuffer);

// 再关闭连接
CloseAConnection(pContext);

// 释放客户上下文和缓冲区对象
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
ReleaseBuffer(pBuffer);
pBuffer = NULL;
}
else
{
if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
ReleaseBuffer(pBuffer);
pBuffer = NULL;
return;
}
else if(pContext->hTimer != NULL)
{
ChangeTimerQueueTimer(m_hTimerQueue,pContext->hTimer,60*1000,0);
}

// 写操作完成,通知用户
if(dwTrans < pBuffer->nLen)//如果此send没有发送完全,则发送剩下的部分(此部分如果还是没发完全,这里同样进行)
{
printf("send未发送完全,发送:%d,总长度:%d\n",dwTrans,pBuffer->nLen);
CIOCPBuffer* p = AllocateBuffer(pBuffer->nLen - dwTrans);
if(p != NULL)
memcpy(p->buff,pBuffer->buff + dwTrans,pBuffer->nLen - dwTrans);
if(p == NULL || !PostSend(pContext,p))
{
CloseAConnection(pContext);
return;
}
}
else
{
if(!PostNextWriteBuffer(pContext,pBuffer))
{
CloseAConnection(pContext);
return;
}
}
pBuffer->nLen = dwTrans;
OnWriteCompleted(pContext, pBuffer);
if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
// 释放SendText函数申请的缓冲区
ReleaseBuffer(pBuffer);
pBuffer = NULL;
}
}
}

BOOL CIOCPServer::SendText(CIOCPContext *pContext, char *pszText, int nLen)
{
CIOCPBuffer *pBuffer = AllocateBuffer(nLen);
if(pBuffer != NULL)
{
memcpy(pBuffer->buff, pszText, nLen);
return PostSend(pContext, pBuffer);
}
return FALSE;
}

//投递接收请求示例
//CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
//if(p != NULL)
//{
// if(!PostRecv(pContext, p))
// {
// CloseAConnection(pContext);
// }
//}
//投递发送请求示例
//CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
//if(p != NULL)
//{
// if(!PostSendToList(pContext, p))
// {
// CloseAConnection(pContext);
// }
//}
void CIOCPServer::OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
//连接建立,且第一次数据接收完成。
//接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
}

void CIOCPServer::OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
//一次数据接收完成。
//接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
}

void CIOCPServer::OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
//一次数据发送完成。
//接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
}

void CIOCPServer::OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)
{
}

//////////////////////////////////////////////////
// IOCP.cpp文件
#define _WIN32_WINNT 0x0500 //xss

#include "iocp.h"
#pragma comment(lib, "WS2_32.lib")

#include <stdio.h>
#include "httpFun.h"

static int iBufferCount = 0;
static int iContextCount = 0;
CIOCPServer::CIOCPServer()
{
// 列表
m_pFreeBufferList = NULL;
m_pFreeContextList = NULL;
m_pPendingAccepts = NULL;
m_pConnectionList = NULL;

m_nFreeBufferCount = 0;
m_nFreeContextCount = 0;
m_nPendingAcceptCount = 0;
m_nCurrentConnection = 0;

::InitializeCriticalSection(&m_FreeBufferListLock);
::InitializeCriticalSection(&m_FreeContextListLock);
::InitializeCriticalSection(&m_PendingAcceptsLock);
::InitializeCriticalSection(&m_ConnectionListLock);

::InitializeCriticalSection(&m_HeapLock);
::InitializeCriticalSection(&m_RepostLock);

// Accept请求
m_hAcceptEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
m_hRepostEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
m_nRepostCount = 0;

m_nPort = 8888;

m_nInitialAccepts = 10;
m_nInitialReads = 4;
m_nMaxAccepts = 100;
m_nMaxSends = 20;
m_nMaxFreeBuffers = 200;
m_nMaxFreeContexts = 100;
m_nMaxConnections = 2000;

m_hListenThread = NULL;
m_hCompletion = NULL;
m_sListen = INVALID_SOCKET;
m_lpfnAcceptEx = NULL;
m_lpfnGetAcceptExSockaddrs = NULL;

m_bShutDown = FALSE;
m_bServerStarted = FALSE;

m_hTimerQueue = ::CreateTimerQueue();

// 初始化WS2_32.dll
WSADATA wsaData;
WORD sockVersion = MAKEWORD(2, 2);
::WSAStartup(sockVersion, &wsaData);
}

CIOCPServer::~CIOCPServer()
{
Shutdown();

if(m_sListen != INVALID_SOCKET)
::closesocket(m_sListen);
if(m_hListenThread != NULL)
::CloseHandle(m_hListenThread);

::CloseHandle(m_hRepostEvent);
::CloseHandle(m_hAcceptEvent);

::DeleteCriticalSection(&m_FreeBufferListLock);
::DeleteCriticalSection(&m_FreeContextListLock);
::DeleteCriticalSection(&m_PendingAcceptsLock);
::DeleteCriticalSection(&m_ConnectionListLock);

::DeleteCriticalSection(&m_HeapLock);
::DeleteCriticalSection(&m_RepostLock);

::DeleteTimerQueue(m_hTimerQueue);//xss
::WSACleanup();
}

///////////////////////////////////////
static VOID CALLBACK TimerRoutine(PVOID lpParam, BOOLEAN TimerOrWaitFired)
{
CIOCPContext* pContext = (CIOCPContext*)lpParam;
if(pContext != NULL && pContext->bClosing == FALSE)
{
EnterCriticalSection(&pContext->Lock);
if(pContext->hCompletion != NULL)
{
PostQueuedCompletionStatus(pContext->hCompletion,-2,(ULONG_PTR)pContext,NULL);
}
LeaveCriticalSection(&pContext->Lock);
}
}

///////////////////////////////////
// 自定义帮助函数

CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen)
{
CIOCPBuffer *pBuffer = NULL;
if(nLen > BUFFER_SIZE)
return NULL;

// 为缓冲区对象申请内存
::EnterCriticalSection(&m_FreeBufferListLock);
if(m_pFreeBufferList == NULL)  // 内存池为空,申请新的内存
{
// 		pBuffer = (CIOCPBuffer *)::HeapAlloc(GetProcessHeap(),
// 						HEAP_ZERO_MEMORY, sizeof(CIOCPBuffer) + BUFFER_SIZE);
pBuffer = new CIOCPBuffer();
}
else	// 从内存池中取一块来使用
{
pBuffer = m_pFreeBufferList;
m_pFreeBufferList = m_pFreeBufferList->pNext;
pBuffer->pNext = NULL;
m_nFreeBufferCount --;
}
::LeaveCriticalSection(&m_FreeBufferListLock);

EnterCriticalSection(&m_HeapLock);
iBufferCount++;
LeaveCriticalSection(&m_HeapLock);

// 初始化新的缓冲区对象
if(pBuffer != NULL)
{
//pBuffer->buff = (char*)(pBuffer + sizeof(CIOCPBuffer)/*1*/);//xss,个人以为应该+sizeof(CIOCPBuffer);
pBuffer->nLen = nLen;
pBuffer->bIsReleased = FALSE;
}
return pBuffer;
}

void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer)
{
if(pBuffer == NULL || pBuffer->bIsReleased)
return;

::EnterCriticalSection(&m_FreeBufferListLock);

if(m_nFreeBufferCount <= m_nMaxFreeBuffers)	// 将要释放的内存添加到空闲列表中
{
memset(pBuffer, 0, sizeof(CIOCPBuffer) /*+ BUFFER_SIZE*/);
pBuffer->pNext = m_pFreeBufferList;
m_pFreeBufferList = pBuffer;

m_nFreeBufferCount ++ ;

pBuffer->bIsReleased = TRUE;
}
else			// 已经达到最大值,真正的释放内存
{
//::HeapFree(::GetProcessHeap(), 0, pBuffer);
delete pBuffer;
}

::LeaveCriticalSection(&m_FreeBufferListLock);

EnterCriticalSection(&m_HeapLock);
iBufferCount--;
LeaveCriticalSection(&m_HeapLock);
}

CIOCPContext *CIOCPServer::AllocateContext(SOCKET s)
{
CIOCPContext *pContext;

// 申请一个CIOCPContext对象
::EnterCriticalSection(&m_FreeContextListLock);

if(m_pFreeContextList == NULL)
{
//pContext = (CIOCPContext *)::HeapAlloc(::GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(CIOCPContext));
pContext = new CIOCPContext();

::InitializeCriticalSection(&pContext->Lock);
}
else
{
// 在空闲列表中申请
pContext = m_pFreeContextList;
m_pFreeContextList = m_pFreeContextList->pNext;
pContext->pNext = NULL;
m_nFreeBufferCount --;
}
::LeaveCriticalSection(&m_FreeContextListLock);

EnterCriticalSection(&m_HeapLock);
iContextCount++;
LeaveCriticalSection(&m_HeapLock);

// 初始化对象成员
if(pContext != NULL)
{
pContext->s = s;
pContext->bIsReleased = FALSE;
}
return pContext;
}

void CIOCPServer::ReleaseContext(CIOCPContext *pContext)
{
if(pContext == NULL || pContext->bIsReleased)
return;

printf("\n%s释放了Context\n\n",pContext->szClientName);
if(pContext->s != INVALID_SOCKET)
::closesocket(pContext->s);

// 首先释放(如果有的话)此套节字上的没有按顺序完成的读I/O的缓冲区
CIOCPBuffer *pNext;
while(pContext->pOutOfOrderReads != NULL)
{
pNext = pContext->pOutOfOrderReads->pNext;
ReleaseBuffer(pContext->pOutOfOrderReads);
pContext->pOutOfOrderReads = pNext;
}

//xss,再释放(如果有的话)此套接字上未完成的写I/O缓冲区
CIOCPNextToSend* pSend = NULL;
while(pContext->pNextToSend != NULL)
{
pSend = pContext->pNextToSend->pNext;
if(pContext->pNextToSend->pBuffer != NULL && pContext->pNextToSend->pBuffer->bIsReleased == FALSE)
{
ReleaseBuffer(pContext->pNextToSend->pBuffer);
}
delete pContext->pNextToSend;
pContext->pNextToSend = pSend;
}

if(pContext->hTimer != NULL)
{
DeleteTimerQueueTimer(m_hTimerQueue,pContext->hTimer,NULL);
pContext->hTimer = NULL;
}

::EnterCriticalSection(&m_FreeContextListLock);

if(m_nFreeContextCount <= m_nMaxFreeContexts) // 添加到空闲列表
{
// 先将关键代码段变量保存到一个临时变量中
CRITICAL_SECTION cstmp = pContext->Lock;
// 将要释放的上下文对象初始化为0
memset(pContext, 0, sizeof(CIOCPContext));

// 再放会关键代码段变量,将要释放的上下文对象添加到空闲列表的表头
pContext->Lock = cstmp;
pContext->pNext = m_pFreeContextList;
m_pFreeContextList = pContext;

// 更新计数
m_nFreeContextCount ++;

pContext->bIsReleased = TRUE;
}
else
{
::DeleteCriticalSection(&pContext->Lock);
//::HeapFree(::GetProcessHeap(), 0, pContext);
delete pContext;
}
::LeaveCriticalSection(&m_FreeContextListLock);

EnterCriticalSection(&m_HeapLock);
iContextCount--;
LeaveCriticalSection(&m_HeapLock);
}

void CIOCPServer::FreeBuffers()
{
// 遍历m_pFreeBufferList空闲列表,释放缓冲区池内存
::EnterCriticalSection(&m_FreeBufferListLock);

CIOCPBuffer *pFreeBuffer = m_pFreeBufferList;
CIOCPBuffer *pNextBuffer;
while(pFreeBuffer != NULL)
{
pNextBuffer = pFreeBuffer->pNext;

delete pFreeBuffer;
// 		if(!::HeapFree(::GetProcessHeap(), 0, pFreeBuffer))
// 		{
// #ifdef _DEBUG
// 			::OutputDebugString("  FreeBuffers释放内存出错!");
// #endif // _DEBUG
// 			break;
// 		}
pFreeBuffer = pNextBuffer;
}
m_pFreeBufferList = NULL;
m_nFreeBufferCount = 0;

::LeaveCriticalSection(&m_FreeBufferListLock);
}

void CIOCPServer::FreeContexts()
{
// 遍历m_pFreeContextList空闲列表,释放缓冲区池内存
::EnterCriticalSection(&m_FreeContextListLock);

CIOCPContext *pFreeContext = m_pFreeContextList;
CIOCPContext *pNextContext;
while(pFreeContext != NULL)
{
pNextContext = pFreeContext->pNext;

::DeleteCriticalSection(&pFreeContext->Lock);
delete pFreeContext;
// 		if(!::HeapFree(::GetProcessHeap(), 0, pFreeContext))
// 		{
// #ifdef _DEBUG
// 			::OutputDebugString("  FreeBuffers释放内存出错!");
// #endif // _DEBUG
// 			break;
// 		}
pFreeContext = pNextContext;
}
m_pFreeContextList = NULL;
m_nFreeContextCount = 0;

::LeaveCriticalSection(&m_FreeContextListLock);
}

BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext)
{
// 向客户连接列表添加一个CIOCPContext对象

::EnterCriticalSection(&m_ConnectionListLock);
if(m_nCurrentConnection <= m_nMaxConnections)
{
// 添加到表头
pContext->pNext = m_pConnectionList;
m_pConnectionList = pContext;
// 更新计数
m_nCurrentConnection ++;

::LeaveCriticalSection(&m_ConnectionListLock);
return TRUE;
}
::LeaveCriticalSection(&m_ConnectionListLock);

return FALSE;
}

void CIOCPServer::CloseAConnection(CIOCPContext *pContext)
{
if(pContext == NULL || pContext->bClosing == TRUE)
return;

// 首先从列表中移除要关闭的连接
::EnterCriticalSection(&m_ConnectionListLock);

CIOCPContext* pTest = m_pConnectionList;
if(pTest == pContext)
{
m_pConnectionList =  pContext->pNext;
m_nCurrentConnection --;
}
else
{
while(pTest != NULL && pTest->pNext !=  pContext)
pTest = pTest->pNext;
if(pTest != NULL)
{
pTest->pNext =  pContext->pNext;
m_nCurrentConnection --;
}
}

::LeaveCriticalSection(&m_ConnectionListLock);

// 然后关闭客户套节字
::EnterCriticalSection(&pContext->Lock);

if(pContext->s != INVALID_SOCKET)
{
::closesocket(pContext->s);
pContext->s = INVALID_SOCKET;
}
pContext->bClosing = TRUE;

::LeaveCriticalSection(&pContext->Lock);
}

void CIOCPServer::CloseAllConnections()
{
// 遍历整个连接列表,关闭所有的客户套节字

::EnterCriticalSection(&m_ConnectionListLock);

CIOCPContext *pContext = m_pConnectionList;
while(pContext != NULL)
{
::EnterCriticalSection(&pContext->Lock);

if(pContext->s != INVALID_SOCKET)
{
::closesocket(pContext->s);
pContext->s = INVALID_SOCKET;
}

pContext->bClosing = TRUE;

::LeaveCriticalSection(&pContext->Lock);

pContext = pContext->pNext;
}

m_pConnectionList = NULL;
m_nCurrentConnection = 0;

::LeaveCriticalSection(&m_ConnectionListLock);
}

BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer)
{
// 将一个I/O缓冲区对象插入到m_pPendingAccepts表中

::EnterCriticalSection(&m_PendingAcceptsLock);

if(m_pPendingAccepts == NULL)
m_pPendingAccepts = pBuffer;
else
{
pBuffer->pNext = m_pPendingAccepts;
m_pPendingAccepts = pBuffer;
}
m_nPendingAcceptCount ++;

::LeaveCriticalSection(&m_PendingAcceptsLock);

return TRUE;
}

BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer *pBuffer)
{
BOOL bResult = FALSE;

// 遍历m_pPendingAccepts表,从中移除pBuffer所指向的缓冲区对象
::EnterCriticalSection(&m_PendingAcceptsLock);

CIOCPBuffer *pTest = m_pPendingAccepts;
if(pTest == pBuffer)	// 如果是表头元素
{
m_pPendingAccepts = pBuffer->pNext;
bResult = TRUE;
}
else					// 不是表头元素的话,就要遍历这个表来查找了
{
while(pTest != NULL && pTest->pNext != pBuffer)
pTest = pTest->pNext;
if(pTest != NULL)
{
pTest->pNext = pBuffer->pNext;
bResult = TRUE;
}
}
// 更新计数
if(bResult)
m_nPendingAcceptCount --;

::LeaveCriticalSection(&m_PendingAcceptsLock);

return  bResult;
}

void CIOCPServer::ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
CloseAConnection(pContext);
}

BOOL CIOCPServer::PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss
{
::EnterCriticalSection(&pContext->Lock);
CIOCPNextToSend *ptr = pContext->pNextToSend;

CIOCPNextToSend * pSend = new CIOCPNextToSend();
pSend->pBuffer = pBuffer;
pSend->pNext = NULL;
if(ptr == NULL)
{
printf("数据:%10.10s ...,被直接发送。\n",pBuffer->buff);
//::EnterCriticalSection(&pContext->Lock);
pContext->pNextToSend = pSend;
//::LeaveCriticalSection(&pContext->Lock);
if(!PostSend(pContext,pBuffer))//如果没有需要等待的send就直接发送
{
::LeaveCriticalSection(&pContext->Lock);
return FALSE;
}
}
else
{
printf("数据:%10.10s ...,被放入链表结尾。\n",pBuffer->buff);
while(ptr->pNext != NULL)
{
ptr = ptr->pNext;
}
ptr->pNext = pSend;//新的发送请求放在链表结尾
}
::LeaveCriticalSection(&pContext->Lock);
return TRUE;
}

BOOL CIOCPServer::PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss
{
::EnterCriticalSection(&pContext->Lock);
CIOCPNextToSend* pSend = pContext->pNextToSend;
CIOCPNextToSend* pNextSend = NULL;
if(pSend != NULL && pSend->pNext != NULL)//发送成功的pBuffer是队列的第一个,发送下一个,pNextToSend指向下一个,pBuffer由外面释放。
{
pNextSend = pSend->pNext;
if(pNextSend->pBuffer != NULL)
{
printf("数据:%10.10s ...从链表中弹出被发送。\n",pNextSend->pBuffer->buff);
if(!PostSend(pContext,pNextSend->pBuffer))
{
delete pSend;
pContext->pNextToSend = pNextSend;
::LeaveCriticalSection(&pContext->Lock);
return FALSE;
}
}
}
if(pSend != NULL)
{
pNextSend = pSend->pNext;
delete pSend;
pContext->pNextToSend = pNextSend;
}
::LeaveCriticalSection(&pContext->Lock);
return TRUE;
}

CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
if(pBuffer != NULL)
{
// 如果与要读的下一个序列号相等,则读这块缓冲区
if(pBuffer->nSequenceNumber == pContext->nCurrentReadSequence)
{
return pBuffer;
}

// 如果不相等,则说明没有按顺序接收数据,将这块缓冲区保存到连接的pOutOfOrderReads列表中

// 列表中的缓冲区是按照其序列号从小到大的顺序排列的

pBuffer->pNext = NULL;

CIOCPBuffer *ptr = pContext->pOutOfOrderReads;
CIOCPBuffer *pPre = NULL;
while(ptr != NULL)
{
if(pBuffer->nSequenceNumber < ptr->nSequenceNumber)
break;

pPre = ptr;
ptr = ptr->pNext;
}

if(pPre == NULL) // 应该插入到表头
{
pBuffer->pNext = pContext->pOutOfOrderReads;
pContext->pOutOfOrderReads = pBuffer;
}
else			// 应该插入到表的中间
{
pBuffer->pNext = pPre->pNext;
pPre->pNext = pBuffer/*->pNext*/;//xss,个人觉得应该是pPre->pNext = pBuffer;
}
}

// 检查表头元素的序列号,如果与要读的序列号一致,就将它从表中移除,返回给用户
CIOCPBuffer *ptr = pContext->pOutOfOrderReads;
if(ptr != NULL && (ptr->nSequenceNumber == pContext->nCurrentReadSequence))
{
pContext->pOutOfOrderReads = ptr->pNext;
return ptr;
}
return NULL;
}

BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer)	// 在监听套节字上投递Accept请求
{
// 设置I/O类型
pBuffer->nOperation = OP_ACCEPT;

// 投递此重叠I/O
DWORD dwBytes;
pBuffer->sClient = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
BOOL b = m_lpfnAcceptEx(m_sListen,
pBuffer->sClient,
pBuffer->buff,
pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2),//xss,第一次都是收一个cmd_header
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
&dwBytes,
&pBuffer->ol);
if(!b && ::WSAGetLastError() != WSA_IO_PENDING)
{
return FALSE;
}
if(pBuffer->nOperation == 0)
{
int x = 0;
}
return TRUE;
};

BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
// 设置I/O类型
pBuffer->nOperation = OP_READ;

::EnterCriticalSection(&pContext->Lock);

// 设置序列号
pBuffer->nSequenceNumber = pContext->nReadSequence;

// 投递此重叠I/O
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
{
if(::WSAGetLastError() != WSA_IO_PENDING)
{
printf("WSARecv出错:%d\n",WSAGetLastError());
::LeaveCriticalSection(&pContext->Lock);
return FALSE;
}
}

// 增加套节字上的重叠I/O计数和读序列号计数

pContext->nOutstandingRecv ++;
pContext->nReadSequence ++;

::LeaveCriticalSection(&pContext->Lock);

return TRUE;
}

BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
// 跟踪投递的发送的数量,防止用户仅发送数据而不接收,导致服务器抛出大量发送操作
if(pContext->nOutstandingSend > m_nMaxSends)
return FALSE;

// 设置I/O类型,增加套节字上的重叠I/O计数
pBuffer->nOperation = OP_WRITE;

// 投递此重叠I/O
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(::WSASend(pContext->s,
&buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
{
int x;
if((x=::WSAGetLastError()) != WSA_IO_PENDING)
{
printf("发送失败!错误码:%d",x);
return FALSE;
}
}
// 增加套节字上的重叠I/O计数

::EnterCriticalSection(&pContext->Lock);
pContext->nOutstandingSend ++;
::LeaveCriticalSection(&pContext->Lock);

if(pBuffer->nOperation == 0)
{
int x = 0;
}
return TRUE;
}

BOOL CIOCPServer::Start(int nPort, int nMaxConnections,
int nMaxFreeBuffers, int nMaxFreeContexts, int nInitialReads)
{
// 检查服务是否已经启动
if(m_bServerStarted)
return FALSE;

// 保存用户参数
m_nPort = nPort;
m_nMaxConnections = nMaxConnections;
m_nMaxFreeBuffers = nMaxFreeBuffers;
m_nMaxFreeContexts = nMaxFreeContexts;
m_nInitialReads = nInitialReads;

// 初始化状态变量
m_bShutDown = FALSE;
m_bServerStarted = TRUE;

// 创建监听套节字,绑定到本地端口,进入监听模式
m_sListen = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
SOCKADDR_IN si;
si.sin_family = AF_INET;
si.sin_port = ::ntohs(m_nPort);
si.sin_addr.S_un.S_addr = INADDR_ANY;
if(::bind(m_sListen, (sockaddr*)&si, sizeof(si)) == SOCKET_ERROR)
{
m_bServerStarted = FALSE;
return FALSE;
}
::listen(m_sListen, 200);

// 创建完成端口对象
m_hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

// 加载扩展函数AcceptEx
GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes;
::WSAIoctl(m_sListen,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&m_lpfnAcceptEx,
sizeof(m_lpfnAcceptEx),
&dwBytes,
NULL,
NULL);

// 加载扩展函数GetAcceptExSockaddrs
GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
::WSAIoctl(m_sListen,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockaddrs,
sizeof(GuidGetAcceptExSockaddrs),
&m_lpfnGetAcceptExSockaddrs,
sizeof(m_lpfnGetAcceptExSockaddrs),
&dwBytes,
NULL,
NULL
);

// 将监听套节字关联到完成端口,注意,这里为它传递的CompletionKey为0
::CreateIoCompletionPort((HANDLE)m_sListen, m_hCompletion, (DWORD)0, 0);

// 注册FD_ACCEPT事件。
// 如果投递的AcceptEx I/O不够,线程会接收到FD_ACCEPT网络事件,说明应该投递更多的AcceptEx I/O
WSAEventSelect(m_sListen, m_hAcceptEvent, FD_ACCEPT);

// 创建监听线程
m_hListenThread = ::CreateThread(NULL, 0, _ListenThreadProc, this, 0, NULL);

return TRUE;
}

void CIOCPServer::Shutdown()
{
if(!m_bServerStarted)
return;

// 通知监听线程,马上停止服务
m_bShutDown = TRUE;
::SetEvent(m_hAcceptEvent);
// 等待监听线程退出
::WaitForSingleObject(m_hListenThread, INFINITE);
::CloseHandle(m_hListenThread);
m_hListenThread = NULL;

m_bServerStarted = FALSE;
}

DWORD WINAPI CIOCPServer::_ListenThreadProc(LPVOID lpParam)
{
CIOCPServer *pThis = (CIOCPServer*)lpParam;

// 先在监听套节字上投递几个Accept I/O
CIOCPBuffer *pBuffer;
for(int i=0; i<pThis->m_nInitialAccepts; i++)
{
pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);//xss,BUFFER_SIZE
if(pBuffer == NULL)
return -1;
pThis->InsertPendingAccept(pBuffer);
pThis->PostAccept(pBuffer);
}

// 构建事件对象数组,以便在上面调用WSAWaitForMultipleEvents函数
HANDLE hWaitEvents[2 + MAX_THREAD];
int nEventCount = 0;
hWaitEvents[nEventCount ++] = pThis->m_hAcceptEvent;
hWaitEvents[nEventCount ++] = pThis->m_hRepostEvent;

// 创建指定数量的工作线程在完成端口上处理I/O
for(int i=0; i<MAX_THREAD; i++)
{
hWaitEvents[nEventCount ++] = ::CreateThread(NULL, 0, _WorkerThreadProc, pThis, 0, NULL);
}

// 下面进入无限循环,处理事件对象数组中的事件
while(TRUE)
{
int nIndex = ::WSAWaitForMultipleEvents(nEventCount, hWaitEvents, FALSE, 60*1000, FALSE);

// 首先检查是否要停止服务
if(pThis->m_bShutDown || nIndex == WSA_WAIT_FAILED)
{
// 关闭所有连接
pThis->CloseAllConnections();
::Sleep(0);		// 给I/O工作线程一个执行的机会
// 关闭监听套节字
::closesocket(pThis->m_sListen);
pThis->m_sListen = INVALID_SOCKET;
::Sleep(0);		// 给I/O工作线程一个执行的机会

// 通知所有I/O处理线程退出
for(int i=2; i<MAX_THREAD + 2; i++)
{
::PostQueuedCompletionStatus(pThis->m_hCompletion, -1, 0, NULL);
}

// 等待I/O处理线程退出
::WaitForMultipleObjects(MAX_THREAD, &hWaitEvents[2], TRUE, 5*1000);

for(int i=2; i<MAX_THREAD + 2; i++)
{
::CloseHandle(hWaitEvents[i]);
}

::CloseHandle(pThis->m_hCompletion);

pThis->FreeBuffers();
pThis->FreeContexts();
::ExitThread(0);
}

// 1)定时检查所有未返回的AcceptEx I/O的连接建立了多长时间
if(nIndex == WSA_WAIT_TIMEOUT)
{
pBuffer = pThis->m_pPendingAccepts;
while(pBuffer != NULL)
{
int nSeconds;
int nLen = sizeof(nSeconds);
// 取得连接建立的时间
::getsockopt(pBuffer->sClient,
SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSeconds, &nLen);
// 如果超过2分钟客户还不发送初始数据,就让这个客户go away
if(nSeconds != -1 && nSeconds > /*2*60*/50)
{
closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}

pBuffer = pBuffer->pNext;
}
}
else
{
nIndex = nIndex - WAIT_OBJECT_0;
WSANETWORKEVENTS ne;
int nLimit=0;
if(nIndex == 0)			// 2)m_hAcceptEvent事件对象受信,说明投递的Accept请求不够,需要增加
{
::WSAEnumNetworkEvents(pThis->m_sListen, hWaitEvents[nIndex], &ne);
if(ne.lNetworkEvents & FD_ACCEPT)
{
nLimit = 50;  // 增加的个数,这里设为50个
}
}
else if(nIndex == 1)	// 3)m_hRepostEvent事件对象受信,说明处理I/O的线程接受到新的客户
{
nLimit = InterlockedExchange(&pThis->m_nRepostCount, 0);
}
else if(nIndex > 1)		// I/O服务线程退出,说明有错误发生,关闭服务器
{
pThis->m_bShutDown = TRUE;
continue;
}

// 投递nLimit个AcceptEx I/O请求
int i = 0;
while(i++ < nLimit && pThis->m_nPendingAcceptCount < pThis->m_nMaxAccepts)
{
pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);
if(pBuffer != NULL)
{
pThis->InsertPendingAccept(pBuffer);
pThis->PostAccept(pBuffer);
}
}
}
}
return 0;
}

DWORD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)
{
#ifdef _DEBUG
::OutputDebugString("	WorkerThread 启动... \n");
#endif // _DEBUG

CIOCPServer *pThis = (CIOCPServer*)lpParam;

CIOCPBuffer *pBuffer = NULL;
DWORD dwKey;
DWORD dwTrans;
LPOVERLAPPED lpol;

while(TRUE)
{
// 在关联到此完成端口的所有套节字上等待I/O完成
BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,
&dwTrans, (LPDWORD)&dwKey, (LPOVERLAPPED*)&lpol, WSA_INFINITE);

if(dwTrans == -1) // 用户通知退出
{
#ifdef _DEBUG
::OutputDebugString("	WorkerThread 退出 \n");
#endif // _DEBUG
::ExitThread(0);
}
if(dwTrans != -2)
pBuffer = CONTAINING_RECORD(lpol, CIOCPBuffer, ol);
int nError = NO_ERROR;
if(!bOK)						// 在此套节字上有错误发生
{
printf("完成端口套接字上有错误:%d\n",GetLastError());
SOCKET s;
if(pBuffer->nOperation == OP_ACCEPT)
{
s = pThis->m_sListen;
}
else
{
if(dwKey == 0)
break;
s = ((CIOCPContext*)dwKey)->s;
}
DWORD dwFlags = 0;
if(!::WSAGetOverlappedResult(s, &pBuffer->ol, &dwTrans, FALSE, &dwFlags))
{
nError = ::WSAGetLastError();
}
}
pThis->HandleIO(dwKey, pBuffer, dwTrans, nError);
printf("Buffer:%d     Context:%d\n",iBufferCount,iContextCount);
}

#ifdef _DEBUG
::OutputDebugString("	WorkerThread 退出 \n");
#endif // _DEBUG
return 0;
}

int g_x = 0;
void CIOCPServer::HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError)
{
CIOCPContext *pContext = (CIOCPContext *)dwKey;

#ifdef _DEBUG
::OutputDebugString("	HandleIO... \n");
#endif // _DEBUG

// 1)首先减少套节字上的未决I/O计数
if(dwTrans == -2)
{
CloseAConnection(pContext);
return;
}
if(pContext != NULL)
{
::EnterCriticalSection(&pContext->Lock);

if(pBuffer->nOperation == OP_READ)
pContext->nOutstandingRecv --;
else if(pBuffer->nOperation == OP_WRITE)
pContext->nOutstandingSend --;

::LeaveCriticalSection(&pContext->Lock);

// 2)检查套节字是否已经被我们关闭
if(pContext->bClosing)
{
#ifdef _DEBUG
::OutputDebugString("	检查到套节字已经被我们关闭 \n");
#endif // _DEBUG
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
// 释放已关闭套节字的未决I/O
ReleaseBuffer(pBuffer);
pBuffer = NULL;
return;
}
}
else
{
RemovePendingAccept(pBuffer);
}

// 3)检查套节字上发生的错误,如果有的话,通知用户,然后关闭套节字
if(nError != NO_ERROR)
{
if(pBuffer->nOperation != OP_ACCEPT)
{
OnConnectionError(pContext, pBuffer, nError);
CloseAConnection(pContext);
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
#ifdef _DEBUG
::OutputDebugString("	检查到客户套节字上发生错误 \n");
#endif // _DEBUG
}
else // 在监听套节字上发生错误,也就是监听套节字处理的客户出错了
{
// 客户端出错,释放I/O缓冲区
if(pBuffer->sClient != INVALID_SOCKET)
{
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
#ifdef _DEBUG
::OutputDebugString("	检查到监听套节字上发生错误 \n");
#endif // _DEBUG
}

ReleaseBuffer(pBuffer);
pBuffer = NULL;
return;
}

// 开始处理
if(pBuffer->nOperation == OP_ACCEPT)
{
if(dwTrans == 0)
{
#ifdef _DEBUG
::OutputDebugString("	监听套节字上客户端关闭 \n");
#endif // _DEBUG

if(pBuffer->sClient != INVALID_SOCKET)
{
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
}
else
{
// 为新接受的连接申请客户上下文对象
CIOCPContext *pClient = AllocateContext(pBuffer->sClient);
if(pClient != NULL)
{
if(AddAConnection(pClient))
{
// 取得客户地址
int nLocalLen, nRmoteLen;
LPSOCKADDR pLocalAddr, pRemoteAddr;
m_lpfnGetAcceptExSockaddrs(
pBuffer->buff,
pBuffer->nLen - (sizeof(sockaddr_in) + 16) * 2/*sizeof(cmd_header)*/,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
(SOCKADDR **)&pLocalAddr,
&nLocalLen,
(SOCKADDR **)&pRemoteAddr,
&nRmoteLen);
memcpy(&pClient->addrLocal, pLocalAddr, nLocalLen);
memcpy(&pClient->addrRemote, pRemoteAddr, nRmoteLen);

// 关联新连接到完成端口对象
::CreateIoCompletionPort((HANDLE)pClient->s, m_hCompletion, (DWORD)pClient, 0);

// 通知用户
pBuffer->nLen = dwTrans;
OnConnectionEstablished(pClient, pBuffer);

if(pClient->bClosing && pClient->nOutstandingRecv == 0 && pClient->nOutstandingSend == 0)
{
ReleaseContext(pClient);
pContext = NULL;
}
else if(pClient->hTimer == NULL)//接收一个客户端的同时创建一个检测I/O超时的Timer
{
pClient->hCompletion = m_hCompletion;
CreateTimerQueueTimer(&pClient->hTimer,m_hTimerQueue,(WAITORTIMERCALLBACK)TimerRoutine,(PVOID)pClient,60*1000,0,0);
}

// 向新连接投递Read请求或者Write请求,直接关闭这些空间在套节字关闭或出错时释放
// 						CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
// 						if(p != NULL)
// 						{
// 							if(!PostRecv(pClient, p))
// 							{
// 								CloseAConnection(pClient);
// 							}
// 						}

}
else	// 连接数量已满,关闭连接
{
CloseAConnection(pClient);
ReleaseContext(pClient);
pContext = NULL;
}
}
else
{
// 资源不足,关闭与客户的连接即可
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
}

// Accept请求完成,释放I/O缓冲区
ReleaseBuffer(pBuffer);
pBuffer = NULL;

// 通知监听线程继续再投递一个Accept请求
::InterlockedIncrement(&m_nRepostCount);

::SetEvent(m_hRepostEvent);
}
else if(pBuffer->nOperation == OP_READ)
{
if(dwTrans == 0)	// 对方关闭套节字
{
// 先通知用户
pBuffer->nLen = 0;
OnConnectionClosing(pContext, pBuffer);
// 再关闭连接
CloseAConnection(pContext);
// 释放客户上下文和缓冲区对象
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
ReleaseBuffer(pBuffer);
pBuffer = NULL;
}
else
{
pBuffer->nLen = dwTrans;
// 按照I/O投递的顺序读取接收到的数据
CIOCPBuffer *p = GetNextReadBuffer(pContext, pBuffer);
while(p != NULL)
{
// 通知用户
OnReadCompleted(pContext, p);
// 增加要读的序列号的值
::InterlockedIncrement((LONG*)&pContext->nCurrentReadSequence);
// 释放这个已完成的I/O
ReleaseBuffer(p);
p = GetNextReadBuffer(pContext, NULL);
}

if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
else if(pContext->hTimer != NULL)
{
ChangeTimerQueueTimer(m_hTimerQueue,pContext->hTimer,60*1000,0);//重置监视时间,当一个投递完成后,60s内无任何交互则断开。
}

// 继续投递一个新的接收请求
//   pBuffer = AllocateBuffer(BUFFER_SIZE);
//if(pBuffer == NULL || !PostRecv(pContext, pBuffer))
//{
//	CloseAConnection(pContext);
//}
}
}
else if(pBuffer->nOperation == OP_WRITE)
{

if(dwTrans == 0)	// 对方关闭套节字
{
// 先通知用户
pBuffer->nLen = 0;
OnConnectionClosing(pContext, pBuffer);

// 再关闭连接
CloseAConnection(pContext);

// 释放客户上下文和缓冲区对象
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
ReleaseBuffer(pBuffer);
pBuffer = NULL;
}
else
{
if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
ReleaseBuffer(pBuffer);
pBuffer = NULL;
return;
}
else if(pContext->hTimer != NULL)
{
ChangeTimerQueueTimer(m_hTimerQueue,pContext->hTimer,60*1000,0);
}

// 写操作完成,通知用户
if(dwTrans < pBuffer->nLen)//如果此send没有发送完全,则发送剩下的部分(此部分如果还是没发完全,这里同样进行)
{
printf("send未发送完全,发送:%d,总长度:%d\n",dwTrans,pBuffer->nLen);
CIOCPBuffer* p = AllocateBuffer(pBuffer->nLen - dwTrans);
if(p != NULL)
memcpy(p->buff,pBuffer->buff + dwTrans,pBuffer->nLen - dwTrans);
if(p == NULL || !PostSend(pContext,p))
{
CloseAConnection(pContext);
return;
}
}
else
{
if(!PostNextWriteBuffer(pContext,pBuffer))
{
CloseAConnection(pContext);
return;
}
}
pBuffer->nLen = dwTrans;
OnWriteCompleted(pContext, pBuffer);
if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
// 释放SendText函数申请的缓冲区
ReleaseBuffer(pBuffer);
pBuffer = NULL;
}
}
}

BOOL CIOCPServer::SendText(CIOCPContext *pContext, char *pszText, int nLen)
{
CIOCPBuffer *pBuffer = AllocateBuffer(nLen);
if(pBuffer != NULL)
{
memcpy(pBuffer->buff, pszText, nLen);
return PostSend(pContext, pBuffer);
}
return FALSE;
}

//投递接收请求示例
//CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
//if(p != NULL)
//{
//	if(!PostRecv(pContext, p))
//	{
//		CloseAConnection(pContext);
//	}
//}
//投递发送请求示例
//CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
//if(p != NULL)
//{
//	if(!PostSendToList(pContext, p))
//	{
//		CloseAConnection(pContext);
//	}
//}
void CIOCPServer::OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
//连接建立,且第一次数据接收完成。
//接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
}

void CIOCPServer::OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
//一次数据接收完成。
//接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
}

void CIOCPServer::OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
//一次数据发送完成。
//接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
}

void CIOCPServer::OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)
{
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: