您的位置:首页 > 其它

进程间通信笔记(3)—POSIX消息队列

2016-09-02 15:56 323 查看

POSIX 消息队列

1.概述

消息队列可认为是一个消息链表,头消息指定当前队列的两个属性:队列中允许的最大消息数和每个消息的最大大小。而每个消息本身除了数据之外还包括优先级和数据长度等内容。

1.1POSIX消息队列和SystemV消息队列的区别

1.对于Posix消息队列读操作总是返回最高优先级的最早消息,而SystemV则可以返回任意指定优先级的消息。

2.当往一个空队列放置消息时,Posix消息队列允许产生一个信号或者启动一个线程,SystemV不具备这样的机制。

1.2Posix消息队列数据结构

设想消息队列是一个链表,链表的头节点包含了最大消息数
mq_maxmsg
和每个消息的最大数据量
mq_msgsize
两个属性,数据节点则除了消息本身还有优先级,长度等字段信息。如图:



2.POSIX消息队列编程

POSIX 消息队列的IPC对象是以文件的方式存在于虚拟文件系统中,这些文件可以在终端使用
ls
命令查看,和使用
rm
命令删除。当我们要操作这些消息队列时,需要挂在消息队列文件系统:

su
Password:
# mkdir /dev/mqueue
# mount -t mqueue none /dev/mqueue
$exit


2.1 mq_open、mq_close和mq_unlink

#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>

mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

Link with -lrt.


mq_open()
创建或者打开一个已经存在的POSIX 消息队列,这些消息队列用
name
参数来标识,一般是”/xxxx”的形式,创建成功后,消息队列将存在挂载在文件系统上的目录,例如
/dev/mqueue


#include <mqueue.h>

int mq_close(mqd_t mqdes);

Link with -lrt.


close()
函数的功能类似,关闭描述符,但是消息队列仍然存在,要删除消息队列,需要使用
mq_unlink()


#include <mqueue.h>

int mq_unlink(const char *name);

Link with -lrt


2.2 消息队列属性

每个消息队列有四个属性,如下结构体所示:

struct mq_attr {
long mq_flags;       /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg;      /* Max number of messages on queue */
long mq_msgsize;     /* Max size of a message (in bytes) */
long mq_curmsgs;     /*numbers of messages currently in queue */
};


这些属性的默认值可以通过下述方式查看:

cat /proc/sys/fs/mqueue/msg_max         // 对应mq_maxmsg 默认10
cat /proc/sys/fs/mqueue/msgsize_max     //对应mq_msgsize 默认8192
cat /proc/sys/fs/mqueue/queues_max      //最大消息队列数 默认256


也可以通过
mq_setattr
mq_getattr
这两个函数来设置和获取状态:

#include <mqueue.h>

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

int mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);

Link with -lrt.


2.3 mq_send和mq_receive函数

#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int  msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);


write
read
类似,不同的是,
mq_send
在发送消息时最后一个参数作为消息的优先级,
mq_receive
最后一个参数指针用来返回消息优先级。如果优先级并不需要,那么将
unsigned int msg_prio
设置为0,
unsigned int *msg_prio
设置为NULL。

2.4完整测试代码

简单的读写,写入端依次写入优先级不同的五个消息,读入端依次读出,根据POSIX消息队列的定义,读出顺序将是依次读出优先级高的最早消息。

写入端


//psxmq_send.c

#include <mqueue.h>
#include <stdbool.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <errno.h>
#include <string.h>

#define MAXSIZE 1024
int main(int argc,char**argv)
{
if(argc!=2)
{
printf("Usage: %s /mqname \r\n",argv[0]);
return -1;
}
char Msg[MAXSIZE]="Hello World";
char *name = argv[1];
int flags = O_RDWR | O_CREAT | O_EXCL ;
mode_t mode = S_IRUSR | S_IWUSR| S_IRGRP |S_IROTH;
//set mq_attr
struct mq_attr attr;
attr.mq_flags=0;
attr.mq_maxmsg=10;
attr.mq_msgsize=sizeof(Msg);
attr.mq_curmsgs=0;

mqd_t mqid = mq_open(name,flags,mode,&attr);
if(mqid==-1)
{
printf("error %s (%d)\r\n",strerror(errno),errno);
return -1;
}
int i;
for(i=0;i<5;i++)
{
if(mq_send(mqid,Msg,strlen(Msg),i)==-1)
{
perror("mq_send error");
return -1;
}
}
mq_close(mqid);
return 0;
}


读出端


//psxmq_recv.c

#include <mqueue.h>
#include <stdbool.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <errno.h>
#include <string.h>

int main(int argc,char**argv)
{
if(argc !=2)
{
printf("Usage: %s /mqname \r\n",argv[0]);
return -1;
}
const int MAXSIZE =1024;
char RecvBuff[MAXSIZE];
int prio;
ssize_t n;
char* name  = argv[1];
int   flags = O_RDONLY;

memset(RecvBuff,0x00,sizeof(RecvBuff));
mqd_t mqid = mq_open(name, flags);

struct mq_attr attr;
if(mqid==-1)
{
printf("error %s (%d)\r\n",strerror(errno),errno);
return -1;
}
while(true)
{
if(mq_getattr(mqid,&attr)==-1)
{
printf("get attr error\r\n");
break;
}
if(attr.mq_curmsgs==(long)0)
{
printf("no messages in queue\r\n");
break;
}

if((n=mq_receive(mqid,RecvBuff,sizeof(RecvBuff),&prio))==-1)
{
perror("mq_receive error");
return -1;
}
printf("read %ld bytes\r\n",(long)n);
printf("prio is %d\r\n",prio);
printf("%s \r\n",RecvBuff);
}
mq_close(mqid);
mq_unlink(name);
return 0;
}


Makefile


PROGS =Send Recv
CLEANFILES = core core.* *.core *.o temp.* *.out typescript* \
*.lc *.lh *.bsdi *.sparc *.uw

all :${PROGS}

CFLAGS+=-g
LIBS+=-lrt

Recv: psxmq_recv.o
${CC} ${CFLAGS} ${LIBPATH} $^  -o $@ ${LIBS}
@rm *.o

Send: psxmq_send.o
${CC} ${CFLAGS} ${LIBPATH} $^ -o $@   ${LIBS}
@rm *.o

clean:
rm -f ${PROGS} ${CLEANFILES}


3.参考

1.《UNP卷2》

2.http://manpages.ubuntu.com/manpages/wily/man7/mq_overview.7.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: