您的位置:首页 > 其它

【Boost】boost库asio详解3——io_service作为work pool

2017-08-29 17:39 447 查看
无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。
使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.dispatch的接口也和io_service.post一样,但不同的是它是直接调用而不是经过push到队列然后在io_services.run中执行,而在这个示例当中,显然我们需要把工作交到另一个线程去完成,这样才不会影响网络接收线程池的工作以达到高效率的接收数据,这种设计与前面的netsever其实相同,这就是典型的Half Sync/Half Async。二者的区别就是netsever自己实现了工作队列,而不是直接使用io_service,这种设计实际上在win下是使用了iocp作为工作队列。
不过我更倾向于前一种设计,因为那样做,代码一切都在自己的掌握中,而io_service则是经过许多封装代码,并且本身设计只是用于处理网络完成事件的。
无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。

[cpp] view plain copy

print?

#include <stdio.h>

#include <cstdlib>

#include <iostream>

#include <boost/thread.hpp>

#include <boost/aligned_storage.hpp>

#include <boost/array.hpp>

#include <boost/bind.hpp>

#include <boost/enable_shared_from_this.hpp>

#include <boost/noncopyable.hpp>

#include <boost/shared_ptr.hpp>

#include <boost/asio.hpp>

using boost::asio::ip::tcp;

class handler_allocator

: private boost::noncopyable

{

public:

handler_allocator()

: in_use_(false)

{

}

void* allocate(std::size_t size)

{

if (!in_use_ && size < storage_.size)

{

in_use_ = true;

return storage_.address();

}

else

{

return ::operator new(size);

}

}

void deallocate(void* pointer)

{

if (pointer == storage_.address())

{

in_use_ = false;

}

else

{

::operator delete(pointer);

}

}

private:

// Storage space used for handler-based custom memory allocation.

boost::aligned_storage<1024> storage_;

// Whether the handler-based custom allocation storage has been used.

bool in_use_;

};

template <typename Handler>

class custom_alloc_handler

{

public:

custom_alloc_handler(handler_allocator& a, Handler h)

: allocator_(a),

handler_(h)

{

}

template <typename Arg1>

void operator()(Arg1 arg1)

{

handler_(arg1);

}

template <typename Arg1, typename Arg2>

void operator()(Arg1 arg1, Arg2 arg2)

{

handler_(arg1, arg2);

}

friend void* asio_handler_allocate(std::size_t size,

custom_alloc_handler<Handler>* this_handler)

{

return this_handler->allocator_.allocate(size);

}

friend void asio_handler_deallocate(void* pointer, std::size_t /*size*/,

custom_alloc_handler<Handler>* this_handler)

{

this_handler->allocator_.deallocate(pointer);

}

private:

handler_allocator& allocator_;

Handler handler_;

};

// Helper function to wrap a handler object to add custom allocation.

template <typename Handler>

inline custom_alloc_handler<Handler> make_custom_alloc_handler(

handler_allocator& a, Handler h)

{

return custom_alloc_handler<Handler>(a, h);

}

/// A pool of io_service objects.

class io_service_pool

: private boost::noncopyable

{

public:

/// Construct the io_service pool.

explicit io_service_pool(std::size_t pool_size) : next_io_service_(0)

{

if (pool_size == 0)

throw std::runtime_error("io_service_pool size is 0");

// Give all the io_services work to do so that their run() functions will not

// exit until they are explicitly stopped.

for (std::size_t i = 0; i < pool_size; ++i)

{

io_service_ptr io_service(new boost::asio::io_service);

work_ptr work(new boost::asio::io_service::work(*io_service));

io_services_.push_back(io_service);

work_.push_back(work);

}

}

// Run all io_service objects in the pool.

void run()

{

// Create a pool of threads to run all of the io_services.

std::vector<boost::shared_ptr<boost::thread> > threads;

for (std::size_t i = 0; i < io_services_.size(); ++i)

{

boost::shared_ptr<boost::thread> thread(new boost::thread(

boost::bind(&boost::asio::io_service::run, io_services_[i])));

threads.push_back(thread);

}

// Wait for all threads in the pool to exit.

for (std::size_t i = 0; i < threads.size(); ++i)

threads[i]->join();

}

// Stop all io_service objects in the pool.

void stop()

{

// Explicitly stop all io_services.

for (std::size_t i = 0; i < io_services_.size(); ++i)

io_services_[i]->stop();

}

// Get an io_service to use.

boost::asio::io_service& get_io_service()

{

// Use a round-robin scheme to choose the next io_service to use.

boost::asio::io_service& io_service = *io_services_[next_io_service_];

++next_io_service_;

if (next_io_service_ == io_services_.size())

next_io_service_ = 0;

return io_service;

}

private:

typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr;

typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;

/// The pool of io_services.

std::vector<io_service_ptr> io_services_;

/// The work that keeps the io_services running.

std::vector<work_ptr> work_;

/// The next io_service to use for a connection.

std::size_t next_io_service_;

};

class session

: public boost::enable_shared_from_this<session>

{

public:

session(boost::asio::io_service& work_service

, boost::asio::io_service& io_service)

: socket_(io_service)

, io_work_service(work_service)

{

}

tcp::socket& socket()

{

return socket_;

}

void start()

{

socket_.async_read_some(boost::asio::buffer(data_),

make_custom_alloc_handler(allocator_,

boost::bind(&session::handle_read,

shared_from_this(),

boost::asio::placeholders::error,

boost::asio::placeholders::bytes_transferred)));

}

void handle_read(const boost::system::error_code& error,

size_t bytes_transferred)

{

if (!error)

{

boost::shared_ptr<std::vector<char> > buf(new std::vector<char>);

buf->resize(bytes_transferred);

std::copy(data_.begin(), data_.begin() + bytes_transferred, buf->begin());

io_work_service.post(boost::bind(&session::on_receive

, shared_from_this(), buf, bytes_transferred));

socket_.async_read_some(boost::asio::buffer(data_),

make_custom_alloc_handler(allocator_,

boost::bind(&session::handle_read,

shared_from_this(),

boost::asio::placeholders::error,

boost::asio::placeholders::bytes_transferred)));

}

}

void handle_write(const boost::system::error_code& error)

{

if (!error)

{

}

}

void on_receive(boost::shared_ptr<std::vector<char> > buffers

, size_t bytes_transferred)

{

char* data_stream = &(*buffers->begin());

// in here finish the work.

std::cout << "receive :" << bytes_transferred << " bytes." <<

"message :" << data_stream << std::endl;

}

private:

// The io_service used to finish the work.

boost::asio::io_service& io_work_service;

// The socket used to communicate with the client.

tcp::socket socket_;

// Buffer used to store data received from the client.

boost::array<char, 1024> data_;

// The allocator to use for handler-based custom memory allocation.

handler_allocator allocator_;

};

typedef boost::shared_ptr<session> session_ptr;

class server

{

public:

server(short port, std::size_t io_service_pool_size)

: io_service_pool_(io_service_pool_size)

, io_service_work_pool_(io_service_pool_size)

, acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port))

{

session_ptr new_session(new session(io_service_work_pool_.get_io_service()

, io_service_pool_.get_io_service()));

acceptor_.async_accept(new_session->socket(),

boost::bind(&server::handle_accept, this, new_session,

boost::asio::placeholders::error));

}

void handle_accept(session_ptr new_session,

const boost::system::error_code& error)

{

if (!error)

{

new_session->start();

new_session.reset(new session(io_service_work_pool_.get_io_service()

, io_service_pool_.get_io_service()));

acceptor_.async_accept(new_session->socket(),

boost::bind(&server::handle_accept, this, new_session,

boost::asio::placeholders::error));

}

}

void run()

{

io_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run

, &io_service_pool_)));

work_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run

, &io_service_work_pool_)));

}

void stop()

{

io_service_pool_.stop();

io_service_work_pool_.stop();

io_thread_->join();

work_thread_->join();

}

private:

boost::shared_ptr<boost::thread> io_thread_;

boost::shared_ptr<boost::thread> work_thread_;

io_service_pool io_service_pool_;

io_service_pool io_service_work_pool_;

tcp::acceptor acceptor_;

};

int main(int argc, char* argv[])

{

try

{

if (argc != 2)

{

std::cerr << "Usage: server <port>/n";

return 1;

}

using namespace std; // For atoi.

server s(atoi(argv[1]), 10);

s.run();

getchar();

s.stop();

}

catch (std::exception& e)

{

std::cerr << "Exception: " << e.what() << "/n";

}

return 0;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: