您的位置:首页 > 运维架构 > Linux

linux进程间通讯-消息队列

2016-10-27 17:36 253 查看

1、什么是消息队列

 1.1 基本概念

  消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。每个数据块都含有一个类型,接收进程可以选择地接收含有不同类型的数据结构。消息队列没有命名管道的同步和阻塞问题,但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。Linux用宏MSGMAX和MSGMNB来限制一条消息的最大长度和一个队列的最大长度。

  每个消息队列都有一个队列头。在内核中用结构struct msg_queue来描述,队列头中包含了该消息队列的大量信息,包括消息队列键值、用户ID、组ID、消息队列中消息数目等等,甚至记录了最近对消息队列读写进程的ID。读者可以访问这些信息,也可以设置其中的某些信息。在用户空间,消息队列头用结构struct msqid_ds来描述,所以在用户空间我们用结构msqid_ds用来设置或返回消息队列的信息,其实它是把内核中的数据结构struct msg_queue的内容读到用户空间的数据结构struct msqid_ds中而已。

  下图说明了内核与消息队列是怎样建立起联系的,其中,struct ipc_ids msg_ids是内核中记录消息队列的全局数据结构。



 1.2 数据结构

用户空间数据结构

a、struct msqid_ds结构体

struct msqid_ds {
  struct ipc_perm msg_perm;
  struct msg *msg_first;        /* first message on queue,unused  */
  struct msg *msg_last;     /* last message in queue,unused */
  __kernel_time_t msg_stime;    /* last msgsnd time */
  __kernel_time_t msg_rtime;    /* last msgrcv time */
  __kernel_time_t msg_ctime;    /* last change time */
  unsigned long  msg_lcbytes;   /* Reuse junk fields for 32 bit */
  unsigned long  msg_lqbytes;   /* ditto */
  unsigned short msg_cbytes;    /* current number of bytes on queue */
  unsigned short msg_qnum;  /* number of messages in queue */
  unsigned short msg_qbytes;    /* max number of bytes on queue */
   __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
   __kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};


  msgque[MSGMNI]是一个msqid_ds结构的指针数组,每个msqid_ds结构指针代表一个系统消息队列,msgque[MSGMNI]的大小为MSGMNI=128,也就是说系统最多有MSGMNI=128个消息队列。一个struct msqid_ds代表一个消息队列。

b、struct msgbuf结构体

struct msgbuf {
  long mtype;
  char mtext[1];
};


  消息队列最大的灵活性在于,我们可以自己定义传递给队列的消息的数据类型的。不过这个类型并不是随便定义的,msgbuf结构给了我们一个这类数据类型的基本结构定义。

内核空间数据结构

(贴出来只是为了更好的理解消息队列,用户使用时可以无视)

a、struct msg_queue结构体

/* one msq_queue structure for each present queue on the system */
struct msg_queue {
  struct kern_ipc_perm q_perm;
   time_t q_stime;            /* last msgsnd time */
   time_t q_rtime;            /* last msgrcv time */
   time_t q_ctime;            /* last change time */
  unsigned long q_cbytes;        /* current number of bytes on queue */
  unsigned long q_qnum;      /* number of messages in queue */
  unsigned long q_qbytes;        /* max number of bytes on queue */
  pid_t q_lspid;         /* pid of last msgsnd */
  pid_t q_lrpid;         /* last receive pid */
  struct list_head q_messages;
  struct list_head q_receivers;
  struct list_head q_senders;
};


  当发送一个消息到该消息队列时,把发送的消息构造成一个msg结构对象,并添加到q_messages队列中,接收消息的时候也是从q_messages队列尾部查找到一个msg_type匹配的msg节点,从链表队列中删除该msg节点。

  

b、struct msg_msg结构体

/* one msg_msg structure for each message */
struct msg_msg {
  struct list_head m_list;
  long m_type;
  size_t m_ts;     /* message text size */
  struct msg_msgseg *next;
  void *security;
  /* the actual message follows immediately */
};


  消息队列在系统内核中是以消息链表的形式出现的,而完成消息链表每个节点结构定义的就是msg_msg结构。

2、消息队列的system_v调用

msgget函数

该函数用来创建和访问一个消息队列。它的原型为:

int msgget(key_t, key, int msgflg);


  与其他的IPC机制一样,程序必须提供一个键来命名某个特定的消息队列。

  msgflg是一个权限标志,表示消息队列的访问权限,它与文件的访问权限一样。msgflg可以与IPC_CREAT做或操作,表示当key所命名的消息队列不存在时创建一个消息队列,如果key所命名的消息队列存在时,IPC_CREAT标志会被忽略,而只返回一个标识符。它返回一个以key命名的消息队列的标识符(非零整数),失败时返回-1

msgsnd函数

该函数用来把消息添加到消息队列中。它的原型为:

int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);


  msgid是由msgget函数返回的消息队列标识符。

  msg_ptr是一个指向准备发送消息的指针,但是消息的数据结构却有一定的要求,指针msg_ptr所指向的消息结构一定要是以一个长整型成员变量开始的结构体,接收函数将用这个成员来确定消息的类型。

  msg_sz是msg_ptr指向的消息的长度,注意是消息的长度,而不是整个结构体的长度,也就是说msg_sz是不包括长整型消息类型成员变量的长度。

  如果调用成功,消息数据的一分副本将被放到消息队列中,并返回0,失败时返回-1.

msgrcv函数

该函数用来从一个消息队列获取消息,它的原型为:

int msgrcv(int msgid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg);


  msgtype可以实现一种简单的接收优先级。如果msgtype为0,就获取队列中的第一个消息。如果它的值大于零,将获取具有相同消息类型的第一个信息。如果它小于零,就获取类型等于或小于msgtype的绝对值的第一个消息。

  msgflg用于控制当队列中没有相应类型的消息可以接收时将发生的事情。

  调用成功时,该函数返回放到接收缓存区中的字节数,消息被复制到由msg_ptr指向的用户分配的缓存区中,然后删除消息队列中的对应消息。失败时返回-1.

msgctl函数

该函数用来控制消息队列,它与共享内存的shmctl函数相似,它的原型为:

int msgctl(int msgid, int command, struct msgid_ds *buf);


成功时返回0,失败时返回-1。command是将要采取的动作,它可以取3个值,

IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值,即用消息队列的当前关联值覆盖msgid_ds的值。

IPC_SET:如果进程有足够的权限,就把消息列队的当前关联值设置为msgid_ds结构中给出的值

IPC_RMID:删除消息队列

  

3、消息队列与命名管道相比

1、消息队列也可以独立于发送和接收进程而存在,从而消除了在同步命名管道的打开和关闭时可能产生的困难。

2、同时通过发送消息还可以避免命名管道的同步和阻塞问题,不需要由进程自己来提供同步方法。

3、接收程序可以通过消息类型有选择地接收数据,而不是像命名管道中那样,只能默认地接收。

4、使用例子

  由于消息队列可以让不相关的进程进行行通信,所以我们在这里将会编写两个程序,msgreceive和msgsned来表示接收和发送信息。根据正常的情况,我们允许两个程序都可以创建消息,但只有接收者在接收完最后一个消息之后,它才把它删除。

  msgreceive.c文件

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/msg.h>

struct msg_st
{
long int msg_type;
char text[BUFSIZ];
};

int main()
{
int running = 1;
int msgid = -1;
struct msg_st data;
long int msgtype = 0; //注意1

//建立消息队列
msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
if(msgid == -1)
{
fprintf(stderr, "msgget failed with error: %d\n", errno);
exit(EXIT_FAILURE);
}
//从队列中获取消息,直到遇到end消息为止
while(running)
{
if(msgrcv(msgid, (void*)&data, BUFSIZ, msgtype, 0) == -1)
{
fprintf(stderr, "msgrcv failed with errno: %d\n", errno);
exit(EXIT_FAILURE);
}
printf("You wrote: %s\n",data.text);
//遇到end结束
if(strncmp(data.text, "end", 3) == 0)
running = 0;
}
//删除消息队列
if(msgctl(msgid, IPC_RMID, 0) == -1)
{
fprintf(stderr, "msgctl(IPC_RMID) failed\n");
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}


  msgsend.c文件

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/msg.h>
#include <errno.h>

#define MAX_TEXT 512
struct msg_st
{
long int msg_type;
char text[MAX_TEXT];
};

int main()
{
int running = 1;
struct msg_st data;
char buffer[BUFSIZ];
int msgid = -1;

//建立消息队列
msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
if(msgid == -1)
{
fprintf(stderr, "msgget failed with error: %d\n", errno);
exit(EXIT_FAILURE);
}

//向消息队列中写消息,直到写入end
while(running)
{
//输入数据
printf("Enter some text: ");
fgets(buffer, BUFSIZ, stdin);
data.msg_type = 1;    //注意2
strcpy(data.text, buffer);
//向队列发送数据
if(msgsnd(msgid, (void*)&data, MAX_TEXT, 0) == -1)
{
fprintf(stderr, "msgsnd failed\n");
exit(EXIT_FAILURE);
}
//输入end结束输入
if(strncmp(buffer, "end", 3) == 0)
running = 0;
sleep(1);
}
exit(EXIT_SUCCESS);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息