您的位置:首页 > 其它

一个基于线程池的WSAEventSelect模型服务器设计

2012-11-27 16:45 513 查看
//WSAEventSelect.h
#include "CInitSocket.h"
typedef struct _SOCKET_OBJ
{
SOCKET sock; //套接字句柄
HANDLE hEvent; //与次套接字相关的事件对象句柄
sockaddr_in addrRemote; //客户端地址信息
_SOCKET_OBJ *pNext; //指向下一个SOCKET_OBJ对象,构成链表
} SOCKET_OBJ, *PSOCKET_OBJ;

typedef struct _THREAD_OBJ
{
HANDLE hEvents[WSA_MAXIMUM_WAIT_EVENTS];//记录当前线程要等待的事件对象的句柄
int nSocketCount; //当前线程处理的套接字的数量
PSOCKET_OBJ pSockHeader; //指向当前线程处理的套接字对象列表表天头
PSOCKET_OBJ pSockTail; //指向表尾
CRITICAL_SECTION cs; //关键代码段,用于同步本结构的访问
_THREAD_OBJ *pNext;  //指向下一个对象,练成链表
} THREAD_OBJ, *PTHREAD_OBJ;

PSOCKET_OBJ GetSocketObj(SOCKET sock); //申请一个套接字对象,并初始化
void FreeSocketObj(PSOCKET_OBJ pSocket); //释放一个套接字对象
PTHREAD_OBJ GetThreadObj(); //申请一个线程对象,初始化后加入线程对象列表
void FreeThreadObj(PTHREAD_OBJ pThread); //释放一个线程对象,并将它从线程对象列表中移除
void RebuildArrary(PTHREAD_OBJ pThread); //套接字链表改变的时候,重新组建线程对象的hEvents数组
bool InsertSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket);
void AssignToFreeThread(PSOCKET_OBJ pSocket);
void RemoveSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket);
DWORD WINAPI ServerThread(LPVOID lpParam);
PSOCKET_OBJ FindSocketObj(PTHREAD_OBJ pThread, int nIndex);
bool HandleIO(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket);

//WSAEventSelect.cpp
#include "WSAEventSelect.h"
#include <iostream>
using namespace std;
extern PTHREAD_OBJ g_pThreadList; //指向线程对象列表表头
extern CRITICAL_SECTION g_cs; //同步对此全局变量的访问
extern LONG g_nTatolConnections; //总共连接数量
extern LONG g_nCurrentConnections; //当前连接数量
PSOCKET_OBJ GetSocketObj(SOCKET sock) //申请一个套接字对象,并初始化
{
PSOCKET_OBJ pSocket = (PSOCKET_OBJ)GlobalAlloc(GPTR, sizeof(SOCKET_OBJ));
if (pSocket != NULL)
{
pSocket->sock = sock;
pSocket->hEvent = WSACreateEvent();
}
return pSocket;
}
yle = "color: #0000ff;" > void FreeSocketObj(PSOCKET_OBJ pSocket) //释放一个套接字对象
{
CloseHandle(pSocket->hEvent);
if (pSocket->sock != INVALID_SOCKET)
{
closesocket(pSocket->sock);
}
GlobalFree(pSocket);
}
PTHREAD_OBJ GetThreadObj() //申请一个线程对象,初始化后加入线程对象列表
{
PTHREAD_OBJ pThread = (PTHREAD_OBJ)GlobalAlloc(GPTR, sizeof(THREAD_OBJ));
if (pThread != NULL)
{
InitializeCriticalSection(&pThread->cs);
//创建一个事件对象,用于只是该线程的句柄数组是否需要重建
pThread->hEvents[0] = WSACreateEvent();
EnterCriticalSection(&g_cs);
pThread->pNext = g_pThreadList;
g_pThreadList = pThread;
LeaveCriticalSection(&g_cs);
}
return pThread;
}
void FreeThreadObj(PTHREAD_OBJ pThread) //释放一个线程对象,并将它从线程对象列表中移除
{
EnterCriticalSection(&g_cs);
PTHREAD_OBJ p = g_pThreadList;
if (p == pThread)
{
g_pThreadList = p->pNext;
}
else
{
while (p != NULL && p->pNext != pThread)
{
p = p->pNext;
}
if (p != NULL)
{
p->pNext = pThread->pNext;
}
}
LeaveCriticalSection(&g_cs);
//释放资源
CloseHandle(pThread->hEvents[0]);
DeleteCriticalSection(&pThread->cs);
GlobalFree(pThread);
}
void RebuildArrary(PTHREAD_OBJ pThread) //套接字链表改变的时候,重新组建线程对象的hEvents数组
{
EnterCriticalSection(&pThread->cs);
PSOCKET_OBJ pSocket = pThread->pSockHeader;
int n = 1;
while(pSocket != NULL)
{
pThread->hEvents[n++] = pSocket->hEvent;
pSocket = pSocket->pNext;
}
LeaveCriticalSection(&pThread->cs);
}
PSOCKET_OBJ FindSocketObj(PTHREAD_OBJ pThread, int nIndex) //从nIndex开始搜索
{
PSOCKET_OBJ pSocket = pThread->pSockHeader;
while (--nIndex)
{

if (pSocket == NULL)
{
return NULL;
}
pSocket = pSocket->pNext;
}
return pSocket;
}
bool HandleIO(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket)
{
WSANETWORKEVENTS NetEvents;
WSAEnumNetworkEvents(pSocket->sock, pSocket->hEvent, &NetEvents);
if (NetEvents.lNetworkEvents & FD_READ)
{
if (NetEvents.iErrorCode[FD_READ_BIT==0])
{
char szText[256];
int nRecv = recv(pSocket->sock, szText, 256, 0);
if (nRecv > 0)
{
szText[nRecv] = '';
cout << "接收到数据:" << szText << endl;
}
return true;
}
}
else if (NetEvents.lNetworkEvents & FD_WRITE)
{
if (NetEvents.iErrorCode[FD_WRITE_BIT] == 0)
{
//do something
return true;
}
}
else if (NetEvents.lNetworkEvents & FD_CLOSE)
{
}
RemoveSocketObj(pThread, pSocket);
FreeSocketObj(pSocket);
return false;
}
DWORD WINAPI ServerThread(LPVOID lpParam)
{
PTHREAD_OBJ pThread = (PTHREAD_OBJ)lpParam;
while (true)
{
int nIndex = WSAWaitForMultipleEvents(pThread->nSocketCount + 1, pThread->hEvents,
FALSE, INFINITE, FALSE);
nIndex = nIndex - WSA_WAIT_EVENT_0;
//查看受信事件对象
for(int i = nIndex; i < pThread->nSocketCount + 1; i++)
{
nIndex = WSAWaitForMultipleEvents(1, &pThread->hEvents[i], TRUE, INFINITE, FALSE);
if (nIndex == WSA_WAIT_FAILED || nIndex == WSA_WAIT_TIMEOUT) //hEvent[0]受信,重建数组
{
continue;
}
else
{
if (i == 0) //hEvent[0]受信,重建数组
{
RebuildArrary(pThread);
if (pThread->nSocketCount
== 0)
{
FreeThreadObj(pThread);
return 0;
}
}
else
{
//查找对应的套接字对象指针,调用HandleIO处理网络事件
PSOCKET_OBJ pSocket = (PSOCKET_OBJ)FindSocketObj(pThread, i);
if (pSocket != NULL)
{
if (!HandleIO(pThread, pSocket))
{
RebuildArrary(pThread);
}
}
else
{
cout << "无法找到该套接字对象!n";
}
}
}
}
}
return 0;
}
//向一个线程的套接字列表中插入一个套接字
bool InsertSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket)
{
bool bRet = false;
EnterCriticalSection(&pThread->cs);
if (pThread->nSocketCount < WSA_MAXIMUM_WAIT_EVENTS - 1)
{
if (pThread->pSockHeader == NULL)
{
pThread->pSockHeader = pThread->pSockTail = pSocket;
}
else
{
pThread->pSockTail->pNext = pSocket;
pThread->pSockTail = pSocket;
}
pThread->nSocketCount++;
bRet = true;
}
LeaveCriticalSection(&pThread->cs);
if (bRet) //插入成功,说明处理了客户的连接请求
{
InterlockedIncrement(&g_nTatolConnections);
InterlockedIncrement(&g_nCurrentConnections);
}
return bRet;
}
void AssignToFreeThread(PSOCKET_OBJ pSocket)
{
pSocket->pNext = NULL;
EnterCriticalSection(&g_cs);
PTHREAD_OBJ pThread = g_pThreadList;
//试图插入到现存线程
while (pThread != NULL)
{
if (InsertSocketObj(pThread, pSocket)) break;
pThread = pThread->pNext;
}
if (pThread == NULL)
{
pThread = GetThreadObj();
InsertSocketObj(pThread, pSocket);
CreateThread(NULL, 0, ServerThread, pThread, 0, NULL);
}
LeaveCriticalSection(&g_cs);
//使第一个事件受信,指使线程重建句柄数组
WSASetEvent(pThread->hEvents[0]);
}
//从给定的线程套接字对象列表中移除一个套接字
void RemoveSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket)
{
EnterCriticalSection(&pThread->cs);
PSOCKET_OBJ pTest = pThread->pSockHeader;
if (pTest == pSocket)
{
if (pThread->pSockHeader == pThread->pSockTail)
{
pThread->pSockTail = pThread->pSockHeader = NULL;
}
else
{
pThread->pSockHeader = pTest->pNext;
}
}
pThread->nSocketCount--;
LeaveCriticalSection(&pThread->cs);
WSASetEvent(pThread->hEvents[0]);
InterlockedDecrement(&g_nCurrentConnections);
InterlockedDecrement(&g_nTatolConnections);
}
//main.cpp
#include "WSAEventSelect.h"
PTHREAD_OBJ g_pThreadList; //指向线程对象列表表头
CRITICAL_SECTION g_cs; //同步对此全局变量的访问
LONG g_nTatolConnections; //总共连接数量
LONG g_nCurrentConnections; //当前连接数量
#include <iostream>
using namespace std;
CInitSock initsock;
int main() //主线程
{
USHORT nPort = 6000;
//创建监听套接字
SOCKET ListenSocket = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in addrin;
addrin.sin_addr.S_un.S_addr = INADDR_ANY;
addrin.sin_family = AF_INET;
addrin.sin_port = htons(nPort);
if (SOCKET_ERROR == bind(ListenSocket, (LPSOCKADDR)&addrin, sizeof(addrin)))
{
cout << "套接字绑定失败!n";
return -1;
}
listen(ListenSocket, 200);
//创建事件对象,并关联到监听套接字
WSAEVENT hEvent = WSACreateEvent();
WSAEventSelect(ListenSocket, hEvent, FD_ACCEPT | FD_CLOSE);
InitializeCriticalSection(&g_cs);
//处理客户连接请求,打印状态信息
while (true)
{
int nRet = WaitForSingleObject(hEvent, 5000);
if (nRet == WAIT_FAILED)
{
cout << "等待出错!";
break;
}
else if (nRet == WSA_WAIT_TIMEOUT)
{
cout << "TatolConnections:" << g_nTatolConnections << endl;
cout << "CurrentConnections" << g_nCurrentConnections << endl;
continue;
}
else //有新的连接未决
{
ResetEvent(hEvent);
while (true)
{
sockaddr_in NewAddr;
int nLen = sizeof(NewAddr);
SOCKET NewSocket = accept(ListenSocket, (sockaddr *)&NewAddr, &nLen);
if (NewSocket == SOCKET_ERROR)
{
break;
}
PSOCKET_OBJ pSocket = GetSocketObj(NewSocket);
pSocket->addrRemote = NewAddr;
WSAEventSelect(pSocket->sock, pSocket->hEvent, FD_READ | FD_CLOSE | FD_WRITE);
AssignToFreeThread(pSocket);
}
}
}
DeleteCriticalSection(&g_cs);
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: