您的位置:首页 > 运维架构 > Linux

Linux进程通信IPC--消息队列MessageQueue

2016-08-11 22:57 441 查看
前言: Linux 进程通信系列文章是对工作中使用的技术进行描述总结,使用了Boost 库IPC 实现,因此文章的代码部分选自Boost example 文档。

消息队列

消息队列(MessageQueue, 以下简称MQ)为同一台计算机进程间通信(IPC)提供了一种方式,提供了从一个进程向另外一个进程发送数据块的方法。

MQ被创建后,不属于任何进程,独立于进程存在,因此MQ必须具有唯一的名称,进程使用名称访问它,此外MQ是持久的存在的,即使创建的进程死亡后仍然留在内存中,需要通过显示调用remove 删除。MQ被创建后,每个进程都可以向消息队列中发送和从消息队列中获取消息,读取过程不可避免产生竞争,也必然需要同步,不过该同步过程由消息队列本身完成,用户不需要关心。

线程通过三种方式向MQ中发送/从MQ中获取消息:

1) 阻塞式:发送消息时,若MQ已满,则发送方将会阻塞等待直到MQ消息取出后有空间, 投递成功后返回;获取消息时,若MQ为空,则获取方阻塞等待直到MQ有新的消息,获取消息成功后返回。

2) Timed 阻塞式:发送方和接收方都自定义超时时间,在该超时时间内阻塞式等待, 若超时时间到了,则发送方无论是否发送成功,接收方无论是否获取成功都会返回,避免一直阻塞等待。

3) Try发送/获取式:发送方尝试发送消息至消息队列,若队列不满,则投递成功返回,否则投递失败但也立即返回;接收方尝试从消息队列中获取消息,若消息队列不为空则获取成功返回,否则获取失败立即返回。该方式特点是:无论MQ是否满或者空,都能立即返回。

消息队列实现

消息队列模型并不复杂,Boost IPC 编程提供了跨平台的消息队列实现,简单方便。

创建/打开消息队列:

message_queue_t(create_only_t create_only,  //仅创建
const char *name,          //消息队列名称,进程通过名称访问该消息队列
size_type max_num_msg,     //max 消息数量
size_type max_msg_size,    //max 消息占用空间
const permissions &perm = permissions());

message_queue_t(open_only_t open_only,      //仅打开
const char *name);         //消息队列名称,进程通过名称访问该消息队列

message_queue_t(open_or_create_t open_or_create, //如果不存在则创建,存在则打开
const char *name,
size_type max_num_msg,
size_type max_msg_size,
const permissions &perm = permissions());


发送/接收消息:

void send (const void *buffer,      //消息内容
size_type buffer_size,   //消息内容size
unsigned int priority);  //优先级

bool try_send(const void *buffer,
size_type buffer_size,
unsigned int priority);

bool timed_send(const void *buffer,
size_type buffer_size,
unsigned int priority,
const boost::posix_time::ptime& abs_time);  //超时时间

void receive(void *buffer,                 //用于存储获取消息的buffer
size_type buffer_size,   //用于存储获取消息buffer的size
size_type &recvd_size,   //实际获取的size
unsigned int &priority);

bool try_receive(void *buffer,
size_type buffer_size,
size_type &recvd_size,
unsigned int &priority);

bool timed_receive(void *buffer,
size_type buffer_size,
size_type &recvd_size,
unsigned int &priority,
const boost::posix_time::ptime &abs_time);  //超时时间


删除消息队列:

static bool remove(const char *name);   //这是静态函数,消息队列不依赖于进程


消息队列举例

使用example,选自boost 官方文档:

创建发送方:

using namespace boost::interprocess;

int main ()
{
try{
//Erase previous message queue
message_queue::remove("message_queue");

//Create a message_queue.
message_queue mq(create_only,               //only create
"message_queue",           //name
100,                       //max message number
sizeof(int));              //max message size

//Send 100 numbers
for(int i = 0; i < 100; ++i){
mq.send(&i, sizeof(i), 0);
}
}
catch(interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}

return 0;
}


接收方:

using namespace boost::interprocess;

int main ()
{
try{
//Open a message queue.
message_queue mq(open_only,         //only create
"message_queue");  //name

unsigned int<
4000
/span> priority;
message_queue::size_type recvd_size;

//Receive 100 numbers
for(int i = 0; i < 100; ++i){
int number;
mq.receive(&number, sizeof(number), recvd_size, priority);
if(number != i || recvd_size != sizeof(number))
return 1;
}
}
catch(interprocess_exception &ex){
message_queue::remove("message_queue");
std::cout << ex.what() << std::endl;
return 1;
}

message_queue::remove("message_queue");  //删除MQ
return 0;
}


需要注意:

MQ只是简单的将要发送的数据在内存中进行拷贝,所以我们在发送复杂结构或对象时,我们需要将其序列化后再发送,接收端接收时要反序列化,也就是说我们要自己去定义区分一条消息(定义通迅协议)。

A message queue just copies raw bytes between processes and does not send objects. This means that if we want to send an object using a message queue the object must be binary serializable. For example, we can send integers between processes but not a std::string. You should use Boost.Serialization or use advanced Boost.Interprocess mechanisms to send complex data between processes.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: