您的位置:首页 > 运维架构 > Linux

四.Linux环境进程间通信(三):消息队列:POSIX 消息队列

2014-06-18 11:32 363 查看
POSIX消息队列与System V消息队列的主要区别:

1.对POSIX队列的读总数返回最高优先级到最早消息,对SV队列到读则可以返回任意指定优先级的消息

2.当往一个空队列放置一个消息时,POSIX允许产生一个信号或启动一个线程,System V不提供此机制

消息的属性:

1.一个无符号整数的优先级(POSIX)或一个长整数的类型(SV)

2.消息的数据部分长度(可以为0)

3.数据本身(如果长度大于0)

POSIX消息队列总结:

mq_open创建一个新队列或者打开一个已经存在的队列

mq_close关闭队列

mq_unlink删除队列名,删除队列

mq_send往队列放置消息

mq_receive从一个队列中读出消息

mq_setattr和mq_getattr查询和设置队列的属性

mq_notify允许注册一个信号或者线程,在有一个消息被放置到空队列时,发送信号或者激活线程

每个消息被赋予一个小整数优先级,mq_receive总是返回最高优先级的最早消息

限制:

/proc/sys/fs/mqueue/msg_max 10

/proc/sys/fs/mqueue/msgsize_max 8192

/proc/sys/fs/mqueue/queues_max 256

创建一个新的消息队列或者打开一个已经存在的消息队列

<mqueue.h> 注意:编译加-lrt

<fcntl.h>

<sys/stat.h>

mqd_t mq_open(const char *name, int oflag);

mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

成功返回描述字,失败返回-1并设置errno

name: 必须为/开头!!!

oflag: O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK

关闭消息队列,但不能删除它

mqd_t mq_close(mqd_t mqdes);

成功返回0,失败返回-1

删除消息队列,不一定马上删除消息队列,但队列名会立即删除

mqd_t mq_unlink(const char *name);

成功返回0,失败返回-1

当某个进程还没有关闭此消息队列时,调用mq_unlink时,不会马上删除队列,当最后一个进程关闭队列时,该队列被删除

int flags;

mqd_t mqd;

flags = O_RDWR | O_CREAT | O_EXCL;

mqd = mq_open("/tmp.111", flags, 0644, NULL);

if (mqd == (mqd_t)-1) {

perror("mq_open");

return 1;

}

消息队列的属性

mq_getattr mq_setattr

mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);

mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);

成功返回0,失败返回-1

struct mq_attr {

long mq_flags; /* Flags: 0 or O_NONBLOCK */

long mq_maxmsg; /* Max. # of messages on queue */

long mq_msgsize; /* Max. message size (bytes) */

long mq_curmsgs; /* # of messages currently in queue */

};

mq_setattr只能修改mq_flags属性,maxmsg和msgsize在mq_open时设置

mqd_t mqd;

struct mq_attr attr;

mqd = mq_open(argv[1], O_RDONLY);

mq_getattr(mqd, &attr);

printf("maxmsg=%ld, msgsize=%ld, curmsgs=%ld\n",

attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

mq_close(mqd);

收发消息

mq_send mq_receive

mq_receive返回队列中最高优先级的最早消息,而且该优先级能随该消息的内容及其长度一起返回

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);

成功返回消息的长度,消息的实际长度,不包括消息头;失败返回-1

msg_len指示msg_ptr的长度,必须大于等于mq_msgsize

如果msg_prio不为NULL,函数返回消息的优先级

如果队列为空,调用将阻塞,如果队列设置0_NONBLOCK,调用立即返回EAGAIN

// 向队列加入一条消息

mqd_t mqd;

char *msg;

size_t len;

unsigned int prio;

len = 100;

prio = 5;

mqd = mq_open("/abc.123", O_WRONLY);

msg = (char *)malloc(len);

memset(msg, 0, len);

mq_send(mqd, msg, len, prio);

// 从队列读入一条消息

mqd_t mqd;

char *msg;

size_t len;

int n;

unsigned int prio;

struct mq_attr attr;

mqd = mq_open("/abc.123", O_RDONLY);

mq_getattr(mqd, &attr);

len = attr.mq_msgsize;

msg = (char *)malloc(len);

memset(msg, 0, len);

n = mq_receive(mqd, msg, len, &prio);

printf("read %ld bytes, priority=%u\n", (long)n, prio);

队列限制

long int open_max = sysconf(_SC_MQ_OPEN_MAX); // -1

long int prio_max = sysconf(_SC_MQ_PRIO_MAX); // 32768

消息通告

当往空队列放置了一个消息时,通知进程

通告方式有2种:

1. 产生一个信号

2. 创建一个线程执行一个指定的函数

mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);

成功返回0;失败返回-1

给队列建立或者删除异步事件通知

1.如果notification非空,那么当前进程希望在有一个消息到达而且队列先前为空时得到通知,该进程被注册为接收该队列的通知

2.如果notification为空,而且当前进程目前被注册为接收该队列的通知,那么现有注册将被撤销

3.任意时刻只有一个进程可以被注册为接收队列的通知

4.当有一个消息到达一个空队列,而且已经有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发送。即在mq_receive调用中的阻塞比任何通知的注册都优先

5.当该通知已经发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify以重新注册

6.当调用mq_notify但是队列不为空时,通知不会发送;当队列变为空,并且有一个消息入队时,才发送通知

union sigval { /* Data passed with notification */

int sival_int; /* Integer value */

void *sival_ptr; /* Pointer value */

};

struct sigevent {

int sigev_notify; /* Notification method */

int sigev_signo; /* Notification signal */

union sigval sigev_value; /* Data passed with notification */

void (*sigev_notify_function) (union sigval);

/* Function for thread notification */

void *sigev_notify_attributes;

/* Thread function attributes */

};

sigev_notify:SIGEV_NONE,SIGEV_SIGNAL,SIGEV_THREAD

// 使用非阻塞mq_receive的信号通知

volatile sig_atomic_t mqflag;

static void sig_usr1(int);

int main(int argc, char *argv[])

{

mqd_t mqd;

void *buf;

ssize_t n;

sigset_t zeromask, newmask, oldmask;

struct mq_attr attr;

struct sigevent sigev;

assert(argc == 2);

mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);

mq_getattr(mqd, &attr);

buf = malloc(attr.mq_msgsize);

sigemptyset(&zeromask);

sigemptyset(&newmask);

sigemptyset(&oldmask);

sigaddset(&newmask, SIGUSR1);

signal(SIGUSR1, sig_usr1);

sigev.sigev_notify = SIGEV_SIGNAL;

sigev.sigev_signo = SIGUSR1;

mq_notify(mqd, &sigev);

for ( ; ; ) {

sigprocmask(SIG_BLOCK, &newmask, &oldmask);

while (mqflag == 0)

sigsuspend(&zeromask);

mqflag = 0;

mq_notify(mqd, &sigev);

while ((n = mq_receive(mqd, buf, attr.mq_msgsize, NULL)) >= 0) {

printf("read %ld bytes\n", (long)n);

}

if (errno != EAGAIN)

die("mq_receive");

sigprocmask(SIG_UNBLOCK, &newmask, NULL);

}

return 0;

}

static void sig_usr1(int signo)

{

mqflag = 1;

return;

}

// 使用sigwait代替信号处理程序的信号通知

#include <signal.h>

int sigwait(const sigset_t *set, int *sig);

成功返回0,并设置sig为收到的信号;失败返回错误码

int main(...)

{

...

sigemptyset(&newmask);

sigaddset(&newmask, SIGUSR1);

sigprocmask(SIGBLOCK, &newmask, NULL);

sigev.sigev_notify = SIGEV_SIGNAL;

sigev.sigev_signo = SIGUSR1;

mq_notify(mqd, &sigev);

for ( ; ; ) {

sigwait(&newmask, &signo);

if (signo == SIGUSR1) {

mq_notify(mqd, &sigev);

while ((n = mq_receive(mqd, buf, len, NULL)) >=0) {

printf("read %ld bytes\n", n);

}

if (errno != EAGAIN)

die("mq_receive");

}

}

...

}

// 使用select的POSIX消息队列

int pfds[2];

static void sig_usr1(int);

int main(int argc, char *arg[])

{

int fds;

char c;

fd_set rfds;

mqd_t mqd;

void *buf;

ssize_t n;

size_t len;

struct mq_attr attr;

struct sigevent sigev;

asset(argc == 2);

mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);

mq_getattr(mqd, &attr);

len = attr.mq_msgsize;

buf = malloc(len);

pipe(pfds);

// 设置信号处理程序,建立通知

signal(SIGUSR1, sig_usr1);

sigev.sigev_notify = SIGEV_SIGNAL;

sigev.sigev_signo = SIGUSR1;

mq_notify(mqd, &sigev);

FD_ZERO(&rfds);

for ( ; ; ) {

FD_SET(pfds[0], &rfds);

nfds = select(pfds[0]+1, &rfds, NULL, NULL, NULL);

if (FD_ISSET(pfds[0], &rfds)) { // 管道可读

read(pfds[0], &c, 1);

mq_notify(mqd, &sigev);

while ((n = mq_receive(mqd, buf, len, NULL)) >= 0) {

printf("read %ld bytes\n", (long)n);

}

if (errno != EAGAIN)

die("mq_receive");

}

}

return 0;

}

static void sig_usr1(int signo)

{

write(pfds[1], "", 1); // 异步信号处理安全的函数

return;

}

// 收到通知后,启动一个线程,接收消息,然后结束进程

#include <pthread.h>

#include <mqueue.h>

#include <assert.h>

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#define die(msg) { perror(msg); exit(EXIT_FAILURE); }

static void tfunc(union sigval sv) /* Thread start function */

{

struct mq_attr attr;

ssize_t nr;

void *buf;

mqd_t mqdes = *((mqd_t *) sv.sival_ptr);

/* Determine max. msg size; allocate buffer to receive msg */

if (mq_getattr(mqdes, &attr) == -1) die("mq_getattr");

buf = malloc(attr.mq_msgsize);

if (buf == NULL) die("malloc");

nr = mq_receive(mqdes, buf, attr.mq_msgsize, NULL);

if (nr == -1) die("mq_receive");

printf("Read %ld bytes from MQ0\n", (long) nr);

free(buf);

exit(EXIT_SUCCESS); /* Terminate the process */

}

int main(int argc, char *argv[])

{

mqd_t mqdes;

struct sigevent not;

assert(argc == 2);

mqdes = mq_open(argv[1], O_RDONLY);

if (mqdes == (mqd_t) -1) die("mq_open");

not.sigev_notify = SIGEV_THREAD;

not.sigev_notify_function = tfunc;

not.sigev_notify_attributes = NULL;

not.sigev_value.sival_ptr = &mqdes; /* Arg. to thread func. */

if (mq_notify(mqdes, ¬) == -1) die("mq_notify");

pause(); /* Process will be terminated by thread function */

return 0;

}

// 启动一个新线程

mqd_t mqd;

struct mq_attr attr;

struct sigevent sigev;

static void notify_thread(union sigval);

int main(int argc, char *argv[])

{

assert(argc == 2);

mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);

mq_getattr(mqd, &attr);

sigev.sigev_notify = SIGEV_THREAD;

sigev.sigev_value.sival_ptr = NULL;

sigev.sigev_notify_function = notify_thread;

sigev.sigev_notify_attributes = NULL;

mq_notify(mqd, &sigev);

for ( ; ; )

pause();

return 0;

}

static void notify_thread(union sigval arg)

{

ssize_t n;

size_t len;

void *buf;

len = attr.mq_msgsize;

printf("notify_thread started\n");

buf = malloc(len);

mq_notify(mqd, &sigev);

while ((n = mq_receive(mqd, buf, len, NULL)) >= 0) {

printf("read %ld bytes\n", (long)n);

}

if (errno != EAGAIN)

die("mq_receive");

free(buf);

pthread_exit(NULL);

}

POSIX实时信号

unix信号分为两大组:

实时信号:SIGRTMIN--SIGRTMAX

其他信号:SIGINT, SIGQUIT, SIGKILL, ...

信号的实时行为取决于SA_SIGINFO

实时行为包含以下特征:

1.信号是排队的,即如果一个信号产生了3次,它就递交3次。以FIFO的顺序排队

2.当有多种SIGRTMIN到SIGRTMAX范围内的解阻塞信号排队时,值较小的信号先于值较大的信号递交(注意:linux与此相反)

3.当某个非实时信号递交时,传递给它的信号处理的唯一参数是该信号的值,实时信号比其他信号传递更多的信息

4.有些新函数使用实时信号工作,如sigqueue用来代替kill

// 查看实时信号的递交顺序

static void sig_rt(int, siginfo_t *, void *);

int main(void)

{

int i, j;

pid_t pid;

sigset_t newset;

union sigval val;

printf("SIGRTMIN=%d, SIGRTMAX=%d\n", (int)SIGRTMIN, (int)SIGRTMAX);

pid = fork();

if (pid < 0) die("fork");

else if (pid == 0) {

/* 阻塞3个实时信号 */

sigemptyset(&newset);

sigaddset(&newset, SIGRTMIN);

sigaddset(&newset, SIGRTMIN+1);

sigaddset(&newset, SIGRTMIN+2);

sigprocmask(SIG_BLOCK, &newset, NULL);

signal_rt(SIGRTMIN, sig_rt);

signal_rt(SIGRTMIN+1, sig_rt);

signal_rt(SIGRTMIN+2, sig_rt);

sleep(6);

sigprocmask(SIG_UNBLOCK, &newset, NULL);

sleep(3);

exit(0);

}

else {

sleep(3);

for (i=SIGRTMIN; i<=SIGRTMIN+2; i++) {

for (j=0; j<=2; j++) {

val.sival_int = j;

sigqueue(pid, i, val);

printf("send signal signo=%d, val=%d\n", i, j);

}

}

exit(0);

}

}

static void sig_rt(int signo, siginfo_t *info, void *context)

{

printf("receive signal signo=%d, code=%d, ival=%d\n",

signo, info->si_code, info->si_value.sival_int);

}

typedef void sigfunc_rt(int, siginfo_t *, void *);

sigfunc_rt *signal_rt(int signo, sigfunc_rt *func)

{

struct sigaction act, oact;

act.sa_sigaction = func;

sigemptyset(&act.sa_mask);

act.sa_flags = SA_SIGINFO; /* 实时信号必须指定 */

if (signo == SIGALRM) {

#ifdef SA_INTERRUPT

act.sa_flags |= SA_INTERRUPT;

#endif

}

else {

#ifdef SA_RESTART

act.sa_flags |= SA_RESTART;

#endif

}

if (sigaction(signo, &act, &oact) < 0)

return (sigfunc_rt *)SIG_ERR;

else

return oact.sa_sigaction;

}

输出如下:

[root@jiangkun unp]# ./rtsig

SIGRTMIN=34, SIGRTMAX=64

send signal signo=34, val=0

send signal signo=34, val=1

send signal signo=34, val=2

send signal signo=35, val=0

send signal signo=35, val=1

send signal signo=35, val=2

send signal signo=36, val=0

send signal signo=36, val=1

send signal signo=36, val=2

receive signal signo=36, code=-1, ival=0

receive signal signo=36, code=-1, ival=1

receive signal signo=36, code=-1, ival=2

receive signal signo=35, code=-1, ival=0

receive signal signo=35, code=-1, ival=1

receive signal signo=35, code=-1, ival=2

receive signal signo=34, code=-1, ival=0

receive signal signo=34, code=-1, ival=1

receive signal signo=34, code=-1, ival=2

struct sigaction {

void (*sa_handler)(int);

void (*sa_sigaction)(int, siginfo_t *, void *);

sigset_t sa_mask;

int sa_flags;

void (*sa_restorer)(void); /* 被遗弃了! */

};

实时信号之所以是可靠的,因为在进程阻塞该信号的时间内,发给该进程的所有实时信号会排队,而非实时信号则会合并为一个信号。早期的kill函数只能向特 定的进程发送一个特定的信号,并且早期的信号处理函数也不能接受附加数据。siqueue和sigaction解决了这个问题。

下面这个例子中,进程先屏蔽SIGINT和SIGRTMIN两个信号,其中SIGINT是非实时信号,而SIGRTMIN为实时信号,接着进程睡眠,睡眠完成之后再接触对这两个信号的屏蔽,此时可以比较对两种信号的处理方式是否一样。

#include <stdio.h>

#include <string.h>

#include <signal.h>

#include <unistd.h>

void sig_handler(int, siginfo_t*, void*);

int main(int argc,char *argv[])

{

struct sigaction act;

sigset_t newmask, oldmask;

int rc;

sigemptyset(&newmask);

/* 往信号集中添加一个非实时信号 */

sigaddset(&newmask, SIGINT);

/* 往信号集中添加一个实时信号 */

sigaddset(&newmask, SIGRTMIN);

/* 屏蔽实时信号SIGRTMIN */

sigprocmask(SIG_BLOCK, &newmask, &oldmask);

act.sa_sigaction = sig_handler;

act.sa_flags = SA_SIGINFO;

if(sigaction(SIGINT, &act, NULL) < 0) {

printf("install signal error\n");

}

if(sigaction(SIGRTMIN, &act, NULL) < 0) {

printf("install signal error\n");

}

printf("pid = %d\n", getpid());

/* 进程睡眠,在此时间内的发给该进程的所有实时信号 将排队,不会有信号丢失 */

sleep(20);

/* 解除对SIGRTMIN信号的屏蔽,信号处理函数将会被调用 */

sigprocmask(SIG_SETMASK, &oldmask, NULL);

return 0;

}

void sig_handler(int signo, siginfo_t *info, void *context)

{

if(signo == SIGINT)

printf("Got a common signal\n");

else

printf("Got a real time signal\n");

}

将程序编译好之后,再开一个终端用于发送实时信号。

# ./sigqueue_receive

pid = 8871

进程开始睡眠……

在新的终端输入:

ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871

ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871

ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871

ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871

连续发送四个SIGRTMIN,接着回到之前的终端,连续四次按下"ctrl+c"。

^C^C^C^C

最后进程终于醒来,整个输出如下:

pid = 8871

^C^C^C^CGot a real time signal

Got a real time signal

Got a real time signal

Got a real time signal

Got a common signal

果然接受到四个实时信号,并且四次调用了信号处理函数,而对于SIGINT,虽然也按下了四次"ctrl+c",但是进程对其只做一次处理。这个例子中是先发实时信号后发非实时信号,所以信号处理函数先处理实

时信号,如果只是按照顺序注册信号的话,这很好理解,但是换一下,先按下了四次"ctrl+c"然后使用kill发四次实时信号,结果发现输出的结果仍然 一样,这说明实时信号的优先级比非实时信号要高,内核每个进程的信号组成一个双向链表,实时信号插入的时候就不是随便插在尾部了。

信号的优先级:信号实质上是软中断,中断有优先级,信号也有优先级。如果一个进程有多个未决信号,则对于同一个未决的实时信号,内核将按照发送的顺序来递 交信号。如果存在多个未决的实时信号,则值(或者说编号)越大的越先被递送。如果既存在不可靠信号,又存在可靠信号(实时信号),虽然POSIX对这一情 况没有明确规定,但Linux系统和大多数遵循POSIX标准的操作系统一样,将优先递交可靠信号。一个进程如果处理 SIGQUIT(3),SIGINT(2),SIGHUP(1)(通过"kill -l" 可以查看信号的编号),那么先后给该进程发送SIGINT,SIGHUP,SIGQUIT,处理的顺序会是SIGQUIT,SIGINT,SIGHUP,
不论改变这个三个信号的发送顺序,处理的顺序都是一样的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: