您的位置:首页 > 运维架构 > Linux

消息队列的简单实现

2016-05-22 14:41 519 查看
消息队列:一个进程向另一个进程发送数据块
消息队列基于消息,管道基于字节流

消息队列是用链表实现

1.创建:int megget(key_t key, int msgflag)
key:函数ftok()的返回值
msgflag:IPC_CREAT是创建新的消息队列;IPC_EXCL与IPC_CREAT一起使用,即如果要创建的消息队列已存在,则返回错误
成功:返回队列标识符
失败:-1
2.获取:ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg)
msqid:消息队列的识别码
msgp:指向消息缓冲区的指针,用来暂时存储发送和接收的消息,是用户自己定义的一个通用结构
struct msgbuf
{
long mtype;  //消息类型,必须> 0
char mtext[__SIZE__];  //消息文本
}
msgsz:消息大小
msgtyp:消息类型(>0,返回其类型为mtype的第一个消息)
msgflg:控制函数行为(0表示忽略)

成功:返回拷贝到mtext数组的实际字节数
失败:-1
3.发送:int msgsnd(int msqid, void *msgp, size_t msgsz, int msgflg)
成功:返回0
失败:-1
4.设置属性:int msgctl(int msqid, int cmd, struct msqid_ds *buf)
对msqid标识的消息队列执行cmd操作
cmd操作:IPC_STAT(获取对应的msqid_ds的数据结构,保存在buf指定的地址空间)
IPC_SET(设置消息队列的属性,保存在buf中)
IPC_RMID(从内核删除msqid标识的消息队列)
msqid_ds:描述队列当前状态
成功:0
失败:-1

comm.h
#ifndef __COMM__
#define __COMM__

#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <string.h>

#define __SIZE__ 1024
#define FILEPATH "/tmp/.msg1"
#define ID 0

const int ser_send_type=1;
const int cli_send_type=2;

typedef struct msg_info
{
char mtext[__SIZE__];
long mtype;
}msginfo;

#endif

client.h
#ifndef __CLIENT__
#define __CLIENT__

#include "comm.h"

int cli_start();
int cli_end(int);

#endif
client.c
#include "client.h"

int msg_id= -1;

int cli_start()
{

//create the client's buf which storage the msg
msginfo cli_info;

//create the key
key_t key= ftok(FILEPATH, ID);
if(key< 0)
{
perror("ftok");
exit(1);
}
//get the ID of msg_queue
msg_id= msgget(key, 0);
if(msg_id< 0)
{
perror("cli get key id failed");
exit(2);
}

while(1)
{
//client said
printf("client:>");
fgets(cli_info.mtext, __SIZE__, stdin);  //write in the cli_info
//when it will exit?
if(strncasecmp(cli_info.mtext, "quit", 4)==0)
{
printf("client bye!\n");
break;
}
cli_info.mtype= cli_send_type;
//send the msg(send judge)
if(msgsnd(msg_id, &cli_info, sizeof(cli_info.mtext),0)== -1) //send fail
{
perror("client send msg fail");
exit(3);
}

//receive the msg
memset(cli_info.mtext, '\0', sizeof(cli_info.mtext));  //reset first
if(msgrcv(msg_id, &cli_info, sizeof(cli_info.mtext), ser_send_type, 0)==-1)  //receive fail
{
perror("client msgrcv fail");
exit(4);
}
printf("server:>%s\n", cli_info.mtext);  //the msg receive from server have storaged in the cli_info.mtext
fflush(stdout);
}
return 0;
}
int cli_end(int id)
{
if(msgctl(id, IPC_RMID, NULL)==-1)  //fail
{
perror("delete th msg_queue from kernel fail");
exit(5);
}
else
{
printf("delete the msg_queue from kernel success\n");
return 0;
}
}
static void delete_msg(void)
{
if(msg_id!= -1)  //the msg_queue exist
{
cli_end(msg_id);
}
printf("delete the msg queue end\n");
}
int main(int argc,char *argv[])
{
atexit(delete_msg);
if(cli_start()==0)
{
printf("cli start success\n");
}
else
{
printf("cli start failed\n");
}
return 0;
}

server.h
#ifndef __SERVER__
#define __SERVER__

#include "comm.h"

int ser_start();
int ser_end(int);

#endif
server.c
#include "server.h"

int msg_id= -1;
int ser_start()
{

//create the server's buf which storage the msg
msginfo ser_info;

//create the key
key_t key= ftok(FILEPATH, ID);
if(key< 0)
{
perror("ftok");
exit(1);
}
//get the ID of msg_queue
msg_id= msgget(key, IPC_CREAT|IPC_CREAT|0666);
if(msg_id< 0)
{
perror("ser get key id failed");
exit(2);
}

while(1)
{
//reveive the msg
if(msgrcv(msg_id, &ser_info, sizeof(ser_info.mtext), cli_send_type, 0)==-1) //fail
{
perror("server msgrcv fail");
exit(3);
}
printf("client:>%s\n",ser_info.mtext);  //first,output the client's msg

printf("server:>");  //then,server will said
memset(ser_info.mtext, '\0', sizeof(ser_info.mtext));  //reset first
fgets(ser_info.mtext, __SIZE__, stdin);  //write in the ser_info
//when it will exit?
if(strncasecmp(ser_info.mtext, "quit", 4)==0)
{
printf("server bye!\n");
break;
}
ser_info.mtype= ser_send_type;
//send the msg
if(msgsnd(msg_id, &ser_info, __SIZE__, 0)==-1)  //fail
{
perror("server msgsnd fail");
exit(4);
}
fflush(stdout);
}
return 0;
}
int ser_end(int id)
{
if(msgctl(id, IPC_RMID, NULL)== -1)  //fail
{
perror("delete the msg_queue from kernel fail");
exit(5);
}
else
{
printf("delete the msg_queue from kernel success\n ");
return 0;
}
}
static void delete_msg(void)
{
if(msg_id!= -1)  //the msg_queue exist
{
ser_end(msg_id);
}
printf("delete the msg_queue end\n");
}
int main(int argc, char *argv[])
{
atexit(delete_msg);
if(ser_start()==0)  //success
{
printf("ser start success\n");
}
else
{
printf("ser start failed\n");
}
return 0;
}

Makefile
.PHONY:all
all:client server
client:client.c
gcc -o $@ $^
server:server.c
gcc -o $@ $^
.PHONY:clean
clean:
rm -f client server
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Linux