您的位置:首页 > 其它

Windows Socket I/O模型之 WSAEventSelect模式

2011-02-02 14:16 387 查看
#
include
<
winsock2.
h>

#
include
<
ws2tcpip.
h>

#
include
"public.h"

#
include
"resolve.h"

typedef
SINGLE_LIST_HEADER BuffHeader;

typedef
SINGLE_LIST BuffObj;

typedef
SINGLE_LIST_HEADER TheadObjHeader;

typedef
SINGLE_LIST ThreadObj;

typedef
DOUBLE_LIST_HEADER SockObjHeader;

typedef
DOUBLE_LIST SockObj;

typedef
struct
_SOCKET_OBJ

{

SOCKET
s;
// Socket handle

HANDLE event;
// Event handle

int
listening;
// Socket is a listening socket (TCP)

int
closing;
// Indicates whether the connection is closing

SOCKADDR_STORAGE addr;
// Used for client's remote address

int
addrlen;
// Length of the address

BuffHeader buff;

DOUBLE_LIST entry;

}
SOCKET_OBJ;

typedef
struct
_THREAD_OBJ

{

SockObjHeader sockHeader;

HANDLE Event;
// Used to signal new clients assigned

// to this thread

HANDLE Thread;

HANDLE Handles[
MAXIMUM_WAIT_OBJECTS]
;
// Array of socket's event handles

CRITICAL_SECTION ThreadCritSec;
// Protect access to SOCKET_OBJ lists

ThreadObj entry;
// Next thread object in list

}
THREAD_OBJ;

TheadObjHeader theadObjHeader;

SOCKET_OBJ*
GetSocketObj(
SOCKET
s,
int
listening)
{

SOCKET_OBJ *
sockobj =
NULL
;

sockobj =
(
SOCKET_OBJ*
)
HeapAlloc(
GetProcessHeap(
)
,
HEAP_ZERO_MEMORY,
sizeof
(
SOCKET_OBJ)
)
;

if
(
sockobj =
=
NULL
)
{

fprintf
(
stderr
,
"HeapAlloc failed./n"
)
;

ExitProcess(
-
1)
;

}

sockobj-
>
s =
s;

sockobj-
>
listening =
listening;

sockobj-
>
addrlen =
sizeof
(
sockobj-
>
addr)
;

sockobj-
>
event =
WSACreateEvent(
)
;

if
(
sockobj-
>
event =
=
NULL
)

{

fprintf
(
stderr
,
"GetSocketObj: WSACreateEvent failed: %d/n"
,
WSAGetLastError(
)
)
;

ExitProcess(
-
1)
;

}

InitializeCriticalSection(
&
sockobj-
>
buff.
SendRecvQueueCritSec)
;

return
sockobj;

}

THREAD_OBJ *
GetThreadObj(
)
{

THREAD_OBJ *
thread =
NULL
;

thread =
(
THREAD_OBJ*
)
HeapAlloc(
GetProcessHeap(
)
,
HEAP_ZERO_MEMORY,
sizeof
(
THREAD_OBJ)
)
;

if
(
thread =
=
NULL
)
{

fprintf
(
stderr
,
"HeapAllco failed./n"
)
;

ExitProcess(
-
1)
;

}

thread-
>
Event =
WSACreateEvent(
)
;

if
(
thread-
>
Event =
=
NULL
)
{

fprintf
(
stderr
,
"WSACreateEvent failed./n"
)
;

ExitProcess(
-
1)
;

}

thread-
>
Handles[
0]
=
thread-
>
Event;

InitializeCriticalSection(
&
thread-
>
ThreadCritSec)
;

InitializeDoubleHead(
&
thread-
>
sockHeader)
;

return
thread;

}

int
InsertSocketObj(
THREAD_OBJ *
thread,
SOCKET_OBJ *
sockobj)
{

int
ret;

EnterCriticalSection(
&
thread-
>
ThreadCritSec)
;

if
(
thread-
>
sockHeader.
count
<
MAXIMUM_WAIT_OBJECTS -
1)
{

EnqueueDoubleListHead(
&
(
thread-
>
sockHeader)
,
&
(
sockobj-
>
entry)
)
;

thread-
>
Handles[
thread-
>
sockHeader.
count
]
=
sockobj-
>
event;

ret =
NO_ERROR;

}
else
{

ret =
SOCKET_ERROR;

}

LeaveCriticalSection(
&
thread-
>
ThreadCritSec)
;

return
ret;

}

SOCKET_OBJ *
FindSocketObj(
THREAD_OBJ *
thread,
int
index)
{

SOCKET_OBJ *
sockobj =
NULL
;

int
i;

EnterCriticalSection(
&
thread-
>
ThreadCritSec)
;

SockObj *
sptr =
(
SockObj *
)
GotoNextDoubleList(
&
thread-
>
sockHeader,
&
(
thread-
>
sockHeader.
head)
)
;

for
(
i =
0;
i <
index;
+
+
i)
{

if
(
sptr =
=
NULL
)

{

fprintf
(
stderr
,
"FindSocketobj failed./n"
)
;

ExitProcess(
-
1)
;

}

sptr =
(
SockObj *
)
GotoNextDoubleList(
&
thread-
>
sockHeader,
sptr)
;

}

sockobj =
(
SOCKET_OBJ *
)
container_of(
SOCKET_OBJ,
entry,
sptr)
;

LeaveCriticalSection(
&
thread-
>
ThreadCritSec)
;

return
sockobj;

}

void
RemoveSocketObj(
THREAD_OBJ *
thread,
SOCKET_OBJ *
sock)
{

EnterCriticalSection(
&
thread-
>
ThreadCritSec)
;

RemoveDoubleList(
&
thread-
>
sockHeader,
&
sock-
>
entry)
;

WSASetEvent(
thread-
>
Event)
;

LeaveCriticalSection(
&
thread-
>
ThreadCritSec)
;

}

void
FreeSocketObj(
SOCKET_OBJ *
obj)
{

BuffObj *
ptr =
NULL
;

BUFFER_OBJ *
blk =
NULL
;

while
(
true
)
{

ptr =
DequeueSingleList(
&
obj-
>
buff)
;

if
(
ptr =
=
NULL
)

break
;

blk =
(
BUFFER_OBJ *
)
container_of(
BUFFER_OBJ,
next,
ptr)
;

FreeBufferObj(
blk)
;

}

WSACloseEvent(
obj-
>
event)
;

if
(
obj-
>
s !
=
INVALID_SOCKET)
{

closesocket(
obj-
>
s)
;

}

HeapFree(
GetProcessHeap(
)
,
0,
obj)
;

}

void
RenumberThreadArray(
THREAD_OBJ *
thread)
{

EnterCriticalSection(
&
thread-
>
ThreadCritSec)
;

SOCKET_OBJ *
obj =
NULL
;

int
i =
0;

SockObj *
sptr =
NULL
;

sptr =
(
SockObj *
)
GotoNextDoubleList(
&
thread-
>
sockHeader,
&
(
thread-
>
sockHeader.
head)
)
;

while
(
sptr)
{

obj =
(
SOCKET_OBJ *
)
container_of(
SOCKET_OBJ,
entry,
sptr)
;

thread-
>
Handles[
+
+
i]
=
obj-
>
event;

sptr =
(
SockObj *
)
GotoNextDoubleList(
&
thread-
>
sockHeader,
sptr)
;

}

LeaveCriticalSection(
&
thread-
>
ThreadCritSec)
;

}

int
ReceivePendingData(
SOCKET_OBJ *
sockobj)
{

BUFFER_OBJ *
buffobj=
NULL
;

int
rc,

ret;

// Get a buffer to receive the data

buffobj =
GetBufferObj(
gBufferSize)
;

ret =
0;

if
(
gProtocol =
=
IPPROTO_TCP
)

{

rc =
recv
(
sockobj-
>
s,
buffobj-
>
buf,
buffobj-
>
buflen,
0)
;

}
else
{

fprintf
(
stderr
,
"Tcp failed./n"
)
;

ExitProcess(
-
1)
;

}

if
(
rc =
=
SOCKET_ERROR)
{

fprintf
(
stderr
,
"recv failed./n"
)
;

ExitProcess(
-
1)
;

}
else
if
(
rc =
=
0)
{

FreeBufferObj(
buffobj)
;

sockobj-
>
closing =
TRUE
;

if
(
sockobj-
>
buff.
head =
=
NULL
)

{

// If no sends are pending, close the socket for good

closesocket(
sockobj-
>
s)
;

sockobj-
>
s =
INVALID_SOCKET;

ret =
-
1;

}

else

{

ret =
0;

}

}
else
{

buffobj-
>
buflen =
rc;

EnqueueSingleList(
&
sockobj-
>
buff,
&
buffobj-
>
next)
;

ret =
1;

}

return
ret;

}

int
SendPendingData(
SOCKET_OBJ *
sock)
{

BUFFER_OBJ *
bufobj =
NULL
;

BuffObj *
entry =
NULL
;

int
nleft =
0,

idx =
0,

ret =
0,

rc =
0;

while
(
entry =
DequeueSingleList(
&
sock-
>
buff)
)
{

bufobj =
(
BUFFER_OBJ *
)
container_of(
BUFFER_OBJ,
next,
entry)
;

if
(
gProtocol =
=
IPPROTO_TCP
)
{

nleft =
bufobj-
>
buflen;

idx =
0;

while
(
nleft >
0)
{

rc =
send
(
sock-
>
s,
&
(
bufobj-
>
buf[
idx]
)
,
nleft,
0)
;

if
(
rc =
=
SOCKET_ERROR)
{

ExitProcess(
-
1)
;

}
else
{

idx +
=
rc;

nleft -
=
rc;

}

}

printf
(
"send %d./n"
,
bufobj-
>
buflen)
;

FreeBufferObj(
bufobj)
;

}
else
{

ExitProcess(
-
1)
;

}

}

if
(
(
sock-
>
buff.
head =
=
NULL
)
&
&
(
sock-
>
closing =
=
TRUE
)
)
{

closesocket(
sock-
>
s)
;

sock-
>
s =
INVALID_SOCKET;

ret =
-
1;

printf
(
"Closing Connection./n"
)
;

}

return
ret;

}

int
HandleIo(
THREAD_OBJ *
thread,
SOCKET_OBJ *
sock)
{

WSANETWORKEVENTS nevents;

int
rc;

// Enumerate the events

rc =
WSAEnumNetworkEvents(
sock-
>
s,
sock-
>
event,
&
nevents)
;

if
(
rc =
=
SOCKET_ERROR)

{

fprintf
(
stderr
,
"HandleIo: WSAEnumNetworkEvents failed: %d/n"
,
WSAGetLastError(
)
)
;

return
SOCKET_ERROR;

}

if
(
nevents.
lNetworkEvents &
FD_READ)
{

if
(
nevents.
iErrorCode[
FD_READ_BIT]
=
=
0)
{

rc =
ReceivePendingData(
sock)
;

if
(
rc =
=
-
1)

{

RemoveSocketObj(
thread,
sock)
;

FreeSocketObj(
sock)
;

return
SOCKET_ERROR;

}

rc =
SendPendingData(
sock)
;

if
(
rc =
=
-
1)

{

RemoveSocketObj(
thread,
sock)
;

FreeSocketObj(
sock)
;

return
SOCKET_ERROR;

}

}
else
{

fprintf
(
stderr
,
"HandleIo: FD_READ error %d/n"
,
nevents.
iErrorCode[
FD_READ_BIT]
)
;

RemoveSocketObj(
thread,
sock)
;

FreeSocketObj(
sock)
;

return
SOCKET_ERROR;

}

}

if
(
nevents.
lNetworkEvents &
FD_WRITE)
{

if
(
nevents.
iErrorCode[
FD_WRITE_BIT]
=
=
0)

{

rc =
SendPendingData(
sock)
;

if
(
rc =
=
-
1)

{

RemoveSocketObj(
thread,
sock)
;

FreeSocketObj(
sock)
;

return
SOCKET_ERROR;

}

}

else

{

fprintf
(
stderr
,
"HandleIo: FD_WRITE error %d/n"
,
nevents.
iErrorCode[
FD_WRITE_BIT]
)
;

return
SOCKET_ERROR;

}

}

if
(
nevents.
lNetworkEvents &
FD_CLOSE)
{

if
(
nevents.
iErrorCode[
FD_CLOSE_BIT]
=
=
0)

{

// Socket has been indicated as closing so make sure all the data

// has been read

printf
(
"close./n"
)
;

while
(
1)

{

rc =
ReceivePendingData(
sock)
;

if
(
rc =
=
-
1)

{

RemoveSocketObj(
thread,
sock)
;

FreeSocketObj(
sock)
;

return
SOCKET_ERROR;

}

else
if
(
rc !
=
0)

{

continue
;

}

else

{

break
;

}

}

// See if there is any data pending, if so try to send it

rc =
SendPendingData(
sock)
;

if
(
rc =
=
-
1)

{

RemoveSocketObj(
thread,
sock)
;

FreeSocketObj(
sock)
;

return
SOCKET_ERROR;

}

}

else

{

fprintf
(
stderr
,
"HandleIo: FD_CLOSE error %d/n"
,
nevents.
iErrorCode[
FD_CLOSE_BIT]
)
;

RemoveSocketObj(
thread,
sock)
;

FreeSocketObj(
sock)
;

return
SOCKET_ERROR;

}

}

return
NO_ERROR;

}

DWORD WINAPI ChildThread(
LPVOID lpParam)
{

THREAD_OBJ *
thread=
NULL
;

SOCKET_OBJ *
sptr=
NULL
,

*
sockobj=
NULL
;

int
index,

rc,

i;

thread =
(
THREAD_OBJ *
)
lpParam;

while
(
true
)
{

rc =
WaitForMultipleObjects(
thread-
>
sockHeader.
count
+
1,
thread-
>
Handles,
FALSE
,
INFINITE)
;

if
(
rc =
=
WAIT_FAILED |
|
rc =
=
WAIT_TIMEOUT)

{

fprintf
(
stderr
,
"ChildThread: WaitForMultipleObjects failed: %d/n"
,
GetLastError(
)
)
;

break
;

}
else
{

for
(
i =
0;
i <
thread-
>
sockHeader.
count
+
1;
i+
+
)
{

rc =
WaitForSingleObject(
thread-
>
Handles[
i]
,
0)
;

if
(
rc =
=
WAIT_FAILED)

{

fprintf
(
stderr
,
"ChildThread: WaitForSingleObject failed: %d/n"
,
GetLastError(
)
)
;

ExitThread(
-
1)
;

}

else
if
(
rc =
=
WAIT_TIMEOUT)

{

// This event isn't signaled, continue to the next one

continue
;

}

index =
i;

if
(
index =
=
0)

{

// If index 0 is signaled then rebuild the array of event

// handles to wait on

WSAResetEvent(
thread-
>
Handles[
index]
)
;

RenumberThreadArray(
thread)
;

i =
1;

}
else
{

sockobj =
FindSocketObj(
thread,
index-
1)
;

if
(
sockobj !
=
NULL
)

{

if
(
HandleIo(
thread,
sockobj)
=
=
SOCKET_ERROR)

{

RenumberThreadArray(
thread)
;

}

}

else

{

printf
(
"Unable to find socket object!/n"
)
;

}

}

}

}

}

}

void
AssignToFreeThread(
SOCKET_OBJ *
sock)
{

ThreadObj *
threadobj =
NULL
;

THREAD_OBJ *
thread =
NULL
;

threadobj =
(
ThreadObj *
)
GotoNextSingleList(
&
theadObjHeader,
theadObjHeader.
head)
;

while
(
threadobj)
{

thread =
(
THREAD_OBJ *
)
container_of(
THREAD_OBJ,
entry,
threadobj)
;

if
(
InsertSocketObj(
thread,
sock)
!
=
SOCKET_ERROR)
{

break
;

}

threadobj =
(
ThreadObj *
)
GotoNextSingleList(
&
theadObjHeader,
threadobj)
;

}

if
(
threadobj =
=
NULL
)
{

thread =
GetThreadObj(
)
;

thread-
>
Thread =
CreateThread(
NULL
,
0,
ChildThread,
(
LPVOID)
thread,
0,
NULL
)
;

if
(
thread-
>
Thread =
=
NULL
)

{

fprintf
(
stderr
,
"AssignToFreeThread: CreateThread failed: %d/n"
,
GetLastError(
)
)
;

ExitProcess(
-
1)
;

}

InsertSocketObj(
thread,
sock)
;

EnqueueSingleList(
&
theadObjHeader,
&
thread-
>
entry)
;

}

WSASetEvent(
thread-
>
Event)
;

}

int
_tmain(
int
argc,
_TCHAR*
argv[
]
)

{

WSADATA wsd;

struct
addrinfo
*
res=
NULL
,

*
ptr=
NULL
;

THREAD_OBJ *
thread=
NULL
;

SOCKET_OBJ *
sockobj=
NULL
,

*
newsock=
NULL
;

int
index,

rc;

if
(
WSAStartup(
MAKEWORD(
2,
2)
,
&
wsd)
!
=
0)

{

fprintf
(
stderr
,
"unable to load Winsock!/n"
)
;

return
-
1;

}

res =
ResolveAddress(
gSrvAddr,
gPort,
gAddressFamily,
gSocketType,
gProtocol)
;

if
(
res =
=
NULL
)

{

fprintf
(
stderr
,
"ResolveAddress failed to return any addresses!/n"
)
;

return
-
1;

}

thread =
GetThreadObj(
)
;

InitializeCriticalSection(
&
theadObjHeader.
SendRecvQueueCritSec)
;

theadObjHeader.
head =
theadObjHeader.
tail =
NULL
;

ptr =
res;

while
(
ptr)
{

sockobj =
GetSocketObj(
INVALID_SOCKET,
(
gProtocol =
=
IPPROTO_TCP
)
?
TRUE
:
FALSE
)
;

sockobj-
>
s =
socket
(
ptr-
>
ai_family,
ptr-
>
ai_socktype,
ptr-
>
ai_protocol)
;

if
(
sockobj-
>
s =
=
INVALID_SOCKET)
{

fprintf
(
stderr
,
"create socket failed./n"
)
;

ExitProcess(
-
1)
;

}

InsertSocketObj(
thread,
sockobj)
;

rc =
bind
(
sockobj-
>
s,
ptr-
>
ai_addr,
ptr-
>
ai_addrlen)
;

if
(
rc =
=
SOCKET_ERROR)

{

fprintf
(
stderr
,
"bind failed: %d/n"
,
WSAGetLastError(
)
)
;

return
-
1;

}

if
(
gProtocol =
=
IPPROTO_TCP
)
{

rc =
listen
(
sockobj-
>
s,
200)
;

if
(
rc =
=
SOCKET_ERROR)
{

fprintf
(
stderr
,
"listen failed./n"
)
;

ExitProcess(
-
1)
;

}

rc =
WSAEventSelect(
sockobj-
>
s,
sockobj-
>
event,
FD_ACCEPT |
FD_CLOSE)
;

if
(
rc =
=
SOCKET_ERROR)
{

fprintf
(
stderr
,
"WSAEventSelect failed: %d/n"
,
WSAGetLastError(
)
)
;

ExitProcess(
-
1)
;

}

}

ptr =
ptr-
>
ai_next;

}

freeaddrinfo
(
res)
;

while
(
true
)
{

rc =
WaitForMultipleObjects(
thread-
>
sockHeader.
count
+
1,
thread-
>
Handles,
FALSE
,
5000)
;

if
(
rc =
=
WAIT_FAILED)
{

fprintf
(
stderr
,
"WaitForMultipleObjects failed:%d/n"
,
WSAGetLastError(
)
)
;

break
;

}
else
if
(
rc =
=
WAIT_TIMEOUT)
{

continue
;

}
else
{

index =
rc -
WAIT_OBJECT_0;

sockobj =
FindSocketObj(
thread,
index -
1)
;

if
(
gProtocol =
=
IPPROTO_TCP
)
{

SOCKADDR_STORAGE sa;

WSANETWORKEVENTS ne;

SOCKET
sc;

int
salen;

rc =
WSAEnumNetworkEvents(
sockobj-
>
s,
thread-
>
Handles[
index]
,
&
ne)
;

if
(
rc =
=
SOCKET_ERROR)
{

fprintf
(
stderr
,
"WSAEnumNetworkEvents failed./n"
)
;

break
;

}

while
(
true
)
{

sc =
INVALID_SOCKET;

salen =
sizeof
(
sa)
;

sc =
accept
(
sockobj-
>
s,
(
SOCKADDR
*
)
&
sa,
&
salen)
;

if
(
(
sc =
=
INVALID_SOCKET)
&
&
(
WSAGetLastError(
)
!
=
WSAEWOULDBLOCK)
)
{

fprintf
(
stderr
,
"accept failed./n"
)
;

break
;

}
else
if
(
sc =
=
INVALID_SOCKET)
{

continue
;

}
else
{

newsock =
GetSocketObj(
INVALID_SOCKET,
FALSE
)
;

memcpy
(
&
newsock-
>
addr,
&
sa,
salen)
;

newsock-
>
addrlen =
salen;

newsock-
>
s =
sc;

rc =
WSAEventSelect(
newsock-
>
s,
newsock-
>
event,
FD_READ |
FD_WRITE |
FD_CLOSE)
;

if
(
rc =
=
SOCKET_ERROR)

{

fprintf
(
stderr
,
"WSAEventSelect failed: %d/n"
,
WSAGetLastError(
)
)
;

break
;

}

AssignToFreeThread(
newsock)
;

}

}

}

}

}

WSACleanup(
)
;

return
0;

}

版权声明:
原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: