您的位置:首页 > 运维架构 > Linux

linux 消息队列

2014-01-04 17:51 211 查看
linux消息队列
消息队列的实现包括创建或打开消息队列、添加消息、读取消息和控制消息队列这四种操作。

其中创建或打开消息队列使用的函数是msgget,这里创建的消息队列的数量会受到系统消息队列数量的限制;

添加消息使用的函数是msgsnd函数,它把消息添加到已打开的消息队列末尾;

读取消息使用的函数是msgrcv,它把消息从消息队列中取走,与FIFO不同的是,这里可以指定取走某一条消息;
最后控制消息队列使用的函数是msgctl,它可以完成多功能。

函数介绍:

1.msgget函数:

msgget- get a message queue identifier

头文件:
#include<sys/types.h>

#include<sys/ipc.h>

#include<sys/msg.h>

函数原型: intmsgget(key_t key, int msgflg);

msgget()要么返回一个新创建的消息队列的标识符,要么返回具有相同关键字值的>队列的标识符。
key:是一个全系统唯一的关键字,标识要获取或创建的消息队列,其他想要与这个队列连接的进程也必须使用相同的关键字
msg_flag:创建一个队列时,这个标志是IPC_CREATE与权限标志的逻辑或(权限标志和标准文件权限标志相同--读、写和执行,消息队列的用户号和组号由创建它的进程决定)。
例子:
msgid= msgget(KEY,IPC_CREAT|0777))
返回值:
成功:消息队列ID
出错:-1

key:可以用ftok来建立,也可以用户自定义。注意,两个进程的key相同最终msgget才能返回一个相同的msgqid。如果key不相同,生成的msgqid自然不相等,那么这两个进程之间无法通信。
/*ftok - convert a pathname and a project identifier to a System VIPC

key
key_tftok(const char *pathname, int proj_id);

#include<sys/types.h>

#include<sys/ipc.h>

*/
2.msgsnd函数

头文件:
#include<sys/types.h>

#include <sys/ipc.h>

#include<sys/msg.h>

函数原型:intmsgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

发送消息给消息队列

msgb参数是结构体的指针:
structmsgbuf {

longmtype; /* message type, must be > 0 */

charmtext[1]; /* message data */

};
mtext的大小是由
msgsz指定的。
msqid:消息队列ID(msgget返回)
msgflag:

IPC_NOWAIT:若消息并没有立即发送而调用进程会立即返回。
0:msgsnd调用阻塞直到条件满足为止。
返回值:
0成功
-1失败
例子:
mtext是一个消息内容可以为任意格式,例如:
Structprivate_msgbuf {
long mtype;
char name[30];
char gender;
unsigned int age;
char cell[20];
};
Key_t key;
Intmsqid;
structprivate_msgbuf pmb = {2, “Yuhui”, ‘M’, 30, “13810933881”};
msqid= msgget(8888, IPC_CREAT | 0666);
Msgsnd(msqid,(struct msgbuf *) &pmb, sizeof(pmb), 0);

实例:
先定义结构体和宏
structmsq_command {

unsignedint msq_main_cmd;
unsignedint msq_sub_cmd;
unsignedint msq_res;
charmsq_data[1];
};
这是消息的具体内容
structmsq_buffer {
longm_type;
charm_text[LENGTH_MEDIUM];
};
intmsgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
函数中的msgp参数

#defineMSQ_REQUEST_SND_ID 19790416
#defineMSQ_RESPONSE_RCV_ID 19790416

#defineMSQ_REQUEST_RCV_ID 20091105
#defineMSQ_RESPONSE_SND_ID 20091105

#defineMSQ_CMD_GET 1
#defineMSQ_CMD_SET 2

#defineGUARDIAN_USER 0
#defineROOT_USER 1
#defineCOMMON_USER 2
#defineWEB_USER 3
#defineSNMP_USER 4
发送端:

intrcv_size;
intmsq_request_snd_id;//获得消息队列id
intmsq_request_rcv_id;

structmsq_buffer rcv_data;//接受数据
structmsq_buffer snd_data;//发送数据
structmsq_command * msq_snd;//发送数据消息正文
structmsq_command * msq_rcv;//接受消息正文
//int msgget(key_t key, int msgflg);

msq_request_snd_id= msgget(MSQ_REQUEST_SND_ID, IPC_CREAT | 0600);
msq_request_rcv_id= msgget(MSQ_REQUEST_RCV_ID, IPC_CREAT | 0600);
msq_snd= (struct msq_command *) snd_data.m_text;//强制类型转换.共享内存,指针指向同一个位置
snd_data.m_type= COMMON_USER;
msq_snd->msq_main_cmd= MSQ_CMD_GET;
msq_snd->msq_sub_cmd= CMD_ID_GET_SYS_VERSION;
msq_snd->msq_res= 0;
//int msgsnd(int msqid, const void *msgp, size_t msgsz,int msgflg);
if(0 != msgsnd(msq_request_snd_id, &snd_data, LENGTH_MEDIUM, 0))
printf("msggeterror: %s.\n", strerror(errno));

printf("--- send finished.\n");
printf("--- waiting for response.\n");
//接受返回消息
rcv_size= msgrcv(msq_request_rcv_id, &rcv_data, LENGTH_MEDIUM, 0, 0);

if(rcv_size > 0)
perror("msgrcv");

msq_rcv= (struct msq_command *)rcv_data.m_text;

printf("msq_main_cmdis: %d.\n", msq_rcv->msq_main_cmd);
printf("msq_sub_cmdis: %d.\n", msq_rcv->msq_sub_cmd);
printf("msq_resis: %d.\n", msq_rcv->msq_res);
printf("msq_data is: %s.\n", msq_rcv->msq_data);
这是发送消息:

接受消息:

接受端:
首先,我们先初始化我们的服务端,接收命令处理端。代码如下:

intinit_msq_command_response (void)
{
pid_tpid;
volatileunsigned int i = 0;

printf("--- in init.\n");
//建立子进程
if((pid = fork()) < 0) {
printf("FatalError! Can not fork: %s.\n.", strerror(errno));
return-1;
}
elseif (0 == pid) {
i++;
if(1 == i)
printf("messagequeue process is running.\n");
//子进程执行函数
msq_command_response();
}

return0;
}

接下来,是我们的消息队列的处理端,代码如下:

intmsq_command_response (void)
{
intmsq_response_rcv_id, msq_response_snd_id;
intrcv_size;
structmsq_buffer rcv_data;
structmsq_buffer snd_data;
structmsq_command * msq_cmd;
structmsq_command * msq_res;
//得到消息队列id
msq_response_rcv_id= msgget(MSQ_RESPONSE_RCV_ID, IPC_CREAT | 0600);
msq_response_snd_id= msgget(MSQ_RESPONSE_SND_ID, IPC_CREAT | 0600);

if((-1 == msq_response_snd_id) || (-1 == msq_response_rcv_id)) {
printf("FatalError! Can not create Message Queue: %s.\n.", strerror(errno));
return-1;
}
//等待消息的到来
while(1) {
rcv_size= msgrcv (msq_response_rcv_id, &rcv_data, LENGTH_MEDIUM, 0, 0);
//接受消息
if(-1 == rcv_size)
continue;//如果没有接受到消息则退出再次接受
//接受到了消息
msq_cmd= (struct msq_command *) rcv_data.m_text;//指向同一块位置
/*snd_data.m_type= COMMON_USER;
msq_snd->msq_main_cmd= MSQ_CMD_GET;
msq_snd->msq_sub_cmd= CMD_ID_GET_SYS_VERSION;
msq_snd->msq_res= 0;
发送端发送的消息
*/
/*Fill message back parameters */
snd_data.m_type= rcv_data.m_type;
msq_res= (struct msq_command *) snd_data.m_text;
msq_res->msq_main_cmd= msq_cmd->msq_main_cmd;
msq_res->msq_sub_cmd= msq_cmd->msq_sub_cmd;
msq_res->msq_res= msq_cmd->msq_res;

printf("Gotmessage, msq_cmd->msq_main_cmd is: %d.\n",msq_cmd->msq_main_cmd);
printf("Gotmessage, msq_cmd->msq_sub_cmd is: %d.\n",msq_cmd->msq_sub_cmd);
printf("Gotmessage, msq_cmd->msq_res is: %d.\n", msq_cmd->msq_res);

if(MSQ_CMD_SET == msq_cmd->msq_main_cmd) {
printf("MessageQueue, receive SET function.\n ");
}else if (MSQ_CMD_GET == msq_cmd->msq_main_cmd) {
switch(msq_cmd->msq_sub_cmd)
{
caseCMD_ID_GET_SYS_VERSION:
strcpy(msq_res->msq_data,"0.0.4");
//返回处理消息
if(0 != msgsnd(msq_response_snd_id, &snd_data, LENGTH_MEDIUM, 0))
printf("msggeterror: %s.\n", strerror(errno));
printf("--- msq_command_response send message finished.\n");

break;
caseCMD_ID_GET_SYS_NAME:
break;
caseCMD_ID_SET_SYS_NAME:
break;
caseCMD_ID_GET_SYS_CONTACT:
break;
caseCMD_ID_SET_SYS_CONTACT:
break;
caseCMD_ID_GET_SERIAL_TAG:
break;
caseCMD_ID_GET_SERVICE_TAG:
break;
default:
break;

}
}
}

return0;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: