您的位置:首页 > 其它

游戏服务器之ace的反应器应用思路

2014-03-09 21:07 204 查看
ace处理事件是以在反应器中注册句柄的方式来处理的。封装句柄成会话,句柄就是socket的封装。反应器有事件来就会调用处理会话的接口handle_input,不同的会话就会调用不同的接口。

使用线程池来作为连接的网络收发和逻辑模块处理。定时器定时调用。

本文目录:

1、开始反应器循环

注册监听句柄,开始反应器循环,开始处理接收连接

2、注册接收数据句柄

接收数据句柄是接收数据会话。监听句柄处理监听事件,来接收socket和创建连接会话

3、 ace的线程管理

(1)ace的线程

处理消息循环,消息发送,消息派送,消息转移

(2)线程池管理

线程池管理的初始化(场景服务器),初始化各个连接线程和逻辑线程

 4、消息转移和发送

(1)消息转移

不同类型会话之间的消息转移是通过线程的队列

(2)消息发送
发送消息到客户端,(在场景服务器是在消息发送线程处理CPlayerHandleManager)

5、主动连接

(1)主动连接基类

封装会话的主动、被动socket、地址、缓冲区等

(2)主动连接类的处理事件

主动连接对象处理网络事件

(3)服务器连接

各类服务器之间的连接

6、注册定时器

本文内容:

1、开始反应器循环

注册监听句柄,开始反应器循环,开始处理接收连接。

句柄管理源码参考:http://www.cppblog.com/sandy/archive/2006/02/23/3451.html

监听句柄

class CAcceptHandle : public ACE_Event_Handler

{

public:
CAcceptHandle(ACE_Addr &addr);
int open(ACE_Addr &addr);
int handle_input(ACE_HANDLE handle);
int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mase);
ACE_HANDLE get_handle(void) const;

private:
~CAcceptHandle();

private:
ACE_SOCK_Acceptor m_peer_acceptor;//接收连接socket封装对象

};

 在主线程的主函数里:

生成epoll反应器对象

ACE_Dev_Poll_Reactor* acePoll = new ACE_Dev_Poll_Reactor(CONNUM);

ACE_Reactor::instance(new ACE_Reactor(acePoll));

//服务器监听端口

ACE_INET_Addr addr(tempPort);

CAcceptHandle *acceptHandle = new CAcceptHandle(addr);

//注册网络连接接受事件

ACE_Reactor::instance()->register_handler(acceptHandle,ACE_Event_Handler::ACCEPT_MASK);

//注册标准输入事件

ACE_Event_Handler::register_stdin_handler(acceptHandle,ACE_Reactor::instance(),ACE_Thread_Manager::instance());

//初始化定时器

CRWTimer* timer = new CRWTimer();

timer->init_timer();

ACE_Reactor::instance()->restart(true);

ACE_Reactor::instance()->run_event_loop();

//等待所有到线程结束

ACE_Thread_Manager::instance()->wait();

//删除定时器

timer->destroy_timer();

if(NULL != acceptHandle)

{
//delete acceptHandle;
printf("--------------- i am quit!!!! ------------\n\n\n");

}

...

}

2、注册接收数据句柄

 接收数据句柄是接收数据会话。监听句柄处理监听事件,来接收socket和创建连接会话

class CInputHandle : public ACE_Event_Handler

{

public:
CInputHandle();
void dispatch(CMyMessagePacket& msg);
int handle_input(ACE_HANDLE handle);
void detach_packet(char* ptr, int count);
void dispatch_detach_packet(char* ptr, int count);
int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mase);
ACE_HANDLE get_handle() const;
ACE_SOCK_Stream& peer_stream();

...

private:
~CInputHandle();

private:
int m_number;
ACE_SOCK_Stream m_peer_stream;

char m_join_packet[2048];
short m_join_packet_size;
short m_total_len;

};

int CAcceptHandle::handle_input(ACE_HANDLE handle)

{
if(ACE_STDIN == handle)
{
char tempBuf[128] = {0};

ACE_OS::memset(tempBuf,128,0);
printf("please input %s",tempBuf);
scanf("%s",tempBuf);
std::string str(tempBuf);
std::string str2("quit");

 
if(0 == ACE_OS::strcmp(str.c_str(),str2.c_str()))
{
printf(" ========= ");
ACE_Reactor::instance()->end_event_loop();
close_task = 1;
}
}
else
{
CInputHandle* inputHandle = new CInputHandle();
int result = this->m_peer_acceptor.accept(inputHandle->peer_stream(),0,0,1);
if(-1 == result)
{
ACE_DEBUG((LM_ERROR,"connect error maybe no socket handle!!\n"));
}

ACE_DEBUG((LM_ERROR,"connect established!!\n"));

ACE_Reactor::instance()->register_handler(inputHandle,ACE_Event_Handler::READ_MASK);
}

return 0;

}

3、 ace的线程管理

(1)ace的线程

处理消息循环,消息发送,消息派送,消息转移

详细可参考:http://blog.csdn.net/chenjiayi_yun/article/details/20846041

class CTaskBase : public ACE_Task<ACE_MT_SYNCH>

{

public:
CTaskBase();
~CTaskBase();
virtual int open(void*);
virtual int close(u_long);
virtual void dispatch(CMyMessagePacket& msg);
virtual int svc(void);

void push_block(CMessageBlock* msgBlock);
CMessageBlock* pop_block();
//发送消息到客户端
int send_to_client(CMyMessagePacket& msg);
//回收消息块
void revoke_msg_block(CMyMessagePacket& msg);
//路由出去的消息需要设置回收标志
void transfer_msg_block(CMyMessagePacket& msg);

private:
CMsgBlockQueue m_msgblk_queue;

};

//线程循环

int CTaskBase::svc(void)

{
CMessageBlock* msgBlock = NULL;

while(1)
{
if(1 == close_task)
{
break;
}
msgBlock = pop_block();

if(NULL == msgBlock)
{
usleep(100);
}

if(NULL != msgBlock)
{
CMyMessagePacket packet(msgBlock);
dispatch(packet);//消息派送
revoke_msg_block(packet);
}
}

return 0;

}

(2)线程池管理

线程池管理的初始化(场景服务器),初始化各个连接线程和逻辑线程

void CTaskManager::init_task()

{
m_msg_process_center = new CMsgProcessCenter();//逻辑线程
m_msg_process_center->open(0);

m_player_handle_manager = new CPlayerHandleManager();//发送消息线程(到客户端、到全局服务器)
m_player_handle_manager->open(0);

m_chat_center = new CChatCenter();//连接聊天服务器线程
m_chat_center->open(0);

m_gateserver_process = new CGateServerProcess();//连接网关服务器线程(发送消息到网关,保持心跳)
m_gateserver_process->open(0);

m_globalserver_process = new CGlobalServerProcess();//连接全局服务器线程
m_globalserver_process->open(0);

m_statisticsserver_process = new CStatisticsServerProcess();//连接统计服务器线程
m_statisticsserver_process->open(0);

m_DBserver_process = new CDBServerProcess();//连接db服务器线程
m_DBserver_process->open(0);

m_logserver_process = new CLogServerProcess();//连接日志服务器线程
m_logserver_process->open(0);

}

 

接收线程继承 CTaskBase

class  CMsgProcessCenter: public CTaskBase

 线程初始化

int CMsgProcessCenter::open(void*)

{
CTaskBase::open(0);
return 0;

}

 4、消息转移和发送

(1)消息转移

不同类型会话之间的消息转移是通过线程的队列

void CInputHandle::send_to_handle_manager(CMyMessagePacket& msgPacket)

{
msgPacket.set_hid(get_handle());
CTaskManager::instance()->get_handle_manager_task()->push_block(msgPacket.get_msg_block());
transfer_msg_block(msgPacket);

}

void CInputHandle::send_to_process_center(CMyMessagePacket& msgPacket)

{
msgPacket.set_hid(get_handle());
CTaskManager::instance()->get_msg_process_center_task()->push_block(msgPacket.get_msg_block());
transfer_msg_block(msgPacket);

}

 

//路由出去的消息需要设置回收标志

void CTaskBase::transfer_msg_block(CMyMessagePacket& msg)

{
if(MBS_ALLOC != msg.get_revoke_flag())
{
if(MSB_TANSFER == msg.get_revoke_flag())
{
cout << "you have more CInputHandle::set_revoke_msg_block !!!!!" << endl;
exit(0);
}
return ;
}
msg.set_revoke_flag(MSB_TANSFER);

}

(2)消息发送

发送消息到客户端,(在场景服务器是在消息发送线程处理CPlayerHandleManager)

int  CTaskBase::send_to_client(CMyMessagePacket& msg)

{
cout<<"send_to_client_msgid : "<<msg.get_msg_id()<<endl;

#ifdef ENCRYPT
CEncrypt::instance()->encode_msg(msg.get_external_buf(),msg.get_external_buf_len());

#endif
return ACE_OS::send(msg.get_hid(),msg.get_external_buf(),msg.get_external_buf_len());

}

(3)消息回收

回收消息模块到消息模块内存池

void CTaskBase::revoke_msg_block(CMyMessagePacket& msg)

{
if(MBS_ALLOC != msg.get_revoke_flag())
{
if(MSB_REVOKE == msg.get_revoke_flag())
{
cout << "you have more CInputHandle::revoke_msg_block !! status is : " << msg.get_revoke_flag() << endl;
exit(0);
}
return ;
}
CMessageBlockPool::instance()->push_block(msg.get_msg_block());
msg.set_revoke_flag(MSB_REVOKE);

}

 

5、主动连接 

主动连接的会话

(1)主动连接基类

封装会话的主动、被动socket、地址、缓冲区等

class CConnectBase : public ACE_Event_Handler

{

public:
CConnectBase();
virtual ~CConnectBase();
//CConnectHandle(const char* hostName,int nPort);
virtual bool connect_server(const char* hostName,int nPort);
virtual bool reconnect_server();
virtual int handle_input(ACE_HANDLE handle);
virtual int handle_output(ACE_HANDLE handle);
virtual void dispatch(CMyMessagePacket& msg);
virtual void detach_packet(char* ptr, int count);
virtual void dispatch_detach_packet(char* ptr, int count);
virtual int handle_input_network();
virtual int handle_input_stdin();
virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mase);
ACE_HANDLE get_handle(void) const;
void send_to_handle_manager(CMyMessagePacket& msg);
...
void send_msg(CMyMessagePacket& msg);

//回收消息块
void revoke_msg_block(CMyMessagePacket& msg);

 ...

//心跳消息
void heart_beat_msg();

private:
ACE_SOCK_Connector m_connector;//连接封装对象
ACE_INET_Addr m_remote_addr;//地址封装对象
ACE_SOCK_Stream m_stream;//socket封装对象
bool m_is_connected;
string m_host_name;
int m_port;
char m_join_packet[2048];//缓冲区
short m_join_packet_size;
short m_total_len;

};

bool CConnectBase::connect_server(const char* hostName,int nPort)

{
m_remote_addr.set(nPort,hostName);
m_host_name.assign(hostName);
m_port = nPort;
if(-1 == m_connector.connect(m_stream,m_remote_addr,0,ACE_Addr::sap_any,1))
{
return false;
}

 ......
return true;

}

bool CConnectBase::reconnect_server()

{
m_remote_addr.set(m_host_name.c_str(),m_port);
if(-1 == m_connector.connect(m_stream,m_remote_addr,0,ACE_Addr::sap_any,1))
{
std::cout << "connect error !!!! " << m_host_name.c_str() << m_port << std::endl;
return false;
}
ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);
set_connect_status(true);
return true;

}

(2)主动连接类的处理事件

主动连接对象处理网络事件

int CConnectBase::handle_input(ACE_HANDLE handle)

{
int result = 0;

......
result = handle_input_network();//处理网络事件
}

return result;

}

int CConnectBase::handle_input_network()

{
int result = 0;

//这里的内存需要改成线程成员
CMessageBlock* msgBlock = CMessageBlockPool::instance()->pop_block(2048);//获取消息块

if(NULL == msgBlock)
{
return 0;
}
char* temPtr = NULL;

temPtr = msgBlock->get_buf();
int len = msgBlock->get_buf_len() - 1;

int count = m_stream.recv(temPtr,len);

#ifdef ENCRYPT
CEncrypt::instance()->decode_msg(temPtr,count);

#endif
int msgLen = count;

msgBlock->set_msg_len(msgLen);

CMyMessagePacket packet(msgBlock);
if(count > 0)
{
if(count != (int)packet.get_i_msg_len())
{
//连包和断包处理
detach_packet(temPtr,count);
result = 0;
}
else
{
dispatch(packet);//接收数据后派送消息
result = 0;
}
revoke_msg_block(packet);//回收消息块
}
else
{
revoke_msg_block(packet);
std::cout << "CConnectHandle error !" << std::endl;
result = -1;
set_connect_status(false);
}
return result;

}

(3)服务器连接

各类服务器之间的连接

网关连接

class CConnectGate : public CConnectBase

{

public:
CConnectGate();
virtual ~CConnectGate();
virtual void dispatch(CMyMessagePacket& msg);
......
//发送服务器信息给帐号服务器
void send_server_info();

};

6、注册定时器

CRWTimer::CRWTimer() : m_timer_1(1),m_timer_2(2),m_timer_3(3),m_timer_4(4),m_flush_time("")

{

}

CRWTimer::~CRWTimer()

{
destroy_timer();

}

设定定时器的时间,开始时间和间隔时间

void CRWTimer::init_timer()

{
ACE_Reactor::instance()->schedule_timer(this,(const void*)m_timer_1,ACE_Time_Value(30),ACE_Time_Value(5));

......

}

void CRWTimer::destroy_timer()

{
ACE_Reactor::instance()->cancel_timer(m_timer_1);

......

}

int CRWTimer::handle_timeout(const ACE_Time_Value &tv, const void* arg)

{
long count = long(arg);
if((int)count == m_timer_1)
{
check_house_status();
check_activity();
}
else if((int)count == m_timer_2)
{
check_server_status();
check_xml_update();//配置检查刷新
}

......

return 0;

}

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: