您的位置:首页 > Web前端 > React

ACE-reactor设计模式和代码示例

2018-02-27 08:39 483 查看

1 reactor设计模式

1.1 意图

Reactor,即反应堆。Reactor 的一般工作过程是首先在 Reactor 中注册(Reactor)感兴趣事件,并在注册时候指定个已定义的回调函数(callback);当客户端发送请求时,在
Reactor 中会触发刚才注册的事件,并调用对应的处理函数。在这一个处理回调函数中,一般会有数据接收、处理、回复请求等操作。将多个客户的请求分离和调度给应用程序。

1.2 结构图:



结构图中各部分组成:

handle事件描述符,socket;文件描述符;定时器等 (可以处理io 信号量 定时器事件)
Event-handle:具体的事件处理器    (事件抽象出来统一处理 facade模式,依据事件类型定义其优先级和相应的事件处理的方式。不同的事件使用不同的event handle,桥接模式处理)
同步事件分离器:demultiplexer,用现成的epoll、select实现。其中notify用于唤醒被阻塞的系统调用
reactor管理器:用于事件的注册,删除,调度。使用单例模式实现,全局唯一。

1.3 业务流程及时序图:

应用启动,将关注的事件handle注册到Reactor中;
调用Reactor,进入无限事件循环,等待注册的事件到来;
事件到来,select返回,Reactor将事件分发到之前注册的回调函数中处理



2  代码示例

2.1 服务端:

mReactor.h

#ifndef _MREACTOR_H_
#define _MREACTOR_H_

#include<iostream>
#include<string>
#include<errno.h>

#include <ace/Reactor.h>
#include <ace/SOCK_Connector.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/Auto_Ptr.h>

#include "logger.h"//使用log4cplus
#include "strategy.h" //需要自己实现的方法类

/***
#include "common.h"
#include "strategy.h"
***/

//todo change MAX_BUF_LENGTH
#define MAX_BUF_LENGTH 1000
using namespace std;

class ClientService : public ACE_Event_Handler
{
public:
ClientService(ACE_SOCK_Stream new_stream,string cli_host,int cli_port)
:sock_(new_stream),
m_ip(cli_host),
m_port(cli_port)
{
}

ACE_SOCK_Stream& peer (void) { return this->sock_; }
void setStrategy(ProcStrategy* pStrategy) {m_pStrategy = pStrategy;}
int open (void)
{
//注册读就绪回调函数
return this->reactor ()->register_handler(this, ACE_Event_Handler::READ_MASK);
}

virtual ACE_HANDLE get_handle (void) const { return this->sock_.get_handle (); }

virtual int handle_input (ACE_HANDLE fd);

int sendRespond(string sendbuf);

// 释放相应资源
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask mask);
protected:
char buf[MAX_BUF_LENGTH];
ACE_SOCK_Stream sock_;
ProcStrategy *m_pStrategy;
string m_ip;
int m_port;
};

class ClientAcceptor : public ACE_Event_Handler
{
public:
virtual ~ClientAcceptor(){this->handle_close (ACE_INVALID_HANDLE, 0);}

void setStrategy(ProcStrategy* pStrategy) {m_pStrategy = pStrategy;}

int open (const ACE_INET_Addr &listen_addr);

virtual ACE_HANDLE get_handle(void) const   { return this->acceptor_.get_handle (); }

virtual int handle_input(ACE_HANDLE fd );

virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);

protected:
ACE_SOCK_Acceptor acceptor_;
ACE_INET_Addr listen_addr_;
ProcStrategy *m_pStrategy;   //需要自己实现的方法类 用于消息处理
};

#endif

mReactor.cpp

#include "mReactor.h"

int ClientService::sendRespond(string sendbuf)
{
int len = peer().send(sendbuf.c_str(),sendbuf.length());
LOG4CPLUS_INFO(g_logger, "send msg length:"<<len);
if(len == -1)
{
LOG4CPLUS_INFO(g_logger, "client ip:"<<m_ip<<" port:"<<m_port<<" send fail,errno:"<<errno);
return -1;
}
LOG4CPLUS_INFO(g_logger, "send msg:"<<sendbuf<<" to client,client ip:"<<m_ip<<" port:"<<m_port);
return 0;
}

int ClientService::handle_input (ACE_HANDLE fd)
{
int rev = peer().recv(buf,sizeof(buf));
if(rev<=0)
{
LOG4CPLUS_INFO(g_logger, "client recv fail errno:"<<errno<<" rev:"<<rev);
return -1;
}
//process request
string strReq(buf,rev);
LOG4CPLUS_INFO(g_logger, "client ip:"<<m_ip<<" port:"<<m_port<<" recv:"<<strReq);
string getRsp;
int ret = m_pStrategy->processReq(strReq,cmdType,getRsp);

//send response to client
string sendbuf;
m_pStrategy->processRsp(ret,cmdType,getRsp,sendbuf);
sendRespond(sendbuf);
return 0;
}

// 释放相应资源
int ClientService::handle_close(ACE_HANDLE, ACE_Reactor_Mask mask)
{
if (mask == ACE_Event_Handler::WRITE_MASK)
return 0;
LOG4CPLUS_INFO(g_logger, "client ip:"<<m_ip<<" port:"<<m_port<<" closing");
mask = ACE_Event_Handler::ALL_EVENTS_MASK |
ACE_Event_Handler::DONT_CALL;
this->reactor ()->remove_handler (this, mask);
this->sock_.close ();
delete this;    //socket出错时,将自动删除该客户端,释放相应资源
return 0;
}

int ClientAcceptor::open (const ACE_INET_Addr &listen_addr)
{
listen_addr_ = listen_addr;
//if (this->acceptor_.open (listen_addr, 1) == -1)
if (this->acceptor_.open (listen_addr) == -1)
{
LOG4CPLUS_ERROR(g_logger, "open port fail,port:"<<listen_addr.get_port_number());
return -1;
}
//注册接受连接回调事件
return this->reactor ()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
}

int ClientAcceptor::handle_input (ACE_HANDLE fd )
{
ACE_SOCK_Stream new_stream;
ACE_INET_Addr cli_addr;
string cli_host;
int cli_port = -1;
if (this->acceptor_.accept (new_stream,&cli_addr) == -1)
{
LOG4CPLUS_ERROR(g_logger, "accept client fail,client ip:"<<cli_host<<" port:"<<cli_port);
return -1;
}
cli_host = cli_addr.get_host_addr();
cli_port = cli_addr.get_port_number();
ClientService *client = new ClientService(new_stream,cli_host,cli_port);
client-
9fd7
>setStrategy(m_pStrategy);
client->reactor (this->reactor ());
if (client->open () == -1)
{
LOG4CPLUS_ERROR(g_logger, "client open fail");
client->handle_close (ACE_INVALID_HANDLE, 0);
}
return 0;
}

int ClientAcceptor::handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask)
{
if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE)
{
LOG4CPLUS_INFO(g_logger, "closing acceptor");
ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK |
ACE_Event_Handler::DONT_CALL;
this->reactor ()->remove_handler (this, m);
this->acceptor_.close ();
}
return 0;
}

main:

#include "mReactor.h"
#include "strategy.h"

int main()
{
//strategy
ProcStrategy *strategy = new Strategy();
strategy->setRedis(pRedis);

//ClientAcceptor
string t=m_ip+":"+m_port;
ACE_INET_Addr addr(t.c_str());
ClientAcceptor server;
server.setStrategy(strategy);
server.reactor(ACE_Reactor::instance());
LOG4CPLUS_INFO(g_logger, "listening addr:"<<t);

ret = server.open(addr);

while(true)
{
ACE_Reactor::instance()->handle_events();
}

}

说明:懒,这是写好的代码里面kou出来的,想跑起来 还得改改


2.2 client端

#include <string>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>

#include <ace/SOCK_Stream.h>
#include <ace/SOCK_Connector.h>
#include <ace/INET_Addr.h>
#include <ace/Time_Value.h>

using namespace std;
#define TYPE32 int
#define TYPE64 long

#define PERNUMS 2000

void processReq()
{
}

void processRsp(string strresp)
{
}

void* func(void* start)
{
int startnum =(TYPE64)(start);
cout<<startnum<<endl;

ACE_INET_Addr addr(3000,"127.0.0.1");
ACE_SOCK_Connector connector;
ACE_Time_Value timeout(10,0);
ACE_SOCK_Stream peer;

if (connector.connect(peer,addr,&timeout)!=0)
{
cout<<"connection failed!"<<endl;
}

for(int i=startnum+1;i<=startnum+PERNUMS;i++)
{
char buff[1024];
string cmd="SET";
char temp[10];
sprintf(temp,"%d",i);
string key = temp;
string value = temp;
int extime =-1;
string output;
processReq(**);
int s = peer.send(output.c_str(),output.length());
if(s<=0)
{
cout<<"send fail,key"<<key<<endl;
continue;
}
int bc=peer.recv(buff,1024,&timeout);
if(bc<=0)
{
cout<<"recv fail,key"<<key<<endl;
continue;
}
string strresp(buff,bc);
processRsp(strresp);
}
peer.close();
}

int main()
{
pthread_t tid[4];
for(int i=0;i<4;i++)
{
int startnum = 10000*i+1000*i;
int ret = pthread_create(&tid[i], NULL,func,(void*)startnum);
}
for(int i=0;i<4;i++)
{
int retnum = pthread_join(tid[i], NULL);
}
return 0;
}

2.3代码使用方法:

linux下 ace安装使用

https://www.cnblogs.com/liangxiaxu/archive/2013/03/07/2948417.html
windows下http://blog.csdn.net/qq_34233232/article/details/52595211

参考现有的资料ctrl c v写出来的,算原创么
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息