nginx 源码学习笔记(二十一)—— event 模块(二) ——事件驱动核心ngx_process_events_and_timers
2015-12-04 09:45
1051 查看
首先继续回忆下,之前子线程执行操作里面有一个未涉及的内容ngx_process_events_and_timers,今天我们就来研究下这个函数。
本篇文章来自于:http://blog.csdn.net/lengzijian/article/details/7601730
先来看一下第十九节的部分截图:
今天主要讲解的就是事件驱动函数,图中的红色部分:
[cpp] view
plaincopyprint?
src/event/ngx_event.c
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
if (ngx_timer_resolution) {
timer = NGX_TIMER_INFINITE;
flags = 0;
} else {
timer = ngx_event_find_timer();
flags = NGX_UPDATE_TIME;
}
/*
ngx_use_accept_mutex变量代表是否使用accept互斥体
默认是使用,可以通过accept_mutex off;指令关闭;
accept mutex 的作用就是避免惊群,同时实现负载均衡
*/
if (ngx_use_accept_mutex) {
/*
ngx_accept_disabled变量在ngx_event_accept函数中计算。
如果ngx_accept_disabled大于0,就表示该进程接受的链接过多,
因此放弃一次争抢accept mutex的机会,同时将自己减一。
然后,继续处理已有连接上的事件。
nginx就利用这一点实现了继承关于连接的基本负载均衡。
*/
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
/*
尝试锁accept mutex,只有成功获取锁的进程,才会将listen套接字放到epoll中。
因此,这就保证了只有一个进程拥有监听套接口,故所有进程阻塞在epoll_wait时,
才不会惊群现象。
*/
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
/*
如果进程获得了锁,将添加一个 NGX_POST_EVENTS 标志。
这个标志的作用是将所有产生的事件放入一个队列中,等释放后,在慢慢来处理事件。
因为,处理时间可能会很耗时,如果不先施放锁再处理的话,该进程就长时间霸占了锁,
导致其他进程无法获取锁,这样accept的效率就低了。
*/
flags |= NGX_POST_EVENTS;
} else {
/*
没有获得所得进程,当然不需要NGX_POST_EVENTS标志。
但需要设置延时多长时间,再去争抢锁。
*/
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}
delta = ngx_current_msec;
/*接下来,epoll要开始wait事件,
ngx_process_events的具体实现是对应到epoll模块中的ngx_epoll_process_events函数
这里之后会详细讲解的哦
*/
(void) ngx_process_events(cycle, timer, flags);
//统计本次wait事件的耗时
delta = ngx_current_msec - delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"timer delta: %M", delta);
/*
ngx_posted_accept_events是一个事件队列,暂存epoll从监听套接口wait到的accept事件。
前文提到的NGX_POST_EVENTS标志被使用后,会将所有的accept事件暂存到这个队列
*/
if (ngx_posted_accept_events) {
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
}
//所有accept事件处理完之后,如果持有锁的话,就释放掉。
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
/*
delta是之前统计的耗时,存在毫秒级的耗时,就对所有时间的timer进行检查,
如果timeout 就从time rbtree中删除到期的timer,同时调用相应事件的handler函数处理
*/
if (delta) {
ngx_event_expire_timers();
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"posted events %p", ngx_posted_events);
/*
处理普通事件(连接上获得的读写事件),
因为每个事件都有自己的handler方法,
*/
if (ngx_posted_events) {
if (ngx_threaded) {
ngx_wakeup_worker_thread(cycle);
} else {
ngx_event_process_posted(cycle, &ngx_posted_events);
}
}
}
之前有说过accept事件,其实他就是监听套接口上是否有新来的事件,下面介绍下accept时间的handler方法:
ngx_event_accept:
[cpp] view
plaincopyprint?
src/event/ngx_event_accept.c
void
ngx_event_accept(ngx_event_t *ev)
{
socklen_t socklen;
ngx_err_t err;
ngx_log_t *log;
ngx_socket_t s;
ngx_event_t *rev, *wev;
ngx_listening_t *ls;
ngx_connection_t *c, *lc;
ngx_event_conf_t *ecf;
u_char sa[NGX_SOCKADDRLEN];
//省略部分代码
lc = ev->data;
ls = lc->listening;
ev->ready = 0;
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"accept on %V, ready: %d", &ls->addr_text, ev->available);
do {
socklen = NGX_SOCKADDRLEN;
//accept一个新的连接
s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
//省略部分代码
/*
accept到一个新的连接后,就重新计算ngx_accept_disabled的值,
它主要是用来做负载均衡,之前有提过。
这里,我们可以看到他的就只方式
“总连接数的八分之一 - 剩余的连接数“
总连接指每个进程设定的最大连接数,这个数字可以再配置文件中指定。
所以每个进程到总连接数的7/8后,ngx_accept_disabled就大于零,连接超载了
*/
ngx_accept_disabled = ngx_cycle->connection_n / 8
- ngx_cycle->free_connection_n;
//获取一个connection
c = ngx_get_connection(s, ev->log);
//为新的链接创建起一个memory pool
//连接关闭的时候,才释放pool
c->pool = ngx_create_pool(ls->pool_size, ev->log);
if (c->pool == NULL) {
ngx_close_accepted_connection(c);
return;
}
c->sockaddr = ngx_palloc(c->pool, socklen);
if (c->sockaddr == NULL) {
ngx_close_accepted_connection(c);
return;
}
ngx_memcpy(c->sockaddr, sa, socklen);
log = ngx_palloc(c->pool, sizeof(ngx_log_t));
if (log == NULL) {
ngx_close_accepted_connection(c);
return;
}
/* set a blocking mode for aio and non-blocking mode for others */
if (ngx_inherited_nonblocking) {
if (ngx_event_flags & NGX_USE_AIO_EVENT) {
if (ngx_blocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
ngx_blocking_n " failed");
ngx_close_accepted_connection(c);
return;
}
}
} else {
//我们使用epoll模型,这里我们设置连接为nonblocking
if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) {
if (ngx_nonblocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
ngx_nonblocking_n " failed");
ngx_close_accepted_connection(c);
return;
}
}
}
*log = ls->log;
//初始化新的连接
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
c->log = log;
c->pool->log = log;
c->socklen = socklen;
c->listening = ls;
c->local_sockaddr = ls->sockaddr;
c->unexpected_eof = 1;
#if (NGX_HAVE_UNIX_DOMAIN)
if (c->sockaddr->sa_family == AF_UNIX) {
c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
#if (NGX_SOLARIS)
/* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
c->sendfile = 0;
#endif
}
#endif
rev = c->read;
wev = c->write;
wev->ready = 1;
if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) {
/* rtsig, aio, iocp */
rev->ready = 1;
}
if (ev->deferred_accept) {
rev->ready = 1;
#if (NGX_HAVE_KQUEUE)
rev->available = 1;
#endif
}
rev->log = log;
wev->log = log;
/*
* TODO: MT: - ngx_atomic_fetch_add()
* or protection by critical section or light mutex
*
* TODO: MP: - allocated in a shared memory
* - ngx_atomic_fetch_add()
* or protection by critical section or light mutex
*/
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
if (ngx_add_conn(c) == NGX_ERROR) {
ngx_close_accepted_connection(c);
return;
}
}
log->data = NULL;
log->handler = NULL;
/*
这里listen handler很重要,它将完成新连接的最后初始化工作,
同时将accept到的新的连接放入epoll中;挂在这个handler上的函数,
就是ngx_http_init_connection 在之后http模块中在详细介绍
*/
ls->handler(c);
if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
ev->available--;
}
} while (ev->available);
}
accpt事件的handler方法也就是如此了。之后就是每个连接的读写事件handler方法,这一部分会直接将我们引入http模块,我们还不急,还要学习下nginx经典模块epoll。
本篇文章来自于:http://blog.csdn.net/lengzijian/article/details/7601730
先来看一下第十九节的部分截图:
今天主要讲解的就是事件驱动函数,图中的红色部分:
[cpp] view
plaincopyprint?
src/event/ngx_event.c
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
if (ngx_timer_resolution) {
timer = NGX_TIMER_INFINITE;
flags = 0;
} else {
timer = ngx_event_find_timer();
flags = NGX_UPDATE_TIME;
}
/*
ngx_use_accept_mutex变量代表是否使用accept互斥体
默认是使用,可以通过accept_mutex off;指令关闭;
accept mutex 的作用就是避免惊群,同时实现负载均衡
*/
if (ngx_use_accept_mutex) {
/*
ngx_accept_disabled变量在ngx_event_accept函数中计算。
如果ngx_accept_disabled大于0,就表示该进程接受的链接过多,
因此放弃一次争抢accept mutex的机会,同时将自己减一。
然后,继续处理已有连接上的事件。
nginx就利用这一点实现了继承关于连接的基本负载均衡。
*/
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
/*
尝试锁accept mutex,只有成功获取锁的进程,才会将listen套接字放到epoll中。
因此,这就保证了只有一个进程拥有监听套接口,故所有进程阻塞在epoll_wait时,
才不会惊群现象。
*/
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
/*
如果进程获得了锁,将添加一个 NGX_POST_EVENTS 标志。
这个标志的作用是将所有产生的事件放入一个队列中,等释放后,在慢慢来处理事件。
因为,处理时间可能会很耗时,如果不先施放锁再处理的话,该进程就长时间霸占了锁,
导致其他进程无法获取锁,这样accept的效率就低了。
*/
flags |= NGX_POST_EVENTS;
} else {
/*
没有获得所得进程,当然不需要NGX_POST_EVENTS标志。
但需要设置延时多长时间,再去争抢锁。
*/
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}
delta = ngx_current_msec;
/*接下来,epoll要开始wait事件,
ngx_process_events的具体实现是对应到epoll模块中的ngx_epoll_process_events函数
这里之后会详细讲解的哦
*/
(void) ngx_process_events(cycle, timer, flags);
//统计本次wait事件的耗时
delta = ngx_current_msec - delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"timer delta: %M", delta);
/*
ngx_posted_accept_events是一个事件队列,暂存epoll从监听套接口wait到的accept事件。
前文提到的NGX_POST_EVENTS标志被使用后,会将所有的accept事件暂存到这个队列
*/
if (ngx_posted_accept_events) {
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
}
//所有accept事件处理完之后,如果持有锁的话,就释放掉。
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
/*
delta是之前统计的耗时,存在毫秒级的耗时,就对所有时间的timer进行检查,
如果timeout 就从time rbtree中删除到期的timer,同时调用相应事件的handler函数处理
*/
if (delta) {
ngx_event_expire_timers();
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"posted events %p", ngx_posted_events);
/*
处理普通事件(连接上获得的读写事件),
因为每个事件都有自己的handler方法,
*/
if (ngx_posted_events) {
if (ngx_threaded) {
ngx_wakeup_worker_thread(cycle);
} else {
ngx_event_process_posted(cycle, &ngx_posted_events);
}
}
}
之前有说过accept事件,其实他就是监听套接口上是否有新来的事件,下面介绍下accept时间的handler方法:
ngx_event_accept:
[cpp] view
plaincopyprint?
src/event/ngx_event_accept.c
void
ngx_event_accept(ngx_event_t *ev)
{
socklen_t socklen;
ngx_err_t err;
ngx_log_t *log;
ngx_socket_t s;
ngx_event_t *rev, *wev;
ngx_listening_t *ls;
ngx_connection_t *c, *lc;
ngx_event_conf_t *ecf;
u_char sa[NGX_SOCKADDRLEN];
//省略部分代码
lc = ev->data;
ls = lc->listening;
ev->ready = 0;
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"accept on %V, ready: %d", &ls->addr_text, ev->available);
do {
socklen = NGX_SOCKADDRLEN;
//accept一个新的连接
s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
//省略部分代码
/*
accept到一个新的连接后,就重新计算ngx_accept_disabled的值,
它主要是用来做负载均衡,之前有提过。
这里,我们可以看到他的就只方式
“总连接数的八分之一 - 剩余的连接数“
总连接指每个进程设定的最大连接数,这个数字可以再配置文件中指定。
所以每个进程到总连接数的7/8后,ngx_accept_disabled就大于零,连接超载了
*/
ngx_accept_disabled = ngx_cycle->connection_n / 8
- ngx_cycle->free_connection_n;
//获取一个connection
c = ngx_get_connection(s, ev->log);
//为新的链接创建起一个memory pool
//连接关闭的时候,才释放pool
c->pool = ngx_create_pool(ls->pool_size, ev->log);
if (c->pool == NULL) {
ngx_close_accepted_connection(c);
return;
}
c->sockaddr = ngx_palloc(c->pool, socklen);
if (c->sockaddr == NULL) {
ngx_close_accepted_connection(c);
return;
}
ngx_memcpy(c->sockaddr, sa, socklen);
log = ngx_palloc(c->pool, sizeof(ngx_log_t));
if (log == NULL) {
ngx_close_accepted_connection(c);
return;
}
/* set a blocking mode for aio and non-blocking mode for others */
if (ngx_inherited_nonblocking) {
if (ngx_event_flags & NGX_USE_AIO_EVENT) {
if (ngx_blocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
ngx_blocking_n " failed");
ngx_close_accepted_connection(c);
return;
}
}
} else {
//我们使用epoll模型,这里我们设置连接为nonblocking
if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) {
if (ngx_nonblocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
ngx_nonblocking_n " failed");
ngx_close_accepted_connection(c);
return;
}
}
}
*log = ls->log;
//初始化新的连接
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
c->log = log;
c->pool->log = log;
c->socklen = socklen;
c->listening = ls;
c->local_sockaddr = ls->sockaddr;
c->unexpected_eof = 1;
#if (NGX_HAVE_UNIX_DOMAIN)
if (c->sockaddr->sa_family == AF_UNIX) {
c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
#if (NGX_SOLARIS)
/* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
c->sendfile = 0;
#endif
}
#endif
rev = c->read;
wev = c->write;
wev->ready = 1;
if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) {
/* rtsig, aio, iocp */
rev->ready = 1;
}
if (ev->deferred_accept) {
rev->ready = 1;
#if (NGX_HAVE_KQUEUE)
rev->available = 1;
#endif
}
rev->log = log;
wev->log = log;
/*
* TODO: MT: - ngx_atomic_fetch_add()
* or protection by critical section or light mutex
*
* TODO: MP: - allocated in a shared memory
* - ngx_atomic_fetch_add()
* or protection by critical section or light mutex
*/
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
if (ngx_add_conn(c) == NGX_ERROR) {
ngx_close_accepted_connection(c);
return;
}
}
log->data = NULL;
log->handler = NULL;
/*
这里listen handler很重要,它将完成新连接的最后初始化工作,
同时将accept到的新的连接放入epoll中;挂在这个handler上的函数,
就是ngx_http_init_connection 在之后http模块中在详细介绍
*/
ls->handler(c);
if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
ev->available--;
}
} while (ev->available);
}
accpt事件的handler方法也就是如此了。之后就是每个连接的读写事件handler方法,这一部分会直接将我们引入http模块,我们还不急,还要学习下nginx经典模块epoll。
相关文章推荐
- nginx代理指定目录
- 访问Nginx发生SSL connection error的一种情况
- Nginx+Naxsi部署专业级Web应用防火墙
- CentOS 6.2实战部署Nginx+MySQL+PHP
- nginx中http核心模块的配置指令2
- nginx中http核心模块的配置指令3
- nginx中http核心模块的配置指令4
- nginx中http的fastcgi模块的配置指令1
- Nginx 学习笔记(一)
- 网站502与504错误分析
- 艰难完成 nginx + puma 部署 rails 4的详细记录
- 把Lua编译进nginx步骤方法
- web 应用中常用的各种 cache详解
- Linux系统上配置Nginx+Ruby on Rails+MySQL超攻略
- window+nginx+php环境配置 附配置搭配说明
- 解析CI即CodeIgniter框架在Nginx下的重写规则
- 将PHP从5.3.28升级到5.3.29时Nginx出现502错误
- 基于Nginx0.8.54+PHP5.3.4+MySQL5.5.8的全新LNMP稳定版架构搭建的VPS
- Nginx(PHP/fastcgi)的PATH_INFO问题