您的位置:首页 > 运维架构 > Linux

epoll

2016-07-06 20:03 531 查看

1、epoll简介

epoll
是Linux内核中的一种可扩展IO事件处理机制,最早在 Linux 2.5.44内核中引入,可被用于代替POSIX select 和 poll 系统调用,并且在具有大量应用程序请求时能够获得较好的性能( 此时被监视的文件描述符数目非常大,与旧的 select和 poll
系统调用完成操作所需 O(n) 不同, epoll能在O(1)时间内完成操作,所以性能相当高),epoll 与 FreeBSD的kqueue类似,都向用户空间提供了自己的文件描述符来进行操作。

 

相比于select,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。因为在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。并且,linux/posix_types.h头文件有这样的声明:

#define__FD_SETSIZE   1024
表示select最多同时监听1024个fd,当然,可以通过修改头文件再重编译内核来扩大这个数目,但这似乎并不治本。
 

epoll主要有三个系统调用,C库封装成以下三个:

1.  int epoll_create(int size);
2. 
int epoll_ctl(int epfd, int op, int fd, struct epoll_event
*event);
3.  int epoll_wait(int epfd, struct epoll_event
*events,int maxevents, int timeout);

 

int epoll_create(int size);


该函数生成一个epoll专用的文件描述符。它其实是在内核申请一空间,用来存放你想关注的socket fd上是否发生以及发生了什么事件。

该创建一个epoll的句柄,size用来告诉内核需要监听的数目一共有多大。当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close() 关闭,否则可能导致fd被耗尽。

 

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);


该函数用于控制某个epoll文件描述符上的事件,可以注册事件,修改事件,删除事件。

epoll的事件注册函数,第一个参数是 epoll_create() 的返回值,即由 epoll_create 生成的epoll专用的文件描述符。第二个参数表示动作,使用如下三个宏来表示:

EPOLL_CTL_ADD    //注册新的fd到epfd中;
EPOLL_CTL_MOD    //修改已经注册的fd的监听事件;
EPOLL_CTL_DEL    //从epfd中删除一个fd;


第三个参数是需要监听的fd,即关联的文件描述符。第四个参数是告诉内核需要监听什么事,struct epoll_event 结构如下:


typedef union epoll_data
{
void        *ptr;
int          fd;
__uint32_t   u32;
__uint64_t   u64;
} epoll_data_t;

struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};


 

如:  

struct epoll_event ev;
ev.data.fd=listenfd;  //设置与要处理的事件相关的文件描述符 
ev.events=EPOLLIN|EPOLLET;  //设置要处理的事件类型 
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);  //注册epoll事件 

 

events 可以是以下几个宏的集合:

EPOLLIN     //表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT    //表示对应的文件描述符可以写;
EPOLLPRI    //表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR    //表示对应的文件描述符发生错误;
EPOLLHUP    //表示对应的文件描述符被挂断;
EPOLLET     //将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT//只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。


 

如果调用成功返回0,不成功返回-1 

当对方关闭连接(FIN), EPOLLERR,都可以认为是一种EPOLLIN事件,在read的时候分别有0,-1两个返回值。

 

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);


该函数用于轮询I/O事件的发生; 

参数: 

epfd:由epoll_create 生成的epoll专用的文件描述符; 

epoll_event:用于回传代处理事件的数组; 

maxevents:每次能处理的事件数; 

timeout:等待I/O事件发生的超时值(毫秒,0会立即返回,-1将不确定,相当于阻塞,也有说法说是永久阻塞);一般用-1即可返回发生事件数。 

参数events用来从内核得到事件的集合,maxevents 告之内核这个events有多大,这个 maxevents 的值不能大于创建 epoll_create() 时的size,参数 timeout 是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

 

epoll_wait运行的原理是 

等侍注册在epfd上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中。 

并且将注册在epfd上的socket fd的事件类型给清空,所以如果下一个循环你还要关注这个socket fd的话,则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLL_CTL_ADD,因为socket fd并未清空,只是事件类型清空。这一步非常重要。 

 

单个epoll并不能解决所有问题,特别是你的每个操作都比较费时的时候,因为epoll是串行处理的。 

所以你还是有必要建立线程池来发挥更大的效能。

 

////////////////////////////////////////////////////////////////////////////// 

man中给出了epoll的用法,example程序如下: 

for(;;) {
nfds = epoll_wait(kdpfd, events, maxevents, -1);

for(n = 0; n < nfds; ++n) {
if(events
.data.fd == listener) {
client = accept(listener, (struct sockaddr *) &local,
&addrlen);
if(client < 0){
perror("accept");
continue;
}
setnonblocking(client);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = client;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
fprintf(stderr, "epoll set insertion error: fd=%d\n",
client);
return -1;
}
}
else
do_use_fd(events
.data.fd);
}
}

此时使用的是ET模式,即,边沿触发,类似于电平触发,epoll中的边沿触发的意思是只对新到的数据进行通知,而内核缓冲区中如果是旧数据则不进行通知,所以在do_use_fd函数中应该使用如下循环,才能将内核缓冲区中的数据读完。       

while (1) {
len = recv(*******);
if (len == -1) {
if(errno == EAGAIN)
break;
perror("recv");
break;
}
do something with the recved data........
}

在上面例子中没有说明对于listen socket fd该如何处理,有的时候会使用两个线程,一个用来监听accept另一个用来监听epoll_wait,如果是这样使用的话,则listen socket fd使用默认的阻塞方式就行了,而如果epoll_wait和accept处于一个线程中,即,全部由epoll_wait进行监听,则,需将listen socket fd也设置成非阻塞的,这样,对accept也应该使用while包起来(类似于上面的recv),因为,epoll_wait返回时只是说有连接到来了,并没有说有几个连接,而且在ET模式下epoll_wait不会再因为上一次的连接还没读完而返回,这种情况确实存在,我因为这个问题而耗费了一天多的时间,这里需要说明的是,每调用一次accept将从内核中的已连接队列中的队头读取一个连接,因为在并发访问的环境下,有可能有多个连接“同时”到达,而epoll_wait只返回了一次。

 

EPOLL事件有两种模型 Level Triggered (LT)Edge Triggered (ET):

LT(level triggered,水平触发模式)是缺省的工作方式,并且同时支持 block 和 non-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。

ET(edge-triggered,边缘触发模式)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,等到下次有新的数据进来的时候才会再次出发就绪事件。

 

 

epoll 例子

我们将实现一个简单的TCP 服务器,该迷你服务器将会在标准输出上打印处客户端发送的数据,首先我们创建并绑定一个 TCP 套接字:

 

static int
create_and_bind (char *port)
{
struct addrinfo hints;
struct addrinfo *result, *rp;
int s, sfd;

memset (&hints, 0, sizeof (struct addrinfo));
hints.ai_family = AF_UNSPEC;     /* Return IPv4 and IPv6 choices */
hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */
hints.ai_flags = AI_PASSIVE;     /* All interfaces */

s = getaddrinfo (NULL, port, &hints, &result);
if (s != 0)
{
fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s));
return -1;
}

for (rp = result; rp != NULL; rp = rp->ai_next)
{
sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == -1)
continue;
s = bind (sfd, rp->ai_addr, rp->ai_addrlen);
if (s == 0)
{
/* We managed to bind successfully! */
break;
}

close (sfd);
}
if (rp == NULL)
{
fprintf (stderr, "Could not bind\n");
return -1;
}
freeaddrinfo (result);
return sfd;
}


create_and_bind() 包含了如何创建 IPv4 和 IPv6 套接字的代码块,它接受一字符串作为端口参数,并在 result 中返回一个 addrinfo 结构,

 

struct addrinfo
{
int              ai_flags;
int              ai_family;
int              ai_socktype;
int              ai_protocol;
size_t           ai_addrlen;
struct sockaddr *ai_addr;
char            *ai_canonname;
struct addrinfo *ai_next;
};


 

如果函数成功则返回套接字,如果失败,则返回 -1,

 

下面,我们将一个套接字设置为非阻塞形式,函数如下:

 

static int
make_socket_non_blocking (int sfd)
{
int flags, s;

flags = fcntl (sfd, F_GETFL, 0);
if (flags == -1)
{
perror ("fcntl");
return -1;
}

flags |= O_NONBLOCK;
s = fcntl (sfd, F_SETFL, flags);
if (s == -1)
{
perror ("fcntl");
return -1;
}

return 0;
}


 

接下来,便是主函数代码,主要用于事件循环:

 

#define MAXEVENTS 64

int
main (int argc, char *argv[])
{
int sfd, s;
int efd;
struct epoll_event event;
struct epoll_event *events;

if (argc != 2)
{
fprintf (stderr, "Usage: %s [port]\n", argv[0]);
exit (EXIT_FAILURE);
}

sfd = create_and_bind (argv[1]);
if (sfd == -1)
abort ();

s = make_socket_non_blocking (sfd);
if (s == -1)
abort ();

s = listen (sfd, SOMAXCONN);
if (s == -1)
{
perror ("listen");
abort ();
}

efd = epoll_create1 (0);
if (efd == -1)
{
perror ("epoll_create");
abort ();
}

event.data.fd = sfd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
if (s == -1)
{
perror ("epoll_ctl");
abort ();
}

/* Buffer where events are returned */
events = calloc (MAXEVENTS, sizeof event);

/* The event loop */
while (1)
{
int n, i;

n = epoll_wait (efd, events, MAXEVENTS, -1);
for (i = 0; i < n; i++)
{
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN)))
{
/* An error has occured on this fd, or the socket is not
ready for reading (why were we notified then?) */
fprintf (stderr, "epoll error\n");
close (events[i].data.fd);
continue;
}

else if (sfd == events[i].data.fd)
{
/* We have a notification on the listening socket, which
means one or more incoming connections. */
while (1)
{
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

in_len = sizeof in_addr;
infd = accept (sfd, &in_addr, &in_len);
if (infd == -1)
{
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK))
{
/* We have processed all incoming
connections. */
break;
}
else
{
perror ("accept");
break;
}
}

s = getnameinfo (&in_addr, in_len,
hbuf, sizeof hbuf,
sbuf, sizeof sbuf,
NI_NUMERICHOST | NI_NUMERICSERV);
if (s == 0)
{
printf("Accepted connection on descriptor %d "
"(host=%s, port=%s)\n", infd, hbuf, sbuf);
}

/* Make the incoming socket non-blocking and add it to the
list of fds to monitor. */
s = make_socket_non_blocking (infd);
if (s == -1)
abort ();

event.data.fd = infd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
if (s == -1)
{
perror ("epoll_ctl");
abort ();
}
}
continue;
}
else
{
/* We have data on the fd waiting to be read. Read and
display it. We must read whatever data is available
completely, as we are running in edge-triggered mode
and won't get a notification again for the same
data. */
int done = 0;

while (1)
{
ssize_t count;
char buf[512];

count = read (events[i].data.fd, buf, sizeof buf);
if (count == -1)
{
/* If errno == EAGAIN, that means we have read all
data. So go back to the main loop. */
if (errno != EAGAIN)
{
perror ("read");
done = 1;
}
break;
}
else if (count == 0)
{
/* End of file. The remote has closed the
connection. */
done = 1;
break;
}

/* Write the buffer to standard output */
s = write (1, buf, count);
if (s == -1)
{
perror ("write");
abort ();
}
}

if (done)
{
printf ("Closed connection on descriptor %d\n",
events[i].data.fd);

/* Closing the descriptor will make epoll remove it
from the set of descriptors which are monitored. */
close (events[i].data.fd);
}
}
}
}

free (events);

close (sfd);

return EXIT_SUCCESS;
}


 

main() 首先调用 create_and_bind() 建立套接字,然后将其设置为非阻塞的,再调用listen(2)。之后创建一个epoll 实例 efd(文件描述符),并将其加入到sfd的监听套接字中以边沿触发方式等待
事件输入。

外层的 while 循环是主事件循环,它调用了 epoll_wait(2),此时线程仍然被阻塞等待事件,当事件可用时,epoll_wait(2) 将会在events参数中返回可用事件。

epoll 实例 efd 在每次事件到来并需要添加新的监听时就会得到更新,并删除死亡的链接。

当事件可用时,可能有一下三种类型:

Errors: 当错误情况出现时,或者不是与读取数据相关的事件通告,我们只是关闭相关的描述符,关闭该描述符会自动的将其从被epoll 实例 efd 监听的的集合中删除。
New connections: 当监听的文件描述符 sfd 可读时,此时会有一个或多个新的连接到来,当新连接到来时,accept(2) 该连接,并打印一条信息,将其设置为非阻塞的并把它加入到被 epoll 实例监听的集合中。
Client data: 当数据在客户端描述符可用时,我们使用 read(2) 在一个内部循环中每次读取512 字节数据。由于我们必须读取所有的可用数据,此时我们并不能获取更多的事件,因为描述符是以边沿触发监听的,读取的数据被写到 stdout (fd=1) (write(2))。如果read(2) 返回 0,意味着到了文件末尾EOF,我们可以关闭客户端连接,如果返回 
-1,
errno
会被设置成
EAGAIN
, 这意味着所有的数据已经被读取,可以返回主循环了。
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  epoll linux 可扩展