您的位置:首页 > 其它

poll和epoll的内核实现

2015-05-13 11:03 232 查看

poll & epoll的内核实现

1 poll

poll的使用这里不多说。需要注意的是,调用poll时需要传递一个数组,每个fd对应一个struct pollfd,然后在poll返回后需要重新遍历数组以确定哪些fd可读或可写,效率比较低下。试想,在高并发的网络环境下,遍历一遍几十万规模的数组太耗功夫了。

但是,研究poll的内核实现,可以为我们提供一些背景知识。

1.1 驱动的poll与等待队列

每一个打开的文件均对应一个驱动程序,例如普通文件基于磁盘文件系统驱动,socket则对应网络协议栈,设备文件则基于对应的设备驱动。驱动程序最核心的就是通过read/write等操作为用户提供操作该设备的接口。

读写操作并不总是能够立即返回的,因此一般地,驱动程序会为读和写分别维护一个等待队列。当进程执行阻塞型read/write并且不能立即返回时,就会被加入到等待队列(
wait_queue
),并且自己陷入睡眠(
wait_event_interruptable
)。当条件满足时,驱动程序就会唤醒等待队列里的所有进程(
wake_up_interruptable
)。

因此,使用poll代替read/write的目的就是避免阻塞操作。能够对某个fd使用poll的条件是对应的驱动程序必须实现文件操作中的poll函数,具体可以参考
struct file_operations


首先它会调用
poll_wait
,执行(通过形参传递过来的)的回调函数;接着,它需要检查当前是否可读/可写,并返回一个mask值。有了驱动的poll支持,sys_poll就可以实现自己功能了。

例如,socket的poll(sock_poll)的实现如下:

[code]static unsigned int sock_poll(struct file *file, poll_table *wait)
{
    struct socket *sock;

    sock = file->private_data;
    return sock->ops->poll(file, sock, wait);
}

static unsigned int poll(struct file *file, struct socket *sock, poll_table *wait)
{
    struct sock *sk = sock->sk;
    u32 mask;

    poll_wait(file, sk->sk_sleep, wait);

    if (!skb_queue_empty(&sk->sk_receive_queue) ||
        (sock->state == SS_UNCONNECTED) ||
        (sock->state == SS_DISCONNECTING))
        mask = (POLLRDNORM | POLLIN);
    else
        mask = 0;

    if (sock->state == SS_DISCONNECTING)
        mask |= POLLHUP;
    else
        mask |= POLLOUT;

    return mask;
}


驱动程序提供了以下机制:

提供等待队列,当条件可用时,wake_up所有的等待项。一个进程可以wait多个等待队列,并指定被wake_up时的回调函数。注意,wake_up是对等待队列里的所有等待项执行的一个特定的动作,并不是说就一定唤醒进程。但通常意义上,等待条件的进程肯定是会睡眠的,因此回调函数相应地也一定会唤醒进程的。

提供poll函数,并执行指定的回调函数。一般用来将特定的等待队列项加入到驱动的等待队列。poll的贡献其实就是将该驱动的等待队列暴露给外部代码。

1.2 sys_poll的实现

接上面的话,内核里的poll(sys_poll)先遍历一遍pollfds,分别调用驱动的poll看是否有满足条件的fd,如果有那么更新pollfds.revents变量之后就可以返回到用户空间了。如果没有那么就需要陷入睡眠等待驱动程序唤醒它。当它被唤醒了就可以返回到用户空间了。

这个陷入睡眠是在哪里执行的?还记得上面有个
poll_wait
吗?它会执行一个回调函数。

[code]static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
    if (p && wait_address)
        p->qproc(filp, wait_address, p);
}


poll_table
只包裹了一个函数指针。具体到sys_poll,传递时指向的是
__pollwait
:

[code]static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
    struct poll_wqueues *pwq = container_of(p, struct poll_queues, pt);
    struct poll_table_entry *entry = poll_get_entry(pwq);
    if (!entry)
        return;
    get_file(filp);
    entry->filp = filp;
    entry->wait_address = wait_address;
    init_waitqueue_func_entry(&entry->wait, pollwake);
    entry->wait.private = pwq;
    add_wait_queue(wait_address, &entry->wait);
}


这里,针对每一个需要等待的fd,均执行__pollwait,之后就陷入睡眠。此时,调用poll的进程加入了多个等待队列,任意一个唤醒操作就能够将其唤醒(
pollwake
),随后返回到用户空间。

它的效率怎么样呢?

开始需要拷贝整个pollfds数组到内核,返回时需要拷贝每个pollfds.revents回用户空间,因此不适合处理大数目的fds;

需要遍历数组,调用每一个文件对应的poll以确定是否可读或可写,效率也极低。

下面,看一下epoll是如何高效地实现的。

2 epoll

[code]struct eventpoll {
    spinlock_t lock;
    struct mutex mtx;

    wait_queue_head_t wq; /* 等待队列,执行epoll_wait阻塞的进程*/
    wait_queue_head_t poll_wait;

    struct list_head rdllist; /* 就绪fd列表 */
    struct rb_root rbr;
    struct epitem *ovflist;

    struct user_struct *user;
};


2.1 epoll_create

输入:int size。能够监听事件的数量,但在内核里只要>0就ok,值不起作用。

输出:int fd。表示内核epoll对象的文件描述符。

分配eventpoll对象内存

[code]static int ep_alloc(struct eventpoll **pep);


其中,user指向当前用户的user_struct结构。

从伪文件系统中新建文件

[code]int anno_inode_getfd(const char *name, 
                    const struct file_operations *fops, 
                    void *priv, int flags);


这样,就在文件系统eventfs中创建了新的inode节点、dentry、file等数据结构,并返回一个fd。至于eventfs文件系统是如何实现的,暂不用深究。

需要注意的是,在创建对应的file结构时,将其
private_data
指向了刚刚创建的
eventpoll
结构。

2.2 epoll_ctl

接下来,就是调用epoll_ctl往其中添加事件了。

[code]/* POSIX API*/
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);


拷贝事件到内存

[code]copy_from_user(&epds, event, sizeof(struct epoll_event));


虽然POSIX和内核对struct epoll_event的定义不是完全一致,但size却保存一致。

[code]/*用户空间*/
struct epoll_event {
    __uint32_t events;
    epoll_data_t data;
};

typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
};

/* kernel */
struct epoll_event {
    __u32 events;
    __u64 data;
 } EPOLL_PACKED;


检查文件

epfdfd分别查找到对应的file结构filetfile,判断是否可以将其加入到epoll监听列表。

并根据
file->private_data
获得指向eventpoll的指针ep。

执行操作

所有加入到epoll监听列表中的文件描述符均对应一个
struct epitem
结构。这些结构组成了一颗RB tree

[code]/* find epitem in rb tree */
struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd);

/* insert new event into rb tree */
int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd);

int ep_remove(struct eventpoll *ep, struct epitem *epi);


ep_insert

下面重点看一下新添加fd时的情况。

首先,它会调用驱动程序的poll函数,传递的函数是
ep_ptable_queue_proc
。这个函数跟sys_poll一样,也是将自己加入到驱动的等待队列中去,并设置wake_up时的回调函数为
ep_poll_callback


注意,epoll_ctl并不会睡眠,它只是将其加入到等待队列而已。如果在应用程序调用epoll_wait之前,某个fd可读或可写了,这时候内核就回去执行
ep_poll_callback
这个回调函数。epoll高效的地方就在于,它在这个函数里面将这个就绪的fd加入到一个就绪list里面,等待着下次的epoll_wait。

考虑这种场景:在调用epoll_ctl之前,某个进程/线程已经阻塞在epoll_wait上了。这时候调用epoll_ctl增加了一个fd,并且发现这个fd是立即可用的,因此这时候epoll_ctl就会把等待在ep->wq上的进程唤醒。

epoll_wait

首先判断就绪链表,如果不为空,那么无需阻塞可以直接返回到用户空间了。

否则,就将当前进程添加到ep->wq等待队列,并睡眠。直到下一次有fd就绪导致
ep_poll_callback
被回调。这时候,它就会唤醒等待在ep->wq队列上的进程,从而使epoll_wait继续执行。

总结

可以看出来,epoll的高效之处在于,你不用一次性把所有需要监听的fd全部都添加到对应的等待队列之后再睡眠。相反,你可以一次一次地添加fd,然后执行epoll_wait。每个操作均不会产生大量的数据拷贝操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: