您的位置:首页 > 其它

boost.asio源码剖析(三) ---- 流程分析

2014-06-19 07:41 507 查看
[b]* 常见流程分析之一(Tcp异步连接)[/b]

我们用一个简单的demo分析Tcp异步连接的流程:

#include <iostream>
#include <boost/asio.hpp>

// 异步连接回调函数
void on_connect(boost::system::error_code ec)
{
if (ec)  // 连接失败, 输出错误码
std::cout << "async connect error:" << ec.message() << std::endl;
else  // 连接成功
std::cout << "async connect ok!" << std::endl;
}

int main()
{
boost::asio::io_service ios;  // 创建io_service对象
boost::asio::ip::tcp::endpoint addr(
boost::asio::ip::address::from_string("127.0.0.1"), 12345);  // server端地址
boost::asio::ip::tcp::socket conn_socket(ios);  // 创建tcp协议的socket对象
conn_socket.async_connect(addr, &on_connect);  // 发起异步连接请求
ios.run();  // 调用io_service::run, 等待异步操作结果

std::cin.get();
return 0;
}


这段代码中的异步连接请求在asio源码中的序列图如下:



其中,basic_socket是个模板类,tcp协议中的socket的定义如下:
typedef basic_socket<tcp> socket;

reactor的定义如下:
#if defined(BOOST_ASIO_WINDOWS_RUNTIME)
typedef class null_reactor reactor;
#elif defined(BOOST_ASIO_HAS_IOCP)
typedef class select_reactor reactor;
#elif defined(BOOST_ASIO_HAS_EPOLL)
typedef class epoll_reactor reactor;
#elif defined(BOOST_ASIO_HAS_KQUEUE)
typedef class kqueue_reactor reactor;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
typedef class dev_poll_reactor reactor;
#else
typedef class select_reactor reactor;
#endif

在这个序列图中最值得注意的一点是:在windows平台下,异步连接请求不是由Iocp处理的,而是由select模型处理的,这是与异步读写数据最大的不同之处。

[b]* 常见流程分析之二(Tcp异步接受连接)[/b]

我们用一个简单的demo分析Tcp异步连接的流程:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>

// 异步连接回调函数
void on_accept(boost::system::error_code ec, boost::asio::ip::tcp::socket * socket_ptr)
{
if (ec)  // 连接失败, 输出错误码
std::cout << "async accept error:" << ec.message() << std::endl;
else  // 连接成功
std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;

// 断开连接, 释放资源.
socket_ptr->close(), delete socket_ptr;
}

int main()
{
boost::asio::io_service ios;  // 创建io_service对象
boost::asio::ip::tcp::endpoint addr(
boost::asio::ip::address::from_string("0.0.0.0"), 12345);  // server端地址
boost::asio::ip::tcp::acceptor acceptor(ios, addr, false);  // 创建acceptor对象
boost::asio::ip::tcp::socket * socket_ptr = new boost::asio::ip::tcp::socket(ios);
acceptor.async_accept(*socket_ptr
, boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr));  // 调用异步accept请求
ios.run();  // 调用io_service::run, 等待异步操作结果

std::cin.get();
return 0;
}


这段代码中的异步连接请求在asio源码中的序列图如下:



[b]* 常见流程分析之三(Tcp异步读写数据)[/b]

我们依然以上一节的例子为基础,扩展一个简单的demo分析Tcp异步读写数据的流程:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/array.hpp>

typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr_t;
typedef boost::array<char, 128> buffer_t;
typedef boost::shared_ptr<buffer_t> buffer_ptr_t;

// 异步读数据回调函数
void on_read(boost::system::error_code ec
, std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
{
if (ec)
std::cout << "async write error:" << ec.message() << std::endl;
else
{
std::cout << "async read size:" << len;
std::cout << " info:" << std::string((char*)buffer_ptr->begin(), len) << std::endl;

// auto release socket and buffer.
}
}

// 异步写数据回调函数
void on_write(boost::system::error_code ec
, std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
{
if (ec)
std::cout << "async write error:" << ec.message() << std::endl;
else
{
std::cout << "async write size:" << len << std::endl;
socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())
, boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
, socket_ptr, buffer_ptr));
}
}

// 异步连接回调函数
void on_accept(boost::system::error_code ec, socket_ptr_t socket_ptr)
{
if (ec)  // 连接失败, 输出错误码
{
std::cout << "async accept error:" << ec.message() << std::endl;
}
else  // 连接成功
{
std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;
buffer_ptr_t buffer_ptr(new buffer_t);
strcpy_s((char*)buffer_ptr->begin(), buffer_t::size(), "abcdefg");
socket_ptr->async_write_some(boost::asio::buffer(buffer_ptr.get(), strlen((char*)buffer_ptr->begin()))
, boost::bind(&on_write, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
, socket_ptr, buffer_ptr));
}
}

int main()
{
boost::asio::io_service ios;  // 创建io_service对象
boost::asio::ip::tcp::endpoint addr(
boost::asio::ip::address::from_string("0.0.0.0"), 12345);  // server端地址
boost::asio::ip::tcp::acceptor acceptor(ios, addr, false);  // 创建acceptor对象
socket_ptr_t socket_ptr(new boost::asio::ip::tcp::socket(ios));
acceptor.async_accept(*socket_ptr
, boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr));  // 调用异步accept请求
ios.run();  // 调用io_service::run, 等待异步操作结果

std::cout << "press enter key...";
std::cin.get();
return 0;
}


这段代码中的异步连接请求在asio源码中的序列图如下:



[b]* 常见流程分析之四(Tcp强制关闭连接)[/b]

我们依然以上一节的例子为基础,扩展一个简单的demo分析Tcp强制关闭连接的流程:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/array.hpp>

typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr_t;
typedef boost::array<char, 128> buffer_t;
typedef boost::shared_ptr<buffer_t> buffer_ptr_t;

// 异步读数据回调函数
void on_read(boost::system::error_code ec
, std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
{
if (ec)  // 连接失败, 输出错误码
{
std::cout << "async read error:" << ec.message() << std::endl;
}
}

// 异步写数据回调函数
void on_write(boost::system::error_code ec
, std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
{
if (ec)  // 连接失败, 输出错误码
{
std::cout << "async write error:" << ec.message() << std::endl;
}
}

// 异步连接回调函数
void on_accept(boost::system::error_code ec, socket_ptr_t socket_ptr)
{
if (ec)  // 连接失败, 输出错误码
{
std::cout << "async accept error:" << ec.message() << std::endl;
}
else  // 连接成功
{
std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;

{
buffer_ptr_t buffer_ptr(new buffer_t);
strcpy_s((char*)buffer_ptr->begin(), buffer_t::size(), "abcdefg");
socket_ptr->async_write_some(boost::asio::buffer(buffer_ptr.get(), strlen((char*)buffer_ptr->begin()))
, boost::bind(&on_write, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
, socket_ptr, buffer_ptr));
}

{
buffer_ptr_t buffer_ptr(new buffer_t);
socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())
, boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
, socket_ptr, buffer_ptr));
}

/// 强制关闭连接
socket_ptr->close(ec);
if (ec)
std::cout << "close error:" << ec.message() << std::endl;
}
}

int main()
{
boost::asio::io_service ios;  // 创建io_service对象
boost::asio::ip::tcp::endpoint addr(
boost::asio::ip::address::from_string("0.0.0.0"), 12345);  // server端地址
boost::asio::ip::tcp::acceptor acceptor(ios, addr, false);  // 创建acceptor对象
socket_ptr_t socket_ptr(new boost::asio::ip::tcp::socket(ios));
acceptor.async_accept(*socket_ptr
, boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr));  // 调用异步accept请求
socket_ptr.reset();
ios.run();  // 调用io_service::run, 等待异步操作结果

std::cout << "press enter key...";
std::cin.get();
return 0;
}


这个例子中,接受到客户端的连接后,立即发起异步读请求和异步写请求,然后立即强制关闭socket。

其中,强制关闭socket的请求在asio源码中的序列图如下:



[b] * 常见流程分析之五(Tcp优雅地关闭连接)[/b]

我们依然以第三节的例子为基础,扩展一个简单的demo分析Tcp优雅地关闭连接的流程:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/array.hpp>

typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr_t;
typedef boost::array<char, 32> buffer_t;
typedef boost::shared_ptr<buffer_t> buffer_ptr_t;

// 异步读数据回调函数
void on_read(boost::system::error_code ec
, std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
{
static int si = 0;
if (ec)  // 连接失败, 输出错误码
{
std::cout << "async read(" << si++ << ") error:" << ec.message() << std::endl;
socket_ptr->shutdown(boost::asio::socket_base::shutdown_receive, ec);
socket_ptr->close(ec);
if (ec)
std::cout << "close error:" << ec.message() << std::endl;
}
else
{
std::cout << "read(" << si++ << ") len:" << len << std::endl;

socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())
, boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
, socket_ptr, buffer_ptr));
}
}

// 异步写数据回调函数
void on_write(boost::system::error_code ec
, std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
{
if (ec)  // 连接失败, 输出错误码
{
std::cout << "async write error:" << ec.message() << std::endl;
}
else
{
/// 优雅地关闭连接
socket_ptr->shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
if (ec)
std::cout << "shutdown send error:" << ec.message() << std::endl;
}
}

// 异步连接回调函数
void on_accept(boost::system::error_code ec, socket_ptr_t socket_ptr)
{
if (ec)  // 连接失败, 输出错误码
{
std::cout << "async accept error:" << ec.message() << std::endl;
}
else  // 连接成功
{
std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;

{
buffer_ptr_t buffer_ptr(new buffer_t);
socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())
, boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
, socket_ptr, buffer_ptr));
}

{
buffer_ptr_t buffer_ptr(new buffer_t);
strcpy_s((char*)buffer_ptr->begin(), buffer_t::size(), "abcdefg");
socket_ptr->async_write_some(boost::asio::buffer(buffer_ptr.get(), strlen((char*)buffer_ptr->begin()))
, boost::bind(&on_write, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
, socket_ptr, buffer_ptr));
}
}
}

int main()
{
boost::asio::io_service ios;  // 创建io_service对象
boost::asio::ip::tcp::endpoint addr(
boost::asio::ip::address::from_string("0.0.0.0"), 12345);  // server端地址
boost::asio::ip::tcp::acceptor acceptor(ios, addr, false);  // 创建acceptor对象
socket_ptr_t socket_ptr(new boost::asio::ip::tcp::socket(ios));
acceptor.async_accept(*socket_ptr
, boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr));  // 调用异步accept请求
socket_ptr.reset();
ios.run();  // 调用io_service::run, 等待异步操作结果

std::cout << "press enter key...";
std::cin.get();
return 0;
}


在这个例子中,接收到客户端的连接并向客户端发送数据以后,先关闭socket的发送通道,然后等待socket接收缓冲区中的数据全部read出来以后,再关闭socket的接收通道。此时,socket的接收和发送通道均以关闭,任何进程都无法使用此socket收发数据,但其所占用的系统资源并未释放,底层发送缓冲区中的数据也不保证已全部发出,需要在此之后执行close操作以便释放系统资源。
若在释放系统资源前希望底层发送缓冲区中的数据依然可以发出,则需在socket的linger属性中设置一个等待时间,以便有时间等待发送缓冲区中的数据发送完毕。但linger中的值绝对不是越大越好,这是因为其原理是操作系统帮忙保留socket的资源以等待其发送缓冲区中的数据发送完毕,如果远端socket的一直未能接收数据便会导致本地socket一直等待下去,这对系统资源是极大的浪费。因此,在需要处理大量连接的服务端,linger的值一定不可过大。

由于本文会实时根据读者反馈的宝贵意见更新,为防其他读者看到过时的文章,因此本系列专题谢绝转载!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: