您的位置:首页 > 运维架构 > Linux

Linux多路复用epoll浅析

2016-08-05 17:01 411 查看
在前面已经简单的说了select和poll

http://blog.csdn.net/u011573853/article/details/52105365

http://blog.csdn.net/u011573853/article/details/52119879

今天说一下高效率的epoll。

一,epoll的高效的实现,epoll之所以比select更加高效是因为epoll改变了轮询检查的方式,select每一次会把所有的要监视的网络连接检查一遍,而epoll是通过内核监听设置好的事件,当检测到某个事件发生后就会通过函数回调加入的事件就绪队列中,每一次只需要向就绪队列中检查就好。

二, epoll的三个函数

1,新建epoll描述符==epoll_create()

2,epoll_ctrl(epoll描述符,添加或者删除所有待监控的连接)

3,返回的活跃连接 ==epoll_wait( epoll描述符 )

与select相比,epoll分清了频繁调用和不频繁调用的操作。例如,epoll_ctrl是不太频繁调用的,而epoll_wait是非常频繁调用的。这时,epoll_wait却几乎没有入参,这比select的效率高出一大截,而且,它也不会随着并发连接的增加使得入参越发多起来,导致内核执行效率下降。

函数原型

int epoll_create(int size)
size:告诉内核监听的数目


#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epfd:为epoll_creat的句柄
op:表示动作,用3个宏来表示:
EPOLL_CTL_ADD(注册新的fd到epfd),
EPOLL_CTL_MOD(修改已经注册的fd的监听事件),
EPOLL_CTL_DEL(从epfd删除一个fd);
event:告诉内核需要监听的事件
struct epoll_event {
__uint32_t events;
/* Epoll events */
epoll_data_t data;
/* User data variable */
};
typedef union epoll_data {
void
*ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
EPOLLOUT:表示对应的文件描述符可以写
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
EPOLLERR:表示对应的文件描述符发生错误
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里


#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
events:用来从内核得到事件的集合,
maxevents:告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,
timeout:是超时时间
-1:阻塞
0:立即返回,非阻塞
>0:指定微秒
返回值:成功返回有多少文件描述符就绪,时间到时返回0,出错返回-1


三, epoll的三大关键要素:mmap、红黑树、链表。

epoll是通过内核与用户空间mmap同一块内存实现的。mmap将用户空间的一块地址和内核空间的一块地址同时映射到相同的一块物理内存地址(不管是用户空间还是内核空间都是虚拟地址,最终要通过地址映射映射到物理地址),使得这块物理内存对内核和对用户均可见,减少用户态和内核态之间的数据交换。内核可以直接看到epoll监听的句柄,效率高。

红黑树将存储epoll所监听的套接字。上面mmap出来的内存如何保存epoll所监听的套接字,必然也得有一套数据结构,epoll在实现上采用红黑树去存储所有套接字,当添加或者删除一个套接字时(epoll_ctl),都在红黑树上去处理,红黑树本身插入和删除性能比较好,时间复杂度O(logN)。

核心数据结构

struct epitem
{
struct rb_node rbn;            //用于主结构管理的红黑树
struct list_head rdllink;       //事件就绪队列
struct epitem *next;           //用于主结构体中的链表
struct epoll_filefd ffd;         //每个fd生成的一个结构
int nwait;
struct list_head pwqlist;     //poll等待队列
struct eventpoll *ep;          //该项属于哪个主结构体
struct list_head fllink;         //链接fd对应的file链表
struct epoll_event event;  //注册的感兴趣的事件,也就是用户空间的epoll_event
}


struct eventpoll
{
spin_lock_t lock;            //对本数据结构的访问
struct mutex mtx;            //防止使用时被删除
wait_queue_head_t wq;        //sys_epoll_wait() 使用的等待队列
wait_queue_head_t poll_wait; //file->poll()使用的等待队列
struct list_head rdllist;    //事件满足条件的链表
struct rb_root rbr;          //用于管理所有fd的红黑树
struct epitem *ovflist;      //将事件到达的fd进行链接起来发送至用户空间
}


四,添加以及返回事件

通过epoll_ctl函数添加进来的事件都会被放在红黑树的某个节点内,所以,重复添加是没有用的。当把事件添加进来的时候时候会完成关键的一步,那就是该事件都会与相应的设备(网卡)驱动程序建立回调关系,当相应的事件发生后,就会调用这个回调函数,该回调函数在内核中被称为:ep_poll_callback,这个回调函数其实就所把这个事件添加到rdllist这个双向链表中。一旦有事件发生,epoll就会将该事件添加到双向链表中。那么当我们调用epoll_wait时,epoll_wait只需要检查rdlist双向链表中是否有存在注册的事件,效率非常可观。这里也需要将发生了的事件复制到用户态内存中即可。



五, epoll_wait的工作流程:

1,epoll_wait调用ep_poll,当rdlist为空(无就绪fd)时挂起当前进程,直到rdlist不空时进程才被唤醒。

2,文件fd状态改变(buffer由不可读变为可读或由不可写变为可写),导致相应fd上的回调函数ep_poll_callback()被调用。

3,ep_poll_callback将相应fd对应epitem加入rdlist,导致rdlist不空,进程被唤醒,epoll_wait得以继续执行。

4,ep_events_transfer函数将rdlist中的epitem拷贝到txlist中,并将rdlist清空。

5,ep_send_events函数(很关键),它扫描txlist中的每个epitem,调用其关联fd对用的poll方法。此时对poll的调用仅仅是取得fd上较新的events(防止之前events被更新),之后将取得的events和相应的fd发送到用户空间(封装在struct epoll_event,从epoll_wait返回)。

6,sever案例

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include<errno.h>
#include<ctype.h>
#include<unistd.h>

#define MAXLINE 80
#define SERV_PORT 8000
#define OPEN_MAX 1024

int main(void)
{
int i,j,maxi,listenfd,connfd,sockfd;
int nready,efd,res;
ssize_t n;
char buf[MAXLINE],str[INET_ADDRSTRLEN];
socklen_t clien;
int client[OPEN_MAX];
struct sockaddr_in cliaddr,servaddr;
struct epoll_event tep,ep[OPEN_MAX];

listenfd = socket(AF_INET,SOCK_STREAM,0);

bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);

bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr));

listen(listenfd,20);

for(i=0;i<OPEN_MAX;i++)
client[i]=-1;
maxi = -1;

efd = epoll_create(OPEN_MAX);
if(efd == -1){
printf("epoll create is error\n");
exit(1);
}

tep.events = EPOLLIN;
tep.data.fd = listenfd;
res = epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&tep);
if(res==-1){
printf("epoll ctl is error\n");
exit(1);
}
for(;;){
nready = epoll_wait(efd,ep,OPEN_MAX,-1);
if(nready == -1){
printf("epoll_wait error \n");
exit(-1);
}
for(i=0;i<nready;i++){
if(!(ep[i].events & EPOLLIN))
continue;
if(ep[i].data.fd == listenfd){
clien = sizeof(cliaddr);
connfd = accept(listenfd,(struct sockaddr*)&cliaddr,&clien);
printf("reciced from %s at PORT %d\n",inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port));
for(j=0;j<OPEN_MAX;j++){
if(client[j]<0){
client[j]=connfd;
break;
}
}
if(j==OPEN_MAX){
printf("too many clients\n");
exit(1);
}
if(j>maxi)
maxi = j;
tep.events = EPOLLIN;
tep.data.fd = connfd;
res = epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep);
if(res == -1){
printf("epoll ctl error\n");
exit(1);
}
}else{
sockfd = ep[i].data.fd;
n= read(sockfd,buf,MAXLINE);
if(n==0){
for(j=0;j<=maxi;j++){
if(client[j]==sockfd){
client[j]=-1;
break;
}
}
res = epoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL);
if(res==-1){
printf("epoll_ctl del error\n");
exit(1);
}
close(sockfd);
printf("client[%d] closed connection \n",j);
}else{
for(j=0;j<n;j++){
buf[j] = toupper(buf[j]);
}
write(sockfd,buf,n);
}
}
}
}
close(listenfd);
close(efd);
return 0;

}


7,select,poll,epoll对比

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  epoll linux 多路并发