您的位置:首页 > 其它

进程通信之消息队列

2017-05-17 10:19 453 查看
在前面,我们了解进程通信的概念以及常见的进程通信方式,我做个整理如下图:



我们继续通过实例来学习进程通信,今天主要是讲消息队列是如何实现进程通信的,代码均在Ubuntu16.04下测试。



如上图所示,消息队列是一种间接的通信方式,它提供了一种从一个进程向另一个进程发送一个数据块的方法。 每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。我们可以通过发送消息来避免命名管道的同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。

注意消息队列和管道的不同点

消息队列是双向通信,管道一般都是单向通信

消息队列是基于消息(因此有类型这么一说)的,而管道是基于字节流的

消息队列的生命周期是随内核存在而存在,而非管道随进程存在

消息队列一般会涉及到这么些函数:



对于这些,我们可以在Linux下同man命令获取信息,我们简要介绍一下这些函数。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);


msgget用于创建一个新的消息队列或者打开一个现存的消息队列

其中key用于区别不同的消息队列,这样两个不相关进程可以通过事先约定的key值通过消息队列进行消息收发。例如进程A向key消息队列发送消息,进程B从Key消息队列读取消息。一般来说,key可通过ftok函数获得。

而msgflg则是一个权限标志,表示消息队列的访问权限,它与文件的访问权限一样。它一般指定两个参数:IPC _ CREAT和IPC _ EXCL,如果单独用IPC _ CREAT时候,它就创建一个消息队列,如果该消息队列已存在则打开,如果指定IPC _ CREAT和IPC _ EXCL,如果该消息队列存在的话则出错。

注意IPC _ EXCL单独使用无意义,他只有和IPC _ CREAT一起使用才有意义,保证消息队列是新建的而非已有的!

该函数返回一个以key命名的消息队列的标识符(非零整数),失败时返回-1。

因此创建create _ msg _ queue或者获取get _ msg _ queue一个消息队列可以这么写:

int msg_queue(int msgflag)
{
key_t key = ftok(PATH, PROJ_ID);
if (key == -1)
{
printf("ftok failure\n");
exit(-1);
}

int msg_queue_id = msgget(key, msgflag | 0666); //消息队列的权限为0666
if (msg_queue_id == -1)
{
printf("queue_id failure\n");
exit(-2);
}
return msg_queue_id;
}

int get_msg_queue()
{
return msg_queue(IPC_CREAT);
}

int create_msg_queue()
{
return msg_queue(IPC_CREAT | IPC_EXCL);
}


对于这个消息队列,我采用是server端创建消息队列,client获取消息队列,这样子,他们两就可以进行进程通信了。我们可以用ipcs -q命令查看消息队列,如下图:



消息队列的id为32768,而client和server端打印也符合要求。

注意一点,当我们执行完程序后,用ipcs -q依旧能查看消息队列,此时client和server进程已经结束,因此也验证我们前面所说的,消息队列的生命周期是随内核存在的

那么,这样一来问题就来了,如果我们进程中不去显式销毁消息队列,显然消息队列会越来越多占用资源,因此,我们需要删除消息队列,这里有两个办法,一是使用ipcrm -q msqid命令删除,二是采用函数调用删除。我们分别说一下。



而更多的,我们采用调用函数销毁消息队列,而在这里,需要调用下面这个函数:

int msgctl (int msqid, int cmd, struct msqid_ds *buf); //消息队列属性控制
参数:
msqid:消息队列的标识符。
cmd:执行的控制命令,即要执行的操作。包括以下选项:
IPC_STAT:读取消息队列属性。取得此队列的msqid_ds 结构,并将其存放在buf指向的结构中。
IPC_SET :设置消息队列属性。
IPC_RMID:删除消息队列。
IPC_INFO:读取消息队列基本情况。此命令等同于 ipcs 命令。
这 4 条命令(IPC_STAT、IPC_SET、IPC_INFO 和 IPC_RMID)也可用于信号量和共享存储。
buf:临时的 msqid_ds 结构体类型的变量。用于存储读取的消息队列属性或需要修改的消息队列属性。

举例:msgctl(qid, IPC_RMID, NULL)  //删除消息队列


因此删除代码就很好写了:

void destroy_msg_queue(int msqid)
{
int ret = msgctl(msqid, IPC_RMID, NULL);
if (ret == -1)
{
fprintf(stderr, "destroy_msg_queue failure\n");
exit(-3);
}
}


创建好一个消息队列,client和server两个也都得到消息队列ID,下面就开始我们的进程通信,也就是发送信息,接收信息。

下面介绍msgsnd,msgrcv函数。

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); //将消息送入消息队列
参数:
msqid:消息队列的标识符。
msgp:指向消息缓冲区的指针,此位置用来暂时存储发送和接收的消息,是一个用户可定义的通用结构,形态如下

struct msgbuf {
long mtype;     /* 消息类型,必须 > 0 */
char mtext[size];  /* 消息文本 */
};
msgsz:消息的大小。
msgflg:用来指明进程在队列数据满(msgsnd)或空(msgrcv)的情况下所应采取的行动。
如果设置为 IPC_NOWAIT,则在消息队列已满时不发送消息并且调用进程立即返回错误信息EAGAIN。
如果设置为 0,则调用进程阻塞直至消息队列不为满。
size:用户可指定大小
返回说明:
成功执行时,msgsnd()返回0, 失败返回-1

举例:msgsnd(g_msg_id,&msg_snd,sizeof(msg_snd.msg_item),IPC_NOWAIT); //非阻塞发送消息

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); //从消息队列读取信息
参数:
msgtyp:
msgtyp = 0:收取队列中的第一条消息,任意类型。
msgtyp > 0:收取第一条 msgtyp 类型的消息。  这个能判断自己该不该收这个消息是不是想要的发信人发来的
msgtyp < 0:收取第一条最低类型(小于或等于 msgtyp 的绝对值)的消息。
其他参数参考msgsnd函数。

返回说明:
成功执行时,msgrcv()返回0, 失败返回-1

举例:msgrcv(msgid,&msg_rbuf,sizeof(msg_rbuf.msg_item),10,0); //阻塞接收


实例

介绍完以上函数特性,我们写一个进程通信的简单实例,类似一个小型聊天程序,基于消息队列实现。

如下图:



它有客户端client和服务器端server,初始化,我们让server端建立消息队列,client获取该消息队列,然后两者进行相互通信,比如client发出信息,server接受信息,然后server发送信息,client接受信息。

注意几点:

消息队列是由server端建立和销毁的,client端获取改消息队列,因此,必须先运行./ server然后运行./client

默认是client先发信息,server端获取到消息,然后server发送信息,client接受这样子体现交互性

代码如下:

Com.h声明

#ifndef __MSG_QUEUE__
#define __MSG_QUEUE__

#include<unistd.h>
#include<stdio.h>
#include<stdlib.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<string.h>

#define CLIENT_TYPE 1
#define SERVER_TYPE 2

#define PATH "./"
#define PROJ_ID 256
#define TEXT_SIZE 512

struct MsgBuf
{
long mtype;
char mtext[TEXT_SIZE];
};

typedef struct MsgBuf MsgBuf;

int msg_queue();

int get_msg_queue();

int create_msg_queue();

int send_msg(int msg_queue_id, MsgBuf* msgbuf, long type);

int receive_msg(int msg_queue_id, MsgBuf* msgbuf, long type);

void destroy_msg_queue(int msqid);

#endif


Com.c对Com.h实现

#include "com.h"

int msg_queue(int msgflag)
{
key_t key = ftok(PATH, PROJ_ID);
if (key == -1)
{
fprintf(stderr, "ftok failure\n");
exit(-1);
}

int msg_queue_id = msgget(key, msgflag | 0666);
if (msg_queue_id == -1)
{
fprintf(stderr, "queue_id failure\n");
exit(-2);
}
return msg_queue_id;
}

int get_msg_queue()
{
return msg_queue(IPC_CREAT);
}

int create_msg_queue()
{
return msg_queue(IPC_CREAT | IPC_EXCL);
}

void destroy_msg_queue(int msqid) { int ret = msgctl(msqid, IPC_RMID, NULL); if (ret == -1) { fprintf(stderr, "destroy_msg_queue failure\n"); exit(-3); } }

int send_msg(int msg_queue_id, MsgBuf* msgbuf, long type)
{
msgbuf->mtype = type;
msgsnd(msg_queue_id, (const void*)msgbuf, sizeof(msgbuf->mtext), 0); //以阻塞方式发送
}

int receive_msg(int msg_queue_id, MsgBuf* msgbuf, long type)
{
msgbuf->mtype = type;
msgrcv(msg_queue_id, (void*)msgbuf, sizeof(msgbuf->mtext), type, 0);
}


Server.c

#include "com.h"

int main()
{
int msg_queue_id = create_msg_queue();
printf("server succeeds in connecting the client\n\n");

MsgBuf msgbuf;
//memset(msgbuf.mtext, '\0', TEXT_SIZE);

while (1)
{
receive_msg(msg_queue_id, &msgbuf, CLIENT_TYPE);
printf("client # %s\n", msgbuf.mtext);
msgbuf.mtext[0] = '\0';//清空缓冲区

printf("please input the message # ");
scanf("%s", msgbuf.mtext);
send_msg(msg_queue_id, &msgbuf, SERVER_TYPE);
msgbuf.mtext[0] = '\0';//清空缓冲区
}

destroy_msg_queue(msg_queue_id);
return 0;
}


Client.c

#include"com.h"

int main()
{
int msg_queue_id = get_msg_queue();
printf("client succeeds in connecting with the server!\n\n");

MsgBuf msgbuf;
//memset(msgbuf.mtext, '\0', TEXT_SIZE);

while (1)
{
printf("please input the message # ");
scanf("%s", msgbuf.mtext);
send_msg(msg_queue_id, &msgbuf, CLIENT_TYPE);
msgbuf.mtext[0] = '\0';//清空缓冲区

receive_msg(msg_queue_id, &msgbuf, SERVER_TYPE); //client希望接收server端的信息
printf("server # %s\n", msgbuf.mtext);
msgbuf.mtext[0] = '\0';//清空缓冲区
}

return 0;
}


makefile

.PHONY:all
all:client server
client:client.c com.c
gcc -o $@ $^
server:server.c com.c
gcc -o $@ $^
.PHONY:clean
clean:
rm -f client server
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: