您的位置:首页 > 理论基础 > 计算机网络

学习笔记(09):C++网络编程进阶-IO模型之完成端口completion port

2020-04-01 18:49 971 查看

立即学习:https://edu.csdn.net/course/play/6082/113764?utm_source=blogtoedu

完成端口IOCP模型

一个完成端口实际上就是一个通知队列,操作系统吧已经完成的重叠IO请求通知发到队列中,完成端口会充分利用Windows最复杂的内核对象来进行IO的调度,属于异步IO,适用于CS通信模式中性能最好的网络通信模型。

完成端口IOCP模型的原理

完成端口创建几个线程,等到用户请求的时候,就把这些请求都加入到一个公共消息队列中去,然后这几个线程就排队从消息队列中取出消息并加以处理,这种方式就很优雅的实现了异步通信和负载均衡的问题,因为它提供了一种机制来使用几个线程“公平的”处理来自多个客户端的输入输出,并且线程如果没事干的时候会被系统挂起,不会占用CPU周期,这个关键的作为交换的消息队列,它就是完成端口。

使用完成端口的基本流程

1、调用CreateIoCompletionPort函数创建一个完成端口,第四个参数设为0,让完成端口上每次处理一次只允许执行的线程

2、根据系统中CPU核心的数量建立对应的Worker线程,

3、一个用于监听的Socket,在指定的端口上监听连接请求

4、将接受的套接字绑定到完成端口

5、使用重叠IO,在套接字上投递一个或者多个WSARecv或者WSASend请求

 

在新开辟的线程中

1、在Worker线程使用GetQueuedCompletionStatus函数,它让Worker线程进入不占用CPU的睡眠状态,直到完成端口上出现了需要大量处理的网络操作或者超出了等待的时间限制为止

2、使用重叠IO,在套接字上投递一个或者多个WSARecv或者WSASend请求

3、重复1~2步骤。

 

CreateIoCompletionPort函数

CreateIoCompletionPort(

HANDLE FileHandle,

HANDLE ExistingCompletionPort,

ULONG_PTR CompletionKey,

DWORD NumberOfConcurrentThreads

);

参数:

第一个参数:有效的文件句柄或者INVALID_HANDLE_VALUE

第二个参数:已经存在的完成端口,为NULL则新建一个IOCP

第三个参数:传送给处理函数的参数

第四个参数:有多个线程在访问这个消息队列,当参数WxistingCompletionPort不为0的时候,系统忽略该参数,当该参数为0表示允许同时相等数据处理器个数的线程访问该消息队列。

返回值:

成功返回一个IOCP的句柄,失败返回NULL ;

GetQueuedCompletionStatus函数

GetQueuedCompletionStatus(

HANDLE CompletionPort,

LPDWORD lpNumberOfBytesTransferred,

PULONG_PTR lpCompletionKey,

LPOVERLAPPED* lpOverlapped,

DWORD dwMilliseconds

);

参数:

第一个参数:完成端口

第二个参数:接收一个IO操作中完成的字节数

第三个参数:自定义结构体参数

第四个参数:完成IO操作时指定的重叠结构的地址

第五个参数:等待完成包出现在完成端口的毫秒数,INFINITE(一直等待)

返回值:

成功返回TRUE , 失败FALSE,调用WSAGetLastError得到扩展的错误

 

服务端代码

#include <iostream>
#include<WinSock2.h>
#include<cstdlib>
#pragma comment(lib,"ws2_32.lib")
using namespace std;
DWORD WINAPI WorkerThread(LPVOID CompletionPortId);
typedef struct _MY_WSAOVERLAPPED
{
    WSAOVERLAPPED overlap;
    WSABUF Buffer;
    DWORD NumberOfBytesRecvd;
    DWORD Flags;
    SOCKET socket;
    _MY_WSAOVERLAPPED()
    {
        Buffer.buf = new char[64]{ '\0' };
        Buffer.len = 64;
        Flags = 0;
        overlap.hEvent = NULL;
    }
    ~_MY_WSAOVERLAPPED()
    {
        delete[]Buffer.buf;
        Buffer.buf = NULL;
        Buffer.len = 0;
    }
}MY_WSAOVERLAPPED, * PMY_WSAOVERLAPPED;
int main()
{
    WSADATA wd;
    if (WSAStartup(MAKEWORD(2, 2), &wd) != 0)
    {
        cout << "wsastartup error " << WSAGetLastError() << endl;
        exit(EXIT_FAILURE);
    }
    //1 调用CreateIoCompletionPort函数创建一个完成端口,第四个参数设为0,让完成端口上每次处理一次只允许执行的线程
    HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (completionPort == NULL)
    {
        cout << "CreateIoCompletionPort " << WSAGetLastError() << endl;
        exit(EXIT_FAILURE);
    }
    //1 根据系统中CPU核心的数量建立对应的Worker线程,
    SYSTEM_INFO sysInfo;
    GetSystemInfo(&sysInfo);
    for (int i = 0; i < (int)sysInfo.dwNumberOfProcessors; i++)
    {
        HANDLE h = CreateThread(NULL, 0, WorkerThread, completionPort, 0, NULL);
        CloseHandle(h);
    }
    cout << "创建了" << sysInfo.dwNumberOfProcessors << "个工作线程" << endl;
    SOCKET s = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (s == INVALID_SOCKET)
    {
        cout << "socket error " << WSAGetLastError() << endl;
        exit(EXIT_FAILURE);
    }
    sockaddr_in addr;
    addr.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8000);

    int bindRet = bind(s, (sockaddr*)&addr, sizeof sockaddr);
    if (bindRet == SOCKET_ERROR)
    {
        cout << "bind error " << WSAGetLastError() << endl;
        exit(EXIT_FAILURE);
    }
    int listenRet = listen(s, 5);
    if (listenRet == SOCKET_ERROR)
    {
        cout << "listen error " << WSAGetLastError() << endl;
        exit(EXIT_FAILURE);
    }
    while (true)
    {
        // 3 一个用于监听的Socket,在指定的端口上监听连接请求
        SOCKET sClient = WSAAccept(s, NULL, NULL, NULL, NULL);
        if (sClient == INVALID_SOCKET)
        {
            cout << "accept error " << WSAGetLastError() << endl;
            continue;
        }
        char temp[64];
        sprintf_s(temp, "欢迎%d进入客户端", sClient);

        int sendRet = send(sClient, temp, strlen(temp), 0);
        if (sendRet == SOCKET_ERROR)
        {
            closesocket(sClient);
        }
        cout << sClient << "进入客户端" << endl;
        //4 将接受的套接字绑定到完成端口
        CreateIoCompletionPort((HANDLE)sClient, completionPort, (ULONG_PTR)sClient, 0);
        PMY_WSAOVERLAPPED pOver = new MY_WSAOVERLAPPED;
        //5 使用重叠IO,在套接字上投递一个或者多个WSARecv或者WSASend请求
        int ret = WSARecv(sClient, &pOver->Buffer, 1, &pOver->NumberOfBytesRecvd, &pOver->Flags, &pOver->overlap, NULL);
        if (ret == SOCKET_ERROR)
        {
            int err = WSAGetLastError();
            if (err != WSA_IO_PENDING)
            {
                cout << "wsarecv error " << WSAGetLastError() << endl;
                closesocket(sClient);
                delete pOver;
            }
        }
    }
    CloseHandle(completionPort);
    closesocket(s);
    if (WSACleanup() == SOCKET_ERROR)
    {
        cout << "wsacleanu 出错" << endl;
    }
}
DWORD WINAPI WorkerThread(LPVOID CompletionPortId)
{
    HANDLE completionPort = (HANDLE)CompletionPortId;
    DWORD dwByteTransferred;
    SOCKET sClient;
    PMY_WSAOVERLAPPED pOver = NULL;
    while (true)
    {
        //1 在Worker线程使用GetQueuedCompletionStatus函数,它让Worker线程进入不占用CPU的睡眠状态,直到完成端口上出现了需要大量处理的网络操作或者超出了等待的时间限制为止
        bool b = GetQueuedCompletionStatus(completionPort, &dwByteTransferred, (PULONG_PTR)&sClient, (LPOVERLAPPED*)&pOver, INFINITE);
        if (sClient == NULL)
            continue;
        if (b && dwByteTransferred > 0)
        {
            cout << sClient << " 说:" << pOver->Buffer.buf << endl;
            ZeroMemory(pOver->Buffer.buf, 64);
            // 2 使用重叠IO,在套接字上投递一个或者多个WSARecv或者WSASend请求
            int ret = WSARecv(sClient, &pOver->Buffer, 1, &pOver->NumberOfBytesRecvd, &pOver->Flags, &pOver->overlap, NULL);
            if (ret == SOCKET_ERROR)
            {
                int err = WSAGetLastError();
                if (err != WSA_IO_PENDING)
                {
                    closesocket(sClient);
                    delete pOver;
                    cout << "wsarecv error" << err << endl;
                }
            }
        }
        else
        {
            cout << sClient << "离开了" << endl;
            closesocket(sClient);
            delete pOver;
        }
    }
}

  • 点赞
  • 收藏
  • 分享
  • 文章举报
weixin_38793855 发布了9 篇原创文章 · 获赞 0 · 访问量 364 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: