【Boost】boost库asio详解9——TCP的简单例子2
2016-03-15 12:08
846 查看
客户端:
服务器:
编译:
g++ -Wall -o client client.cpp -lboost_system
g++ -Wall -o server server.cpp -lboost_system
这里需要注意的是: async_write, async_read, async_read_until 都是需要达到特定条件才会调用回调函数,
在回调函数被调用之前, 不能对同一个套接字再次发起同类调用, 否则收发的数据很可能是乱的. 所以, 在实际代码当中, 会有一个写缓冲队列: 当需要write时, 先把数据放到队列中, 如果此时队列中只有这一个元素, 则调用async_write, 否则等待函数回调; 当函数回调时将首个元素从队列中移除, 然后接着发送下一个, 直到队列为空.
对于client, 由于is_open在async_connect之后就是true状态了, 因此在async_connect回调返回之前没有方法知道是否已经连接成功, 实际代码当中一般会增加一个变量以表示该套接字是否已经允许发送数据.
============================================================================
服务器:
[cpp] view
plain copy
// Capacity, in bytes, of each connection's receive buffer (16 KiB).
// The value is parenthesized so the macro expands correctly inside larger
// expressions such as `x % PACK_MAX_SIZE` or `x / PACK_MAX_SIZE`.
#define PACK_MAX_SIZE (16*1024)
// One server-side TCP connection.
// The constructor is private, so instances are obtained through create(),
// which returns a boost::shared_ptr; the asynchronous handlers bind
// shared_from_this() when they are registered.
class TCPConnection: public boost::enable_shared_from_this<TCPConnection> {
public:
    // Builds a new, not yet connected, connection object.
    static boost::shared_ptr<TCPConnection> create(IoService& ioService);
    // Gives the acceptor access to the underlying socket.
    tcp::socket& getSocket();
    // Configures the socket and issues the first asynchronous read.
    void start();
    // Declared here because the member is defined out-of-class further down;
    // without this declaration that definition does not compile.
    void sendTermMessage(long phoneNum, unsigned short serialNum, unsigned char cmd, void* rawData);
private:
    TCPConnection(IoService& ioService);
    void handleRead(const boost::system::error_code& e,size_t bytesTransferred);
    void handleWrite(const boost::system::error_code& e,size_t bytesTransferred);
    // The TCP socket.
    tcp::socket socket;
    // Buffer used for receiving and sending.
    char m_buffer[PACK_MAX_SIZE];
    // Byte count tracked alongside m_buffer (set to 0 by the constructor).
    size_t m_bytesReceived;
};
typedef boost::shared_ptr<TCPConnection> pointer;

// Owns the listening acceptor; startAccept() and handleAccept() form the
// asynchronous accept cycle.
class TCPServer {
public:
    TCPServer(IoService& ioService, int port);

private:
    // Posts one asynchronous accept.
    void startAccept();
    // Completion handler for the accept posted by startAccept().
    void handleAccept(boost::shared_ptr<TCPConnection> newConnection,
                      const boost::system::error_code& error);

    tcp::acceptor acceptor;
};
[cpp] view
plain copy
// Binds the socket to the given io service and starts with an empty
// receive count.
TCPConnection::TCPConnection(IoService& ioService)
    : socket(ioService),
      m_bytesReceived(0)
{
}
// Factory: the constructor is private, so this is the way to obtain a
// connection object. The result is owned by a boost::shared_ptr.
boost::shared_ptr<TCPConnection> TCPConnection::create(IoService& ioService) {
    boost::shared_ptr<TCPConnection> connection(new TCPConnection(ioService));
    return connection;
}
// Returns a reference to this connection's socket.
tcp::socket& TCPConnection::getSocket()
{
    return this->socket;
}
// Prepares a freshly accepted socket and posts the first asynchronous read.
// Sets TCP_NODELAY and SO_KEEPALIVE, zeroes the receive buffer, then waits
// for data; handleRead runs when the read completes.
void TCPConnection::start() {
    static tcp::no_delay option( true );
    socket.set_option( option );
    socket.set_option(boost::asio::socket_base::keep_alive(true));
    // Clear the whole buffer; the previous hard-coded 2048 covered only part
    // of the PACK_MAX_SIZE-byte array.
    memset(m_buffer, 0x0, sizeof(m_buffer));
    socket.async_read_some(boost::asio::buffer(m_buffer),
        boost::bind(&TCPConnection::handleRead, shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
}
// Posts an asynchronous write of resp->mem (resp->size bytes) on this
// connection's socket; handleWrite receives the error code and the number of
// bytes written when the operation completes.
// NOTE(review): `resp` is not declared in this block and none of the four
// parameters is used here — the code that builds the response from them
// appears to have been left out of the listing; confirm against the full source.
// NOTE(review): async_write_some may complete after writing fewer bytes than
// requested, and handleWrite does not resend the remainder — verify whether
// boost::asio::async_write was intended.
void TCPConnection::sendTermMessage( long phoneNum, unsigned short serialNum, unsigned char cmd, void* rawData )
{
socket.async_write_some(
boost::asio::buffer( resp->mem, resp->size ),
boost::bind(
&TCPConnection::handleWrite,
// shared_from_this() keeps the connection alive until the handler has run
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
}
// Completion handler for the server-side async_read_some.
//
// On error or a zero-byte read the socket is closed and this connection is
// unsubscribed from CmdQueue. Otherwise the new bytes are appended to the
// data already held in m_buffer, every complete frame (TCPHead followed by
// bodyLen bytes) is forwarded through sendUDPPackToTerminal, the unconsumed
// tail is moved to the front of the buffer and the next read is posted
// behind it.
//
// error            - result of the read operation.
// bytesTransferred - number of bytes the read placed behind the buffered data.
void TCPConnection::handleRead(const boost::system::error_code& error, size_t bytesTransferred)
{
    if( error || bytesTransferred == 0 )
    {
        std::cout << "!!! network exception: err(" << error.value() << "), " << boost::system::system_error( error ).what() << " !!!" << std::endl;
        socket.close();
        CmdQueue::getInstance()->cancelSubscribe( this );
        return;
    }

    // Hex-dump at most the first 40 bytes of what has just arrived. The new
    // data sits behind the bytes kept from earlier reads.
    const size_t dumpLen = bytesTransferred <= 40 ? bytesTransferred : 40;
    std::cout << "received some from remote serivce";
    time_t nowTime = time(NULL);
    std::cout << ":(time:"<<nowTime<<")" << SQ::Utils::bytes2HexString( reinterpret_cast<SQ::Byte *>(m_buffer) + m_bytesReceived, dumpLen );
    if (bytesTransferred <= 40)
    {
        std::cout << std::endl;
    }
    else
    {
        std::cout << "..." << std::endl;
    }

    // Parse the stream, allowing for split and coalesced TCP segments.
    // The member counter is used: a local variable of the same name used to
    // shadow it, which discarded any partial frame kept from earlier reads.
    m_bytesReceived += bytesTransferred;
    size_t dealDataSize = 0;   // bytes consumed by complete frames
    while (true)
    {
        // Not enough data left for a header.
        const size_t pending = m_bytesReceived - dealDataSize;
        if (pending < sizeof(TCPHead)) break;

        TCPHead* head = reinterpret_cast<TCPHead*>(m_buffer + dealDataSize);
        const unsigned long phoneNum = SQ::Utils::BCD2long( head->terminalID, PHONENUM_LENGTH );
        const int bodyLen = ntohl( head->bodyLen );

        // A negative length, or one that can never fit in the buffer, would
        // stall the parser forever; treat it like a network failure.
        if (bodyLen < 0 || static_cast<size_t>(bodyLen) > PACK_MAX_SIZE - sizeof(TCPHead))
        {
            std::cout << "!!! network exception: invalid body length " << bodyLen << " !!!" << std::endl;
            socket.close();
            CmdQueue::getInstance()->cancelSubscribe( this );
            return;
        }

        // Wait for more data until the whole frame is buffered.
        if (pending - sizeof(TCPHead) < static_cast<size_t>(bodyLen)) break;

        // The body follows the wire header: terminal id, 2-byte serial number,
        // 1-byte command and 4-byte body length.
        // NOTE(review): assumes TCPHead is packed so that sizeof(TCPHead)
        // equals PHONENUM_LENGTH + 7 — TODO confirm.
        const size_t bodyOffset = dealDataSize + PHONENUM_LENGTH + 2 + 1 + 4;
        SQ::Byte *pRawData = reinterpret_cast<SQ::Byte *>(m_buffer) + bodyOffset;
        sendUDPPackToTerminal(phoneNum, pRawData, bodyLen);
        dealDataSize += sizeof(TCPHead) + bodyLen;
    }

    // Compact the buffer after processing.
    if (dealDataSize > 0)
    {
        memmove(m_buffer, m_buffer + dealDataSize, m_bytesReceived - dealDataSize);
        m_bytesReceived -= dealDataSize;
    }

    socket.async_read_some(boost::asio::buffer(m_buffer + m_bytesReceived, PACK_MAX_SIZE-m_bytesReceived),
        boost::bind(&TCPConnection::handleRead, shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
}
// Completion handler for asynchronous writes: reports a failed write on
// standard output and does nothing on success.
void TCPConnection::handleWrite( const boost::system::error_code& error, size_t bytesTransferred)
{
    if( !error )
    {
        return;
    }
    std::cout << "write error." << std::endl;
}
// Opens an IPv4 acceptor on the given port and immediately posts the first
// asynchronous accept.
TCPServer::TCPServer(IoService& ioService, int port)
    : acceptor(ioService, tcp::endpoint(tcp::v4(), port))
{
    startAccept();
}
void TCPServer::startAccept() {
boost::shared_ptr<TCPConnection> newConnection = TCPConnection::create(
acceptor.get_io_service());
acceptor.async_accept(newConnection->getSocket(),
boost::bind(&TCPServer::handleAccept, this, newConnection,
boost::asio::placeholders::error));
}
// Completion handler for async_accept.
//
// newConnection - the connection object whose socket was offered to the acceptor.
// error         - result of the accept operation.
//
// A successful accept starts the connection. The next accept is posted in
// either case: previously a single failed accept ended the accept cycle and
// the server silently stopped taking new clients. The only exception is
// operation_aborted, which Boost.Asio reports when the operation was
// cancelled (for example because the acceptor was closed).
void TCPServer::handleAccept(boost::shared_ptr<TCPConnection> newConnection,
        const boost::system::error_code& error) {
    if (error == boost::asio::error::operation_aborted) {
        return;
    }
    if (!error) {
        newConnection->start();
    } else {
        std::cerr << "accept error: " << error.message() << std::endl;
    }
    startAccept();
}
客户端:
[cpp] view
plain copy
// Client-side TCP session: connects to m_ip:m_port, reconnects after
// failures, parses incoming frames and sends encoded byte streams.
class CommonSession : public boost::enable_shared_from_this<CommonSession>
{
    // Public interface
public:
    CommonSession( boost::asio::io_service *io_service, const std::string &ip, short port);
    // Posts an asynchronous connect to the target endpoint.
    void start();
    // Sends len bytes starting at pByte if the session is connected.
    void sendDataBytes(SQ::Byte *pByte, int len);
    bool getConnectState() {return m_isConnected;}
    // Internal interface
protected:
    void onConnect( const boost::system::error_code &error );
    void onWrite( const boost::system::error_code& error, size_t bytes_transferred );
    void onRead( const boost::system::error_code& error, size_t bytes_transferred );
private:
    tcp::socket m_socket;                   // socket
    char m_buffer[PACK_MAX_SIZE];           // receive buffer
    bool m_isConnected;                     // connection state
    // Set once a connect failure has been logged, so that the failure is
    // reported only once. It is assigned by the constructor and by onConnect
    // and was previously missing from the class.
    bool haveLog;
    std::string m_ip;                       // target IP
    // Target port. Stored unsigned so that ports above 32767 are logged with
    // their real value instead of a negative number.
    unsigned short m_port;
    size_t m_bytesReceived;                 // received byte count
    unsigned short m_serialNum;             // outgoing serial number
    boost::recursive_mutex m_mutex;         // mutex
    boost::asio::io_service* m_io_service;  // I/O service
};
[cpp] view
plain copy
// Stores the connection target and creates the socket on *io_service.
// No connection attempt is made here; call start() for that.
//
// io_service - I/O service that runs this session's asynchronous operations;
//              it is dereferenced here, so it must not be null.
// ip         - target address in textual form.
// port       - target port.
//
// Initializers follow the member declaration order, and m_serialNum, which
// was previously left indeterminate, now starts at 0.
CommonSession::CommonSession( boost::asio::io_service *io_service, const std::string &ip, short port )
    : m_socket( *io_service ),
      m_isConnected( false ),
      m_ip( ip ),
      m_port( port ),
      m_bytesReceived( 0 ),
      m_serialNum( 0 ),
      m_io_service( io_service )
{
    haveLog = false;
}
// Completion handler for async_connect.
// Success: marks the session connected, sets TCP_NODELAY and SO_KEEPALIVE and
// posts the first asynchronous read.
// Failure: logs the failure (only the first time), closes the socket, waits
// three seconds and calls start() again.
// NOTE(review): the three-second wait is a blocking sleep executed inside the
// completion handler, so the thread running this handler does nothing else
// for that time.
void CommonSession::onConnect( const boost::system::error_code & error )
{
    if( !error )
    {
        std::cout << "connect to " << m_ip << ":" << m_port << " successed! (time:"<< time(NULL)<<")" << std::endl;
        m_isConnected = true;

        static tcp::no_delay option( true );
        m_socket.set_option( option );
        m_socket.set_option( boost::asio::socket_base::keep_alive( true ) );

        // Post the first read.
        m_socket.async_read_some( boost::asio::buffer( m_buffer ),
            boost::bind( &CommonSession::onRead,
                shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred ) );
        return;
    }

    // The connect failure is reported only once.
    if (!haveLog)
    {
        haveLog = true;
        std::cout << "connect to " << m_ip << ":" << m_port << " failed!" << " -> ERR : ";
        const bool cancelled = ( error.value() == boost::system::errc::operation_canceled );
        if( !cancelled )
        {
            std::cerr << boost::system::system_error( error ).what() << std::endl;
        }
    }

    // Reconnect after three seconds.
    m_socket.close();
    boost::this_thread::sleep( boost::posix_time::seconds( 3 ) );
    start();
}
// Completion handler for async_write: prints the error code and its message
// when the send failed, and does nothing on success.
void CommonSession::onWrite( const boost::system::error_code& error, size_t bytes_transferred )
{
    if( !error )
    {
        return;
    }
    std::cout << "!!! send error, code is " << error<<" !!!"<<std::endl;
    std::cerr << boost::system::system_error( error ).what() << std::endl;
}
// Completion handler for the client-side async_read_some.
//
// On error, on a zero-byte read, or when a frame announces an impossible body
// length, the session is marked disconnected, the socket is closed and a
// reconnect is started after three seconds. Otherwise the new bytes are
// appended to the data already held in m_buffer and every complete frame is
// handled: a frame with bodyLen == 0 is a gateway command passed to dealCmd,
// any other frame carries a terminal message whose header is decoded. The
// unconsumed tail is then moved to the front of the buffer and the next read
// is posted behind it.
//
// error             - result of the read operation.
// bytes_transferred - number of bytes the read placed behind the buffered data.
//
// NOTE(review): the three-second wait is a blocking sleep executed inside the
// completion handler.
void CommonSession::onRead( const boost::system::error_code& error, size_t bytes_transferred )
{
    bool needReconnect = false;   // set when the connection has to be dropped
    size_t dealDataSize = 0;      // bytes consumed by complete frames

    if( error )
    {
        std::cout << "!!! onRead error, code is " << error<<" !!!"<<std::endl;
        std::cerr << boost::system::system_error( error ).what() << std::endl;
        needReconnect = true;
    }
    else if( bytes_transferred == 0 )
    {
        needReconnect = true;
    }
    else
    {
        // Parse the stream, allowing for split and coalesced TCP segments.
        // The member counter is used: a local variable of the same name used
        // to shadow it, which discarded any partial frame kept from earlier reads.
        m_bytesReceived += bytes_transferred;
        while (true)
        {
            // Not enough data left for a header.
            const size_t pending = m_bytesReceived - dealDataSize;
            if (pending < sizeof(SQ::TCPHead)) break;

            SQ::TCPHead* head = reinterpret_cast<SQ::TCPHead*>(m_buffer + dealDataSize);
            const unsigned long phoneNum = SQ::Utils::BCD2long( head->terminalID, PHONENUM_LENGTH );
            const unsigned char cmd = head->cmd;
            const int bodyLen = ntohl( head->bodyLen );

            if (bodyLen == 0)
            {
                // Data from the gateway rather than from a terminal.
                std::cout<< "receive(time:"<< time(NULL)<<") command from gate, cmd:" << (int)cmd << " phoneNum:"<<phoneNum<<std::endl;
                dealCmd(cmd, phoneNum);
                dealDataSize += sizeof(SQ::TCPHead);
                continue;
            }

            // A negative length, or one that can never fit in the buffer,
            // would stall the parser forever; drop the connection instead.
            if (bodyLen < 0 || static_cast<size_t>(bodyLen) > PACK_MAX_SIZE - sizeof(SQ::TCPHead))
            {
                std::cout << "!!! onRead error, invalid body length " << bodyLen << " !!!" << std::endl;
                needReconnect = true;
                break;
            }

            // Message from a terminal: wait until the whole body is buffered.
            if (pending - sizeof(SQ::TCPHead) < static_cast<size_t>(bodyLen)) break;

            // The 808 message body follows the wire header: terminal id,
            // 2-byte serial number, 1-byte command and 4-byte body length.
            // NOTE(review): assumes SQ::TCPHead is packed so that its size
            // equals PHONENUM_LENGTH + 7 — TODO confirm.
            char *pRawData = m_buffer + dealDataSize + PHONENUM_LENGTH + 2 + 1 + 4;

            // Decode the message header.
            CodecMsgHead csMsgHead;
            AbstractCodec tmpCode;
            tmpCode.decodeHead(reinterpret_cast<SQ::Byte*>(pRawData), &csMsgHead);

            dealDataSize += sizeof(SQ::TCPHead) + bodyLen;
        }
    }

    if (needReconnect)
    {
        // Clear the flag before sleeping so that sendDataBytes stops using the
        // closed socket, and forget any partial frame of the dead connection.
        m_isConnected = false;
        m_bytesReceived = 0;
        m_socket.close();
        // Reconnect after three seconds.
        boost::this_thread::sleep( boost::posix_time::seconds( 3 ) );
        start();
        return;
    }

    // Compact the buffer after processing.
    if (dealDataSize > 0)
    {
        memmove(m_buffer, m_buffer + dealDataSize, m_bytesReceived - dealDataSize);
        m_bytesReceived -= dealDataSize;
    }

    // Post the next read behind the data that is still buffered.
    m_socket.async_read_some( boost::asio::buffer(m_buffer + m_bytesReceived, PACK_MAX_SIZE-m_bytesReceived),
        boost::bind( &CommonSession::onRead,
            shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred ) );
}
void CommonSession::start()
{
tcp::endpoint ep = tcp::endpoint( address::from_string( m_ip ), m_port );
m_socket.async_connect( ep,
boost::bind( &CommonSession::onConnect,
shared_from_this(),
boost::asio::placeholders::error ) );
}
// Sends a byte stream that has already been encoded with the 808 protocol.
//
// pByte - first byte to send.
// len   - number of bytes to send.
//
// Nothing is sent while the session is not connected, or when pByte is null
// or len is not positive (such arguments previously reached the hex dump and
// boost::asio::buffer unchecked). At most the first 40 bytes are logged.
// boost::asio::buffer does not copy the data, so the memory behind pByte has
// to stay valid until onWrite has been called.
void CommonSession::sendDataBytes(SQ::Byte *pByte, int len)
{
    // Check the connection.
    if (!m_isConnected)
    {
        std::cout<< "connection(to "<<m_ip<<":"<< m_port<<") is not established, so sendDataBytes faild" <<std::endl;
        return;
    }
    // Reject arguments that describe no data.
    if (pByte == NULL || len <= 0)
    {
        std::cout << "sendDataBytes: nothing to send" << std::endl;
        return;
    }

    // Log the message.
    time_t nowTime = time(NULL);
    std::cout << "send:(time:"<<nowTime<<")" << SQ::Utils::bytes2HexString( pByte, len<=40?len:40);
    if (len<=40)
    {
        std::cout << std::endl;
    }
    else
    {
        std::cout << "..." << std::endl;
    }

    // Send the message.
    boost::asio::async_write(
        m_socket,
        boost::asio::buffer( pByte, len),
        boost::bind(&CommonSession::onWrite,shared_from_this(),boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred)
    );
}
// Client.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> using boost::asio::ip::tcp; using boost::asio::ip::address; class session: public boost::enable_shared_from_this<session> { public: session(boost::asio::io_service &io_service, tcp::endpoint &endpoint) : io_service_(io_service), socket_(io_service), endpoint_(endpoint) { } void start() { socket_.async_connect(endpoint_, boost::bind(&session::handle_connect, shared_from_this(), boost::asio::placeholders::error)); } private: void handle_connect(const boost::system::error_code &error) { if (error) { if (error.value() != boost::system::errc::operation_canceled) { std::cerr << boost::system::system_error(error).what() << std::endl; } socket_.close(); return; } static tcp::no_delay option(true); socket_.set_option(option); strcpy(buf, "Hello World!\n"); boost::asio::async_write(socket_, boost::asio::buffer(buf, strlen(buf)), boost::bind(&session::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } void handle_write(const boost::system::error_code& error, size_t bytes_transferred) { memset(buf, sizeof(buf), 0); boost::asio::async_read_until(socket_, sbuf, "\n", boost::bind(&session::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { std::cout << buf << std::endl; } private: boost::asio::io_service &io_service_; tcp::socket socket_; tcp::endpoint &endpoint_; char buf[1024]; boost::asio::streambuf sbuf; }; typedef boost::shared_ptr<session> session_ptr; int main(int argc, char* argv[]) { boost::asio::io_service io_service; tcp::endpoint endpoint(address::from_string("172.16.6.70"), 10028); session_ptr new_session(new session(io_service, endpoint)); 
new_session->start(); io_service.run(); return 0; }
服务器:
#include <string.h> #include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> using boost::asio::ip::tcp; using boost::asio::ip::address; class session: public boost::enable_shared_from_this<session> { public: session(boost::asio::io_service &io_service): socket_(io_service) { } void start() { static tcp::no_delay option(true); socket_.set_option(option); boost::asio::async_read_until(socket_, sbuf_, "\n", boost::bind(&session::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } tcp::socket &socket() { return socket_; } private: void handle_write(const boost::system::error_code& error, size_t bytes_transferred) { boost::asio::async_read_until(socket_, sbuf_, "\n", boost::bind(&session::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { boost::asio::async_write(socket_, sbuf_, boost::bind(&session::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); std::ostream os(&sbuf_); printf("%s \n",sbuf_.data()); } private: tcp::socket socket_; boost::asio::streambuf sbuf_; }; typedef boost::shared_ptr<session> session_ptr; class server { public: server(boost::asio::io_service &io_service, tcp::endpoint &endpoint) : io_service_(io_service), acceptor_(io_service, endpoint) { session_ptr new_session(new session(io_service_)); acceptor_.async_accept(new_session->socket(), boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error)); } void handle_accept(session_ptr new_session, const boost::system::error_code& error) { if (error) { return; } new_session->start(); new_session.reset(new session(io_service_)); acceptor_.async_accept(new_session->socket(), 
boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error)); } void run() { io_service_.run(); } private: boost::asio::io_service &io_service_; tcp::acceptor acceptor_; }; int main(int argc, char* argv[]) { boost::asio::io_service io_service; //tcp::endpoint endpoint(address::from_string("172.16.6.70"), 10028); tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), 10028); server s(io_service, endpoint); s.run(); return 0; }
编译:
g++ -Wall -o client client.cpp -lboost_system
g++ -Wall -o server server.cpp -lboost_system
这里需要注意的是: async_write, async_read, async_read_until 都是需要达到特定条件才会调用回调函数,
在回调函数被调用之前, 不能对同一个套接字再次发起同类调用, 否则收发的数据很可能是乱的. 所以, 在实际代码当中, 会有一个写缓冲队列: 当需要write时, 先把数据放到队列中, 如果此时队列中只有这一个元素, 则调用async_write, 否则等待函数回调; 当函数回调时将首个元素从队列中移除, 然后接着发送下一个, 直到队列为空.
对于client, 由于is_open在async_connect之后就是true状态了, 因此在async_connect回调返回之前没有方法知道是否已经连接成功, 实际代码当中一般会增加一个变量以表示该套接字是否已经允许发送数据.
============================================================================
服务器:
[cpp] view
plain copy
// Capacity, in bytes, of each connection's receive buffer (16 KiB).
// The value is parenthesized so the macro expands correctly inside larger
// expressions such as `x % PACK_MAX_SIZE` or `x / PACK_MAX_SIZE`.
#define PACK_MAX_SIZE (16*1024)
// One server-side TCP connection.
// The constructor is private, so instances are obtained through create(),
// which returns a boost::shared_ptr; the asynchronous handlers bind
// shared_from_this() when they are registered.
class TCPConnection: public boost::enable_shared_from_this<TCPConnection> {
public:
    // Builds a new, not yet connected, connection object.
    static boost::shared_ptr<TCPConnection> create(IoService& ioService);
    // Gives the acceptor access to the underlying socket.
    tcp::socket& getSocket();
    // Configures the socket and issues the first asynchronous read.
    void start();
    // Declared here because the member is defined out-of-class further down;
    // without this declaration that definition does not compile.
    void sendTermMessage(long phoneNum, unsigned short serialNum, unsigned char cmd, void* rawData);
private:
    TCPConnection(IoService& ioService);
    void handleRead(const boost::system::error_code& e,size_t bytesTransferred);
    void handleWrite(const boost::system::error_code& e,size_t bytesTransferred);
    // The TCP socket.
    tcp::socket socket;
    // Buffer used for receiving and sending.
    char m_buffer[PACK_MAX_SIZE];
    // Byte count tracked alongside m_buffer (set to 0 by the constructor).
    size_t m_bytesReceived;
};
typedef boost::shared_ptr<TCPConnection> pointer;

// Owns the listening acceptor; startAccept() and handleAccept() form the
// asynchronous accept cycle.
class TCPServer {
public:
    TCPServer(IoService& ioService, int port);

private:
    // Posts one asynchronous accept.
    void startAccept();
    // Completion handler for the accept posted by startAccept().
    void handleAccept(boost::shared_ptr<TCPConnection> newConnection,
                      const boost::system::error_code& error);

    tcp::acceptor acceptor;
};
[cpp] view
plain copy
// Binds the socket to the given io service and starts with an empty
// receive count.
TCPConnection::TCPConnection(IoService& ioService)
    : socket(ioService),
      m_bytesReceived(0)
{
}
// Factory: the constructor is private, so this is the way to obtain a
// connection object. The result is owned by a boost::shared_ptr.
boost::shared_ptr<TCPConnection> TCPConnection::create(IoService& ioService) {
    boost::shared_ptr<TCPConnection> connection(new TCPConnection(ioService));
    return connection;
}
// Returns a reference to this connection's socket.
tcp::socket& TCPConnection::getSocket()
{
    return this->socket;
}
// Prepares a freshly accepted socket and posts the first asynchronous read.
// Sets TCP_NODELAY and SO_KEEPALIVE, zeroes the receive buffer, then waits
// for data; handleRead runs when the read completes.
void TCPConnection::start() {
    static tcp::no_delay option( true );
    socket.set_option( option );
    socket.set_option(boost::asio::socket_base::keep_alive(true));
    // Clear the whole buffer; the previous hard-coded 2048 covered only part
    // of the PACK_MAX_SIZE-byte array.
    memset(m_buffer, 0x0, sizeof(m_buffer));
    socket.async_read_some(boost::asio::buffer(m_buffer),
        boost::bind(&TCPConnection::handleRead, shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
}
// Posts an asynchronous write of resp->mem (resp->size bytes) on this
// connection's socket; handleWrite receives the error code and the number of
// bytes written when the operation completes.
// NOTE(review): `resp` is not declared in this block and none of the four
// parameters is used here — the code that builds the response from them
// appears to have been left out of the listing; confirm against the full source.
// NOTE(review): async_write_some may complete after writing fewer bytes than
// requested, and handleWrite does not resend the remainder — verify whether
// boost::asio::async_write was intended.
void TCPConnection::sendTermMessage( long phoneNum, unsigned short serialNum, unsigned char cmd, void* rawData )
{
socket.async_write_some(
boost::asio::buffer( resp->mem, resp->size ),
boost::bind(
&TCPConnection::handleWrite,
// shared_from_this() keeps the connection alive until the handler has run
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
}
// Completion handler for the server-side async_read_some.
//
// On error or a zero-byte read the socket is closed and this connection is
// unsubscribed from CmdQueue. Otherwise the new bytes are appended to the
// data already held in m_buffer, every complete frame (TCPHead followed by
// bodyLen bytes) is forwarded through sendUDPPackToTerminal, the unconsumed
// tail is moved to the front of the buffer and the next read is posted
// behind it.
//
// error            - result of the read operation.
// bytesTransferred - number of bytes the read placed behind the buffered data.
void TCPConnection::handleRead(const boost::system::error_code& error, size_t bytesTransferred)
{
    if( error || bytesTransferred == 0 )
    {
        std::cout << "!!! network exception: err(" << error.value() << "), " << boost::system::system_error( error ).what() << " !!!" << std::endl;
        socket.close();
        CmdQueue::getInstance()->cancelSubscribe( this );
        return;
    }

    // Hex-dump at most the first 40 bytes of what has just arrived. The new
    // data sits behind the bytes kept from earlier reads.
    const size_t dumpLen = bytesTransferred <= 40 ? bytesTransferred : 40;
    std::cout << "received some from remote serivce";
    time_t nowTime = time(NULL);
    std::cout << ":(time:"<<nowTime<<")" << SQ::Utils::bytes2HexString( reinterpret_cast<SQ::Byte *>(m_buffer) + m_bytesReceived, dumpLen );
    if (bytesTransferred <= 40)
    {
        std::cout << std::endl;
    }
    else
    {
        std::cout << "..." << std::endl;
    }

    // Parse the stream, allowing for split and coalesced TCP segments.
    // The member counter is used: a local variable of the same name used to
    // shadow it, which discarded any partial frame kept from earlier reads.
    m_bytesReceived += bytesTransferred;
    size_t dealDataSize = 0;   // bytes consumed by complete frames
    while (true)
    {
        // Not enough data left for a header.
        const size_t pending = m_bytesReceived - dealDataSize;
        if (pending < sizeof(TCPHead)) break;

        TCPHead* head = reinterpret_cast<TCPHead*>(m_buffer + dealDataSize);
        const unsigned long phoneNum = SQ::Utils::BCD2long( head->terminalID, PHONENUM_LENGTH );
        const int bodyLen = ntohl( head->bodyLen );

        // A negative length, or one that can never fit in the buffer, would
        // stall the parser forever; treat it like a network failure.
        if (bodyLen < 0 || static_cast<size_t>(bodyLen) > PACK_MAX_SIZE - sizeof(TCPHead))
        {
            std::cout << "!!! network exception: invalid body length " << bodyLen << " !!!" << std::endl;
            socket.close();
            CmdQueue::getInstance()->cancelSubscribe( this );
            return;
        }

        // Wait for more data until the whole frame is buffered.
        if (pending - sizeof(TCPHead) < static_cast<size_t>(bodyLen)) break;

        // The body follows the wire header: terminal id, 2-byte serial number,
        // 1-byte command and 4-byte body length.
        // NOTE(review): assumes TCPHead is packed so that sizeof(TCPHead)
        // equals PHONENUM_LENGTH + 7 — TODO confirm.
        const size_t bodyOffset = dealDataSize + PHONENUM_LENGTH + 2 + 1 + 4;
        SQ::Byte *pRawData = reinterpret_cast<SQ::Byte *>(m_buffer) + bodyOffset;
        sendUDPPackToTerminal(phoneNum, pRawData, bodyLen);
        dealDataSize += sizeof(TCPHead) + bodyLen;
    }

    // Compact the buffer after processing.
    if (dealDataSize > 0)
    {
        memmove(m_buffer, m_buffer + dealDataSize, m_bytesReceived - dealDataSize);
        m_bytesReceived -= dealDataSize;
    }

    socket.async_read_some(boost::asio::buffer(m_buffer + m_bytesReceived, PACK_MAX_SIZE-m_bytesReceived),
        boost::bind(&TCPConnection::handleRead, shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
}
// Completion handler for asynchronous writes: reports a failed write on
// standard output and does nothing on success.
void TCPConnection::handleWrite( const boost::system::error_code& error, size_t bytesTransferred)
{
    if( !error )
    {
        return;
    }
    std::cout << "write error." << std::endl;
}
// Opens an IPv4 acceptor on the given port and immediately posts the first
// asynchronous accept.
TCPServer::TCPServer(IoService& ioService, int port)
    : acceptor(ioService, tcp::endpoint(tcp::v4(), port))
{
    startAccept();
}
void TCPServer::startAccept() {
boost::shared_ptr<TCPConnection> newConnection = TCPConnection::create(
acceptor.get_io_service());
acceptor.async_accept(newConnection->getSocket(),
boost::bind(&TCPServer::handleAccept, this, newConnection,
boost::asio::placeholders::error));
}
// Completion handler for async_accept.
//
// newConnection - the connection object whose socket was offered to the acceptor.
// error         - result of the accept operation.
//
// A successful accept starts the connection. The next accept is posted in
// either case: previously a single failed accept ended the accept cycle and
// the server silently stopped taking new clients. The only exception is
// operation_aborted, which Boost.Asio reports when the operation was
// cancelled (for example because the acceptor was closed).
void TCPServer::handleAccept(boost::shared_ptr<TCPConnection> newConnection,
        const boost::system::error_code& error) {
    if (error == boost::asio::error::operation_aborted) {
        return;
    }
    if (!error) {
        newConnection->start();
    } else {
        std::cerr << "accept error: " << error.message() << std::endl;
    }
    startAccept();
}
客户端:
[cpp] view
plain copy
// Client-side TCP session: connects to m_ip:m_port, reconnects after
// failures, parses incoming frames and sends encoded byte streams.
class CommonSession : public boost::enable_shared_from_this<CommonSession>
{
    // Public interface
public:
    CommonSession( boost::asio::io_service *io_service, const std::string &ip, short port);
    // Posts an asynchronous connect to the target endpoint.
    void start();
    // Sends len bytes starting at pByte if the session is connected.
    void sendDataBytes(SQ::Byte *pByte, int len);
    bool getConnectState() {return m_isConnected;}
    // Internal interface
protected:
    void onConnect( const boost::system::error_code &error );
    void onWrite( const boost::system::error_code& error, size_t bytes_transferred );
    void onRead( const boost::system::error_code& error, size_t bytes_transferred );
private:
    tcp::socket m_socket;                   // socket
    char m_buffer[PACK_MAX_SIZE];           // receive buffer
    bool m_isConnected;                     // connection state
    // Set once a connect failure has been logged, so that the failure is
    // reported only once. It is assigned by the constructor and by onConnect
    // and was previously missing from the class.
    bool haveLog;
    std::string m_ip;                       // target IP
    // Target port. Stored unsigned so that ports above 32767 are logged with
    // their real value instead of a negative number.
    unsigned short m_port;
    size_t m_bytesReceived;                 // received byte count
    unsigned short m_serialNum;             // outgoing serial number
    boost::recursive_mutex m_mutex;         // mutex
    boost::asio::io_service* m_io_service;  // I/O service
};
[cpp] view
plain copy
// Stores the connection target and creates the socket on *io_service.
// No connection attempt is made here; call start() for that.
//
// io_service - I/O service that runs this session's asynchronous operations;
//              it is dereferenced here, so it must not be null.
// ip         - target address in textual form.
// port       - target port.
//
// Initializers follow the member declaration order, and m_serialNum, which
// was previously left indeterminate, now starts at 0.
CommonSession::CommonSession( boost::asio::io_service *io_service, const std::string &ip, short port )
    : m_socket( *io_service ),
      m_isConnected( false ),
      m_ip( ip ),
      m_port( port ),
      m_bytesReceived( 0 ),
      m_serialNum( 0 ),
      m_io_service( io_service )
{
    haveLog = false;
}
// Completion handler for async_connect.
// Success: marks the session connected, sets TCP_NODELAY and SO_KEEPALIVE and
// posts the first asynchronous read.
// Failure: logs the failure (only the first time), closes the socket, waits
// three seconds and calls start() again.
// NOTE(review): the three-second wait is a blocking sleep executed inside the
// completion handler, so the thread running this handler does nothing else
// for that time.
void CommonSession::onConnect( const boost::system::error_code & error )
{
    if( !error )
    {
        std::cout << "connect to " << m_ip << ":" << m_port << " successed! (time:"<< time(NULL)<<")" << std::endl;
        m_isConnected = true;

        static tcp::no_delay option( true );
        m_socket.set_option( option );
        m_socket.set_option( boost::asio::socket_base::keep_alive( true ) );

        // Post the first read.
        m_socket.async_read_some( boost::asio::buffer( m_buffer ),
            boost::bind( &CommonSession::onRead,
                shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred ) );
        return;
    }

    // The connect failure is reported only once.
    if (!haveLog)
    {
        haveLog = true;
        std::cout << "connect to " << m_ip << ":" << m_port << " failed!" << " -> ERR : ";
        const bool cancelled = ( error.value() == boost::system::errc::operation_canceled );
        if( !cancelled )
        {
            std::cerr << boost::system::system_error( error ).what() << std::endl;
        }
    }

    // Reconnect after three seconds.
    m_socket.close();
    boost::this_thread::sleep( boost::posix_time::seconds( 3 ) );
    start();
}
// Completion handler for async_write: prints the error code and its message
// when the send failed, and does nothing on success.
void CommonSession::onWrite( const boost::system::error_code& error, size_t bytes_transferred )
{
    if( !error )
    {
        return;
    }
    std::cout << "!!! send error, code is " << error<<" !!!"<<std::endl;
    std::cerr << boost::system::system_error( error ).what() << std::endl;
}
// Completion handler for the client-side async_read_some.
//
// On error, on a zero-byte read, or when a frame announces an impossible body
// length, the session is marked disconnected, the socket is closed and a
// reconnect is started after three seconds. Otherwise the new bytes are
// appended to the data already held in m_buffer and every complete frame is
// handled: a frame with bodyLen == 0 is a gateway command passed to dealCmd,
// any other frame carries a terminal message whose header is decoded. The
// unconsumed tail is then moved to the front of the buffer and the next read
// is posted behind it.
//
// error             - result of the read operation.
// bytes_transferred - number of bytes the read placed behind the buffered data.
//
// NOTE(review): the three-second wait is a blocking sleep executed inside the
// completion handler.
void CommonSession::onRead( const boost::system::error_code& error, size_t bytes_transferred )
{
    bool needReconnect = false;   // set when the connection has to be dropped
    size_t dealDataSize = 0;      // bytes consumed by complete frames

    if( error )
    {
        std::cout << "!!! onRead error, code is " << error<<" !!!"<<std::endl;
        std::cerr << boost::system::system_error( error ).what() << std::endl;
        needReconnect = true;
    }
    else if( bytes_transferred == 0 )
    {
        needReconnect = true;
    }
    else
    {
        // Parse the stream, allowing for split and coalesced TCP segments.
        // The member counter is used: a local variable of the same name used
        // to shadow it, which discarded any partial frame kept from earlier reads.
        m_bytesReceived += bytes_transferred;
        while (true)
        {
            // Not enough data left for a header.
            const size_t pending = m_bytesReceived - dealDataSize;
            if (pending < sizeof(SQ::TCPHead)) break;

            SQ::TCPHead* head = reinterpret_cast<SQ::TCPHead*>(m_buffer + dealDataSize);
            const unsigned long phoneNum = SQ::Utils::BCD2long( head->terminalID, PHONENUM_LENGTH );
            const unsigned char cmd = head->cmd;
            const int bodyLen = ntohl( head->bodyLen );

            if (bodyLen == 0)
            {
                // Data from the gateway rather than from a terminal.
                std::cout<< "receive(time:"<< time(NULL)<<") command from gate, cmd:" << (int)cmd << " phoneNum:"<<phoneNum<<std::endl;
                dealCmd(cmd, phoneNum);
                dealDataSize += sizeof(SQ::TCPHead);
                continue;
            }

            // A negative length, or one that can never fit in the buffer,
            // would stall the parser forever; drop the connection instead.
            if (bodyLen < 0 || static_cast<size_t>(bodyLen) > PACK_MAX_SIZE - sizeof(SQ::TCPHead))
            {
                std::cout << "!!! onRead error, invalid body length " << bodyLen << " !!!" << std::endl;
                needReconnect = true;
                break;
            }

            // Message from a terminal: wait until the whole body is buffered.
            if (pending - sizeof(SQ::TCPHead) < static_cast<size_t>(bodyLen)) break;

            // The 808 message body follows the wire header: terminal id,
            // 2-byte serial number, 1-byte command and 4-byte body length.
            // NOTE(review): assumes SQ::TCPHead is packed so that its size
            // equals PHONENUM_LENGTH + 7 — TODO confirm.
            char *pRawData = m_buffer + dealDataSize + PHONENUM_LENGTH + 2 + 1 + 4;

            // Decode the message header.
            CodecMsgHead csMsgHead;
            AbstractCodec tmpCode;
            tmpCode.decodeHead(reinterpret_cast<SQ::Byte*>(pRawData), &csMsgHead);

            dealDataSize += sizeof(SQ::TCPHead) + bodyLen;
        }
    }

    if (needReconnect)
    {
        // Clear the flag before sleeping so that sendDataBytes stops using the
        // closed socket, and forget any partial frame of the dead connection.
        m_isConnected = false;
        m_bytesReceived = 0;
        m_socket.close();
        // Reconnect after three seconds.
        boost::this_thread::sleep( boost::posix_time::seconds( 3 ) );
        start();
        return;
    }

    // Compact the buffer after processing.
    if (dealDataSize > 0)
    {
        memmove(m_buffer, m_buffer + dealDataSize, m_bytesReceived - dealDataSize);
        m_bytesReceived -= dealDataSize;
    }

    // Post the next read behind the data that is still buffered.
    m_socket.async_read_some( boost::asio::buffer(m_buffer + m_bytesReceived, PACK_MAX_SIZE-m_bytesReceived),
        boost::bind( &CommonSession::onRead,
            shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred ) );
}
void CommonSession::start()
{
tcp::endpoint ep = tcp::endpoint( address::from_string( m_ip ), m_port );
m_socket.async_connect( ep,
boost::bind( &CommonSession::onConnect,
shared_from_this(),
boost::asio::placeholders::error ) );
}
// Sends a byte stream that has already been encoded with the 808 protocol.
//
// pByte - first byte to send.
// len   - number of bytes to send.
//
// Nothing is sent while the session is not connected, or when pByte is null
// or len is not positive (such arguments previously reached the hex dump and
// boost::asio::buffer unchecked). At most the first 40 bytes are logged.
// boost::asio::buffer does not copy the data, so the memory behind pByte has
// to stay valid until onWrite has been called.
void CommonSession::sendDataBytes(SQ::Byte *pByte, int len)
{
    // Check the connection.
    if (!m_isConnected)
    {
        std::cout<< "connection(to "<<m_ip<<":"<< m_port<<") is not established, so sendDataBytes faild" <<std::endl;
        return;
    }
    // Reject arguments that describe no data.
    if (pByte == NULL || len <= 0)
    {
        std::cout << "sendDataBytes: nothing to send" << std::endl;
        return;
    }

    // Log the message.
    time_t nowTime = time(NULL);
    std::cout << "send:(time:"<<nowTime<<")" << SQ::Utils::bytes2HexString( pByte, len<=40?len:40);
    if (len<=40)
    {
        std::cout << std::endl;
    }
    else
    {
        std::cout << "..." << std::endl;
    }

    // Send the message.
    boost::asio::async_write(
        m_socket,
        boost::asio::buffer( pByte, len),
        boost::bind(&CommonSession::onWrite,shared_from_this(),boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred)
    );
}
相关文章推荐
- 深信服笔试题(网络project师售后)
- Asynchronous HTTP Requests in Android Using Volley
- TCP并发服务器模型(二)
- SVN 无法连接的解决方法:网络代理
- JavaEE_POI导出Excel (网络下载) +(本地导出) 工具类的编写
- [转]【鹅厂网事】全局精确流量调度新思路-HttpDNS服务详解
- 目前网络时间服务有哪几种协议?
- Java 入门 之 HTTP 的 GET & POST 方法
- 3月第1周网络安全报告:发现放马站点域名仍为162个
- 北斗网络同步时钟解决方案
- centos6.6 设置静态网络
- Java网络编程(二)http
- 网络通信 --> CRC校验
- go语言实现http服务端与客户端
- python 网络编程
- Activiti-explorer 在tomcat中部署报HTTP Status 404问题
- 轻量级网络请求框架MKNetworkKit介绍及使用
- Android Volley完全解析(三),定制自己的Request 转载:http://blog.csdn.net/guolin_blog/article/details/176127
- TCP的长连接和短连接
- HTTP解析