ACE-reactor设计模式和代码示例
2018-02-27 08:39
483 查看
1 reactor设计模式
1.1 意图
Reactor,即反应堆。Reactor 的一般工作过程是首先在 Reactor 中注册(Reactor)感兴趣事件,并在注册时候指定个已定义的回调函数(callback);当客户端发送请求时,在Reactor 中会触发刚才注册的事件,并调用对应的处理函数。在这一个处理回调函数中,一般会有数据接收、处理、回复请求等操作。将多个客户的请求分离和调度给应用程序。
1.2 结构图:
结构图中各部分组成:
handle事件描述符,socket;文件描述符;定时器等 (可以处理io 信号量 定时器事件)
Event-handle:具体的事件处理器 (事件抽象出来统一处理 facade模式,依据事件类型定义其优先级和相应的事件处理的方式。不同的事件使用不同的event handle,桥接模式处理)
同步事件分离器:demultiplexer,用现成的epoll、select实现。其中notify用于唤醒被阻塞的系统调用
reactor管理器:用于事件的注册,删除,调度。使用单例模式实现,全局唯一。
1.3 业务流程及时序图:
应用启动,将关注的事件handle注册到Reactor中;调用Reactor,进入无限事件循环,等待注册的事件到来;
事件到来,select返回,Reactor将事件分发到之前注册的回调函数中处理
2 代码示例
2.1 服务端:mReactor.h
#ifndef _MREACTOR_H_ #define _MREACTOR_H_ #include<iostream> #include<string> #include<errno.h> #include <ace/Reactor.h> #include <ace/SOCK_Connector.h> #include <ace/SOCK_Acceptor.h> #include <ace/Auto_Ptr.h> #include "logger.h"//使用log4cplus #include "strategy.h" //需要自己实现的方法类 /*** #include "common.h" #include "strategy.h" ***/ //todo change MAX_BUF_LENGTH #define MAX_BUF_LENGTH 1000 using namespace std; class ClientService : public ACE_Event_Handler { public: ClientService(ACE_SOCK_Stream new_stream,string cli_host,int cli_port) :sock_(new_stream), m_ip(cli_host), m_port(cli_port) { } ACE_SOCK_Stream& peer (void) { return this->sock_; } void setStrategy(ProcStrategy* pStrategy) {m_pStrategy = pStrategy;} int open (void) { //注册读就绪回调函数 return this->reactor ()->register_handler(this, ACE_Event_Handler::READ_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->sock_.get_handle (); } virtual int handle_input (ACE_HANDLE fd); int sendRespond(string sendbuf); // 释放相应资源 virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask mask); protected: char buf[MAX_BUF_LENGTH]; ACE_SOCK_Stream sock_; ProcStrategy *m_pStrategy; string m_ip; int m_port; }; class ClientAcceptor : public ACE_Event_Handler { public: virtual ~ClientAcceptor(){this->handle_close (ACE_INVALID_HANDLE, 0);} void setStrategy(ProcStrategy* pStrategy) {m_pStrategy = pStrategy;} int open (const ACE_INET_Addr &listen_addr); virtual ACE_HANDLE get_handle(void) const { return this->acceptor_.get_handle (); } virtual int handle_input(ACE_HANDLE fd ); virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask); protected: ACE_SOCK_Acceptor acceptor_; ACE_INET_Addr listen_addr_; ProcStrategy *m_pStrategy; //需要自己实现的方法类 用于消息处理 }; #endif
mReactor.cpp
#include "mReactor.h" int ClientService::sendRespond(string sendbuf) { int len = peer().send(sendbuf.c_str(),sendbuf.length()); LOG4CPLUS_INFO(g_logger, "send msg length:"<<len); if(len == -1) { LOG4CPLUS_INFO(g_logger, "client ip:"<<m_ip<<" port:"<<m_port<<" send fail,errno:"<<errno); return -1; } LOG4CPLUS_INFO(g_logger, "send msg:"<<sendbuf<<" to client,client ip:"<<m_ip<<" port:"<<m_port); return 0; } int ClientService::handle_input (ACE_HANDLE fd) { int rev = peer().recv(buf,sizeof(buf)); if(rev<=0) { LOG4CPLUS_INFO(g_logger, "client recv fail errno:"<<errno<<" rev:"<<rev); return -1; } //process request string strReq(buf,rev); LOG4CPLUS_INFO(g_logger, "client ip:"<<m_ip<<" port:"<<m_port<<" recv:"<<strReq); string getRsp; int ret = m_pStrategy->processReq(strReq,cmdType,getRsp); //send response to client string sendbuf; m_pStrategy->processRsp(ret,cmdType,getRsp,sendbuf); sendRespond(sendbuf); return 0; } // 释放相应资源 int ClientService::handle_close(ACE_HANDLE, ACE_Reactor_Mask mask) { if (mask == ACE_Event_Handler::WRITE_MASK) return 0; LOG4CPLUS_INFO(g_logger, "client ip:"<<m_ip<<" port:"<<m_port<<" closing"); mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this, mask); this->sock_.close (); delete this; //socket出错时,将自动删除该客户端,释放相应资源 return 0; } int ClientAcceptor::open (const ACE_INET_Addr &listen_addr) { listen_addr_ = listen_addr; //if (this->acceptor_.open (listen_addr, 1) == -1) if (this->acceptor_.open (listen_addr) == -1) { LOG4CPLUS_ERROR(g_logger, "open port fail,port:"<<listen_addr.get_port_number()); return -1; } //注册接受连接回调事件 return this->reactor ()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK); } int ClientAcceptor::handle_input (ACE_HANDLE fd ) { ACE_SOCK_Stream new_stream; ACE_INET_Addr cli_addr; string cli_host; int cli_port = -1; if (this->acceptor_.accept (new_stream,&cli_addr) == -1) { LOG4CPLUS_ERROR(g_logger, "accept client fail,client ip:"<<cli_host<<" port:"<<cli_port); return -1; } cli_host = cli_addr.get_host_addr(); cli_port = cli_addr.get_port_number(); ClientService *client = new ClientService(new_stream,cli_host,cli_port); client- 9fd7 >setStrategy(m_pStrategy); client->reactor (this->reactor ()); if (client->open () == -1) { LOG4CPLUS_ERROR(g_logger, "client open fail"); client->handle_close (ACE_INVALID_HANDLE, 0); } return 0; } int ClientAcceptor::handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask) { if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE) { LOG4CPLUS_INFO(g_logger, "closing acceptor"); ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this, m); this->acceptor_.close (); } return 0; }
main:
#include "mReactor.h" #include "strategy.h" int main() { //strategy ProcStrategy *strategy = new Strategy(); strategy->setRedis(pRedis); //ClientAcceptor string t=m_ip+":"+m_port; ACE_INET_Addr addr(t.c_str()); ClientAcceptor server; server.setStrategy(strategy); server.reactor(ACE_Reactor::instance()); LOG4CPLUS_INFO(g_logger, "listening addr:"<<t); ret = server.open(addr); while(true) { ACE_Reactor::instance()->handle_events(); } }
说明:懒,这是写好的代码里面kou出来的,想跑起来 还得改改
2.2 client端
#include <string> #include <iostream> #include <stdio.h> #include <stdlib.h> #include <ace/SOCK_Stream.h> #include <ace/SOCK_Connector.h> #include <ace/INET_Addr.h> #include <ace/Time_Value.h> using namespace std; #define TYPE32 int #define TYPE64 long #define PERNUMS 2000 void processReq() { } void processRsp(string strresp) { } void* func(void* start) { int startnum =(TYPE64)(start); cout<<startnum<<endl; ACE_INET_Addr addr(3000,"127.0.0.1"); ACE_SOCK_Connector connector; ACE_Time_Value timeout(10,0); ACE_SOCK_Stream peer; if (connector.connect(peer,addr,&timeout)!=0) { cout<<"connection failed!"<<endl; } for(int i=startnum+1;i<=startnum+PERNUMS;i++) { char buff[1024]; string cmd="SET"; char temp[10]; sprintf(temp,"%d",i); string key = temp; string value = temp; int extime =-1; string output; processReq(**); int s = peer.send(output.c_str(),output.length()); if(s<=0) { cout<<"send fail,key"<<key<<endl; continue; } int bc=peer.recv(buff,1024,&timeout); if(bc<=0) { cout<<"recv fail,key"<<key<<endl; continue; } string strresp(buff,bc); processRsp(strresp); } peer.close(); } int main() { pthread_t tid[4]; for(int i=0;i<4;i++) { int startnum = 10000*i+1000*i; int ret = pthread_create(&tid[i], NULL,func,(void*)startnum); } for(int i=0;i<4;i++) { int retnum = pthread_join(tid[i], NULL); } return 0; }
2.3代码使用方法:
linux下 ace安装使用
https://www.cnblogs.com/liangxiaxu/archive/2013/03/07/2948417.html
windows下http://blog.csdn.net/qq_34233232/article/details/52595211
参考现有的资料ctrl c v写出来的,算原创么
相关文章推荐
- 23种设计模式 UML 类图及对应示例代码 (三)
- 对ACE反应器Reactor模式的示例程序分析
- 设计模式之单例模式代码示例
- 关于JAVA中状态设计模式的讲解示例代码
- Java 设计模式(示例代码)
- Java设计模式及示例代码
- 【JS设计模式】解释器模式代码示例
- 设计模式之工厂模式代码示例
- PHP各种设计模式代码示例
- 设计模式之适配器模式代码示例
- 单例设计模式代码示例 MRC&ARC
- 23种设计模式 UML 类图及对应示例代码 (三)
- 修改ACE的Reactor模式示例中的参数后测试,依据测试结果,猜测Reactor似乎可以突破62个事件的限制。但是...
- 设计模式之抽象工厂模式代码示例
- 设计模式之原型模式代码示例
- C#设计模式之迭代器模式示例代码
- 【JS设计模式】策略模式的代码示例
- 有关《模式——工程化实现及扩展(设计模式C# 版)》一书中多个章节示例代码中缺少空格的公告
- 【出版直播】博客园征途系列,《设计模式——基于C#的工程化实现与扩展》电子书、示例代码发布,互动网预订开始
- 设计模式之工厂方法模式及代码示例