您的位置:首页 > 理论基础 > 计算机网络

linux网络编程-----几种服务器模型及io多路复用函数

2017-10-11 12:33 1001 查看
libevent实现了对io多路复用函数的封装,复习一下linux下的io复用函数,select,poll,epoll

在c/s模型中,要处理多个客户端的请求以达到并发处理的效果,有以下几种方法

主线程accept,多线程处理,为每一个客户端开一个线程

主进程accept,多进程处理,为每一个客户端开一个进程

线程池/进程池,将程序执行过程中线程/进程的创建销毁开销放在程序一开始执行时进行,进一步可以动态改变池中线程/进程个数

io多路复用函数,单线程模式

多线程模式的服务器模型大体如下

//server.cpp
#include <iostream>

#include <cerrno>
#include <cstdlib>
#include <cstdio>
#include <cstring>
#include <unistd.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>

#include <pthread.h>

void* process_client(void* arg);

int main()
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd < 0)
{
std::cerr << "socket error" << std::endl;
return -1;
}

struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(8080);
servaddr.sin_addr.s_addr = INADDR_ANY;

if(bind(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
{
close(sockfd);
std::cerr << "bind error" << std::endl;
return -1;
}

if(listen(sockfd, 10) < 0)
{
close(sockfd);
std::cerr << "listen error" << std::endl;
return -1;
}

struct sockaddr_in addr;
bzero(&addr, sizeof(addr));
socklen_t len = sizeof(addr);
while(true)
{
/* 主线程接受客户端请求,每一个子线程处理一个服务器与客户端的交互 */
int fd = accept(sockfd, (struct sockaddr*)&addr, &len);

pthread_t tid;
pthread_create(&tid, NULL, process_client, (void*)&fd);
pthread_detach(tid);
}

close(sockfd);

return 0;
}

void *process_client(void *arg)
{
int fd = *static_cast<int *>(arg);

char reply[] = "server has receive your message";
char msg[4096];
/* 每个线程一个循环 */
while(true)
{
bzero(msg, sizeof(msg));
int ret = recv(fd, msg, sizeof(msg), 0);
if(ret < 0)
{
continue;
}
else if(ret == 0)
{  std::cout << "close connection with client " << fd << std::endl;
close(fd);
pthread_exit(NULL);
break;
}
else
{
msg[ret] = '\0';
std::cout << "receive from client " << fd << " : " << msg << std::endl;

ret = send(fd, reply, strlen(reply), MSG_NOSIGNAL);
if(ret < 0)
{
continue;
}
else if(ret == 0)
{
std::cout << "close connection with client " << fd << std::endl;
close(fd);
pthread_exit(NULL);
break;
}
}
}

}


//client.cpp
#include <iostream>
#include <string>

#include <cerrno>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <unistd.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>

int main()
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);

struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(8080);
inet_aton("127.0.0.1", &servaddr.sin_addr);

connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));

std::string msg;
char res[4096];

while(true)
{
getline(std::cin, msg);
if(msg == "exit")
{
close(sockfd);
break;
}

if(send(sockfd, msg.c_str(), msg.size(), MSG_NOSIGNAL) <= 0)
{
std::cout << "close connection from server" << std::endl;
close(sockfd);
break;
}
int len = recv(sockfd, res, sizeof(res), 0);
if(len <= 0)
{
std::cout << "close connection from server" << std::endl;
close(sockfd);
break;
}
res[len] = '\0';

std::cout << "receive from server" << res << std::endl;
}

return 0;
}


多进程模式的服务器模型如下

//server.cpp
#include <iostream>
#include <string>

#include <cstring>
#include <cstdlib>

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>

void process_client(int fd);

int main()
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);

struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(8080);
servaddr.sin_addr.s_addr = INADDR_ANY;

bind(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
listen(sockfd, 10);

struct sockaddr_in addr;
socklen_t len = sizeof(addr);

signal(SIGCHLD, SIG_IGN);
while(true)
{
int fd = accept(sockfd, (struct sockaddr*)&addr, &len);
std::cout << "receive a new client " << fd << std::endl;
if(fork() == 0)
{
close(sockfd);
process_client(fd);
exit(EXIT_SUCCESS);
}
else
{
close(fd);
}
}

close(sockfd);
return 0;
}

void process_client(int fd)
{
char reply[] = "server has receive your message";
char msg[4096];
while(true)
{
int len = recv(fd, msg, sizeof(msg), 0);
if(len <= 0)
{
std::cerr << "close connection with client" << fd << std::endl;
close(fd);
break;
}

msg[len] = '\0';
std::cout << "receive from client" << fd << " : " << msg << std::endl;

if(send(fd, reply, strlen(reply), MSG_NOSIGNAL) <= 0)
{
std::cout << "close connection with client" << fd << std::endl;
close(fd);
break;
}
}
}


//client.cpp
//同多线程client.cpp


创建进程时关闭套接字的原因:

fork创建进程,是将主进程的内存空间copy一份作为子进程的内存空间,这就导致了监听套接字也被copy了,从而监听套接字的引用计数变为2,这样如果子进程不close掉,而仅仅主进程close,那么监听套接字就不会被关闭。当然进程结束后会自动关闭相应的套接字,但还是手动关闭的好,以免在子进程中使用exec,就没办法关闭了

主进程同理,客户端套接字引用计数为2,也需要关闭一个

线程池模式下的服务器模型

需要注意的几点

在程序开始时就创建一定数量的线程池,初始化互斥锁和条件变量

维护一个任务队列,采用互斥锁保护队列中的任务只被一个线程执行

采用条件变量保证在没有任务时的cpu使用情况,即不需要不断轮循查看是否有任务没有被执行

扩展可以动态改变池中线程个数,保证既没有过多线程空闲,也没有过多任务没有被执行

释放线程空间,释放互斥锁和条件变量

代码(没有实现动态改变线程个数)

进程池模式下的服务器模型

io多路复用函数

上述这些方法在某种程度上都有缺陷,当客户端请求过多时效率都会降低,内存消耗都比较明显。对于处理高并发的客户端请求,可以采用io多路复用的的方法,linux下提供了select,poll,epoll三个函数,使用这些函数时可以把监控的套接字设置成非阻塞(可以使用fcntl函数实现)。

select

select是早期的io多路复用函数,函数原型如下

#include <sys/select.h>
int select(int maxfdp1, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
struct timeval *tvptr);


传给select的参数告诉内核

用户所关心的描述符

对于每个描述符,用户所关心的事件,可读,可写,出现异常

愿意等待的时长,可以永远等待,等待一个固定时长或者根本不等待

从slect返回时内核通知进程

- 已准备好的描述符总数量

- 每一个描述符是否可读/可写/出现异常(需要用户手动判断每一个,不能直接定位到准备好的描述符)

select最后一个参数tvptr指定愿意等待的时间长度,有三种情况

NULL:永远等待直到有某(些)个描述符准备好才返回

tvptr->tv_sec==0 && tvptr->tv_usec==0:不等待,直接返回

tvptr->tv_sec!=0 || tvptr->tv_usec!=0:等待指定的时间,当某个描述符准备好,或者时间到,select就会返回。注意如果某个描述符准备好,而时间没有到,也会立即返回

readfds,writefds,exceptfds是指向描述符集的指针,都是fd_set*类型,fd_set有不同的实现方式,可以是每一位作为一个描述符,也可以是一个很大的数组。

在使用select之前,用户需要手动初始化需要的描述符集,不关心的可以传入NULL。使用方法如下

/* ... */
/* 创建监听套接字listenfd */

/* 如果只关心可读事件,就不需要初始化其他两个fd_set */
fd_set readfds_in, readfds_out;

/* 记录最大的描述符,用于select的参数 */
int maxfd = listenfd;

FD_ZERO(&readfds_in);
FD_SET(listenfd, &readfds_in);

while(true)
{
/* 清空fd_set */
FD_ZERO(&readfds_out);

/* 将关心的描述符添加到关心的事件集中 */
memcpy(readfds_out, readfds_in, sizeof(readfds_in));

/* 时间为NULL表示永久阻塞,知道有描述符准备好才返回 */
int n = select(maxfd + 1, readfds_out, NULL, NULL, NULL);

if(n < 1)
{
perror("select error");
exit(1);
}

/* 返回后通过FD_ISSET判断某个文件描述符是否准备好 */
if(FD_ISSET(listenfd, &readfds))
{
struct sockaddr_in servaddr;
bero(&servaddr, sizeof(servaddr);
socklen len = sizeof(servaddr);
int clientfd = accept(listenfd, (struct sockaddr*)&servaddr, &len);

if(maxfd < clientfd)
maxfd = clientfd;

FD_SET(clientfd, &readfds_in);
}

for(int fd = 0; fd < maxfd + 1; ++fd)
{
if(fd == listenfd)
continue;
if(FD_ISSET(fd, &readfds_out))
{
char msg[4096];
bzero(msg, sizeof(msg));
int ret = recv(fd, msg, sizeof(msg), 0);
if(ret == 0)
{
FD_CLR(readfds_in);
close(fd);
}
else if(ret < 0)
{
continue;
}
else
{
printf("receive from client %s\n", msg);
char reply[] = "i have receive your message";
send(fd, reply, strlen(reply), MSG_NOSIGNAL);
}
}
}
}


select使用FD_SET添加描述符,使用FD_ISSET判断是否某个描述符准备好。可以把fd_set想象成一个位数组,每一位代表一个描述符,使用FD_ZERO时将每一位置为0,使用FD_SET时将对应位置为1,在select中返回后,将准备好的描述符对应的那一为的1保留,没有准备好的描述符对应的那一位置为0。FD_ISSET就判断对应位是否为1即可知道描述符是否准备好。

但是select的缺陷还是很多的

每次循环都需要调用FD_ZERO清空描述符集,这就导致需要用户手动再添加关心的描述符,很麻烦,libevent是通过上述方法解决这一问题的,即添加到readfds_in,select时改变readfds_out,这样每次循环只需要将readfds_in复制给readfds_out即可

不能直接定位到准备好的描述符上,用户需要手动遍历所有的描述符判断哪些准备好

每次调用select,都需要把fd集合从用户态拷贝到内核态

每次调用select都需要在内核遍历所有的fd

select支持的描述符数量太小,默认为1024

poll

poll函数类似于select,相比于select使用起来更简单,但是仍然没有解决轮循判断描述符是否准备好的问题,换句话说,当描述符数量很大时,效率仍然很低

poll函数的原型如下

#include <poll.h>
int poll(struct pollfd fdarray[], nfds_t nfds, int timeout);


与select不同,poll不是为每一个事件构造一个描述符集,而是构造一个pollfd结构的数组,每个数组元素指定一个描述符以及用户关心的事件

struct pollfd
{
int fd;  /* 描述符 */
short events;  /* 用户关心的事件,调用poll时由用户设置 */
short revents;  /* 是什么事件将描述符激活,在poll返回时由内核设置*/
};


events可以由以下几种通过或运算结合在一起

POLLIN:可读

POLLOUT:可写

POLLERR:出错

POLLHUP:挂断

POLLNVAL:描述符没有绑定到一个文件/TCP连接



poll的等待时长是int类型

timeout == -1:永远等待或者某个描述符准备好后返回

timeout == 0:不等待,直接返回

timeout > 0:等待指定微秒或者某个描述符准备好后返回

使用方法同select,构造一个很大的pollfd数组,调用时传入一个当前最大描述符大一的值,返回后判断每个描述符是否准备好,这一点和select一样,poll不能直接定位到准备好的哪些描述符,需要判断每一个的revents

epoll

与select和poll不同,epoll克服了二者的缺陷,在阻塞等待过程中不是循环遍历每个描述符判断是否准备好以便返回,而是通过内核中的相应fd回调函数的调用直接得知某个文件描述符已经被激活(准备好),这就解决多当描述符数量过多时效率下降的问题。另外,epoll在返回后将所有已经准备好的描述符都存放在了一个数组中,用户不再需要手动遍历每一个描述符以判断是否是准备好的,大大提高了效率。通常在处理高并发的客户端请求时都会使用epoll代替上面两个。

epoll使用struct epoll_event结构体来存储描述符和事件

struct epoll_event
{
__uint32_t events; /* 监听的事件集,使用或运算结合在一起 */
epoll_data_t data;
};

typedef union epoll_data
{
void *ptr;
int fd; /* 监听的描述符 */
__uint32_t u32;
__uint64_t u64;
}epoll_data_t;


events可以是以下几种的或运算结果

EPOLLIN:描述符可读

EPOLLOUT:描述符可写

EPOLLERR:描述符出错

EPOLLHUP:描述符被挂断

EPOLLET:设置为边缘触发,相对于水平触发而言



epoll提供了几个接口

/* 创建一个epoll监听描述符,参数为最大需要监听描述符的个数 */
int epoll_create(int size);

/*
*epoll的事件注册函数
*epfd: 通过epoll_create创建的监听描述符
*op: 想要执行的操作,添加描述符的监听EPOLL_CTL_ADD, 修改描述符的监听EPOLL_CTL_MOD,删除描述符的监听EPOLL_CTL_DEL
*event: 对fd想要监听的事件,或者更新的事件
*/
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

/*
*epoll阻塞等待函数
*events是一个数组,用于存放所有准备好的描述符epoll_event
*maxevents是最大可以接受多少个激活的epoll_event,不能超过epoll_create传入的参数
*timeout超时时长
*/
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);


使用方法如下

//server.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>

#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>

int main()
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd < 0)
return 0;

struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(8080);
servaddr.sin_addr.s_addr = INADDR_ANY;

if(bind(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
{
close(sockfd);
return 0;
}

if(listen(sockfd, 10) < 0)
{
close(sockfd);
return 0;
}

int epollfd = epoll_create(1024);
struct epoll_event events[1024];
int fd_numbers = 1;

struct epoll_event event;
event.data.fd = sockfd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event);
while(1)
{
int n  = epoll_wait(epollfd, events, fd_numbers, -1);
if(n < 1)
break;
/* 直接遍历events数组, 不需要遍历所有fd判断哪个fd被激活 */
for(int i = 0; i < n; ++i)
{
int fd = events[i].data.fd;
short fd_event = events[i].events;
/* 出现异常 */
if((fd_event & EPOLLHUP) || (fd_event & EPOLLERR) || !(fd_event & EPOLLIN))
{
perror("epoll error");
close(fd);
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
--fd_numbers;
}
/* 有客户端请求连接服务器 */
else if(fd == sockfd)
{
struct sockaddr_in clientaddr;
bzero(&clientaddr, sizeof(clientaddr));
socklen_t len = sizeof(clientaddr);
int client_fd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);

struct epoll_event client_event;
client_event.data.fd = client_fd;
client_event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, client_fd, &client_event);
++fd_numbers;
}
else
{
char msg[4096];
bzero(msg, sizeof(msg));
int recv_ret = recv(fd, msg, sizeof(msg), 0);
/* 如果客户端close,那么客户端fd会变为可读,读取时返回0表示连接关闭*/
if(recv_ret == 0)
{
printf("close connection with %d\n", fd);
close(fd);
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
--fd_numbers;
}
else if(recv_ret > 0)
{
printf("recv from %d: %s\n", fd, msg);
char reply[] = "server has received client message";
if(send(fd, reply, strlen(reply), MSG_NOSIGNAL) == 0)
{
printf("close connection with %d\n", fd);
close(fd);
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
--fd_numbers;
}
}
}
}
}

close(epollfd);
close(sockfd);

return 0;
}


//client.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>

#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>

int main()
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);

struct sockaddr_in sockaddr;
bzero(&sockaddr, sizeof(sockaddr));
sockaddr.sin_family = AF_INET;
sockaddr.sin_port = htons(8080);
sockaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
//inet_aton(&sockaddr.sin_addr, "127.0.0.1");

if(connect(sockfd, (struct sockaddr*)&sockaddr, sizeof(sockaddr)) < 0)
{
close(sockfd);
return 0;
}

char msg[4096];
while(1)
{
bzero(msg, sizeof(msg));
scanf("%s", msg);

if(strcmp(msg, "exit") == 0)
{
break;
}

send(sockfd, msg, strlen(msg), MSG_NOSIGNAL);
int len = recv(sockfd, msg, sizeof(msg), 0);
msg[len] = '\0';

printf("receive from server %s\n", msg);
}

close(sockfd);

return 0;
}


epoll相比于select与poll,克服了二者的缺点

epoll在每次epoll_ctl添加fd时就将这个fd拷贝到内核,保证了整个epoll过程中同一个fd只拷贝一次,而select每次都需要重新将所有fd拷贝到内核

epoll在epoll_ctl时为每一个fd设置一个回调函数,当这个fd就绪,就会调用相应回调函数,函数中将对应的fd添加到一个就绪链表,epoll_wait实际上就在就绪链表中查看所有就绪的fd。而select和poll则需要遍历整个fd以判断是否有哪个fd就绪。epoll采用回调机制提升性能,效果明显

epoll可以支持的描述符数量很大
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息