您的位置:首页 > 运维架构 > Nginx

21.Nginx工作进程之处理函数

2017-11-08 16:44 232 查看
前面我们学习了Nginx事件循环的内容,其实就是为此篇学习worker进程的处理函数做铺垫的。

/* os/unix/ngx_channel.c */

/* 将通道的读/写事件添加到事件循环, 并注册相应的事件发生处理函数
* param cycle: ngx_cycle_t指针
* fd: 通道任意一侧的文件描述符
* event: 读/写事件类型
* handler: 事件处理函数
* return : NGX_ERROR/NGX_OK
*/
ngx_int_t ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd,
ngx_int_t event, ngx_event_handler_pt handler)
{
ngx_event_t *ev, *rev, *wev;
ngx_connection_t *c;

// 通过fd我们在连接池中找到其对应的连接
c = &cycle->connections[fd];
// 自然, 我们也可以在read_events和write_events中找到其对应的事件
rev = &cycle->read_events[fd];
wev = &cycle->write_events[fd];

// 对连接、读事件、写事件进行初始化置0
ngx_memzero(c, sizeof(ngx_connection_t));
ngx_memzero(rev, sizeof(ngx_event_t));
ngx_memzero(wev, sizeof(ngx_event_t));

// 记录连接c的fd和使用的内存池
c->fd = fd;
c->pool = cycle->pool;

// 记录连接c对应的读写事件
c->read = rev;
c->write = wev;
// 记录连接c、读事件rev、写事件wev的日志句柄
c->log = cycle->log;
rev->log = cycle->log;
wev->log = cycle->log;

rev->index = NGX_INVALID_INDEX;
wev->index = NGX_INVALID_INDEX;
// 记录读写事件对应的连接c
rev->data = c;
wev->data = c;

#if (NGX_THREADS)
// 线程化的情况下

// 读写事件的lock和own_lock锁都使用连接c的lock锁
rev->lock = &c->lock;
wev->lock = &c->lock;
rev->own_lock = &c->lock;
wev->own_lock = &c->lock;
#endif

// 根据指定的事件类型来找到对应的事件
ev = (event == NGX_READ_EVENT) ? rev : wev;

// 为事件注册处理函数
ev->event_handler = handler;

if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
// 如果ngx_add_conn宏指定的函数指针不为空且事件处理模型的标志不包含NGX_USE_EPOLL_EVENT即不使用epoll

// 调用ngx_add_conn来添加连接c
if (ngx_add_conn(c) == NGX_ERROR) {
return NGX_ERROR;
}

} else {
// 使用epoll的情况下,

// 调用ngx_add_event宏指向的函数来添加事件,
// 其实这里就是调用ngx_epoll_add_event函数,
// 而ngx_epoll_add_event则是使用epoll_ctl系统调用来向epoll句柄注册、删除和修改事件
if (ngx_add_event(ev, event, 0) == NGX_ERROR) {
return NGX_ERROR;
}
}

return NGX_OK;
}

/* os/unix/ngx_process_cycle.c */

/* 通道事件处理函数
param ev: 通道读/写事件
*/
static void ngx_channel_handler(ngx_event_t *ev)
{
ngx_int_t n;
ngx_channel_t ch;
ngx_connection_t *c;

c = ev->data;

ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "channel handler");

// 从通道读取来自master进程的指令
n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);

ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "channel: %d", n);

if (n <= 0) {
return;
}

ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
"channel command: %d", ch.command);

switch (ch.command) {
// 判断指令的类型, 然后修改相应的进程状态全局变量或者执行相应的操作

case NGX_CMD_QUIT:
// 置ngx_quit为1, 即worker进程优雅退出
ngx_quit = 1;
break;

case NGX_CMD_TERMINATE:
// 置ngx_terminate为1, 即worker进程终止
ngx_terminate = 1;
break;

case NGX_CMD_REOPEN:
// 置ngx_reopen为1, 即worker进程重新打开日志文件
ngx_reopen = 1;
break;

case NGX_CMD_OPEN_CHANNEL:
// 打开通道指令

// 通常由别的进程创建好通道, 然后通过"描述符传递"来传递通道一侧,
// 因此这里只需要记录通道一侧的fd即可
ngx_log_debug3(NGX_LOG_DEBUG_CORE, ev->log, 0,
"get channel s:%d pid:" PID_T_FMT " fd:%d",
ch.slot, ch.pid, ch.fd);

ngx_processes[ch.slot].pid = ch.pid;
ngx_processes[ch.slot].channel[0] = ch.fd;
break;

case NGX_CMD_CLOSE_CHANNEL:
// 关闭通道指令
ngx_log_debug4(NGX_LOG_DEBUG_CORE, ev->log, 0,
"close channel s:%d pid:" PID_T_FMT " our:" PID_T_FMT
" fd:%d",
ch.slot, ch.pid, ngx_processes[ch.slot].pid,
ngx_processes[ch.slot].channel[0]);
// 关闭ch.slot指定进程的channel[0]一侧
if (close(ngx_processes[ch.slot].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "close() failed");
}

ngx_processes[ch.slot].channel[0] = -1;
break;
}
}

/* worker进程处理函数
*/
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
sigset_t set;
ngx_err_t err;
ngx_int_t n;
ngx_uint_t i;
struct timeval tv;
ngx_listening_t *ls;
ngx_core_conf_t *ccf;
ngx_connection_t *c;

// ngx_gettimeofday宏就是调用gettimeofday库函数来获得当前精确时间
ngx_gettimeofday(&tv);

// 记录Nginx的启动时间ngx_start_msec,
// 因为刚启动所以运行时间ngx_old_elapsed_msec和ngx_elapsed_msec都为0
ngx_start_msec = (ngx_epoch_msec_t) tv.tv_sec * 1000 + tv.tv_usec / 1000;
ngx_old_elapsed_msec = 0;
ngx_elapsed_msec = 0;

// 置全局变量ngx_process为NGX_PROCESS_WORKER
ngx_process = NGX_PROCESS_WORKER;

// 获取模块ngx_core_module的配置信息
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

if (ccf->group != (gid_t) NGX_CONF_UNSET) {
// 如果配置了group

// 调用setgid设置work进程的有效组ID
if (setgid(ccf->group) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"setgid(%d) failed", ccf->group);
exit(2);
}
}

if (ccf->user != (uid_t) NGX_CONF_UNSET) {
// 如果配置了user

// 调用setuid设置worker进程的有效用户ID
if (setuid(ccf->user) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"setuid(%d) failed", ccf->user);
exit(2);
}
}

#if (HAVE_PR_SET_DUMPABLE)
// 如果定义了HAVE_PR_SET_DUMPABLE宏, 说明prctl支持PR_SET_DUMPABLE参数

// 调用prctl使能coredump
// 默认情况下是开启的, 但是执行setuid或setgid后一般会清除
if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"prctl(PR_SET_DUMPABLE) failed");
}

#endif

// 初始化set信号集并清空
sigemptyset(&set);

// 调用sigprocmask设置当前的信号掩码, 这里也就是不屏蔽任何信号
if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"sigprocmask() failed");
}

// 初始化ngx_temp_number全局变量
ngx_init_temp_number();

ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
ls[i].remain = 0;
}

// 遍历ngx_modules模块数组
for (i = 0; ngx_modules[i]; i++) {
if (ngx_modules[i]->init_process) {
// 如果模块的init_process函数指针不为空
// 该指针注册的函数用于对进程进行模块特定的初始化工作, 后面我们会重点学习ngx_event_core_module模块的该函数

// 调用模块的init_process指针所指函数
if (ngx_modules[i]->init_process(cycle) == NGX_ERROR) {
exit(2);
}
}
}

// 遍历ngx_processes进程数组
for (n = 0; n < ngx_last_process; n++) {

if (ngx_processes
.pid == -1) {
// pid成员为-1, 表示该元素没有被进程占用, 跳过
continue;
}

if (n == ngx_process_slot) {
// 如果是当前进程占用的位置, 跳过
continue;
}

if (ngx_processes
.channel[1] == -1) {
// 如果与该进程通信的通道的索引为1这一侧的fd为-1, 代表已被关闭或无效, 跳过
continue;
}

if (close(ngx_processes
.channel[1]) == -1) {
// 关闭与该进程通信的通道的索引为1这一侧
// 为什么要关闭这一侧呢? 我们知道Nginx会为每一个work进程创建一个channel(socketpair),
// channel[0]一侧会被其他进程使用, 用来发送消息给该进程或者接收来自该进程的消息;
// 而channel[1]则被该进程所使用, 因此channel[1]于当前worker进程而言, 是无用的, 所以需要关闭
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"close() failed");
}
}

// 对于该进程自己的channel, channel[0]自然也是无用的, 进行关闭
if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"close() failed");
}

#if 0
ngx_last_process = 0;
#endif

// ngx_channel就是ngx_processes[ngx_process_slot].channel[1],
// 因为worker进程需要接收来自master进程的指令消息, 因此这里将ngx_channel的读事件添加到事件循环,
// 事件处理函数为ngx_channel_handler
if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
ngx_channel_handler) == NGX_ERROR)
{
exit(2);
}

// 设置worker进程名称
ngx_setproctitle("worker process");

#if (NGX_THREADS)
/* 如果要进行线程化,
* 势必要创建线程,
* 并且为一些共享资源创建互斥锁、条件变量
*/

if (ngx_time_mutex_init(cycle->log) == NGX_ERROR) {
exit(2);
}

if (ngx_threads_n) {
if (ngx_init_threads(ngx_threads_n,
ccf->thread_stack_size, cycle) == NGX_ERROR)
{
exit(2);
}

err = ngx_thread_key_create(&ngx_core_tls_key);
if (err != 0) {
ngx_log_er
b248
ror(NGX_LOG_ALERT, cycle->log, err,
ngx_thread_key_create_n " failed");
exit(2);
}

for (n = 0; n < ngx_threads_n; n++) {

if (!(ngx_threads
.cv = ngx_cond_init(cycle->log))) {
exit(2);
}

if (ngx_create_thread((ngx_tid_t *) &ngx_threads
.tid,
ngx_worker_thread_cycle,
(void *) &ngx_threads
, cycle->log) != 0)
{
exit(2);
}
}
}

#endif

// worker进程主循环
for ( ;; ) {
if (ngx_exiting
&& ngx_event_timer_rbtree == &ngx_event_timer_sentinel)
{
// 如果ngx_exiting为1即worker进程需要退出
// 且ngx_event_timer_rbtree定时器红黑树为空

ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exiting");

#if (NGX_THREADS)
// 置ngx_terminate为1, 然后唤醒worker线程, 也就是让worker线程退出
ngx_terminate = 1;
ngx_wakeup_worker_threads(cycle);
#endif
// 退出
exit(0);
}

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

// 事件循环: 前面讲过就是定时器处理及事件处理
ngx_process_events(cycle);

if (ngx_terminate) {
// 如果ngx_terminate为1, 那么唤醒worker线程, 然后退出
ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exiting");

#if (NGX_THREADS)
ngx_wakeup_worker_threads(cycle);
#endif
exit(0);
}

if (ngx_quit) {
// 如果ngx_quit为1, 也就是优雅退出

ngx_quit = 0;
ngx_log_error(NGX_LOG_INFO, cycle->log, 0,
"gracefully shutting down");
// 修改进程名称以示意自己正在退出
ngx_setproctitle("worker process is shutting down");

if (!ngx_exiting) {
// 如果ngx_exiting不为1

// 关闭监听套接字然后置ngx_exiting为1
ngx_close_listening_sockets(cycle);
ngx_exiting = 1;
}
}

if (ngx_reopen) {
// 如果ngx_reopen为1, 意味着需要重新打开日志文件

ngx_reopen = 0;
ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "reopen logs");
// 重新打开日志文件
ngx_reopen_files(cycle, -1);
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: