您的位置:首页 > 其它

ZeroMQ源码分析之Message

2014-12-15 22:03 106 查看
这篇博文主要分析了ZeroMQ源码中和message相关的消息机制,大致分析了ZeroMQ消息的基本结构,创建过程以及销毁过程。分析了ZeroMQ是如何区别对待长消息和短消息,并用指针计数的方法做到到零拷贝。其中涉及到union关键字的使用,强制类型转换和placement
new的用法等编程技巧,可供学习参考。本人水平有限,欢迎读者指正。

在实际项目中使用ZeroMQ创建消息时的代码通常如下:

zmq_msg_t msgName;	//第1行
zmq_msg_init(&msgName);	//第2行


这两条代码做了什么呢?
首先对第1行代码,在zmq.h中有如下定义:

typedef struct zmq_msg_t {unsigned char _ [32];} zmq_msg_t;


what?消息体就这样定义?也许这不是它的真面目。
看第2行代码,在zmq.cpp找到zmq_msg_init的实现方式。

int zmq_msg_init(zmq_msg_t *msg_){
return((zmq::msg_t*) msg_)->init();
}


晕,居然强制类型转换,把原本指向32字节空间的zmq_msg_t类型指针转换成了指向msg_t类型的指针。

现在来看一下这个神秘的msg_t,也就是ZeroMQ的消息类。
首先看msg.cpp中init()的实现

int zmq::msg_t::init(){
u.vsm.type =type_vsm;
u.vsm.flags =0;
u.vsm.size = 0;
return 0;
}


可以大致看出就是初始化了消息的类型、标志和消息内容大小。对应的看一下在msg.hpp是如何定义这个消息结构体的:
在msg.hpp的msg_t类中有:

union {
struct {
unsigned char unused [max_vsm_size + 1];
unsigned char type;
unsigned char flags;
} base;
struct {
unsigned char data [max_vsm_size];
unsigned char size;
unsigned char type;
unsigned char flags;
} vsm;
struct {
content_t *content;
unsigned char unused [max_vsm_size + 1 - sizeof (content_t*)];
unsigned char type;
unsigned char flags;
} lmsg;
struct {
void* data;
size_t size;
unsigned char unused
[max_vsm_size + 1 - sizeof (void*) - sizeof (size_t)];
unsigned char type;
unsigned char flags;
} cmsg;
struct {
unsigned char unused [max_vsm_size + 1];
unsigned char type;
unsigned char flags;
} delimiter;
} u;


可见这里利用union来压缩空间。union维护足够的空间来置放多个数据成员中的“一种”,而不是为每一个数据成员配置空间,在union中所有的数据成员共用一个空间,同一时间只能储存其中一个数据成员,所有的数据成员具有相同的起始地址。

从这里可以看出ZeroMQ的几种消息类型:vsm
(verysmall message?), lmsg (long message?), cmsg (constant message)
和delimiter 。

每个struct人为地控制为等长,其中unused数组就是用来控制每个struct的长度,使得后面的type和flags在每个struct中的存储位置是一样的。这样就可以做到,无论该消息是vsm或者lmsg或其他类型,只要调用u.base.type就能获取到这个消息的类型了。

enum {max_vsm_size =29};


通过vsm类型和lmsg类型的对比可以知道,ZeroMQ对短消息和长消息是区别对待的。对于短的消息,即不超过29字节的消息,直接复制赋值;而对于长消息,则需要在内存中分配空间,如下面代码所示:

//初始化消息大小

int zmq::msg_t::init_size (size_t size_)
{
if (size_ <= max_vsm_size) {
//当消息为小消息时
u.vsm.type = type_vsm;
u.vsm.flags = 0;
u.vsm.size = (unsigned char) size_;
}
else {

u.lmsg.type = type_lmsg;
u.lmsg.flags = 0;
u.lmsg.content =
(content_t*) malloc (sizeof (content_t) + size_);
//消息为长消息,需要分配内存空间
if (unlikely (!u.lmsg.content)) {
errno = ENOMEM;
return -1;
}

u.lmsg.content->data = u.lmsg.content + 1;
//指向在内存空间中分配的消息内容的地址
u.lmsg.content->size = size_;
u.lmsg.content->ffn = NULL;
u.lmsg.content->hint = NULL;
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
}
return 0;
}


//初始化消息内容

int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
void *hint_)
{
//  If data is NULL and size is not 0, a segfault
//  would occur once the data is accessed
assert (data_ != NULL || size_ == 0);

//  Initialize constant message if there's no need to deallocate
if(ffn_ == NULL) {
//如果销毁函数为空,则该消息为常量消息
u.cmsg.type = type_cmsg;
u.cmsg.flags = 0;
u.cmsg.data = data_;
u.cmsg.size = size_;
}
else {
u.lmsg.type = type_lmsg;
u.lmsg.flags = 0;
u.lmsg.content = (content_t*) malloc (sizeof (content_t));
if (!u.lmsg.content) {
errno = ENOMEM;
return -1;
}

u.lmsg.content->data = data_;
u.lmsg.content->size = size_;
u.lmsg.content->ffn = ffn_;
u.lmsg.content->hint = hint_;
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
//placement new 的用法,后面说明
}
return 0;

}


这里有必要看一下上面出现的content的结构:

struct content_t
{
void *data;
size_t size;
msg_free_fn *ffn;
void *hint;
zmq::atomic_counter_t refcnt;
};


其中ffn为销毁消息时使用的函数指针。而refcnt则是该消息被共享次数的计数器。当该计算器计数为0,即该消息以及没有被使用时,则该消息销毁。当需要拷贝长消息时,只要把指针指向长消息内容即可,然后计数器加一,这便实现了ZeroMQ所谓的零拷贝。

在上面msg_t::init_data()中出现了这么一行:

new(&u.lmsg.content->refcnt) zmq::atomic_counter_t ();


这里使用了placement
new的写法。placement
new是用来实现定位构造的,也就是在取得了一块可以容纳指定类型对象的内存后,在这块内存上构造一个对象。对new的深入了解,可以参考这个博客:http://blog.csdn.net/songthin/article/details/1703966

再回过头来看最开头的地方,好像还有一个问题没解决:

typedef structzmq_msg_t {unsigned char _ [32];} zmq_msg_t;


int zmq_msg_init (zmq_msg_t *msg_)
{
return ((zmq::msg_t*) msg_)->init ();
}


这里做的强制类型转换,把原本指向32字节空间的zmq_msg_t类型指针转换成了指向msg_t类型的指针,为什么是32位呢,通过下面代码,对消息结构进行字节计算,不难发现每个消息结构就是占了32个字节的。只不过长消息中使用了指针指向了用于存储长消息数据的内存空间而已。所以不要被外表所蒙骗,要看到内在,才知道她的心是怎样的。

struct {
unsigned char data [max_vsm_size];
unsigned char size;
unsigned char type;
unsigned char flags;
} vsm;
struct {
content_t *content;
unsigned char unused [max_vsm_size + 1 - sizeof (content_t*)];
unsigned char type;
unsigned char flags;
} lmsg;


enum {max_vsm_size =29};


上面简单分析了消息的创建过程,下面接着来分析一下消息的销毁过程,这里主要指的是对于长消息的销毁,因为只有长消息需要动态内存分配。
前面分析content结构的时候提到过,content中ffn为销毁消息时使用的函数指针,而refcnt则是该消息被共享次数的计数器,当该计算器计数为0,即该消息以及没有被使用时,则该消息销毁。
长消息的创建过程大致如下:
        1.
设置长消息标志位;
        2.动态分配一块内存给长消息的content;
        3.content中的data指向在内存空间中分配的消息内容的地址;
        4.设置content销毁消息用的销毁函数;
        5.
设置计数器。

如果是长消息拷贝,直接使content指针指向同一块内存区域即可,同时区域内的计数器加一。

长消息的销毁过程只有逆向创建过程即可,见如下代码(附带注释):

int zmq::msg_t::close ()
{

if (u.base.type == type_lmsg) {

// 如果content不是共享的,或者它是共享的但它的指向计数以及降为零,则释放它
if (!(u.lmsg.flags & msg_t::shared) ||
!u.lmsg.content->refcnt.sub (1)) {

//  1. 使用 "placement new"操作去初始化这个指向计数器的,所以要调用对应的显式的析构函数
u.lmsg.content->refcnt.~atomic_counter_t ();

// 2. 调用content中的销毁函数销毁指向的内存数据
if (u.lmsg.content->ffn)
u.lmsg.content->ffn (u.lmsg.content->data,
u.lmsg.content->hint);

// 3. 释放content
free (u.lmsg.content);
}
}

//  使该消息失效
u.base.type = 0;

return 0;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息