您的位置:首页 > 运维架构 > Nginx

nginx 源码学习笔记(二十二)—— event 模块(三) ——epoll模块

2012-05-28 14:15 721 查看
上一节我们讲到了事件驱动的模块,它把我们引入epoll模块,今天我们主要学习下nginx如何使用epoll完成时间驱动,实现高并发;这里不详细讲解epoll原理,如果有机会再做一次单独的epoll的学习。

本文来自于:http://blog.csdn.net/lengzijian

回忆一下上一节的内容,在我们讲到ngx_process_events_and_timers时,在源码最后提到了ngx_process_events,这里是把我们引入epoll的入口:

1.先来看下ngx_process_events的宏定义:

src/event/ngx_event.h

#define ngx_process_events   ngx_event_actions.process_events


2.继续查找ngx_event_actions,我们找到如下结构体:

src/event/ngx_event.h

typedef struct {
ngx_int_t  (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
ngx_int_t  (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

ngx_int_t  (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
ngx_int_t  (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

ngx_int_t  (*add_conn)(ngx_connection_t *c);
ngx_int_t  (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);

ngx_int_t  (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait);
ngx_int_t  (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
ngx_uint_t flags);

ngx_int_t  (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
void       (*done)(ngx_cycle_t *cycle);
} ngx_event_actions_t;


a.我们去源代码中搜索下关键字ngx_event_actions:

modules/ngx_epoll_module.c:    ngx_event_actions = ngx_epoll_module_ctx.actions;
modules/ngx_select_module.c:    ngx_event_actions = ngx_select_module_ctx.actions;
modules/ngx_poll_module.c:    ngx_event_actions = ngx_poll_module_ctx.actions;

ngx_event.c:ngx_event_actions_t   ngx_event_actions;


前面三行表示:所有event模块对象中的actions就是ngx_event_actions_t对象,而ngx_event_action在第四行定义为全局变量,用于同一接口,下面又存在一个疑问,event模块到底做了些什么?

b.先找到ngx_event_module_t的结构体:

src/event/ngx_event.h

typedef struct {
ngx_str_t              *name;           //模块名

void                 *(*create_conf)(ngx_cycle_t *cycle);  //钩子函数,之前讲过
char                 *(*init_conf)(ngx_cycle_t *cycle, void *conf);//同上

ngx_event_actions_t     actions;       //接下来主要看
} ngx_event_module_t;


我们找一个例子来详细讲解下

src/event/modules/ngx_epoll_module.c

ngx_event_module_t  ngx_epoll_module_ctx = {
&epoll_name,
ngx_epoll_create_conf,               /* create configuration */
ngx_epoll_init_conf,                 /* init configuration */

{
ngx_epoll_add_event,             /* add an event */
ngx_epoll_del_event,             /* delete an event */
ngx_epoll_add_event,             /* enable an event */
ngx_epoll_del_event,             /* disable an event */
ngx_epoll_add_connection,        /* add an connection */
ngx_epoll_del_connection,        /* delete an connection */
NULL,                            /* process the changes */
ngx_epoll_process_events,        /* process the events */
ngx_epoll_init,                  /* init the events */
ngx_epoll_done,                  /* done the events */
}
};


这里有注释就不详细讲解了。

ngx_process_events这个函数就是我们要找的,要了好大一圈,ngx_process_events实际上就是调用这个函数,此处本人纠结,为什么作者不加点注释呢。

3.下面正式观察下ngx_epoll_init函数:

src/event/modules/ngx_epoll_module.c

static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
ngx_epoll_conf_t  *epcf;
//获取epoll模块的配置结构
epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);
//ep是epoll模块定义的一个全局变量,初始化为-1
if (ep == -1) {
//创建一个epoll对象,容量为总连接数的一半
ep = epoll_create(cycle->connection_n / 2);

if (ep == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"epoll_create() failed");
return NGX_ERROR;
}
}
//nevents也是epoll模块的全局变量,初始化为0
if (nevents < epcf->events) {
if (event_list) {
ngx_free(event_list);
}
//event_list存储产生时间的数组
event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
cycle->log);
if (event_list == NULL) {
return NGX_ERROR;
}
}

nevents = epcf->events;
/*初始化全局变量ngx_io,ngx_os_io定义为:
ngx_os_io_t ngx_os_io = {
ngx_unix_recv,
ngx_readv_chain,
ngx_udp_unix_recv,
ngx_unix_send,
ngx_writev_chain,
0
};(src/os/unix/ngx_posix_init.c)
*/
ngx_io = ngx_os_io;
//这里之前讲过
ngx_event_actions = ngx_epoll_module_ctx.actions;

#if (NGX_HAVE_CLEAR_EVENT)
//实现边沿触发
ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
//实现水平出发
ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
|NGX_USE_GREEDY_EVENT  //io是知道收到DAGAIN为止
|NGX_USE_EPOLL_EVENT;  //epoll标志

return NGX_OK;
}


4.下面观察下主要的函数ngx_epoll_process_events:

src/event/modules/ngx_epoll_module.c

static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
int                events;
uint32_t           revents;
ngx_int_t          instance, i;
ngx_uint_t         level;
ngx_err_t          err;
ngx_event_t       *rev, *wev, **queue;
ngx_connection_t  *c;

/* NGX_TIMER_INFINITE == INFTIM */

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll timer: %M", timer);
//一开支就等待时间,最长等待时间为timer,这里的timer下一节会详细讲解
events = epoll_wait(ep, event_list, (int) nevents, timer);

err = (events == -1) ? ngx_errno : 0;

if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
//执行一次时间更新,nginx将时间缓存到一组全局变量中,方便程序高效获取事件
ngx_time_update();
}
//wait出错
if (err) {
if (err == NGX_EINTR) {

if (ngx_event_timer_alarm) {
ngx_event_timer_alarm = 0;
return NGX_OK;
}

level = NGX_LOG_INFO;

} else {
level = NGX_LOG_ALERT;
}

ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
return NGX_ERROR;
}
//wait返回事件数0,可能是timeout返回,如果不是timeout返回,那么就是error
if (events == 0) {
//这里限定timer不是无线超时
if (timer != NGX_TIMER_INFINITE) {
return NGX_OK;
}

ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"epoll_wait() returned no events without timeout");
return NGX_ERROR;
}
//ngx_posted_events_mutex上锁
ngx_mutex_lock(ngx_posted_events_mutex);
//
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;

instance = (uintptr_t) c & 1;
//从发生的epoll事件对象中取得ngx_connection_t对象
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
//取出读事件
rev = c->read;
//....
//取得发生的时间
revents = event_list[i].events;
//记录wait的错误返回状态
if (revents & (EPOLLERR|EPOLLHUP)) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll_wait() error on fd:%d ev:%04XD",
c->fd, revents);
}

if ((revents & (EPOLLERR|EPOLLHUP))
&& (revents & (EPOLLIN|EPOLLOUT)) == 0)
{
/*
* if the error events were returned without EPOLLIN or EPOLLOUT,
* then add these flags to handle the events at least in one
* active handler
*/

revents |= EPOLLIN|EPOLLOUT;
}
//读取一个事件,并且该连接上注册的读时间是active的
if ((revents & EPOLLIN) && rev->active) {
if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
rev->posted_ready = 1;

} else {
rev->ready = 1;
}
//时间放入相应的队列中;之前有说过nginx是先放入队列,释放锁之后再做处理
if (flags & NGX_POST_EVENTS) {
queue = (ngx_event_t **) (rev->accept ?
&ngx_posted_accept_events : &ngx_posted_events);
/*
这里根据accept状态
如果accept为真:加入到ngx_posted_accept_events事件队列中
如果accept为假:加入到ngx_posted_events事件队列中
*/
ngx_locked_post_event(rev, queue);//加入到ngx_posted_accept_events队里面

} else {
rev->handler(rev);//调用读事件处理函数,通常就是读取事件了
}
}
//取出写事件
wev = c->write;
//如果是写事件并且active状态
if ((revents & EPOLLOUT) && wev->active) {

if (c->fd == -1 || wev->instance != instance) {

/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}

if (flags & NGX_POST_THREAD_EVENTS) {
//写事件设为posted_ready状态
wev->posted_ready = 1;

} else {
写事件设为ready状态
wev->ready = 1;
}

if (flags & NGX_POST_EVENTS) {
//不立即处理,要时间排队,加入队列
ngx_locked_post_event(wev, &ngx_posted_events);

} else {
//调用写事件处理函数,通常就是写入数据
wev->handler(wev);
}
}
}
//ngx_mutex_unlock解锁
ngx_mutex_unlock(ngx_posted_events_mutex);

return NGX_OK;
}
//其中用到了ngx_locked_post_event()这个宏,它把事件放到事件队列的头部。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐