您的位置:首页 > 理论基础 > 计算机网络

boost asio 异步实现tcp通讯

2014-06-16 16:10 393 查看
---恢复内容开始---

asioboost

目录(?)[-]

一前言

二实现思路

通讯包数据结构

连接对象

连接管理器

服务器端的实现

对象串行化

一、前言

boost asio可算是一个简单易用、功能强大且可跨平台的C++通讯库,效率也表现得不错:linux环境下基于epoll实现,windows环境下基于iocp实现。tcp通讯是项目当中经常用到的通讯方式之一,实现的方法各式各样,因此总结一套适用于自己项目的方法很有必要,很可能下一个项目直接套上去就可以用了。

二、实现思路

1.通讯包数据结构



Tag:检查数据包是否合法,具体会在下面讲解;

Length:描述Body的长度;

Command:表示数据包的类型,0表示心跳包(长连接需要心跳来检测连接是否正常),1表示注册包(客户端连接上服务器之后要将相关信息注册给服务器),2表示业务消息包;

business_type:业务消息包类型,服务器会根据业务消息包类型将数据路由到对应的客户端(客户端是有业务类型分类的);

app_id:客户端唯一标识符;

Data:消息数据;

2.连接对象

客户端连接上服务器之后,双方都会产生一个socket连接对象,通过这个对象可以收发数据,因此我定义为socket_session。

//socket_session.h

[cpp] view plaincopyprint?

#pragma once

#include <iostream>

#include <list>

#include <hash_map>

#include <boost/bind.hpp>

#include <boost/asio.hpp>

#include <boost/shared_ptr.hpp>

#include <boost/make_shared.hpp>

#include <boost/thread.hpp>

#include <boost/thread/mutex.hpp>

#include <boost/enable_shared_from_this.hpp>

#include <firebird/log/logger_log4.hpp>

#include <firebird/detail/config.hpp>

#include <firebird/socket_utils/message_archive.hpp>

using boost::asio::ip::tcp;

namespace firebird{

// Packet types carried in message::command: heartbeat (0) keeps a long-lived
// connection alive, regist (1) announces a peer's business type / app id,
// normal (2) is an application payload.
enum command{ heartbeat = 0, regist, normal};

// Marker that starts every frame header; async_write() prepends it and
// handle_read_body() rejects frames whose first bytes differ.
const std::string tag = "KDS";

class FIREBIRD_DECL socket_session;

// Shared-ownership handle used for every session passed between components.
typedef boost::shared_ptr<socket_session> socket_session_ptr;

/**
 * One TCP connection plus the framing logic used on it.
 *
 * Wire format of a frame: the 3-byte tag, a DWORD body length copied in host
 * byte order, then the body bytes. A session alternates between reading a
 * header and reading a body; every complete, non-empty body is deserialized
 * into a `message` and handed to the installed read callback.
 *
 * Instances are expected to be owned by a socket_session_ptr because the
 * implementation calls shared_from_this().
 */
class FIREBIRD_DECL socket_session:
    public boost::enable_shared_from_this<socket_session>,
    private boost::noncopyable
{
public:
    /// Invoked from handle_close() after the socket has been closed.
    typedef boost::function<void(socket_session_ptr)> close_callback;

    /// Invoked for every complete message read from the peer.
    typedef boost::function<void(
        const boost::system::error_code&,
        socket_session_ptr, message&)> read_data_callback;

    socket_session(boost::asio::io_service& io_service);
    ~socket_session(void);

    /// Identifier assigned from a shared counter at construction.
    DWORD id() { return m_id; }

    WORD get_business_type(){ return m_business_type; }
    void set_business_type(WORD type) { m_business_type = type; }

    DWORD get_app_id(){ return m_app_id; }
    void set_app_id(DWORD app_id) { m_app_id = app_id; }

    /// Remote address text; the callers in this file store it as "host:port".
    std::string& get_remote_addr() { return m_name; }

    /// Takes a const reference so that temporaries (e.g. the result of
    /// boost::format::str()) can be passed directly.
    void set_remote_addr(const std::string& name) { m_name = name; }

    tcp::socket& socket() { return m_socket; }

    void installCloseCallBack(close_callback cb){ close_cb = cb; }
    void installReadDataCallBack(read_data_callback cb) { read_data_cb = cb; }

    /// Sets socket options and starts the header/body read cycle.
    void start();

    /// Posts handle_close() to the io_service instead of closing inline.
    void close();

    /// Frames `sMsg` (tag + length + payload) and sends it asynchronously.
    void async_write(const std::string& sMsg);

    /// Serializes `msg` and sends it as one frame.
    void async_write(message& msg);

    /// True when the last recorded operation is older than the timeout.
    bool is_timeout();

    /// Records "now" as the time of the last operation.
    void set_op_time(){std::time(&m_last_op_time);}

private:
    static boost::detail::atomic_count m_last_id;  // source of session ids

    DWORD m_id;
    WORD m_business_type;
    DWORD m_app_id;
    std::string m_name;                 // remote address text

    boost::array<char, 7> sHeader;      // tag (3 bytes) + DWORD length
    std::string sBody;                  // body of the frame being received

    tcp::socket m_socket;
    boost::asio::io_service& m_io_service;
    std::time_t m_last_op_time;

    close_callback close_cb;
    read_data_callback read_data_cb;

    // Completion handler of async_write(); releases the heap buffer `pmsg`.
    void handle_write(const boost::system::error_code& e,
        std::size_t bytes_transferred, std::string* pmsg);

    // Dispatches the previously read body (if any) and reads the next header.
    void handle_read_header(const boost::system::error_code& error);

    // Validates the header that was just read and reads the body it announces.
    void handle_read_body(const boost::system::error_code& error);

    // Closes the socket and notifies close_cb; runs on an io_service thread.
    void handle_close();
};

}

这里需要注意的是,定义了一个tag="KDS",目的是为了检查收到的数据包是否有效:如果数据包的前3个字节不是“KDS”,就认为它是非法的请求包。你也可以把tag定义为其它字符串,只要双方按协议发包即可。当然,这只是比较简单的数据包检查方法;更严谨的做法是双方使用哈希算法来校验,具体怎么做,这里先不做详解。

//socket_session.cpp

[cpp] view plaincopyprint?

#include "socket_session.h"

namespace firebird{

// Counter shared by all sessions; the constructor pre-increments it to obtain
// the id of each new session, so ids start at 1.
boost::detail::atomic_count socket_session::m_last_id(0);

/**
 * Creates an unconnected session bound to `io_srv`.
 *
 * Members are initialised in declaration order. The last-operation time
 * starts at "now" so that is_timeout() never reads an indeterminate value
 * when it is called before start().
 */
socket_session::socket_session(boost::asio::io_service& io_srv)
    : m_id(static_cast<DWORD>(++socket_session::m_last_id)),
      m_business_type(0),
      m_app_id(0),
      m_socket(io_srv),
      m_io_service(io_srv),
      m_last_op_time(std::time(0))
{
}

/**
 * Closes the socket. The non-throwing overload is used because an exception
 * escaping a destructor would terminate the program; a failure to close is
 * deliberately ignored at this point.
 */
socket_session::~socket_session(void)
{
    boost::system::error_code ignored;
    m_socket.close(ignored);
}

void socket_session::start()

{

m_socket.set_option(boost::asio::ip::tcp::acceptor::linger(true, 0));

m_socket.set_option(boost::asio::socket_base::keep_alive(true));

std::time(&m_last_op_time);

const boost::system::error_code error;

handle_read_header(error);

}

void socket_session::handle_close()

{

try{

m_socket.close();

close_cb(shared_from_this());

}

catch(std::exception& e)

{

LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[" << e.what() << "]");

}

catch(...)

{

LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[未知异常]");

}

}

void socket_session::close()

{

//由于回调中有加锁的情况,必须提交到另外一个线程去做,不然会出现死锁

m_io_service.post(boost::bind(&socket_session::handle_close, shared_from_this()));

}

// Idle limit compared against the difference of two std::time() values.
static int connection_timeout = 60;

/**
 * Reports whether more than `connection_timeout` has elapsed since the last
 * recorded operation time.
 */
bool socket_session::is_timeout()
{
    const std::time_t current = std::time(0);
    const std::time_t idle = current - m_last_op_time;
    return idle > connection_timeout;
}

//读消息头

/**
 * Completion handler for a body read; also used by start() to begin reading.
 *
 * On error the session is closed. Otherwise the body that was just received
 * is moved out of sBody, the next header read is issued, and - if the body is
 * non-empty - it is deserialized and passed to the read callback. Receiving a
 * message also refreshes the last-operation time so that is_timeout()
 * reflects real traffic from the peer.
 *
 * @param error result of the preceding asynchronous read.
 */
void socket_session::handle_read_header(const boost::system::error_code& error)
{
    LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << "enter.");

    try{
        if(error)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[" << error.message().c_str() << "]");
            close();
            return;
        }

        // Take ownership of the finished body so sBody can be reused.
        std::string data;
        data.swap(sBody);

        // Queue the next header read before dispatching the current message.
        boost::asio::async_read(m_socket,
            boost::asio::buffer(sHeader),
            boost::bind(&socket_session::handle_read_body, shared_from_this(),
                boost::asio::placeholders::error));

        if (!data.empty())
        {
            set_op_time();

            // Hand the decoded message to the registered read callback.
            message msg;
            message_iarchive(msg, data);
            read_data_cb(error, shared_from_this(), msg);
        }
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[" << e.what() << "]");
        close();
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[未知异常]");
        close();
    }
}

//读消息体

/**
 * Completion handler for a header read: validates the header and reads the
 * body it announces.
 *
 * Fixes relative to the earlier version:
 *  - the tag is compared against exactly tag.length() header bytes; the
 *    header buffer is not NUL-terminated, so it must not be treated as a
 *    C string;
 *  - the peer-supplied length is bounded before any memory is allocated;
 *  - a zero-length body no longer takes the address of sBody[0].
 *
 * @param error result of the header read.
 */
void socket_session::handle_read_body(const boost::system::error_code& error)
{
    // Largest body accepted from a peer. NOTE(review): 64 MiB is a defensive
    // default; adjust it to the largest message the protocol really carries.
    static const DWORD max_body_length = 64 * 1024 * 1024;

    LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << "enter.");

    try{
        if(error)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[" << error.message().c_str() << "]");
            close();
            return;
        }

        // Length-bounded comparison of the leading header bytes with the tag.
        if (tag.compare(0, tag.length(), sHeader.data(), tag.length()) != 0)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[这是个非法连接!]");
            close();
            return;
        }

        // The body length follows the tag, copied byte-for-byte (host order).
        DWORD dwLength = 0;
        memcpy(&dwLength, &sHeader[tag.length()], sizeof(dwLength));

        if (dwLength > max_body_length)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[消息体长度超出上限!]");
            close();
            return;
        }

        sBody.resize(dwLength);
        char* pBody = dwLength > 0 ? &sBody[0] : 0;

        boost::asio::async_read(m_socket,
            boost::asio::buffer(pBody, dwLength),
            boost::bind(&socket_session::handle_read_header, shared_from_this(),
                boost::asio::placeholders::error));
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[" << e.what() << "]");
        close();
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[未知异常]");
        close();
    }
}

/**
 * Completion handler for async_write().
 *
 * @param error             result of the write.
 * @param bytes_transferred unused.
 * @param pmsg              heap buffer allocated by async_write(); it is
 *                          released here whether or not the write succeeded.
 */
void socket_session::handle_write(const boost::system::error_code& error,
    std::size_t bytes_transferred, std::string* pmsg)
{
    // The write has finished, so the buffer is no longer needed.
    // Deleting a null pointer is a no-op.
    delete pmsg;

    if (!error)
    {
        return;
    }

    LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[" << error.message().c_str() << "]");
    close();
}

void socket_session::async_write(const std::string& sMsg)

{

LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << "enter.")

try

{

DWORD dwLength = sMsg.size();

char* pLen = (char*)&dwLength;

//由于是异步发送,要保证数据发送完整时,才把数据销毁

std::string* msg = new std::string();

msg->append(tag);

msg->append(pLen, sizeof(dwLength));

msg->append(sMsg);

boost::asio::async_write(m_socket,boost::asio::buffer(*msg, msg->size()),

boost::bind(&socket_session::handle_write, shared_from_this(),

boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred,

msg));

}

catch(std::exception& e)

{

LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[" << e.what() << "]");

close();

}

catch(...)

{

LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << get_remote_addr() << "],socket异常:[未知异常]");

close();

}

}

/**
 * Serializes `msg` with message_oarchive() and sends the resulting bytes as
 * one frame through the string overload of async_write().
 */
void socket_session::async_write(message& msg)
{
    std::string serialized;
    message_oarchive(serialized, msg);

    async_write(serialized);
}

}

接收数据时,socket_session会先读取7个字节的head,比较前3个字节是否为“KDS”,然后取得4个字节的Length,再读出Length长度的数据,最后将该数据传给read_data_cb回调函数处理,read_data_cb回调函数是在外部注册的。

3.连接管理器

对于服务器来说,它同时服务多个客户端,为了有效的管理,因此需要一个连接管理器,我定义为session_manager。session_manager主要是对socket_session的增删改查,和有效性检查。

//session_manager.h

[cpp] view plaincopyprint?

#pragma once

#include "socket_session.h"

#include "filter_container.h"

#include <boost/date_time/posix_time/posix_time.hpp>

#include <boost/multi_index_container.hpp>

#include <boost/multi_index/member.hpp>

#include <boost/multi_index/ordered_index.hpp>

#include <boost/typeof/typeof.hpp>

#include <boost/random.hpp>

#include <boost/pool/detail/singleton.hpp>

namespace firebird{

template<typename T>

class var_gen_wraper

{

public:

var_gen_wraper(): gen(boost::mt19937((boost::int32_t)std::time(0)),

boost::uniform_smallint<>(1, 100)) {}

typename T::result_type operator() () { return gen(); }

private:

T gen;

};

// Record stored in session_set: the key fields that the container indexes
// (id, business_type, address, app_id) plus the session they describe.
struct session_stu

{

DWORD id;

WORD business_type;

std::string address;

DWORD app_id;

// The session this record refers to.
socket_session_ptr session;

};

// Empty tag types naming the four indices of session_set.
struct sid{};

struct sbusiness_type{};

struct saddress{};

struct sapp_id{};

// Enumerators listed in the same order as the indices of session_set.
enum session_idx_member{ session_id = 0, session_business_type, session_address, app_id};

// Values of the `type` argument of session_manager; check_connection()
// branches on them.
#define CLIENT 0

#define SERVER 1

// Container of session_stu records with four ordered indices: unique on id,
// non-unique on business_type, address and app_id.
typedef boost::multi_index::multi_index_container<

session_stu,

boost::multi_index::indexed_by<

boost::multi_index::ordered_unique<

boost::multi_index::tag<sid>, BOOST_MULTI_INDEX_MEMBER(session_stu, DWORD, id)>,

boost::multi_index::ordered_non_unique<

boost::multi_index::tag<sbusiness_type>, BOOST_MULTI_INDEX_MEMBER(session_stu, WORD, business_type)>,

boost::multi_index::ordered_non_unique<

boost::multi_index::tag<saddress>, BOOST_MULTI_INDEX_MEMBER(session_stu, std::string, address)>,

boost::multi_index::ordered_non_unique<

boost::multi_index::tag<sapp_id>, BOOST_MULTI_INDEX_MEMBER(session_stu, DWORD, app_id)>

>

> session_set;

// Shorthand for "reference to the index tagged Tag" and for that index's
// iterator type.
// NOTE(review): neither macro adds `typename`, so when Tag is a template
// parameter the expansion names a dependent type without the required
// keyword - confirm which compilers this has to build on.
#define MULTI_MEMBER_CON(Tag) boost::multi_index::index<session_set,Tag>::type&

#define MULTI_MEMBER_ITR(Tag) boost::multi_index::index<session_set,Tag>::type::iterator

/**
 * Predicate that accepts the records whose business_type equals the value
 * given at construction. The call operator is const so the predicate can
 * also be used through const objects and references.
 */
struct is_business_type {
    is_business_type(WORD type)
        : m_type(type)
    {
    }

    /// @return true when `s.business_type` equals the stored type.
    bool operator()(const session_stu& s) const
    {
        return s.business_type == m_type;
    }

    WORD m_type;  // business type to match
};

class session_manager

{

public:

typedef boost::shared_lock<boost::shared_mutex> readLock;

typedef boost:: unique_lock<boost::shared_mutex> writeLock;

session_manager(boost::asio::io_service& io_srv, int type, int expires_time);

~session_manager();

void add_session(socket_session_ptr p);

void update_session(socket_session_ptr p);

template<typename Tag, typename Member>

void del_session(Member m)

{

writeLock lock(m_mutex);

if (m_sessions.empty())

{

return ;

}

MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);

//BOOST_AUTO(idx, boost::multi_index::get<Tag>(m_sessions));

BOOST_AUTO(iter, idx.find(m));

if (iter != idx.end())

{

idx.erase(iter);

}

}

//获取容器中的第一个session

template<typename Tag, typename Member>

socket_session_ptr get_session(Member m)

{

readLock lock(m_mutex);

if (m_sessions.empty())

{

return socket_session_ptr();

}

MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);

BOOST_AUTO(iter, idx.find(m));

return iter != boost::end(idx) ? iter->session : socket_session_ptr();

}

//随机获取容器中的session

template<typename Tag>

socket_session_ptr get_session_by_business_type(WORD m)

{

typedef filter_container<is_business_type, MULTI_MEMBER_ITR(Tag)> FilterContainer;

readLock lock(m_mutex);

if (m_sessions.empty())

{

return socket_session_ptr();

}

MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);

//对容器的元素条件过滤

is_business_type predicate(m);

FilterContainer fc(predicate, idx.begin(), idx.end());

FilterContainer::FilterIter iter = fc.begin();

if (fc.begin() == fc.end())

{

return socket_session_ptr();

}

//typedef boost::variate_generator<boost::mt19937, boost::uniform_smallint<>> var_gen;

//typedef boost::details::pool::singleton_default<var_gen_wraper<var_gen>> s_var_gen;

////根据随机数产生session

//s_var_gen::object_type &gen = s_var_gen::instance();

//int step = gen() % fc.szie();

int step = m_next_session % fc.szie();

++m_next_session;

for (int i = 0; i < step; ++i)

{

iter++;

}

return iter != fc.end() ? iter->session : socket_session_ptr();

}

//根据类型和地址取session

template<typename Tag>

socket_session_ptr get_session_by_type_ip(WORD m, std::string& ip)

{

typedef filter_container<is_business_type, MULTI_MEMBER_ITR(Tag)> FilterContainer;

readLock lock(m_mutex);

if (m_sessions.empty())

{

return socket_session_ptr();

}

MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);

//对容器的元素条件过滤

is_business_type predicate(m);

FilterContainer fc(predicate, idx.begin(), idx.end());

FilterContainer::FilterIter iter = fc.begin();

if (fc.begin() == fc.end())

{

return socket_session_ptr();

}

while (iter != fc.end())

{

if (iter->session->get_remote_addr().find(ip) != std::string::npos)

{

break;

}

iter++;

}

return iter != fc.end() ? iter->session : socket_session_ptr();

}

//根据类型和app_id取session

template<typename Tag>

socket_session_ptr get_session_by_type_appid(WORD m, DWORD app_id)

{

typedef filter_container<is_business_type, MULTI_MEMBER_ITR(Tag)> FilterContainer;

readLock lock(m_mutex);

if (m_sessions.empty())

{

return socket_session_ptr();

}

MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);

//对容器的元素条件过滤

is_business_type predicate(m);

FilterContainer fc(predicate, idx.begin(), idx.end());

FilterContainer::FilterIter iter = fc.begin();

if (fc.begin() == fc.end())

{

return socket_session_ptr();

}

while (iter != fc.end())

{

if (iter->session->get_app_id() == app_id)

{

break;

}

iter++;

}

return iter != fc.end() ? iter->session : socket_session_ptr();

}

private:

int m_type;

int m_expires_time;

boost::asio::io_service& m_io_srv;

boost::asio::deadline_timer m_check_tick;

boost::shared_mutex m_mutex;

unsigned short m_next_session;

session_set m_sessions;

void check_connection();

};

}

这里主要用到了boost的multi_index容器,这是一个非常有用且方便的容器,可以实现容器的多列索引,具体的使用方法在这里不做详解。

//session_manager.cpp

[cpp] view plaincopyprint?

#include "session_manager.h"

namespace firebird{

/**
 * Stores the configuration and immediately runs the first connection check,
 * which also arms the periodic timer.
 *
 * The initialisers are listed in member declaration order, which is the
 * order in which they execute.
 */
session_manager::session_manager(boost::asio::io_service& io_srv, int type, int expires_time)
    : m_type(type),
      m_expires_time(expires_time),
      m_io_srv(io_srv),
      m_check_tick(io_srv),
      m_next_session(0)
{
    check_connection();
}

// Nothing is released explicitly; members clean up after themselves.
// NOTE(review): check_connection() re-arms m_check_tick with a handler bound
// to `this` that does not look at the completion error code - confirm that
// the io_service no longer runs handlers once a manager has been destroyed.
session_manager::~session_manager()

{

}

//检查服务器所有session的连接状态

/**
 * Periodic health check over every registered session.
 *
 * CLIENT mode: a session whose socket is closed is close()d again so that
 * its close callback can reconnect; an open session is sent a heartbeat.
 * SERVER mode: records of closed sessions are erased; open sessions that
 * report is_timeout() are closed.
 *
 * The timer is re-armed in its own step after the scan, so an exception
 * thrown while scanning no longer stops all future checks. The scan lock is
 * released before the timer is re-armed.
 */
void session_manager::check_connection()
{
    try{
        writeLock lock(m_mutex);

        session_set::iterator iter = m_sessions.begin();
        while (iter != m_sessions.end())
        {
            LOG4CXX_DEBUG(firebird_log, "循环");

            if (CLIENT == m_type)
            {
                if (!iter->session->socket().is_open())
                {
                    // Disconnected: closing fires the close callback, which
                    // is where the client reconnects.
                    LOG4CXX_INFO(firebird_log, "重新连接[" << iter->address << "]");
                    iter->session->close();
                }
                else
                {
                    // Connected: send a heartbeat.
                    message msg;
                    msg.command = heartbeat;
                    msg.business_type = iter->session->get_business_type();
                    msg.app_id = iter->session->get_app_id();
                    msg.data() = "H";
                    iter->session->async_write(msg);
                    iter->session->set_op_time();
                }
            }
            else if (SERVER == m_type)
            {
                if (!iter->session->socket().is_open())
                {
                    // Disconnected: drop the record. erase() already returns
                    // the next element, so skip the increment below.
                    LOG4CXX_INFO(firebird_log, KDS_CODE_INFO << "删除已关闭的session:[" << iter->session->get_remote_addr() << "]");
                    iter = m_sessions.erase(iter);
                    continue;
                }

                if (iter->session->is_timeout())
                {
                    // Idle too long: closing fires the close callback, which
                    // removes the session from this manager.
                    LOG4CXX_INFO(firebird_log, KDS_CODE_INFO << "删除已超时的session:[" << iter->session->get_remote_addr() << "]");
                    iter->session->close();
                }

                // NOTE(review): this resets the idle clock on every tick, so
                // is_timeout() can only fire when the check interval exceeds
                // the timeout. It is kept because nothing else in this
                // function refreshes the clock - confirm the intended policy.
                iter->session->set_op_time();
            }
            else
            {
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "unknown manager_type");
            }

            ++iter;
        }
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "unknown exception.");
    }

    // Always schedule the next check, even if the scan above failed.
    try{
        LOG4CXX_DEBUG(firebird_log, "定时检查");
        m_check_tick.expires_from_now(boost::posix_time::seconds(m_expires_time));
        m_check_tick.async_wait(boost::bind(&session_manager::check_connection, this));
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "unknown exception.");
    }
}

/**
 * Registers `p` in the container.
 *
 * The record's keys are copied from the session, including its business
 * type (previously always stored as 0, which hid sessions that already had a
 * type from business-type lookups). A null pointer is ignored. If a record
 * with the same id already exists the insert has no effect.
 */
void session_manager::add_session(socket_session_ptr p)
{
    if (!p)
    {
        return;
    }

    // Build the record before taking the lock to keep the locked part short.
    session_stu stuSession;
    stuSession.id = p->id();
    stuSession.business_type = p->get_business_type();
    stuSession.address = p->get_remote_addr();
    stuSession.app_id = p->get_app_id();
    stuSession.session = p;

    writeLock lock(m_mutex);
    m_sessions.insert(stuSession);
}

void session_manager::update_session(socket_session_ptr p)

{

writeLock lock(m_mutex);

if (m_sessions.empty())

{

return ;

}

MULTI_MEMBER_CON(sid) idx = boost::multi_index::get<sid>(m_sessions);

BOOST_AUTO(iter, idx.find(p->id()));

if (iter != idx.end())

{

const_cast<session_stu&>(*iter).business_type = p->get_business_type();

const_cast<session_stu&>(*iter).app_id = p->get_app_id();

}

}

}

这个时候,我就可以使用id、business_type、address、app_id当做key来索引socket_session了,单使用map容器是做不到的。

还有索引时,需要的一个条件过滤器

//filter_container.h

[cpp] view plaincopyprint?

#pragma once

#include <boost/iterator/filter_iterator.hpp>

namespace firebird{

template <class Predicate, class Iterator>

class filter_container

{

public:

typedef boost::filter_iterator<Predicate, Iterator> FilterIter;

filter_container(Predicate p, Iterator begin, Iterator end)

:m_begin(p, begin, end),

m_end(p, end, end)

{

}

~filter_container() {}

FilterIter begin() { return m_begin; }

FilterIter end() { return m_end; }

int szie() {

int i = 0;

FilterIter fi = m_begin;

while(fi != m_end)

{

++i;

++fi;

}

return i;

}

private:

FilterIter m_begin;

FilterIter m_end;

};

}

4.服务器端的实现

服务器我定义为server_socket_utils,拥有一个session_manager,每当accept成功得到一个socket_session时,都会将其增加到session_manager去管理,注册相关回调函数。

read_data_callback 接收到数据的回调函数

收到数据之后,也就是数据包的body部分,反序列化出command、business_type、app_id和data(我使用到了thrift),如果command==normal正常的业务包,会调用handle_read_data传入data。

close_callback 关闭socket_session触发的回调函数

根据id将该连接从session_manager中删除掉

//server_socket_utils.h

[cpp] view plaincopyprint?

#pragma once

#include "socket_session.h"

#include "session_manager.h"

#include <boost/format.hpp>

#include <firebird/message/message.hpp>

namespace firebird{

using boost::asio::ip::tcp;

/**
 * TCP server base class: accepts connections, wraps each in a
 * socket_session, registers it with a session_manager and dispatches
 * incoming messages.
 *
 * Derived classes implement handle_read_data() to process business messages.
 * Because the class is meant to be derived from, its destructor is virtual
 * so that deleting through a base pointer is well defined.
 */
class FIREBIRD_DECL server_socket_utils
{
private:
    boost::asio::io_service m_io_srv;
    boost::asio::io_service::work m_work;   // work object tied to m_io_srv
    tcp::acceptor m_acceptor;

    // Completion handler of async_accept for `session`.
    void handle_accept(socket_session_ptr session, const boost::system::error_code& error);

    // Installed on every session; removes it from m_manager.
    void close_callback(socket_session_ptr session);

    // Installed on every session; routes a decoded message by its command.
    void read_data_callback(const boost::system::error_code& e,
        socket_session_ptr session, message& msg);

protected:
    /// Called for every message whose command is `normal`.
    virtual void handle_read_data(message& msg, socket_session_ptr pSession) = 0;

public:
    /// @param port TCP port to listen on (IPv4).
    server_socket_utils(int port);
    virtual ~server_socket_utils(void);

    /// Issues the first asynchronous accept.
    void start();

    boost::asio::io_service& get_io_service() { return m_io_srv; }

    session_manager m_manager;
};

}

//server_socket_utils.cpp

[cpp] view plaincopyprint?

#include "server_socket_utils.h"

namespace firebird{

/**
 * Opens an IPv4 acceptor on `port` and creates a SERVER-mode session manager
 * that checks its sessions every 3 seconds.
 */
server_socket_utils::server_socket_utils(int port)
    : m_work(m_io_srv),
      m_acceptor(m_io_srv, tcp::endpoint(tcp::v4(), static_cast<unsigned short>(port))),
      m_manager(m_io_srv, SERVER, 3)
{
    // Acceptor options that were tried and left disabled:
    //m_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
    //// leave 0 seconds for the peer to receive data before closing
    //m_acceptor.set_option(boost::asio::ip::tcp::acceptor::linger(true, 0));
    //m_acceptor.set_option(boost::asio::ip::tcp::no_delay(true));
    //m_acceptor.set_option(boost::asio::socket_base::keep_alive(true));
    //m_acceptor.set_option(boost::asio::socket_base::receive_buffer_size(16384));
}

// Nothing to release explicitly; the members clean up after themselves.
server_socket_utils::~server_socket_utils(void)

{

}

void server_socket_utils::start()

{

try{

socket_session_ptr new_session(new socket_session(m_io_srv));

m_acceptor.async_accept(new_session->socket(),

boost::bind(&server_socket_utils::handle_accept, this, new_session,

boost::asio::placeholders::error));

}

catch(std::exception& e)

{

LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket异常:[" << e.what() << "]");

}

catch(...)

{

LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket异常:[未知异常]");

}

}

/**
 * Completion handler of async_accept.
 *
 * Unless the acceptor has been closed, the next accept is queued first - also
 * after a failed accept, so a single error no longer stops the server from
 * accepting. A successfully accepted session gets its callbacks and its
 * "address:port" text, is started and is registered with the manager.
 *
 * @param session session whose socket was the target of the accept.
 * @param error   result of the accept.
 */
void server_socket_utils::handle_accept(socket_session_ptr session, const boost::system::error_code& error)
{
    // A closed acceptor or a cancelled operation means we are shutting down.
    if (!m_acceptor.is_open() || error == boost::asio::error::operation_aborted)
    {
        return;
    }

    // Keep listening; start() creates the next session and logs its own errors.
    start();

    if (error)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket异常:[" << error.message().c_str() << "]");
        return;
    }

    if (!session)
    {
        return;
    }

    try{
        // Install the close callback.
        session->installCloseCallBack(boost::bind(&server_socket_utils::close_callback, this, _1));

        // Install the data-received callback.
        session->installReadDataCallBack(boost::bind(&server_socket_utils::read_data_callback, this, _1, _2, _3));

        boost::format fmt("%1%:%2%");
        fmt % session->socket().remote_endpoint().address().to_string();
        fmt % session->socket().remote_endpoint().port();

        // A named string is used so the call also works when the setter
        // takes a non-const reference.
        std::string remote = fmt.str();
        session->set_remote_addr(remote);

        session->start();

        m_manager.add_session(session);
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket异常:[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket异常:[未知异常]");
    }
}

/**
 * Close callback installed on every accepted session: removes the session's
 * record from the manager, looked up by id. Exceptions are logged and
 * swallowed.
 */
void server_socket_utils::close_callback(socket_session_ptr session)
{
    LOG4CXX_DEBUG(firebird_log, "close_callback");

    try{
        const DWORD closed_id = session->id();
        m_manager.del_session<sid>(closed_id);
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket异常:[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket异常:[未知异常]");
    }
}

/**
 * Routes a message received from a client by its command:
 *  - heartbeat: echoed back unchanged;
 *  - regist:    business type and app id are stored on the session and in
 *               the manager, then the message is echoed back;
 *  - normal:    forwarded to handle_read_data();
 *  - otherwise: logged as an illegal packet.
 * Exceptions are logged and swallowed. The error code `e` is not used.
 */
void server_socket_utils::read_data_callback(const boost::system::error_code& e,
    socket_session_ptr session, message& msg)
{
    try{
        LOG4CXX_DEBUG(firebird_log, "command =[" << msg.command << "],["
            << msg.business_type << "],[" << msg.data() << "]");

        // Heartbeat
        if (msg.command == heartbeat)
        {
            session->async_write(msg);
            return;
        }

        // Registration
        if (msg.command == regist)
        {
            session->set_business_type(msg.business_type);
            session->set_app_id(msg.app_id);
            m_manager.update_session(session);

            session->async_write(msg);

            LOG4CXX_FATAL(firebird_log, "远程地址:[" << session->get_remote_addr() << "],服务器类型:[" <<
                session->get_business_type() << "],服务器ID:[" << session->get_app_id() << "]注册成功!");
            return;
        }

        // Business data
        if (msg.command == normal)
        {
            handle_read_data(msg, session);
            return;
        }

        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "收到非法消息包!");
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket异常:[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket异常:[未知异常]");
    }
}

}

5.客户端

客户端与服务器的逻辑也差不多,区别就是在于客户端通过connect得到socket_session,而服务器是通过accept得到socket_session。

//client_socket_utils.h

[cpp] view plaincopyprint?

#pragma once

#include "socket_session.h"

#include "session_manager.h"

#include <boost/algorithm/string.hpp>

#include <firebird/message/message.hpp>

namespace firebird{

/**
 * TCP client base class: connects socket_session objects to their remote
 * addresses, registers them with the server, and reconnects a session
 * whenever it is closed.
 *
 * Derived classes implement handle_read_data() to process business messages.
 * Because the class is meant to be derived from, its destructor is virtual
 * so that deleting through a base pointer is well defined.
 */
class FIREBIRD_DECL client_socket_utils
{
public:
    client_socket_utils();
    virtual ~client_socket_utils();

    /// Connects every session in `vSession`.
    void session_connect(std::vector<socket_session_ptr>& vSession);

    /// Connects `pSession` to the "host:port" stored as its remote address.
    void session_connect(socket_session_ptr pSession);

    //socket_session_ptr get_session(std::string& addr);

    boost::asio::io_service& get_io_service() { return m_io_srv; }

protected:
    /// Called for every message whose command is `normal`.
    virtual void handle_read_data(message& msg, socket_session_ptr pSession) = 0;

private:
    boost::asio::io_service m_io_srv;
    boost::asio::io_service::work m_work;   // work object tied to m_io_srv
    session_manager m_manager;

    // Completion handler of async_connect; tries the next endpoint on failure.
    void handle_connect(const boost::system::error_code& error,
        tcp::resolver::iterator endpoint_iterator, socket_session_ptr pSession);

    // Installed on every session; starts a new connection attempt.
    void close_callback(socket_session_ptr session);

    // Installed on every session; routes a decoded message by its command.
    void read_data_callback(const boost::system::error_code& e,
        socket_session_ptr session, message& msg);
};

}

//client_socket_utils.cpp

[cpp] view plaincopyprint?

#include "client_socket_utils.h"

namespace firebird{

/**
 * Creates the work object for the io_service and a CLIENT-mode session
 * manager that checks its sessions every 3 seconds.
 */
client_socket_utils::client_socket_utils()
    : m_work(m_io_srv),
      m_manager(m_io_srv, CLIENT, 3)
{
}

// Nothing to release explicitly; the members clean up after themselves.
client_socket_utils::~client_socket_utils()

{

}

void client_socket_utils::session_connect(std::vector<socket_session_ptr>& vSession)

{

for (int i = 0; i < vSession.size(); ++i)

{

session_connect(vSession[i]);

}

}

/**
 * Connects `pSession` to the "host:port" held in its remote address.
 *
 * Installs the callbacks, registers the session with the manager and then
 * resolves the address and starts an asynchronous connect to the first
 * endpoint. The session is registered before resolving so that a resolve
 * failure is retried by the manager's periodic check (which close()s
 * disconnected client sessions, triggering close_callback) instead of the
 * session being dropped. An address without a ':' separator is rejected and
 * not registered. Exceptions are logged and swallowed.
 */
void client_socket_utils::session_connect(socket_session_ptr pSession)
{
    std::string& addr = pSession->get_remote_addr();

    try{
        // Install the close callback.
        pSession->installCloseCallBack(boost::bind(&client_socket_utils::close_callback, this, _1));

        // Install the data-received callback.
        pSession->installReadDataCallBack(boost::bind(&client_socket_utils::read_data_callback, this, _1, _2, _3));

        std::vector<std::string> ip_port;
        boost::split(ip_port, addr, boost::is_any_of(":"));
        if (ip_port.size() < 2)
        {
            //throw std::runtime_error("ip 格式不正确!");
            LOG4CXX_ERROR(firebird_log, "[" << addr << "] ip 格式不正确!");
            return;
        }

        // The client keeps reconnecting, so the session is stored even
        // though it is not connected yet.
        m_manager.add_session(pSession);

        tcp::resolver resolver(pSession->socket().get_io_service());
        tcp::resolver::query query(ip_port[0], ip_port[1]);
        tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        //pSession->set_begin_endpoint(endpoint_iterator); // remember the first endpoint for reconnects

        tcp::endpoint endpoint = *endpoint_iterator;
        pSession->socket().async_connect(endpoint,
            boost::bind(&client_socket_utils::handle_connect, this,
                boost::asio::placeholders::error, ++endpoint_iterator, pSession));
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << addr << "],socket异常:[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << addr << "],socket异常:[未知异常]");
    }
}

/**
 * Completion handler of async_connect.
 *
 * On success the session is started and a `regist` message carrying its
 * business type and app id is sent. On failure the next resolved endpoint is
 * tried if there is one; otherwise the socket is closed. In the failure
 * paths the raw socket is closed rather than the session, because
 * session->close() would fire the close callback and cause an endless
 * reconnect loop. Exceptions are logged and swallowed.
 *
 * (The unused local string of the earlier version was removed.)
 *
 * @param error             result of the connect attempt.
 * @param endpoint_iterator next endpoint to try, or the end iterator.
 * @param pSession          session being connected.
 */
void client_socket_utils::handle_connect(const boost::system::error_code& error,
    tcp::resolver::iterator endpoint_iterator, socket_session_ptr pSession)
{
    LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << " enter.");

    try{
        if (!error)
        {
            LOG4CXX_FATAL(firebird_log, "服务器:[" << pSession->get_business_type() <<"],连接远程地址:[" << pSession->get_remote_addr().c_str() << "]成功!");

            pSession->start();

            // Register this client's service type with the server.
            message msg;
            msg.command = regist;
            msg.business_type = pSession->get_business_type();
            msg.app_id = pSession->get_app_id();
            msg.data() = "R";
            pSession->async_write(msg);
        }
        else if (endpoint_iterator != tcp::resolver::iterator())
        {
            LOG4CXX_ERROR(firebird_log, "连接远程地址:[" << pSession->get_remote_addr().c_str() << "]失败,试图重连下一个地址。");

            // Close the socket only (see the function comment).
            pSession->socket().close();

            tcp::endpoint endpoint = *endpoint_iterator;
            pSession->socket().async_connect(endpoint,
                boost::bind(&client_socket_utils::handle_connect, this,
                    boost::asio::placeholders::error, ++endpoint_iterator, pSession));
        }
        else
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << pSession->get_remote_addr().c_str() << "]失败!");

            // Close the socket only (see the function comment).
            pSession->socket().close();
        }
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << pSession->get_remote_addr().c_str() <<"],socket异常:[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << pSession->get_remote_addr().c_str() <<"],socket异常:[未知异常]");
    }
}

/**
 * Routes a message received from the server by its command:
 *  - heartbeat: ignored;
 *  - regist:    logs that registration succeeded;
 *  - normal:    forwarded to handle_read_data();
 *  - otherwise: logged as an illegal packet.
 * The error code `e` is not used.
 */
void client_socket_utils::read_data_callback(const boost::system::error_code& e,
    socket_session_ptr session, message& msg)
{
    LOG4CXX_DEBUG(firebird_log, "command =[" << msg.command << "],["
        << msg.business_type << "],[" << msg.data() << "]");

    // Heartbeat: nothing to do.
    if (msg.command == heartbeat)
    {
        return;
    }

    // Registration acknowledged.
    if (msg.command == regist)
    {
        LOG4CXX_FATAL(firebird_log, "服务器:[" << session->get_business_type() <<"]注册成功。");
        return;
    }

    // Business data.
    if (msg.command == normal)
    {
        handle_read_data(msg, session);
        return;
    }

    LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "收到非法消息包!");
}

//关闭session就会重连

/**
 * Close callback installed on every client session: closing a session
 * triggers a reconnect.
 *
 * The session's "host:port" address is split and resolved again, and an
 * asynchronous connect to the first endpoint is started. A malformed address
 * is logged and the reconnect is abandoned. Exceptions are logged and
 * swallowed.
 */
void client_socket_utils::close_callback(socket_session_ptr session)
{
    LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << "enter.");

    try{
        //tcp::resolver::iterator endpoint_iterator = context.session->get_begin_endpoint();
        std::string& remote = session->get_remote_addr();

        std::vector<std::string> parts;
        boost::split(parts, remote, boost::is_any_of(":"));
        if (parts.size() < 2)
        {
            LOG4CXX_ERROR(firebird_log, "[" << remote << "] ip 格式不正确!");
            return;
        }

        tcp::resolver resolver(session->socket().get_io_service());
        tcp::resolver::query query(parts[0], parts[1]);
        tcp::resolver::iterator next_endpoint = resolver.resolve(query);

        tcp::endpoint first_endpoint = *next_endpoint;
        session->socket().async_connect(first_endpoint,
            boost::bind(&client_socket_utils::handle_connect, this,
                boost::asio::placeholders::error, ++next_endpoint, session));
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << session->get_remote_addr().c_str() <<"],socket异常:[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "连接远程地址:[" << session->get_remote_addr().c_str() <<"],socket异常:[未知异常]");
    }
}

}

5.对象串行化

socket_session发送和接收数据包的时候使用到了对象串行化,我这里是通过thrift实现的。其实boost的serialization库也提供了这样的功能,使用起来更为方便,但在我的测试中,thrift的性能要高很多,因此就坚持使用thrift了。感兴趣的话可以看我之前写的《使用thrift串行化对象》和《轻量级序列化库boost serialization》。

5.1字符串与thrift对象的相互转换

[cpp] view plaincopyprint?

#pragma once

#include <boost/shared_ptr.hpp>

#include <transport/TBufferTransports.h>

#include <protocol/TProtocol.h>

#include <protocol/TBinaryProtocol.h>

namespace firebird{

using namespace apache::thrift;

using namespace apache::thrift::transport;

using namespace apache::thrift::protocol;

template<typename T>

void thrift_iserialize(T& stu, std::string& s)

{

boost::shared_ptr<TMemoryBuffer> trans(new TMemoryBuffer((uint8_t*)&s[0], s.size()));

boost::shared_ptr<TProtocol> proto(new TBinaryProtocol(trans));

stu.read(proto.get());

}

template<typename T>

void thrift_oserialize(T& stu, std::string& s)

{

boost::shared_ptr<TMemoryBuffer> trans(new TMemoryBuffer());

boost::shared_ptr<TProtocol> proto(new TBinaryProtocol(trans));

stu.write(proto.get());

s = trans->getBufferAsString();

}

}

5.2通过thrift对象,普通的对象与字符串的相互转换

[cpp] view plaincopyprint?

#pragma once

#include "message_archive.hpp"

#include <firebird/archive/thrift_archive.hpp>

#include <firebird/message/TMessage_types.h>

namespace firebird

{

/*** message to ThriftMessage ***/

/**
 * Copies a `message` into its Thrift counterpart.
 *
 * All fields are copied. The earlier version swapped `cpid` and `remote_ip`
 * out of `msg`, which emptied those fields in the caller's message every
 * time it was serialized (for example when the same message was sent twice).
 *
 * @param tmsg destination; source/destination entries are appended to it.
 * @param msg  message to convert; left unchanged.
 */
void msg_to_tmsg(TMessage& tmsg, message& msg)
{
    // Routing fields
    tmsg.command = msg.command;
    tmsg.business_type = msg.business_type;
    tmsg.app_id = msg.app_id;

    // Context
    tmsg.context.cmdVersion = msg.context().cmdVersion;
    tmsg.context.cpid = msg.context().cpid;
    tmsg.context.remote_ip = msg.context().remote_ip;
    tmsg.context.wSerialNumber = msg.context().wSerialNumber;
    tmsg.context.session_id = msg.context().session_id;

    // Source list
    for (int i = 0; i < msg.source().size(); ++i)
    {
        tmsg.source.push_back(msg.source()[i]);
    }

    // Destination list
    for (int i = 0; i < msg.destination().size(); ++i)
    {
        tmsg.destination.push_back(msg.destination()[i]);
    }

    // Payload
    tmsg.data = msg.data();
}

/*** ThriftMessage to message ***/

/**
 * Copies a Thrift `TMessage` into a `message`.
 *
 * @param msg  destination; source/destination entries are appended with <<.
 * @param tmsg Thrift object to convert.
 */
void tmsg_to_msg(message& msg, TMessage& tmsg)
{
    // Payload
    msg.data() = tmsg.data;

    // Routing fields
    msg.command = tmsg.command;
    msg.business_type = tmsg.business_type;
    msg.app_id = tmsg.app_id;

    // Context
    msg.context().cmdVersion = tmsg.context.cmdVersion;
    msg.context().cpid = tmsg.context.cpid;
    msg.context().remote_ip = tmsg.context.remote_ip;
    msg.context().wSerialNumber = tmsg.context.wSerialNumber;
    msg.context().session_id = tmsg.context.session_id;

    // Source list, in order
    for (int src = 0; src < tmsg.source.size(); ++src)
    {
        msg.source() << tmsg.source[src];
    }

    // Destination list, in order
    for (int dst = 0; dst < tmsg.destination.size(); ++dst)
    {
        msg.destination() << tmsg.destination[dst];
    }
}

/**
 * Deserializes the bytes in `s` into `msg`: the bytes are decoded into a
 * TMessage, which is then converted with tmsg_to_msg().
 */
void message_iarchive(message& msg, std::string& s)
{
    TMessage decoded;
    thrift_iserialize(decoded, s);

    tmsg_to_msg(msg, decoded);
}

void message_oarchive(std::string& s, message& msg)

{

TMessage tmsg;

msg_to_tmsg(tmsg, msg);

thrift_oserialize(tmsg, s);

}

}

---恢复内容结束---
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: