您的位置:首页 > 理论基础 > 计算机网络

TCP并发服务器(三)——预创建子进程,accept文件锁

2014-07-25 11:23 274 查看
1.说明

基于System V内核的实现可能不支持多个进程在同一个监听描述符上调用accept。需要在accept前后放置某种形式的锁。

这里采用fcntl的文件上锁功能,属于记录锁的一种。

在性能上慢于下节要讲的互斥锁。

2.代码

#include "unp.h"
#include <vector>
#include <sys/mman.h>        //for mmap()

using std::vector;

//共享内存首地址
static long *cptr;;
//预创建的子进程数目
static int nchildren;
//存放子进程的PID
static vector<int> pids;
//文件上锁
static struct flock lock_it;
//文件解锁
static struct flock unlock_it;
//上锁的文件描述符
static int lock_fd = -1;

//锁的初始化
void my_lock_init(const char *pathname)
{
char lock_file[1024];
//必须复制字符串,防止她是constant
strncpy(lock_file, pathname, sizeof(lock_file));
//lock_file是一个最后6个字符未XXXXXX的路径名
//该函数用不同字符替换XXXXXX创建唯一路径名
//成功返回,会修改字符串以反映临时文件名字
//与tempfile不同,此函数创建的临时文件不会自动删除,要使用unlink()删除
lock_fd = Mkstemp(lock_file);
//文件仍然打开,本身不会被删除
Unlink(lock_file);

//写锁
lock_it.l_type = F_WRLCK;
//上锁从字节偏移量0
lock_it.l_whence = SEEK_SET;
lock_it.l_start = 0;
//锁住整个文件,文件长度0
lock_it.l_len = 0;

//解锁
unlock_it.l_type = F_UNLCK;
//上锁从字节偏移量0
unlock_it.l_whence = SEEK_SET;
unlock_it.l_start = 0;
//锁住整个文件,文件长度0
unlock_it.l_len = 0;
}

void my_lock_wait()
{
int rc;
//给文件阻塞(wait)上锁
while ((rc = fcntl(lock_fd, F_SETLKW, &lock_it)) < 0) {
if (errno == EINTR) {
continue;
} else {
err_sys("fcntl() error for my_lock_wait");
}

}
}

void my_lock_release()
{
if (fcntl(lock_fd, F_SETLKW, &unlock_it) < 0) {
err_sys("fcntl() error for my_lock_release");
}
}

//中断的信号处理函数
void sig_int(int signo)
{
for (int i = 0; i < nchildren; ++i) {
kill(pids[i], SIGTERM);
}
while (wait(NULL) > 0) {
;
}
if (errno != ECHILD) {
err_sys("wait error");
}

DPRINTF("The number of child process accept.");
for (int i = 0; i < nchildren; ++i) {
DPRINTF("%ld", cptr[i]);
}

void pr_cpu_time(void);
pr_cpu_time();

exit(0);
}

int main(int argc, char *argv[])
{
int listenfd;
socklen_t addrlen;
if (argc == 3) {
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
} else if (argc == 4) {
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
} else {
err_quit("Usage: a.out [ <host> ] <port#> <#children>");
}

nchildren = atoi(argv[argc - 1]);
pids.resize(nchildren);
//查看连接在子进程的分布
//使用共享内存未每个子进程的连接计数
//fork之后父进程和子进程共享cptr
long *meter(int);
cptr = meter(atoi(argv[argc - 1]));
//cptr = (long*)Calloc(nchildren, sizeof(long));

//初始化锁
my_lock_init("/tmp/lock.XXXXXX");

//创建子进程
pid_t child_make(int i, int listenfd, int addrlen);
for (int i = 0; i < atoi(argv[argc - 1]); ++i) {
pids[i] = child_make(i, listenfd, addrlen);
}

//SIGCHLD处理函数
void sig_chld(int);
Signal(SIGCHLD, sig_chld);
Signal(SIGINT, sig_int);

for ( ; ;) {
;             //child does everything
} //end for(;;)

return 0;
}

void sig_chld(int)
{
static int cnt = 0;
pid_t pid;
int stat;
//param1: 想要等待的PID;-1: 等待第一个终止的子进程
//param2: 子进程的终止状态(整数)
//param3: 附加选项;WNOHANG:没有子进程终止时,不阻塞
while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {    //成功:返回进程ID > 0, 出错:0或-1
DPRINTF("Waitpid for %d child process\n", ++cnt);
;
}

return;
}

pid_t child_make(int i, int listenfd, int addrlen)
{
void child_main(int, int, int);
pid_t pid = Fork();
if (pid > 0) {        //parent
return pid;
}
child_main(i, listenfd, addrlen);
}

void child_main(int i, int listenfd, int addrlen)
{
DPRINTF("child %ld starting\n", (long)getpid());
void web_child(int);
for ( ; ; ) {
DPRINTF("Lock");
//上锁
my_lock_wait();
DPRINTF("Wait for a connection");
int connfd = Accept(listenfd, NULL, NULL);
DPRINTF("UNLOCK");
//释放锁
my_lock_release();
//共享内存计数增加
++cptr[i];
DPRINTF("Accept a connection.");

web_child(connfd);
Close(connfd);
}
}

long *meter(int nchildren)
{
#ifdef MAP_ANON            //匿名映射
long *ptr = (long*)Mmap(0, nchildren * sizeof(long), PROT_READ | PROT_WRITE,
MAP_ANON | MAP_SHARED, -1, 0);
#else
int fd = Open("/dev/zero", O_RDWR, 0);
long *ptr = (long*)Mmap(0, nchildren * sizeof(long), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
//已完成文件到进程地址空间的映射,可以关闭原文件
Close(fd);
#endif

return ptr;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: