您的位置:首页 > Web前端 > React

一个回射服务器程序,采用reactor模型和epoll多路复用

2014-09-14 01:05 274 查看
<pre name="code" class="cpp">

#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <errno.h>
#include <sys/types.h>
#include <pthread.h>
#include <signal.h>
#include <sys/epoll.h>

#define MY_LOGFILE "./log_echo"

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/*
 * Append one timestamped, printf-style formatted record to MY_LOGFILE.
 *
 * fmt, ...: printf-style format string and its arguments. The formatted
 *           text is truncated to fit an internal 2048-byte buffer.
 *
 * The log file is opened and closed on every call. File access and the
 * localtime() call are serialized through the global `mutex`, so the
 * function may be called from several threads. If the log file cannot be
 * opened the record is silently dropped.
 */
void mydebug(const char *fmt, ...)
{
    char debugbuff[2048];
    time_t nowtime = time(NULL);
    struct tm stamp;
    struct tm *pNowtime;
    va_list ap;
    FILE *fp;
    int outlen;

    /* Format before taking the lock; bounded so a long message cannot
     * overflow the stack buffer. */
    va_start(ap, fmt);
    outlen = vsnprintf(debugbuff, sizeof debugbuff, fmt, ap);
    va_end(ap);
    if (outlen < 0) {
        debugbuff[0] = '\0';
    }

    pthread_mutex_lock(&mutex);

    /* localtime() returns a pointer to shared static storage, so call it
     * under the lock and copy the result out. */
    pNowtime = localtime(&nowtime);
    if (pNowtime == NULL) {
        pthread_mutex_unlock(&mutex);
        return;
    }
    stamp = *pNowtime;

    fp = fopen(MY_LOGFILE, "a");
    if (fp == NULL) {
        fp = fopen(MY_LOGFILE, "w");
    }
    if (fp != NULL) {
        fprintf(fp, "%d-%02d-%02d %02d:%02d:%02d====",
                stamp.tm_year + 1900, stamp.tm_mon + 1, stamp.tm_mday,
                stamp.tm_hour, stamp.tm_min, stamp.tm_sec);
        fprintf(fp, "%s", debugbuff);
        fclose(fp);
    }

    /* Single unlock point: every path that took the lock reaches here. */
    pthread_mutex_unlock(&mutex);
}

/*
 * Per-worker bookkeeping record. One node is allocated by main() for each
 * worker thread and linked into the global thread queue.
 */
struct workthread{
/* Thread handle filled in by pthread_create() in thread_init(). */
pthread_t pid;
/* Pipe used to hand accepted sockets to the worker: dispatch() writes the
 * socket fd to pipefd[1], work_thread() reads it from pipefd[0]. */
int pipefd[2];
/* 1-based worker number assigned in main(); used in log messages. */
int threadid;
/* Doubly linked list links within the thread queue. */
struct workthread* prev;
struct workthread* next;
};

typedef struct workthread workthread;

/* List of worker records: head is the first node, tail the last one. */
typedef struct{
workthread* head;
workthread* tail;
}thread_queue;

/* The single worker queue of this program; filled in by main() and walked
 * by dispatch(). */
static	thread_queue tq;

/* Remove fd from the epoll set and close it. */
static void worker_close_client(int epollfd, int fd)
{
    /* A non-NULL event keeps EPOLL_CTL_DEL portable to old kernels. */
    struct epoll_event unused = {0};

    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &unused);
    close(fd);
}

/* Best-effort send of len bytes; continues after short writes and stops on
 * the first error other than EINTR. */
static void worker_echo(int fd, const char *buf, size_t len)
{
    size_t sent = 0;

    while (sent < len) {
        ssize_t n = send(fd, buf + sent, len - sent, 0);
        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            return;
        }
        sent += (size_t)n;
    }
}

/* Read one socket fd from the worker's pipe and register it for
 * edge-triggered read events. */
static void worker_take_connection(int epollfd, workthread *work)
{
    struct epoll_event recv_event = {0};
    int fd = -1;
    ssize_t n = read(work->pipefd[0], &fd, sizeof fd);

    /* Pipe error handling is minimal: an incomplete record is ignored
     * rather than registering a garbage descriptor. */
    if (n != (ssize_t)sizeof fd) {
        return;
    }
    mydebug("%s[%d] INFO:thread[%d] recv a connection socketfd[%d]\n",__FILE__,__LINE__,work->threadid,fd);

    recv_event.events = EPOLLIN | EPOLLET;
    recv_event.data.fd = fd;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &recv_event) < 0) {
        mydebug("%s[%d] ERROR:thread[%d] can not watch socketfd[%d]\n",__FILE__,__LINE__,work->threadid,fd);
        close(fd);
    }
}

/* Drain a readable client socket and echo what was received; closes the
 * connection on EOF or on a hard receive error. */
static void worker_serve_client(int epollfd, int fd, int threadid)
{
    char buff[512];
    size_t used = 0;

    /* The socket is edge-triggered, so keep reading until EAGAIN. */
    for (;;) {
        ssize_t n;

        /* Flush when the buffer is full so recv() never writes past it. */
        if (used == sizeof buff) {
            worker_echo(fd, buff, used);
            used = 0;
        }

        n = recv(fd, buff + used, sizeof buff - used, 0);
        if (n > 0) {
            used += (size_t)n;
            continue;
        }
        if (n < 0 && errno == EINTR) {
            continue;
        }
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            /* Nothing more to read for now: echo what was collected. */
            worker_echo(fd, buff, used);
            return;
        }

        /* n == 0 (peer closed) or an unrecoverable error. */
        worker_close_client(epollfd, fd);
        mydebug("%s[%d] INFO:thread[%d] have closed a connection socketfd[%d]\n",__FILE__,__LINE__,threadid,fd);
        return;
    }
}

/*
 * Worker thread entry point (pthread start routine).
 *
 * arg: pointer to this worker's `workthread` record.
 *
 * Creates a private epoll instance, watches the read end of the worker's
 * pipe for socket fds handed over by dispatch(), and echoes back whatever
 * the registered client sockets send. Runs forever; returns NULL only if
 * the epoll instance cannot be set up.
 */
void* work_thread(void* arg){
    workthread *work = arg;
    struct epoll_event events[1024];
    struct epoll_event pipe_event = {0};
    int epollfd;

    mydebug("%s[%d] INFO:work thread[%d] is begining\n",__FILE__,__LINE__,work->threadid);

    epollfd = epoll_create(1024);
    if (epollfd < 0) {
        return NULL;
    }

    pipe_event.events = EPOLLIN;
    pipe_event.data.fd = work->pipefd[0];
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, work->pipefd[0], &pipe_event) < 0) {
        close(epollfd);
        return NULL;
    }

    for (;;) {  /* work thread event loop */
        int i;
        int ready = epoll_wait(epollfd, events, 1024, 2000);

        /* Timeout or error (including EINTR): just wait again. errno is
         * only meaningful when the call failed, so it is not consulted
         * when events were returned. */
        if (ready <= 0) {
            continue;
        }

        for (i = 0; i < ready; i++) {
            int fd = events[i].data.fd;
            unsigned int what = events[i].events;

            if (fd == work->pipefd[0]) {
                if (what & EPOLLIN) {
                    worker_take_connection(epollfd, work);
                }
            }
            else if (what & EPOLLIN) {
                worker_serve_client(epollfd, fd, work->threadid);
            }
            else if (what & (EPOLLERR | EPOLLHUP)) {
                worker_close_client(epollfd, fd);
                mydebug("%s[%d] ERROR:thread[%d] have closed an error connection socketfd[%d]\n",__FILE__,__LINE__,work->threadid,fd);
            }
        }
    }
    return NULL;
}
/*
 * Start one work_thread() for every record in the queue.
 *
 * tq: queue of worker records; NULL is accepted and ignored.
 *
 * Stops at the first thread that cannot be created and reports the error
 * with perror(); workers that were already started keep running.
 */
void thread_init(thread_queue* tq){
    workthread *node;

    if (tq == NULL) {
        return;
    }
    for (node = tq->head; node != NULL; node = node->next) {
        /* pthread_create() returns a positive error number on failure and
         * does not set errno, so test for non-zero and hand the code to
         * perror() through errno. */
        int ret = pthread_create(&node->pid, NULL, work_thread, node);
        if (ret != 0) {
            errno = ret;
            perror("pthread_create");
            return;
        }
    }
}
/*
 * Put descriptor fd into non-blocking mode by OR-ing O_NONBLOCK into its
 * current file status flags. Results of the fcntl() calls are not checked.
 */
void set_noblock(int fd){
    const int current_flags = fcntl(fd, F_GETFL);

    fcntl(fd, F_SETFL, current_flags | O_NONBLOCK);
}
void dispatch(int sockfd){
static workthread* current = NULL;
if(!current){
current = tq.head;
}
mydebug("%s[%d] INFO:main thread dispatch a connection socketfd [%d] to thread [%d]\n",__FILE__,__LINE__,sockfd,current->threadid);
//printf("dispatch sockfd[%d] to thread[%d]\n",sockfd,current->threadid);
write(current->pipefd[1],(char*)&sockfd,4);
current = current->next;
return;
}
/*
 * Wait for every worker thread in the queue to terminate.
 *
 * tq: queue of worker records; NULL is accepted and ignored.
 */
void thread_join(thread_queue* tq){
    workthread *node;

    if (!tq) {
        return;
    }
    /* Walk while a node exists (the loop condition used to be inverted,
     * which skipped every worker and dereferenced NULL on an empty queue). */
    for (node = tq->head; node != NULL; node = node->next) {
        pthread_join(node->pid, NULL);
    }
}
/*
 * Echo server entry point.
 *
 * usage: server [port] [worknum]
 *   port    - TCP port to listen on, 1..65535
 *   worknum - number of worker threads, 1..128
 *
 * Detaches from the terminal, builds the worker queue (one pipe per
 * worker), opens a non-blocking listening socket, starts the workers and
 * then loops forever accepting connections and handing each one to a
 * worker via dispatch(). Returns -1 on any setup failure.
 */
int main(int argc,char* argv[])
{
    int port, worknum, i;
    int listenfd, epollfd, ret;
    struct sockaddr_in addr = {0};
    struct epoll_event event = {0};

    if(argc<3){
        printf("usage:server [port] [worknum]\n");
        return -1;
    }
    port = atoi(argv[1]);
    if(port>65535 || port<=0){
        printf("invalid port! should range in [1-65535]\n");
        return -1;
    }
    worknum = atoi(argv[2]);
    /* Upper bound is inclusive, matching the message below. */
    if(worknum<=0 || worknum>128){
        printf("invalid worknum! should range in [1-128]\n");
        return -1;
    }

    /* Keep the working directory (the log file path is relative) but
     * detach stdio. */
    if(daemon(1,0)<0){
        perror("daemon");
        return -1;
    }
    /* A client that disconnects mid-send must not kill the process. */
    signal(SIGPIPE,SIG_IGN);

    /* Create the thread queue: one record and one pipe per worker. */
    tq.head = tq.tail = NULL;
    for(i=0;i<worknum;i++){
        workthread *thread = malloc(sizeof *thread);
        if(thread==NULL){
            mydebug("%s[%d] ERROR:main thread have an malloc error\n",__FILE__,__LINE__);
            return -1;
        }
        if(pipe(thread->pipefd)<0){
            mydebug("%s[%d] ERROR:main thread have pipe error\n",__FILE__,__LINE__);
            free(thread);
            return -1;
        }
        thread->threadid = i+1;
        thread->next = NULL;
        thread->prev = tq.tail;
        if(tq.tail!=NULL){
            tq.tail->next = thread;
        }
        else{
            tq.head = thread;
        }
        tq.tail = thread;
    }

    listenfd = socket(AF_INET,SOCK_STREAM,0);
    if(listenfd<0){
        mydebug("%s[%d] ERROR:main thread create listen socket error\n",__FILE__,__LINE__);
        return -1;
    }
    /* Only touch the descriptor once it is known to be valid. */
    set_noblock(listenfd);

    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    ret = bind(listenfd,(struct sockaddr*)&addr,sizeof addr);
    if(ret<0){
        mydebug("%s[%d] ERROR:main thread bind error\n",__FILE__,__LINE__);
        return -1;
    }
    ret = listen(listenfd,10);
    if(ret<0){
        mydebug("%s[%d] ERROR:main thread listen error\n",__FILE__,__LINE__);
        return -1;
    }

    thread_init(&tq);

    epollfd = epoll_create(32);
    if(epollfd<0){
        mydebug("%s[%d] ERROR:main thread epoll_create error\n",__FILE__,__LINE__);
        return -1;
    }
    event.data.fd = listenfd;
    event.events = EPOLLIN;
    if(epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&event)<0){
        mydebug("%s[%d] ERROR:main thread epoll_ctl error\n",__FILE__,__LINE__);
        return -1;
    }

    while(1){   /* listen thread event loop */
        struct epoll_event events[32];
        struct sockaddr_in peer;
        socklen_t len = sizeof peer;
        int sockfd;
        int nready = epoll_wait(epollfd,events,32,2000);

        /* Timeout, or a failed wait such as EINTR: wait again. */
        if(nready<=0){
            continue;
        }

        /* listenfd is the only registered descriptor and is
         * level-triggered, so one accept() per wake-up is enough. */
        sockfd = accept(listenfd,(struct sockaddr*)&peer,&len);
        if(sockfd<0){
            /* Transient conditions: nothing to accept right now. */
            if(errno==EAGAIN || errno==EWOULDBLOCK || errno==EINTR || errno==ECONNABORTED){
                continue;
            }
            mydebug("%s[%d] ERROR:main thread accept error\n",__FILE__,__LINE__);
            _exit(1);
        }
        set_noblock(sockfd);
        dispatch(sockfd);
    }

    /* Not reached while the loop above has no exit; kept for a future
     * orderly shutdown path. */
    thread_join(&tq);
    pthread_mutex_destroy(&mutex);
    return 0;
}



                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息