您的位置:首页 > 理论基础 > 计算机网络

【Boost】boost库asio详解9——TCP的简单例子2

2016-03-15 12:08 846 查看
客户端:

// Client.cpp : 定义控制台应用程序的入口点。
//

#include "stdafx.h"
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>

using boost::asio::ip::tcp;
using boost::asio::ip::address;

class session: public boost::enable_shared_from_this<session> {
public:
session(boost::asio::io_service &io_service, tcp::endpoint &endpoint)
: io_service_(io_service), socket_(io_service), endpoint_(endpoint)
{
}

void start() {
socket_.async_connect(endpoint_,
boost::bind(&session::handle_connect,
shared_from_this(),
boost::asio::placeholders::error));
}

private:
void handle_connect(const boost::system::error_code &error) {
if (error) {
if (error.value() != boost::system::errc::operation_canceled) {
std::cerr << boost::system::system_error(error).what() << std::endl;
}

socket_.close();
return;
}

static tcp::no_delay option(true);
socket_.set_option(option);

strcpy(buf, "Hello World!\n");
boost::asio::async_write(socket_,
boost::asio::buffer(buf, strlen(buf)),
boost::bind(&session::handle_write,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}

void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
memset(buf, sizeof(buf), 0);
boost::asio::async_read_until(socket_,
sbuf,
"\n",
boost::bind(&session::handle_read,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}

void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
std::cout << buf << std::endl;
}

private:
boost::asio::io_service &io_service_;
tcp::socket socket_;
tcp::endpoint &endpoint_;
char buf[1024];
boost::asio::streambuf sbuf;
};

typedef boost::shared_ptr<session> session_ptr;

int main(int argc, char* argv[])
{
boost::asio::io_service io_service;
tcp::endpoint endpoint(address::from_string("172.16.6.70"), 10028);

session_ptr new_session(new session(io_service, endpoint));
new_session->start();
io_service.run();

return 0;
}


服务器:

#include <string.h>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>

using boost::asio::ip::tcp;
using boost::asio::ip::address;

class session: public boost::enable_shared_from_this<session> {
public:
session(boost::asio::io_service &io_service): socket_(io_service)
{
}

void start() {
static tcp::no_delay option(true);
socket_.set_option(option);

boost::asio::async_read_until(socket_,
sbuf_,
"\n",
boost::bind(&session::handle_read,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}

tcp::socket &socket() {
return socket_;
}

private:
void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
boost::asio::async_read_until(socket_,
sbuf_,
"\n",
boost::bind(&session::handle_read,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}

void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
boost::asio::async_write(socket_,
sbuf_,
boost::bind(&session::handle_write,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));

std::ostream os(&sbuf_);
printf("%s \n",sbuf_.data());
}

private:
tcp::socket socket_;
boost::asio::streambuf sbuf_;
};

typedef boost::shared_ptr<session> session_ptr;

class server {
public:
server(boost::asio::io_service &io_service, tcp::endpoint &endpoint)
: io_service_(io_service), acceptor_(io_service, endpoint)
{
session_ptr new_session(new session(io_service_));
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept,
this,
new_session,
boost::asio::placeholders::error));
}

void handle_accept(session_ptr new_session, const boost::system::error_code& error) {
if (error) {
return;
}

new_session->start();
new_session.reset(new session(io_service_));
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept,
this,
new_session,
boost::asio::placeholders::error));
}

void run() {
io_service_.run();
}

private:
boost::asio::io_service &io_service_;
tcp::acceptor acceptor_;
};

int main(int argc, char* argv[])
{
boost::asio::io_service io_service;
//tcp::endpoint endpoint(address::from_string("172.16.6.70"), 10028);
tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), 10028);

server s(io_service, endpoint);
s.run();
return 0;
}


编译:

g++ -Wall -o client client.cpp -lboost_system

g++ -Wall -o server server.cpp -lboost_system

这里需要注意的是: async_write, async_read, async_read_until 都是需要达到特定条件才会调用回调函数,

在调用回调函数之前, 不能再次调用, 否则接收到的数据很可能是乱的. 所以, 在实际代码当中, 会有一个写缓冲队列, 当需要write的时, 先放到队列中, 如果队列个数为1, 则调用async_write, 否则等待函数回调, 当函数回调时将首个元素从队列中移除, 然后接着发送下一个, 直到队列为空.

对于client, 由于is_open在async_connect之后就是true状态了, 因此在async_connect回调返回之前没有方法知道是否已经连接成功, 实际代码当中一般会增加一个变量以表示该套接字是否已经允许发送数据.

============================================================================

服务器:

[cpp] view
plain copy







#define PACK_MAX_SIZE 16*1024

class TCPConnection: public boost::enable_shared_from_this<TCPConnection> {

public:

static boost::shared_ptr<TCPConnection> create(IoService& ioService);

tcp::socket& getSocket();

void start();

private:

TCPConnection(IoService& ioService);

void handleRead(const boost::system::error_code& e,size_t bytesTransferred);

void handleWrite(const boost::system::error_code& e,size_t bytesTransferred);

//TCP的socket

tcp::socket socket;

//接收和发送的缓冲区

char m_buffer[PACK_MAX_SIZE];

size_t m_bytesReceived;

};

typedef boost::shared_ptr<TCPConnection> pointer;

class TCPServer {

public:

TCPServer(IoService& ioService, int port);

public:

private:

void startAccept();

void handleAccept(boost::shared_ptr<TCPConnection> newConnection,

const boost::system::error_code& error);

private:

tcp::acceptor acceptor;

};

[cpp] view
plain copy







TCPConnection::TCPConnection(IoService& ioService) :socket(ioService) {

m_bytesReceived = 0;

}

boost::shared_ptr<TCPConnection> TCPConnection::create(IoService& ioService) {

return pointer(new TCPConnection(ioService));

}

tcp::socket& TCPConnection::getSocket() {

return socket;

}

void TCPConnection::start() {

static tcp::no_delay option( true );

socket.set_option( option );

socket.set_option(boost::asio::socket_base::keep_alive(true));

memset(m_buffer, 0x0, 2048);

socket.async_read_some(boost::asio::buffer(m_buffer),

boost::bind(&TCPConnection::handleRead, shared_from_this(),

boost::asio::placeholders::error,

boost::asio::placeholders::bytes_transferred));

}

void TCPConnection::sendTermMessage( long phoneNum, unsigned short serialNum, unsigned char cmd, void* rawData )

{

socket.async_write_some(

boost::asio::buffer( resp->mem, resp->size ),

boost::bind(

&TCPConnection::handleWrite,

shared_from_this(),

boost::asio::placeholders::error,

boost::asio::placeholders::bytes_transferred

)

);

}

void TCPConnection::handleRead(const boost::system::error_code& error, size_t bytesTransferred)

{

//cout <<"************************socket read " << bytesTransferred << " bytes codes ***************************"<< endl;

if( error != 0 || bytesTransferred == 0 )

{

std::cout << "!!! network exception: err(" << error.value() << "), " << boost::system::system_error( error ).what() << " !!!" << std::endl;

socket.close();

CmdQueue::getInstance()->cancelSubscribe( this );

return;

}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

std::cout << "received some from remote serivce";

time_t nowTime = time(NULL);

std::cout << ":(time:"<<nowTime<<")" << SQ::Utils::bytes2HexString( (SQ::Byte *) m_buffer, bytesTransferred<=40?bytesTransferred:40);

if (bytesTransferred<=40)

{

std::cout<< std::endl;

}

else

{

cout << "..." <<endl;

}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

//解析消息(主要考虑到TCP分包和TCP粘包)

size_t m_bytesReceived = 0;

m_bytesReceived += bytesTransferred;

size_t dealDataSize = 0;

if (m_bytesReceived>=sizeof(TCPHead))

{

int offset = 0;

while (true)//考虑到TCP粘包问题,做个循环

{

//剩余缓冲区不足消息头

offset = dealDataSize;

if (m_bytesReceived-offset < sizeof(TCPHead)) break;

TCPHead* head = (TCPHead*)(m_buffer + offset);

unsigned long phoneNum = SQ::Utils::BCD2long( head->terminalID, PHONENUM_LENGTH );

offset += PHONENUM_LENGTH;

unsigned short serialNum = ntohs( head->serialNum );

offset += 2;

unsigned char cmd = head->cmd ;

offset += 1;

int bodyLen = ntohl( head->bodyLen );

offset += 4;

if (m_bytesReceived-offset < bodyLen) break;

SQ::Byte *pRawData = (SQ::Byte *)m_buffer + offset;

sendUDPPackToTerminal(phoneNum, pRawData, bodyLen);

dealDataSize += (sizeof(TCPHead)+bodyLen);

}

}

//处理数据之后刷新缓冲区

if (dealDataSize > 0)

{

memmove(m_buffer,m_buffer+dealDataSize,m_bytesReceived-dealDataSize);

m_bytesReceived -= dealDataSize;

}

socket.async_read_some(boost::asio::buffer(m_buffer + m_bytesReceived, PACK_MAX_SIZE-m_bytesReceived),

boost::bind(&TCPConnection::handleRead, shared_from_this(),

boost::asio::placeholders::error,

boost::asio::placeholders::bytes_transferred));

return;

}

void TCPConnection::handleWrite( const boost::system::error_code& error, size_t bytesTransferred)

{

if( error != 0 )

{

std::cout << "write error." << std::endl;

}

}

TCPServer::TCPServer(IoService& ioService, int port) :

acceptor(ioService, tcp::endpoint(tcp::v4(), port)) {

startAccept();

}

void TCPServer::startAccept() {

boost::shared_ptr<TCPConnection> newConnection = TCPConnection::create(

acceptor.get_io_service());

acceptor.async_accept(newConnection->getSocket(),

boost::bind(&TCPServer::handleAccept, this, newConnection,

boost::asio::placeholders::error));

}

void TCPServer::handleAccept(boost::shared_ptr<TCPConnection> newConnection,

const boost::system::error_code& error) {

if (!error) {

newConnection->start();

startAccept();

}

}

客户端:

[cpp] view
plain copy







class CommonSession : public boost::enable_shared_from_this<CommonSession>

{

//外部接口

public:

CommonSession( boost::asio::io_service *io_service, const std::string &ip, short port);

void start();

void sendDataBytes(SQ::Byte *pByte, int len);

bool getConnectState() {return m_isConnected;}

//内部接口

protected:

void onConnect( const boost::system::error_code &error );

void onWrite( const boost::system::error_code& error, size_t bytes_transferred );

void onRead( const boost::system::error_code& error, size_t bytes_transferred );

private:

tcp::socket m_socket; //套接字

char m_buffer[PACK_MAX_SIZE]; //接收缓冲区

bool m_isConnected; //连接状态

std::string m_ip; //目标 I P

short m_port; //目标端口

size_t m_bytesReceived; //收字节数

unsigned short m_serialNum; //发流水号

boost::recursive_mutex m_mutex; //互斥体

boost::asio::io_service* m_io_service; //I O 服务

};

[cpp] view
plain copy







CommonSession::CommonSession( boost::asio::io_service *io_service, const std::string &ip, short port )

: m_io_service( io_service ), m_socket( *io_service ), m_ip( ip ), m_port( port )

{

m_bytesReceived = 0;

m_isConnected = false;

haveLog = false;

}

void CommonSession::onConnect( const boost::system::error_code & error )

{

if( error )

{

//错误LOG只输出一次

if (!haveLog)

{

haveLog = true;

//连接失败

std::cout << "connect to " << m_ip << ":" << m_port << " failed!" << " -> ERR : ";

if( error.value() != boost::system::errc::operation_canceled )

{

std::cerr << boost::system::system_error( error ).what() << std::endl;

}

}

//3秒后重新连接

m_socket.close();

boost::this_thread::sleep( boost::posix_time::seconds( 3 ) );

start();

}

else

{

std::cout << "connect to " << m_ip << ":" << m_port << " successed! (time:"<< time(NULL)<<")" << std::endl;

m_isConnected = true;

static tcp::no_delay option( true );

m_socket.set_option( option );

m_socket.set_option( boost::asio::socket_base::keep_alive( true ) );

//重发未发送的消息

//发起读事件

m_socket.async_read_some( boost::asio::buffer( m_buffer ),

boost::bind( &CommonSession::onRead,

shared_from_this(),

boost::asio::placeholders::error,

boost::asio::placeholders::bytes_transferred ) );

}

}

void CommonSession::onWrite( const boost::system::error_code& error, size_t bytes_transferred )

{

if( error != 0 )

{

std::cout << "!!! send error, code is " << error<<" !!!"<<std::endl;

std::cerr << boost::system::system_error( error ).what() << std::endl;

}

}

void CommonSession::onRead( const boost::system::error_code& error, size_t bytes_transferred )

{

//cout <<"************************socket read " << bytes_transferred << " bytes codes ***************************"<< endl;

//出错检查

if( error != 0 )

{

std::cout << "!!! onRead error, code is " << error<<" !!!"<<std::endl;

std::cerr << boost::system::system_error( error ).what() << std::endl;

m_socket.close();

//3秒后重新连接

boost::this_thread::sleep( boost::posix_time::seconds( 3 ) );

m_isConnected = false;

start();

return;

}

else if( bytes_transferred == 0 )

{

m_socket.close();

//3秒后重新连接

boost::this_thread::sleep( boost::posix_time::seconds( 3 ) );

m_isConnected = false;

start();

return;

}

//解析消息(主要考虑到TCP分包和TCP粘包)

size_t m_bytesReceived = 0;

m_bytesReceived += bytes_transferred;

size_t dealDataSize = 0;

if (m_bytesReceived>=sizeof(TCPHead))

{

int offset = 0;

while (true)//考虑到TCP粘包问题,做个循环

{

//剩余缓冲区不足消息头

offset = dealDataSize;

if (m_bytesReceived-offset < sizeof(SQ::TCPHead)) break;

SQ::TCPHead* head = (SQ::TCPHead*)(m_buffer + offset);

unsigned long phoneNum = SQ::Utils::BCD2long( head->terminalID, PHONENUM_LENGTH );

offset += PHONENUM_LENGTH;

unsigned short serialNum = ntohs( head->serialNum );

offset += 2;

unsigned char cmd = head->cmd ;

offset += 1;

int bodyLen = ntohl( head->bodyLen );

offset += 4;

if (bodyLen == 0)//处理来自网关的数据,而不是终端

{

std::cout<< "receive(time:"<< time(NULL)<<") command from gate, cmd:" << (int)cmd << " phoneNum:"<<phoneNum<<std::endl;

dealCmd(cmd, phoneNum);

dealDataSize += sizeof(SQ::TCPHead);

}

else//处理来自终端的消息

{

//剩余缓冲区不足消息体

if (m_bytesReceived-offset < bodyLen) break;

//取出808消息体

char *pRawData = m_buffer + offset;

//解开消息头

CodecMsgHead csMsgHead;

AbstractCodec tmpCode;

tmpCode.decodeHead((SQ::Byte*)pRawData, &csMsgHead);

//处理消息长度

dealDataSize += (sizeof(SQ::TCPHead)+bodyLen);

}

}

}

//处理数据之后刷新缓冲区

if (dealDataSize > 0)

{

memmove(m_buffer,m_buffer+dealDataSize,m_bytesReceived-dealDataSize);

m_bytesReceived -= dealDataSize;

}

//发起读事件

m_socket.async_read_some( boost::asio::buffer(m_buffer + m_bytesReceived, PACK_MAX_SIZE-m_bytesReceived),

boost::bind( &CommonSession::onRead,

shared_from_this(),

boost::asio::placeholders::error,

boost::asio::placeholders::bytes_transferred ) );

}

void CommonSession::start()

{

tcp::endpoint ep = tcp::endpoint( address::from_string( m_ip ), m_port );

m_socket.async_connect( ep,

boost::bind( &CommonSession::onConnect,

shared_from_this(),

boost::asio::placeholders::error ) );

}

//发送字节流,编码808协议之后的字节流

void CommonSession::sendDataBytes(SQ::Byte *pByte, int len)

{

//检测连接

if (!m_isConnected)

{

std::cout<< "connection(to "<<m_ip<<":"<< m_port<<") is not established, so sendDataBytes faild" <<std::endl;

return;

}

//发送消息

time_t nowTime = time(NULL);

std::cout << "send:(time:"<<nowTime<<")" << SQ::Utils::bytes2HexString( pByte, len<=40?len:40);

if (len<=40)

{

std::cout<< std::endl;

}

else

{

cout << "..." <<endl;

}

// std::cout << "send:(time:"<<nowTime<<")" << SQ::Utils::bytes2HexString( pByte, len) <<endl;

boost::asio::async_write(

m_socket,

boost::asio::buffer( pByte, len),

boost::bind(&CommonSession::onWrite,shared_from_this(),boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred)

);

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: