您的位置:首页 > 运维架构 > Linux

linux高性能服务器编程之epoll

2016-05-30 23:03 597 查看
一.概述:
epoll是多路复用的一种,但它比select和poll更加高效。具体体现在以下几个方面:
(1).select能打开的文件描述符是有一定限制的,默认情况下是2048,这对应那些大型服务器来说h是不足的。但
epoll则没有这个限制,它所支持的fd上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左
右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。
(2).因为文件描述符是内核管理的,所以每次调用select或poll,都需要把fd集合从用户态拷贝到内核态,并且每次检查文件描述符的状态时,都要在内核遍历所有文件描述符,这个开销在fd很多时会很大。而epoll采用了nmap(内存映射)(和共享内存一样),内核和用户空间共用一行份fd集。
(3).另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

其它优化:
(4).epoll会给所有要关注的文件描述符建立一个红黑树,这样在查找某一个文件描述符时效率会有所提升。

(5).epoll会给准备好的文件描述符建立一个链表,这样查找一个已准备好的文件描述符时就不用在以前所有要关注的fd集中查找了。

二.epoll用法篇:
epoll是什么?按照man手册的说法:是为处理大批量句柄而作了改进的poll。当然,这不是2.6内核才有的,它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel2.5.44),它几乎具备了之前所说select和poll的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll有epoll_create,epoll_ctl,epoll_wait 3个系统调用。

(1).epoll_create:
int   epoll_create(int size);
创建一个epoll的句柄(后面会根据这个句柄创建红黑树)。自从linux2.6.8之后,size参数是被忽略的(也就是说,可以为任意值)。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
返回值:成功返回一个epoll句柄,失败返回-1;

(2).epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数描述:epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。(一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。)
返回值:成功返回0,失败返回-1;

epfd参数:epoll_create创建的一个epoll句柄。
op参数:表示要执行的动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
fd参数:需要监听的文件描述符。
event参数:告诉内核需要监听什么事。struct epoll_event结构如下:
The event argument describes the object linked to the file descriptor fd.  The struct epoll_event is defined as :

typedef union epoll_data {
void        *ptr;
int          fd;
uint32_t     u32;
uint64_t     u64;
} epoll_data_t;

struct epoll_event {
uint32_t     events;      /* Epoll events */
epoll_data_t data;        /* User data variable */
};
events有如下值:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水水平触发(Level
Triggered)来说的。(epoll默认为水平触发)

EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

(3).epoll_wait:

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
函数功能:监听在epoll监控的事件中已经发送的事件。
返回值:成功返回监听文件描述符集中已经准备好的文件描述符,返回0代表timeout,失败返回-1。
epoll参数:epoll_create创建的epoll句柄。
events参数:输出型参数,保存监听文件描述符集中已经准备好的文件描述符集。
maxevents参数:events数组的大小。
timeout参数:超时时间。单位为毫秒。

三.LT模式下的阻塞模式。
相关代码:

server.c:

1 /****************************************
2     > File Name:epoll_server.c
3     > Author:xiaoxiaohui
4     > mail:1924224891@qq.com
5     > Created Time:2016年05月28日 星期六 15时38分17秒
6 ****************************************/
7
8 #include<stdio.h>
9 #include<stdlib.h>
10 #include<sys/types.h>
11 #include<sys/socket.h>
12 #include<arpa/inet.h>
13 #include<netinet/in.h>
14 #include<string.h>
15 #include<unistd.h>
16 #include<sys/epoll.h>
17
18 #define LEN 1024地
19 const char* IP = "127.0.0.1";
20 const int PORT = 8080;
21 const int BACKLOG = 5;
22 int timeout = 5000;
23 const int MAXEVENTS = 64;
24 struct sockaddr_in local;
25 struct sockaddr_in client;

26 int SIZE_CLIENT = sizeof(client);
27
28 typedef struct data_buf     //用于存储epoll_event中的data中的不同元素
29 {
30     int fd;
31     char buf[LEN];
32 }data_buf_t, *data_buf_p;地
33
34
35 int ListenSock()
36 {
37     int listenSock = socket(AF_INET, SOCK_STREAM, 0);
38     if(listenSock < 0)
39     {
40         perror("socket");
41         exit(1);
42     }
43
44     local.sin_family = AF_INET;
45     local.sin_port = htons(PORT);
46     local.sin_addr.s_addr = inet_addr(IP);
47     if( bind(listenSock, (struct sockaddr*)&local, sizeof(local)) < 0)
48     {
49         perror("bind");
50         exit(2);
51     }
52
53     if( listen(listenSock, BACKLOG) < 0)
54     {
55         perror("listen");
56         exit(3);
57     }
58
59     return listenSock;
60 }
61
62 static int epoll_fd(int listenSock)
63 {
64
65     int epoll_fd = epoll_create(256);           //size随便选一个值
66
67     struct epoll_event ev;          //把listenSock设置进epoll_fd中
68     ev.events = EPOLLIN;
69     ev.data.fd = listenSock;
70     epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenSock, &ev);        //系统会维护一个红黑树
71
72     struct epoll_event ev_outs[MAXEVENTS];      //准备好的队列
73     int max = MAXEVENTS;
74
75     while(1)
76     {
77         int num = -1;
78         switch( num = epoll_wait(epoll_fd, ev_outs, max, timeout))
79         {
80             case 0:     //timeout
81                 printf("timeout.....\n");
82                 break;
83             case -1:    //error
84                 perror("epoll_wait");
85                 break;
86             default:
87                 for(int index = 0; index < num; index++)
88                 {
89                     if(ev_outs[index].data.fd == listenSock && (ev_outs[index].events & EPOLLIN))      //监听套接字准备就绪
90                     {
91                         printf("accept is ready\n");
92                         int linkSock = accept(listenSock, (s地truct sockaddr*)&client, &SIZE_CLIENT);
93                         if(linkSock < 0)
94                         {
95                             perror("accept");
96                             continue;           //这次可能是一个新客户端的请求,所以后面可能还有文件描述符准备就绪了
97                         }地
98
99                         ev.events = EPOLLIN;     //把新套接字放到红黑树中
100                         ev.data.fd = linkSock;
101                         epoll_ctl(epoll_fd, EPOLL_CTL_ADD, linkSock, &ev);
102                     }
103                     else         //已链接套接字准备就绪
104                     {
105                         if(ev_outs[index].events & EPOLLIN)      //读事件准备就绪
106                         {
107                             data_buf_p mem = (data_buf_p)malloc(sizeof(data_buf_t));
108                             memset(mem->buf, '\0', sizeof(mem->buf));
109                             mem->fd = ev_outs[index].data.fd;
110
111                             int ret = read(mem->fd, mem->buf, sizeof(mem->buf));
112                             if(ret > 0)
113                             {
114                                 mem->buf[ret] = '\0';
115                                 printf("client# %s\n", mem->buf);
116
117                                 ev.data.ptr = mem;         //mem中即保持了fd,又保持了buf数据
118                                 ev.events = EPOLLOUT;    //读事件已经完成,现在要关心写事件
119                                 epoll_ctl(epoll_fd, EPOLL_CTL_MOD, mem->fd, &ev);
120                             }
121                             else if(ret == 0 )      //客户端已关闭
122                             {
123                                 printf("client is closed\n");
124                                 epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev_outs[index].data.fd, NULL);     //把该文件描述符从红黑树中移除
125                                 close(ev_outs[index].data.fd);
126                                 free(mem);
127                             }
128                             else
129                             {
130                                 perror("read");
131                                 continue;
132                             }
133                         }
134                         else if(ev_outs[index].events & EPOLLOUT)   //写事件准备就绪
135                         {
136                             data_buf_p mem = (data_buf_p)ev_outs[index].data.ptr;
137                             int fd = mem->fd;
138                             char* buf = mem->buf;
139
140                             if( write(fd, buf, strlen(buf)) < 0)
141                             {
142                                 perror("write");
143                                 continue;
144                             }
145
146                             ev.events = EPOLLIN;       //这个文件描述符的写事件已完成,下次关心读事件
147                             ev.data.fd = mem->fd;
148                             epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev);
149                         }
150                         else   //DoNothing
151                         {}
152                     }
153                 }
154                 break;
155         }
156     }
157 }
158
159 int main()
160 {
161     int listen地Sock = ListenSock();
162     epoll_fd(listenSock);
163     close(listenSock);
164     return 0;
165 }


client.c:
1 /****************************************
2     > File Name:client.c
3     > Author:xiaoxiaohui
4     > mail:1924224891@qq.com
5     > Created Time:2016年05月23日 星期一 12时30分01秒
6 ****************************************/
7
8 #include<stdio.h>
9 #include<stdlib.h>
10 #include<string.h>
11 #include<sys/types.h>
12 #include<sys/socket.h>
13 #include<netinet/in.h>
14 #include<arpa/inet.h>
15 #include<sys/time.h>
16 #include<unistd.h>
17
18 #define LEN 1024
19 const int PORT = 8080;
20 const char* IP = "127.0.0.1";
21 struct sockaddr_in server;
22 int clientSock;
23 char buf[LEN];
24
25 int main()
26 {
27     clientSock = socket(AF_INET, SOCK_STREAM, 0);
28     if(clientSock < 0)
29     {
30         perror("socket");
31         exit(1);
32     }
33
34     server.sin_family = AF_INET;
35     server.sin_addr.s_addr = inet_addr(IP);
36     server.sin_port = htons(PORT);
37
38     if ( connect(clientSock, (struct sockaddr*)&server, sizeof(server)) < 0)
39     {
40         perror("connect");
41         exit(2);
42     }
43
44     while(1)
45     {
46         memset(buf, '\0', LEN);
47         printf("please input: ");
48         gets(buf);
49         write(clientSock, buf, strlen(buf));
50
51         memset(buf, '地\0', LEN);
52         int ret = read(clientSock, buf, LEN);
53         buf[ret] = '\0';
54         printf("echo: %s\n", buf);
55     }
56
57     return 0;
58 }
执行结果:








四.ET模式下的非阻塞模式:
(1).概述:
ET模式要在epoll_ctl中进行设置(具体设置看代码),并且要把套接字设置为非阻塞模式。
下面概况以下ET与LT的区别:
ET (edge-triggered)是高效的工作方式,只支持no-block socket,它效率要比LT更高。ET与LT的区别在于,当一个新的事件到来时,ET模式下当然可以从epoll_wait调用中获取到这个事件,可是如果这次没有把这个事件对应的套接字缓冲区处理完,在这个套接字中没有新的事件再次到来时,在ET模式下是无法再次从epoll_wait调用中获取这个事件的。而LT模式下,只要一个事件对应的套接字缓冲区还有数据,就总能从epoll_wait中获取这个事件。
因此,LT模式下开发基于epoll的应用要简单些,不太容易出错。而在ET模式下事件发生时,如果没有彻底地将缓冲区数据处理完,则会导致缓冲区中的用户请求得不到响应。

所以在下面代码中要封装一个read_data函数,来确保一次把数据缓冲区内的数据读取完。因为ET模式下只支持非阻塞模式,所以还要把每个套接字设置为非阻塞的。

下面代码实现 在浏览器访问服务器程序,并在浏览器中打印hello world :) ,当服务器程序发送完给浏览器的数据时,服务器程序关闭链接。
相关代码:
1 /****************************************
2     > File Name:epoll_server.c
3     > Author:xiaoxiaohui
4     > mail:1924224891@qq.com
5     > Created Time:2016年05月28日 星期六 15时38分17秒
6 ****************************************/
7
8 #include<stdio.h>
9 #include<stdlib.h>
10 #include<sys/types.h>
11 #include<sys/socket.h>
12 #include<arpa/inet.h>
13 #include<netinet/in.h>
14 #include<string.h>
15 #include<unistd.h>
16 #include<sys/epoll.h>
17 #include<fcntl.h>
18 #include<errno.h>
19
20 #define LEN 1024
21 const char* IP = "127.0.0.1";
22 const int PORT = 8080;
23 const int BACKLOG = 5;
24 int timeout = 5000;
25 const int MAXEVENTS = 64;
26 struct sockaddr_in local;
27 struct sockaddr_in client;
28 int SIZE_CLIENT = sizeof(client);
29
30 typedef struct data_buf     //用于存储epoll_event中的data中的不同元素
31 {
32     int fd;
33     char buf[LEN];
34 }data_buf_t, *data_buf_p;
35
36 static int set_no_block(int fd)       //把fd设置为非阻塞
37 {
38     int oldfd = fcntl(fd, F_GETFL);
39     if(oldfd < 0)
40     {
41         perror("fcntl");
42         return -1;
43     }
44
45     if( fcntl(fd, F_SETFL, oldfd | O_NONBLOCK))
46     {
47         perror("fcntl");
48         return -1;
49     }
50     return 0;
51 }
52
53 int read_data(int fd, char* buf, int len)      //ET模式下读取数据,因为ET模式下只通知一次,所以要保证把所有数据都读完
54 {                                              //成功返回读取的个数,失败返回-1,返回0代表读到文件尾
55     int index = 0;
56     int ret = -1;
57
58     while(index < len)
59     {
60         ret = read(fd, buf + index, len - index);
61         printf("the read return ret is %d\n", ret);
62         if(ret > 0)
63         {
64             index += ret;
65         }
66         else if(ret < 0)
67         {
68             printf("the errno is %d\n",errno);
69             if(errno == EAGAIN)
70             {
71                 break;
72             }
73         }
74         else
75         {
76             return 0;
77         }
78     }
79
80     return index;
81 }
82
83
84
85
86 static int ListenSock()
87 {
88     int listenSock = socket(AF_INET, SOCK_STREAM, 0);
89     if(listenSock < 0)
90     {
91         perror("socket");
92         exit(1);
93     }
94
95     int opt = 1;
96     if( setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)))     //设置端口复用
97     {
98         perror("sersockopt");
99         exit(2);
100     }
101     if( set_no_block(listenSock) != 0)      //设置为非阻塞    .............修改
102     {
103         printf("set_non_block is error\n");
104         exit(3);
105     }
106
107     local.sin_family = AF_INET;
108     local.sin_port = htons(PORT);
109     local.sin_addr.s_addr = inet_addr(IP);
110     if( bind(listenSock, (struct sockaddr*)&local, sizeof(local)) < 0)
111     {
112         perror("bind");
113         exit(4);
114     }
115
116     if( listen(listenSock, BACKLOG) < 0)
117     {
118         perror("listen");
119         exit(5);
120     }
121
122     return listenSock;
123 }
124
125 static int epoll_fd(int listenSock)
126 {
127
128     int epoll_fd = epoll_create(256);           //size随便选一个值
129
130     struct epoll_event ev;          //把listenSock设置进epoll_fd中
131     ev.events = EPOLLIN | EPOLLET;    //....................修改
132     ev.data.fd = listenSock;
133     epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenSock, &ev);        //系统会维护一个红黑树
134
135     struct epoll_event ev_outs[MAXEVENTS];      //准备好的队列
136     int max = MAXEVENTS;
137
138     while(1)
139     {
140         int num = -1;
141         switch( num = epoll_wait(epoll_fd, ev_outs, max, timeout))
142         {
143             case 0:     //timeout
144                 printf("timeout.....\n");
145                 break;
146             case -1:    //error
147                 perror("epoll_wait");
148                 break;
149             default:
150                 for(int index = 0; index < num; index++)
151                 {
152                     if(ev_outs[index].data.fd == listenSock && (ev_outs[index].events & EPOLLIN))      //监听套接字准备就绪
153                     {
154                         printf("accept is ready\n");
155                         int linkSock = accept(listenSock, (struct sockaddr*)&client, &SIZE_CLIENT);
156                         if(linkSock < 0)
157                         {
158                             perror("accept");
159                             continue;           //这次可能是一个新客户端的请求,所以后面可能还有文件描述符准备就绪了
160                         }
161
162                         if( set_no_block(linkSock) != 0)      //设置为非阻塞    .............修改
163                         {
164                             printf("set_non_block is error\n");
165                             exit(3);
166                         }
167
168                         ev.events = EPOLLIN | EPOLLET;     //把新套接字放到红黑树中   ...................修改
169                         ev.data.fd = linkSock;
170                         epoll_ctl(epoll_fd, EPOLL_CTL_ADD, linkSock, &ev);
171                     }
172                     else         //已链接套接字准备就绪
173                     {
174                         if(ev_outs[index].events & EPOLLIN)      //读事件准备就绪
175                         {
176                             data_buf_p mem = (data_buf_p)malloc(sizeof(data_buf_t));
177                             memset(mem->buf, '\0', sizeof(mem->buf));
178                             mem->fd = ev_outs[index].data.fd;
179
180                             printf("read is ready\n");
181                             int ret = read_data(mem->fd, mem->buf, sizeof(mem->buf)); //.........................修改
182                             printf("read is over, the ret is %d\n", ret);
183                             if(ret > 0)
184                             {
185                                 mem->buf[ret] = '\0';
186                                 printf("client# %s\n", mem->buf);
187
188                                 ev.data.ptr = mem;         //mem中即保持了fd,又保持了buf数据
189                                 ev.events = EPOLLOUT;    //读事件已经完成,现在要关心写事件
190                                 epoll_ctl(epoll_fd, EPOLL_CTL_MOD, mem->fd, &ev);
191                             }
192                             else if(ret == 0 )      //客户端已关闭
193                             {
194                                 printf("client is closed\n");
195                                 epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev_outs[index].data.fd, NULL);     //把该文件描述符从红黑树中移除
196                                 close(ev_outs[index].data.fd);
197                                 free(mem);
198                             }
199                             else
200                             {
201                                 perror("read");
202                                 continue;
203                             }
204                         }
205                         else if(ev_outs[index].events & EPOLLOUT)   //写事件准备就绪
206                         {
207                             data_buf_p mem = (data_buf_p)ev_outs[index].data.ptr;
208                             int fd = mem->fd;
209                             char* buf = mem->buf;
210
211                             char *msg = "HTTP/1.0 200 OK\r\n\r\nhello world:)\r\n"; //.....................修改
212                             if( write(fd, msg, strlen(msg)) < 0)                //.........................修改
213                             {
214                                 perror("write");
215                                 continue;
216                             }
217
218                             epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);     //把该文件描述符从红黑树中移除    ............修改
219                             close(fd);                                       //.............................................修改
220                             free(mem);                                       //.............................................修改
221                             mem = NULL;
222
223                 //          ev.events = EPOLLIN;       //这个文件描述符的写事件已完成,下次关心读事件  ..................修改
224                 //          ev.data.fd = mem->fd;     //........................................修改
225                 //          epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev);  //....................修改
226                         }
227                         else   //DoNothing
228                         {}
229                     }
230                 }
231                 break;
232         }
233     }
234 }
235
236 int main()
237 {
238     int listenSock = ListenSock();
239     epoll_fd(listenSock);
240     close(listenSock);
241     return 0;
242 }


执行结果:




六.总结:
epoll目前为止最为高效的多路复用接口,它比select和poll高效的本质原因在于epoll采用的nmap技术和使用了基于事件的触发机制。
另外,epoll会给所有要监听的文件描述符创建一个红黑树以方便操作,并且会把已经准备好的文件描述符单独拿出来放到一个链表中,来提高效率。
epoll默认的工作模式是LT模式,当ET模式要比LT模式更高效,所以,要提高epoll效率,可以使epoll工作在ET模式下,在ET模式下,要把所有的套接字都设置为非阻塞模式。
使epoll工作在ET模式下,就要封装一个read函数,来保证能把I/O缓冲区内的数据一次读取完。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  linux 服务器 高性能