ACE - Reactor实现I/O,Dispatch,Service三层完整服务器(完结)
2016-07-01 10:23
435 查看
框架描述
服务器层次:/*----------------------------------------------------------------- * filename: Reactor.cpp * author: bing * time: 2016-06-29 15:26 * function: using ACE Reactor implement I/O multiplex server, * include service thread pool. *-----------------------------------------------------------------*/ #include <ace/INET_Addr.h> #include <ace/SOCK_Acceptor.h> #include <ace/SOCK_Stream.h> #include <ace/Reactor.h> #include <ace/Log_Msg.h> #include "ace/Task.h" #include "ace/OS.h" #include <list> #define MAX_BUFF_SIZE 1024 #define LISTEN_PORT 5010 #define SERVER_IP ACE_LOCALHOST #define THREAD_NUM 10 struct MsgData { ACE_HANDLE* IOHandle; int DataFlag; char Data[MAX_BUFF_SIZE]; MsgData() { IOHandle = NULL; DataFlag = -1; ACE_OS::memset(Data, 0, sizeof(Data)); } }; class TaskThread; class ServerStream : public ACE_Event_Handler { public: ServerStream(TaskThread* pMsgQueue); ~ServerStream(); ACE_SOCK_Stream& GetStream(){return m_Svr_stream;} //给accept提供接口绑定数据通道 virtual int handle_input(ACE_HANDLE fd); //I/O触发事件后调用 void close(); virtual ACE_HANDLE get_handle(void) const {return m_Svr_stream.get_handle();} //不重载需要手动将handle传入ACE_Reactor private: ACE_SOCK_Stream m_Svr_stream; TaskThread* m_MsgQueue; }; std::list<ServerStream*> g_StreamPool; //stream pool class TaskThread: public ACE_Task<ACE_MT_SYNCH> { public: virtual int svc(void) { ACE_Message_Block *Msg;// = new ACE_Message_Block(); while(1) { getq(Msg); //空闲线程阻塞 ACE_Data_Block *Data_Block = Msg->data_block(); MsgData *pData = reinterpret_cast <MsgData*>(Data_Block->base()); if (0 == pData->DataFlag) { std::list<ServerStream*>::iterator it; for (it = g_StreamPool.begin();it != g_StreamPool.end();++it) { if (get_handle() == (*it)->get_handle()) { g_StreamPool.erase(it); delete *it; break; } } return 0; } char strBuffer[MAX_BUFF_SIZE]; ACE_OS::memset(strBuffer, 0, sizeof(strBuffer)); ACE_OS::memcpy(strBuffer, pData->Data, sizeof(strBuffer)); /* 这里接口业务代码分发数据 */ ACE_DEBUG((LM_INFO,"[time:%d]recevie msg:%s\n",(int)ACE_OS::time(),strBuffer)); //ACE_SOCK_Stream Stream(*(pData->IOHandle)); //Stream.send("server recive data!\n",sizeof("server recive data!")); //响应client数据 //ACE_OS::sleep(1); //模拟业务耗时 Msg->release(); //release,inclue data_block //ACE_DEBUG((LM_INFO,"thread end queue count:%d\n",msg_queue_->message_count())); } return 0; } }; typedef ACE_Singleton<TaskThread, ACE_Thread_Mutex> TaskThreadPool; ServerStream::ServerStream(TaskThread* pMsgQueue) { m_MsgQueue = pMsgQueue; } ServerStream::~ServerStream() { close(); } /*------------------------------------------------------ * IO上报流数据,使用select复用上报,这里单线程处理 * 原来考虑直接把IO插队列给线程池处理,但是线程池和 * 这里是异步操作,线程没有处理队列这条消息ACE底层会 * 一直上报这个IO插消息队列,暂时在这里做单线程revc * 考虑epoll边沿触发,一次上报处理 *------------------------------------------------------*/ int ServerStream::handle_input(ACE_HANDLE fd) { MsgData Message; char strBuffer[MAX_BUFF_SIZE]; Message.DataFlag = m_Svr_stream.recv(strBuffer,MAX_BUFF_SIZE); //获取数据回select响应避免反复通知 if (-1 == Message.DataFlag) { ACE_DEBUG((LM_INFO, ACE_TEXT("recive data error!\n"))); return -1; } else if(0 == Message.DataFlag) { close(); ACE_DEBUG((LM_INFO, ACE_TEXT("client closed!\n"))); } ACE_Data_Block *Data_Block = new ACE_Data_Block; //线程做释放 ACE_HANDLE Cli_IO = get_handle(); Message.IOHandle = &Cli_IO; ACE_OS::memcpy(Message.Data,strBuffer,sizeof(strBuffer));//传的data可带length信息来适配消息大小 char *p = reinterpret_cast <char*>(&Message); Data_Block->base(p,sizeof(Message)); ACE_Message_Block* msg = new ACE_Message_Block(Data_Block); m_MsgQueue->putq(msg); //put //Data_Block->release(); return 0; } void ServerStream::close() { m_Svr_stream.close(); ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL); } class ServerAcceptor : public ACE_Event_Handler { public: ServerAcceptor(int port,char* ip); ~ServerAcceptor(); bool open(); virtual int handle_input(ACE_HANDLE fd); //有client连接 void close(); virtual ACE_HANDLE get_handle(void) const {return m_Svr_aceept.get_handle();} private: ACE_INET_Addr m_Svr_addr; ACE_SOCK_Acceptor m_Svr_aceept; }; ServerAcceptor::ServerAcceptor(int port,char* ip):m_Svr_addr(port,ip) { if (!open()) //open listen port { ACE_DEBUG((LM_INFO, ACE_TEXT("open failed!\n"))); } else { ACE_DEBUG((LM_INFO, ACE_TEXT("open success!\n"))); TaskThreadPool::instance()->activate(THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , THREAD_NUM);//创建10个线程处理业务 } } ServerAcceptor::~ServerAcceptor() { close(); std::list<ServerStream*>::iterator it; for (it = g_StreamPool.begin();it != g_StreamPool.end();++it) { if (NULL != (*it)) { (*it)->close(); delete (*it); } } } bool ServerAcceptor::open() { if (-1 == m_Svr_aceept.open(m_Svr_addr,1)) { ACE_DEBUG((LM_ERROR,ACE_TEXT("failed to accept\n"))); m_Svr_aceept.close(); return false; } return true; } int ServerAcceptor::handle_input(ACE_HANDLE fd ) { ServerStream *stream = new ServerStream(TaskThreadPool::instance()); //产生新通道 if (NULL != stream) { g_StreamPool.push_back(stream);//暂时存储全局变量用于内存管理,优化可增加一个连接管理类管理连接通道 } if (m_Svr_aceept.accept(stream->GetStream()) == -1) //绑定通道 { printf("accept client fail\n"); return -1; } ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK); //通道注册到ACE_Reactor ACE_DEBUG((LM_INFO,"User connect success!,ClientPool num = %d\n",g_StreamPool.size())); return 0; } void ServerAcceptor::close() { ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK); m_Svr_aceept.close(); } int ACE_TMAIN() { ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP); ACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK); //listen port注册到ACE_Reactor ACE_Reactor::instance()->run_reactor_event_loop(); //进入消息循环,有I/O事件回调handle_input return 0; }
View Code
代码实现了最简单的完整并发服务器,有部分还值得思考和优化:
1.dispatch进行类封装
2.回话通道的数据流管理进行类封装
3.dispatch消息结构优化
4.dispatch处为单线程,直接传递I/O给线程获取数据流还是获取数据流完成后给线程,如何实现两个线程同步
5.底层I/O复用使用epoll边沿优化
6.业务buff处理优化,进行消息类型划分,进入不同业务处理
由于实现完整服务器代码以最简单形式实现,上述优化在实际商用代码中还需要大量封装优化考虑。
相关文章推荐
- ACE - Reactor源码总结整理
- 使用react context实现一个支持组件组合和嵌套的React Tab组件
- React实现限制checkBox的点击个数
- React Native 开发之 (02) 用Sublime 3作为React Native的开发IDE
- React Native 开发之 (01) 配置开发环境
- 解决react native使用fetch函数在ios9报network request failed的问题
- win7搭建React Native开发环境
- React Native入门教程 1 -- 开发环境搭建
- React Native入门教程 1 -- 开发环境搭建
- React-Native学习指南
- React Native NavigationExperimental
- ReactJs设置css样式
- netty初析
- Reactor模式详解
- 用 React.js 写一个最简单的 To-do List 应用
- ReactiveCocoa框架菜鸟入门(五)——信号的FlattenMap与Map
- react-native-android之初次相识
- react-native-android之初次相识
- react-native-android之初次相识
- ReactJs入门教程