您的位置:首页 > 运维架构 > Nginx

nginx 源码学习笔记(二十一)—— event 模块(二) ——事件驱动核心ngx_process_events_and_timers

2015-12-04 09:45 1051 查看
首先继续回忆下,之前子线程执行操作里面有一个未涉及的内容ngx_process_events_and_timers,今天我们就来研究下这个函数。

本篇文章来自于:http://blog.csdn.net/lengzijian/article/details/7601730

先来看一下第十九节的部分截图:



今天主要讲解的就是事件驱动函数,图中的红色部分:

[cpp] view
plaincopyprint?

src/event/ngx_event.c  

  

void  

ngx_process_events_and_timers(ngx_cycle_t *cycle)  

{  

    ngx_uint_t  flags;  

    ngx_msec_t  timer, delta;  

  

    if (ngx_timer_resolution) {  

        timer = NGX_TIMER_INFINITE;  

        flags = 0;  

  

    } else {  

        timer = ngx_event_find_timer();  

        flags = NGX_UPDATE_TIME;  

    }  

      

    /* 

    ngx_use_accept_mutex变量代表是否使用accept互斥体 

    默认是使用,可以通过accept_mutex off;指令关闭; 

    accept mutex 的作用就是避免惊群,同时实现负载均衡 

    */  

    if (ngx_use_accept_mutex) {  

          

        /* 

        ngx_accept_disabled变量在ngx_event_accept函数中计算。 

        如果ngx_accept_disabled大于0,就表示该进程接受的链接过多, 

        因此放弃一次争抢accept mutex的机会,同时将自己减一。 

        然后,继续处理已有连接上的事件。 

        nginx就利用这一点实现了继承关于连接的基本负载均衡。 

        */  

        if (ngx_accept_disabled > 0) {  

            ngx_accept_disabled--;  

  

        } else {  

            /* 

            尝试锁accept mutex,只有成功获取锁的进程,才会将listen套接字放到epoll中。 

            因此,这就保证了只有一个进程拥有监听套接口,故所有进程阻塞在epoll_wait时, 

            才不会惊群现象。 

            */  

            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {  

                return;  

            }  

  

            if (ngx_accept_mutex_held) {  

                /* 

                如果进程获得了锁,将添加一个 NGX_POST_EVENTS 标志。 

                这个标志的作用是将所有产生的事件放入一个队列中,等释放后,在慢慢来处理事件。 

                因为,处理时间可能会很耗时,如果不先施放锁再处理的话,该进程就长时间霸占了锁, 

                导致其他进程无法获取锁,这样accept的效率就低了。 

                */  

                flags |= NGX_POST_EVENTS;  

  

            } else {  

                /* 

                没有获得所得进程,当然不需要NGX_POST_EVENTS标志。 

                但需要设置延时多长时间,再去争抢锁。 

                */  

                if (timer == NGX_TIMER_INFINITE  

                    || timer > ngx_accept_mutex_delay)  

                {  

                    timer = ngx_accept_mutex_delay;  

                }  

            }  

        }  

    }  

  

    delta = ngx_current_msec;  

      

    /*接下来,epoll要开始wait事件, 

    ngx_process_events的具体实现是对应到epoll模块中的ngx_epoll_process_events函数 

    这里之后会详细讲解的哦 

    */  

    (void) ngx_process_events(cycle, timer, flags);  

    //统计本次wait事件的耗时  

    delta = ngx_current_msec - delta;  

  

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,  

                   "timer delta: %M", delta);  

  

    /* 

    ngx_posted_accept_events是一个事件队列,暂存epoll从监听套接口wait到的accept事件。 

    前文提到的NGX_POST_EVENTS标志被使用后,会将所有的accept事件暂存到这个队列 

    */  

    if (ngx_posted_accept_events) {  

        ngx_event_process_posted(cycle, &ngx_posted_accept_events);  

    }  

    //所有accept事件处理完之后,如果持有锁的话,就释放掉。  

    if (ngx_accept_mutex_held) {  

        ngx_shmtx_unlock(&ngx_accept_mutex);  

    }  

      

    /* 

    delta是之前统计的耗时,存在毫秒级的耗时,就对所有时间的timer进行检查, 

    如果timeout 就从time rbtree中删除到期的timer,同时调用相应事件的handler函数处理 

    */  

    if (delta) {  

        ngx_event_expire_timers();  

    }  

  

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,  

                   "posted events %p", ngx_posted_events);  

  

    /* 

    处理普通事件(连接上获得的读写事件), 

    因为每个事件都有自己的handler方法, 

    */  

    if (ngx_posted_events) {  

        if (ngx_threaded) {  

            ngx_wakeup_worker_thread(cycle);  

  

        } else {  

            ngx_event_process_posted(cycle, &ngx_posted_events);  

        }  

    }  

}  

之前有说过accept事件,其实他就是监听套接口上是否有新来的事件,下面介绍下accept时间的handler方法:

ngx_event_accept:

[cpp] view
plaincopyprint?

src/event/ngx_event_accept.c  

  

void  

ngx_event_accept(ngx_event_t *ev)  

{  

    socklen_t          socklen;  

    ngx_err_t          err;  

    ngx_log_t         *log;  

    ngx_socket_t       s;  

    ngx_event_t       *rev, *wev;  

    ngx_listening_t   *ls;  

    ngx_connection_t  *c, *lc;  

    ngx_event_conf_t  *ecf;  

    u_char             sa[NGX_SOCKADDRLEN];  

      

    //省略部分代码  

  

    lc = ev->data;  

    ls = lc->listening;  

    ev->ready = 0;  

  

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,  

                   "accept on %V, ready: %d", &ls->addr_text, ev->available);  

  

    do {  

        socklen = NGX_SOCKADDRLEN;  

        //accept一个新的连接  

        s = accept(lc->fd, (struct sockaddr *) sa, &socklen);  

        //省略部分代码  

          

        /* 

        accept到一个新的连接后,就重新计算ngx_accept_disabled的值, 

        它主要是用来做负载均衡,之前有提过。 

         

        这里,我们可以看到他的就只方式 

        “总连接数的八分之一   -   剩余的连接数“ 

        总连接指每个进程设定的最大连接数,这个数字可以再配置文件中指定。 

         

        所以每个进程到总连接数的7/8后,ngx_accept_disabled就大于零,连接超载了 

         

        */  

  

        ngx_accept_disabled = ngx_cycle->connection_n / 8  

                              - ngx_cycle->free_connection_n;  

          

        //获取一个connection  

        c = ngx_get_connection(s, ev->log);  

  

        //为新的链接创建起一个memory pool  

        //连接关闭的时候,才释放pool  

  

        c->pool = ngx_create_pool(ls->pool_size, ev->log);  

        if (c->pool == NULL) {  

            ngx_close_accepted_connection(c);  

            return;  

        }  

  

        c->sockaddr = ngx_palloc(c->pool, socklen);  

        if (c->sockaddr == NULL) {  

            ngx_close_accepted_connection(c);  

            return;  

        }  

  

        ngx_memcpy(c->sockaddr, sa, socklen);  

  

        log = ngx_palloc(c->pool, sizeof(ngx_log_t));  

        if (log == NULL) {  

            ngx_close_accepted_connection(c);  

            return;  

        }  

  

        /* set a blocking mode for aio and non-blocking mode for others */  

  

        if (ngx_inherited_nonblocking) {  

            if (ngx_event_flags & NGX_USE_AIO_EVENT) {  

                if (ngx_blocking(s) == -1) {  

                    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,  

                                  ngx_blocking_n " failed");  

                    ngx_close_accepted_connection(c);  

                    return;  

                }  

            }  

  

        } else {  

            //我们使用epoll模型,这里我们设置连接为nonblocking  

            if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) {  

                if (ngx_nonblocking(s) == -1) {  

                    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,  

                                  ngx_nonblocking_n " failed");  

                    ngx_close_accepted_connection(c);  

                    return;  

                }  

            }  

        }  

  

        *log = ls->log;  

        //初始化新的连接  

        c->recv = ngx_recv;  

        c->send = ngx_send;  

        c->recv_chain = ngx_recv_chain;  

        c->send_chain = ngx_send_chain;  

  

        c->log = log;  

        c->pool->log = log;  

  

        c->socklen = socklen;  

        c->listening = ls;  

        c->local_sockaddr = ls->sockaddr;  

  

        c->unexpected_eof = 1;  

  

#if (NGX_HAVE_UNIX_DOMAIN)  

        if (c->sockaddr->sa_family == AF_UNIX) {  

            c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;  

            c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;  

#if (NGX_SOLARIS)  

            /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */  

            c->sendfile = 0;  

#endif  

        }  

#endif  

  

        rev = c->read;  

        wev = c->write;  

  

        wev->ready = 1;  

  

        if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) {  

            /* rtsig, aio, iocp */  

            rev->ready = 1;  

        }  

  

        if (ev->deferred_accept) {  

            rev->ready = 1;  

#if (NGX_HAVE_KQUEUE)  

            rev->available = 1;  

#endif  

        }  

  

        rev->log = log;  

        wev->log = log;  

  

        /* 

         * TODO: MT: - ngx_atomic_fetch_add() 

         *             or protection by critical section or light mutex 

         * 

         * TODO: MP: - allocated in a shared memory 

         *           - ngx_atomic_fetch_add() 

         *             or protection by critical section or light mutex 

         */  

  

        c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);  

          

        if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {  

            if (ngx_add_conn(c) == NGX_ERROR) {  

                ngx_close_accepted_connection(c);  

                return;  

            }  

        }  

  

        log->data = NULL;  

        log->handler = NULL;  

          

        /* 

        这里listen handler很重要,它将完成新连接的最后初始化工作, 

        同时将accept到的新的连接放入epoll中;挂在这个handler上的函数, 

        就是ngx_http_init_connection 在之后http模块中在详细介绍 

        */  

        ls->handler(c);  

  

        if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {  

            ev->available--;  

        }  

  

    } while (ev->available);  

}  

accpt事件的handler方法也就是如此了。之后就是每个连接的读写事件handler方法,这一部分会直接将我们引入http模块,我们还不急,还要学习下nginx经典模块epoll。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  nginx