您的位置:首页 > 运维架构 > Linux

UNIX/Linux进程间通信IPC系列(四)消息队列

2014-02-23 23:06 585 查看
消息队列

消息队列是消息的链表,存放在内核中并由消息队列标识符标识。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。这跟管道和FIFO是相反的,对后两者来说,除非读出者已存在,否则先有写入者是没有意义的。



管道和FIFO都是随进程持续的,XSI IPC(消息队列、信号量、共享内存)都是随内核持续的。

当一个管道或FIFO的最后一次关闭发生时,仍在该管道或FIFO上的数据将被丢弃。消息队列,除非内核自举或显式删除,否则其一直存在。



对于系统中的每个消息队列,内核维护一个定义在<sys/msg.h>头文件中的信息结构。

struct msqid_ds {

struct ipc_perm msg_perm ;

struct msg* msg_first ; //指向队列中的第一个消息

struct msg* msg_last ; //指向队列中的最后一个消息

……

} ;



msgget函数

调用的第一个函数通常是msgget,其功能是打开一个现存队列或创建一个新队列。

#include <sys/msg.h>

int msgget (key_t key, int oflag) ;

返回值是一个整数标识符,其他三个msg函数就用它来指代该队列。它是基于指定的key产生的,而key既可以是ftok的返回值,也可以是常值IPC_PRIVATE。

oflag是读写权限的组合(用于打开时)。它还可以是IPC_CREATE或IPC_CREATE | IPC_EXCL(用于创建时)。



创建消息队列(实例)

//创建一个消息队列,在创建时指定队列的最大消息数和每个消息的最大大小
//本程序使用说明:
//./create -e 路径名
#include <unistd.h>
#include <sys/types.h>
#include <sys/msg.h>
#include <sys/ipc.h> //包含ftok()
#include <fcntl.h>   //包含getopt() 及相关的外部变量\
#include <stdio.h>
#include <stdlib.h>

int 
main(int argc, char** argv)
{
    int c, flags ;
    int mqid ; //消息队列id

    flags = IPC_CREAT ; //设置消息队列的标记
    //从命令行读取 队列的最大消息数 消息的最大大小
    while ((c = getopt(argc, argv, "e")) != -1)
    {
        switch(c)
        {
        case 'e':
            flags |= IPC_EXCL ; //排他性创建:若已存在此名的消息队列,则返回错误
            break;
        }
    }

    //若用户没有指定选项
    if (optind != argc -1)
       puts("请按格式输入:[-e] [-m maxmsg] [-z msgsize] <name>") ;

    //创建消息队列
    mqid = msgget(ftok(argv[optind], 0), flags) ;

    exit(0) ;
}



msgsnd函数

使用msgsnd打开一个消息队列后,我们使用msgsnd往其上放置一个消息。

#include <sys/msg.h>

int msgsnd (int msqid, const void *ptr, size_t length, int flag) ;

其中msqid是由msgget返回的标识符。ptr是一个结构指针,该结构具有如下模板(我们需要按这个模板自己定义结构体)

struct mymesg {

long mtype ; //消息类型(大于0)

char mtext[512] ; //消息数据

} ;

//结构体的名字和其中变量名都由我们自己确定,我们只要按照这个模板定义即可。

消息数据mtext中,任何形式的数据都是允许的,无论是二进制数据还是文本,内核根本不解释消息数据的内容。(我们可以在消息的数据部分 再分割一部分 根据需要定义自己的通信协议)

参数length指定了待发送消息数据部分的长度。

参数flag的值可以指定为IPC_NOWAIT。这类似于文件IO的非阻塞IO标志。若消息队列已满,则指定IPC_NOWAIT使得msgsnd立即出错返回EAGAIN。

如果没有指定IPC_NOWAIT,则进程阻塞直到下述情况出现为止:①有空间可以容纳要发送的消息 ②从系统中删除了此队列(返回EIDRM“标识符被删除”)③捕捉到一个信号,并从信号处理程序返回(返回EINTR)



向消息队列发送消息(示例)

//向消息队列发送消息(把一个指定了长度和类型的消息放置到某个队列中)
//我们必须在不同进程中约定好 标识消息队列的 路径名和工程ID,
//可把此两者放置到一个头文件中,让用消息队列通信的进程代码都包含此头文件。
//
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h> //包含ftok
#include <sys/msg.h>
#include <sys/types.h>
#include <sys/fcntl.h>

#define MSG_W 0200
#define BUF_SIZE 512

typedef struct msgbuf
{
    long mtype ;
    char mdata[BUF_SIZE] ;
} mymsg_t ;

int 
main(int argc, char** argv)
{
    int            mqid ;    //消息队列的描述符
    size_t         msglen ;  //消息的长度
    long           msgtype ; //消息的类型
    mymsg_t*  ptr ;     //消息结构的指针

    //用户未按格式输入
    if (argc != 3)
        puts("usage: send <pathname> <type>") ;

    msgtype = atoi(argv[2]) ;

    //获取已存在消息队列的描述符
    mqid = msgget(ftok(argv[1], 0), MSG_W) ;

    //构造一条消息
    ptr = calloc(sizeof(long) + msglen, sizeof(char)) ;
    ptr->mtype = msgtype ;
    snprintf(ptr->mdata, BUF_SIZE, "Hi,Boy~") ;

    //发送消息
    msglen = strlen(ptr->mdata) ;
    msgsnd(mqid, ptr, msglen, 0) ;

   exit(0) ;
}



msgrcv函数

使用msgrcv函数从某个消息队列中读出一个消息。

#include <sys/msg.h>

ssize_t
msgrcv
(int msqid, void* ptr, size_t length, long type, int flag) ;

参数ptr指定所接收消息的存放位置。参数length指定了数据部分大小(只想要多长的数据)

参数type指定希望从队列中读出什么样的消息。

type == 0
返回队列中的第一个消息

type > 0
返回队列中消息类型为type的第一个消息

type < 0
返回队列中消息类型值小于或等于type绝对值的消息,如果这种消息有若干个。则取类型值最小的消息。

(如果一个消息队列由多个客户进程和一个服务器进程使用,那么type字段可以用来包含客户进程的进程ID)



参数flag可以指定为IPC_NOWAIT,使操作不阻塞。

//从一个消息队列中读出一个消息, -n命令行选项指定非阻塞 -t命令行选项指定接收的消息类型
//
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h> //包含ftok
#include <sys/msg.h>
#include <sys/types.h>
#include <sys/fcntl.h>
#include <sys/stat.h>

#define MSG_R 0400
#define MSG_BUF_SIZE (512 + sizeof(long))

typedef struct msgbuf
{
    long mtype ;
    char mdata[MSG_BUF_SIZE-sizeof(long)] ;
} mymsg_t ;

int 
main(int argc, char** argv)
{
    int            c = 0 ;
    int            recvCntlFlag = 0 ;
    int            mqid = 0 ;
    long           msgtype = 0 ;
    ssize_t        recvBytes = 0 ; //已接收的字节数
    struct msgbuf* buf = NULL ;    //存储接收消息的缓冲区的指针   

    //读取用户输入的命令行
    while ((c = getopt(argc, argv, "nt:")) != -1)
    {
        switch(c)
        {
        case 'n' :
            recvCntlFlag |= IPC_NOWAIT ;
            break ;
        case 't' :
            msgtype = atol(optarg) ;
            break ;
        }
    }

    if (optind != argc-1)
        puts("usage:msgrcv [-n] [-t typeno] <pathname>") ;

    //获取要接收队列的id
    mqid = msgget(ftok(argv[optind], 0), MSG_R) ;

    //开辟缓冲区 接收消息
    buf = malloc(MSG_BUF_SIZE) ;
    recvBytes = msgrcv(mqid, buf, MSG_BUF_SIZE, msgtype, recvCntlFlag ) ;

    //输出消息内容
    printf("recv:%d bytes type=%ld %s",recvBytes, buf->mtype, buf->mdata) ;
    exit(0) ;
}



msgctl函数

msgctl函数提供在一个消息队列上的各种控制操作。

#include <sys/msg.h>

int msgctl (int msqid, in cmd, struct msqid_ds * buff) ;

参数cmd说明对由msqid指定的队列要执行的命令:

IPC_STAT
:取此队列的msqid_ds结构,并将它存放在buf指向的结构中。

IPC_SET
:按由buf指向结构中的值,设置与此队列相关结构中的字段。

IPC_RMID:从系统中删除该消息队列以及仍在该队列中的所有数据。

(这三条命令也可用于信号量和共享存储)

//删除一个队列,我们以IPC_RMID命令调用msgctl
//
#include <stdio.h>
#incldue <stdlib.h>
#include <sys/ipc.h> 
#include <sys/msg.h>

int 
main(int argc, char** argv)
{
    int mqid ;

    if (argc != 2)
        puts("usage:remove <pathname>") ;

    mqid = msgget(ftok(argv[1], 0), 0) ;
    msgctl(mqid, IPC_RMID, NULL) ;
    exit(0) ;
}
./create 任意一路径名
./send 路径名 消息类型(正整数)
./recv 路径名 ------或:./recv -t 消息类型 路径名
./remove 路径名


客户-服务器模型





//-------------------头文件msgqueue.h ------------------
#ifndef _MAGQUEUE_H_
#define _MAGQUEUE_H_
#include <sys/ipc.h> //包含ftok
#include <sys/msg.h>
#include <sys/types.h>

//消息队列的读 写模式掩码
#define MSG_W 0200
#define MSG_R 0400

//定义众所周知的消息队列键
#define MQ_KEY1 128L

#define DATA_SIZE 512

typedef struct msgbuf
{
    long mtype ;
    char mdata[DATA_SIZE] ;
} mymsg_t ;

#endif

//-----------------客户端进程-----------------

#include "msgqueue.h"
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

void client(int, int) ;

int 
main(int argc, char** argv)
{
    int  msgqid ;

    //打开消息队列
    msgqid = msgget(MQ_KEY1, 0) ;
    if (msgqid < 0)
    {
        puts("Open msg queue error!\n") ;
        exit(0) ;
    }

    client(msgqid, msgqid) ;
    exit(0) ;
}

void
client(int readfd, int writefd)
{
    mymsg_t msgToServer ;
    mymsg_t msgFromServer ;
    char*   writePtr ;
    ssize_t pidLen ;
    ssize_t dataLen ;
    ssize_t recvBytes ;
    int     pid ;

    //-------构造一条消息-----
    //消息类型为1
    msgToServer.mtype = 1 ;

    //在消息头部放本进程ID和空格
    pid = getpid() ;
    snprintf(msgToServer.mdata, DATA_SIZE, "%ld ", pid) ;
    pidLen = strlen(msgToServer.mdata) ;
    writePtr = msgToServer.mdata + pidLen ;

    //从标准输入读入文件路径
    fgets(writePtr, DATA_SIZE - pidLen, stdin) ;
    dataLen = strlen(msgToServer.mdata) ;  
    if (msgToServer.mdata[dataLen-1] == '\n') //删除换行符
    {
        msgToServer.mdata[dataLen-1] = '\0' ;
    }

    //发送消息
    if (msgsnd(writefd, &msgToServer, strlen(msgToServer.mdata), 0) == -1)
    {
        puts("Send Error!");
        exit(0) ;
    }

    //-----接收来自服务器的消息
    while ((recvBytes = msgrcv(readfd, &msgFromServer, DATA_SIZE, pid, 0)) > 0)
    {
        write(STDOUT_FILENO, msgFromServer.mdata, recvBytes) ;
    }

}

//---------------服务器端进程---------------
//消息队列是双向通信的,故用单个队列就够用。
//我们用每个消息的类型来标识该消息是从客户到服务器,还是从服务器到客户。
//客户向队列发类型为1、PID和路径名。
//服务器向队列发类型为客户进程ID的文件内容。
//
//小心死锁隐患:
//客户们可以填满消息队列,妨碍服务器发送应答,于是客户被阻塞在发送中,服务器也被阻塞。
//避免的方法是:约定服务器对消息队列总是使用非阻塞写。

#include "msgqueue.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

void server(int, int) ;

int 
main(int argc, char** argv)
{
    int  msgqid;

    //创建消息队列
    msgqid = msgget(MQ_KEY1, IPC_CREAT) ;
    if (msgqid < 0)
    {
        puts("Create msg queue error!\n") ;
        exit(0) ;
    }

    server(msgqid, msgqid) ;
    exit(0) ;
}

void
server(int readfd, int writefd)
{
    FILE*    fp ;
    pid_t    clientPid ;
    mymsg_t* msgPtr ;
    ssize_t  recvBytes ;
    char*    pathStr ;    

    for ( ; ; )
    {
        //从消息队列中读取来自客户的请求文件路径
        msgPtr = malloc(DATA_SIZE + sizeof(long)) ;
        recvBytes = msgrcv(readfd, msgPtr, DATA_SIZE, 1, 0) ; //阻塞读
        if (recvBytes <= 0)
        {
            puts("pathname missing") ;
            continue ;
        }
        msgPtr->mdata[recvBytes] = '\0' ;

        //分析消息,提取客户PID,文件路径
        if ((pathStr = strchr(msgPtr->mdata, ' ')) == NULL)
        {
            puts("bogus request!") ;
            continue ;
        }
        *pathStr++ = 0 ;
        clientPid = atol(msgPtr->mdata) ;

        //读取文件内容 返回给客户
        msgPtr->mtype = clientPid ; //msgPtr既作为接收消息 又用作发送消息
        if ((fp = fopen(pathStr, "r")) == NULL)
        {
            //读取文件失败,返回给客户失败信息(在原消息内容后 添加错误信息)
            snprintf(msgPtr->mdata + recvBytes, sizeof(msgPtr->mdata) -recvBytes, 
                    ": can't open!") ;

            if (msgsnd(writefd, msgPtr, strlen(msgPtr->mdata), IPC_NOWAIT) == -1)
            {
                puts("Send Error!");
                exit(0);
            }
        }
        else
        {   //copy文件内容 发给客户
            while (fgets(msgPtr->mdata, DATA_SIZE, fp) != NULL)
            {
                msgsnd(writefd, msgPtr, strlen(msgPtr->mdata), IPC_NOWAIT) ; //非阻塞写
            }
        }
    }//for()
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: