您的位置:首页 > 其它

基于libevent的多线程通信框架实现

2016-08-08 16:28 369 查看
很久没有写博客了,这半年多时间一直很忙,一直没有更新博客,今天心血来潮准备做一篇,刚好把最近的新研究东西拿出来给大家分享一下!自己以前的一个后台程序框架(应用于了很多应用项目,运营商***拦截系统,国内某视频聊天应用的后台系统等),里面的网络部分基于ACE来实现的,最近准备淘汰ACE,大部分组件功能打算重写,所以基于网络这块打算用libevent来实现,在做的过程中发现了一些问题,就是能找到的例子都是单线程实现的,有多线的例子也没有写得那么详细,都是很简单的实现,经过一周时间对源码和api的分析,自己做了实现,经过测试还没有发现问题,效率上比之前的框架做了很大的提升,今天给大家贴出来,做分享交流。
NetFrame.h

[cpp] view
plain copy

//

// NetFrame.h

// Frame

//

// Created by chenjianjun on 15/9/7.

// Copyright (c) 2015年 jsbn. All rights reserved.

//

#ifndef __Frame__NetFrame__

#define __Frame__NetFrame__

#include <event.h>

#include <glog/logging.h>

#include "Common.h"

#include "Thread.h"

namespace NAME_SPACE {

class NetFrame {

public:

static NetFrame* Instance();

int NetWorkInit();

int NetWorkExit();

protected:

NetFrame();

~NetFrame();

private:

class NetRunnable:public Runnable {

public:

NetRunnable();

~NetRunnable();

protected:

virtual void Run(void*);

};

friend class NetRunnable;

DISALLOW_EVIL_CONSTRUCTORS(NetFrame);

public:

static struct event_base* _base;

private:

Thread _main_loop_thread;

volatile bool _run_flg;

NetRunnable _runnable;

};

}

#endif /* defined(__Frame__NetFrame__) */

NetFrame.cpp

[cpp] view
plain copy

//

// NetFrame.cpp

// Frame

//

// Created by chenjianjun on 15/9/7.

// Copyright (c) 2015年 jsbn. All rights reserved.

//

#include "NetFrame.h"

#include <event2/event.h>

namespace NAME_SPACE {

struct event_base* NetFrame::_base = nullptr;

NetFrame* NetFrame::Instance() {

LIBJINGLE_DEFINE_STATIC_LOCAL(NetFrame, manager, ());

return &manager;

}

NetFrame::NetFrame():_run_flg(false){}

NetFrame::~NetFrame(){

NetWorkExit();

}

NetFrame::NetRunnable::NetRunnable() {}

NetFrame::NetRunnable::~NetRunnable() {}

void NetFrame::NetRunnable::Run(void* arg) {

NetFrame* pNetFrame = (NetFrame*)arg;

while (pNetFrame->_run_flg) {

Thread::SleepMs(2000);

event_base_dispatch(NetFrame::_base);

}

}

int NetFrame::NetWorkInit() {

if (_run_flg) {

return FUNC_SUCCESS;

}

evthread_use_pthreads();

_base = event_base_new();

evthread_make_base_notifiable(_base);

// 开启事件监听主线程

_run_flg = true;

if (_main_loop_thread.Start(&_runnable, this)) {

return FUNC_SUCCESS;

}

// 开始线程失败置运行标志

_run_flg = false;

return FUNC_FAILED;

}

int NetFrame::NetWorkExit() {

if (!_run_flg) {

return FUNC_SUCCESS;

}

_run_flg = false;

event_base_loopexit(NetFrame::_base, nullptr);

_main_loop_thread.Stop();

event_base_free(_base);

_base = nullptr;

return FUNC_SUCCESS;

}

}

服务器对象类
ServerWorker.h

[cpp] view
plain copy

//

// ServerWorker.h

// 服务器对象类

//

// Created by chenjianjun on 15/9/7.

// Copyright (c) 2015年 jsbn. All rights reserved.

//

#ifndef __ServerWorker_H_

#define __ServerWorker_H_

#include <string>

#include <event2/listener.h>

#include "NetSignal.h"

namespace NAME_SPACE {

class ServerWorker {

public:

/**

* @brief 服务器构造函数

*

* @param listen_ip 监听的本地IP

* @param listen_port 监听的本地端口

*

* @return

*/

ServerWorker(std::string listen_ip, int listen_port);

ServerWorker(int listen_port);

~ServerWorker();

/**

* @brief 启动工作

*

* @param

*

* @return

*/

bool StartWork(TCPServerSignal* pTCPServerSignal);

/**

* @brief 停止工作

*

* @param

*

* @return

*/

void StopWork();

// 获取监听套接字

SOCKET GetFd() { return _listen_fd; }

public:

/**

* @brief 新连接处理,此函数外部禁止调用,用于event事件回调函数调用

*

* @param int 套接字句柄

* @param struct sockaddr_in * 客户端地址

*/

void Accept(int fd, struct sockaddr_in *sa);

/**

* @brief 监听失败事件处理,此函数外部禁止调用,用于event事件回调函数调用

*

* @param int 套接字句柄

* @param EM_NET_EVENT 错误码

*/

void AcceptError(int fd, EM_NET_EVENT msg);

private:

// 事件监听器

evconnlistener* _listener;

// 监听的IP

std::string _listen_ip;

// 监听的端口

unsigned short _listen_port;

// 监听的套接字

SOCKET _listen_fd;

// 连接器

TCPServerSignal* _pTCPServerSignal;

};

}

#endif /* defined(__ServerWorker_H_) */

ServerWorker.cpp

[cpp] view
plain copy

//

// ServerWorker.cpp

// Frame

//

// Created by chenjianjun on 15/9/7.

// Copyright (c) 2015年 jsbn. All rights reserved.

//

#include "ServerWorker.h"

#include "NetFrame.h"

namespace NAME_SPACE {

// 客户端连接事件回调处理函数

static void ListenerEventCb(evconnlistener *listener, evutil_socket_t fd,

sockaddr *sa, int socklen, void *user_data) {

ServerWorker *pServerWorker = (ServerWorker*)user_data;

struct linger l;

l.l_onoff = 1;

l.l_linger = 0;

setsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&l, sizeof(l));

pServerWorker->Accept(fd, (struct sockaddr_in *)sa);

}

// 监听失败回调处理函数

static void ListenerErrorCb(struct evconnlistener *listener, void *user_data) {

ServerWorker* pServerWorker = (ServerWorker*)user_data;

pServerWorker->AcceptError(pServerWorker->GetFd(),ENE_ACCEPT_ERROR);

//evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));

}

ServerWorker::ServerWorker(std::string listen_ip, int listen_port)

:_listen_ip(listen_ip),

_listen_port(listen_port),

_listener(nullptr),

_pTCPServerSignal(nullptr) {}

ServerWorker::ServerWorker(int listen_port)

:_listen_port(listen_port),

_listener(nullptr),

_pTCPServerSignal(nullptr) {

_listen_ip.clear();

}

bool ServerWorker::StartWork(TCPServerSignal* pTCPServerSignal) {

if (_listener) {

return false;

}

sockaddr_in sin;

memset(&sin, 0, sizeof(sin));

sin.sin_family = AF_INET;

if (!_listen_ip.empty()) { sin.sin_addr.s_addr = ::inet_addr(_listen_ip.c_str()); }

sin.sin_port = htons(_listen_port);

_listener = evconnlistener_new_bind(NetFrame::_base,

ListenerEventCb,

(void*)this,

LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE,

-1,

(sockaddr*)&sin,

sizeof(sockaddr_in));

if( nullptr == _listener ) {

LOG(ERROR)<<"创建监听器失败,IP["<<_listen_ip<<":"<<_listen_port<<"]";

return false;

}

_pTCPServerSignal = pTCPServerSignal;

// 设置监听失败回调

evconnlistener_set_error_cb(_listener, ListenerErrorCb);

// 获取监听的套接字句柄

_listen_fd = evconnlistener_get_fd(_listener);

return true;

}

void ServerWorker::StopWork()

{

if (_listener) {

evconnlistener_free(_listener);

_listener = nullptr;

}

}

void ServerWorker::Accept(int fd, struct sockaddr_in *sa) {

if (_pTCPServerSignal) {

_pTCPServerSignal->SignalAccept(fd, sa);

}

}

void ServerWorker::AcceptError(int fd, EM_NET_EVENT msg) {

if (_pTCPServerSignal) {

_pTCPServerSignal->SignalAcceptError(fd, msg);

}

}

}

被动客户端连接类
PassiveTCPClient.h

[cpp] view
plain copy

//

// PassiveTCPClient.h

// Frame

//

// Created by chenjianjun on 15/9/7.

// Copyright (c) 2015年 jsbn. All rights reserved.

//

#ifndef __PassiveTCPClient_H_

#define __PassiveTCPClient_H_

#include <string>

#include <event.h>

#include <event2/listener.h>

#include "NetSignal.h"

namespace NAME_SPACE {

class PassiveTCPClient {

public:

/**

* @brief 构造函数

*

* @param SOCKET 套接字句柄

* @param sockaddr_in* 客户端地址

* @param short 心跳时间

*

* @return

*/

PassiveTCPClient(SOCKET fd, struct sockaddr_in* sa, short heart_time = 10);

~PassiveTCPClient();

/**

* @brief 启动工作

*

* @param

*

* @return

*/

bool StartWork(TCPClientSignal*);

/**

* @brief 停止工作

*

* @param

*

* @return

*/

void StopWork();

/**

* @brief 发送数据

*

* @paramv char* 数据

* @paramv size_t 数据长度

*

* @return

*/

int SendData(void* pdata, size_t len);

SOCKET GetFd() { return _fd; }

void SetHeartFlg(bool flg) { _heart_flg = flg; }

bool GetHeartFlg() { return _heart_flg; }

public:

/**

* @brief 接收数据,此函数外部禁止调用,用于event事件回调函数调用

*

* @param void* 数据

* @param size_t 数据长度

*/

void PutRecvData(void*, size_t);

/**

* @brief 事件处理,此函数外部禁止调用,用于event事件回调函数调用

*

* @param short 事件集合

*/

void ProcEvent(short events);

public:

/*

连接器类,这个分离很重要,如果不分离会出现小概率崩溃现象,主要是连接函数在调用的时候回调函数里面,

如果外部删除这个对象会出现内存访问异常,具体看实现(那里是一个自动锁实现,崩溃也在自动锁释放那里)

*/

TCPClientSignal* _pTCPClientSignal;

private:

// 客户端IP

std::string _client_ip;

// 客户端端口

unsigned short _client_port;

// 套接字句柄

SOCKET _fd;

// 心跳时间

short _heart_time;

// bufferevent

struct bufferevent *_bev;

// 心跳事件器

struct event *_event;

// 心跳标志

volatile bool _heart_flg;

};

}

#endif /* defined(__PassiveTCPClient_H_) */

PassiveTCPClient.cpp

[cpp] view
plain copy

//

// PassiveTCPClient.cpp

// 被动TCP客户端

//

// Created by chenjianjun on 15/9/7.

// Copyright (c) 2015年 jsbn. All rights reserved.

//

#include "PassiveTCPClient.h"

#include "NetFrame.h"

namespace NAME_SPACE {

void PassiveTCPTimeOutEventCb(evutil_socket_t fd, short, void *data) {

PassiveTCPClient *pPassiveTCPClient = (PassiveTCPClient*)data;

if (pPassiveTCPClient->GetHeartFlg()) {

// 超时清除标志

pPassiveTCPClient->SetHeartFlg(false);

} else {

// 心跳超时回调

pPassiveTCPClient->ProcEvent(BEV_EVENT_TIMEOUT);

}

}

void PassiveTCPReadEventCb(struct bufferevent *bev, void *data) {

PassiveTCPClient* pPassiveTCPClient = (PassiveTCPClient*)data;

static char databuf[40960];

size_t datalen = 0;

size_t nbytes;

while ((nbytes = evbuffer_get_length(bev->input)) > 0) {

evbuffer_remove(bev->input, databuf+datalen, sizeof(databuf)-datalen);

datalen += nbytes;

}

// 有数据往来,设置标志

pPassiveTCPClient->SetHeartFlg(true);

// 数据接收回调

pPassiveTCPClient->PutRecvData(databuf, datalen);

}

void PassiveTCPEventCb(struct bufferevent *bev, short events, void *data) {

PassiveTCPClient* pPassiveTCPClient = (PassiveTCPClient*)data;

// 处理事件

pPassiveTCPClient->ProcEvent(events);

}

PassiveTCPClient::PassiveTCPClient(SOCKET fd, struct sockaddr_in* sa, short heart_time)

:_fd(fd),

_client_ip(inet_ntoa(sa->sin_addr)),

_client_port(ntohs(sa->sin_port)),

_bev(nullptr),

_heart_flg(false),

_heart_time(heart_time),

_pTCPClientSignal(nullptr)

{}

PassiveTCPClient::~PassiveTCPClient() {

StopWork();

_pTCPClientSignal = nullptr;

}

bool PassiveTCPClient::StartWork(TCPClientSignal* pTCPClientSignal) {

if (_bev) {

return false;

}

_bev = bufferevent_socket_new(NetFrame::_base,

_fd,

BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE);

if (_bev == nullptr) {

return false;

}

_event = event_new(NetFrame::_base,

_fd,

EV_TIMEOUT|EV_PERSIST,

PassiveTCPTimeOutEventCb, this);

if (_event == nullptr) {

bufferevent_free(_bev);

_bev = nullptr;

return false;

}

_pTCPClientSignal = pTCPClientSignal;

// 设置心跳检测时间

struct timeval timeout = {_heart_time, 0};

event_add(_event, &timeout);

bufferevent_setcb(_bev, PassiveTCPReadEventCb, nullptr, PassiveTCPEventCb, this);

bufferevent_enable(_bev, EV_READ);

return true;

}

void PassiveTCPClient::StopWork() {

if (_bev) {

bufferevent_disable(_bev, EV_READ);

bufferevent_free(_bev);

_bev = nullptr;

}

if (_event) {

event_del(_event);

event_free(_event);

_event = nullptr;

}

// 不要对_pPassiveTCPClientSignal置null,释放由外部传入者负责

}

int PassiveTCPClient::SendData(void* pdata, size_t len) {

if (_bev == nullptr) {

return FUNC_FAILED;

}

if (bufferevent_write(_bev, pdata, len) < 0) {

return FUNC_FAILED;

}

return FUNC_SUCCESS;

}

void PassiveTCPClient::PutRecvData(void* data, size_t len) {

if (_pTCPClientSignal) {

_pTCPClientSignal->SignalRecvData(_fd, data, len);

}

}

void PassiveTCPClient::ProcEvent(short events) {

if (!_pTCPClientSignal) {

return;

}

if (events & BEV_EVENT_CONNECTED) {

_pTCPClientSignal->SignalEvent(_fd, ENE_CONNECTED);

}

if(events & (BEV_EVENT_READING | BEV_EVENT_WRITING | BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT))

{

_pTCPClientSignal->SignalEvent(_fd, ENE_CLOSE);

}

}

}

主动客户端连接类
ActiveTCPClient.h

[cpp] view
plain copy

//

// ActiveTCPClient.h

// Frame 主动TCP客户端连接类

//

// Created by chenjianjun on 15/9/8.

// Copyright (c) 2015年 jsbn. All rights reserved.

//

#ifndef __ActiveTCPClient_H_

#define __ActiveTCPClient_H_

#include <string>

#include <event.h>

#include "NetSignal.h"

#include "RWLock.h"

namespace NAME_SPACE {

class ActiveTCPClient {

public:

explicit ActiveTCPClient(std::string host_name,

unsigned short host_port,

short heart_time = 10);

~ActiveTCPClient();

void SetTCPClientSignal(TCPClientSignal* pTCPClientSignal) { _pTCPClientSignal = pTCPClientSignal;}

/**

* @brief 启动工作

*

* @param

*

* @return

*/

bool StartWork();

/**

* @brief 停止工作

*

* @param

*

* @return

*/

void StopWork();

/**

* @brief 发送数据

*

* @paramv char* 数据

* @paramv size_t 数据长度

*

* @return

*/

int SendData(void* pdata, size_t len);

SOCKET GetFd() { return _fd; }

void SetHeartFlg(bool flg) { _heart_flg = flg; }

bool GetHeartFlg() { return _heart_flg; }

bool IsConnect() { return _connect_flg == 2;}

public:

/**

* @brief 接收数据,此函数外部禁止调用,用于event事件回调函数调用

*

* @param void* 数据

* @param size_t 数据长度

*/

void PutRecvData(void*, size_t);

/**

* @brief 事件处理,此函数外部禁止调用,用于event事件回调函数调用

*

* @param short 事件集合

*/

void ProcEvent(short events);

public:

/*

连接器类,这个分离很重要,如果不分离会出现小概率崩溃现象,主要是连接函数在调用的时候回调函数里面,

如果外部删除这个对象会出现内存访问异常,具体看实现(那里是一个自动锁实现,崩溃也在自动锁释放那里)

*/

TCPClientSignal* _pTCPClientSignal;

private:

// 服务器监听地址

std::string _host_name;

// 服务器监听端口

unsigned short _host_port;

// bufferevent

struct bufferevent *_bev;

// 心跳检测时间

short _heart_time;

// socket连接句柄

SOCKET _fd;

// 心跳事件器

struct event *_event;

// 心跳标志

volatile bool _heart_flg;

// 读写锁

RWLock* _m_rw_loacl;

// 连接标志 0:未连接 1:连接中 2:已连接

volatile unsigned char _connect_flg;

};

}

#endif /* defined(__ActiveTCPClient_H_) */

ActiveTCPClient.cpp

[cpp] view
plain copy

//

// ActiveTCPClient.cpp

// Frame

//

// Created by chenjianjun on 15/9/8.

// Copyright (c) 2015年 jsbn. All rights reserved.

//

#include "ActiveTCPClient.h"

#include "NetFrame.h"

namespace NAME_SPACE {

void ActiveTCPTimeOutEventCb(evutil_socket_t fd, short, void *data) {

ActiveTCPClient* pActiveTCPClient = (ActiveTCPClient*)data;

pActiveTCPClient->ProcEvent(BEV_EVENT_TIMEOUT);

}

void ActiveTCPEventCb(struct bufferevent *bev, short events, void *data) {

ActiveTCPClient* pActiveTCPClient = (ActiveTCPClient*)data;

pActiveTCPClient->ProcEvent(events);

}

void ActiveTCPReadEventCb(struct bufferevent *bev, void *data) {

ActiveTCPClient* pActiveTCPClient = (ActiveTCPClient*)data;

static char databuf[40960];

size_t datalen = 0;

size_t nbytes;

while ((nbytes = evbuffer_get_length(bev->input)) > 0) {

evbuffer_remove(bev->input, databuf+datalen, sizeof(databuf)-datalen);

datalen += nbytes;

}

// 有数据往来,设置标志

pActiveTCPClient->SetHeartFlg(true);

// 数据接收回调

pActiveTCPClient->PutRecvData(databuf, datalen);

}

ActiveTCPClient::ActiveTCPClient(std::string host_name, unsigned short host_port, short heart_time)

:_host_name(host_name),

_host_port(host_port),

_heart_time(heart_time),

_bev(nullptr),

_pTCPClientSignal(nullptr),

_heart_flg(false),

_event(nullptr),

_connect_flg(0),

_m_rw_loacl(RWLock::Create()) {

}

ActiveTCPClient::~ActiveTCPClient() {

StopWork();

_pTCPClientSignal = nullptr;

delete _m_rw_loacl;

}

bool ActiveTCPClient::StartWork() {

WriteLockScoped wLock(*_m_rw_loacl);

if (_bev) {

return false;

}

_fd = socket(AF_INET, SOCK_STREAM, 0);

evutil_make_socket_nonblocking(_fd);

if (_fd < 0) {

return false;

}

_bev = bufferevent_socket_new(NetFrame::_base, _fd, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE);

if (_bev == nullptr) {

close(_fd);

return false;

}

struct sockaddr_in sSvrAddr;

memset(&sSvrAddr, 0, sizeof(sSvrAddr));

sSvrAddr.sin_family = AF_INET;

sSvrAddr.sin_addr.s_addr = inet_addr(_host_name.c_str());

sSvrAddr.sin_port = htons(_host_port);

int addrlen = sizeof(struct sockaddr_in);

// 置为连接中状态

_connect_flg = 1;

if (bufferevent_socket_connect(_bev, (struct sockaddr*)&sSvrAddr, addrlen) < 0) {

_connect_flg = 0;

StopWork();

return false;

}

bufferevent_setcb(_bev, ActiveTCPReadEventCb, nullptr, ActiveTCPEventCb, this);

bufferevent_enable(_bev, EV_READ);

return true;

}

void ActiveTCPClient::StopWork() {

WriteLockScoped wLock(*_m_rw_loacl);

_connect_flg = 0;

if (_event) {

event_del(_event);

event_free(_event);

_event = nullptr;

}

if (_bev) {

bufferevent_disable(_bev, EV_READ);

bufferevent_free(_bev);

_bev = nullptr;

_fd = -1;

}

}

int ActiveTCPClient::SendData(void* pdata, size_t len) {

LOG(INFO)<<"发送数据1..........";

ReadLockScoped rLock(*_m_rw_loacl);

if (_bev == nullptr || _connect_flg != 2) {

return FUNC_FAILED;

}

if (bufferevent_write(_bev, pdata, len) < 0) {

return FUNC_FAILED;

}

// if (send(_fd, pdata, len, 0) < 0) {

// return FUNC_FAILED;

// }

LOG(INFO)<<"发送数据32..........";

return FUNC_SUCCESS;

}

void ActiveTCPClient::ProcEvent(short events) {

if (!_pTCPClientSignal) {

return;

}

if (events & BEV_EVENT_CONNECTED) {

// 已连接状态

_connect_flg = 2;

// 连接建立,开启心跳计数

_event = event_new(NetFrame::_base,

_fd,

EV_TIMEOUT|EV_PERSIST,

ActiveTCPTimeOutEventCb,

this);

// 设置心跳检测时间

struct timeval timeout = {_heart_time, 0};

event_add(_event, &timeout);

_pTCPClientSignal->SignalEvent(_fd, ENE_CONNECTED);

} else if (events & (BEV_EVENT_READING|BEV_EVENT_WRITING|BEV_EVENT_EOF|BEV_EVENT_TIMEOUT)) {

_pTCPClientSignal->SignalEvent(_fd, ENE_CLOSE);

} else {

_pTCPClientSignal->SignalEvent(_fd, EVE_UNKNOWN);

}

}

void ActiveTCPClient::PutRecvData(void* data, size_t len) {

if (_pTCPClientSignal) {

_pTCPClientSignal->SignalRecvData(_fd, data, len);

}

}

}

NetSignal.h

[html] view
plain copy

//

// NetSignal.h

// Frame

//

// Created by chenjianjun on 15/9/8.

// Copyright (c) 2015年 jsbn. All rights reserved.

//

#ifndef __NetSignal_H_

#define __NetSignal_H_

#include "Sigslot.h"

#include "Common.h"

namespace NAME_SPACE {

class TCPServerSignal {

public:

TCPServerSignal() {}

~TCPServerSignal() {}

// 客户端连接触发器

sigslot::signal2<SOCKET , struct sockaddr_in*> SignalAccept;

// 监听失败触发器

sigslot::signal2<SOCKET , EM_NET_EVENT> SignalAcceptError;

};

class TCPClientSignal {

public:

TCPClientSignal() {}

~TCPClientSignal() {}

// 数据接收连接器

sigslot::signal3<SOCKET, void*, size_t> SignalRecvData;

// 套接字事件处理器

sigslot::signal2<SOCKET, EM_NET_EVENT> SignalEvent;

};

}

#endif /* defined(__NetSignal_H_) */

借鉴了google的一个开源项目里面的sigslot机制,这里就不贴出来了,最后上一个测试代码

[cpp] view
plain copy

#include <glog/logging.h>

#include <map>

#include "NetFrame.h"

#include "ServerWorker.h"

#include "PassiveTCPClient.h"

#include "ActiveTCPClient.h"

#include "NetSignal.h"

#include "RWLock.h"

using namespace NAME_SPACE;

// 测试服务器

class TestServer : public sigslot::has_slots<>, public TCPClientSignal, public TCPServerSignal {

public:

TestServer() {

pthread_mutex_init(&_work_mutex, nullptr);

}

~TestServer() {

pthread_mutex_destroy(&_work_mutex);

}

int Start() {

_pServerWorker = new ServerWorker("192.168.1.74",8088);

SignalAccept.connect(this, &TestServer::Accept);

SignalAcceptError.connect(this, &TestServer::Event);

SignalRecvData.connect(this, &TestServer::RecvData);

SignalEvent.connect(this, &TestServer::Event);

if (!_pServerWorker->StartWork(this)) {

LOG(ERROR)<<"服务器监听启动失败";

return FUNC_FAILED;

}

return FUNC_SUCCESS;

}

void Stop() {

_pServerWorker->StopWork();

pthread_mutex_lock(&_work_mutex);

std::map<SOCKET, PassiveTCPClient*>::iterator it = _map_clients.begin();

while (it != _map_clients.end()) {

it->second->StopWork();

delete it->second;

_map_clients.erase(it++);

}

pthread_mutex_unlock(&_work_mutex);

}

int SendData(SOCKET fd, void* data, size_t len) {

pthread_mutex_lock(&_work_mutex);

std::map<SOCKET, PassiveTCPClient*>::iterator it = _map_clients.find(fd);

if (it != _map_clients.end()) {

it->second->SendData(data, len);

}

pthread_mutex_unlock(&_work_mutex);

return 0;

}

public:

// 数据接收

void RecvData(SOCKET fd, void* data, size_t len) {

// 接收到数据就回显,正常的程序师丢到队列里面去,让其他线程来处理

SendData(fd, data, len);

}

// 套接字事件处理器

void Event(SOCKET fd, EM_NET_EVENT msg) {

LOG(ERROR)<<"收到事件通知."<< msg;

pthread_mutex_lock(&_work_mutex);

std::map<SOCKET, PassiveTCPClient*>::iterator it = _map_clients.find(fd);

if (it != _map_clients.end()) {

it->second->StopWork();

delete it->second;

_map_clients.erase(it);

}

pthread_mutex_unlock(&_work_mutex);

}

// 客户端连接触发器

void Accept(SOCKET fd, struct sockaddr_in* sa) {

LOG(ERROR)<<"收到客户端连接.";

pthread_mutex_lock(&_work_mutex);

std::map<SOCKET, PassiveTCPClient*>::iterator it = _map_clients.find(fd);

if (it != _map_clients.end()) {

it->second->StartWork(this);

delete it->second;

_map_clients.erase(it);

}

PassiveTCPClient* pPassiveTCPClient = new PassiveTCPClient(fd, sa, 15);

if (!pPassiveTCPClient->StartWork(this)) {

LOG(ERROR)<<"启动客户端失败";

} else {

_map_clients[fd] = pPassiveTCPClient;

}

pthread_mutex_unlock(&_work_mutex);

}

private:

ServerWorker* _pServerWorker;

pthread_mutex_t _work_mutex;

std::map<SOCKET, PassiveTCPClient*> _map_clients;

};

// 测试客户端

class TestClient : public sigslot::has_slots<>, public TCPClientSignal, public Runnable {

public:

TestClient():_is_run_flg(false) {

}

~TestClient() {

}

int Start() {

_pActiveTCPClient = new ActiveTCPClient("192.168.1.5", 8088, 15);

_pActiveTCPClient->SetTCPClientSignal(this);

SignalEvent.connect(this, &TestClient::Event);

SignalRecvData.connect(this, &TestClient::RecvData);

_is_run_flg = true;

if (!_connect_thread.Start(this)) {

_is_run_flg = false;

delete _pActiveTCPClient;

return FUNC_FAILED;

}

return FUNC_SUCCESS;

}

void Stop() {

if (_pActiveTCPClient) {

_is_run_flg = false;

_connect_thread.Stop();

SignalEvent.disconnect(this);

SignalRecvData.disconnect(this);

_pActiveTCPClient->StopWork();

delete _pActiveTCPClient;

_pActiveTCPClient = nullptr;

}

}

int SendData(void* data,size_t len) {

if (_pActiveTCPClient) {

_pActiveTCPClient->SendData(data, len);

}

return FUNC_SUCCESS;

}

// 数据接收

void RecvData(SOCKET fd, void* data, size_t len) {

// 接收到数据就回显,正常的程序师丢到队列里面去,让其他线程来处理

SendData(data, len);

}

// 套接字事件处理器

void Event(SOCKET fd, EM_NET_EVENT msg) {

if (msg == ENE_CONNECTED) {

} else {

_pActiveTCPClient->StopWork();

}

}

protected:

virtual void Run(void* arg) {

//TestClient* p = (TestClient*)arg;

while (_is_run_flg) {

if (!_pActiveTCPClient->IsConnect()) {

_pActiveTCPClient->StartWork();

}

Thread::SleepMs(2000);

}

}

private:

ActiveTCPClient* _pActiveTCPClient;

// 运行标志

volatile bool _is_run_flg;

// 连接检测线程

Thread _connect_thread;

};

int main(int argc,char* argv[]) {

// 初期化网络

if (NetFrame::Instance()->NetWorkInit() != FUNC_SUCCESS) {

LOG(ERROR)<<"网络初期化失败....";

return -1;

}

// {

// // 测试服务器

// TestServer mTestServer;

// mTestServer.Start();

// sleep(200);// 模拟测试,休眠10分钟时间来测试整个网络库

// mTestServer.Stop();

// }

// {

// // 测试客户端

// TestClient mTestClient;

// mTestClient.Start();

// char buf[4] = "bye";

// for (int i = 0; i < 200; ++i) {

// memset(buf, 0x00, sizeof(buf));

// sprintf(buf, "%03d", i);

// sleep(1);

// mTestClient.SendData(buf, 3);

// }

//

// mTestClient.Stop();

// }

// 关闭网络

NetFrame::Instance()->NetWorkExit();

return 0;

}

测试代码被注释了,打开就可以测试.
这里把主要代码贴了出来,一些其他的代码可以看我的github项目:
https://github.com/chenjianjun571/cioforandroid. href="http://lib.csdn.net/base/28" target=_blank>Git
https://github.com/chenjianjun571/cioforios.git
FROM: http://blog.csdn.net/cjj198561/article/details/48370933
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: