您的位置:首页 > 其它

通过完整示例来理解如何使用 epoll

2017-11-08 21:18 351 查看
网络服务器通常使用一个独立的进程或线程来实现每个连接。由于高性能应用程序需要同时处理大量的客户端,这种方法就不太好用了,因为资源占用和上下文切换时间等因素影响了同时处理大量客户端的能力。另一种方法是在一个线程中使用非阻塞 I/O,以及一些就绪通知方法,即当你可以在一个套接字上读写更多数据的时候告诉你。

本文介绍了 Linux 的 epoll(7) 机制,它是 Linux 最好的就绪通知机制。我们用 C 语言编写了示例代码,实现了一个完整的 TCP 服务器。 我假设您有一定 C 语言编程经验,知道如何在 Linux 上编译和运行程序,并且可以阅读手册查看各种需要的 C 函数。

epoll 是在 Linux 2.6 中引入的,在其他类 UNIX 操作系统上不可用。它提供了一个类似于 select(2) 和 poll(2) 函数的功能:

select(2) 一次可以监测
FD_SETSIZE
数量大小的描述符,FD_SETSIZE 通常是一个在 libc 编译时指定的小数字。

poll(2) 一次可以监测的描述符数量并没有限制,但撇开其它因素,我们每次都不得不检查就绪通知,线性扫描所有通过描述符,这样时间复杂度为 O(n)而且很慢。

epoll 没有这些固定限制,也不执行任何线性扫描。因此它可以更高效地执行和处理大量事件。

一个 epoll 实例可由 epoll_create(2) 或 epoll_create1(2) (它们采用不同的参数)创建,它们的返回值是一个 epoll 实例。epoll_ctl(2) 用来添加或删除监听 epoll 实例的描述符。epoll_wait(2) 用来等待被监听的描述符事件,一直阻塞到事件可用。更多信息请参见相关手册。

当描述符被添加到 epoll 实例时,有两种模式:电平触发和边缘触发(译者注:借鉴电路里面的概念)。当你使用电平触发模式,并且数据可以被读取,epoll_wait(2) 函数总是会返回就绪事件。如果你还没有读完数据,并且再次在 epoll 实例上调用 epoll_wait(2) 函数监听这个描述符,由于还有数据可读,那么它会再次返回这个事件。在边缘触发模式下,你只会得到一次就绪通知。如果你没有将数据全部读走,并且再次在 epoll 实例上调用 epoll_wait(2) 函数监听这个描述符,它就会阻塞,因为就绪事件已经发送过了。

传递到 epoll_ctl(2) 的 epoll 事件结构体如下。对每一个被监听的描述符,你可以关联到一个整数或者一个用户数据的指针。

C

typedef union epoll_data
{
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

struct epoll_event
{
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

1
2
3
4
5
6
7
8
9
10
11
12
13

typedef union epoll_data
{
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

struct epoll_event
{
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

现在我们开始写代码。我们将实现一个小的 TCP 服务器,将发送到这个套接字的所有数据打印到标准输出上。首先编写一个 create_and_bind() 函数,用来创建和绑定 TCP 套接字:

C

static int
create_and_bind (char *port)
{
struct addrinfohints;
struct addrinfo*result, *rp;
int s, sfd;

memset (&hints, 0, sizeof (struct addrinfo));
hints.ai_family = AF_UNSPEC; /* Return IPv4 and IPv6 choices */
hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */
hints.ai_flags = AI_PASSIVE; /* All interfaces */

s = getaddrinfo(NULL, port, &hints, &result);
if (s != 0)
{
fprintf (stderr, "getaddrinfo: %sn", gai_strerror (s));
return -1;
}

for (rp = result; rp != NULL; rp = rp->ai_next)
{
sfd= socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd== -1)
continue;

s = bind (sfd, rp->ai_addr, rp->ai_addrlen);
if (s == 0)
{
/* We managed to bind successfully! */
break;
}

close (sfd);
}

if (rp == NULL)
{
fprintf (stderr, "Could not bindn");
return -1;
}

freeaddrinfo(result);

return sfd;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

static int
create_and_bind (char *port)
{
struct addrinfohints;
struct addrinfo*result, *rp;
int s, sfd;

memset (&hints, 0, sizeof (struct addrinfo));
hints.ai_family = AF_UNSPEC; /* Return IPv4 and IPv6 choices */
hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */
hints.ai_flags = AI_PASSIVE; /* All interfaces */

s = getaddrinfo(NULL, port, &hints, &result);
if (s != 0)
{
fprintf (stderr, "getaddrinfo: %sn", gai_strerror (s));
return -1;
}

for (rp = result; rp != NULL; rp = rp->ai_next)
{
sfd= socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd== -1)
continue;

s = bind (sfd, rp->ai_addr, rp->ai_addrlen);
if (s == 0)
{
/* We managed to bind successfully! */
break;
}

close (sfd);
}

if (rp == NULL)
{
fprintf (stderr, "Could not bindn");
return -1;
}

freeaddrinfo(result);

return sfd;
}

create_and_bind() 包含一个标准代码块,用一种可移植的方式来获得 IPv4 和 IPv6 套接字。它接受一个
port
字符串参数,可由
argv[1]
传递。getaddrinfo(3) 函数返回一堆
addrinfo
结构体到
result 变量中
,它们与传入的
hints
参数是兼容的。
addrinfo
结构体像这样:

C

struct addrinfo
{
int ai_flags;
int ai_family;
int ai_socktype;
int ai_protocol;
size_t ai_addrlen;
struct sockaddr *ai_addr;
char *ai_canonname;
struct addrinfo*ai_next;
};

1
2
3
4
5
6
7
8
9
10
11

struct addrinfo
{
int ai_flags;
int ai_family;
int ai_socktype;
int ai_protocol;
size_t ai_addrlen;
struct sockaddr *ai_addr;
char *ai_canonname;
struct addrinfo*ai_next;
};

我们依次遍历这些结构体并用它们创建套接字,直到可以创建并绑定一个套接字。如果成功了,create_and_bind() 返回这个套接字描述符。如果失败则返回 -1。

下面我们编写一个函数,用于将套接字设置为非阻塞状态。make_socket_non_blocking() 为传入的
sfd
参数设置
O_NONBLOCK
标志:

C

static int
make_socket_non_blocking (int sfd)
{
int flags, s;

flags = fcntl (sfd, F_GETFL, 0);
if (flags == -1)
{
perror ("fcntl");
return -1;
}

flags |= O_NONBLOCK;
s = fcntl (sfd, F_SETFL, flags);
if (s == -1)
{
perror ("fcntl");
return -1;
}

return 0;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

static int
make_socket_non_blocking (int sfd)
{
int flags, s;

flags = fcntl (sfd, F_GETFL, 0);
if (flags == -1)
{
perror ("fcntl");
return -1;
}

flags |= O_NONBLOCK;
s = fcntl (sfd, F_SETFL, flags);
if (s == -1)
{
perror ("fcntl");
return -1;
}

return 0;
}

现在说说 main() 函数吧,它里面包含了这个程序的事件循环。这是主要代码:

#define MAXEVENTS 64

int
main (int argc, char *argv[])
{
int sfd, s;
int efd;
struct epoll_event event;
struct epoll_event *events;

if (argc != 2)
{
fprintf (stderr, "Usage: %s [port]n", argv[0]);
exit (EXIT_FAILURE);
}

sfd= create_and_bind (argv[1]);
if (sfd== -1)
abort ();

s = make_socket_non_blocking (sfd);
if (s == -1)
abort ();

s = listen (sfd, SOMAXCONN);
if (s == -1)
{
perror ("listen");
abort ();
}

efd = epoll_create1 (0);
if (efd == -1)
{
perror ("epoll_create");
abort ();
}

event.data.fd = sfd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
if (s == -1)
{
perror ("epoll_ctl");
abort ();
}

/* Buffer where events are returned */
events = calloc (MAXEVENTS, sizeof event);

/* The event loop */
while (1)
{
int n, i;

n = epoll_wait (efd, events, MAXEVENTS, -1);
for (i = 0; i < n; i++)
{
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN)))
{
/* An error has occured on this fd, or the socket is not
ready for reading (why were we notified then?) */
fprintf (stderr, "epoll errorn");
close (events[i].data.fd);
continue;
}

else if (sfd== events[i].data.fd)
{
/* We have a notification on the listening socket, which
means one or more incoming connections. */
while (1)
{
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

in_len = sizeof in_addr;
infd = accept (sfd, &in_addr, &in_len);
if (infd == -1)
{
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK))
{
/* We have processed all incoming
connections. */
break;
}
else
{
perror ("accept");
break;
}
}

s = getnameinfo (&in_addr, in_len,
hbuf, sizeof hbuf,
sbuf, sizeof sbuf,
NI_NUMERICHOST | NI_NUMERICSERV);
if (s == 0)
{
printf("Accepted connection on descriptor %d "
"(host=%s, port=%s)n", infd, hbuf, sbuf);
}

/* Make the incoming socket non-blocking and add it to the
list of fds to monitor. */
s = make_socket_non_blocking (infd);
if (s == -1)
abort ();

event.data.fd = infd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
if (s == -1)
{
perror ("epoll_ctl");
abort ();
}
}
continue;
}
else
{
/* We have data on the fd waiting to be read. Read and
display it. We must read whatever data is available
completely, as we are running in edge-triggered mode
and won't get a notification again for the same
data. */
int done = 0;

while (1)
{
ssize_t count;
char buf[512];

count = read (events[i].data.fd, buf, sizeof buf);
if (count == -1)
{
/* If errno == EAGAIN, that means we have read all
data. So go back to the main loop. */
if (errno != EAGAIN)
{
perror ("read");
done = 1;
}
break;
}
else if (count == 0)
{
/* End of file. The remote has closed the
connection. */
done = 1;
break;
}

/* Write the buffer to standard output */
s = write (1, buf, count);
if (s == -1)
{
perror ("write");
abort ();
}
}

if (done)
{
printf ("Closed connection on descriptor %dn",
events[i].data.fd);

/* Closing the descriptor will make epoll remove it
from the set of descriptors which are monitored. */
close (events[i].data.fd);
}
}
}
}

free (events);

close (sfd);

return EXIT_SUCCESS;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187

#define MAXEVENTS 64

int
main (int argc, char *argv[])
{
int sfd, s;
int efd;
struct epoll_event event;
struct epoll_event *events;

if (argc != 2)
{
fprintf (stderr, "Usage: %s [port]n", argv[0]);
exit (EXIT_FAILURE);
}

sfd= create_and_bind (argv[1]);
if (sfd== -1)
abort ();

s = make_socket_non_blocking (sfd);
if (s == -1)
abort ();

s = listen (sfd, SOMAXCONN);
if (s == -1)
{
perror ("listen");
abort ();
}

efd = epoll_create1 (0);
if (efd == -1)
{
perror ("epoll_create");
abort ();
}

event.data.fd = sfd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
if (s == -1)
{
perror ("epoll_ctl");
abort ();
}

/* Buffer where events are returned */
events = calloc (MAXEVENTS, sizeof event);

/* The event loop */
while (1)
{
int n, i;

n = epoll_wait (efd, events, MAXEVENTS, -1);
for (i = 0; i < n; i++)
{
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN)))
{
/* An error has occured on this fd, or the socket is not
ready for reading (why were we notified then?) */
fprintf (stderr, "epoll errorn");
close (events[i].data.fd);
continue;
}

else if (sfd== events[i].data.fd)
{
/* We have a notification on the listening socket, which
means one or more incoming connections. */
while (1)
{
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

in_len = sizeof in_addr;
infd = accept (sfd, &in_addr, &in_len);
if (infd == -1)
{
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK))
{
/* We have processed all incoming
connections. */
break;
}
else
{
perror ("accept");
break;
}
}

s = getnameinfo (&in_addr, in_len,
hbuf, sizeof hbuf,
sbuf, sizeof sbuf,
NI_NUMERICHOST | NI_NUMERICSERV);
if (s == 0)
{
printf("Accepted connection on descriptor %d "
"(host=%s, port=%s)n", infd, hbuf, sbuf);
}

/* Make the incoming socket non-blocking and add it to the
list of fds to monitor. */
s = make_socket_non_blocking (infd);
if (s == -1)
abort ();

event.data.fd = infd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
if (s == -1)
{
perror ("epoll_ctl");
abort ();
}
}
continue;
}
else
{
/* We have data on the fd waiting to be read. Read and
display it. We must read whatever data is available
completely, as we are running in edge-triggered mode
and won't get a notification again for the same
data. */
int done = 0;

while (1)
{
ssize_t count;
char buf[512];

count = read (events[i].data.fd, buf, sizeof buf);
if (count == -1)
{
/* If errno == EAGAIN, that means we have read all
data. So go back to the main loop. */
if (errno != EAGAIN)
{
perror ("read");
done = 1;
}
break;
}
else if (count == 0)
{
/* End of file. The remote has closed the
connection. */
done = 1;
break;
}

/* Write the buffer to standard output */
s = write (1, buf, count);
if (s == -1)
{
perror ("write");
abort ();
}
}

if (done)
{
printf ("Closed connection on descriptor %dn",
events[i].data.fd);

/* Closing the descriptor will make epoll remove it
from the set of descriptors which are monitored. */
close (events[i].data.fd);
}
}
}
}

free (events);

close (sfd);

return EXIT_SUCCESS;
}

main() 首先调用 create_and_bind() 新建套接字。然后把套接字设置非阻塞模式,再调用listen(2)。接下来它创建一个 epoll 实例
efd
,添加监听套接字
sfd
,用电平触发模式来监听输入事件。

外层的 while 循环是主要事件循环。它调用epoll_wait(2),线程保持阻塞以等待事件到来。当事件就绪,epoll_wait(2) 用
events
参数返回事件,这个参数是一群
epoll_event
结构体。

当我们添加新的监听输入连接以及删除终止的现有连接时,efd 这个 epoll 实例在事件循环中不断更新。

当事件是可用的,它们可以有三种类型:

错误:当一个错误连接出现,或事件不是一个可以读取数据的通知,我们只要简单地关闭相关的描述符。关闭描述符会自动地移除 efd 这个 epoll 实例的监听列表。

新连接:当监听描述符
sfd
是可读状态,这表明一个或多个连接已经到达。当有一个新连接, accept(2) 接受这个连接,打印一条相应的消息,把这个到来的套接字设置为非阻塞状态,并将其添加到
efd 这个 epoll 实例
的监听列表。

客户端数据:当任何一个客户端描述符的数据可读时,我们在内部 while 循环中用 read(2) 以 512 字节大小读取数据。这是因为当前我们必须读走所有可读的数据,当监听描述符是边缘触发模式下,我们不会再得到事件。被读取的数据使用 write(2) 被写入标准输出(fd=1)。如果 read(2) 返回 0,这表示 EOF 并且我们可以关闭这个客户端的连接。如果返回 -1,
errno
被设置为
EAGAIN
,这表示这个事件的所有数据被读走,我们可以返回主循环。

就是这样。它在一个循环中运行,在监听列表中添加和删除描述符。

下载 epoll-example.c 代码。

更新1:电平和边缘触发的定义被颠倒错误了(虽然代码是正确的)。这是被Reddit用户 bodski 发现的。文章现在正确了。我应该在发布前校对的。对不起,并感谢谢指出错误。:)

更新2:代码被修改成连接将被阻塞时才执行accept(2),所以如果多个连接到达,我们全部接受。这是Reddit用户 pitchford 提出。谢谢你的评论。 :)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: