使用Boost.Asio写的多线程TCP转发代理服务器
2010-09-23 16:33
435 查看
应用场景是这样的:
客户端和服务器在不同的网段内,它们之间不能直接通过TCP连接,但是有一台机器(暂时称为转发器)有双网卡,两块网卡分别在客户端和服务器端的网段内,这样转发器就能分别和客户端即服务器建立连接,并来回传输数据。
设计思路是这样的:
当客户端连接到转发器后,转发器马上建立一条到服务器之间的连接,与服务器端的连接建立后,就同时异步地从客户端和服务器端接收数据到两个缓冲区中,一旦任何一方有数据接收,就通过另外一条连接将数据发送到另一方,发送完毕后又开始新一轮的数据接收。如果在接收数据的过程中有任何一方出现错误,就将取消另外一条连接的异步调用,这样整个连接就会关闭。
其中用到了Boost库的以下特性:
1.Asio的异步IO调用
2.多线程
3.share_ptr的自动指针管理
遗留的一些问题:
由于采用了多线程,为了保险起见,我将每个回调函数都用strand包裹起来,在各回调函数没有使用共享资源的情况下并不必要,因此在这方面可以优化下
客户端和服务器在不同的网段内,它们之间不能直接通过TCP连接,但是有一台机器(暂时称为转发器)有双网卡,两块网卡分别在客户端和服务器端的网段内,这样转发器就能分别和客户端即服务器建立连接,并来回传输数据。
设计思路是这样的:
当客户端连接到转发器后,转发器马上建立一条到服务器之间的连接,与服务器端的连接建立后,就同时异步地从客户端和服务器端接收数据到两个缓冲区中,一旦任何一方有数据接收,就通过另外一条连接将数据发送到另一方,发送完毕后又开始新一轮的数据接收。如果在接收数据的过程中有任何一方出现错误,就将取消另外一条连接的异步调用,这样整个连接就会关闭。
其中用到了Boost库的以下特性:
1.Asio的异步IO调用
2.多线程
3.share_ptr的自动指针管理
遗留的一些问题:
由于采用了多线程,为了保险起见,我将每个回调函数都用strand包裹起来,在各回调函数没有使用共享资源的情况下并不必要,因此在这方面可以优化下
/* * ===================================================================================== * * Filename: xproxy_main.cpp * * Description: * * Version: 1.0 * Created: 2009年11月26日 15时10分29秒 * Revision: none * Compiler: gcc * * Author: David Fang (A free programmer), qi_fd@163.com * Company: nocompany * * ===================================================================================== */ #include <iostream> #include <string> #include <boost/asio.hpp> #include "xproxy_server.hpp" int main(int argc, char* argv[]) { try { if(5 != argc) { std::cerr<<"Usage: xproxy <local port> <server ip> <server_port> <[thread size>/n"; std::cerr<<"local port: local port used to accept login client/n"; std::cerr<<"server ip: analysing server address, ip string in decimal dot format/n"; std::cerr<<"server port: analysing server port, an unsigned short value/n"; std::cerr<<"thread size: number of threads to running xproxy server/n"; return 1; } xproxy_server srv(atoi(argv[1]), argv[2], atoi(argv[3]), atoi(argv[4])); srv.run(); } catch(std::exception& e) { std::cerr<<"exception: "<<e.what()<<"/n"; } return 0; } |
/* * ===================================================================================== * * Filename: xproxy_server.hpp * * Description: * * Version: 1.0 * Created: 2009年11月26日 15时12分01秒 * Revision: none * Compiler: gcc * * Author: David Fang (A free programmer), qi_fd@163.com * Company: nocompany * * ===================================================================================== */ #ifndef XPROXY_SERVER_HPP #define XPROXY_SERVER_HPP #include <string> #include <boost/asio.hpp> #include <boost/noncopyable.hpp> #include <boost/shared_ptr.hpp> #include "xproxy_connection.hpp" using boost::asio::ip::tcp; class xproxy_server:private boost::noncopyable { public: //construction of xproxy_server, which takes the destination machine(analysing server)'s //address and port(ipv4) as arguments explicit xproxy_server(unsigned short local_port, const std::string& ana_address, unsigned short ana_port, std::size_t thread_pool_size = 1); //Run the server's io_service loop void run(); //Stop the server void stop(); private: //Handle the completion of an asynchronous accept from the login client void handle_accept(const boost::system::error_code& e); //The number of threads that will call io_service::run() std::size_t thread_pool_size_; //The io_service used to perform asynchronous operations. boost::asio::io_service io_service_; //Acceptor used to listen for incoming proxy connectins boost::asio::ip::tcp::acceptor acceptor_; //Local endpoint corresponding to the login client tcp::endpoint local_endpoint_; //The endpoint to analysing server tcp::endpoint analysing_server_endpoint_; //The next connectin to be accepted. xproxy_connection_ptr new_connection_; }; #endif |
/* * ===================================================================================== * * Filename: xproxy_server.cpp * * Description: * * Version: 1.0 * Created: 2009年11月26日 15时12分07秒 * Revision: none * Compiler: gcc * * Author: David Fang (A free programmer), qi_fd@163.com * Company: nocompany * * ===================================================================================== */ #include "xproxy_server.hpp" #include <boost/thread.hpp> #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> #include <vector> xproxy_server::xproxy_server(unsigned short local_port, const std::string& ana_address, unsigned short ana_port, std::size_t thread_pool_size) :thread_pool_size_(thread_pool_size), acceptor_(io_service_), local_endpoint_(tcp::v4(), local_port), analysing_server_endpoint_(boost::asio::ip::address::from_string(ana_address), ana_port), new_connection_(new xproxy_connection(io_service_, analysing_server_endpoint_)) { acceptor_.open(local_endpoint_.protocol()); acceptor_.set_option(tcp::acceptor::reuse_address(true)); acceptor_.bind(local_endpoint_); acceptor_.listen(); acceptor_.async_accept(new_connection_->login_clt_sock(), boost::bind(&xproxy_server::handle_accept, this, boost::asio::placeholders::error)); } void xproxy_server::run() { std::vector<boost::shared_ptr<boost::thread> > threads; for(std::size_t i = 0; i < thread_pool_size_; ++i) { boost::shared_ptr<boost::thread> thread(new boost::thread( boost::bind(&boost::asio::io_service::run, &io_service_))); threads.push_back(thread); } for(std::size_t i = 0; i < threads.size(); ++i) { threads[i]->join(); } } void xproxy_server::stop() { io_service_.stop(); } void xproxy_server::handle_accept(const boost::system::error_code& e) { if(!e) { new_connection_->start(); new_connection_.reset(new xproxy_connection(io_service_, analysing_server_endpoint_)); acceptor_.async_accept(new_connection_->login_clt_sock(), boost::bind(&xproxy_server::handle_accept, this, boost::asio::placeholders::error)); } } |
/* * ===================================================================================== * * Filename: xproxy_connection.hpp * * Description: * * Version: 1.0 * Created: 2009年11月26日 15时11分04秒 * Revision: none * Compiler: gcc * * Author: David Fang (A free programmer), qi_fd@163.com * Company: nocompany * * ===================================================================================== */ #ifndef XPROXY_CONNECTION_HPP #define XPROXY_CONNECTION_HPP #include <boost/asio.hpp> #include <boost/array.hpp> #include <boost/noncopyable.hpp> #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> using boost::asio::ip::tcp; class xproxy_connection:public boost::enable_shared_from_this<xproxy_connection>, private boost::noncopyable { public: //Contruct a connection with the given io_service, the analysing server address and port explicit xproxy_connection(boost::asio::io_service& io_service, tcp::endpoint& ana_endpoint); ~xproxy_connection(); //Start the asyncronous connection to analysing server void start(); tcp::socket& login_clt_sock(); private: //Handle completion of connection to analysing server void handle_connect_to_ana_server(const boost::system::error_code& e); //Handle completion of login client socket read void handle_login_clt_sock_read(const boost::system::error_code& e, std::size_t bytes_transferred); //Handle completion of analysing server socket write void handle_ana_srv_sock_write(const boost::system::error_code& e); //Handle completion of analysing server socket read void handle_ana_srv_sock_read(const boost::system::error_code& e, std::size_t bytes_transferred); //Handle completion of login client socket write void handle_login_clt_sock_write(const boost::system::error_code& e); //Strand to ensure the connection's handles are not called concurrently boost::asio::io_service::strand strand_; //analysing server endpoint tcp::endpoint ana_endpoint_; //socket to the flex login client tcp::socket login_clt_sock_; //socket to analysing server tcp::socket ana_srv_sock_; //buffer used to recieve data from the login client boost::array<char, 1024> clt_buffer_; //buffer used to recieve data from the analysing server boost::array<char, 1024> srv_buffer_; }; typedef boost::shared_ptr<xproxy_connection> xproxy_connection_ptr; #endif |
/* * ===================================================================================== * * Filename: xproxy_connection.cpp * * Description: * * Version: 1.0 * Created: 2009年11月26日 15时12分33秒 * Revision: none * Compiler: gcc * * Author: David Fang (A free programmer), qi_fd@163.com * Company: nocompany * * ===================================================================================== */ #include "xproxy_connection.hpp" #include <vector> #include <iostream> #include <boost/bind.hpp> xproxy_connection::xproxy_connection(boost::asio::io_service& io_service, tcp::endpoint& ana_endpoint) :strand_(io_service), ana_endpoint_(ana_endpoint), login_clt_sock_(io_service), ana_srv_sock_(io_service) { std::cout<<"new connection construct/n"; } xproxy_connection::~xproxy_connection() { std::cout<<"connection destruct/n"; } tcp::socket& xproxy_connection::login_clt_sock() { return login_clt_sock_; } void xproxy_connection::start() { std::cout<<"connection start to connect to analysing server.../n"; ana_srv_sock_.async_connect(ana_endpoint_, strand_.wrap(boost::bind(&xproxy_connection::handle_connect_to_ana_server, shared_from_this(), boost::asio::placeholders::error))); } void xproxy_connection::handle_connect_to_ana_server(const boost::system::error_code& e) { if(!e) { std::cout<<"connect to analysing server succeed," <<"now start to receive data from both sides.../n"; login_clt_sock_.async_read_some(boost::asio::buffer(clt_buffer_), strand_.wrap( boost::bind(&xproxy_connection::handle_login_clt_sock_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); ana_srv_sock_.async_read_some(boost::asio::buffer(srv_buffer_), strand_.wrap( boost::bind(&xproxy_connection::handle_ana_srv_sock_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); } } void xproxy_connection::handle_login_clt_sock_read (const boost::system::error_code& e, std::size_t bytes_transferred) { if(!e) { std::cout<<"data read from login client:/n"; std::cout.write(clt_buffer_.data(), bytes_transferred); std::cout<<"/nnow send it to analysing server.../n"; boost::asio::async_write(ana_srv_sock_, boost::asio::buffer(clt_buffer_.data(), bytes_transferred), strand_.wrap(boost::bind( &xproxy_connection::handle_ana_srv_sock_write, shared_from_this(), boost::asio::placeholders::error))); } else { std::cout<<"read data from login client error, " <<"now need to shutdown this connection/n"; ana_srv_sock_.cancel(); } } void xproxy_connection::handle_ana_srv_sock_write(const boost::system::error_code& e) { if(!e) { std::cout<<"data send to analysing server complete, " <<"now start to receive data from login client again.../n"; login_clt_sock_.async_read_some(boost::asio::buffer(clt_buffer_), strand_.wrap( boost::bind(&xproxy_connection::handle_login_clt_sock_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); } } void xproxy_connection::handle_ana_srv_sock_read( const boost::system::error_code& e, std::size_t bytes_transferred) { if(!e) { std::cout<<"data read from analysing server:/n"; std::cout.write(srv_buffer_.data(), bytes_transferred); std::cout<<"/nnow send it to login client.../n"; boost::asio::async_write(login_clt_sock_, boost::asio::buffer(srv_buffer_.data(), bytes_transferred), strand_.wrap( boost::bind(&xproxy_connection::handle_login_clt_sock_write, shared_from_this(), boost::asio::placeholders::error))); } else { std::cout<<"read data from analysing server error, " <<"now need to shutdown this connection/n"; login_clt_sock_.cancel(); } } void xproxy_connection::handle_login_clt_sock_write(const boost::system::error_code& e) { if(!e) { std::cout<<"data send to login client complete, " <<"now start to receive data from analysing server again.../n"; ana_srv_sock_.async_read_some(boost::asio::buffer(srv_buffer_), strand_.wrap( boost::bind(&xproxy_connection::handle_ana_srv_sock_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); } } |
BOOST_INC=/home/done/dev_lib/boost_1_38_0/ BOOST_LIB=/home/done/dev_lib/boostlibs/ LIB_THREAD=$(BOOST_LIB)libboost_thread-gcc42-mt-s-1_38.a LIB_SYSTEM=$(BOOST_LIB)libboost_system-gcc42-mt-s-1_38.a SYS_INC=/usr/include/ CPP_INC=/usr/include/c++/4.2/ default:xproxy_main.o xproxy_server.o xproxy_connection.o g++ -o xproxy xproxy_main.o xproxy_server.o xproxy_connection.o $(LIB_THREAD) $(LIB_SYSTEM) -lpthread xproxy_main.o:xproxy_main.cpp xproxy_server.hpp g++ -c -I$(SYS_INC) -I$(BOOST_INC) -I$(CPP_INC) xproxy_main.cpp xproxy_server.o:xproxy_server.cpp xproxy_connection.hpp g++ -c -I$(SYS_INC) -I$(BOOST_INC) -I$(CPP_INC) xproxy_server.cpp xproxy_connection.o:xproxy_connection.cpp xproxy_connection.hpp g++ -c -I$(SYS_INC) -I$(BOOST_INC) -I$(CPP_INC) xproxy_connection.cpp clean: rm *.o xproxy 2>/dev/null |
相关文章推荐
- 使用Boost.Asio写的多线程TCP转发代理服务器
- 使用Boost.Asio写的多线程TCP转发代理服务器
- 使用Boost.Asio写的多线程TCP转发代理服务器
- boost::asio tcp socket 的使用示例
- boost asio中io_service类的几种使用
- 【Boost】boost库asio详解4——deadline_timer使用说明
- boost::asio::ip::tcp实现网络通信的小例子
- Boost.Asio使用实例
- boost.asio服务器使用io_service作为work pool
- Boost.Asio C++ Chapter_4 tcp_sync
- C++ boost::asio::serial_port 串口通信类 使用 封装 [大三四八九月实习]
- boost----多线程使用
- Boost.Asio使用入门
- [转]boost::asio::ip::tcp实现网络通信的小例子
- Python使用TCPServer编写(多线程)Socket服务
- boost::shared_ptr的多线程使用陷阱
- Python使用TCPServer编写(多线程)Socket服务
- BOOST::ASIO多线程下socket关闭导致进程崩溃问题定位及解决
- 使用libevent和boost编写一个简单的tcp服务器
- 【Boost】boost库asio详解8——几个TCP的简单例子