您的位置:首页 > 理论基础 > 计算机网络

Linux网络编程---I/O多路复用之epoll

2015-10-05 17:45 811 查看
实现一个基本的流式套接字客户端/服务器通信程序,客户端和服务器按如下步骤交互:

(1)客户端向服务器发出日期时间请求字符串,如:%D %Y %A %T等。

(2)服务器从网络接收到日期时间请求字符串后,根据字符串格式生成对应的日期时间值返回给客户端。

二、事件I/O

在前面用select函数有效的解决了多个I/O端口的复用问题,但是select函数存在两个缺陷:一是进程所能同时打开的文件描述符个数受FD_SETSIZE大小的限制,二是每个select函数返回可用的文件描述符集合后,应用都必须对所有已注册的文件描述符进行遍历对比,以确定哪个描述符上发生了事件,从而对其进行读写操作,于是随着文件描述符的增加,系统的性能线性下降,从Linux内核2.6开始,提供了一种新的I/O模型,称为事件I/O(epoll)。epoll有效解决了select存在的问题,称为Linux平台上逐渐流行的编程模式,epoll通过三个系统调用完成了高效的I/O模型的实现:

(1)epoll_create 初始化epoll上下文环境

(2)epoll_ctl 向epoll上下文添加或者除去需要系统监视的文件描述符

(3)epoll_wait 等待文件描述符上发生事件

(一)、创建epoll上下文环境epoll_create

#include<sys/epoll.h>
int epoll_create(int size)


返回值:文件描述符表示成功,-1表示错误,errno记录错误号

epoll_create()函数调用成功,则初始化一个epoll实例,并返回一个和此实例相关联的文件描述符,该文件描述符实际并不指向任何真实的文件,仅作为句柄用于后续使用此epoll实例,size参数表示应用预计需要内核监视的文件描述符个数,注意该参数并不表示最大监视文件描述符个数,而只是一个告诉内核的提示数目,内核可以根据此size参数分配合适的内部数据结构,因此该数据越准确,就越能获得更好的性能。当发生错误时,epoll_create返回-1,并且设置错误代码errno为以下几个值:

(1)EINVAL 参数size不是一个整数

(2)ENFILE 系统已经分配了最大限度的文件描述符个数了

(3)ENOMEM 无足够内存完成此操作

示例:

int epfd;
epfd=epoll_create(100)
if(epfd<0)
{
perror("epoll_creat");
}
注意:在使用完毕此文件描述符后,应该通过close()关闭

(二)、epoll设置epoll_ctl

#include<sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);


epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:

EPOLL_CTL_ADD:注册新的fd到epfd中;

EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

EPOLL_CTL_DEL:从epfd中删除一个fd;

第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};


events可以是以下几个宏的集合:

EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

EPOLLOUT:表示对应的文件描述符可以写;

EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

EPOLLERR:表示对应的文件描述符发生错误;

EPOLLHUP:表示对应的文件描述符被挂断;

EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

示例:

//注册一个文件描述符并要求epoll根据传入的事件类型参数进行监视
struct epoll_event event;
int ret;
event.data.fd=fd;//将来epoll会返回此fd给应用
event.events=EPOLLIN|EPOLLOUT;//监视可读和可写事件
ret=epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&event);
if(ret)
{
perror("epoll_ctl");
}
//如果需要修改文件描述符所关联的事件,可以按如下方式进行:

struct epoll_event event;
int ret;
event.data.fd=fd;//将来epoll会返回此fd给应用
event.events=EPOLLIN;//监视可读事件
ret=epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&event);
if(ret)
{
perror("epoll_ctl");
}
//如果需要从epoll上下文中去除一个文件描述符,则可以按如下方式进行:
struct epoll_event event;
int ret;
ret=epoll_ctl(epfd,EPOLL_CTL_DEL,fd,&event);
if(ret)
{
perror("epoll_ctl");
}


(三)、等待事件发生epoll_wait

#include<sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
返回值:准备好的文件描述符个数表示成功,-1表示错误,errno记录错误号

调用epoll_waith后,进程将等待事件的发生,直到timeout参数设定的超时值到时为止,当成功返回后,则返回值为发生了所监视事件的文件描述符个数,并且参数events指向被返回的事件,本次epoll_wait最多可以返回maxevents个事件,因此可以通过遍历的方式逐个处理发生了事件的哪些文件描述符,另外,如果epoll_wait返回了maxevents个事件,并不表示当前只有maxevents个事件发生,而只是说本次调用能处理的事件个数为maxevents,剩下已产生但是未处理的事件存放到epoll上下文的事件队列中,因此将在下次调用epoll_wait并返回时进行处理,epoll_wait调用失败返回-1,并设置错误码如下值:

(1)EBADF epfd为一个非法文件描述符

(2)EFAULT 进程没有events参数所指向内存的写权限

(3)EINTR epoll_wait系统调用被信号中断

(4)EINVAL epfd不是合法epoll文件描述符,或者maxevents小于或者等于0

注意:若参数timeout为0,则epoll_wait立即返回,即使没有任何事件发生,并且返回值为0,若参数为-1,则不返回直到发生事件为止。

示例:

#define MAX_EVENTS 64  //设置一次epoll_wait最多能返回的事件
struct epoll_event *events;
int nr_events,epfd;
events=(struct epoll_event*)malloc(sizeof(struct epoll_event)*MAX_EVENTS);
if(!events)
{
perror("malloc");
return 1;
}
nr_events=epoll_wait(epfd,events,MAX_EVENTS,-1);
if(nr_events<0)
{
perror("epoll_wait");
free(events);
return 1;
}
//下面开始对epoll返回的事件进行遍历,处理
for(int i=0;i<nr_events;i++)
{
printf("event=%ld on fd=%d\n",events[i].events,events[i].data.fd);
//根据events[i].events  处理events[i].data.fd文件描述符
}
free(events);


epoll支持两种触发模式,即若在调用epoll_ctl时,在events域中设置了EPOLLET标志,则epoll将工作在边沿触发方式下,而不是默认的水平触发模式。

为了更好的理解这两种触发模式,考虑下面场景:

(1)客户端发来100字节的数据

(2)服务器调用epoll_wait后,发现套接字可读,于是读取了50字节,则在服务器套接字接收缓存中还有50字节未读,

(3)当服务器再次执行epoll_wait后,如果是边沿触发,则尽管接收缓存中还有50字节数据未读,则再次调用epoll_wait后,epoll_wait将不再返回该套接字可读,反之,若服务器工作于水平触发模式,则因为套接字缓存中有数据可读,所以当再次调用epoll_wait后,将仍可以返回此套接字为可读状态,因此边沿触发只发生在状态改变的那一时刻,而对于水平触发,只要套接字可读写,epoll_wait就立即返回,因此在使用边沿触发模式时,一定要小心,通常都会将套接字设置为非阻塞模式,然后检查错误代码是否为EWOULDBLOCK.

再改写上面程序之前,我们先简单熟悉epoll函数的用法,下面程序实现如下简单功能:

(1)服务器接收客服端发送的数据,并发回至客服端,客服端显示出接收的数据
(2)客服端退出后,服务器给予提示
(3)服务器可手动退出,若服务器退出,客服端自动退出

//服务器
//用法: ./server ip port
#include  <unistd.h>
#include  <sys/types.h>
#include  <sys/socket.h>
#include  <netinet/in.h>
#include  <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/resource.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#define  MaxFd 10240 //最大连接数
#define  backlog 1000 //监听队列大小
#define MAX_EVENTS 64  //设置一次epoll_wait最多能返回的事件
#define MaxSize 1024 //缓冲区大小
#define Maxfile 100 //进程最多打开文件数

int do_task(int connfd)  //客服端请求处理程序
{
int nread;
char buf[MaxSize];
nread=read(connfd, buf, sizeof(buf));
if (nread==0)
{
printf("client close the connection\n");
close(connfd);
return -1;
}
if (nread<0)
{
perror("read error");
close(connfd);
return -1;
}
buf[nread]=0;
printf("send '%s' ok\n",buf);
write(connfd, buf, nread);
return 0;
}
int main(int argc,char **argv)
{
char quit[10];
bool opt=true;
int handle,curfd,acceptCount;
struct sockaddr_in server,client;
int listenfd,connfd,port;
socklen_t socklen=sizeof(struct sockaddr_in);
struct epoll_event ev;
struct epoll_event events[MaxFd+10];  //事件集
struct rlimit rt;
//设置每个进程允许打开的最大文件数
rt.rlim_max=rt.rlim_cur=Maxfile;
if (setrlimit(RLIMIT_NOFILE, &rt)==-1)
{
perror("setrlimit error");
return -1;
}
listenfd=socket(AF_INET, SOCK_STREAM, 0);
if (listenfd==-1)
{
perror("socket error\n");
return -1;
}
bzero(&server, sizeof(server));
server.sin_family=AF_INET;
inet_aton(argv[1], &server.sin_addr);
port=atoi(argv[2]);
server.sin_port=htons(port);
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

bind(listenfd, (struct sockaddr*)&server,socklen);
if (listen(listenfd, backlog)==-1)
{
perror("listen error");
return -1;
}
//创建 epoll 句柄,把监听 socket 加入到 epoll 集合里
handle=epoll_create(MaxFd);
ev.events=EPOLLIN | EPOLLET; //沿触发模式或水平触发模式。
ev.data.fd = listenfd; //监听的套接字

if (epoll_ctl(handle, EPOLL_CTL_ADD, listenfd, &ev) < 0)
{
fprintf(stderr, "epoll set listenfd error: fd=%d\n", listenfd);
return -1;
}
//添加输入 文件描述符
ev.events=EPOLLIN | EPOLLET;
ev.data.fd = STDOUT_FILENO;
epoll_ctl(handle, EPOLL_CTL_ADD, STDOUT_FILENO, &ev);
acceptCount=0;
printf("epollserver startup,port %d, max connection is %d, backlog is %d\n", port,MaxFd,backlog);
while (1)
{
//等待有事件发生
curfd=epoll_wait(handle,events,MAX_EVENTS,-1); //阻塞式调用
if (curfd==-1)
{
perror("epoll_wait");
continue;
}
for (int i=0; i<curfd; i++) //对返回的可处理的文件描述符数进行枚举判断
{
if (events[i].data.fd==listenfd) //监听套接字可读
{
connfd=accept(listenfd, (struct sockaddr *)&client,&socklen);
if (connfd<0)
{
perror("accept error");
continue;
}
++acceptCount;
if (acceptCount>MaxFd)
{
fprintf(stderr, "too many connection, more than %d\n", MaxFd);
close(connfd);
continue;
}
printf("accept from %s:%d, tot %d accepted\n",inet_ntoa(client.sin_addr),ntohs(client.sin_port),acceptCount);

if (fcntl(connfd, F_SETFL,fcntl(connfd, F_GETFD,0)| O_NONBLOCK)==-1)
{
perror("set nonblocking error");
}
//添加连接套接字到事件集中
ev.events=EPOLLIN | EPOLLET;;
ev.data.fd = connfd;
if (epoll_ctl(handle, EPOLL_CTL_ADD, connfd, &ev) < 0)
{
fprintf(stderr, "epoll set connfd error: fd=%d\n", listenfd);
return -1;
}
continue;
}
else if(events[i].data.fd==STDOUT_FILENO) //可输入
{
scanf("%s",quit);
if (strcasecmp(quit, "quit")==0)
{
printf("server close...\n");
close(listenfd);
return 0;
}
continue;
}
if (do_task(connfd)<0) //执行客服端请求
{
epoll_ctl(handle, EPOLL_CTL_DEL, events[i].data.fd,&ev); //删除该连接套接字
acceptCount--;
}
}
}
return 0;
}


客户端沿用
Linux网络编程---I/O多路复用 之 select 中的程序



下面是改写的时间服务程序,稍复杂:

服务器:
/*
TCP服务器
用法:./server port

*/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <time.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <assert.h>
#include <vector>
#include <fcntl.h>
#include <sys/epoll.h>
#include <algorithm>
using namespace std;
#define BUFSIZE 1024
#define MAXCONN 200
static void bail(const char *on_what)
{
fputs(strerror(errno),stderr);
fputs(": ",stderr);
fputs(on_what, stderr);
fputc('\n',stderr);
exit(1);
}
struct sockfd_opt  //处理每个socket描述符的结构体
{
int fd;    //描述符
int (*do_task)(struct sockfd_opt *p_so); //回调函数
};
vector<struct sockfd_opt*>HashHead[MAXCONN];  //链表元素
int epfd;
struct epoll_event *events;

//设置为非阻塞模式
void setnonblocking(int sock)
{
int opts;
opts=fcntl(sock, F_GETFL);
if (opts<0)
bail("fcntl");
opts=opts|O_NONBLOCK;
if (fcntl(sock, F_SETFL,opts)<0)
bail("fcntl");
}

//生成hash值
int intHash(int key)
{
key+=~(key<<15);
key^=(key>>10);
key+=(key<<3);
key^=(key>>6);
key+=~(key<<11);
key^=(key>>16);
return key;
}

//向客户端发回日期时间
int send_reply(struct sockfd_opt *p_so)
{
char reqBuf[BUFSIZE]; //接收缓存
char dtfmt[BUFSIZE];//日期-时间结果字符串
time_t td; //当前时间和日期
struct tm tm;
long z;
unsigned int hash;
if ((z=read(p_so->fd, reqBuf, sizeof(reqBuf)))<=0)
{
//此fd代表的客户端关闭了连接,因此该fd将自动从epfd中删除,于是我们仅需将其从散列表中删除

hash=intHash(p_so->fd)& MAXCONN;
vector<struct sockfd_opt*>::iterator  it;
HashHead[hash].erase(find(HashHead[hash].begin(), HashHead[hash].end(), p_so)); //删除

//关闭当前套接字描述符
close(p_so->fd);
free(p_so);
//若读操作返回-1且不是RST分段
if (z<0 && (errno|=ECONNRESET))
bail("read()");
}
else
{
reqBuf[z]=0;
time(&td);
tm=*localtime(&td);
strftime(dtfmt, sizeof(dtfmt), reqBuf, &tm);

//向客户端发回结果
z=write(p_so->fd, dtfmt, strlen(dtfmt));
if (z<0)
bail("write()");
}

return 0;
}

//接收TCP连接
int creat_conn(struct sockfd_opt *p_so)
{
unsigned int hash;
struct sockaddr_in client;  //客户端ip地址
int conn_fd;
socklen_t sin_size;
sin_size=sizeof(client);
struct epoll_event ev;
if ((conn_fd=accept(p_so->fd, (struct sockaddr*)&client, &sin_size))==-1)
{
fprintf(stderr, "Accept error:%s\a\n",strerror(errno));
exit(1);
}
setnonblocking(conn_fd);
fprintf(stdout, "server got connection from %s:%d\n",inet_ntoa(client.sin_addr),ntohs(client.sin_port));

int ret;
if ((p_so=(struct sockfd_opt*)malloc(sizeof(sockfd_opt)))==NULL)
{
perror("malloc");
return -1;
}
p_so->fd=conn_fd;
p_so->do_task=send_reply;
hash=intHash(conn_fd)&MAXCONN;

HashHead[hash].push_back(p_so);
//  printf("fd2:%d hash2:%d size2:%d\n",conn_fd,hash,HashHead[hash].size());

//向epoll上下文注册此conn_fd
ev.data.fd=conn_fd;
ev.events=EPOLLIN;
//ev.events=EPOLLIN|EPOLLET

//添加此fd
ret=epoll_ctl(epfd,EPOLL_CTL_ADD,conn_fd,&ev);
if (ret)
bail("epoll_ctl");
return 0;
}

//初始化监听套接字选项
int init(int fd)
{
sockfd_opt *p_so;
struct epoll_event ev;
unsigned int hash;
int ret;
if ((p_so=(struct sockfd_opt*)malloc(sizeof(sockfd_opt)))==NULL)
{
perror("malloc");
return -1;
}
//设置监听套接字选项的回调函数
p_so->do_task=creat_conn;
p_so->fd=fd;
//将监听套接字选项加入到链表尾

hash=intHash(fd)&MAXCONN;
HashHead[hash].push_back(p_so);
//   printf("fd1:%d hash1:%d size1:%d\n",fd,hash,HashHead[hash].size());

//向epoll上下文注册此fd
ev.data.fd=fd;
ev.events=EPOLLIN;
//ev.events=EPOLLIN|EPOLLET

//添加此fd
ret=epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);
if (ret)
bail("epoll_ctl");
return 0;

}

int main(int argc,char *argv[])
{
int listen_fd; //用于监听的套接字描述符
struct sockaddr_in server;
int port;
socklen_t optlen;
epfd=epoll_create(MAXCONN); //epoll集合
int nev;//epoll_wait返回的文件描述符个数
vector<struct sockfd_opt*>::iterator it;//迭代器
struct sockfd_opt *p_so;
unsigned int hash;
port=atoi(argv[1]);
if((listen_fd=socket(PF_INET, SOCK_STREAM, 0))==-1)
bail("socket()");

setnonblocking(listen_fd);

//设置套接字选项
int opt;
optlen=sizeof(opt);
int ret=setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, optlen);
if (ret)
bail("setsockopt()");
//服务器监听地址准备
memset(&server, 0, sizeof(server));
server.sin_family=PF_INET;
server.sin_addr.s_addr=htonl(INADDR_ANY);
server.sin_port=htons(port);
//绑定服务器到监听套接字
if((bind(listen_fd, (struct sockaddr*)&server, sizeof(server)))==-1)
bail("bind()");
//开始监听
if(listen(listen_fd, 5)==-1)
bail("listen()");

if (init(listen_fd))
bail("init()");

events=(struct epoll_event*)malloc(sizeof(struct epoll_event)*MAXCONN);
printf("server is waiting for acceptance of new client\n");
for (; ; )
{
//等待注册的事件发生
nev=epoll_wait(epfd,events,MAXCONN,-1);
if (nev<0)
{
free(events);
bail("epoll_wait");
}
for (int i=0; i<nev; i++)
{
hash=intHash(events[i].data.fd)&MAXCONN;
it=HashHead[hash].begin();
while (it!=HashHead[hash].end())
{
if ((*it)->fd==events[i].data.fd)
{
(*it)->do_task(*it);
break;   //跳出来,迭代器可能会失效(当删除一个套接字描述符后)
}
++it;
}
}
}
return 0;
}


客户端:
//TCP客户端
/*

用法:./client hostname port

说明:本程序使用TCP连接和TCP服务器通信,当连接建立后,向服务器发送如下格式字符串

格式字符串示例:
(1) %D
(2) %A %D %H:%M:%S
(3) %A
(4) %H:%M:%S
(5)...

*/

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <time.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#define BUFSIZE 1024
#define backlog 128 //等待队列大小
static void bail(const char *on_what)
{
fputs(strerror(errno),stderr);
fputs(": ",stderr);
fputs(on_what, stderr);
fputc('\n',stderr);
exit(1);
}
int main(int argc,char *argv[])
{
int sockfd;//客户端套接字
char buf[BUFSIZE];
struct sockaddr_in server_addr;
int portnumber;
long nbytes;
long z;
char reqBuf[BUFSIZE];
if (argc!=3)
{
printf("输入格式错误\n");
exit(1);
}
if ((portnumber=atoi(argv[2]))<0)
{
fprintf(stderr,"Usage:%s hostname portnumber\a\n",argv[0]);
exit(1);
}
//创建客户端套接字

if ((sockfd=socket(PF_INET,SOCK_STREAM,0))==-1)
{
fprintf(stderr,"Socket error:%s\a\n",strerror(errno));
exit(1);
}
//创建服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(portnumber);
if (!inet_aton(argv[1], &server_addr.sin_addr))
{
bail("bad address");
}
//连接服务器
if (connect(sockfd, (struct sockaddr*)(&server_addr),sizeof(server_addr))==-1)
{
fprintf(stderr,"connect error:%s\a\n",strerror(errno));
exit(1);
}
printf("connected to server %s\n",inet_ntoa(server_addr.sin_addr));
//客户端主循环输入 “quit”退出
for (; ; )
{
//提示输入日期请求格式字符串

fputs("\nEnter fotmat string(^D or 'quit' to exit):",stdout);
if (!fgets(reqBuf,sizeof(reqBuf),stdin))
{
printf("\n");
break;
}
//为日期时间请求字符串添加NULL字符作为结尾,另外同时去掉末尾的换行符

z=strlen(reqBuf);
if (z>0 && reqBuf[--z]=='\n')
reqBuf[z]=0;

if (z==0)//客户端仅键入Enter
continue;

//输入‘quit’退出
if(!strcasecmp(reqBuf,"QUIT"))//忽略大小写比较
{
printf("press any key to end client.\n");
getchar();
break;
}
//发送日期时间请求字符串到服务器,注意请求信息中去掉了NULL字符

z=write(sockfd, reqBuf, sizeof(reqBuf));
printf("client has sent '%s' to the sever\n",reqBuf);
if (z<0)
bail("write()");

//从客户端套接字中读取服务器发回的应答
if ((nbytes=read(sockfd,buf,sizeof(buf)))==-1)
{
fprintf(stderr,"read error:%s\n",strerror(errno));
exit(1);
}
//若服务器由于某种原因关闭了连接,则客户端需要处理此事件
if(nbytes==0)
{
printf("server hs closed the socket.\n");
printf("press any key to exit...\n");
getchar();
break;
}
buf[nbytes]='\0';

//输出日期时间结果
printf("result from %s port %u:\n\t'%s'\n",inet_ntoa(server_addr.sin_addr),(unsigned)ntohs(server_addr.sin_port),buf);

}
close(sockfd);
return 0;
}

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: