
WAMP RPC & Publish/Subscribe: a brief look at the C++ implementation, and how to handle TCP "sticky packets"

2016-06-16 15:05
The TCP/IP-based RPC and Pub/Sub discussed here are implemented almost entirely on top of boost::asio.

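A wamp_session represents the connection of one WAMP peer. Through it, WAMP lets you register a procedure (acting as callee), call a remote procedure (acting as caller), or publish and subscribe to a topic.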

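Steps:

Create a wamp_session and establish the TCP/IP connection.

Register procedures (callee side), subscribe to topics, etc.

Call wamp_session::start (it performs the handshake, then loops on receive_message and handles each message according to its content).

Call remote procedures, publish to topics, etc.

The whole process is kicked off by wamp_session::start: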

template<typename IStream, typename OStream>
boost::future<bool> wamp_session<IStream, OStream>::start()
{
    // Send the initial handshake packet informing the server which
    // serialization format we wish to use, and our maximum message size.
    m_handshake_buffer[0] = 0x7F; // magic byte
    m_handshake_buffer[1] = 0xF2; // we are ready to receive messages up to 2**24 octets and encoded using MsgPack
    m_handshake_buffer[2] = 0x00; // reserved
    m_handshake_buffer[3] = 0x00; // reserved

    // Send the handshake to notify the server
    boost::asio::write(
        m_out,
        boost::asio::buffer(m_handshake_buffer, sizeof(m_handshake_buffer)));

    std::weak_ptr<wamp_session<IStream, OStream>> weak_self = this->shared_from_this();
    auto handshake_reply = [=](const boost::system::error_code& error) {
        auto shared_self = weak_self.lock();
        if (shared_self) {
            // Handle the handshake reply, then start receive_message
            shared_self->got_handshake_reply(error);
        }
    };

    // Note on framing: TCP is a byte stream, so consecutive messages can run
    // together ("sticky packets"). After the handshake, each message is delimited
    // by a 4-byte length prefix (other schemes, such as a terminator, also work).
    // Read the 4-byte reply from the server
    boost::asio::async_read(
        m_in,
        boost::asio::buffer(m_handshake_buffer, sizeof(m_handshake_buffer)),
        boost::bind<void>(handshake_reply, boost::asio::placeholders::error));

    return m_handshake.get_future();
}
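
To make the second handshake octet less opaque: in the WAMP raw-socket handshake its high nibble N announces a maximum message length of 2^(9+N) octets, and its low nibble selects the serializer (1 for JSON, 2 for MsgPack). That is how 0xF2 comes to mean "up to 2**24 octets, MsgPack". The following is a minimal sketch of that encoding; the names make_handshake, max_message_size and serializer_of are illustrative and are not part of the library shown above.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Serializer ids carried in the low nibble of the second handshake octet.
enum class Serializer : std::uint8_t { Json = 1, MsgPack = 2 };

// Build the 4-octet client handshake (hypothetical helper).
// length_exp is the high nibble (0..15): the peer may send up to
// 2^(9 + length_exp) octets per message.
std::array<std::uint8_t, 4> make_handshake(unsigned length_exp, Serializer s) {
    assert(length_exp <= 15);
    return {0x7F,  // magic byte
            static_cast<std::uint8_t>((length_exp << 4) | static_cast<unsigned>(s)),
            0x00,  // reserved
            0x00}; // reserved
}

// Largest message size announced by the second handshake octet.
std::size_t max_message_size(std::uint8_t second_octet) {
    return std::size_t{1} << (9 + (second_octet >> 4));
}

// Serializer announced by the second handshake octet.
Serializer serializer_of(std::uint8_t second_octet) {
    return static_cast<Serializer>(second_octet & 0x0F);
}
```

With these helpers, make_handshake(15, Serializer::MsgPack) yields the same four octets that start() writes by hand.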


This is the handler for the handshake reply:

template<typename IStream, typename OStream>
void wamp_session<IStream, OStream>::got_handshake_reply(const boost::system::error_code& error)
{
    ...
    receive_message();  // start processing messages
}


Receiving a message (first, read its length):

template<typename IStream, typename OStream>
void wamp_session<IStream, OStream>::receive_message()
{
    if (m_debug) {
        std::cerr << "RX preparing to receive message .." << std::endl;
    }

    // read 4 octets msg length prefix ..
    // (declared as: char m_message_length_buffer[4];)
    boost::asio::async_read(m_in,
        boost::asio::buffer(m_message_length_buffer, sizeof(m_message_length_buffer)),
        bind(&wamp_session<IStream, OStream>::got_message_header, this->shared_from_this(), boost::asio::placeholders::error));
}


Handling the message header (that is, decoding the four received bytes into the message length and then reading exactly that many bytes):

template<typename IStream, typename OStream>
void wamp_session<IStream, OStream>::got_message_header(const boost::system::error_code& error)
{
    if (!error) {
        m_message_length = ntohl(*((uint32_t*) &m_message_length_buffer));

        if (m_debug) {
            std::cerr << "RX message (" << m_message_length << " octets) ..." << std::endl;
        }

        // read actual message
        m_unpacker.reserve_buffer(m_message_length);

        boost::asio::async_read(m_in,
            boost::asio::buffer(m_unpacker.buffer(), m_message_length),
            bind(&wamp_session<IStream, OStream>::got_message_body, this->shared_from_this(), boost::asio::placeholders::error));
    } else {
        // TODO: Well this is no good. The session will basically just become unresponsive
        // at this point as we will no longer be trying to asynchronously receive messages.
        // Perhaps we should just try and read the next header.
    }
}
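
The header handler obtains the length by casting the char buffer to uint32_t* and calling ntohl. As an alternative sketch, the same big-endian decoding can be written with plain shifts, which avoids the pointer cast and any alignment assumptions. The functions decode_length and checked_length below are hypothetical helpers, not part of the library; the size check assumes you want to enforce the limit announced in the handshake.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Decode a 4-octet big-endian (network byte order) length prefix.
std::uint32_t decode_length(const char* buf) {
    assert(buf != nullptr);
    // Go through unsigned char so bytes >= 0x80 are not sign-extended.
    auto octet = [buf](int i) {
        return static_cast<std::uint32_t>(static_cast<unsigned char>(buf[i]));
    };
    return (octet(0) << 24) | (octet(1) << 16) | (octet(2) << 8) | octet(3);
}

// Hypothetical guard: reject a length larger than the agreed maximum
// instead of reserving a buffer of arbitrary size.
std::optional<std::uint32_t> checked_length(const char* buf, std::uint32_t max_len) {
    const std::uint32_t len = decode_length(buf);
    if (len > max_len) {
        return std::nullopt;
    }
    return len;
}
```

Validating the length before calling reserve_buffer keeps a corrupted or hostile prefix from triggering a huge allocation.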


Handling the message body:

template<typename IStream, typename OStream>
void wamp_session<IStream, OStream>::got_message_body(const boost::system::error_code& error)
{
    if (!error) {
        if (m_debug) {
            std::cerr << "RX message received." << std::endl;
        }

        m_unpacker.buffer_consumed(m_message_length);
        msgpack::unpacked result;  // receives each deserialized MsgPack object

        while (m_unpacker.next(&result)) {
            msgpack::object obj(result.get());

            if (m_debug) {
                std::cerr << "RX WAMP message: " << obj << std::endl;
            }

            // Dispatch on the data in obj to the matching handler
            // (essentially a large switch/case)
            got_message(obj, std::move(result.zone()));
        }

        if (!m_stopped) {
            receive_message();  // keep looping to receive the next message
        }
    } else {
        // TODO: Well this is no good. The session will basically just become unresponsive
        // at this point as we will no longer be trying to asynchronously receive messages.
        // Perhaps we should just try and read the next header.
    }
}
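
got_message is only described here as a pile of switch/case branches. A WAMP message is an array whose first element is an integer type code, so the dispatch has roughly the shape sketched below. The type codes are the ones defined by the WAMP specification; the enum, the dispatch function and the returned descriptions are placeholders for illustration, not the library's actual handlers.

```cpp
#include <string>

// A subset of the message type codes defined by the WAMP specification.
enum class MessageType : int {
    Welcome = 2, Abort = 3, Goodbye = 6, Error = 8,
    Published = 17, Subscribed = 33, Event = 36,
    Result = 50, Registered = 65, Invocation = 68
};

// Placeholder dispatch: returns a description of what a client session
// would typically do for each incoming message type.
std::string dispatch(int type_code) {
    switch (static_cast<MessageType>(type_code)) {
        case MessageType::Welcome:    return "session joined";
        case MessageType::Abort:      return "join refused";
        case MessageType::Goodbye:    return "session closing";
        case MessageType::Error:      return "fail the pending request";
        case MessageType::Published:  return "publish acknowledged";
        case MessageType::Subscribed: return "subscription confirmed";
        case MessageType::Event:      return "run the topic handler";
        case MessageType::Result:     return "complete the pending call";
        case MessageType::Registered: return "registration confirmed";
        case MessageType::Invocation: return "run the registered procedure";
    }
    return "unhandled message type";
}
```

In a real session each branch would unpack the remaining array elements and resolve a stored promise or invoke a user callback.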


Note:

boost::asio::async_read is non-blocking and returns immediately; its handler is called once either of two conditions is met. From the Boost.Asio documentation:

This function is used to asynchronously read a certain number of bytes of data from a stream. The function call always returns immediately. The asynchronous operation will continue until one of the following conditions is true:

The supplied buffer is full (that is, it has reached maximum size). // i.e. the requested number of bytes has been read

An error occurred.

Signature of the async_read handler (callback):

void handler(
    const boost::system::error_code& error, // Result of operation.
    std::size_t bytes_transferred           // Number of bytes copied into the
                                            // buffers. If an error occurred,
                                            // this will be the number of
                                            // bytes successfully transferred
                                            // prior to the error.
);
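
The "buffer full or error" completion rule is easier to see in a synchronous form. Boost.Asio implements async_read as repeated calls to the stream's async_read_some. The sketch below does the same with an ordinary loop, using only the standard library. ReadSome, read_exact and make_fake_stream are hypothetical names introduced for this illustration; the fake stream deliberately returns at most a few bytes per call, the way a socket may.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <string>
#include <system_error>
#include <utility>

// Stand-in for a stream's read_some: copies up to `max` bytes into `dst`,
// returns how many were copied, and sets `ec` on failure.
using ReadSome = std::function<std::size_t(char* dst, std::size_t max, std::error_code& ec)>;

// Keep reading until exactly `n` bytes have arrived or an error occurs.
// Returns the number of bytes transferred before completion.
std::size_t read_exact(const ReadSome& read_some, char* dst, std::size_t n, std::error_code& ec) {
    std::size_t done = 0;
    ec.clear();
    while (done < n) {
        done += read_some(dst + done, n - done, ec);
        if (ec) {
            break;  // completion condition 2: an error occurred
        }
    }
    return done;    // equals n when the buffer was filled (condition 1)
}

// Fake stream for experiments: hands out at most `chunk` bytes per call
// (chunk must be > 0) and reports an error once its data is exhausted.
ReadSome make_fake_stream(std::string data, std::size_t chunk) {
    std::size_t pos = 0;
    return [data = std::move(data), chunk, pos](char* dst, std::size_t max,
                                                std::error_code& ec) mutable {
        if (pos == data.size()) {
            ec = std::make_error_code(std::errc::connection_reset);
            return std::size_t{0};
        }
        const std::size_t n = std::min({max, chunk, data.size() - pos});
        std::copy_n(data.data() + pos, n, dst);
        pos += n;
        return n;
    };
}
```

Reading 4 bytes from make_fake_stream("abcdefgh", 3) takes two underlying reads (3 bytes, then 1), yet the caller only ever sees the complete 4 bytes, which is exactly what the length-prefix code above relies on.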


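About TCP "sticky packets":

Put simply, a TCP sticky packet occurs when several independent pieces of data are sent over TCP but, on the receiving side, some of them arrive merged into a single read.

For example:

The server sends 1, 2, 3, 4, 5 in five separate calls.

The client may receive 1 on its first read, 2, 3, 4 on its second read, and 5 on its third.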

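Why does TCP have this problem? The root cause is that TCP is a byte stream and does not preserve the boundaries of individual send calls. Several small messages may be coalesced by the sender (for example by Nagle's algorithm) or may simply accumulate in the receive buffer and be returned by a single read. Conversely, an application message is often larger than what one segment can carry (which is bounded by the MTU), so it may arrive split across several reads. Either way, the bytes returned by one read need not correspond to one message. The only remedy is to define an application-layer protocol that lets the receiver decide whether the bytes received so far make up a complete message. The two basic techniques are a 4-byte field describing the message size and a terminator; some protocols, such as HTTP and the Redis protocol, combine the two.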
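
The length-prefix technique can be sketched without any networking code: the sender prepends a 4-byte big-endian length, and the receiver appends every chunk it reads to a buffer and extracts messages only once they are complete. encode_frame and FrameDecoder below are illustrative names, not taken from any library, and the sketch assumes payloads shorter than 4 GiB.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Sender side: prepend a 4-octet big-endian length to the payload.
std::string encode_frame(const std::string& payload) {
    const std::uint32_t n = static_cast<std::uint32_t>(payload.size());
    std::string out;
    for (int shift = 24; shift >= 0; shift -= 8) {
        out.push_back(static_cast<char>((n >> shift) & 0xFF));
    }
    out += payload;
    return out;
}

// Receiver side: accumulates whatever bytes arrive and yields whole messages.
class FrameDecoder {
public:
    // Feed one chunk as returned by a socket read; returns every message
    // that this chunk completed (possibly none, possibly several).
    std::vector<std::string> feed(const std::string& chunk) {
        buffer_ += chunk;
        std::vector<std::string> messages;
        while (buffer_.size() >= 4) {
            const std::uint32_t len = (octet(0) << 24) | (octet(1) << 16) |
                                      (octet(2) << 8) | octet(3);
            if (buffer_.size() - 4 < len) {
                break;  // body not fully received yet; wait for more bytes
            }
            messages.push_back(buffer_.substr(4, len));
            buffer_.erase(0, 4 + static_cast<std::size_t>(len));
        }
        return messages;
    }

private:
    std::uint32_t octet(std::size_t i) const {
        return static_cast<std::uint32_t>(static_cast<unsigned char>(buffer_[i]));
    }

    std::string buffer_;
};
```

If three frames arrive as one 12-byte read followed by a 5-byte read, the first feed returns the two complete messages and the second returns the third, regardless of where the reads happened to cut the stream.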

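Data layout when a 4-byte field describes the message size (the original figure is not available): a 4-byte length field, followed by a message body of exactly that many bytes.
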

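The ways to deal with sticky packets can be summarized as follows:

Fixed-length messages: for example, every message is exactly 200 bytes, and shorter ones are padded with spaces.

A delimiter at the end of each message, such as a carriage return plus line feed, as in the FTP protocol.

A header and a body, where the header contains a length field. A common design uses an int32 as the first header field to hold the total message length (as in the WAMP code discussed in this article).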
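
The first two approaches in the list can be sketched as follows (the third, header plus length, is what the WAMP code in this article does). pad_fixed and LineDecoder are hypothetical names used only for illustration; the delimiter is assumed to be "\r\n" and the messages are assumed never to contain it.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Fixed-length framing: pad a message with spaces up to `width` bytes.
std::string pad_fixed(const std::string& msg, std::size_t width) {
    assert(msg.size() <= width);
    return msg + std::string(width - msg.size(), ' ');
}

// Delimiter framing: buffer incoming bytes and split on "\r\n".
class LineDecoder {
public:
    // Feed one chunk from the socket; returns every line it completed.
    std::vector<std::string> feed(const std::string& chunk) {
        buffer_ += chunk;
        std::vector<std::string> lines;
        std::size_t pos = 0;
        while ((pos = buffer_.find("\r\n")) != std::string::npos) {
            lines.push_back(buffer_.substr(0, pos));
            buffer_.erase(0, pos + 2);  // drop the line and its delimiter
        }
        return lines;  // any trailing partial line stays buffered
    }

private:
    std::string buffer_;
};
```

Fixed-length framing is the simplest to parse but wastes bandwidth on padding; delimiter framing is compact for text but requires escaping if the payload itself can contain the delimiter.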