您的位置:首页 > 其它

同一个父进程的多个子进程之间的通信

2015-09-25 19:34 239 查看
对于父子进程之间的通信, 或者是没有亲缘关系的进程之间的通信, 之前都有过多多少少的了解, 就不作说明了.

这里想要讨论的是多个子进程之间的通信.

以前若是有多个子进程之间通信的需要, 那么我可能会选择使用共享内存? 使用消息队列? 或者使用一系列进程间通信机制

但这些同样适用于没有亲缘关系的进程之间的通信. 这样做似乎没有必要.

这里将要实现的子进程之间的通信, 使用了两个技术:

一个是子进程继承父进程的描述符特性(这里是socketpair产生的管道描述符)

一个是利用socketpair传递描述符特性.


大致步骤如下:

父进程调用sokcetpair, 产生一个管道, fork产生第一个子进程, 这样, 父子进程就能通过这个管道进行通信了

父进程再次调用socketpair, 之后fork出第二个子进程, 父进程与第二个子进程也能通过这个管道进行通信了

第二个子进程同时继承了父进程与第一个子进程之间通信的管道, 说明此时第二个子进程能透过这个管道口向第一个子进程发送消息

只是这是单向的. 第一个子进程却完全不了解第二个子进程的管道口, 甚至其是否存在都一无所知

父进程 通过 其与第一个子进程之间的管道口 将 父进程与第二个子进程通信的管道口 传递给第一个子进程

这时子进程记录下这个管道口, 就能向第二个子进程发送数据了

(要注意的是, 描述符并不是一个单纯的数字, 其在内核中对应着一组信息, 所以传递描述符需要特殊的方法)

第三个子进程被fork出来, 同样, 它知道如何向第一和第二个子进程发送数据, 父进程则需要分别告诉第一和第二个子进程与第三个子进程通信的描述符

这样, 产生的多个子进程之间就能两两通信了.

要注意的是, 每个新fork出来的子进程都要关闭不使用的之前进程的另一个管道描述符.

下面就给出实现:

头文件:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>

#ifndef CPC_H
#define CPC_H

#define DEBUG
#define MAX_EVENT 64
//消息分两种, 一种是传递描述符; 一种在验证的时候我们会发送字符串验证
enum message_type{FD_TRANS = 0, MSG_TRANS};

//做个规定, socketpair的数组, 0给父进程用. 1给子进程用
//所需的关于每个进程的结构体
typedef struct {
pid_t pid;
int index;
int channel[2];
}process_t;

typedef struct {
enum message_type type;
//消息是来自哪个进程的
int sourceIndex;
}info_t;

typedef struct{
//传递描述符用这个
int fd;
//传递字符串用这个
char str[64];
}content_t;

typedef struct {
info_t info;
content_t data;
}message_t;

//每个子进程主函数体
void child_run(int index, process_t *processes);
//消息读写函数
int write_channel_fd(int fd, message_t *data);
int recv_channel(int fd, message_t *data);
//添加事件到epoll中
void add_fd_to_epoll_in(int epollfd, int fd);

#endif


头文件实现:

#include "CPC.h"

int write_channel_fd(int fd, message_t *data){
struct msghdr msg_wr;
struct iovec iov[1];
struct cmsghdr *ptr = NULL;

union{
//使用这个结构体与真正要发的数据放在一起是为了使辅助数据的地址对齐
struct cmsghdr cm;
char ctl[CMSG_SPACE(sizeof(int))];
}ctl_un;

switch(data->info.type) {
case FD_TRANS:
msg_wr.msg_control = ctl_un.ctl;
msg_wr.msg_controllen = sizeof(ctl_un.ctl);

ptr = CMSG_FIRSTHDR(&msg_wr);
ptr->cmsg_len = CMSG_LEN(sizeof(int));
ptr->cmsg_level = SOL_SOCKET;
ptr->cmsg_type = SCM_RIGHTS;
*((int *)CMSG_DATA(ptr)) = data->data.fd;

iov[0].iov_base = (void *)(&(data->info));
iov[0].iov_len = sizeof(info_t);
break;
case MSG_TRANS:
msg_wr.msg_control = NULL;
msg_wr.msg_controllen = 0;

iov[0].iov_base = data;
iov[0].iov_len = sizeof(message_t);
break;
}

msg_wr.msg_name = NULL;
msg_wr.msg_namelen = 0;

msg_wr.msg_iov = iov;
msg_wr.msg_iovlen = 1;

return (sendmsg(fd, &msg_wr, 0));
}

int recv_channel(int fd, message_t *data){
struct msghdr msg_rc;
struct iovec iov[1];
ssize_t n;

union{
struct cmsghdr cm;
char ctl[CMSG_SPACE(sizeof(int))];
}ctl_un;

struct cmsghdr *ptr = NULL;
msg_rc.msg_control = ctl_un.ctl;
msg_rc.msg_controllen = sizeof(ctl_un.ctl);

msg_rc.msg_name = NULL;
msg_rc.msg_namelen = 0;

iov[0].iov_base = (void *)data;
iov[0].iov_len = sizeof(message_t);

msg_rc.msg_iov = iov;
msg_rc.msg_iovlen = 1;

if((n = recvmsg(fd, &msg_rc, 0)) < 0){
perror("recvmsg error");
return n;
}
else if(n == 0){
//如果子进程收到0字节数据, 表明另一端已经关闭了.
//这里的另一端指的是所有其他进程, 包括父进程
//如果某一个进程关闭了, 但还有其他进程持有该管道另一端, 则不会有0字节数据出现
//所以我们这里如果其它进程都没了, 此进程也结束.
fprintf(stderr, "peer close the socket\n");
exit(1);
}

if( ( ptr = CMSG_FIRSTHDR( &msg_rc ) ) != NULL     //!> now we need only one,
&& ptr->cmsg_len == CMSG_LEN( sizeof( int ) )        //!> we should use 'for' when
)                                                                                //!> there are many fds
{
if( ptr->cmsg_level != SOL_SOCKET )
{
fprintf(stderr, "Ctl level should be SOL_SOCKET\n");
exit(EXIT_FAILURE);
}

if( ptr->cmsg_type != SCM_RIGHTS )
{
fprintf(stderr, "Ctl type should be SCM_RIGHTS\n");
exit(EXIT_FAILURE);
}

data->data.fd = *(int*)CMSG_DATA(ptr);    //!> get the data : the file des*
}
else
{
data->data.fd = -1;
}

return n;
}

void add_fd_to_epoll_in(int epollfd, int fd){
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = fd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
}

void child_run(int index, process_t *processes){
int n, k;
message_t out;
struct epoll_event events[MAX_EVENT];
int epollfd = epoll_create(10);

if(epollfd < 0){
perror("epoll_create error");
exit(-1);
}

out.info.type = MSG_TRANS;
out.info.sourceIndex = index;
out.data.fd = -1;
memset(out.data.str, '\0', 64);
snprintf(out.data.str, 63, "I'm %d, who was born later than you",index);

//记得关闭不用的端口
close(processes[index].channel[0]);
processes[index].channel[0] = -1;
for(k=0; k<index; k++){
//向之前就产生的子进程打招呼...
write_channel_fd(processes[k].channel[0], &out);
//在父进程中已经关闭了, 不用在关
//close(processes[k].channel[1]);
processes[k].channel[1] = -1;
}

add_fd_to_epoll_in(epollfd, processes[index].channel[1]);

for(;;){
n = epoll_wait(epollfd, events, MAX_EVENT, -1);
printf("epoll return :%d\n", n);
if(n < 0){
perror("epoll_wait error");
exit(-1);
}
else if(n == 0)
break;

if(events[0].events & EPOLLIN){
message_t msg;
int n;
n = recv_channel(events[0].data.fd, &msg);
if(n == 0){
close(processes[index].channel[1]);
continue;
}
int newfd, sourceIndex;
switch(msg.info.type){
case FD_TRANS:
//添加到processes里面去
newfd = msg.data.fd;
if(newfd < 0){
fprintf(stderr, "child %d fail to receive fd\n", index);
exit(-1);
}

sourceIndex = msg.info.sourceIndex;
processes[sourceIndex].channel[0] = newfd;
processes[sourceIndex].channel[1] = -1;
#ifdef DEBUG
printf("I am child %d, I have just received %d's fd, %d bytes\n", index, sourceIndex, n);
#endif
printf("I am child %d, I'm gonna say hello to child %d\n", index, sourceIndex);
out.info.type = MSG_TRANS;
out.info.sourceIndex = index;
out.data.fd = -1;
memset(out.data.str, '\0', 64);
snprintf(out.data.str, 63, "hello,I'm %d", index);
n = write_channel_fd(processes[sourceIndex].channel[0], &out);
printf("I am child %d, I send %d bytes to %d\n", index, n, sourceIndex);
break;
case MSG_TRANS:
sourceIndex = msg.info.sourceIndex;
printf("I'm child %d, I have just received a message %d bytes from %d: \n%s\n", index,n,sourceIndex, msg.data.str);
break;
}
}
}
}


main函数:

//目的是在对父进程发送一个信号后, 每个子进程会向其他子同级别的子进程发送一句问候.
//传入进程个数. 不传的话默认是2个
#include "CPC.h"

int main(int ac, char *av[])
{
int numOfProcesses, i, j;
process_t *processes;
size_t n;
pid_t pid;

if(ac == 2)
numOfProcesses = atoi(av[1]);
else if(ac == 1)
numOfProcesses = 2;
else{
fprintf(stderr, "Usage : %s num\n", av[0]);
exit(-1);
}

processes = (process_t *)malloc(sizeof(process_t) * numOfProcesses);
for(i=0; i<numOfProcesses; i++)
processes[i].index = -1;

for(i=0; i<numOfProcesses; i++){
message_t msg;
if(socketpair(AF_UNIX, SOCK_STREAM, 0, processes[i].channel) < 0){
perror("socketpair error");
exit(-1);
}

pid = fork();
switch(pid){
case -1:
perror("fork error");
exit(-1);
case 0:
//记得在子进程里关掉其他子进程的管道某端
child_run(i, processes);
exit(1);
default:
break;
}

processes[i].pid = pid;
processes[i].index = i;

msg.info.type = FD_TRANS;
msg.info.sourceIndex = i;
msg.data.fd = processes[i].channel[0];        //父进程通过它和某子进程的管道 把 父进程和某子进程的通信口 传过去
//给之前所有子进程发送这个口
for(j=0; j<i; j++){
#ifdef DEBUG
printf("As father, I just want to send child %d's fd to child %d\n",i, j);
#endif
//传递每个描述符
//父子进程会不会同时向同一个子进程传递数据?
//因为父进程用0向子进程传数据, 所起其他子进程也用这个0来传
n = write_channel_fd(processes[j].channel[0], &msg);
close(processes[j].channel[1]);
#ifdef DEBUG
printf("As father, I have just sent child %d's fd to child %d , %d bytes\n",i, j, n);
#endif
}
}

//以上, 就已经将numOfProcesses个子进程连接起来了. 下面, 再实现一个功能.
//父进程收到SIGINT信号后, 让子进程退出

return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: