您的位置:首页 > 运维架构 > Nginx

Nginx进程分析(worker_process篇)

2015-09-08 20:08 537 查看
Linux下进程的创建使用fork系统调用,fork在哪里呢?答案是在ngx_spawn_process函数中。我们先不忙着看ngx_spawn_process函数,先看调用ngx_spawn_process的ngx_start_worker_processes函数。该函数在主线程循环中需要创建工作进程的地方被调用。

void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{
    ngx_int_t      i, s;
    ngx_channel_t  ch;

    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");

    ch.command = NGX_CMD_OPEN_CHANNEL;

    for (i = 0; i < n; i++) {
        cpu_affinity = ngx_get_cpu_affinity(i);

        ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL,
                          "worker process", type);

        ch.pid = ngx_processes[ngx_process_slot].pid;
        ch.slot = ngx_process_slot;
        ch.fd = ngx_processes[ngx_process_slot].channel[0];

        for (s = 0; s < ngx_last_process; s++) {
            if (s == ngx_process_slot
                || ngx_processes[s].pid == -1
                || ngx_processes[s].channel[0] == -1)
            {
                continue;
            }
            ngx_log_debug6(NGX_LOG_DEBUG_CORE, cycle->log, 0,
                          "pass channel s:%d pid:%P fd:%d to s:%i pid:%P fd:%d",
                          ch.slot, ch.pid, ch.fd,
                          s, ngx_processes[s].pid,
                          ngx_processes[s].channel[0]);
            ngx_write_channel(ngx_processes[s].channel[0],
                              &ch, sizeof(ngx_channel_t), cycle->log);
        }
    }
}

ngx_start_worker_process函数还是很简单的,基本就是调用ngx_spawn_process函数创建工作进程,然后通知所有进程。下面我们就具体看看主进程是如何创建工作进程的。

ngx_spawn_process函数首先需要为即将创建的工作进程分配一个进程信息结构体。如果是重启某个工作进程,直接使用respawn参数作为工作进程索引,否则需要从ngx_process数组中找到一个空闲的元素。如果ngx_process数组全部用光了,就返回错误。函数在fork之前会将新进程在ngx_process中的索引保存在ngx_process_slot中。

ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,
char *name, ngx_int_t respawn)
{
u_long     on;
ngx_pid_t  pid;
ngx_int_t  s;

if (respawn >= 0) {
s = respawn;
} else {
for (s = 0; s < ngx_last_process; s++) {
if (ngx_processes[s].pid == -1) {
break;
}
}

if (s == NGX_MAX_PROCESSES) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"no more than %d processes can be spawned",
NGX_MAX_PROCESSES);
return NGX_INVALID_PID;
}
}

...

ngx_process_slot = s;
如果希望新进程与主进程是分离的(NGX_PROCESS_DETACHED),就只需要将进程信息中的管道描述符设置为-1即可。否则,就需要创建一对Unix域套接字描述符,并将其设置为异步非阻塞。

if (respawn != NGX_PROCESS_DETACHED) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"socketpair() failed while spawning \"%s\"", name);
return NGX_INVALID_PID;
}

ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
"channel %d:%d",
ngx_processes[s].channel[0],
ngx_processes[s].channel[1]);
// 设置channel为非阻塞
if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
ngx_nonblocking_n " failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}

if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
ngx_nonblocking_n " failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// 设置channel[0]为异步
on = 1;
if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"ioctl(FIOASYNC) failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// <span style="font-family: Arial, Helvetica, sans-serif;">当channel[0]数据到来时,向主进程发送SIGIO信号</span>
if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(F_SETOWN) failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// 希望进程调用exec执行外部程序的时候自动关闭channel
if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}

if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}

ngx_channel = ngx_processes[s].channel[1];

} else {
ngx_processes[s].channel[0] = -1;
ngx_processes[s].channel[1] = -1;
}
准备好channel后就可以调用fork创建进程了,这段代码Linux程序员肯定很熟悉了:

pid = fork();

switch (pid) {

case -1:
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fork() failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;

case 0:
ngx_pid = ngx_getpid();
proc(cycle, data);
break;

default:
break;
}

ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);

ngx_processes[s].pid = pid;
ngx_processes[s].exited = 0;

if (respawn >= 0) {
return pid;
}
工作进程在这里调用proc进入进程循环,而主进程则将新进程的pid保存到进程信息中,对于重新启动的进程,到这里就结束了。接下来,也是最后一部分是填充进程结构体中的其他一部分信息,对于重启的进程这些信息是保留着的。这部分是由主进程负责完成的,并不是本文讨论的重点。

ngx_processes[s].proc = proc;
ngx_processes[s].data = data;
ngx_processes[s].name = name;
ngx_processes[s].exiting = 0;

switch (respawn) {
case NGX_PROCESS_RESPAWN:
ngx_processes[s].respawn = 1;
ngx_processes[s].just_respawn = 0;
ngx_processes[s].detached = 0;
break;
case NGX_PROCESS_JUST_RESPAWN:
ngx_processes[s].respawn = 1;
ngx_processes[s].just_respawn = 1;
ngx_processes[s].detached = 0;
break;
case NGX_PROCESS_DETACHED:
ngx_processes[s].respawn = 0;
ngx_processes[s].just_respawn = 0;
ngx_processes[s].detached = 1;
break;
}
// 到这里才算真正在ngx_processes数组中增加了新进程对应的元素
if (s == ngx_last_process) {
ngx_last_process++;
}

return pid;
}


讲述了工作进程创建的过程之后,就可以看看工作进程的循环函数了。Nginx可以支持多线程,但本文不关心多线程的情况,下面代码中的省略号部分为支持多线程的代码,可以忽略而不影响我们理解。

static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
ngx_uint_t         i;
ngx_connection_t  *c;
...

ngx_worker_process_init(cycle, 1);

ngx_setproctitle("worker process");

...

for ( ;; ) {

if (ngx_exiting) {

c = cycle->connections;

for (i = 0; i < cycle->connection_n; i++) {

/* THREAD: lock */

if (c[i].fd != -1 && c[i].idle) {
c[i].close = 1;
c[i].read->handler(c[i].read);
}
}

if (ngx_event_timer_rbtree.root == ngx_event_timer_rbtree.sentinel)
{
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");

ngx_worker_process_exit(cycle);
}
}

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

ngx_process_events_and_timers(cycle);

if (ngx_terminate) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");

ngx_worker_process_exit(cycle);
}

if (ngx_quit) {
ngx_quit = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
"gracefully shutting down");
ngx_setproctitle("worker process is shutting down");

if (!ngx_exiting) {
ngx_close_listening_sockets(cycle);
ngx_exiting = 1;
}
}

if (ngx_reopen) {
ngx_reopen = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
ngx_reopen_files(cycle, -1);
}
}
}

ngx_worker_process_cycle的主循环中最核心的是事件处理函数ngx_process_events_and_timers。

void ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t  flags;
ngx_msec_t  timer, delta;
// timer是事件等待的超时值,如果设置了定时器分辨率,则定时器会周期触发事件,
// 进程将不断地被唤醒处理事件,因此不必设置超时值,也不需要设置NGX_TIMER_INFINITE。
if (ngx_timer_resolution) {
timer = NGX_TIMER_INFINITE;
flags = 0;
} else {
timer = ngx_event_find_timer();
flags = NGX_UPDATE_TIME;
}
// 以下语句块处理“惊群”
if (ngx_use_accept_mutex) {
if (ngx_accept_disabled > 0) {
// 工作进程处于过载,仅仅将ngx_accept_disabled减一,不请求锁,从而避免过载时引发惊群效应
ngx_accept_disabled--;
} else {
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}

if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS;
} else {
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}

delta = ngx_current_msec;

(void) ngx_process_events(cycle, timer, flags);

delta = ngx_current_msec - delta;

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"timer delta: %M", delta);

if (ngx_posted_accept_events) {
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
}

if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}

if (delta) {
ngx_event_expire_timers();
}

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"posted events %p", ngx_posted_events);

if (ngx_posted_events) {
if (ngx_threaded) {
ngx_wakeup_worker_thread(cycle);
} else {
ngx_event_process_posted(cycle, &ngx_posted_events);
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: