一个回射服务器程序,采用reactor模型和epoll多路复用
2014-09-14 01:05
274 查看
<pre name="code" class="cpp">
#include <stdio.h> #include <stdlib.h> #include <stdarg.h> #include <sys/socket.h> #include <netinet/in.h> #include <unistd.h> #include <fcntl.h> #include <time.h> #include <errno.h> #include <sys/types.h> #include <pthread.h> #include <signal.h> #include <sys/epoll.h> #define MY_LOGFILE "./log_echo" pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; void mydebug(const char *fmt, ...) { time_t nowtime = time(NULL); struct tm *pNowtime=NULL; pNowtime = localtime(&nowtime); char debugbuff[2048]; int outlen; FILE *fp; va_list ap; pthread_mutex_lock(&mutex); fp = fopen(MY_LOGFILE,"a"); if(fp==NULL) { fp = fopen(MY_LOGFILE,"w"); if(fp == NULL) return; } va_start(ap, fmt); outlen=vsprintf(debugbuff,fmt,ap); va_end(ap); *(debugbuff+outlen) = 0; fprintf(fp,"%d-%02d-%02d %02d:%02d:%02d====",pNowtime->tm_year+1900,pNowtime->tm_mon+1,pNowtime->tm_mday,pNowtime->tm_hour,pNowtime->tm_min,pNowtime->tm_sec); fprintf(fp,"%s",debugbuff); fclose(fp); pthread_mutex_unlock(&mutex); } struct workthread{ pthread_t pid; int pipefd[2]; int threadid; struct workthread* prev; struct workthread* next; }; typedef struct workthread workthread; typedef struct{ workthread* head; workthread* tail; }thread_queue; static thread_queue tq; void* work_thread(void* arg){ workthread *work = arg; mydebug("%s[%d] INFO:work thread[%d] is begining\n",__FILE__,__LINE__,work->threadid); //printf("work thread[%d] is begining\n",work->threadid); int epollfd,ret,i; epollfd = epoll_create(1024); if(epollfd<0){ return; } struct epoll_event event; event.events = EPOLLIN; event.data.fd = work->pipefd[0]; epoll_ctl(epollfd,EPOLL_CTL_ADD,work->pipefd[0],&event); struct epoll_event events[1024]; while(1){ //work thread event loop ret = epoll_wait(epollfd,events,1024,2000); if(ret<=0 || errno==EINTR){ continue; } else{ for(i=0;i<ret;i++){ if(events[i].data.fd == work->pipefd[0]){ if(events[i].events & EPOLLIN){ int n,fd; n = read(work->pipefd[0],(void*)&fd,sizeof(fd)); mydebug("%s[%d] INFO:thread[%d] recv a connection socketfd[%d]\n",__FILE__,__LINE__,work->threadid,fd); //printf("thread[%d] recv a connection socketfd[%d]\n",work->threadid,fd); //if(n<0){ // epoll_ctl(epollfd,EPOLL_CTL_DEL,&event); // continue; //} //绠¢亾鍑洪敊闂鍏堜笉鑰冭檻 struct epoll_event recv_event; recv_event.events = EPOLLIN | EPOLLET; recv_event.data.fd = fd; epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&recv_event); } //else if(event[i].event==EPOLLHUP || event[i].event==EPOLLERR){ // printf("thread [%d] can not handle ") // epoll_ctl(epollfd,EPOLL_CTL_DEL,&event); // continue; //} } else{ if(events[i].events&EPOLLIN){ int n = 0,index = 0; char buff[512]; while(1){ n = recv(events[i].data.fd,buff+index,64,0); if(n==0){ //printf("client[%d] have closed\n",events[i].data.fd); struct epoll_event tmp; int fd = events[i].data.fd; tmp.data.fd = fd; epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&tmp); close(fd); mydebug("%s[%d] INFO:thread[%d] have closed a connection socketfd[%d]\n",__FILE__,__LINE__,work->threadid,fd); break; } else if(n<0){ if(errno==EAGAIN || errno==EWOULDBLOCK){ buff[index] = 0; send(events[i].data.fd,buff,index,0); } else{ int fd = events[i].data.fd; epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,0); close(fd); mydebug("%s[%d] INFO:thread[%d] have closed a connection socketfd[%d]\n",__FILE__,__LINE__,work->threadid,fd); } break; } else{ index = index + n; } } } else if(events[i].events&EPOLLERR || events[i].events&EPOLLHUP){ //printf("client[%d] has error\n",events[i].data.fd); struct epoll_event tmp; int fd = events[i].data.fd; tmp.data.fd = fd; epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&tmp); mydebug("%s[%d] ERROR:thread[%d] have closed an error connection socketfd[%d]\n",__FILE__,__LINE__,work->threadid,fd); close(fd); } } //end for common socket } //end for forloop; } } return; } void thread_init(thread_queue* tq){ int ret; if(tq==NULL){ return; } workthread* temp = tq->head; while(temp!=NULL){ ret = pthread_create(&(temp->pid),NULL,work_thread,(void*)temp); if(ret<0){ perror("pthread_create"); return; } temp = temp->next; } return; } void set_noblock(int fd){ int flag = fcntl(fd,F_GETFL); flag = flag | O_NONBLOCK; fcntl(fd,F_SETFL,flag); return; } void dispatch(int sockfd){ static workthread* current = NULL; if(!current){ current = tq.head; } mydebug("%s[%d] INFO:main thread dispatch a connection socketfd [%d] to thread [%d]\n",__FILE__,__LINE__,sockfd,current->threadid); //printf("dispatch sockfd[%d] to thread[%d]\n",sockfd,current->threadid); write(current->pipefd[1],(char*)&sockfd,4); current = current->next; return; } void thread_join(thread_queue* tq){ if(!tq){ return; } workthread* tmp = tq->head; while(!tmp){ pthread_join(tmp->pid,NULL); tmp = tmp->next; } return; } int main(int argc,char* argv[]) { if(argc<3){ printf("usage:server [port] [worknum]\n"); return -1; } int port = atoi(argv[1]); if(port>65535 || port<=0){ printf("invalid port! should range in [1-65535]\n"); return -1; } int worknum = atoi(argv[2]); if(worknum<=0 || worknum>=128){ printf("invalid worknum! should range in [1-128]\n"); return -1; } daemon(1,0); signal(SIGPIPE,SIG_IGN); workthread* thread; thread = (struct workthread*) malloc(sizeof(struct workthread)); if(thread==NULL){ mydebug("%s[%d] ERROR:main thread have an malloc error\n",__FILE__,__LINE__); //perror("malloc"); return -1; } if(pipe(thread->pipefd)<0){ mydebug("%s[%d] ERROR:main thread have pipe error\n",__FILE__,__LINE__); //perror("create pipe"); return -1; } thread->threadid = 1; thread->next = thread->prev = NULL; tq.head = tq.tail = thread; int i; //鍒涘缓绾跨▼闃熷垪 for(i=1;i<worknum;i++){ thread = (struct workthread*) malloc(sizeof(struct workthread)); if(thread==NULL){ mydebug("%s[%d] ERROR:main thread have an malloc error\n",__FILE__,__LINE__); //perror("malloc"); return -1; } if(pipe(thread->pipefd)<0){ mydebug("%s[%d] ERROR:main thread have pipe error\n",__FILE__,__LINE__); //perror("create pipe"); return -1; } thread->threadid = i+1; tq.tail->next = thread; thread->next = NULL; thread->prev = tq.tail; tq.tail = thread; } int listenfd,ret; listenfd = socket(AF_INET,SOCK_STREAM,0); set_noblock(listenfd); if(listenfd<0){ mydebug("%s[%d] ERROR:main thread create listen socket error\n",__FILE__,__LINE__); //perror("socket"); return -1; } struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = 0; ret = bind(listenfd,(struct sockaddr*)&addr,sizeof(struct sockaddr)); if(ret<0){ mydebug("%s[%d] ERROR:main thread bind error\n",__FILE__,__LINE__); //perror("bind"); return -1; } ret = listen(listenfd,10); if(ret<0){ mydebug("%s[%d] ERROR:main thread listen error\n",__FILE__,__LINE__); //perror("listen"); return -1; } thread_init(&tq); int epollfd; epollfd = epoll_create(32); struct epoll_event event; event.data.fd = listenfd; event.events = EPOLLIN; epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&event); while(1){ //listen thread event loop; int ret; struct epoll_event events[32]; ret = epoll_wait(epollfd,events,32,2000); if(ret==0){ continue; } else if(ret<0){ 9c04 if(ret<=0 || errno==EINTR){ continue; } } else{ struct sockaddr_in addr; int sockfd; int len = sizeof(struct sockaddr); sockfd = accept(listenfd,(struct sockaddr*)&addr,&len); if(sockfd<0){ if(errno==EAGAIN || EWOULDBLOCK){ continue; } else{ mydebug("%s[%d] ERROR:main thread accept error\n",__FILE__,__LINE__); _exit(1); } } set_noblock(sockfd); dispatch(sockfd); } } thread_join(&tq); pthread_mutex_destroy(&mutex); return 0; }
相关文章推荐
- 采用DOM模型时创建一个Select节点后,要删除option项的解决方法
- 一个windows下基于select多路分离器的Reactor模型
- LSTM模型在问答系统中的应用 2017-06-27 21:03 在问答系统的应用中,用户输入一个问题,系统需要根据问题去寻找最合适的答案。 1:采用句子相似度的方式。根据问题的字面相似度选择相似度最
- 采用XMLHTTP编写一个天气预报的程序
- Winsock提供了一个很有用的异步I/O模型之WSAAsyncSelect
- 一个关于手机快速开发模型的想法
- 针对SharePoint对象模型作二次开发的一个技巧
- 一个很好的日历控件(采用javascript编写)
- 设计了一个用户权限管理模型
- Hacking swing: 一个JDBC表控件模型
- 一个很好的日历控件(采用javascript编写)
- 一个粗略的用于计算IT技术图书收益的数学模型
- 计划建造一个 自己的 BBS 系统, 完全采用 Web 标准 设计.
- 针对SharePoint对象模型作二次开发的一个技巧
- 一个Download.jsp文件 (采用文件流读写方式。可以防止doc.excl等文件打开时乱码问题)
- 以前写的一个类,用OPENGL导入MD2模型,带关键桢动画
- 微软又在搞一个新的软件模型验证软件
- 一个消息提示托盘程序的开发历程(采用socket技术,附源代码)三---客户端源代码
- 设计了一个用户权限管理模型
- 一个隐藏的菜单完全采用CSS