WAMP RPC & Publish & Subscribe c++ 实现简析 & TCP 粘包处理方法
2016-06-16 15:05
826 查看
基于tcp/ip 的 RPC和Pub&Sub 基本上都是基于 boost::asio 实现的。
一个wamp_session表示一个wamp端的连接,wamp 提供了注册 callee,调用一个caller 或者向一个 topic pub&sub 数据的功能。
步骤:
创建一个 wamp session,建立 TPC/IP 连接
注册 callee, subscribe topic etc.
wamp_session::start(进行 hand_shake, 然后开始循环 receive_message,再根据获得的数据进行相应处理)
调用 caller, publish topic etc.
整个过程是通过 wamp_session::start 开始的
这是握手连接的 handler:
接收并处理消息(获得数据长度):
消息头部的处理(就是处理接受的四个字节,获得消息的长度并接受指定长度的消息):
处理消息body:
注:
boost::asio::async_read 是非阻塞的,立即返回,而其 handler 是在两种情况下会返回的:
This function is used to asynchronously read a certain number of bytes of data from a stream. The function call always returns immediately. The asynchronous operation will continue until one of the following conditions is true:
The supplied buffer is full (that is, it has reached maximum size). // 即读取到指定大小的数据
An error occurred.
async_read handler(回调函数)的格式:
关于TCP粘包:
所谓的TCP粘包简单的说就是通过TCP协议发送了多条独立的数据,但接收的时候,有些数据合并成了一个数据包被接收。
例如:
server 端分5次分别发送 1,2,3,4,5
client端可能第一次接受到数据1;第二次接收到2,3,4,第三次接收到5。
TCP通讯为何存在粘包呢?主要原因是TCP是以流的方式来处理数据,再加上网络上MTU的往往小于在应用处理的消息数据,所以就会引发一次接收的数据无法满足消息的需要,导致粘包的存在。处理粘包的唯一方法就是制定应用层的数据通讯协议,通过协议来规范现有接收的数据是否满足消息数据的需要。在应用中处理粘包的基础方法主要有两种分别是以4字节描述消息大小或以结束符,实际上也有两者相结合的如HTTP,redis的通讯协议等。
如图以4字节描述消息大小的数据结构:
粘包的解决办法可以归纳如下:
消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补全空格。
在包尾增加回车换行符进行分割,例如FTP协议。
将消息分为消息头和消息体,消息头中包含消息长度的字段,通常设计思路为消息头的第一个字段使用 int32 来表示消息的总长度(例如本文所述 wamp 协议)
一个wamp_session表示一个wamp端的连接,wamp 提供了注册 callee,调用一个caller 或者向一个 topic pub&sub 数据的功能。
步骤:
创建一个 wamp session,建立 TPC/IP 连接
注册 callee, subscribe topic etc.
wamp_session::start(进行 hand_shake, 然后开始循环 receive_message,再根据获得的数据进行相应处理)
调用 caller, publish topic etc.
整个过程是通过 wamp_session::start 开始的
template<typename IStream, typename OStream> boost::future<bool> wamp_session<IStream, OStream>::start() { // Send the initial handshake packet informing the server which // serialization format we wish to use, and our maximum message size. m_handshake_buffer[0] = 0x7F; // magic byte m_handshake_buffer[1] = 0xF2; // we are ready to receive messages up to 2**24 octets and encoded using MsgPack m_handshake_buffer[2] = 0x00; // reserved m_handshake_buffer[3] = 0x00; // reserved boost::asio::write( m_out, boost::asio::buffer(m_handshake_buffer, sizeof(m_handshake_buffer))); // 握手连接,通知服务器 std::weak_ptr<wamp_session<IStream, OStream>> weak_self = this->shared_from_this(); auto handshake_reply = [=](const boost::system::error_code& error) { auto shared_self = weak_self.lock(); if (shared_self) { shared_self->got_handshake_reply(error); // 收到握手连接回应的处理,然后开始 receive_message } }; // 由于TCP通讯存在粘包,以4节字描述消息大小来分割消息——也可以通过其他方式分割,例如:结束符 // Read the 4-byte reply from the server boost::asio::async_read( m_in, boost::asio::buffer(m_handshake_buffer, sizeof(m_handshake_buffer)), boost::bind<void>(handshake_reply, boost::asio::placeholders::error)); return m_handshake.get_future(); }
这是握手连接的 handler:
template<typename IStream, typename OStream> void wamp_session<IStream, OStream>::got_handshake_reply(const boost::system::error_code& error) { ... receive_message(); // 开始处理message }
接收并处理消息(获得数据长度):
template<typename IStream, typename OStream> void wamp_session<IStream, OStream>::receive_message() { if (m_debug) { std::cerr << "RX preparing to receive message .." << std::endl; } // read 4 octets msg length prefix .. boost::asio::async_read(m_in, boost::asio::buffer(m_message_length_buffer, sizeof(m_message_length_buffer)), bind(&wamp_session<IStream, OStream>::got_message_header, this->shared_from_this(), boost::asio::placeholders::error)); // char m_message_length_buffer[4]; }
消息头部的处理(就是处理接受的四个字节,获得消息的长度并接受指定长度的消息):
template<typename IStream, typename OStream> void wamp_session<IStream, OStream>::got_message_header(const boost::system::error_code& error) { if (!error) { m_message_length = ntohl(*((uint32_t*) &m_message_length_buffer)); if (m_debug) { std::cerr << "RX message (" << m_message_length << " octets) ..." << std::endl; } // read actual message m_unpacker.reserve_buffer(m_message_length); boost::asio::async_read(m_in, boost::asio::buffer(m_unpacker.buffer(), m_message_length), bind(&wamp_session<IStream, OStream>::got_message_body, this->shared_from_this(), boost::asio::placeholders::error)); } else { // TODO: Well this is no good. The session will basically just become unresponsive // at this point as we will no longer be trying to asynchronously receive messages. // Perhaps we should just try and read the next header. } }
处理消息body:
template<typename IStream, typename OStream> void wamp_session<IStream, OStream>::got_message_body(const boost::system::error_code& error) { if (!error) { if (m_debug) { std::cerr << "RX message received." << std::endl; } m_unpacker.buffer_consumed(m_message_length); msgpack::unpacked result; // msgpack 串行化 while (m_unpacker.next(&result)) { msgpack::object obj(result.get()); if (m_debug) { std::cerr << "RX WAMP message: " << obj << std::endl; } got_message(obj, std::move(result.zone())); // 根据 obj 中数据进行相应的函数调用,处理等,即一堆 switch case } if (!m_stopped) { receive_message(); // 继续循环接受消息。。。 } } else { // TODO: Well this is no good. The session will basically just become unresponsive // at this point as we will no longer be trying to asynchronously receive messages. // Perhaps we should just try and read the next header. } }
注:
boost::asio::async_read 是非阻塞的,立即返回,而其 handler 是在两种情况下会返回的:
This function is used to asynchronously read a certain number of bytes of data from a stream. The function call always returns immediately. The asynchronous operation will continue until one of the following conditions is true:
The supplied buffer is full (that is, it has reached maximum size). // 即读取到指定大小的数据
An error occurred.
async_read handler(回调函数)的格式:
void handler( const boost::system::error_code& error, // Result of operation. std::size_t bytes_transferred // Number of bytes copied into the // buffers. If an error occurred, // this will be the number of // bytes successfully transferred // prior to the error. );
关于TCP粘包:
所谓的TCP粘包简单的说就是通过TCP协议发送了多条独立的数据,但接收的时候,有些数据合并成了一个数据包被接收。
例如:
server 端分5次分别发送 1,2,3,4,5
client端可能第一次接受到数据1;第二次接收到2,3,4,第三次接收到5。
TCP通讯为何存在粘包呢?主要原因是TCP是以流的方式来处理数据,再加上网络上MTU的往往小于在应用处理的消息数据,所以就会引发一次接收的数据无法满足消息的需要,导致粘包的存在。处理粘包的唯一方法就是制定应用层的数据通讯协议,通过协议来规范现有接收的数据是否满足消息数据的需要。在应用中处理粘包的基础方法主要有两种分别是以4字节描述消息大小或以结束符,实际上也有两者相结合的如HTTP,redis的通讯协议等。
如图以4字节描述消息大小的数据结构:
粘包的解决办法可以归纳如下:
消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补全空格。
在包尾增加回车换行符进行分割,例如FTP协议。
将消息分为消息头和消息体,消息头中包含消息长度的字段,通常设计思路为消息头的第一个字段使用 int32 来表示消息的总长度(例如本文所述 wamp 协议)
相关文章推荐
- https单向认证及tomcat配置https方法
- 网络信息安全学习平台---脚本关第14题
- 笔记本电脑右下角网络图标显示红叉
- iOS 使用 AFNetwork && Alamofire 实时监测网络状况
- HTTP原理之网络基础
- cactiez安装配置
- HTTP协议详解(真的很经典)
- 一站式学习Wireshark(五):TCP窗口与拥塞处理
- 大端模式与小端模式、网络字节顺序与主机字节顺序
- 文件上传 servlet 从HttpServletRequest.getInputStream()中获得消息内容
- 前馈网络求导概论(一)·Softmax篇
- Mina和Netty的选择
- tensorflow从0开始(4)——解读mnist程序
- [置顶] Android http get请求里中文是乱码的解决(其中一种情况)
- Http协议
- Java的HTTP通信
- libuv学习笔记(12)
- HTTP权威指南读后感
- Android 几种网络请求的区别与联系
- Android 几种网络请求的区别与联系