您的位置:首页 > 其它

win32 多线程知识点梳理六 IOCP

2015-10-02 23:02 633 查看
IOCP的全称就是I/O Completion ports。

虽然名称看上去相似,但是它和APCs 中所用的I/O completion routines 没有任何关联。

IOCP 可以解决目前为止我们看到的所有问题:

与WaitForMultipleObjects()不同,这里不限制handles个数。

I/O completion ports 允许一个线程将一个请求暂时保存下来,而由另一个线程为它做实际服务。

I/O completion ports 默默支持 scalable 架构。

服务器的线程模型

有3 个基本的方法可以决定一个服务器上需要多少个线程:

单独一个线程。 在一个文件服务器或一个简单的web服务器上,单一线程可以使用overlapped I/O来搬移数据,但是如果这个线程还必须进行任何其他操作,整个服务器就会陷入泥沼中。

每个client 给予一个线程,如果你为每个client产生一个线程那么理论上每个人都可以有不错的反应时间,因为CPU的能量被平均分配了,然而事实上系统资源是有限的,到了某种情况下,系统效率就会急剧下降,比如如果有2000个client,这种做法就不切实际了。

每个CPU给予一个线程,这种做法让每一个CPU尽可能忙碌,不至于出现哪一个CPU有过度饱和的情况,如果你使用event对象或APCs,此法将难以实现,因为它们都是否紧密地和某个线程绑在一起,此时我们就需要依赖一种特殊的同步对象,也就是I/O completion ports。

I/O completion port 是一种非常特殊的核心对象,用来综合一堆线程,让它们为overlapped请求服务。

其所提供的功能可以跨越多个CPUs。

Completion Ports 做了什么

IOCP允许你将启动overlapped 请求的线程提供服务的线程拆伙。

为了使用IOCP,你的程序应该产生一堆线程,统统在IOCP上等待着,这些线程都将成为能够处理completed overlapped I/O request的线程之一 ——只要线程在I/O completion port上等待,它就自然而然成为了那种线程。

每次有新的文件因为overlapped I/O开启你就可以让它的文件handle和I/O completion port产生关联,一旦这样的关系建立起来,任何文件操作如果成功完成,便会导致I/O completion packet 被送到 completion port去。这是发生操作系统之内的操作,对应用程序而言是透明的。

为了回应I/O completion packet,completion port释放了一个等待中的线程,如果目前没有线程正在等待,completion port就不会产生新线程。

释放出来的线程被给予足够的信息后就可以执行,处理该操作请求,但它还是属于原来的那一堆(被指定给此completion port)的线程,所不同的是,之前它是等待中的线程(waiting),现在它是一个作用中的线程(active),当这个线程将overlapped I/O请求处理完毕后,它应该再次在这个I/O completion port上等待。

现在我们再来描述一个completion port ,它是一个机制,用来管理一堆线程如何为completed overlapped I/O request服务。然而,completion port远比一个简单的分派器要丰富的多,它可以保持一个CPU或多个CPUs尽可能忙碌,但也避免它们被太多线程淹没,I/O completion port企图保持并行处理的线程个数在某个数字左右,一般而言你希望所有的CPUs都忙碌,所以一般而言并行处理的线程个数就是CPUs的个数。

I/O completion port运作过程中令人迷惑的部分就是,当一个线程被阻塞时它将发出通过并提交另一个线程。假设在单一CPU系统中有两个线程都正在一个I/O completion port上等待,线程1被唤醒,并且从网络上活得一包数据,为了服务这个数据包,线程1必须从磁盘上读取一个文件,所以它调用CreateFile()和ReadFile(),但是并不在overlapped模式中。Completion port 于是通告说线程1被磁盘I/O滞留了,并且提交线程2以使目前作用的线程个数到达需求数量。

当线程1 从磁盘操作中返回时,或许现有有两个线程同时在执行——甚至即使作用中的线程个数应该是1(因为CPU的个数只是1),这一行为令人惊讶,但却是正确的。Completion Port不会再提交另一个线程,除非作用中的线程个数再次降低到1以下。

操作概观

使用一个completion port 的快速摘要如下:

1. 产生一个I/O completion port

2. 让它和一个文件handle 产生关联

3. 产生一堆线程

4. 让每一个线程都在completion port 上等待

5. 开始对着那个文件handle 发出一些overlapped I/O 请求

当新文件被开启时,它们可以在任何时候与I/O completion port 产生关联。在completion port上等待的线程不应该做为completion port 服务以外的事情,因为这些线程一直都将是completion port 所持续追踪的那一堆线程的一部分。

步骤1:产生一个I/O Completion Port

I/O completion port 是一核心对象,你必须使用CreateIoCompletionPort()才能产生它:

HANDLE WINAPI CreateIoCompletionPort(
_In_     HANDLE    FileHandle, //文件或设备的handle,可以使用INVALID_HANDLE_VALUE,表示产生一个与任何文件handle都无关的port
_In_opt_ HANDLE    ExistingCompletionPort,//如果被指定,那么上一栏的FileHandle就会被加到这个port上,指定NULL可以产生一个新的port
_In_     ULONG_PTR CompletionKey,//用户自定义的的一个数值,将被交给提供服务的线程,这个值和FileHandle有关
_In_     DWORD     NumberOfConcurrentThreads//与此completion port 有关联的线程个数
);


返回值:

如果函数成功将返回一个I/O completion port的handle,如果函数失败那么则传回FALSE

任何文件只要附着到一个I/O completion port身上,都必须先以FILE_FLAG_OVERLAPPED开启,如果已经附着上去,就不能够再以ReadFileEx()或WriteFileEx()操作它。你可以任意关闭这样一个文件,没有安全上的顾虑。

还有需要注意的是,最后一个参数:

If this parameter is zero, the system allows as many concurrently running threads as there are processors in the system.

如果你设定为0,那么在CPU系统上就会有尽可能多的线程运行起来。

步骤2:与一个文件handle产生关联

CreateCompletionPort()通常被调用两次,一次先指定FileHandle为INVALID_HANDLE_VALUE,并设定ExistingCompletionPort 为NULL,用以产生一个port。

然后再为每一个欲附着上去的文件handle调用一次CreateIoCompletionPort(), 这些调用将ExistingCompletionPort设定为第一次调用所传回的handle。

例如:

HANDLE hPort;
HANDLE hFiles[MAX_FILES];
int index;
hPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
for(index =0; index<OPEN_FILES; index++){
CreateIoCompletionPort(hFiles[index],hPort,0,0);
}


步骤3 : 产生一堆线程

一旦completion port 产生出来,你就可以设立在该port上等待的那些线程了。 I/O completion port并不自己产生那些线程,它只是使用由你产生的线程。因此你必须自己以CreateThread() 或_beginthreadex()或AfxBeginThread()产生出线程。

当你一产生这些线程时,它们都应该在completion port上等待,当线程开始为各个请求服务时,池子里的线程的组织如下:

目前正在执行的线程
+ 被阻塞的线程
+ 在completion port上等待的线程
----------------------------------------
池子里所有线程的个数


因为如此,所以你应该产生比CPU个数还多的线程,如果你只有一个CPU,而你也只产生了一个线程,那么当该线程阻塞时,你的CPU也变成闲置的了。由于池子里没有其他线程,completion port 也就没有办法为任何数据包服务,甚至即使CPU的能量游刃有余。

合理的线程个数应该是CPU个数的两倍加上2,你当然也可以产生更多的线程,但记住线程不是免费的。

步骤4 :在一个I/O Completion Port 上等待

work线程初始化自己后,它应该调用GetQueuedCompletionStatus(),这个操作像是WaitForSingleObject() 和 GetOverlappedResult()的组合。函数的规格如下:

BOOL WINAPI GetQueuedCompletionStatus(
_In_  HANDLE       CompletionPort,//将在其上等待的completion port
_Out_ LPDWORD      lpNumberOfBytes,//一个指针,指向被传输的数据字节数
_Out_ PULONG_PTR   lpCompletionKey,//一个指针,由CreateIoCompletionPort()所定义的key
_Out_ LPOVERLAPPED *lpOverlapped,//其实应该命名为lplpOverlapped,因为它指向的是overlapped结构的指针的地址
_In_  DWORD        dwMilliseconds//等待的最长时间
);


返回值

如果函数成功将一个completion packet 从队列中取出,并完成一个成功的操作,函数将传回TRUE,并填写由lpNumberOfBytesTransferred 、lpCompletionKey、lpOverlapped所指向的变量内容。

如果操作失败,但completion packet已经从队列中取出,则函数传回FALSE,lpOverlapped 指向失败之操作。调用GetLastError()可获知为什么I/O操作会失败。

如果函数失败,则传回FALSE,lpOverlapped 设为NULL。

与其他的核心对象不同,在completion port上等待的线程是以先进后出(FILO)的次序提供服务。没有什么理由需要担忧次序的公平性,因为所有的线程都做完全相同的事情。

使用FILO,一个进行中的线程调用GetQueuedCompletionStatus()就可以取得下一个请求request,并保持执行状态,没有阻塞,这是非常有效率的。如果线程等待太长的时间,就有可能被置换出去(page out)。最近执行过的线程则通常还在内存中,不需要先置换进来才能执行。

步骤5: 发出overlapped I/O请求

这些请求可以启动一个能被I/O completion port所驾驭的I/O操作:

ConnectNamePipe()

DeviceIoControl()

LockFileEx()

ReadFile()

TransactNamePipe()

WaitCommEvent()

WriteFile()

为了使用completion port ,主线程可以对着一个与此completion port 有关联的文件,进行读、写、或其他任何操作。该线程不需要调用WaitForMultipleObjects(),因为池子里各个线程都调用过GetQueuedCompletionStatus()。一旦I/O操作完成,一个等待中的线程将会自动被释放,以服务该操作。

避免Completion Packets

常常会有这种情况:你读一个文件,但是操作完成时,你并不希望I/O completion port 被通告。网络服务器就是一个例子,在那里,线程从一个文件读进一个针对named pipe 或socket的请求,然后将回应写到同一个文件中。问题是文件已经以overlapped I/O状态打开了,所以写入操作将被异步。而当写入操作完成时,completion port将会收到一个completion packet,如果写入操作不是很重要,那么服务器花费许多时间处理不怎么重要的completion packets就显得得不偿失。

解决之道,将每个操作均会引发一个completion port 通告的开关关闭。

我们可以用受激发的event对象的机制取而代之,你设定一个overlapped结构,内含一个manual reset event对象,放在hEvent一栏,然后把handle最低位设为1. ————有点类似于hack行为,但是文件中是这么交代的。

OVERLAPPED overlap;
HANDLE hFile;
char buffer[128];
DWORD dwBytesWritten;

memset(&overlap,0,sizeof(OVERLAPPED));
overlap.hEvent = CreateEvent(NULL,TURE, FALSE, NULL);
overlap.hEvent = (HANDLE)((DWORD)overlap.hEvent | 0x1);
WriteFile(hFile,buffer,128,&dwBytesWritten,&overlap);


对Sockets 使用Overlapped I/O

来看一个模拟标准的TCP port 7 行为的例子。

ECHOSRV 是一个服务器, 倾听TCP port 5554(一个任意数值) 并且将它从socket中读到的每一样东西写回相同的socket,ECHOCLI是一个客户端,负责取得你的输入,送往服务器,然后印出它从服务器收到的回应。

先上代码吧,我稍微修改了一下,用vs2010可以编译通过。

服务器端代码:

/*
* EchoSrv.c
*
* Sample code for Multithreading Applications in Win32
* This is from Chapter 6, Listing 6-4
*
* Demonstrates how to use I/O completion ports
* with TCP on the Internet.  This sample
* server can only be run on Windows NT,
* version 3.51 or later.  The client (EchoCli)
* can be run on Windows 95.
*/

#define WIN32_LEAN_AND_MEAN
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <tchar.h>
#include <string.h>
#include <winsock.h>
#include <io.h>
//#include "MtVerify.h"

// Pick a port number that seems to be away from all others
#define SERV_TCP_PORT 5554
#define MAXLINE 512
#pragma comment(lib, "Wsock32.lib")
//
// Structure definition
//

// The context key keeps track of how the I/O
// is progressing for each individual file handle.
struct ContextKey
{
SOCKET  sock;
// Input
char        InBuffer[4];
OVERLAPPED  ovIn;
// Output
int         nOutBufIndex;
char        OutBuffer[MAXLINE];
OVERLAPPED  ovOut;
DWORD       dwWritten;
};

//
// Global variables
//

HANDLE ghCompletionPort;

//
// Function prototypes
//

//void CreateWorkerThreads(HANDLE);
void CreateWorkerThreads();
DWORD WINAPI ThreadFunc(LPVOID pvoid);
void IssueRead(struct ContextKey *pCntx);
void CheckOsVersion();
void FatalError(char *s);

///////////////////////////////////////////////////////

int main(int argc, char *argv[])
{
SOCKET  listener;
SOCKET  newsocket;
WSADATA WsaData;
struct sockaddr_in serverAddress;
struct sockaddr_in clientAddress;
int     clientAddressLength;
int     err;

CheckOsVersion();

err = WSAStartup (0x0101, &WsaData);
if (err == SOCKET_ERROR)
{
FatalError("WSAStartup Failed");
return EXIT_FAILURE;
}

/*
* Open a TCP socket connection to the server
* By default, a socket is always opened
* for overlapped I/O.  Do NOT attach this
* socket (listener) to the I/O completion
* port!
*/
listener = socket(AF_INET, SOCK_STREAM, 0);
if (listener < 0)
{
FatalError("socket() failed");
return EXIT_FAILURE;
}

/*
* Bind our local address
*/
memset(&serverAddress, 0, sizeof(serverAddress));
serverAddress.sin_family      = AF_INET;
serverAddress.sin_addr.s_addr = htonl(INADDR_ANY);
serverAddress.sin_port        = htons(SERV_TCP_PORT);

err = bind(listener,
(struct sockaddr *)&serverAddress,
sizeof(serverAddress)
);
if (err < 0)
FatalError("bind() failed");

ghCompletionPort = CreateIoCompletionPort(
INVALID_HANDLE_VALUE,
NULL,   // No prior port
0,      // No key
0       // Use default  # of threads
);
if (ghCompletionPort == NULL)
FatalError("CreateIoCompletionPort() failed");

CreateWorkerThreads();
//CreateWorkerThreads(ghCompletionPort);

listen(listener, 5);

fprintf(stderr, "Echo Server with I/O Completion Ports\n");
fprintf(stderr, "Running on TCP port %d\n", SERV_TCP_PORT);
fprintf(stderr, "\nPress Ctrl+C to stop the server\n");

//
// Loop forever accepting requests new connections
// and starting reading from them.
//
for (;;)
{
struct ContextKey *pKey;

clientAddressLength = sizeof(clientAddress);
newsocket = accept(listener,
(struct sockaddr *)&clientAddress,
&clientAddressLength);
if (newsocket < 0)
{
FatalError("accept() Failed");
return EXIT_FAILURE;
}

// Create a context key and initialize it.
// calloc will zero the buffer
pKey = (ContextKey* )calloc(1, sizeof(struct ContextKey));
pKey->sock = newsocket;
pKey->ovOut.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
// Set the event for writing so that packets
// will not be sent to the completion port when
// a write finishes.
pKey->ovOut.hEvent = (HANDLE)((DWORD)pKey->ovOut.hEvent | 0x1);

// Associate the socket with the completion port
CreateIoCompletionPort(
(HANDLE)newsocket,
ghCompletionPort,
(DWORD)pKey,   // key
0              // Use default # of threads
);

// Kick off the first read
IssueRead(pKey);
}
return 0;
}

void CreateWorkerThreads()
{
SYSTEM_INFO  sysinfo;
DWORD        dwThreadId;
DWORD        dwThreads;
DWORD        i;

GetSystemInfo(&sysinfo);
dwThreads = sysinfo.dwNumberOfProcessors * 2 + 2;
for (i=0; i<dwThreads; i++)
{
HANDLE hThread;
hThread = CreateThread(
NULL, 0, ThreadFunc, NULL, 0, &dwThreadId
);
CloseHandle(hThread);
}
}

//
// Each worker thread starts here.
DWORD WINAPI ThreadFunc(LPVOID pVoid)
{
BOOL    bResult;
DWORD   dwNumRead;
struct ContextKey *pCntx;
LPOVERLAPPED lpOverlapped;

UNREFERENCED_PARAMETER(pVoid);

// Loop forever on getting packets from
// the I/O completion port.
for (;;)
{
bResult = GetQueuedCompletionStatus(
ghCompletionPort,
&dwNumRead,
(LPDWORD)&pCntx,
&lpOverlapped,
INFINITE
);

if (bResult == FALSE
&& lpOverlapped == NULL)
{
FatalError(
"ThreadFunc - Illegal call to GetQueuedCompletionStatus");
}

else if (bResult == FALSE
&& lpOverlapped != NULL)
{
// This happens occasionally instead of
// end-of-file. Not sure why.
closesocket(pCntx->sock);
free(pCntx);
fprintf(stderr,
"ThreadFunc - I/O operation failed\n");
}

else if (dwNumRead == 0)
{
closesocket(pCntx->sock);
free(pCntx);
fprintf(stderr, "ThreadFunc - End of file.\n");
}

// Got a valid data block!
// Save the data to our buffer and write it
// all back out (echo it) if we have see a \n
else
{
// Figure out where in the buffer to save the character
char *pch = &pCntx->OutBuffer[pCntx->nOutBufIndex++];
*pch++ = pCntx->InBuffer[0];
*pch = '\0';    // For debugging, WriteFile doesn't care
if (pCntx->InBuffer[0] == '\n')
{
WriteFile(
(HANDLE)(pCntx->sock),
pCntx->OutBuffer,
pCntx->nOutBufIndex,
&pCntx->dwWritten,
&pCntx->ovOut
);
pCntx->nOutBufIndex = 0;
fprintf(stderr, "Echo on socket %x.\n", pCntx->sock);
}

// Start a new read
IssueRead(pCntx);
}
}

return 0;
}

/*
* Call ReadFile to start an overlapped request
* on a socket.  Make sure we handle errors
* that are recoverable.
*/
void IssueRead(struct ContextKey *pCntx)
{
int     i = 0;
BOOL    bResult;
int     err;
int     numRead;

while (++i)
{
// Request a single character
bResult = ReadFile(
(HANDLE)pCntx->sock,
pCntx->InBuffer,
1,
(LPDWORD)&numRead,
&pCntx->ovIn
);

// It succeeded immediately, but do not process it
// here, wait for the completion packet.
if (bResult)
return;

err = GetLastError();

// This is what we want to happen, it's not an error
if (err == ERROR_IO_PENDING)
return;

// Handle recoverable error
if ( err == ERROR_INVALID_USER_BUFFER ||
err == ERROR_NOT_ENOUGH_QUOTA ||
err == ERROR_NOT_ENOUGH_MEMORY )
{
if (i == 5) // I just picked a number
{
Sleep(50);  // Wait around and try later
continue;
}

FatalError("IssueRead - System ran out of non-paged space");
}

break;
}

fprintf(stderr, "IssueRead - ReadFile failed.\n");
}

//
// Make sure we are running under the right versions
// of Windows NT (3.51, 4.0, or later)
//
void CheckOsVersion()
{
OSVERSIONINFO   ver;
BOOL            bResult;

ver.dwOSVersionInfoSize = sizeof(OSVERSIONINFO);

bResult = GetVersionEx((LPOSVERSIONINFO) &ver);

if ( (!bResult) ||
(ver.dwPlatformId != VER_PLATFORM_WIN32_NT) )
{
FatalError("ECHOSRV requires Windows NT 3.51 or later.");
}

}

//
// Error handler
//
void FatalError(char *s)
{
fprintf(stdout, "%s\n", s);
exit(EXIT_FAILURE);
}


客户端代码

/*
* EchoCli.c
*
* Sample code for "Multithreading Applications in Win32"
* This is from Chapter 6.
*
* This is a command line client to drive the ECHO server.
* Run the server in one Commmand Prompt Window,
* then run this program in one or more other windows.
* EchoCli will wait for you to type in some text when
* it starts up. Each line of text will be sent to the
* echo server on TCP port 5554.
*/

#include <windows.h>
#include <tchar.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <winsock.h>

#pragma comment(lib, "Wsock32.lib")

/* Function Prototypes */
void FatalError(char *s);
int writen(SOCKET sock, char *ptr, int nBytes);
int readline(SOCKET sock, char *ptr, int maxlen);
void DoClientLoop(FILE *fp, SOCKET sock);

/* Constants */
#define MAXLINE 512
#define SERVER_TCP_PORT     5554
#define SERVER_ADDRESS      "127.0.0.1"

/*
* Error handler
*/
void FatalError(char *s)
{
fprintf(stdout, "%s\n", s);
exit(EXIT_FAILURE);
}

/*
* Write bytes to the port with proper block size handling.
*/
int writen(SOCKET sock, char *ptr, int nBytes)
{
int nleft;
int nwritten;

nleft = nBytes;
while (nleft > 0)
{
nwritten = send(sock,
ptr,
nBytes,
0
);

if (nwritten == SOCKET_ERROR)
{
fprintf(stdout, "Send Failed\n");
exit(1);
}

nleft -= nwritten;
ptr += nwritten;
}

return nBytes - nleft;
}

/*
* Read a line of text of the port. This version
* is very inefficient, but it's simple.
*/
int readline(SOCKET sock, char *ptr, int maxlen)
{
int n;
int rc;
char c;

for (n=1; n<maxlen; n++)
{
if ( ( rc= recv(sock, &c, 1, 0)) == 1)
{
*ptr++ = c;
if (c=='\n')
break;
}
else if (rc == 0)
{
if (n == 1)
return 0;
else
break;
}
else
return -1;  /* Error */
}

*ptr = '\0';
return n;
}

int main(int argc, char *argv[])
{
WSADATA WsaData;
SOCKET sock;
struct sockaddr_in  serv_addr;
int err;

puts("EchoCli - client for echo server sample program\n");
puts("Type a line of text followed by Return.");
puts("Exit this program by typing Ctrl+Z followed by Return.");

err = WSAStartup(0x0101, &WsaData);
if (err == SOCKET_ERROR)
FatalError("WSAStartup Failed");

/*
* Bind our local address
*/
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family    = AF_INET;
// Use the local host
serv_addr.sin_addr.s_addr   = inet_addr(SERVER_ADDRESS);
serv_addr.sin_port          = htons(SERVER_TCP_PORT);

/*
* Open a TCP socket (an Internet stream socket)
*/

sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
FatalError("socket() failed -- do you have TCP/IP networking installed?");

if (connect(sock, (struct sockaddr *) &serv_addr,
sizeof(serv_addr)) < 0)
FatalError("connect() failed -- is the server running?");

DoClientLoop(stdin, sock);

closesocket(sock);

return EXIT_SUCCESS;
}

/*
* As long as there is input from "fp", copy it to
* the server, read what the server gives back,
* and print it.
*/
void DoClientLoop(FILE *fp, SOCKET sock)
{
int n;
char sendline[MAXLINE];
char recvline[MAXLINE+1];

while (fgets(sendline, MAXLINE, fp) != NULL)
{
n = strlen(sendline);
if (writen(sock, sendline, n) != n)
FatalError("DoClientLoop: writen() error");

n = readline(sock, recvline, MAXLINE);
if (n < 0)
FatalError("DoClientLoop: readline() error");
recvline
= '\0';
fputs(recvline, stdout);
}

if (ferror(fp))
FatalError("DoClientLoop: error reading file");
}


运行结果

运行结果如下图所示:

首先运行服务器端,它开启socket,产生I/O completion port,启动worker线程,并进入循环之中等待连接。

它要做的事情就是读入客户端的输入,随后把它再原样写回,这就是echo的名字的由来。



随后启动一个客户端:

并输入字符串“hello, I am A”



可以看到服务器端此时为:



我们再启动一个客户端:

并输入字符串“nihao ”



可以看到服务器端此时变为:



过程解读

#define AF_INET         2               /* internetwork: UDP, TCP, etc. */


创建套接字

listener = socket(AF_INET, SOCK_STREAM, 0);
if (listener < 0)
{
FatalError("socket() failed");
return EXIT_FAILURE;
}


分配套接字地址

memset(&serverAddress, 0, sizeof(serverAddress));
serverAddress.sin_family      = AF_INET;
serverAddress.sin_addr.s_addr = htonl(INADDR_ANY);
serverAddress.sin_port        = htons(SERV_TCP_PORT);

err = bind(listener,
(struct sockaddr *)&serverAddress,
sizeof(serverAddress)
);
if (err < 0)
FatalError("bind() failed");


产生一个I/O completion port

ghCompletionPort = CreateIoCompletionPort(
INVALID_HANDLE_VALUE,
NULL,   // No prior port
0,      // No key
0       // Use default  # of threads
);
if (ghCompletionPort == NULL)
FatalError("CreateIoCompletionPort() failed");


产生一堆线程,在这个线程中调用了ThreadFunc(),那是 worker线程花掉所有时间的地方,在其中调用了GetQueueCompletionStatus(),当每一个completion packet被处理之后,我们使用GetQueueCompletionStatus()传回来的key,当作指向ContextKey的结构的指针

DWORD WINAPI ThreadFunc(LPVOID pVoid)
{
BOOL    bResult;
DWORD   dwNumRead;
struct ContextKey *pCntx;
LPOVERLAPPED lpOverlapped;

UNREFERENCED_PARAMETER(pVoid);

// Loop forever on getting packets from
// the I/O completion port.
for (;;)
{
bResult = GetQueuedCompletionStatus(
ghCompletionPort,
&dwNumRead,
(LPDWORD)&pCntx,
&lpOverlapped,
INFINITE
);

if (bResult == FALSE//函数失败
&& lpOverlapped == NULL)
{
FatalError(
"ThreadFunc - Illegal call to GetQueuedCompletionStatus");
}

else if (bResult == FALSE
&& lpOverlapped != NULL)//操作失败
{
// This happens occasionally instead of
// end-of-file. Not sure why.
closesocket(pCntx->sock);
free(pCntx);
fprintf(stderr,
"ThreadFunc - I/O operation failed\n");
}

else if (dwNumRead == 0)// 被传输的字节数为0
{
closesocket(pCntx->sock);
free(pCntx);
fprintf(stderr, "ThreadFunc - End of file.\n");
}

// Got a valid data block!
// Save the data to our buffer and write it
// all back out (echo it) if we have see a \n
else
{
// Figure out where in the buffer to save the character
char *pch = &pCntx->OutBuffer[pCntx->nOutBufIndex++];//指向OutBuffer对应位置
*pch++ = pCntx->InBuffer[0];// 可以看到字符是一个一个被读入的
*pch = '\0';    // For debugging, WriteFile doesn't care
if (pCntx->InBuffer[0] == '\n')//当完整一行后,就写回到socket中
{
WriteFile(
(HANDLE)(pCntx->sock),
pCntx->OutBuffer,
pCntx->nOutBufIndex,
&pCntx->dwWritten,
&pCntx->ovOut
);
pCntx->nOutBufIndex = 0;
fprintf(stderr, "Echo on socket %x.\n", pCntx->sock);
}

// Start a new read
IssueRead(pCntx);
}
}

return 0;
}


进入等待连接请求状态

listen(listener, 5);


进入循环之中等待连接,accept返回创建套接字的文件描述符,如果失败将返回-1.

for (;;)
{
struct ContextKey *pKey;

clientAddressLength = sizeof(clientAddress);
newsocket = accept(listener,
(struct sockaddr *)&clientAddress,
&clientAddressLength);
if (newsocket < 0)
{
FatalError("accept() Failed");
return EXIT_FAILURE;
}

// Create a context key and initialize it.
// calloc will zero the buffer
pKey = (ContextKey* )calloc(1, sizeof(struct ContextKey));
pKey->sock = newsocket;
pKey->ovOut.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
// Set the event for writing so that packets
// will not be sent to the completion port when
// a write finishes.
pKey->ovOut.hEvent = (HANDLE)((DWORD)pKey->ovOut.hEvent | 0x1);//这样设定后,当写入操作完成时,不需要任何packet被送往completion port。

// Associate the socket with the completion port
CreateIoCompletionPort(
(HANDLE)newsocket,
ghCompletionPort,
(DWORD)pKey,   // key
0              // Use default # of threads
);

// Kick off the first read
IssueRead(pKey);
}


资料来源:

Win32多线程程序设计
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: