
lighttpd Source Code Analysis 1: The Network Model

2013-07-16 15:03
As soon as I got hold of the lighttpd source, I couldn't wait to strip away the surrounding flesh and peek at the simple network-model skeleton underneath. Every ordinary TCP server we write follows the same steps: create a socket -> bind it to an address -> listen for client connections -> call accept to obtain a connected socket -> read from and write to that connected socket. lighttpd is no exception.
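Before looking at lighttpd itself, here is a minimal, self-contained sketch of those five steps in plain C (a bare-bones single-process server for illustration only; the port number is arbitrary):

#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    /* 1. create the socket */
    int lfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (lfd < 0) return 1;

    /* 2. bind it to an address (all interfaces, port 8080 chosen arbitrarily) */
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(8080);
    if (bind(lfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) return 1;

    /* 3. listen for client connections */
    if (listen(lfd, 128) < 0) return 1;

    for (;;) {
        /* 4. accept a connected socket */
        int cfd = accept(lfd, NULL, NULL);
        if (cfd < 0) continue;

        /* 5. read from / write to the connected socket (echo one buffer back) */
        char buf[1024];
        ssize_t n = read(cfd, buf, sizeof(buf));
        if (n > 0) write(cfd, buf, (size_t)n);
        close(cfd);
    }
}

lighttpd makes exactly the same calls, just spread across its initialization code and wrapped in error handling, as the sections below show.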

lighttpd uses the server design paradigm of preforking child processes for TCP, with each child calling accept on its own, also known as the watcher-worker model; the various network server design paradigms are described in detail in Unix Network Programming. The program's entry point is in server.c. The beginning of main is a pile of initialization work, which we will skip for now, jumping straight to the key code:

/* when running as root, call network_init */
if(i_am_root)
{
…
...
/* we need root-perms for port < 1024 */
if (0 != network_init(srv)) {
plugins_free(srv);
server_free(srv);
return -1;
}
…
...
}


network_init is defined in network.c. It too starts with various initialization work and finally calls network_server_init. Let's assume we are running on IPv4 (the code has different paths for the different socket types; to keep things simple we only follow the IPv4 path) and look at the key code of the network_server_init flow:



/* this creates the listening socket */
if (srv_socket->fd == -1) {
srv_socket->addr.plain.sa_family = AF_INET;
if (-1 == (srv_socket->fd = socket(srv_socket->addr.plain.sa_family, SOCK_STREAM, IPPROTO_TCP))) {
log_error_write(srv, __FILE__, __LINE__, "ss", "socket failed:", strerror(errno));
goto error_free_socket;
}
}


/* this initializes the socket address */
case AF_INET:
memset(&srv_socket->addr, 0, sizeof(struct sockaddr_in));
srv_socket->addr.ipv4.sin_family = AF_INET;
if (host == NULL) {
srv_socket->addr.ipv4.sin_addr.s_addr = htonl(INADDR_ANY);
} else {
struct hostent *he;
if (NULL == (he = gethostbyname(host))) {
log_error_write(srv, __FILE__, __LINE__,
"sds", "gethostbyname failed: ",
h_errno, host);
goto error_free_socket;
}

if (he->h_addrtype != AF_INET) {
log_error_write(srv, __FILE__, __LINE__, "sd", "addr-type != AF_INET: ", he->h_addrtype);
goto error_free_socket;
}

if (he->h_length != sizeof(struct in_addr)) {
log_error_write(srv, __FILE__, __LINE__, "sd", "addr-length != sizeof(in_addr): ", he->h_length);
goto error_free_socket;
}

memcpy(&(srv_socket->addr.ipv4.sin_addr.s_addr), he->h_addr_list[0], he->h_length);
}
srv_socket->addr.ipv4.sin_port = htons(port);

addr_len = sizeof(struct sockaddr_in);

break;


/* this binds the socket address */
if (0 != bind(srv_socket->fd, (struct sockaddr *) &(srv_socket->addr), addr_len)) {
switch(srv_socket->addr.plain.sa_family) {
case AF_UNIX:
log_error_write(srv, __FILE__, __LINE__, "sds",
"can't bind to socket:",
host, strerror(errno));
break;
default:
log_error_write(srv, __FILE__, __LINE__, "ssds",
"can't bind to port:",
host, port, strerror(errno));
break;
}
goto error_free_socket;
}


/* this starts listening */

if (-1 == listen(srv_socket->fd, 128 * 8)) {
log_error_write(srv, __FILE__, __LINE__, "ss", "listen failed: ", strerror(errno));
goto error_free_socket;
}
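A small aside: the excerpts access srv_socket->addr both as addr.plain.sa_family and as addr.ipv4.sin_addr, which implies the address is stored as a union over the supported address families. A rough sketch of such a union (an illustrative guess, not lighttpd's exact definition) would be:

#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/un.h>

/* hypothetical sketch of an address union like the one srv_socket->addr appears to be */
typedef union {
    struct sockaddr     plain;  /* generic view:  addr.plain.sa_family */
    struct sockaddr_in  ipv4;   /* AF_INET view:  addr.ipv4.sin_addr, sin_port */
    struct sockaddr_in6 ipv6;   /* AF_INET6 view */
    struct sockaddr_un  un;     /* AF_UNIX view  */
} sock_addr_sketch;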




Up to this point lighttpd has followed the routine we are familiar with. Back in main, let's look at its most important part:

...
/* the parent is the watcher: it forks a number of worker children, and once the child count reaches the limit it waits */
/* until some child exits; the parent keeps running inside the while loop, and leaving the loop means the program ends */
/* a freshly forked child breaks out of the while loop, so all the code that follows is the workers' path */
/* start watcher and workers */
num_childs = srv->srvconf.max_worker;
if (num_childs > 0) {
int child = 0;
while (!child && !srv_shutdown && !graceful_shutdown) {
if (num_childs > 0) {
switch (fork()) {
case -1:
return -1;
case 0:
child = 1;
break;
default:
num_childs--;
break;
}
} else {
int status;

if (-1 != wait(&status)) {
/**
* one of our workers went away
*/
num_childs++;
} else {
switch (errno) {
case EINTR:
/**
* if we receive a SIGHUP we have to close our logs ourself as we don't
* have the mainloop who can help us here
*/
if (handle_sig_hup) {
handle_sig_hup = 0;

log_error_cycle(srv);

/**
* forward to all procs in the process-group
*
* we also send it ourself
*/
if (!forwarded_sig_hup) {
forwarded_sig_hup = 1;
kill(0, SIGHUP);
}
}
break;
default:
break;
}
}
}
}

/**
* for the parent this is the exit-point
*/
if (!child) {
/**
* kill all children too
*/
if (graceful_shutdown) {
kill(0, SIGINT);
} else if (srv_shutdown) {
kill(0, SIGTERM);
}

log_error_close(srv);
network_close(srv);
connections_free(srv);
plugins_free(srv);
server_free(srv);
return 0;
}
}
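Stripped of shutdown flags, signal forwarding and cleanup, the watcher-worker arrangement above boils down to a skeleton like this (a simplified sketch, not lighttpd's actual code; start_workers is a made-up helper name):

#include <sys/wait.h>
#include <unistd.h>

/* simplified sketch of the watcher-worker loop: the watcher forks num_childs workers,
 * then only reaps dead workers and replaces them; a worker leaves the loop immediately
 * and goes on to run the event loop */
static int start_workers(int num_childs) {
    int child = 0;
    while (!child) {                       /* shutdown flags omitted in this sketch */
        if (num_childs > 0) {
            pid_t pid = fork();
            if (pid < 0) return -1;        /* fork failed */
            if (pid == 0) child = 1;       /* worker: break out of the loop */
            else num_childs--;             /* watcher: one more worker running */
        } else {
            int status;
            if (wait(&status) != -1)       /* a worker exited ... */
                num_childs++;              /* ... so fork a replacement on the next pass */
        }
    }
    return 1;                              /* reached only in the worker processes */
}

After forking, the watcher's only job is to keep the pool at num_childs workers; all of the accept and IO work happens in the children.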





So far we know that the parent listens on a fixed port and then preforks a certain number of children. What will the children do? According to the steps described at the start of this article, it should be accept followed by reading and writing the socket. Let's see whether the code that follows bears this out:



/* initialize the fdevent subsystem; in lighttpd, fdevent handles the various IO events. */
/* lighttpd uses the reactor pattern, i.e. multiplexing plus non-blocking IO. Since the */
/* multiplexing APIs differ across platforms, fdevent wraps each implementation in an */
/* OO-style way so the rest of the code can use a single uniform interface */
if (NULL == (srv->ev = fdevent_init(srv, srv->max_fds + 1, srv->event_handler))) {
log_error_write(srv, __FILE__, __LINE__,
"s", "fdevent_init failed");
return -1;
}
/* register the sockets stored in srv with fdevent */
/*
* kqueue() is called here, select resets its internals,
* all server sockets get their handlers
*
* */
if (0 != network_register_fdevents(srv)) {
plugins_free(srv);
network_close(srv);
server_free(srv);

return -1;
}
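The comments above describe fdevent as hiding the platform differences between select, poll, epoll, kqueue and friends behind one interface. Conceptually that is a table of function pointers chosen at initialization time; a minimal interface sketch of the idea (illustrative only, these are not fdevent's real type or function names) might look like:

/* illustrative sketch of a pluggable event-backend interface in the spirit of fdevent */
typedef struct event_backend {
    int  (*add)(void *ctx, int fd, int events);   /* start watching fd for events */
    int  (*del)(void *ctx, int fd);               /* stop watching fd */
    int  (*poll)(void *ctx, int timeout_ms);      /* block until some fd is ready */
    void *ctx;                                    /* backend-private state */
} event_backend;

/* one concrete implementation is picked at init time (hypothetical constructors) */
event_backend *backend_init_select(void);
event_backend *backend_init_epoll(void);
event_backend *backend_init_kqueue(void);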




The function network_register_fdevents is defined in network.c as follows:


int network_register_fdevents(server *srv) {
size_t i;
/* clear fdevent's IO handles, much like FD_ZERO clears an fd set for select */
if (-1 == fdevent_reset(srv->ev)) {
return -1;
}

/* register fdevents after reset */
for (i = 0; i < srv->srv_sockets.used; i++) {
server_socket *srv_socket = srv->srv_sockets.ptr[i];
//register the callback function:
//once srv_socket->fd becomes ready, network_server_handle_fdevent is triggered

fdevent_register(srv->ev, srv_socket->fd, network_server_handle_fdevent, srv_socket);
//tell fdevent to watch srv_socket->fd and, once it is readable, call the registered callback

fdevent_event_set(srv->ev, &(srv_socket->fde_ndx), srv_socket->fd, FDEVENT_IN);
}
return 0;
}
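To make the two calls concrete: in raw epoll terms, "register a handler for this fd and watch it for readability" corresponds roughly to the following (plain epoll used purely for illustration; fdevent layers the callback bookkeeping on top of whichever multiplexer is in use):

#include <sys/epoll.h>

/* illustrative only: watch listen_fd for readability and dispatch to a handler,
 * which is what fdevent_register + fdevent_event_set(..., FDEVENT_IN) arrange */
static int watch_and_dispatch(int ep, int listen_fd, void (*on_readable)(int fd)) {
    struct epoll_event ev = {0};
    ev.events = EPOLLIN;                 /* "FDEVENT_IN": readable */
    ev.data.fd = listen_fd;
    if (epoll_ctl(ep, EPOLL_CTL_ADD, listen_fd, &ev) < 0) return -1;

    struct epoll_event ready[16];
    int n = epoll_wait(ep, ready, 16, -1);
    for (int i = 0; i < n; i++) {
        if (ready[i].data.fd == listen_fd && (ready[i].events & EPOLLIN))
            on_readable(listen_fd);      /* the registered callback */
    }
    return 0;
}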



The srv_socket->fd here is exactly the listening socket created earlier. Now suppose a client connection request arrives: the child's srv_socket->fd becomes readable and the callback network_server_handle_fdevent is invoked:


static handler_t network_server_handle_fdevent(server *srv, void *context, int revents) {

...
/* accept()s at most 100 connections directly
*
* we jump out after 100 to give the waiting connections a chance */
for (loops = 0; loops < 100 && NULL != (con = connection_accept(srv, srv_socket)); loops++) {
handler_t r;

connection_state_machine(srv, con);

switch(r = plugins_call_handle_joblist(srv, con)) {
case HANDLER_FINISHED:
case HANDLER_GO_ON:
break;
default:
log_error_write(srv, __FILE__, __LINE__, "d", r);
break;
}
}
return HANDLER_GO_ON;
}
connection_accept is defined in connections.c; simplified, the code looks like this:


//obtain the connected socket
if (-1 == (cnt = accept(srv_socket->fd, (struct sockaddr *) &cnt_addr, &cnt_len))) {
switch (errno) {
case EAGAIN:
#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
case EINTR:
/* we were stopped _before_ we had a connection */
case ECONNABORTED: /* this is a FreeBSD thingy */
/* we were stopped _after_ we had a connection */
break;
case EMFILE:
/* out of fds */
break;
default:
log_error_write(srv, __FILE__, __LINE__, "ssd", "accept failed:", strerror(errno), errno);
}
return NULL;
}


con->fd = cnt;
con->fde_ndx = -1;
//register connection_handle_fdevent with fdevent as the callback for the connected socket con->fd

fdevent_register(srv->ev, con->fd, connection_handle_fdevent, con);


//set some attributes, e.g. make con->fd non-blocking
if (-1 == (fdevent_fcntl_set(srv->ev, con->fd))) {
log_error_write(srv, __FILE__, __LINE__, "ss", "fcntl failed: ", strerror(errno));
return NULL;
}
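The comment only says that fdevent_fcntl_set sets attributes such as making con->fd non-blocking; the usual fcntl idiom for that part looks like this (a guess at the relevant piece, not the actual function body):

#include <fcntl.h>

/* presumed core of making a descriptor non-blocking */
static int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}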





By this point the outline of lighttpd's network model is fairly clear. As stated at the beginning, like every network server it walks the socket -> bind -> listen -> accept sequence; more specifically, it uses the paradigm of preforked children that each call accept on their own. Unix Network Programming notes that this paradigm suffers from the accept thundering-herd problem: when the listening socket becomes readable, every child blocked in accept wakes up, yet only one of them obtains the connected socket, so waking them all is unnecessary and hurts efficiency. lighttpd does not appear to do anything about this. Newer Linux kernels no longer exhibit the thundering herd on accept itself, but a similar problem remains for multiplexing calls such as select and epoll, and code commonly calls epoll (or select) first and accept afterwards, which is exactly what lighttpd does, so a new form of the thundering herd still needs handling. Unless I have missed something, I found no code in lighttpd that deals with it, whereas nginx does handle it.
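For completeness, one classic mitigation is to serialize access to the listening socket so that only one worker is waiting on it at a time, for example with a cross-process lock. The following is only a conceptual sketch of that idea (not nginx's or lighttpd's actual mechanism; the lock-file path and the handle() callback are made up):

#include <fcntl.h>
#include <sys/file.h>
#include <sys/socket.h>

/* conceptual sketch: only the worker holding the lock waits in accept(),
 * so a new connection wakes exactly one process */
static void worker_accept_loop(int listen_fd, void (*handle)(int fd)) {
    int lock_fd = open("/tmp/accept.lock", O_CREAT | O_RDWR, 0600); /* hypothetical path */
    if (lock_fd < 0) return;
    for (;;) {
        if (flock(lock_fd, LOCK_EX) < 0) continue;  /* take the accept "mutex" */
        int c = accept(listen_fd, NULL, NULL);
        flock(lock_fd, LOCK_UN);                    /* release before handling */
        if (c >= 0) handle(c);                      /* hypothetical connection handler */
    }
}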