您的位置:首页 > Web前端 > React

ACE - Reactor实现I/O,Dispatch,Service三层完整服务器(完结)

2016-07-01 10:23 435 查看

框架描述

服务器层次:

/*-----------------------------------------------------------------
*  filename:    Reactor.cpp
*  author:        bing
*  time:        2016-06-29 15:26
*  function:    using ACE Reactor implement I/O multiplex server,
*               include service thread pool.
*-----------------------------------------------------------------*/
#include <ace/INET_Addr.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/SOCK_Stream.h>
#include <ace/Reactor.h>
#include <ace/Log_Msg.h>
#include "ace/Task.h"
#include "ace/OS.h"
#include <list>

#define MAX_BUFF_SIZE     1024
#define LISTEN_PORT     5010
#define SERVER_IP        ACE_LOCALHOST
#define THREAD_NUM        10

struct MsgData
{
ACE_HANDLE* IOHandle;
int DataFlag;
char Data[MAX_BUFF_SIZE];
MsgData()
{
IOHandle = NULL;
DataFlag = -1;
ACE_OS::memset(Data, 0, sizeof(Data));
}
};

class TaskThread;

class ServerStream : public ACE_Event_Handler
{
public:
ServerStream(TaskThread* pMsgQueue);
~ServerStream();
ACE_SOCK_Stream& GetStream(){return m_Svr_stream;}      //给accept提供接口绑定数据通道
virtual int handle_input(ACE_HANDLE fd);        //I/O触发事件后调用
void close();
virtual ACE_HANDLE get_handle(void) const {return m_Svr_stream.get_handle();}   //不重载需要手动将handle传入ACE_Reactor
private:
ACE_SOCK_Stream m_Svr_stream;
TaskThread* m_MsgQueue;
};

std::list<ServerStream*> g_StreamPool;  //stream pool

class TaskThread: public ACE_Task<ACE_MT_SYNCH>
{
public:
virtual int svc(void)
{
ACE_Message_Block *Msg;// = new ACE_Message_Block();
while(1)
{
getq(Msg);        //空闲线程阻塞

ACE_Data_Block *Data_Block = Msg->data_block();
MsgData *pData = reinterpret_cast <MsgData*>(Data_Block->base());
if (0 == pData->DataFlag)
{
std::list<ServerStream*>::iterator it;
for (it = g_StreamPool.begin();it != g_StreamPool.end();++it)
{
if (get_handle() == (*it)->get_handle())
{
g_StreamPool.erase(it);
delete *it;
break;
}
}
return 0;
}
char strBuffer[MAX_BUFF_SIZE];
ACE_OS::memset(strBuffer, 0, sizeof(strBuffer));
ACE_OS::memcpy(strBuffer, pData->Data, sizeof(strBuffer));
/*
这里接口业务代码分发数据
*/
ACE_DEBUG((LM_INFO,"[time:%d]recevie msg:%s\n",(int)ACE_OS::time(),strBuffer));
//ACE_SOCK_Stream Stream(*(pData->IOHandle));
//Stream.send("server recive data!\n",sizeof("server recive data!"));  //响应client数据
//ACE_OS::sleep(1);        //模拟业务耗时
Msg->release();         //release,inclue data_block
//ACE_DEBUG((LM_INFO,"thread end queue count:%d\n",msg_queue_->message_count()));
}
return 0;
}
};
typedef ACE_Singleton<TaskThread, ACE_Thread_Mutex> TaskThreadPool;

ServerStream::ServerStream(TaskThread* pMsgQueue)
{
m_MsgQueue = pMsgQueue;
}

ServerStream::~ServerStream()
{
close();
}

/*------------------------------------------------------
*    IO上报流数据,使用select复用上报,这里单线程处理
*   原来考虑直接把IO插队列给线程池处理,但是线程池和
*   这里是异步操作,线程没有处理队列这条消息ACE底层会
*   一直上报这个IO插消息队列,暂时在这里做单线程revc
*   考虑epoll边沿触发,一次上报处理
*------------------------------------------------------*/
int ServerStream::handle_input(ACE_HANDLE fd)
{
MsgData Message;
char strBuffer[MAX_BUFF_SIZE];
Message.DataFlag = m_Svr_stream.recv(strBuffer,MAX_BUFF_SIZE); //获取数据回select响应避免反复通知
if (-1 == Message.DataFlag)
{
ACE_DEBUG((LM_INFO, ACE_TEXT("recive data error!\n")));
return -1;
}
else if(0 == Message.DataFlag)
{
close();
ACE_DEBUG((LM_INFO, ACE_TEXT("client closed!\n")));
}
ACE_Data_Block *Data_Block = new ACE_Data_Block; //线程做释放
ACE_HANDLE Cli_IO = get_handle();

Message.IOHandle = &Cli_IO;
ACE_OS::memcpy(Message.Data,strBuffer,sizeof(strBuffer));//传的data可带length信息来适配消息大小

char *p = reinterpret_cast <char*>(&Message);
Data_Block->base(p,sizeof(Message));
ACE_Message_Block* msg = new ACE_Message_Block(Data_Block);
m_MsgQueue->putq(msg);    //put
//Data_Block->release();
return 0;
}

void ServerStream::close()
{
m_Svr_stream.close();
ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
}

class ServerAcceptor : public ACE_Event_Handler
{
public:
ServerAcceptor(int port,char* ip);
~ServerAcceptor();
bool open();
virtual int handle_input(ACE_HANDLE fd);  //有client连接
void close();
virtual ACE_HANDLE get_handle(void) const {return m_Svr_aceept.get_handle();}
private:
ACE_INET_Addr m_Svr_addr;
ACE_SOCK_Acceptor m_Svr_aceept;
};

ServerAcceptor::ServerAcceptor(int port,char* ip):m_Svr_addr(port,ip)
{
if (!open())    //open listen port
{
ACE_DEBUG((LM_INFO, ACE_TEXT("open failed!\n")));
}
else
{
ACE_DEBUG((LM_INFO, ACE_TEXT("open success!\n")));
TaskThreadPool::instance()->activate(THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , THREAD_NUM);//创建10个线程处理业务
}
}

ServerAcceptor::~ServerAcceptor()
{
close();
std::list<ServerStream*>::iterator it;
for (it = g_StreamPool.begin();it != g_StreamPool.end();++it)
{
if (NULL != (*it))
{
(*it)->close();
delete (*it);
}
}
}

bool ServerAcceptor::open()
{
if (-1 == m_Svr_aceept.open(m_Svr_addr,1))
{
ACE_DEBUG((LM_ERROR,ACE_TEXT("failed to accept\n")));
m_Svr_aceept.close();
return false;
}
return true;
}

int ServerAcceptor::handle_input(ACE_HANDLE fd )
{
ServerStream *stream = new ServerStream(TaskThreadPool::instance());    //产生新通道
if (NULL != stream)
{
g_StreamPool.push_back(stream);//暂时存储全局变量用于内存管理,优化可增加一个连接管理类管理连接通道
}
if (m_Svr_aceept.accept(stream->GetStream()) == -1)  //绑定通道
{
printf("accept client fail\n");
return -1;
}
ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK);  //通道注册到ACE_Reactor
ACE_DEBUG((LM_INFO,"User connect success!,ClientPool num = %d\n",g_StreamPool.size()));
return 0;
}

void ServerAcceptor::close()
{
ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK);
m_Svr_aceept.close();
}

int ACE_TMAIN()
{
ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP);
ACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK);    //listen port注册到ACE_Reactor

ACE_Reactor::instance()->run_reactor_event_loop();  //进入消息循环,有I/O事件回调handle_input
return 0;
}


View Code
代码实现了最简单的完整并发服务器,有部分还值得思考和优化:

1.dispatch进行类封装

2.回话通道的数据流管理进行类封装

3.dispatch消息结构优化

4.dispatch处为单线程,直接传递I/O给线程获取数据流还是获取数据流完成后给线程,如何实现两个线程同步

5.底层I/O复用使用epoll边沿优化

6.业务buff处理优化,进行消息类型划分,进入不同业务处理

由于实现完整服务器代码以最简单形式实现,上述优化在实际商用代码中还需要大量封装优化考虑。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: