您的位置:首页 > 理论基础 > 计算机网络

网络编程-复用I/O-select、epoll的用法

2020-07-24 19:14 471 查看

阻塞式I/O

非阻塞式I/O

I/O复用(select/epoll)

信号驱动I/O

异步I/O

select

select 函数:
原函数:int select (int maxfdp, fd_set *readfds, fd_set *writefds, fd_set *errorfds, struct timeval *timeout)
参数:maxfdp ,整数值,是指集合中所有文件描述符的范围,即所有文件描述符的最大值加1,不能错!
   readfds ,指向fd_set结构的指针,这个集合中应该包括文件描述符,我们是要监视这些文件描述符的读变化的,即我们关心是否可以从这些文件中读取数据了,如果这个集合中有一个文件可读,select就会返回一个大于0的值,表示有文件可读,如果没有可读的文件,则根据timeout参数再判断是否超时,若超出timeout的时间,select返回0,若发生错误返回负值。可以传入NULL值,表示不关心任何文件的读变化。
   writefds ,指向fd_set结构的指针,这个集合中应该包括文件描述符,我们是要监视这些文件描述符的写变化的,即我们关心是否可以向这些文件中写入数据了,如果这个集合中有一个文件可写,select就会返回一个大于0的值,表示有文件可写,如果没有可写的文件,则根据timeout参数再判断是否超时,若超出timeout的时间,select返回0,若发生错误返回负值。可以传入NULL值,表示不关心任何文件的写变化。
   errorfds , 同上面两个参数的意图,用来监视文件是否发生错误异常。
   timeout,select的超时时间,这个参数至关重要。它可以使select处于三种状态,第一,若将NULL以形参传入,即不传入时间结构,就是将select置于永久阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止;第二,若将时间值设为0秒0毫秒,就变成一个纯粹的非阻塞函数,不管文件描述符是否有变化,都立刻返回继续执行,文件无变化返回0,有变化返回一个正值;第三,timeout的值大于0,这就是等待的超时时间,即select在timeout时间内阻塞,超时时间之内有事件到来返回正值,超时返回0。
返回值:负值:select错误,正值:某些文件可读写或出错,0:等待超时,没有可读写或错误的文件。

FD_ZERO(&set);      /* 将set清零 */
FD_SET(fd, &set);   /* 将fd加入set */
FD_CLR(fd, &set);   /* 将fd从set中清除 */
FD_ISSET(fd, &set); /* 如果fd在set中则真 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <unistd.h>
#include <fcntl.h>

#define SERV_PORT 8888
#define MAX_LISTEN_QUE 5

#define MAX_BUFFER_SIZE 1024

#define RT_ERR (-1)
#define RT_OK  0
//创建套接字的函数
int Ipv4_tcp_create_socket(void)
{
int listenfd,sockfd,opt=1;
struct sockaddr_in server,client;
socklen_t len;
int temp;
int ret;
//创建套接字
listenfd = socket(AF_INET,SOCK_STREAM,0);//ipv4,全双工
if(listenfd < 0)
{
perror("Create socket fail\n");
return RT_ERR;
}
//设置地址重用
if((ret = setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt))) < 0)
{
perror("Error , set socket reuse addr failued\n");
return RT_ERR;
}
//初始化服务器端

bzero(&server,sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(SERV_PORT);
server.sin_addr.s_addr = htonl(INADDR_ANY);//连接所有的客户端

len = sizeof(struct sockaddr);
if(bind(listenfd,(struct sockaddr *)&server,len) < 0){
perror("bind error\n");
return RT_ERR;
}

listen(listenfd,MAX_LISTEN_QUE);

return listenfd;

}

int main(int argc,char *argv[])
{
int listenfd,sockfd;
struct sockaddr_in server,client;
socklen_t len;
int bytes = 0;
fd_set g_rdfs,cur_rdfs;
int maxfd;
int i;
char buf[MAX_BUFFER_SIZE];

len = sizeof(struct sockaddr_in);
//调用创建套接字函数
listenfd = Ipv4_tcp_create_socket();
//将c_rdfs清零
FD_ZERO(&g_rdfs);
//将listendfd添加进c_rdfs中
FD_SET(listenfd,&g_rdfs);
maxfd = listenfd;

while(1)
{
cur_rdfs = g_rdfs;
//监控套接字
if(select(maxfd + 1,&cur_rdfs,NULL,NULL,NULL) < 0){
perror("select error\n");
return RT_ERR;
}
for(i=0;i <= maxfd;i++){
//判断i是否在cur_rdfs中
if(FD_ISSET(i,&cur_rdfs)){
//判断是不是我们监听的套接字
if(listenfd == i){
//接收套接字(返回的是通信套接字)
if((sockfd = accept(listenfd,(struct sockaddr*)&client,(socklen_t*)&len)) < 0){
perror("accept error\n");
return RT_ERR;
}
printf("sockfd:%d\n",sockfd);
//清除我们添加进去的套接字,以防下次循环再次检测到
FD_CLR(i,&cur_rdfs);
//得到最大的套接字的个数
maxfd = maxfd > sockfd ? maxfd : sockfd;
//将通信套接字加入关注的套接字集
FD_SET(sockfd,&g_rdfs);
//如果不是监听套接字
}else{
printf("read socket :%d\n",i);
//如果不是监听套接字我就直接读取数据
bytes = recv(i,buf,MAX_BUFFER_SIZE,0);
if(bytes < 0){
perror("recv error\n");
return RT_ERR;
}
if(bytes == 0){
//客户端退出,从我关注的套接字集中把它清掉
FD_CLR(i,&g_rdfs);
//关闭套接字
close(i);
continue;
}
//打印读取到的内容
printf("buf:%s\n",buf);
//把客户端发送的数据,发送给客户端
send(i,buf,strlen(buf),0);
}
}
}
}
}

优化一下

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <unistd.h>
#include <fcntl.h>

#define SERV_PORT 8888
#define MAX_LISTEN_QUE 5

#define MAX_BUFFER_SIZE 1024

#define RT_ERR (-1)
#define RT_OK  0
//创建套接字的函数
int Ipv4_tcp_create_socket(void)
{
int listenfd,sockfd,opt=1;
struct sockaddr_in server,client;
socklen_t len;
int temp;
int ret;
//创建套接字
listenfd = socket(AF_INET,SOCK_STREAM,0);//ipv4,全双工
if(listenfd < 0)
{
perror("Create socket fail\n");
return RT_ERR;
}
//设置地址重用
if((ret = setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt))) < 0)
{
perror("Error , set socket reuse addr failued\n");
return RT_ERR;
}
//初始化服务器端

bzero(&server,sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(SERV_PORT);
server.sin_addr.s_addr = htonl(INADDR_ANY);//连接所有的客户端

len = sizeof(struct sockaddr);
if(bind(listenfd,(struct sockaddr *)&server,len) < 0){
perror("bind error\n");
return RT_ERR;
}

listen(listenfd,MAX_LISTEN_QUE);

return listenfd;

}

int main(int argc,char *argv[])
{
int listenfd,sockfd;
struct sockaddr_in server,client;
socklen_t len;
int bytes = 0;
fd_set g_rdfs,cur_rdfs;
int maxfd;
int i;
char buf[MAX_BUFFER_SIZE];
//获得套接字的极限值
int client_fd[FD_SETSIZE];

printf("FD_SETSIZE:%d\n",FD_SETSIZE);
len = sizeof(struct sockaddr_in);
//调用创建套接字函数
listenfd = Ipv4_tcp_create_socket();
//将c_rdfs清零
FD_ZERO(&g_rdfs);
//将listendfd添加进c_rdfs中
FD_SET(listenfd,&g_rdfs);
maxfd = listenfd;
//赋初值
for(i = 0;i < FD_SETSIZE; i++)
{
client_fd[i] = -1;
}
while(1)
{
cur_rdfs = g_rdfs;
//监控套接字
if(select(maxfd + 1,&cur_rdfs,NULL,NULL,NULL) < 0){
perror("select error\n");
return RT_ERR;
}
//判断监听套接字是否在检测集中
if(FD_ISSET(listenfd,&cur_rdfs)){
//接收套接字(返回的是通信套接字)
if((sockfd = accept(listenfd,(struct sockaddr*)&client,(socklen_t*)&len)) < 0){
perror("accept error\n");
return RT_ERR;
}
printf("sockfd:%d\n",sockfd);
//清除添加进去的套接字,以防下次循环再次检测到
FD_CLR(listenfd,&cur_rdfs);
//得到最大的套接字的个数
maxfd = maxfd > sockfd ? maxfd : sockfd;
//将通信套接字加入关注的套接字集
FD_SET(sockfd,&g_rdfs);
//下面的for语句整体的作用是找到通信套接字并将它的值赋值给存储套接字的数组
//前三个套接字都不是通信套接字,第四个才是
for(i = 0; i < maxfd; i++){
if(-1 == client_fd[i]){
client_fd[i] = sockfd;
break;
}
}
}
for(i=0;i <= maxfd;i++){
if(-1 == client_fd[i]){
continue;
}
//判断cur_rdfs是不是在关注的读写套接字中
if(FD_ISSET(client_fd[i],&cur_rdfs)){
printf("read socket :%d\n",client_fd[i]);
//如果不是监听套接字就直接读取数据
bytes = recv(client_fd[i],buf,MAX_BUFFER_SIZE,0);
if(bytes < 0){
perror("recv error\n");
return RT_ERR;
}
if(bytes == 0){
//客户端退出,从关注的套接字集中把它清掉
FD_CLR(client_fd[i],&g_rdfs);
//关闭套接字
close(client_fd[i]);
client_fd[i] = -1;
continue;
}
//打印读取到的内容
printf("buf:%s\n",buf);
//把客户端发送的数据,发送给客户端
send(client_fd[i],buf,strlen(buf),0);

}
}
}
}

运行结果

FD_SETSIZE:1024
sockfd:4
read socket :4
buf:1
read socket :4
buf:2

epoll

epoll_create函数: 创建一个epoll句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
原函数:int epoll_create(int size);
参数:size,用来告诉内核要监听的数目一共有多少个。
返回值:成功返回一个非负整数的文件描述符,作为创建好的epoll句柄。失败返回-1,错误信息可以通过errno获得。

epoll_ctl函数: epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
原函数:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:epfd,epoll_create()函数返回的epoll句柄。
   op,操作选项。
   fd,要进行操作的目标文件描述符。
   event,struct epoll_event结构指针,将fd和要进行的操作关联起来。
返回值:成功返回0,作为创建好的epoll句柄。失败返回-1,错误信息可以通过errno获得。

参数op的可选值有以下3个:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;

struct epoll_event结构如下:

typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

epoll_wait函数: 等待事件的产生。
原函数:int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
参数:epfd,epoll_create()函数返回的epoll句柄。
   events,struct epoll_event结构指针,用来存放从内核得到事件的集合。
   maxevents,告诉内核这个events有多大
   timeout,等待时的超时时间,以毫秒为单位。
返回值:成功返回需要处理的事件数目。失败返回0,表示等待超时。

注:epoll有两种工作方式:

  LT(level triggered,水平触发)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

  ET (edge-triggered,边缘触发)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。
  

fcntl函数:
原函数:fcntl(int fd, int cmd, … /* arg */ )
参数:fd,创建的套接字的目标文件描述符
   cmd,要执行的控制操作
返回值: 与命令有关。如果出错,所有命令都返回-1,如果成功则返回某个其他值。下列三个命令有特定返回值:F_DUPFD,F_GETFD,F_GETFL以及F_GETOWN。第一个返回新的文件描述符,第二个返回相应标志,最后一个返回一个正的进程ID或负的进程组ID。

常用的用法:
  (1)把一个套接字设置为非阻塞型:cmd为F_SETFL,flags“包含”O_NONBLOCK。(fcntl(listenfd,F_SETFL,O_NONBLOCK))
  (2)把一个套接字设置成一旦其状态发生变化,内核就产生一个SIGIO:cmd为F_SETFL,flags“包含”O_ASYNC。
  (3)关于套接字的当前属主。
  

fcntl函数有5种功能:

  1. 复制一个现有的描述符(cmd=F_DUPFD).
  2. 获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
  3. 获得/设置文件状态标记(cmd=F_GETFL或F_SETFL).
  4. 获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN).
  5. 获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW).

  
cmd的选项:
  F_DUPFD 返回一个如下描述的(文件)描述符:
    (1)最小的大于或等于arg的一个可用的描述符
    (2)与原始操作符一样的某对象的引用
    (3)如果对象是文件(file)的话,返回一个新的描述符,这个描述符与arg共享相同的偏移量(offset)
    (4)相同的访问模式(读,写或读/写)
    (5)相同的文件状态标志(如:两个文件描述符共享相同的状态标志)
    (6)与新的文件描述符结合在一起的close-on-exec标志被设置成交叉式访问execve(2)的系统调用

  F_GETFD 取得与文件描述符fd联合close-on-exec标志,类似FD_CLOEXEC.如果返回值和FD_CLOEXEC进行与运算结果是0的话,文件保持交叉式访问exec(),否则如果通过exec运行的话,文件将被关闭(arg被忽略)

  F_SETFD 设置close-on-exec旗标。该旗标以参数arg的FD_CLOEXEC位决定。
  F_GETFL 取得fd的文件状态标志,如同下面的描述一样(arg被忽略)

  F_SETFL 设置给arg描述符状态标志,可以更改的几个标志是:O_APPEND, O_NONBLOCK,O_SYNC和O_ASYNC。

  F_GETOWN 取得当前正在接收SIGIO或者SIGURG信号的进程id或进程组id,进程组id返回成负值(arg被忽略)

  F_SETOWN 设置将接收SIGIO和SIGURG信号的进程id或进程组id,进程组id通过提供负值的arg来说明,否则,arg将被认为是进程id
  

命令字(cmd)F_GETFL和F_SETFL的标志如下面的描述:
  O_NONBLOCK 非阻塞I/O;如果read(2)调用没有可读取的数据,或者如果write(2)操作将阻塞,read或write调用返回-1和EAGAIN错误

  O_APPEND 强制每次写(write)操作都添加在文件大的末尾,相当于open(2)的O_APPEND标志

  O_DIRECT 最小化或去掉reading和writing的缓存影响.系统将企图避免缓存你的读或写的数据;如果不能够避免缓存,那么它将最小化已经被缓存了的数 据造成的影响.如果这个标志用的不够好,将大大的降低性能

  O_ASYNC 当I/O可用的时候,允许SIGIO信号发送到进程组,例如:当有数据可以读的时候
  

注: 在修改文件描述符标志或文件状态标志时必须谨慎,先要取得现在的标志值,然后按照希望修改它,最后设置新标志值。不能只是执行F_SETFD或F_SETFL命令,这样会关闭以前设置的标志位。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#define SERV_PORT 8888
#define MAX_LISTEN_QUE 5
#define MAX_BUFFER_SIZE 100
#define RT_ERR (-1)
#define RT_OK  0
#define MAX_EVENTS 500
//创建套接字子函数
int IPv4_tcp_create_socked(void)
{
int listenfd,sockfd,opt = 1;
struct sockaddr_in server,client;
socklen_t len;
int timep;
int ret;
//创建套接字
listenfd = socket(AF_INET,SOCK_STREAM,0);//ipv4,全双工通信
if(listenfd < 0){
perror("cretae socket error\n");
return -1;
}
//设置地址重用
if((ret = setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt))) < 0){
perror("set sockopt failure\n");
return -1;
}

//初始化服务器结构体
bzero(&server,sizeof(server));
server.sin_family = AF_INET;//ipv4
server.sin_port = htons(SERV_PORT);//端口号(主机序转换到网络序)
server.sin_addr.s_addr = htonl(INADDR_ANY);//允许所有的客户端连接

len = sizeof(struct sockaddr);
//绑定端口号,IP到套接字
if(bind(listenfd,(struct sockaddr *)&server,len) < 0){
perror("bind error\n");
return -1;
}
//设置最大连接数
listen(listenfd,MAX_LISTEN_QUE);

return listenfd;
}

//数据处理子函数
int Process_data(int sockfd)
{
int bytes;
char buf[MAX_BUFFER_SIZE];
char *s = buf;
char flag = 1;
int len;

while(flag)
{
//读取数据
bytes = recv(sockfd,s,100,0);
if(bytes < 0){
//判断出错的类型是不是已经读完
if(errno == EAGAIN){
printf("no data\n");
break;
}
perror("recv error\n");
return -1;
}
//客户端断开连接
if(bytes == 0){
return -2;
}

if(bytes == 100){
flag = 1;
}
else{
flag = 0;
}
//调整存储数据指针
s += bytes;
//获得读取的字节数
len += bytes;
printf("bytes:%d\n",bytes);
}

printf("buf:%s\n",buf);
send(sockfd,buf,len,0);
return 0;
}

int main(int argc,char *argv[])
{
int listenfd,sockfd;
int epollfd,fds;
struct epoll_event ev,events[MAX_EVENTS];
int i,rv;
struct sockaddr_in client;
int len;

len = sizeof(struct sockaddr_in);
//创建epoll句柄
epollfd = epoll_create(MAX_EVENTS);
if(epollfd < 0){
perror("epoll_create error\n");
return -1;
}
//调用创建套接字函数
listenfd = IPv4_tcp_create_socked();
//把套接字设置为非阻塞方式
fcntl(listenfd,F_SETFL,O_NONBLOCK);
//设置要监听的套接字的可读监听模式
ev.data.fd = listenfd;
ev.events = EPOLLIN;
//epoll的注册函数(添加要监听的套接字)
rv = epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&ev);
if(rv < 0){
perror("epoll_ctl error\n");
return -1;
}
while(1)
{
//等待事件的产生
fds = epoll_wait(epollfd,events,MAX_EVENTS,-1);
if(fds < 0){
perror("epoll_wait error\n");
return -1;
}

for(i = 0;i < fds;i++)
{
//判断监听的套接字是不是我们创建的监听套接字
if(events[i].data.fd == listenfd)
{
sockfd = accept(listenfd,(struct sockaddr *)&client,&len);
if(sockfd <0){
perror("accept error\n");
continue;
}
ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLET;//设置为读监听并且是边沿触发
//epoll的注册函数(添加要监听的套接字)
epoll_ctl(epollfd,EPOLL_CTL_ADD,sockfd,&ev);
continue;
}//如果不是我们创建的套接字,就是有数据到来
else{
//调用数据处理函数(传入的入口参数是我们创建的通信套接字)
rv = Process_data(events[i].data.fd);
if(rv == -2){
//清除套接字
epoll_ctl(epollfd,EPOLL_CTL_DEL,events[i].data.fd,&ev);
close(events[i].data.fd);
continue;
}

}
}
}

}

运行结果

bytes:1
buf:1
bytes:1
buf:2
bytes:1
buf:3
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: