
I/O Multiplexing (2): A Brief Look at the Kernel Implementation of epoll

2018-03-02 13:36
This post briefly describes the kernel implementation of epoll and, drawing on the earlier analysis of the select and poll implementations, points out how epoll differs from those two.

I. Introduction to epoll

As anyone who has used epoll knows, working with it revolves around three functions: epoll_create, epoll_ctl and epoll_wait. The man page gives the usage example below, which breaks down into three parts:

Create an epoll file descriptor with epoll_create;

Add the file descriptors and events we want to monitor to the epoll descriptor with epoll_ctl;

Wait for events to occur with epoll_wait.

#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;
int n;                            /* loop index (declaration added; the man page omits it) */
struct sockaddr_storage addr;     /* peer address for accept() (declaration added) */
socklen_t addrlen = sizeof(addr);

/* Code to set up listening socket, 'listen_sock',
   (socket(), bind(), listen()) omitted */

/* 1. Create the epoll file descriptor */
epollfd = epoll_create1(0);
if (epollfd == -1) {
    perror("epoll_create1");
    exit(EXIT_FAILURE);
}

/* 2. Add the descriptors and events we want to monitor via epoll_ctl */
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
    perror("epoll_ctl: listen_sock");
    exit(EXIT_FAILURE);
}

for (;;) {

    /* 3. Wait for events to occur */
    nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    if (nfds == -1) {
        perror("epoll_wait");
        exit(EXIT_FAILURE);
    }

    for (n = 0; n < nfds; ++n) {
        if (events[n].data.fd == listen_sock) {
            conn_sock = accept(listen_sock,
                               (struct sockaddr *) &addr, &addrlen);
            if (conn_sock == -1) {
                perror("accept");
                exit(EXIT_FAILURE);
            }
            setnonblocking(conn_sock);
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = conn_sock;
            if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
                          &ev) == -1) {
                perror("epoll_ctl: conn_sock");
                exit(EXIT_FAILURE);
            }
        } else {
            do_use_fd(events[n].data.fd);
        }
    }
}


All of the above happens in user space, of course. Next we look at the kernel-side implementation.

II. The Implementation of epoll in the Kernel

Mirroring user space, each of the three epoll functions called from user space has a corresponding implementation in the kernel.

1. epoll_create

First of all, epoll_create and epoll_create1 are implemented essentially identically: epoll_create merely validates its size argument and then calls the epoll_create1 implementation, so both end up in the same place.

SYSCALL_DEFINE1(epoll_create, int, size)
{
    if (size <= 0)
        return -EINVAL;

    return sys_epoll_create1(0);
}


epoll_create1 mainly allocates an eventpoll structure in the kernel. This structure carries epoll's core data, including the list of descriptors that are already ready, various locks, and so on. Besides that, the function allocates a file descriptor and backs it with a file pointer and an inode, so that the returned epoll descriptor can be operated on like an ordinary file; finally it stashes the eventpoll structure in the file's private data field. Once all this is done, the epoll descriptor is handed back to the user, who from then on only needs to pass in that descriptor to reach epoll's main data structure, eventpoll.
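
For reference, here is a trimmed sketch of struct eventpoll as it appears in fs/eventpoll.c of kernels from this era (several fields are omitted; consult the actual source for the full definition):

struct eventpoll {
    spinlock_t lock;              /* protects the fields below */
    struct mutex mtx;             /* serializes epoll_ctl/epoll_wait against fd removal */
    wait_queue_head_t wq;         /* processes sleeping in epoll_wait() */
    wait_queue_head_t poll_wait;  /* used when the epoll fd itself is polled */
    struct list_head rdllist;     /* the ready list of triggered epitems */
    struct rb_root rbr;           /* red-black tree of all monitored descriptors */
    struct epitem *ovflist;       /* overflow list used while copying to user space */
    struct file *file;            /* the anonymous file backing this instance */
    /* ... remaining fields omitted ... */
};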

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    int error, fd;
    struct eventpoll *ep = NULL;
    struct file *file;

    /* Check the EPOLL_* constant for consistency.  */
    BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

    if (flags & ~EPOLL_CLOEXEC)
        return -EINVAL;
    /*
     * Create the internal data structure ("struct eventpoll").
     */
    error = ep_alloc(&ep);
    if (error < 0)
        return error;
    /*
     * Creates all the items needed to setup an eventpoll file. That is,
     * a file structure and a free file descriptor.
     */
    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
    if (fd < 0) {
        error = fd;
        goto out_free_ep;
    }
    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                              O_RDWR | (flags & O_CLOEXEC));
    if (IS_ERR(file)) {
        error = PTR_ERR(file);
        goto out_free_fd;
    }
    ep->file = file;
    fd_install(fd, file);
    return fd;

out_free_fd:
    put_unused_fd(fd);
out_free_ep:
    ep_free(ep);
    return error;
}
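
The eventpoll_fops passed to anon_inode_getfile above is, lightly trimmed, the following. Having a .poll method here is what lets an epoll descriptor itself be monitored by select, poll, or another epoll instance:

static const struct file_operations eventpoll_fops = {
    .release = ep_eventpoll_release,
    .poll    = ep_eventpoll_poll,
    .llseek  = noop_llseek,
};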


2. epoll_ctl

epoll_ctl operates on the descriptor set held in eventpoll's main data structure, i.e. the file descriptors whose events we are waiting for. From the previous article, I/O Multiplexing (1): A Brief Look at the Kernel Implementation of select and poll, we know that select stores the descriptor set in an array and poll in an array plus a linked list, whereas epoll stores it in a red-black tree.
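
For reference, the red-black tree is keyed by the (file pointer, fd) pair. A lightly trimmed sketch from fs/eventpoll.c:

struct epoll_filefd {
    struct file *file;
    int fd;
};

/* Tree ordering: compare by file address first, then by fd */
static inline int ep_cmp_ffd(struct epoll_filefd *p1,
                             struct epoll_filefd *p2)
{
    return (p1->file > p2->file ? +1:
            (p1->file < p2->file ? -1 : p1->fd - p2->fd));
}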

The operations epoll_ctl supports on the descriptor set are add, modify and delete. Add inserts a descriptor into the red-black tree, modify finds the target descriptor and changes its attributes, and delete removes the target descriptor from the tree. Modify and delete are fairly simple, so the focus here is on add.

int epoll_ctl(...)
{
    ...
    /* Look up the target descriptor in the red-black tree */
    epi = ep_find(ep, tf.file, fd);

    error = -EINVAL;
    switch (op) {

    /* Add a descriptor */
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_insert(ep, &epds, tf.file, fd, full_check);
        } else
            error = -EEXIST;
        if (full_check)
            clear_tfile_check_list();
        break;

    /* Delete a descriptor */
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;

    /* Modify a descriptor */
    case EPOLL_CTL_MOD:
        if (epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_modify(ep, epi, &epds);
        } else
            error = -ENOENT;
        break;
    }
    ...
}


Adding a descriptor is done by calling the ep_insert function, which does four main things:

Allocate an epitem structure for the descriptor being added, used to hold the waited-for events and so on (a trimmed sketch of struct epitem follows this list);

Install the function that poll_wait will invoke inside the device driver's poll function (the driver's poll function actually calls poll_wait, which hangs a wait-queue entry on the device's wait queue; when the device becomes ready it wakes that queue, running the registered wake-up callback);

Immediately poll the device to check whether it is already ready; if it is, add its epitem to eventpoll's ready list and wake up any process waiting in epoll_wait;

Insert the newly allocated epitem into the red-black tree.
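
As promised above, here is a trimmed sketch of struct epitem from fs/eventpoll.c of this era (some fields omitted):

struct epitem {
    struct rb_node rbn;        /* links this item into ep->rbr (the red-black tree) */
    struct list_head rdllink;  /* links this item into ep->rdllist (the ready list) */
    struct epitem *next;       /* links this item into ep->ovflist */
    struct epoll_filefd ffd;   /* the (file, fd) key of this item */
    int nwait;                 /* number of wait queues attached by poll operations */
    struct list_head pwqlist;  /* the attached poll wait queues (eppoll_entry) */
    struct eventpoll *ep;      /* the eventpoll instance this item belongs to */
    struct epoll_event event;  /* the events/data registered by the user */
    /* ... remaining fields omitted ... */
};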

static int ep_insert(...)
{

    /* 1. Allocate an epitem for the descriptor being added and initialize it */
    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
        return -ENOMEM;
    INIT_LIST_HEAD(&epi->rdllink);
    INIT_LIST_HEAD(&epi->fllink);
    INIT_LIST_HEAD(&epi->pwqlist);
    epi->ep = ep;
    ep_set_ffd(&epi->ffd, tfile, fd);
    epi->event = *event;
    epi->nwait = 0;
    epi->next = EP_UNACTIVE_PTR;
    epq.epi = epi;

    /* 2. Install the function that poll_wait will invoke */
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

    /* 3. Poll the device right away to get its current state */
    revents = ep_item_poll(epi, &epq.pt);

    /* 4. Insert the device's epitem into the red-black tree */
    ep_rbtree_insert(ep, epi);

    /* If the device is already ready, wake up the wait queue */
    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake(epi);

        /* Notify waiting tasks that events are available */
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);

        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }

}
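
The ep_item_poll used in step 3 is a thin wrapper (from fs/eventpoll.c of this era): it invokes the target file's own poll method, passing in the poll_table whose callback was just installed, and masks the result with the registered events:

static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
{
    pt->_key = epi->event.events;

    /* Calls the driver's poll; poll_wait inside it runs our queue proc */
    return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
}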


Note the ep_ptable_queue_proc function in the code above: it is what the poll_wait call inside a device's poll function actually ends up executing (you can verify this from init_poll_funcptr and the definition of poll_wait in poll.h; poll_wait and its poll_table are sketched just below). Its job is to hang a wait-queue entry on the device's wait queue; in addition, it registers ep_poll_callback as the wake-up callback, to be run when that queue is woken.
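
To make the mechanism concrete, here are poll_wait and the poll_table it consumes, lightly trimmed from include/linux/poll.h of this era. poll_wait simply forwards to whatever _qproc was installed: for select/poll that is __pollwait, while ep_insert installs ep_ptable_queue_proc:

typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *,
                                struct poll_table_struct *);

typedef struct poll_table_struct {
    poll_queue_proc _qproc;    /* set by init_poll_funcptr() */
    unsigned long _key;
} poll_table;

static inline void poll_wait(struct file *filp,
                             wait_queue_head_t *wait_address, poll_table *p)
{
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p);
}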

static void ep_ptable_queue_proc(...)
{
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;

    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {

        /* The callback to run when the wait queue is woken */
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;
        pwq->base = epi;
        add_wait_queue(whead, &pwq->wait);
        list_add_tail(&pwq->llink, &epi->pwqlist);
        epi->nwait++;
    } else {
        /* We have to signal that an error occurred */
        epi->nwait = -1;
    }
}
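
The key trick here is init_waitqueue_func_entry (include/linux/wait.h, kernels of this era): unlike an ordinary wait entry, which stores the task to wake, this entry stores only a callback, so waking the device's queue runs ep_poll_callback in the waker's context instead of scheduling a sleeping process:

static inline void
init_waitqueue_func_entry(wait_queue_t *q, wait_queue_func_t func)
{
    q->flags = 0;
    q->private = NULL;    /* no task attached; only the callback below */
    q->func = func;       /* ep_poll_callback in epoll's case */
}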


ep_poll_callback runs when the device's wait queue is woken. Its main job is to add the device's epitem to eventpoll's ready list and wake up the processes sleeping in epoll_wait. Note the ovflist list here: if a device becomes ready exactly while epoll is busy copying ready results to user space, its epitem is put onto ovflist instead, to be processed afterwards.

static int ep_poll_callback(...)
{
    int pwake = 0;
    unsigned long flags;
    struct epitem *epi = ep_item_from_wait(wait);
    struct eventpoll *ep = epi->ep;

    if ((unsigned long)key & POLLFREE) {
        ep_pwq_from_wait(wait)->whead = NULL;
        /*
         * whead = NULL above can race with ep_remove_wait_queue()
         * which can do another remove_wait_queue() after us, so we
         * can't use __remove_wait_queue(). whead->lock is held by
         * the caller.
         */
        list_del_init(&wait->task_list);
    }

    spin_lock_irqsave(&ep->lock, flags);

    /*
     * If the event mask does not contain any poll(2) event, we consider the
     * descriptor to be disabled. This condition is likely the effect of the
     * EPOLLONESHOT bit that disables the descriptor when an event is received,
     * until the next EPOLL_CTL_MOD will be issued.
     */
    if (!(epi->event.events & ~EP_PRIVATE_BITS))
        goto out_unlock;

    /*
     * Check the events coming with the callback. At this stage, not
     * every device reports the events in the "key" parameter of the
     * callback. We need to be able to handle both cases here, hence the
     * test for "key" != NULL before the event match test.
     */
    if (key && !((unsigned long) key & epi->event.events))
        goto out_unlock;

    /*
     * If we are transferring events to userspace, we can hold no locks
     * (because we're accessing user memory, and because of linux f_op->poll()
     * semantics). All the events that happen during that period of time are
     * chained in ep->ovflist and requeued later on.
     */
    if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
        if (epi->next == EP_UNACTIVE_PTR) {
            epi->next = ep->ovflist;
            ep->ovflist = epi;
            if (epi->ws) {
                /*
                 * Activate ep->ws since epi->ws may get
                 * deactivated at any time.
                 */
                __pm_stay_awake(ep->ws);
            }

        }
        goto out_unlock;
    }

    /* If this file is already in the ready list we exit soon */
    if (!ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake_rcu(epi);
    }

    /*
     * Wake up ( if active ) both the eventpoll wait list and the ->poll()
     * wait list.
     */
    if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);

    /* This handles the case where the eventpoll fd itself is being polled */
    if (waitqueue_active(&ep->poll_wait))
        pwake++;

out_unlock:
    spin_unlock_irqrestore(&ep->lock, flags);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    return 1;
}


3. epoll_wait

epoll_wait essentially just waits for the target devices to become ready; it ultimately calls the ep_poll function.

int epoll_wait(...)
{
    /* Get the "struct file *" for the eventpoll file */
    f = fdget(epfd);

    ep = f.file->private_data;

    /* Time to fish for events ... */
    error = ep_poll(ep, events, maxevents, timeout);

error_fput:
    fdput(f);
    return error;
}


ep_poll spends most of its time asleep; it resumes only when the timeout expires or it is woken.
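
One detail elided from the snippet below: before the wait loop, ep_poll converts the millisecond timeout into a ktime deadline for schedule_hrtimeout_range. Roughly, from fs/eventpoll.c of this era (the local variables are those of the real function):

if (timeout > 0) {
    struct timespec end_time = ep_set_mstimeout(timeout);

    slack = select_estimate_accuracy(&end_time);
    to = &expires;
    *to = timespec_to_ktime(end_time);
} else if (timeout == 0) {
    /* Non-blocking: skip the wait loop, but take the lock that
       the check_events path expects to be held */
    timed_out = 1;
    spin_lock_irqsave(&ep->lock, flags);
    goto check_events;
}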

static int ep_poll(...)
{

fetch_events:
    spin_lock_irqsave(&ep->lock, flags);

    if (!ep_events_available(ep)) {

        /* Join ep's wait queue */
        init_waitqueue_entry(&wait, current);
        __add_wait_queue_exclusive(&ep->wq, &wait);

        for (;;) {

            /* Sleep until woken */
            set_current_state(TASK_INTERRUPTIBLE);
            if (ep_events_available(ep) || timed_out)
                break;
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }

            spin_unlock_irqrestore(&ep->lock, flags);

            /* Check whether we timed out */
            if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
                timed_out = 1;

            spin_lock_irqsave(&ep->lock, flags);
        }

        __remove_wait_queue(&ep->wq, &wait);
        __set_current_state(TASK_RUNNING);
    }
check_events:
    /* Is it worth to try to dig for events ? */
    eavail = ep_events_available(ep);

    spin_unlock_irqrestore(&ep->lock, flags);

    /* Copy the ready devices to user space */
    if (!res && eavail &&
        !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
        goto fetch_events;

    return res;
}
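
The ep_events_available test used throughout ep_poll is a one-liner (fs/eventpoll.c): events are pending if the ready list is non-empty, or if the overflow list has been armed (EP_UNACTIVE_PTR marks it as unused):

static inline int ep_events_available(struct eventpoll *ep)
{
    return !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
}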


ep_send_events handles the devices that are already ready. It calls ep_scan_ready_list, and looking at ep_scan_ready_list we find that it is ultimately ep_send_events_proc that copies the ready-device information to user space.

static int ep_send_events(...)
{
    struct ep_send_events_data esed;

    esed.maxevents = maxevents;
    esed.events = events;

    return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
}

static int ep_scan_ready_list(...)
{
    ...

    /* Copy the state of the ready devices to user space */
    error = (*sproc)(ep, &txlist, priv);

    /*
     * What follows handles the ovflist: devices that became ready while
     * we were sending still end up on the ready list, waiting to be
     * sent to user space on a later pass.
     */
    /*
     * During the time we spent inside the "sproc" callback, some
     * other events might have been queued by the poll callback.
     * We re-insert them inside the main ready-list here.
     */
    for (nepi = ep->ovflist; (epi = nepi) != NULL;
         nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
        /*
         * We need to check if the item is already in the list.
         * During the "sproc" callback execution time, items are
         * queued into ->ovflist but the "txlist" might already
         * contain them, and the list_splice() below takes care of them.
         */
        if (!ep_is_linked(&epi->rdllink)) {
            list_add_tail(&epi->rdllink, &ep->rdllist);
            ep_pm_stay_awake(epi);
        }
    }
    /*
     * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
     * releasing the lock, events will be queued in the normal way inside
     * ep->rdllist.
     */
    ep->ovflist = EP_UNACTIVE_PTR;

    /*
     * Quickly re-inject items left on "txlist".
     */
    list_splice(&txlist, &ep->rdllist);
    __pm_relax(ep->ws);

    if (!list_empty(&ep->rdllist)) {
        /*
         * Wake up (if active) both the eventpoll wait list and
         * the ->poll() wait list (delayed after we release the lock).
         */
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }

    return error;
}
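
The part that the "..." at the top of ep_scan_ready_list elides is worth spelling out. Roughly, from fs/eventpoll.c of this era: the ready list is moved wholesale onto a private txlist and ovflist is armed by setting it to NULL, so that ep_poll_callback diverts newly ready items into ovflist while sproc runs without the spinlock held:

spin_lock_irqsave(&ep->lock, flags);
list_splice_init(&ep->rdllist, &txlist);    /* take a private snapshot of the ready list */
ep->ovflist = NULL;                         /* arm ovflist: new events go there for now */
spin_unlock_irqrestore(&ep->lock, flags);

/* ... then the (*sproc)() call and the ovflist drain shown above ... */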


ep_send_events_proc is where the ready-device information is actually copied to user space: it walks the ready list and copies the information for each ready device into the user's event buffer.

static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                               void *priv)
{

    init_poll_funcptr(&pt, NULL);

    /* Walk the ready list */
    for (eventcnt = 0, uevent = esed->events;
         !list_empty(head) && eventcnt < esed->maxevents;) {
        epi = list_first_entry(head, struct epitem, rdllink);

        /*
         * Activate ep->ws before deactivating epi->ws to prevent
         * triggering auto-suspend here (in case we reactive epi->ws
         * below).
         *
         * This could be rearranged to delay the deactivation of epi->ws
         * instead, but then epi->ws would temporarily be out of sync
         * with ep_is_linked().
         */
        ws = ep_wakeup_source(epi);
        if (ws) {
            if (ws->active)
                __pm_stay_awake(ep->ws);
            __pm_relax(ws);
        }

        list_del_init(&epi->rdllink);

        revents = ep_item_poll(epi, &pt);

        /*
         * If the event mask intersect the caller-requested one,
         * deliver the event to userspace. Again, ep_scan_ready_list()
         * is holding "mtx", so no operations coming from userspace
         * can change the item.
         */
        if (revents) {
            if (__put_user(revents, &uevent->events) ||
                __put_user(epi->event.data, &uevent->data)) {
                list_add(&epi->rdllink, head);
                ep_pm_stay_awake(epi);
                return eventcnt ? eventcnt : -EFAULT;
            }
            eventcnt++;
            uevent++;
            if (epi->event.events & EPOLLONESHOT)
                epi->event.events &= EP_PRIVATE_BITS;
            else if (!(epi->event.events & EPOLLET)) {
                /*
                 * If this file has been added with Level
                 * Trigger mode, we need to insert back inside
                 * the ready list, so that the next call to
                 * epoll_wait() will check again the events
                 * availability. At this point, no one can insert
                 * into ep->rdllist besides us. The epoll_ctl()
                 * callers are locked out by
                 * ep_scan_ready_list() holding "mtx" and the
                 * poll callback will queue them in ep->ovflist.
                 */
                list_add_tail(&epi->rdllink, &ep->rdllist);
                ep_pm_stay_awake(epi);
            }
        }
    }

    return eventcnt;
}
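
The EPOLLET branch above is exactly why edge-triggered mode obliges user code to drain a descriptor: the epitem is not re-queued on the ready list, so no further epoll_wait will report it until a new edge occurs. A minimal userspace sketch (not from the original article; handle_data and close_connection are hypothetical helpers) of the usual drain loop on a non-blocking fd:

char buf[4096];

for (;;) {
    ssize_t n = read(fd, buf, sizeof(buf));
    if (n > 0) {
        handle_data(buf, (size_t)n);    /* hypothetical: consume the bytes */
    } else if (n == 0) {
        close_connection(fd);           /* hypothetical: peer closed */
        break;
    } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
        break;    /* kernel buffer drained; wait for the next edge */
    } else {
        close_connection(fd);           /* hypothetical: real error */
        break;
    }
}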


III. Flow Charts of the epoll Call Path and Implementation

1. Overall flow chart



2. Detailed call flow chart

Tags: epoll