win32 多线程知识点梳理六 IOCP
2015-10-02 23:02
633 查看
IOCP的全称就是I/O Completion ports。
虽然名称看上去相似,但是它和APCs 中所用的I/O completion routines 没有任何关联。
IOCP 可以解决目前为止我们看到的所有问题:
与WaitForMultipleObjects()不同,这里不限制handles个数。
I/O completion ports 允许一个线程将一个请求暂时保存下来,而由另一个线程为它做实际服务。
I/O completion ports 默默支持 scalable 架构。
单独一个线程。 在一个文件服务器或一个简单的web服务器上,单一线程可以使用overlapped I/O来搬移数据,但是如果这个线程还必须进行任何其他操作,整个服务器就会陷入泥沼中。
每个client 给予一个线程,如果你为每个client产生一个线程那么理论上每个人都可以有不错的反应时间,因为CPU的能量被平均分配了,然而事实上系统资源是有限的,到了某种情况下,系统效率就会急剧下降,比如如果有2000个client,这种做法就不切实际了。
每个CPU给予一个线程,这种做法让每一个CPU尽可能忙碌,不至于出现哪一个CPU有过度饱和的情况,如果你使用event对象或APCs,此法将难以实现,因为它们都是否紧密地和某个线程绑在一起,此时我们就需要依赖一种特殊的同步对象,也就是I/O completion ports。
I/O completion port 是一种非常特殊的核心对象,用来综合一堆线程,让它们为overlapped请求服务。
其所提供的功能可以跨越多个CPUs。
为了使用IOCP,你的程序应该产生一堆线程,统统在IOCP上等待着,这些线程都将成为能够处理completed overlapped I/O request的线程之一 ——只要线程在I/O completion port上等待,它就自然而然成为了那种线程。
每次有新的文件因为overlapped I/O开启你就可以让它的文件handle和I/O completion port产生关联,一旦这样的关系建立起来,任何文件操作如果成功完成,便会导致I/O completion packet 被送到 completion port去。这是发生操作系统之内的操作,对应用程序而言是透明的。
为了回应I/O completion packet,completion port释放了一个等待中的线程,如果目前没有线程正在等待,completion port就不会产生新线程。
释放出来的线程被给予足够的信息后就可以执行,处理该操作请求,但它还是属于原来的那一堆(被指定给此completion port)的线程,所不同的是,之前它是等待中的线程(waiting),现在它是一个作用中的线程(active),当这个线程将overlapped I/O请求处理完毕后,它应该再次在这个I/O completion port上等待。
现在我们再来描述一个completion port ,它是一个机制,用来管理一堆线程如何为completed overlapped I/O request服务。然而,completion port远比一个简单的分派器要丰富的多,它可以保持一个CPU或多个CPUs尽可能忙碌,但也避免它们被太多线程淹没,I/O completion port企图保持并行处理的线程个数在某个数字左右,一般而言你希望所有的CPUs都忙碌,所以一般而言并行处理的线程个数就是CPUs的个数。
I/O completion port运作过程中令人迷惑的部分就是,当一个线程被阻塞时它将发出通过并提交另一个线程。假设在单一CPU系统中有两个线程都正在一个I/O completion port上等待,线程1被唤醒,并且从网络上活得一包数据,为了服务这个数据包,线程1必须从磁盘上读取一个文件,所以它调用CreateFile()和ReadFile(),但是并不在overlapped模式中。Completion port 于是通告说线程1被磁盘I/O滞留了,并且提交线程2以使目前作用的线程个数到达需求数量。
当线程1 从磁盘操作中返回时,或许现有有两个线程同时在执行——甚至即使作用中的线程个数应该是1(因为CPU的个数只是1),这一行为令人惊讶,但却是正确的。Completion Port不会再提交另一个线程,除非作用中的线程个数再次降低到1以下。
1. 产生一个I/O completion port
2. 让它和一个文件handle 产生关联
3. 产生一堆线程
4. 让每一个线程都在completion port 上等待
5. 开始对着那个文件handle 发出一些overlapped I/O 请求
当新文件被开启时,它们可以在任何时候与I/O completion port 产生关联。在completion port上等待的线程不应该做为completion port 服务以外的事情,因为这些线程一直都将是completion port 所持续追踪的那一堆线程的一部分。
返回值:
如果函数成功将返回一个I/O completion port的handle,如果函数失败那么则传回FALSE
任何文件只要附着到一个I/O completion port身上,都必须先以FILE_FLAG_OVERLAPPED开启,如果已经附着上去,就不能够再以ReadFileEx()或WriteFileEx()操作它。你可以任意关闭这样一个文件,没有安全上的顾虑。
还有需要注意的是,最后一个参数:
If this parameter is zero, the system allows as many concurrently running threads as there are processors in the system.
如果你设定为0,那么在CPU系统上就会有尽可能多的线程运行起来。
然后再为每一个欲附着上去的文件handle调用一次CreateIoCompletionPort(), 这些调用将ExistingCompletionPort设定为第一次调用所传回的handle。
例如:
当你一产生这些线程时,它们都应该在completion port上等待,当线程开始为各个请求服务时,池子里的线程的组织如下:
因为如此,所以你应该产生比CPU个数还多的线程,如果你只有一个CPU,而你也只产生了一个线程,那么当该线程阻塞时,你的CPU也变成闲置的了。由于池子里没有其他线程,completion port 也就没有办法为任何数据包服务,甚至即使CPU的能量游刃有余。
合理的线程个数应该是CPU个数的两倍加上2,你当然也可以产生更多的线程,但记住线程不是免费的。
返回值
如果函数成功将一个completion packet 从队列中取出,并完成一个成功的操作,函数将传回TRUE,并填写由lpNumberOfBytesTransferred 、lpCompletionKey、lpOverlapped所指向的变量内容。
如果操作失败,但completion packet已经从队列中取出,则函数传回FALSE,lpOverlapped 指向失败之操作。调用GetLastError()可获知为什么I/O操作会失败。
如果函数失败,则传回FALSE,lpOverlapped 设为NULL。
与其他的核心对象不同,在completion port上等待的线程是以先进后出(FILO)的次序提供服务。没有什么理由需要担忧次序的公平性,因为所有的线程都做完全相同的事情。
使用FILO,一个进行中的线程调用GetQueuedCompletionStatus()就可以取得下一个请求request,并保持执行状态,没有阻塞,这是非常有效率的。如果线程等待太长的时间,就有可能被置换出去(page out)。最近执行过的线程则通常还在内存中,不需要先置换进来才能执行。
ConnectNamePipe()
DeviceIoControl()
LockFileEx()
ReadFile()
TransactNamePipe()
WaitCommEvent()
WriteFile()
为了使用completion port ,主线程可以对着一个与此completion port 有关联的文件,进行读、写、或其他任何操作。该线程不需要调用WaitForMultipleObjects(),因为池子里各个线程都调用过GetQueuedCompletionStatus()。一旦I/O操作完成,一个等待中的线程将会自动被释放,以服务该操作。
解决之道,将每个操作均会引发一个completion port 通告的开关关闭。
我们可以用受激发的event对象的机制取而代之,你设定一个overlapped结构,内含一个manual reset event对象,放在hEvent一栏,然后把handle最低位设为1. ————有点类似于hack行为,但是文件中是这么交代的。
ECHOSRV 是一个服务器, 倾听TCP port 5554(一个任意数值) 并且将它从socket中读到的每一样东西写回相同的socket,ECHOCLI是一个客户端,负责取得你的输入,送往服务器,然后印出它从服务器收到的回应。
先上代码吧,我稍微修改了一下,用vs2010可以编译通过。
首先运行服务器端,它开启socket,产生I/O completion port,启动worker线程,并进入循环之中等待连接。
它要做的事情就是读入客户端的输入,随后把它再原样写回,这就是echo的名字的由来。
随后启动一个客户端:
并输入字符串“hello, I am A”
可以看到服务器端此时为:
我们再启动一个客户端:
并输入字符串“nihao ”
可以看到服务器端此时变为:
创建套接字
分配套接字地址
产生一个I/O completion port
产生一堆线程,在这个线程中调用了ThreadFunc(),那是 worker线程花掉所有时间的地方,在其中调用了GetQueueCompletionStatus(),当每一个completion packet被处理之后,我们使用GetQueueCompletionStatus()传回来的key,当作指向ContextKey的结构的指针
进入等待连接请求状态
进入循环之中等待连接,accept返回创建套接字的文件描述符,如果失败将返回-1.
资料来源:
Win32多线程程序设计
虽然名称看上去相似,但是它和APCs 中所用的I/O completion routines 没有任何关联。
IOCP 可以解决目前为止我们看到的所有问题:
与WaitForMultipleObjects()不同,这里不限制handles个数。
I/O completion ports 允许一个线程将一个请求暂时保存下来,而由另一个线程为它做实际服务。
I/O completion ports 默默支持 scalable 架构。
服务器的线程模型
有3 个基本的方法可以决定一个服务器上需要多少个线程:单独一个线程。 在一个文件服务器或一个简单的web服务器上,单一线程可以使用overlapped I/O来搬移数据,但是如果这个线程还必须进行任何其他操作,整个服务器就会陷入泥沼中。
每个client 给予一个线程,如果你为每个client产生一个线程那么理论上每个人都可以有不错的反应时间,因为CPU的能量被平均分配了,然而事实上系统资源是有限的,到了某种情况下,系统效率就会急剧下降,比如如果有2000个client,这种做法就不切实际了。
每个CPU给予一个线程,这种做法让每一个CPU尽可能忙碌,不至于出现哪一个CPU有过度饱和的情况,如果你使用event对象或APCs,此法将难以实现,因为它们都是否紧密地和某个线程绑在一起,此时我们就需要依赖一种特殊的同步对象,也就是I/O completion ports。
I/O completion port 是一种非常特殊的核心对象,用来综合一堆线程,让它们为overlapped请求服务。
其所提供的功能可以跨越多个CPUs。
Completion Ports 做了什么
IOCP允许你将启动overlapped 请求的线程和提供服务的线程拆伙。为了使用IOCP,你的程序应该产生一堆线程,统统在IOCP上等待着,这些线程都将成为能够处理completed overlapped I/O request的线程之一 ——只要线程在I/O completion port上等待,它就自然而然成为了那种线程。
每次有新的文件因为overlapped I/O开启你就可以让它的文件handle和I/O completion port产生关联,一旦这样的关系建立起来,任何文件操作如果成功完成,便会导致I/O completion packet 被送到 completion port去。这是发生操作系统之内的操作,对应用程序而言是透明的。
为了回应I/O completion packet,completion port释放了一个等待中的线程,如果目前没有线程正在等待,completion port就不会产生新线程。
释放出来的线程被给予足够的信息后就可以执行,处理该操作请求,但它还是属于原来的那一堆(被指定给此completion port)的线程,所不同的是,之前它是等待中的线程(waiting),现在它是一个作用中的线程(active),当这个线程将overlapped I/O请求处理完毕后,它应该再次在这个I/O completion port上等待。
现在我们再来描述一个completion port ,它是一个机制,用来管理一堆线程如何为completed overlapped I/O request服务。然而,completion port远比一个简单的分派器要丰富的多,它可以保持一个CPU或多个CPUs尽可能忙碌,但也避免它们被太多线程淹没,I/O completion port企图保持并行处理的线程个数在某个数字左右,一般而言你希望所有的CPUs都忙碌,所以一般而言并行处理的线程个数就是CPUs的个数。
I/O completion port运作过程中令人迷惑的部分就是,当一个线程被阻塞时它将发出通过并提交另一个线程。假设在单一CPU系统中有两个线程都正在一个I/O completion port上等待,线程1被唤醒,并且从网络上活得一包数据,为了服务这个数据包,线程1必须从磁盘上读取一个文件,所以它调用CreateFile()和ReadFile(),但是并不在overlapped模式中。Completion port 于是通告说线程1被磁盘I/O滞留了,并且提交线程2以使目前作用的线程个数到达需求数量。
当线程1 从磁盘操作中返回时,或许现有有两个线程同时在执行——甚至即使作用中的线程个数应该是1(因为CPU的个数只是1),这一行为令人惊讶,但却是正确的。Completion Port不会再提交另一个线程,除非作用中的线程个数再次降低到1以下。
操作概观
使用一个completion port 的快速摘要如下:1. 产生一个I/O completion port
2. 让它和一个文件handle 产生关联
3. 产生一堆线程
4. 让每一个线程都在completion port 上等待
5. 开始对着那个文件handle 发出一些overlapped I/O 请求
当新文件被开启时,它们可以在任何时候与I/O completion port 产生关联。在completion port上等待的线程不应该做为completion port 服务以外的事情,因为这些线程一直都将是completion port 所持续追踪的那一堆线程的一部分。
步骤1:产生一个I/O Completion Port
I/O completion port 是一核心对象,你必须使用CreateIoCompletionPort()才能产生它:HANDLE WINAPI CreateIoCompletionPort( _In_ HANDLE FileHandle, //文件或设备的handle,可以使用INVALID_HANDLE_VALUE,表示产生一个与任何文件handle都无关的port _In_opt_ HANDLE ExistingCompletionPort,//如果被指定,那么上一栏的FileHandle就会被加到这个port上,指定NULL可以产生一个新的port _In_ ULONG_PTR CompletionKey,//用户自定义的的一个数值,将被交给提供服务的线程,这个值和FileHandle有关 _In_ DWORD NumberOfConcurrentThreads//与此completion port 有关联的线程个数 );
返回值:
如果函数成功将返回一个I/O completion port的handle,如果函数失败那么则传回FALSE
任何文件只要附着到一个I/O completion port身上,都必须先以FILE_FLAG_OVERLAPPED开启,如果已经附着上去,就不能够再以ReadFileEx()或WriteFileEx()操作它。你可以任意关闭这样一个文件,没有安全上的顾虑。
还有需要注意的是,最后一个参数:
If this parameter is zero, the system allows as many concurrently running threads as there are processors in the system.
如果你设定为0,那么在CPU系统上就会有尽可能多的线程运行起来。
步骤2:与一个文件handle产生关联
CreateCompletionPort()通常被调用两次,一次先指定FileHandle为INVALID_HANDLE_VALUE,并设定ExistingCompletionPort 为NULL,用以产生一个port。然后再为每一个欲附着上去的文件handle调用一次CreateIoCompletionPort(), 这些调用将ExistingCompletionPort设定为第一次调用所传回的handle。
例如:
HANDLE hPort; HANDLE hFiles[MAX_FILES]; int index; hPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0); for(index =0; index<OPEN_FILES; index++){ CreateIoCompletionPort(hFiles[index],hPort,0,0); }
步骤3 : 产生一堆线程
一旦completion port 产生出来,你就可以设立在该port上等待的那些线程了。 I/O completion port并不自己产生那些线程,它只是使用由你产生的线程。因此你必须自己以CreateThread() 或_beginthreadex()或AfxBeginThread()产生出线程。当你一产生这些线程时,它们都应该在completion port上等待,当线程开始为各个请求服务时,池子里的线程的组织如下:
目前正在执行的线程 + 被阻塞的线程 + 在completion port上等待的线程 ---------------------------------------- 池子里所有线程的个数
因为如此,所以你应该产生比CPU个数还多的线程,如果你只有一个CPU,而你也只产生了一个线程,那么当该线程阻塞时,你的CPU也变成闲置的了。由于池子里没有其他线程,completion port 也就没有办法为任何数据包服务,甚至即使CPU的能量游刃有余。
合理的线程个数应该是CPU个数的两倍加上2,你当然也可以产生更多的线程,但记住线程不是免费的。
步骤4 :在一个I/O Completion Port 上等待
work线程初始化自己后,它应该调用GetQueuedCompletionStatus(),这个操作像是WaitForSingleObject() 和 GetOverlappedResult()的组合。函数的规格如下:BOOL WINAPI GetQueuedCompletionStatus( _In_ HANDLE CompletionPort,//将在其上等待的completion port _Out_ LPDWORD lpNumberOfBytes,//一个指针,指向被传输的数据字节数 _Out_ PULONG_PTR lpCompletionKey,//一个指针,由CreateIoCompletionPort()所定义的key _Out_ LPOVERLAPPED *lpOverlapped,//其实应该命名为lplpOverlapped,因为它指向的是overlapped结构的指针的地址 _In_ DWORD dwMilliseconds//等待的最长时间 );
返回值
如果函数成功将一个completion packet 从队列中取出,并完成一个成功的操作,函数将传回TRUE,并填写由lpNumberOfBytesTransferred 、lpCompletionKey、lpOverlapped所指向的变量内容。
如果操作失败,但completion packet已经从队列中取出,则函数传回FALSE,lpOverlapped 指向失败之操作。调用GetLastError()可获知为什么I/O操作会失败。
如果函数失败,则传回FALSE,lpOverlapped 设为NULL。
与其他的核心对象不同,在completion port上等待的线程是以先进后出(FILO)的次序提供服务。没有什么理由需要担忧次序的公平性,因为所有的线程都做完全相同的事情。
使用FILO,一个进行中的线程调用GetQueuedCompletionStatus()就可以取得下一个请求request,并保持执行状态,没有阻塞,这是非常有效率的。如果线程等待太长的时间,就有可能被置换出去(page out)。最近执行过的线程则通常还在内存中,不需要先置换进来才能执行。
步骤5: 发出overlapped I/O请求
这些请求可以启动一个能被I/O completion port所驾驭的I/O操作:ConnectNamePipe()
DeviceIoControl()
LockFileEx()
ReadFile()
TransactNamePipe()
WaitCommEvent()
WriteFile()
为了使用completion port ,主线程可以对着一个与此completion port 有关联的文件,进行读、写、或其他任何操作。该线程不需要调用WaitForMultipleObjects(),因为池子里各个线程都调用过GetQueuedCompletionStatus()。一旦I/O操作完成,一个等待中的线程将会自动被释放,以服务该操作。
避免Completion Packets
常常会有这种情况:你读一个文件,但是操作完成时,你并不希望I/O completion port 被通告。网络服务器就是一个例子,在那里,线程从一个文件读进一个针对named pipe 或socket的请求,然后将回应写到同一个文件中。问题是文件已经以overlapped I/O状态打开了,所以写入操作将被异步。而当写入操作完成时,completion port将会收到一个completion packet,如果写入操作不是很重要,那么服务器花费许多时间处理不怎么重要的completion packets就显得得不偿失。解决之道,将每个操作均会引发一个completion port 通告的开关关闭。
我们可以用受激发的event对象的机制取而代之,你设定一个overlapped结构,内含一个manual reset event对象,放在hEvent一栏,然后把handle最低位设为1. ————有点类似于hack行为,但是文件中是这么交代的。
OVERLAPPED overlap; HANDLE hFile; char buffer[128]; DWORD dwBytesWritten; memset(&overlap,0,sizeof(OVERLAPPED)); overlap.hEvent = CreateEvent(NULL,TURE, FALSE, NULL); overlap.hEvent = (HANDLE)((DWORD)overlap.hEvent | 0x1); WriteFile(hFile,buffer,128,&dwBytesWritten,&overlap);
对Sockets 使用Overlapped I/O
来看一个模拟标准的TCP port 7 行为的例子。ECHOSRV 是一个服务器, 倾听TCP port 5554(一个任意数值) 并且将它从socket中读到的每一样东西写回相同的socket,ECHOCLI是一个客户端,负责取得你的输入,送往服务器,然后印出它从服务器收到的回应。
先上代码吧,我稍微修改了一下,用vs2010可以编译通过。
服务器端代码:
/* * EchoSrv.c * * Sample code for Multithreading Applications in Win32 * This is from Chapter 6, Listing 6-4 * * Demonstrates how to use I/O completion ports * with TCP on the Internet. This sample * server can only be run on Windows NT, * version 3.51 or later. The client (EchoCli) * can be run on Windows 95. */ #define WIN32_LEAN_AND_MEAN #include <stdio.h> #include <stdlib.h> #include <windows.h> #include <tchar.h> #include <string.h> #include <winsock.h> #include <io.h> //#include "MtVerify.h" // Pick a port number that seems to be away from all others #define SERV_TCP_PORT 5554 #define MAXLINE 512 #pragma comment(lib, "Wsock32.lib") // // Structure definition // // The context key keeps track of how the I/O // is progressing for each individual file handle. struct ContextKey { SOCKET sock; // Input char InBuffer[4]; OVERLAPPED ovIn; // Output int nOutBufIndex; char OutBuffer[MAXLINE]; OVERLAPPED ovOut; DWORD dwWritten; }; // // Global variables // HANDLE ghCompletionPort; // // Function prototypes // //void CreateWorkerThreads(HANDLE); void CreateWorkerThreads(); DWORD WINAPI ThreadFunc(LPVOID pvoid); void IssueRead(struct ContextKey *pCntx); void CheckOsVersion(); void FatalError(char *s); /////////////////////////////////////////////////////// int main(int argc, char *argv[]) { SOCKET listener; SOCKET newsocket; WSADATA WsaData; struct sockaddr_in serverAddress; struct sockaddr_in clientAddress; int clientAddressLength; int err; CheckOsVersion(); err = WSAStartup (0x0101, &WsaData); if (err == SOCKET_ERROR) { FatalError("WSAStartup Failed"); return EXIT_FAILURE; } /* * Open a TCP socket connection to the server * By default, a socket is always opened * for overlapped I/O. Do NOT attach this * socket (listener) to the I/O completion * port! */ listener = socket(AF_INET, SOCK_STREAM, 0); if (listener < 0) { FatalError("socket() failed"); return EXIT_FAILURE; } /* * Bind our local address */ memset(&serverAddress, 0, sizeof(serverAddress)); serverAddress.sin_family = AF_INET; serverAddress.sin_addr.s_addr = htonl(INADDR_ANY); serverAddress.sin_port = htons(SERV_TCP_PORT); err = bind(listener, (struct sockaddr *)&serverAddress, sizeof(serverAddress) ); if (err < 0) FatalError("bind() failed"); ghCompletionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, // No prior port 0, // No key 0 // Use default # of threads ); if (ghCompletionPort == NULL) FatalError("CreateIoCompletionPort() failed"); CreateWorkerThreads(); //CreateWorkerThreads(ghCompletionPort); listen(listener, 5); fprintf(stderr, "Echo Server with I/O Completion Ports\n"); fprintf(stderr, "Running on TCP port %d\n", SERV_TCP_PORT); fprintf(stderr, "\nPress Ctrl+C to stop the server\n"); // // Loop forever accepting requests new connections // and starting reading from them. // for (;;) { struct ContextKey *pKey; clientAddressLength = sizeof(clientAddress); newsocket = accept(listener, (struct sockaddr *)&clientAddress, &clientAddressLength); if (newsocket < 0) { FatalError("accept() Failed"); return EXIT_FAILURE; } // Create a context key and initialize it. // calloc will zero the buffer pKey = (ContextKey* )calloc(1, sizeof(struct ContextKey)); pKey->sock = newsocket; pKey->ovOut.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); // Set the event for writing so that packets // will not be sent to the completion port when // a write finishes. pKey->ovOut.hEvent = (HANDLE)((DWORD)pKey->ovOut.hEvent | 0x1); // Associate the socket with the completion port CreateIoCompletionPort( (HANDLE)newsocket, ghCompletionPort, (DWORD)pKey, // key 0 // Use default # of threads ); // Kick off the first read IssueRead(pKey); } return 0; } void CreateWorkerThreads() { SYSTEM_INFO sysinfo; DWORD dwThreadId; DWORD dwThreads; DWORD i; GetSystemInfo(&sysinfo); dwThreads = sysinfo.dwNumberOfProcessors * 2 + 2; for (i=0; i<dwThreads; i++) { HANDLE hThread; hThread = CreateThread( NULL, 0, ThreadFunc, NULL, 0, &dwThreadId ); CloseHandle(hThread); } } // // Each worker thread starts here. DWORD WINAPI ThreadFunc(LPVOID pVoid) { BOOL bResult; DWORD dwNumRead; struct ContextKey *pCntx; LPOVERLAPPED lpOverlapped; UNREFERENCED_PARAMETER(pVoid); // Loop forever on getting packets from // the I/O completion port. for (;;) { bResult = GetQueuedCompletionStatus( ghCompletionPort, &dwNumRead, (LPDWORD)&pCntx, &lpOverlapped, INFINITE ); if (bResult == FALSE && lpOverlapped == NULL) { FatalError( "ThreadFunc - Illegal call to GetQueuedCompletionStatus"); } else if (bResult == FALSE && lpOverlapped != NULL) { // This happens occasionally instead of // end-of-file. Not sure why. closesocket(pCntx->sock); free(pCntx); fprintf(stderr, "ThreadFunc - I/O operation failed\n"); } else if (dwNumRead == 0) { closesocket(pCntx->sock); free(pCntx); fprintf(stderr, "ThreadFunc - End of file.\n"); } // Got a valid data block! // Save the data to our buffer and write it // all back out (echo it) if we have see a \n else { // Figure out where in the buffer to save the character char *pch = &pCntx->OutBuffer[pCntx->nOutBufIndex++]; *pch++ = pCntx->InBuffer[0]; *pch = '\0'; // For debugging, WriteFile doesn't care if (pCntx->InBuffer[0] == '\n') { WriteFile( (HANDLE)(pCntx->sock), pCntx->OutBuffer, pCntx->nOutBufIndex, &pCntx->dwWritten, &pCntx->ovOut ); pCntx->nOutBufIndex = 0; fprintf(stderr, "Echo on socket %x.\n", pCntx->sock); } // Start a new read IssueRead(pCntx); } } return 0; } /* * Call ReadFile to start an overlapped request * on a socket. Make sure we handle errors * that are recoverable. */ void IssueRead(struct ContextKey *pCntx) { int i = 0; BOOL bResult; int err; int numRead; while (++i) { // Request a single character bResult = ReadFile( (HANDLE)pCntx->sock, pCntx->InBuffer, 1, (LPDWORD)&numRead, &pCntx->ovIn ); // It succeeded immediately, but do not process it // here, wait for the completion packet. if (bResult) return; err = GetLastError(); // This is what we want to happen, it's not an error if (err == ERROR_IO_PENDING) return; // Handle recoverable error if ( err == ERROR_INVALID_USER_BUFFER || err == ERROR_NOT_ENOUGH_QUOTA || err == ERROR_NOT_ENOUGH_MEMORY ) { if (i == 5) // I just picked a number { Sleep(50); // Wait around and try later continue; } FatalError("IssueRead - System ran out of non-paged space"); } break; } fprintf(stderr, "IssueRead - ReadFile failed.\n"); } // // Make sure we are running under the right versions // of Windows NT (3.51, 4.0, or later) // void CheckOsVersion() { OSVERSIONINFO ver; BOOL bResult; ver.dwOSVersionInfoSize = sizeof(OSVERSIONINFO); bResult = GetVersionEx((LPOSVERSIONINFO) &ver); if ( (!bResult) || (ver.dwPlatformId != VER_PLATFORM_WIN32_NT) ) { FatalError("ECHOSRV requires Windows NT 3.51 or later."); } } // // Error handler // void FatalError(char *s) { fprintf(stdout, "%s\n", s); exit(EXIT_FAILURE); }
客户端代码
/* * EchoCli.c * * Sample code for "Multithreading Applications in Win32" * This is from Chapter 6. * * This is a command line client to drive the ECHO server. * Run the server in one Commmand Prompt Window, * then run this program in one or more other windows. * EchoCli will wait for you to type in some text when * it starts up. Each line of text will be sent to the * echo server on TCP port 5554. */ #include <windows.h> #include <tchar.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <winsock.h> #pragma comment(lib, "Wsock32.lib") /* Function Prototypes */ void FatalError(char *s); int writen(SOCKET sock, char *ptr, int nBytes); int readline(SOCKET sock, char *ptr, int maxlen); void DoClientLoop(FILE *fp, SOCKET sock); /* Constants */ #define MAXLINE 512 #define SERVER_TCP_PORT 5554 #define SERVER_ADDRESS "127.0.0.1" /* * Error handler */ void FatalError(char *s) { fprintf(stdout, "%s\n", s); exit(EXIT_FAILURE); } /* * Write bytes to the port with proper block size handling. */ int writen(SOCKET sock, char *ptr, int nBytes) { int nleft; int nwritten; nleft = nBytes; while (nleft > 0) { nwritten = send(sock, ptr, nBytes, 0 ); if (nwritten == SOCKET_ERROR) { fprintf(stdout, "Send Failed\n"); exit(1); } nleft -= nwritten; ptr += nwritten; } return nBytes - nleft; } /* * Read a line of text of the port. This version * is very inefficient, but it's simple. */ int readline(SOCKET sock, char *ptr, int maxlen) { int n; int rc; char c; for (n=1; n<maxlen; n++) { if ( ( rc= recv(sock, &c, 1, 0)) == 1) { *ptr++ = c; if (c=='\n') break; } else if (rc == 0) { if (n == 1) return 0; else break; } else return -1; /* Error */ } *ptr = '\0'; return n; } int main(int argc, char *argv[]) { WSADATA WsaData; SOCKET sock; struct sockaddr_in serv_addr; int err; puts("EchoCli - client for echo server sample program\n"); puts("Type a line of text followed by Return."); puts("Exit this program by typing Ctrl+Z followed by Return."); err = WSAStartup(0x0101, &WsaData); if (err == SOCKET_ERROR) FatalError("WSAStartup Failed"); /* * Bind our local address */ memset(&serv_addr, 0, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; // Use the local host serv_addr.sin_addr.s_addr = inet_addr(SERVER_ADDRESS); serv_addr.sin_port = htons(SERVER_TCP_PORT); /* * Open a TCP socket (an Internet stream socket) */ sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) FatalError("socket() failed -- do you have TCP/IP networking installed?"); if (connect(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) FatalError("connect() failed -- is the server running?"); DoClientLoop(stdin, sock); closesocket(sock); return EXIT_SUCCESS; } /* * As long as there is input from "fp", copy it to * the server, read what the server gives back, * and print it. */ void DoClientLoop(FILE *fp, SOCKET sock) { int n; char sendline[MAXLINE]; char recvline[MAXLINE+1]; while (fgets(sendline, MAXLINE, fp) != NULL) { n = strlen(sendline); if (writen(sock, sendline, n) != n) FatalError("DoClientLoop: writen() error"); n = readline(sock, recvline, MAXLINE); if (n < 0) FatalError("DoClientLoop: readline() error"); recvline = '\0'; fputs(recvline, stdout); } if (ferror(fp)) FatalError("DoClientLoop: error reading file"); }
运行结果
运行结果如下图所示:首先运行服务器端,它开启socket,产生I/O completion port,启动worker线程,并进入循环之中等待连接。
它要做的事情就是读入客户端的输入,随后把它再原样写回,这就是echo的名字的由来。
随后启动一个客户端:
并输入字符串“hello, I am A”
可以看到服务器端此时为:
我们再启动一个客户端:
并输入字符串“nihao ”
可以看到服务器端此时变为:
过程解读
#define AF_INET 2 /* internetwork: UDP, TCP, etc. */
创建套接字
listener = socket(AF_INET, SOCK_STREAM, 0); if (listener < 0) { FatalError("socket() failed"); return EXIT_FAILURE; }
分配套接字地址
memset(&serverAddress, 0, sizeof(serverAddress)); serverAddress.sin_family = AF_INET; serverAddress.sin_addr.s_addr = htonl(INADDR_ANY); serverAddress.sin_port = htons(SERV_TCP_PORT); err = bind(listener, (struct sockaddr *)&serverAddress, sizeof(serverAddress) ); if (err < 0) FatalError("bind() failed");
产生一个I/O completion port
ghCompletionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, // No prior port 0, // No key 0 // Use default # of threads ); if (ghCompletionPort == NULL) FatalError("CreateIoCompletionPort() failed");
产生一堆线程,在这个线程中调用了ThreadFunc(),那是 worker线程花掉所有时间的地方,在其中调用了GetQueueCompletionStatus(),当每一个completion packet被处理之后,我们使用GetQueueCompletionStatus()传回来的key,当作指向ContextKey的结构的指针
DWORD WINAPI ThreadFunc(LPVOID pVoid) { BOOL bResult; DWORD dwNumRead; struct ContextKey *pCntx; LPOVERLAPPED lpOverlapped; UNREFERENCED_PARAMETER(pVoid); // Loop forever on getting packets from // the I/O completion port. for (;;) { bResult = GetQueuedCompletionStatus( ghCompletionPort, &dwNumRead, (LPDWORD)&pCntx, &lpOverlapped, INFINITE ); if (bResult == FALSE//函数失败 && lpOverlapped == NULL) { FatalError( "ThreadFunc - Illegal call to GetQueuedCompletionStatus"); } else if (bResult == FALSE && lpOverlapped != NULL)//操作失败 { // This happens occasionally instead of // end-of-file. Not sure why. closesocket(pCntx->sock); free(pCntx); fprintf(stderr, "ThreadFunc - I/O operation failed\n"); } else if (dwNumRead == 0)// 被传输的字节数为0 { closesocket(pCntx->sock); free(pCntx); fprintf(stderr, "ThreadFunc - End of file.\n"); } // Got a valid data block! // Save the data to our buffer and write it // all back out (echo it) if we have see a \n else { // Figure out where in the buffer to save the character char *pch = &pCntx->OutBuffer[pCntx->nOutBufIndex++];//指向OutBuffer对应位置 *pch++ = pCntx->InBuffer[0];// 可以看到字符是一个一个被读入的 *pch = '\0'; // For debugging, WriteFile doesn't care if (pCntx->InBuffer[0] == '\n')//当完整一行后,就写回到socket中 { WriteFile( (HANDLE)(pCntx->sock), pCntx->OutBuffer, pCntx->nOutBufIndex, &pCntx->dwWritten, &pCntx->ovOut ); pCntx->nOutBufIndex = 0; fprintf(stderr, "Echo on socket %x.\n", pCntx->sock); } // Start a new read IssueRead(pCntx); } } return 0; }
进入等待连接请求状态
listen(listener, 5);
进入循环之中等待连接,accept返回创建套接字的文件描述符,如果失败将返回-1.
for (;;) { struct ContextKey *pKey; clientAddressLength = sizeof(clientAddress); newsocket = accept(listener, (struct sockaddr *)&clientAddress, &clientAddressLength); if (newsocket < 0) { FatalError("accept() Failed"); return EXIT_FAILURE; } // Create a context key and initialize it. // calloc will zero the buffer pKey = (ContextKey* )calloc(1, sizeof(struct ContextKey)); pKey->sock = newsocket; pKey->ovOut.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); // Set the event for writing so that packets // will not be sent to the completion port when // a write finishes. pKey->ovOut.hEvent = (HANDLE)((DWORD)pKey->ovOut.hEvent | 0x1);//这样设定后,当写入操作完成时,不需要任何packet被送往completion port。 // Associate the socket with the completion port CreateIoCompletionPort( (HANDLE)newsocket, ghCompletionPort, (DWORD)pKey, // key 0 // Use default # of threads ); // Kick off the first read IssueRead(pKey); }
资料来源:
Win32多线程程序设计
相关文章推荐
- 131 Palindrome Partitioning [Leetcode]
- <单调队列><二分>4.偷懒的西西
- ZOJ 3820 Building Fire Stations
- Linux----给一个普通用户root权限
- 运算放大器规范总结
- 常见端口、端口查询及TCP状态
- win7 64位通过dosbox使用debug的方法
- 树的中序遍历(非递归)
- ocp-78
- ocp-77
- Unity mesh 合并
- Climbing Stairs
- 梦想不是挂在嘴边的,是拿行动来证明的
- ocp-76
- ocp-75
- SSH三大框架搭建(非常之基础)
- ocp-74
- IOS开发笔记-01按钮操作-08.git的简单使用
- JS获取浏览器的高度和宽度
- spring boot实战(第五篇)配置源码解析