您的位置:首页 > 理论基础 > 计算机网络

使用Boost.Asio写的多线程TCP转发代理服务器

2013-04-11 11:59 555 查看

使用Boost.Asio写的多线程TCP转发代理服务器


(2012-09-06 17:38:01)


转载▼

标签:

杂谈

分类:boost
应用场景是这样的:

客户端和服务器在不同的网段内,它们之间不能直接通过TCP连接,但是有一台机器(暂时称为转发器)有双网卡,两块网卡分别在客户端和服务器端的网段内,这样转发器就能分别和客户端及服务器建立连接,并来回传输数据。

设计思路是这样的:

当客户端连接到转发器后,转发器马上建立一条到服务器之间的连接,与服务器端的连接建立后,就同时异步地从客户端和服务器端接收数据到两个缓冲区中,一旦任何一方有数据接收,就通过另外一条连接将数据发送到另一方,发送完毕后又开始新一轮的数据接收。如果在接收数据的过程中有任何一方出现错误,就将取消另外一条连接的异步调用,这样整个连接就会关闭。

其中用到了Boost库的以下特性:

1.Asio的异步IO调用

2.多线程

3.shared_ptr的自动指针管理

遗留的一些问题:
由于采用了多线程,为了保险起见,我将每个回调函数都用strand包裹起来,在各回调函数没有使用共享资源的情况下并不必要,因此在这方面可以优化下

1

18

#include <cstdlib>
#include <iostream>
#include <string>

#include <boost/asio.hpp>

#include "xproxy_server.hpp"

23

24 int main(int argc, char* argv[])

25 {

26 try

27 {

28 if(5 != argc)

29 {

30 std::cerr<<"Usage: xproxy <local port> <server ip> <server_port> <[thread size>\n";

31 std::cerr<<"local port: local port used to accept login client\n";

32 std::cerr<<"server ip: analysing server address, ip string in decimal dot format\n";

33 std::cerr<<"server port: analysing server port, an unsigned short value\n";

34 std::cerr<<"thread size: number of threads to running xproxy server\n";

35 return 1;

36 }

37 xproxy_server srv(atoi(argv[1]), argv[2], atoi(argv[3]), atoi(argv[4]));

38

39 srv.run();

40 }

41 catch(std::exception& e)

42 {

43 std::cerr<<"exception: "<<e.what()<<"\n";

44 }

45

46 return 0;

47 }

1

18 #ifndef XPROXY_SERVER_HPP

19 #define XPROXY_SERVER_HPP

20

21 #include <string>

22

23 #include <boost/asio.hpp>

24 #include <boost/noncopyable.hpp>

25 #include <boost/shared_ptr.hpp>

26 #include "xproxy_connection.hpp"

27

28 using boost::asio::ip::tcp;

29

30 class xproxy_server:private boost::noncopyable

31 {

32 public:

33

34 //construction of xproxy_server, which takes the destination machine(analysing server)'s

35

36 //address and port(ipv4) as arguments

37

38 explicit xproxy_server(unsigned short local_port, const std::string& ana_address, unsigned short ana_port,

39 std::size_t thread_pool_size = 1);

40

41 //Run the server's io_service loop

42

43 void run();

44

45 //Stop the server

46

47 void stop();

48 private:

49 //Handle the completion of an asynchronous accept from the login client

50

51 void handle_accept(const boost::system::error_code& e);

52

53 //The number of threads that will call io_service::run()

54

55 std::size_t thread_pool_size_;

56

57 //The io_service used to perform asynchronous operations.

58

59 boost::asio::io_service io_service_;

60

61 //Acceptor used to listen for incoming proxy connectins

62

63 boost::asio::ip::tcp::acceptor acceptor_;

64

65 //Local endpoint corresponding to the login client

66

67 tcp::endpoint local_endpoint_;

68

69 //The endpoint to analysing server

70

71 tcp::endpoint analysing_server_endpoint_;

72

73 //The next connectin to be accepted.

74

75 xproxy_connection_ptr new_connection_;

76 };

77

78 #endif

1

18

19 #include "xproxy_server.hpp"

20 #include <boost/thread.hpp>

21 #include <boost/bind.hpp>

22 #include <boost/shared_ptr.hpp>

23 #include <vector>

24

25 xproxy_server::xproxy_server(unsigned short local_port,

26 const std::string& ana_address,

27 unsigned short ana_port,

28 std::size_t thread_pool_size)

29 :thread_pool_size_(thread_pool_size),

30 acceptor_(io_service_),

31 local_endpoint_(tcp::v4(), local_port),

32 analysing_server_endpoint_(boost::asio::ip::address::from_string(ana_address), ana_port),

33 new_connection_(new xproxy_connection(io_service_, analysing_server_endpoint_))

34 {

35 acceptor_.open(local_endpoint_.protocol());

36 acceptor_.set_option(tcp::acceptor::reuse_address(true));

37 acceptor_.bind(local_endpoint_);

38 acceptor_.listen();

39 acceptor_.async_accept(new_connection_->login_clt_sock(),

40 boost::bind(&xproxy_server::handle_accept, this,

41 boost::asio::placeholders::error));

42 }

43

44 void xproxy_server::run()

45 {

46 std::vector<boost::shared_ptr<boost::thread> > threads;

47 for(std::size_t i = 0; i < thread_pool_size_; ++i)

48 {

49 boost::shared_ptr<boost::thread> thread(new boost::thread(

50 boost::bind(&boost::asio::io_service::run, &io_service_)));

51 threads.push_back(thread);

52 }

53

54 for(std::size_t i = 0; i < threads.size(); ++i)

55 {

56 threads[i]->join();

57 }

58 }

59

60 void xproxy_server::stop()

61 {

62 io_service_.stop();

63 }

64

65 void xproxy_server::handle_accept(const boost::system::error_code& e)

66 {

67 if(!e)

68 {

69 new_connection_->start();

70 new_connection_.reset(new xproxy_connection(io_service_,

71 analysing_server_endpoint_));

72

73 acceptor_.async_accept(new_connection_->login_clt_sock(),

74 boost::bind(&xproxy_server::handle_accept, this,

75 boost::asio::placeholders::error));

76 }

77 }

1

18 #ifndef XPROXY_CONNECTION_HPP

19 #define XPROXY_CONNECTION_HPP

20

21 #include <boost/asio.hpp>

22 #include <boost/array.hpp>

23 #include <boost/noncopyable.hpp>

24 #include <boost/shared_ptr.hpp>

25 #include <boost/enable_shared_from_this.hpp>

26

27 using boost::asio::ip::tcp;

28

29 class xproxy_connection:public boost::enable_shared_from_this<xproxy_connection>,

30 private boost::noncopyable

31 {

32 public:

33 //Contruct a connection with the given io_service, the analysing server address and port

34

35 explicit xproxy_connection(boost::asio::io_service& io_service,

36 tcp::endpoint& ana_endpoint);

37

38 ~xproxy_connection();

39

40 //Start the asyncronous connection to analysing server

41

42 void start();

43

44 tcp::socket& login_clt_sock();

45

46 private:

47 //Handle completion of connection to analysing server

48

49 void handle_connect_to_ana_server(const boost::system::error_code& e);

50

51 //Handle completion of login client socket read

52

53 void handle_login_clt_sock_read(const boost::system::error_code& e,

54 std::size_t bytes_transferred);

55

56 //Handle completion of analysing server socket write

57

58 void handle_ana_srv_sock_write(const boost::system::error_code& e);

59

60 //Handle completion of analysing server socket read

61

62 void handle_ana_srv_sock_read(const boost::system::error_code& e,

63 std::size_t bytes_transferred);

64

65 //Handle completion of login client socket write

66

67 void handle_login_clt_sock_write(const boost::system::error_code& e);

68

69 //Strand to ensure the connection's handles are not called concurrently

70

71 boost::asio::io_service::strand strand_;

72

73 //analysing server endpoint

74

75 tcp::endpoint ana_endpoint_;

76

77 //socket to the flex login client

78

79 tcp::socket login_clt_sock_;

80

81 //socket to analysing server

82

83 tcp::socket ana_srv_sock_;

84

85 //buffer used to recieve data from the login client

86

87 boost::array<char, 1024> clt_buffer_;

88

89 //buffer used to recieve data from the analysing server

90

91 boost::array<char, 1024> srv_buffer_;

92 };

93

94 typedef boost::shared_ptr<xproxy_connection> xproxy_connection_ptr;

95

96 #endif

1

18

19 #include "xproxy_connection.hpp"

20 #include <vector>

21 #include <iostream>

22 #include <boost/bind.hpp>

23

24 xproxy_connection::xproxy_connection(boost::asio::io_service& io_service,

25 tcp::endpoint& ana_endpoint)

26 :strand_(io_service),

27 ana_endpoint_(ana_endpoint),

28 login_clt_sock_(io_service),

29 ana_srv_sock_(io_service)

30 {

31 std::cout<<"new connection construct\n";

32 }

33

34 xproxy_connection::~xproxy_connection()

35 {

36 std::cout<<"connection destruct\n";

37 }

38

39 tcp::socket& xproxy_connection::login_clt_sock()

40 {

41 return login_clt_sock_;

42 }

43

44 void xproxy_connection::start()

45 {

46 std::cout<<"connection start to connect to analysing server

\n";

47 ana_srv_sock_.async_connect(ana_endpoint_,

48 strand_.wrap(boost::bind(&xproxy_connection::handle_connect_to_ana_server,

49 shared_from_this(), boost::asio::placeholders::error)));

50 }

51

52 void xproxy_connection::handle_connect_to_ana_server(const boost::system::error_code& e)

53 {

54 if(!e)

55 {

56 std::cout<<"connect to analysing server succeed,"

57 <<"now start to receive data from both sides

\n";

58

59 login_clt_sock_.async_read_some(boost::asio::buffer(clt_buffer_),

60 strand_.wrap(

61 boost::bind(&xproxy_connection::handle_login_clt_sock_read,

62 shared_from_this(), boost::asio::placeholders::error,

63 boost::asio::placeholders::bytes_transferred)));

64

65 ana_srv_sock_.async_read_some(boost::asio::buffer(srv_buffer_),

66 strand_.wrap(

67 boost::bind(&xproxy_connection::handle_ana_srv_sock_read,

68 shared_from_this(), boost::asio::placeholders::error,

69 boost::asio::placeholders::bytes_transferred)));

70

71 }

72 }

73

74 void xproxy_connection::handle_login_clt_sock_read

75 (const boost::system::error_code& e, std::size_t bytes_transferred)

76 {

77 if(!e)

78 {

79 std::cout<<"data read from login client:\n";

80 std::cout.write(clt_buffer_.data(), bytes_transferred);

81 std::cout<<"\nnow send it to analysing server

\n";

82 boost::asio::async_write(ana_srv_sock_,

83 boost::asio::buffer(clt_buffer_.data(), bytes_transferred),

84 strand_.wrap(boost::bind(

85 &xproxy_connection::handle_ana_srv_sock_write,

86 shared_from_this(), boost::asio::placeholders::error)));

87 }

88 else

89 {

90 std::cout<<"read data from login client error, "

91 <<"now need to shutdown this connection\n";

92 ana_srv_sock_.cancel();

93 }

94 }

95

96 void xproxy_connection::handle_ana_srv_sock_write(const boost::system::error_code& e)

97 {

98 if(!e)

99 {

100 std::cout<<"data send to analysing server complete, "

101 <<"now start to receive data from login client again

\n";

102 login_clt_sock_.async_read_some(boost::asio::buffer(clt_buffer_),

103 strand_.wrap(

104 boost::bind(&xproxy_connection::handle_login_clt_sock_read,

105 shared_from_this(), boost::asio::placeholders::error,

106 boost::asio::placeholders::bytes_transferred)));

107 }

108 }

109

110 void xproxy_connection::handle_ana_srv_sock_read(

111 const boost::system::error_code& e,

112 std::size_t bytes_transferred)

113 {

114 if(!e)

115 {

116 std::cout<<"data read from analysing server:\n";

117 std::cout.write(srv_buffer_.data(), bytes_transferred);

118 std::cout<<"\nnow send it to login client

\n";

119 boost::asio::async_write(login_clt_sock_,

120 boost::asio::buffer(srv_buffer_.data(), bytes_transferred),

121 strand_.wrap(

122 boost::bind(&xproxy_connection::handle_login_clt_sock_write,

123 shared_from_this(), boost::asio::placeholders::error)));

124 }

125 else

126 {

127 std::cout<<"read data from analysing server error, "

128 <<"now need to shutdown this connection\n";

129 login_clt_sock_.cancel();

130 }

131 }

132

133 void xproxy_connection::handle_login_clt_sock_write(const boost::system::error_code& e)

134 {

135 if(!e)

136 {

137 std::cout<<"data send to login client complete, "

138 <<"now start to receive data from analysing server again

\n";

139 ana_srv_sock_.async_read_some(boost::asio::buffer(srv_buffer_),

140 strand_.wrap(

141 boost::bind(&xproxy_connection::handle_ana_srv_sock_read,

142 shared_from_this(), boost::asio::placeholders::error,

143 boost::asio::placeholders::bytes_transferred)));

144 }

145 }

# Build rules for the xproxy TCP forwarding proxy.
# Adjust BOOST_INC / BOOST_LIB to the local Boost installation.
BOOST_INC=/home/done/dev_lib/boost_1_38_0/
BOOST_LIB=/home/done/dev_lib/boostlibs/
LIB_THREAD=$(BOOST_LIB)libboost_thread-gcc42-mt-s-1_38.a
LIB_SYSTEM=$(BOOST_LIB)libboost_system-gcc42-mt-s-1_38.a
SYS_INC=/usr/include/
CPP_INC=/usr/include/c++/4.2/
INCLUDES=-I$(SYS_INC) -I$(BOOST_INC) -I$(CPP_INC)
OBJS=xproxy_main.o xproxy_server.o xproxy_connection.o

# default and clean name actions, not files.
.PHONY: default clean

default: xproxy

# Link against the real output file so an up-to-date binary is not relinked.
xproxy: $(OBJS)
	g++ -o xproxy $(OBJS) $(LIB_THREAD) $(LIB_SYSTEM) -lpthread

# xproxy_server.hpp includes xproxy_connection.hpp, so both headers are
# prerequisites of every object that includes xproxy_server.hpp.
xproxy_main.o: xproxy_main.cpp xproxy_server.hpp xproxy_connection.hpp
	g++ -c $(INCLUDES) xproxy_main.cpp

xproxy_server.o: xproxy_server.cpp xproxy_server.hpp xproxy_connection.hpp
	g++ -c $(INCLUDES) xproxy_server.cpp

xproxy_connection.o: xproxy_connection.cpp xproxy_connection.hpp
	g++ -c $(INCLUDES) xproxy_connection.cpp

# -f keeps clean from failing when there is nothing to remove.
clean:
	rm -f *.o xproxy
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: