您的位置:首页 > 运维架构 > Nginx

nginx事件模块之ngx_epoll_module源码分析

2018-02-09 15:10 981 查看
ngx_epoll_module是nginx众多事件模块的其中一个,它利用linux的epoll模型实现nginx事件框架所定义的事件模块接口。运行在linux系统上的nginx默认使用该模块作为事件框架的底层实现。
ngx_epoll_module主要是实现了ngx_event.h中规定的事件模块的接口,即实现了ngx_event_module_t中定义的那组回调函数,如下代码片段所示:
static ngx_event_module_t ngx_epoll_module_ctx = {
&epoll_name, /* epoll */
ngx_epoll_create_conf, /* create configuration */
ngx_epoll_init_conf, /* init configuration */

{
ngx_epoll_add_event, /* add an event */
ngx_epoll_del_event, /* delete an event */
ngx_epoll_add_event, /* enable an event */
ngx_epoll_del_event, /* disable an event */
ngx_epoll_add_connection, /* add an connection */
ngx_epoll_del_connection, /* delete an connection */
#if (NGX_HAVE_EVENTFD)
ngx_epoll_notify, /* trigger a notify */
#else
NULL, /* trigger a notify */
#endif
ngx_epoll_process_events, /* process the events */
ngx_epoll_init, /* init the events */
ngx_epoll_done, /* done the events */
}
};模块一共提供了两个指令,epoll_events和worker_aio_requests,分别对应于一次epoll_wait允许处理的最大事件数和异步请求数,它们的默认值分别是512和32。
ngx_epoll_create_conf和ngx_epoll_init_conf用于创建和初始化模块配置,这里略过,重点关注ngx_epoll_init,ngx_epoll_add_event,ngx_epoll_del_event,ngx_epoll_process_events以及ngx_epoll_add_connection和ngx_epoll_del_connection。
ngx_epoll_init

在所有接口中,ngx_epoll_init是最先被调用的接口,因为它的工作是为模块的运行创建和配置环境。ngx_epoll_init接口在worker进程启动过程中,开始提供服务前被调用。它只会被调用一次
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
ngx_epoll_conf_t *epcf;

epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

/* 创建epoll文件 */
if (ep == -1) {
ep = epoll_create(cycle->connection_n / 2);

if (ep == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"epoll_create() failed");
return NGX_ERROR;
}

/* 事件通知环境初始化 */
#if (NGX_HAVE_EVENTFD)
if (ngx_epoll_notify_init(cycle->log) != NGX_OK) {
ngx_epoll_module_ctx.actions.notify = NULL;
}
#endif

/* 文件异步IO初始化 */
#if (NGX_HAVE_FILE_AIO)
ngx_epoll_aio_init(cycle, epcf);
#endif

/* 测试是否支持EPOLLRDHUP */
#if (NGX_HAVE_EPOLLRDHUP)
ngx_epoll_test_rdhup(cycle);
#endif
}

/* 创建epoll事件数组,数组大小与配置的events一致,默认是512 */
if (nevents < epcf->events) {
if (event_list) {
ngx_free(event_list);
}

event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
cycle->log);
if (event_list == NULL) {
return NGX_ERROR;
}
}

nevents = epcf->events;

/* 设置ngx_io为ngx_os_io。
* 其它模块进行网络io的时候调用的是
* ngx_io中的接口,因此在worker进程开始工作前
* 就必须先设置好,实际上其它事件模块也是这么做的
* /
ngx_io = ngx_os_io;

/* 这一步很关键。nginx事件框架和外部接口调用的是ngx_event_actions
* 中定义的接口,虽然在ngx_event_core_module的
* ngx_event_core_init_conf函数中已经选定了要使用的事件模块,
* 但是并没有将ngx_event_actions与具体的事件模块进行映射,不能
* 正常工作,所以这里将ngx_event_actions映射到epoll模块
* 的接口上。通过这样的方式,调用ngx_event_actions的接口实际上就是
* 调用epoll模块的接口。
* /
ngx_event_actions = ngx_epoll_module_ctx.actions;

#if (NGX_HAVE_CLEAR_EVENT)
/* epoll边缘触发 */
ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
/* epoll水平触发 */
ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
/* 这两个标志在别处会用到,所以需要设置 */
|NGX_USE_GREEDY_EVENT
|NGX_USE_EPOLL_EVENT;

return NGX_OK;
}

ngx_epoll_add_event

ngx_epoll_add_event用于向事件框架中添加事件,核心是使用epoll_ctl实现
static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
int op;
uint32_t events, prev;
ngx_event_t *e;
ngx_connection_t *c;
struct epoll_event ee;

/* 除了个别场景,大部分情况下事件的data都指向所对应的连接 */
c = ev->data;

events = (uint32_t) event;

if (event == NGX_READ_EVENT) {
e = c->write;
prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
events = EPOLLIN|EPOLLRDHUP;
#endif

} else {
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
events = EPOLLOUT;
#endif
}

if (e->active) {
op = EPOLL_CTL_MOD;
events |= prev;

} else {
op = EPOLL_CTL_ADD;
}

#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP)
if (flags & NGX_EXCLUSIVE_EVENT) {
events &= ~EPOLLRDHUP;
}
#endif

ee.events = events | (uint32_t) flags;
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"epoll add event: fd:%d op:%d ev:%08XD",
c->fd, op, ee.events);

if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}

ev->active = 1;
#if 0
ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif

return NGX_OK;
}

ngx_epoll_del_event

ngx_epoll_del_event用于删除事件,同样也是使用epoll_ctl实现

ngx_epoll_process_events

该函数是处理事件的核心所在,在使用epoll作为IO多路复用模型的情况下,ngx_process_events_and_timers函数中调用的ngx_process_events实际上就是这个函数。static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
int events;
uint32_t revents;
ngx_int_t instance, i;
ngx_uint_t level;
ngx_err_t err;
ngx_event_t *rev, *wev;
ngx_queue_t *queue;
ngx_connection_t *c;

/* NGX_TIMER_INFINITE == INFTIM */

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll timer: %M", timer);

events = epoll_wait(ep, event_list, (int) nevents, timer);

err = (events == -1) ? ngx_errno : 0;

/* 如果设置了NGX_UPDATE_TIME或者设置了时间精度并且触发了事件精度事件,
* 就更新缓存时间 */
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
ngx_time_update();
}

/* epoll_wait调用失败,如果系统错误码是EINTR,说明是被优先级更高的系统调
* 用打断(例如接收到信号),这种情况下不视为错误,其它情况下视为
*epoll_wait失败 */
if (err) {
if (err == NGX_EINTR) {

if (ngx_event_timer_alarm) {
ngx_event_timer_alarm = 0;
return NGX_OK;
}

level = NGX_LOG_INFO;

} else {
level = NGX_LOG_ALERT;
}

ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
return NGX_ERROR;
}

/* events为0,说明没有事件发生。如果timer是-1,epoll_wait在没有
* 事件发生时不会返回,这说明在某个地方出现了问题 */
if (events == 0) {
if (timer != NGX_TIMER_INFINITE) {
return NGX_OK;
}

ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"epoll_wait() returned no events without timeout");
return NGX_ERROR;
}

for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;

instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

rev = c->read;

/* 如果fd是-1,或者instance与连接的instance标志不一致,
* 说明在这个时刻,该读事件已经是过期事件,不应该再处理。
if (c->fd == -1 || rev->instance != instance) {

/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}

revents = event_list[i].events;

ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: fd:%d ev:%04XD d:%p",
c->fd, revents, event_list[i].data.ptr);

if (revents & (EPOLLERR|EPOLLHUP)) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll_wait() error on fd:%d ev:%04XD",
c->fd, revents);

/*
* if the error events were returned, add EPOLLIN and EPOLLOUT
* to handle the events at least in one active handler
*/

revents |= EPOLLIN|EPOLLOUT;
}

#if 0
if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"strange epoll_wait() events fd:%d ev:%04XD",
c->fd, revents);
}
#endif

if ((revents & EPOLLIN) && rev->active) {

#if (NGX_HAVE_EPOLLRDHUP)
if (revents & EPOLLRDHUP) {
rev->pending_eof = 1;
}

rev->available = 1;
#endif

rev->ready = 1;

/* 如果设置了NGX_POST_EVENTS,说明该事件要延后处理。
* 如果是accept事件,就把事件放到accept_post队列中,否则
* 放到普通的post队列中
if (flags & NGX_POST_EVENTS) {
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;

ngx_post_event(rev, queue);

} else {
/* 不需要延后处理,立即调用handler */
rev->handler(rev);
}
}

wev = c->write;

if ((revents & EPOLLOUT) && wev->active) {

/* 如果fd是-1,或者instance与连接的instance标志不一致,
* 说明在这个时刻,该写事件已经是过期事件,不应该再处理。
if (c->fd == -1 || wev->instance != instance) {

/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}

wev->ready = 1;
#if (NGX_THREADS)
wev->complete = 1;
#endif
/* 如果设置了NGX_POST_EVENTS,说明该事件要延后处理,
* 将其放到post事件队列中 */
if (flags & NGX_POST_EVENTS) {
ngx_post_event(wev, &ngx_posted_events);

} else {
/* 不需要延后,立即调用handler处理 */
wev->handler(wev);
}
}
}

return NGX_OK;
}
ngx_epoll_add_connection
添加连接,相当于把连接相关的事件及其触发方式(读写,对端关闭,边缘触发)一次性注册,并且设置读写事件为激活状态,代码如下
static ngx_int_t
ngx_epoll_add_connection(ngx_connection_t *c)
{
struct epoll_event ee;

ee.events = EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP;
ee.data.ptr = (void *) ((uintptr_t) c | c->read->instance);

ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"epoll add connection: fd:%d ev:%08XD", c->fd, ee.events);

if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
"epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd);
return NGX_ERROR;
}

c->read->active = 1;
c->write->active = 1;

return NGX_OK;
}

ngx_epoll_del_connection

删除连接。把连接相关的事件从框架中删除,并且把连接的读写事件设置为非激活static ngx_int_t
ngx_epoll_del_connection(ngx_connection_t *c, ngx_uint_t flags)
{
int op;
struct epoll_event ee;

/*
* when the file descriptor is closed the epoll automatically deletes
* it from its queue so we do not need to delete explicitly the event
* before the closing the file descriptor
*/

if (flags & NGX_CLOSE_EVENT) {
c->read->active = 0;
c->write->active = 0;
return NGX_OK;
}

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"epoll del connection: fd:%d", c->fd);

op = EPOLL_CTL_DEL;
ee.events = 0;
ee.data.ptr = NULL;

if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}

c->read->active = 0;
c->write->active = 0;

return NGX_OK;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: