Linux进程通信IPC--消息队列MessageQueue
2016-08-11 22:57
441 查看
前言: Linux 进程通信系列文章是对工作中使用的技术进行描述总结,使用了Boost 库IPC 实现,因此文章的代码部分选自Boost example 文档。
MQ被创建后,不属于任何进程,独立于进程存在,因此MQ必须具有唯一的名称,进程使用名称访问它,此外MQ是持久的存在的,即使创建的进程死亡后仍然留在内存中,需要通过显示调用remove 删除。MQ被创建后,每个进程都可以向消息队列中发送和从消息队列中获取消息,读取过程不可避免产生竞争,也必然需要同步,不过该同步过程由消息队列本身完成,用户不需要关心。
线程通过三种方式向MQ中发送/从MQ中获取消息:
1) 阻塞式:发送消息时,若MQ已满,则发送方将会阻塞等待直到MQ消息取出后有空间, 投递成功后返回;获取消息时,若MQ为空,则获取方阻塞等待直到MQ有新的消息,获取消息成功后返回。
2) Timed 阻塞式:发送方和接收方都自定义超时时间,在该超时时间内阻塞式等待, 若超时时间到了,则发送方无论是否发送成功,接收方无论是否获取成功都会返回,避免一直阻塞等待。
3) Try发送/获取式:发送方尝试发送消息至消息队列,若队列不满,则投递成功返回,否则投递失败但也立即返回;接收方尝试从消息队列中获取消息,若消息队列不为空则获取成功返回,否则获取失败立即返回。该方式特点是:无论MQ是否满或者空,都能立即返回。
创建/打开消息队列:
发送/接收消息:
删除消息队列:
创建发送方:
接收方:
需要注意:
MQ只是简单的将要发送的数据在内存中进行拷贝,所以我们在发送复杂结构或对象时,我们需要将其序列化后再发送,接收端接收时要反序列化,也就是说我们要自己去定义区分一条消息(定义通迅协议)。
A message queue just copies raw bytes between processes and does not send objects. This means that if we want to send an object using a message queue the object must be binary serializable. For example, we can send integers between processes but not a std::string. You should use Boost.Serialization or use advanced Boost.Interprocess mechanisms to send complex data between processes.
消息队列
消息队列(MessageQueue, 以下简称MQ)为同一台计算机进程间通信(IPC)提供了一种方式,提供了从一个进程向另外一个进程发送数据块的方法。MQ被创建后,不属于任何进程,独立于进程存在,因此MQ必须具有唯一的名称,进程使用名称访问它,此外MQ是持久的存在的,即使创建的进程死亡后仍然留在内存中,需要通过显示调用remove 删除。MQ被创建后,每个进程都可以向消息队列中发送和从消息队列中获取消息,读取过程不可避免产生竞争,也必然需要同步,不过该同步过程由消息队列本身完成,用户不需要关心。
线程通过三种方式向MQ中发送/从MQ中获取消息:
1) 阻塞式:发送消息时,若MQ已满,则发送方将会阻塞等待直到MQ消息取出后有空间, 投递成功后返回;获取消息时,若MQ为空,则获取方阻塞等待直到MQ有新的消息,获取消息成功后返回。
2) Timed 阻塞式:发送方和接收方都自定义超时时间,在该超时时间内阻塞式等待, 若超时时间到了,则发送方无论是否发送成功,接收方无论是否获取成功都会返回,避免一直阻塞等待。
3) Try发送/获取式:发送方尝试发送消息至消息队列,若队列不满,则投递成功返回,否则投递失败但也立即返回;接收方尝试从消息队列中获取消息,若消息队列不为空则获取成功返回,否则获取失败立即返回。该方式特点是:无论MQ是否满或者空,都能立即返回。
消息队列实现
消息队列模型并不复杂,Boost IPC 编程提供了跨平台的消息队列实现,简单方便。创建/打开消息队列:
message_queue_t(create_only_t create_only, //仅创建 const char *name, //消息队列名称,进程通过名称访问该消息队列 size_type max_num_msg, //max 消息数量 size_type max_msg_size, //max 消息占用空间 const permissions &perm = permissions()); message_queue_t(open_only_t open_only, //仅打开 const char *name); //消息队列名称,进程通过名称访问该消息队列 message_queue_t(open_or_create_t open_or_create, //如果不存在则创建,存在则打开 const char *name, size_type max_num_msg, size_type max_msg_size, const permissions &perm = permissions());
发送/接收消息:
void send (const void *buffer, //消息内容 size_type buffer_size, //消息内容size unsigned int priority); //优先级 bool try_send(const void *buffer, size_type buffer_size, unsigned int priority); bool timed_send(const void *buffer, size_type buffer_size, unsigned int priority, const boost::posix_time::ptime& abs_time); //超时时间 void receive(void *buffer, //用于存储获取消息的buffer size_type buffer_size, //用于存储获取消息buffer的size size_type &recvd_size, //实际获取的size unsigned int &priority); bool try_receive(void *buffer, size_type buffer_size, size_type &recvd_size, unsigned int &priority); bool timed_receive(void *buffer, size_type buffer_size, size_type &recvd_size, unsigned int &priority, const boost::posix_time::ptime &abs_time); //超时时间
删除消息队列:
static bool remove(const char *name); //这是静态函数,消息队列不依赖于进程
消息队列举例
使用example,选自boost 官方文档:创建发送方:
using namespace boost::interprocess; int main () { try{ //Erase previous message queue message_queue::remove("message_queue"); //Create a message_queue. message_queue mq(create_only, //only create "message_queue", //name 100, //max message number sizeof(int)); //max message size //Send 100 numbers for(int i = 0; i < 100; ++i){ mq.send(&i, sizeof(i), 0); } } catch(interprocess_exception &ex){ std::cout << ex.what() << std::endl; return 1; } return 0; }
接收方:
using namespace boost::interprocess; int main () { try{ //Open a message queue. message_queue mq(open_only, //only create "message_queue"); //name unsigned int< 4000 /span> priority; message_queue::size_type recvd_size; //Receive 100 numbers for(int i = 0; i < 100; ++i){ int number; mq.receive(&number, sizeof(number), recvd_size, priority); if(number != i || recvd_size != sizeof(number)) return 1; } } catch(interprocess_exception &ex){ message_queue::remove("message_queue"); std::cout << ex.what() << std::endl; return 1; } message_queue::remove("message_queue"); //删除MQ return 0; }
需要注意:
MQ只是简单的将要发送的数据在内存中进行拷贝,所以我们在发送复杂结构或对象时,我们需要将其序列化后再发送,接收端接收时要反序列化,也就是说我们要自己去定义区分一条消息(定义通迅协议)。
A message queue just copies raw bytes between processes and does not send objects. This means that if we want to send an object using a message queue the object must be binary serializable. For example, we can send integers between processes but not a std::string. You should use Boost.Serialization or use advanced Boost.Interprocess mechanisms to send complex data between processes.
相关文章推荐
- Linux — IPC进程通信之消息队列详解
- linux基础编程:进程通信之System V IPC:消息队列,信号量,共享内存
- linux基础编程:进程通信之System V IPC:消息队列,信号量,共享内存
- linux消息队列进程通信
- Linux进程通信之POSIX消息队列
- linux进程通信---消息队列
- Linux进程通信之POSIX消息队列
- linux进程通信之消息队列
- linux消息队列进程通信
- Linux进程通信之POSIX消息队列
- LINUX进程通信--消息队列
- msgget();msgsnd();msgrcv();msgctl(); 消息队列 Linux进程间的通信方式之消息队列
- Linux进程之间通信消息队列
- Linux进程通信 消息队列
- linux消息队列进程通信
- [Linux] 进程通信-消息队列
- 进程通信 IPC 之消息队列
- 学习笔记——操作系统_Linux进程通信之消息队列
- Linux消息队列进程通信的介绍
- linux 进程间间通信使用消息队列