Linux网络编程--使用epoll,共享内存技术实现高性能的聊天室程序
2017-04-07 19:04
946 查看
本篇博文主要介绍使用epoll和多进程的共享内存技术实现高性能的聊天室的服务器程序。
#include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <signal.h> #include <sys/wait.h> #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> #define USER_LIMIT 5 #define BUFFER_SIZE 1024 #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define PROCESS_LIMIT 65536 struct client_data { sockaddr_in address; int connfd; pid_t pid; int pipefd[2]; }; static const char* shm_name = "/my_shm"; int sig_pipefd[2]; int epollfd; int listenfd; int shmfd; char* share_mem = 0; client_data* users = 0; int* sub_process = 0; int user_count = 0; bool stop_child = false; int setnonblocking( int fd ) { int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; //将fd设置为非阻塞 fcntl( fd, F_SETFL, new_option ); return old_option; } void addfd( int epollfd, int fd ) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ); //注册fd到epollfd监听队列中 setnonblocking( fd ); } void sig_handler( int sig ) { int save_errno = errno; int msg = sig; send( sig_pipefd[1], ( char* )&msg, 1, 0 ); //发送信号的值到管道的写端 errno = save_errno; } void addsig( int sig, void(*handler)(int), bool restart = true ) { struct sigaction sa; memset( &sa, '\0', sizeof( sa ) ); sa.sa_handler = handler; if( restart ) { sa.sa_flags |= SA_RESTART; //如果由于此信号影响程序运行,加上SA_RESTART标志使程序重新启动 } sigfillset( &sa.sa_mask ); //初始化所以信号 assert( sigaction( sig, &sa, NULL ) != -1 ); //使用sigaction设置信号发生器 } void del_resource() { close( sig_pipefd[0] ); close( sig_pipefd[1] ); close( listenfd ); close( epollfd ); shm_unlink( shm_name ); //删除/关闭共享内存 delete [] users; delete [] sub_process; } void child_term_handler( int sig ) { stop_child = true; } int run_child( int idx, client_data* users, char* share_mem ) //第idx个用户 用户数据结构 共享内存标识符 { epoll_event events[ MAX_EVENT_NUMBER ]; int child_epollfd = epoll_create( 5 ); assert( child_epollfd != -1 ); int connfd = users[idx].connfd; addfd( child_epollfd, connfd ); int pipefd = users[idx].pipefd[1]; addfd( child_epollfd, pipefd ); //监听和父进程通信管道的写端 int ret; addsig( SIGTERM, child_term_handler, false ); while( !stop_child ) { int number = epoll_wait( child_epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ( number < 0 ) && ( errno != EINTR ) ) { printf( "epoll failure\n" ); break; } for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if( ( sockfd == connfd ) && ( events[i].events & EPOLLIN ) ) //如果是连接描述符,那么就处理本描述符上的读事件 { memset( share_mem + idx*BUFFER_SIZE, '\0', BUFFER_SIZE ); //初始化用户idx内存区域 ret = recv( connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE-1, 0 ); if( ret < 0 ) { if( errno != EAGAIN ) { stop_child = true; } } else if( ret == 0 ) //服务端程序已经关闭,结束子进程 { stop_child = true; } else //发送数据(客户端序号)给写管道 { send( pipefd, ( char* )&idx, sizeof( idx ), 0 ); } } else if( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) ) //如果是管道消息 { int client = 0; ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 ); //接收数据(客户端序号) if( ret < 0 ) { if( errno != EAGAIN ) { stop_child = true; } } else if( ret == 0 ) { stop_child = true; } else { send( connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0 ); } } else { continue; } } } close( connfd ); close( pipefd ); close( child_epollfd ); return 0; } int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); user_count = 0; users = new client_data [ USER_LIMIT+1 ]; //创建 USER_LIMIT个 client_data数据结构 sub_process = new int [ PROCESS_LIMIT ]; //创建 PROCESS_LIMIT 个 sub_process int型数组,用于存储进程PID,用来索引用户数据结构 for( int i = 0; i < PROCESS_LIMIT; ++i ) { sub_process[i] = -1; } epoll_event events[ MAX_EVENT_NUMBER ]; epollfd = epoll_create( 5 ); assert( epollfd != -1 ); addfd( epollfd, listenfd ); //监听listenfd描述符 ret = socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd ); assert( ret != -1 ); setnonblocking( sig_pipefd[1] ); addfd( epollfd, sig_pipefd[0] ); //监听sig_pipefd[0]读描述符(监听信号事件) addsig( SIGCHLD, sig_handler ); addsig( SIGTERM, sig_handler ); addsig( SIGINT, sig_handler ); addsig( SIGPIPE, SIG_IGN ); //注册信号发生器 bool stop_server = false; bool terminate = false; shmfd = shm_open( shm_name, O_CREAT | O_RDWR, 0666 ); //posix共享内存的创建/打开 assert( shmfd != -1 ); ret = ftruncate( shmfd, USER_LIMIT * BUFFER_SIZE ); //清空内存区域 assert( ret != -1 ); share_mem = (char*)mmap( NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0 ); //映射共享内存的文件描述符到内存区域 assert( share_mem != MAP_FAILED ); close( shmfd ); while( !stop_server ) { int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 ); //使用I/O复用技术等待number个活跃事件的发生,并将事件结果存储在events结构体中 if ( ( number < 0 ) && ( errno != EINTR ) ) { printf( "epoll failure\n" ); break; } for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); if ( connfd < 0 ) { printf( "errno is: %d\n", errno ); continue; } if( user_count >= USER_LIMIT ) //如果用户数超过5个就拒绝建立连接 { const char* info = "too many users\n"; printf( "%s", info ); send( connfd, info, strlen( info ), 0 ); close( connfd ); continue; } users[user_count].address = client_address; users[user_count].connfd = connfd; //对建立连接的客户进行数据处理 ret = socketpair( PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd ); //fork之前使用 socketpair建立双向管道,子进程继承此管道 assert( ret != -1 ); pid_t pid = fork(); if( pid < 0 ) //如果fork失败继续fork { close( connfd ); continue; } else if( pid == 0 ) //子进程一直监听处理本描述符发送的事件 { close( epollfd ); close( listenfd ); close( users[user_count].pipefd[0] ); close( sig_pipefd[0] ); close( sig_pipefd[1] ); //子进程不处理信号管道事件,所以关闭描述符 run_child( user_count, users, share_mem ); munmap( (void*)share_mem, USER_LIMIT * BUFFER_SIZE ); //解除内存映射 exit( 0 ); } else //父进程 { close( connfd ); close( users[user_count].pipefd[1] ); //关闭管道的写端,只保留读端 addfd( epollfd, users[user_count].pipefd[0] ); //注册用户的读端事件 users[user_count].pid = pid; //对用户数据pid赋值 sub_process[pid] = user_count; //sub_process为了索引哪个进程处理某个客户端 user_count++; } } else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) ) //信号事件 { int sig; char signals[1024]; ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 ); if( ret == -1 ) { continue; } else if( ret == 0 ) { continue; } else { for( int i = 0; i < ret; ++i ) { switch( signals[i] ) { case SIGCHLD: { pid_t pid; int stat; while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 ) { int del_user = sub_process[pid]; sub_process[pid] = -1; if( ( del_user < 0 ) || ( del_user > USER_LIMIT ) ) { printf( "the deleted user was not change\n" ); continue; } epoll_ctl( epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0 ); close( users[del_user].pipefd[0] ); users[del_user] = users[--user_count]; sub_process[users[del_user].pid] = del_user; printf( "child %d exit, now we have %d users\n", del_user, user_count ); } if( terminate && user_count == 0 ) { stop_server = true; } break; } case SIGTERM: case SIGINT: { printf( "kill all the clild now\n" ); //addsig( SIGTERM, SIG_IGN ); //addsig( SIGINT, SIG_IGN ); if( user_count == 0 ) { stop_server = true; break; } for( int i = 0; i < user_count; ++i ) { int pid = users[i].pid; kill( pid, SIGTERM ); } terminate = true; break; } default: { break; } } } } } else if( events[i].events & EPOLLIN ) //如果子进程的run_child函数处理完毕,之后会触发这个条件。同时接受客户端的序号 { int child = 0; ret = recv( sockfd, ( char* )&child, sizeof( child ), 0 ); printf( "read data from child accross pipe\n" ); if( ret == -1 ) { continue; } else if( ret == 0 ) { continue; } else //给每个客户端读管道派发数据(客户端序号) { for( int j = 0; j < user_count; ++j ) { if( users[j].pipefd[0] != sockfd ) { printf( "send data to child accross pipe\n" ); send( users[j].pipefd[0], ( char* )&child, sizeof( child ), 0 ); } } } } } } del_resource(); return 0; }
相关文章推荐
- Linux网络编程--使用epoll,共享内存技术实现高性能的聊天室程序
- Linux网络编程 使用epoll实现一个高性能TCP Echo服务器
- [转]使用epoll进行高性能网络编程
- Linux socket编程之使用epoll处理海量连接(程序例程)
- Linux网络编程:TCP服务器(单进程多用户),使用select方法实现
- linux基础编程 共享内存 使用内存映射接口mmap系统调用
- .Net网络通讯编程[利用Socket实现字串、文件、序列化对象传输]--使用封装的网络服务3[聊天室][使用IE浏览本页]
- [转]使用epoll进行高性能网络编程
- linux网络编程:用C语言实现的聊天程序(异步通信)
- C++ - 网络编程模型 - Linux EPOLL - - ITeye技术网站
- linux基础编程 共享内存 通过消息队列实现同步 shmget
- 使用epoll进行高性能网络编程(收藏)
- linux下c编程之内存共享shemget函数的实现及案例-bmi体重身高测试2
- .Net网络通讯编程[利用Socket实现字串、文件、序列化对象传输]--使用封装的网络服务3[聊天室]
- 使用epoll进行高性能网络编程
- linux网络编程:用C语言实现的聊天程序(异步通信)
- .Net网络通讯编程[利用Socket实现字串、文件、序列化对象传输]--使用封装的网络服务4[聊天室]
- linux网络编程:用C语言实现的聊天程序(异步通信)
- linux socket实现网络聊天室(二):使用线程独立收发消息
- Linux下通过共享内存进行进程间通信,进程间同步使用信号量来实现