您的位置:首页 > 编程语言 > C语言/C++

如何实现进程池

2015-10-28 15:22 375 查看
如何实现进程池

http://blog.csdn.net/guosha/article/details/3874998 这是原作者链接 感谢作者分享

有位站友问,如何在linux下实现进程池技术,原贴见:
http://topic.csdn.net/u/20090206/16/b424e1c1-90dc-4589-a63f-1d90ed6560ae.html

之前虽对进程池这个名词早有耳闻,但一直没有真正接触过。乍一听好像有点复杂,但稍微一想却也简单,下面我就按自己的想法来实现一个简单的模型。

 

跟最简单的资源管理一样,进程池技术的应该致少由以下两部分组成

资源进程
预先创建好的空闲进程,管理进程会把工作分发到空闲进程来处理。

管理进程

管理进程负责创建资源进程,把工作交给空闲资源进程处理,回收已经处理完工作的资源进程。

 

上面资源进程跟管理进程的概念很好理解,下面就是进程池的关键,管理进程如何有效的管理资源进程,如何分配任务给资源进程,如何回收空闲资源进程。其实稍加分析一下也很简单,管理进程要有效的管理资源进程,那么管理进程跟资源进程间必然需要交互,如何交互呢?linux下两个进程如何交互?IPC呗,信号,信号量,消息队列,管道,随便选一个合适的就可以了。

下面我就用信号加消息队列来完成上面帖子里的进程池需求。

[cpp]
view plaincopyprint?

//主接收网络消息进程  
int main()  
{  
    int iReLen;  
    char szBuffer[64];  
    ...  
    //初始化进程池  
    InitProcessPoll();  
    ....  
    while(1)  
    {  
        iRelen = recv(iSock, szBuffer, sizeof(szbuffer), 0);  
        if (iRelen > 0)  
        {  
            SubmitWork(szBuffer, iRelen);   //提交工作到资源进程  
        }     
    }  
    ...  
}  
  
//  
int SubmitWork(void *pBuffer, int iMsgLen)  
{  
    int iMsgQueID;  
      
    iMsgQueID = GetMsgQue(key, 0); //取得跟管理进程通信的消息队更句柄;  
      
    msgsnd(iMsgQueID, pBuffer, iMsgLen, 0);  
}  
  
int InitProcessPoll(const int iProcessNum)  
{  
    int iPid;  
      
    //创建管理进程  
    iPid = fork();  
    if (iPid == 0)  
    {  
        InitMngProcess(iProcessNum);  
    }     
    return 0;  
}  
  
typedef struct  
{  
    int pid;  
    int iFlag;  
} T_ProcessStatus;  
  
//指向资源进程管理结构  
T_ProcessStatus *pProcessMng = NULL;  
  
//记录有总共有多少个资源进程  
INT32 iMaxProcessNum = 0;  
  
//初始管理进程管理结构并创建资源子进程,最后接收外部工作请求并分发到资源进行进行处理  
InitMngProcess(const int iProcessNum)  
{  
    int i;  
    int iPid;  
    int iRtn;  
    int iMsgLen;  
    int iMsgQue1, iMsgQue2;  
    char szBuffer[64];  
      
    //创建管理进程结构  
    pProcessMng = calloc(iProcessNum, sizeof(T_ProcessStatus))  
  
    for (i = 0; i < iProcessNum; i++)  
    {  
        iPid = fork();  
        if (iPid == 0);  
        {  
            //资源进程;  
            ResourceProcess();  
        }  
        pProcessMng[i].pid = iPid; //记录资源进程的进程号  
        pProcessMng[i].iFlag = 0;   //把资源进程置为空闲  
    }  
      
    iMaxProcessNum = iProcessNum;  
  
    //创建外部跟管理进程通信的消息队列;  
    iMsgQue1 = CreateMsgQue();  
  
    //创建管理进程跟资源进程通信的消息队列;  
    iMsgQue2 = CreateMsgQue();  
  
    //安装资源进程回收信号处理函数  
    signal(SIGUSR1, ReleaseAProcess);     
  
    //开始接收外部传入的任务  
    while(1)  
    {  
        //接收外部的工作请求  
        iMsgLen = msgrcv(iMsgQue1, szBuffer, sizeof(szBuffer), 0);  
          
        //转发工作请求到资源进程处理消息队列  
        iRtn = msgsnd(iMsgQue2, szBuffer, iMsgLen, 0);  
      
        //通知其中的一个空闲资源进程进行处理  
        NoticeAIdleProcess();  
    }  
}  
  
//通知一个空闲资源进程进行处理  
int NoticeAIdleProcess()  
{  
    int i;  
      
    for (i = 0; i < iMaxProcessNum; i++)  
    {  
        if (pProcessMng[i].iFlag == 0)  
        {  
            pProessMng[i].Flag = 1;  
            kill(processMng[i].pid, SIGUSR1);  
            return 0;  
        }  
    }  
      
    return -1;    
}  
  
//回收一个处理完成的资源进程  
void ReleaseAProcess(int iPid)  
{  
    int i;  
      
    for (i = 0; i < iMaxProcessNum; i++)  
    {  
        if (pProessMng[i].pid == iPid)  
        {  
            pProcessMng[i].iFlag = 0;  
            return;  
        }  
    }  
  
    return;  
}  
  
//资源进程的处理  
void ResourceProcess()  
{  
    //安装有工作通知信号处理  
    signal(SIGUSR1, SIG_IGN);  
  
    //设置只对SIGUSR1信号感兴趣  
    sigprocmask()  
    while(1)  
    {  
        //没有消息处理时挂起  
        pause();  
          
        //处理工作  
        void StartWork()  
  
        //通知管理进程工作处理完成  
        NoticeMngProcessFinishedWork();   
    }  
}  
  
//处理消息  
void StartWork()  
{  
    char szBuffer[64];  
    int iMsgID;  
    int iRtn;  
      
    iMsgID = msgget();  
    iRtn = msgrcv(iMsgID, szBuffer, sizeof(szBuffer), 0);  
      
    //现在在子进程里取得了需要处理的消息,可以开始处理该消息直到消息处理完成  
  
    return;   
}  
  
//通知管理进程回收资源进程  
void NoticeMngProcessFinishedWork();  
{  
    kill(MngProcessPid, SIGUSR1);  
      
    return;  
}  

//主接收网络消息进程
int main()
{
int iReLen;
char szBuffer[64];
...
//初始化进程池
InitProcessPoll();
....
while(1)
{
iRelen = recv(iSock, szBuffer, sizeof(szbuffer), 0);
if (iRelen > 0)
{
SubmitWork(szBuffer, iRelen);	//提交工作到资源进程
}
}
...
}

//
int SubmitWork(void *pBuffer, int iMsgLen)
{
int iMsgQueID;

iMsgQueID = GetMsgQue(key, 0); //取得跟管理进程通信的消息队更句柄;

msgsnd(iMsgQueID, pBuffer, iMsgLen, 0);
}

int InitProcessPoll(const int iProcessNum)
{
int iPid;

//创建管理进程
iPid = fork();
if (iPid == 0)
{
InitMngProcess(iProcessNum);
}
return 0;
}

typedef struct
{
int pid;
int iFlag;
} T_ProcessStatus;

//指向资源进程管理结构
T_ProcessStatus *pProcessMng = NULL;

//记录有总共有多少个资源进程
INT32 iMaxProcessNum = 0;

//初始管理进程管理结构并创建资源子进程,最后接收外部工作请求并分发到资源进行进行处理
InitMngProcess(const int iProcessNum)
{
int i;
int iPid;
int iRtn;
int iMsgLen;
int iMsgQue1, iMsgQue2;
char szBuffer[64];

//创建管理进程结构
pProcessMng = calloc(iProcessNum, sizeof(T_ProcessStatus))

for (i = 0; i < iProcessNum; i++)
{
iPid = fork();
if (iPid == 0);
{
//资源进程;
ResourceProcess();
}
pProcessMng[i].pid = iPid; //记录资源进程的进程号
pProcessMng[i].iFlag = 0;	//把资源进程置为空闲
}

iMaxProcessNum = iProcessNum;

//创建外部跟管理进程通信的消息队列;
iMsgQue1 = CreateMsgQue();

//创建管理进程跟资源进程通信的消息队列;
iMsgQue2 = CreateMsgQue();

//安装资源进程回收信号处理函数
signal(SIGUSR1, ReleaseAProcess);

//开始接收外部传入的任务
while(1)
{
//接收外部的工作请求
iMsgLen = msgrcv(iMsgQue1, szBuffer, sizeof(szBuffer), 0);

//转发工作请求到资源进程处理消息队列
iRtn = msgsnd(iMsgQue2, szBuffer, iMsgLen, 0);

//通知其中的一个空闲资源进程进行处理
NoticeAIdleProcess();
}
}

//通知一个空闲资源进程进行处理
int NoticeAIdleProcess()
{
int i;

for (i = 0; i < iMaxProcessNum; i++)
{
if (pProcessMng[i].iFlag == 0)
{
pProessMng[i].Flag = 1;
kill(processMng[i].pid, SIGUSR1);
return 0;
}
}

return -1;
}

//回收一个处理完成的资源进程
void ReleaseAProcess(int iPid)
{
int i;

for (i = 0; i < iMaxProcessNum; i++)
{
if (pProessMng[i].pid == iPid)
{
pProcessMng[i].iFlag = 0;
return;
}
}

return;
}

//资源进程的处理
void ResourceProcess()
{
//安装有工作通知信号处理
signal(SIGUSR1, SIG_IGN);

//设置只对SIGUSR1信号感兴趣
sigprocmask()
while(1)
{
//没有消息处理时挂起
pause();

//处理工作
void StartWork()

//通知管理进程工作处理完成
NoticeMngProcessFinishedWork();
}
}

//处理消息
void StartWork()
{
char szBuffer[64];
int iMsgID;
int iRtn;

iMsgID = msgget();
iRtn = msgrcv(iMsgID, szBuffer, sizeof(szBuffer), 0);

//现在在子进程里取得了需要处理的消息,可以开始处理该消息直到消息处理完成

return;
}

//通知管理进程回收资源进程
void NoticeMngProcessFinishedWork();
{
kill(MngProcessPid, SIGUSR1);

return;
}


 

如上,一个基本的进程池处理模型就建立好了, 当然你不能直接拿到编译,因为里面很多都是伪代码,另个上面的模型忽略了很多细节处理,很多地方是很不可靠的。

现在你是不是对进程池技术更了解了呢?
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息