您的位置:首页 > 运维架构 > Linux

linux系统c++线程池的实现

2016-06-03 10:25 344 查看
1.线程池基本原理

  在传统服务器结构中, 常是 有一个总的 监听线程监听有没有新的用户连接服务器, 每当有一个新的 用户进入, 服务器就开启一个新的线程用户处理这 个用户的数据包。这个线程只服务于这个用户 , 当 用户与服务器端关闭连接以后, 服务器端销毁这个线程。然而频繁地开辟与销毁线程极大地占用了系统的资源。而且在大量用户的情况下, 系统为了开辟和销毁线程将浪费大量的时间和资源。线程池提供了一个解决外部大量用户与服务器有限资源的矛盾, 线程池和传统的一个用户对应一 个线程的处理方法不同, 它的基本思想就是在程序 开始时就在内存中开辟一些线程,
线程的数目是 固定的,他们独自形成一个类, 屏蔽了对外的操作, 而服务器只需要将数据包交给线程池就可以了。当有新的客户请求到达时 , 不是新创建一个线程为其服务 , 而是从“池子”中选择一个空闲的线程为新的客户请求服务 ,服务完毕后 , 线程进入空闲线程池中。如果没有线程空闲 的 话, 就 将 数 据 包 暂 时 积 累 , 等 待 线 程 池 内 有 线 程空闲以后再进行处理。通过对多个任务重用已经存在的线程对象 , 降低了对线程对象创建和销毁的开销。当客户请求 时 , 线程对象 已 经 存 在 , 可
以 提 高 请 求 的响应时间 , 从而整体地提高了系统服务的表现。

  一般来说实现一个线程池主要包括以下几个组成部分:

1)线程管理器:用于创建并管理线程池。

2)工作线程:线程池中实际执行任务的线程。在初始化线程时会预先创建好固定数目的线程在池中,这些初始化的线程一般处于空闲状态,一般不占用CPU,占用较小的内存空间。

3)任务接口:每个任务必须实现的接口,当线程池的任务队列中有可执行任务时,被空闲的工作线程调去执行(线程的闲与忙是通过互斥量实现的,跟前面文章中的设置标志位差不多),把任务抽象出来形成接口,可以做到线程池与具体的任务无关。

4)任务队列:用来存放没有处理的任务,提供一种缓冲机制,实现这种结构有好几种方法,常用的是队列,主要运用先进先出原理,另外一种是链表之类的数据结构,可以动态的为它分配内存空间,应用中比较灵活,下文中就是用到的链表。

下面的不在赘述百度《线程池技术在并发服务器中的应用》写的非常详细!

转自:http://blog.csdn.net/zouxinfox/article/details/3560891

  什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。、

以下是线程池头文件的定义:

#ifndef _THREAD_POOL_H_

#define _THREAD_POOL_H_

#include <stdio.h>  

#include <stdlib.h>  

#include <unistd.h>  

#include <sys/types.h>  

#include <pthread.h>  

#include <assert.h>  

//实际工作的函数指针

typedef void* (*ProcFuncPtr) (void *arg);  

/* 

*线程池里所有运行和等待的任务都是一个CThread_worker 

*由于所有任务都在链表里,所以是一个链表结构 

*/  

typedef struct worker  

{  
ProcFuncPtr m_FunPtr;  

    void*      
m_Arg;/*回调函数的参数*/  

    struct worker*  m_next;
    boolm_FuncStopFlag;

} CThread_worker;  

  

class AutoSndMsg_ThreadPool

{

public:
AutoSndMsg_ThreadPool();
~AutoSndMsg_ThreadPool();
bool Init_ThreadPool(int tThreadNum);
bool AddTask_ThreadPool(ProcFuncPtr tFuncPtr,void* tParam);
bool StopTask_ThreadPool();
bool Destory_ThreadPool(); 

private:
void* AutoSndMsgThreadPoolRun (void *tParam);  

private:

    pthread_mutex_t
m_QueueLock;  

    pthread_cond_t  m_QueueCond;  

    /*是否销毁线程池*/  

    bool m_PoolShutdownFlag;  

    /*链表结构,线程池中所有等待任务*/  

    CThread_worker *m_QueueList;  

    /*当前等待队列的任务数目*/  

    int m_QueueListSize;  

    /*链表结构,线程池中所有线程*/  

    pthread_t*m_ThreadIdList;  

    /*线程池中允许的活动线程数目*/  

    int m_MaxThreadNum;  

    bool
m_AllThreadStopFlag;

};

#endif

以下是我们线程池实现文件:#include "AutoSndMsg_ThreadPool.h"

using namespace std;

#include <string>

#include <iostream>

#include <boost/thread.hpp>

#include <boost/function.hpp>

#include <boost/thread/condition.hpp>

#include <boost/noncopyable.hpp>

boost::function<void* (void* tParam)> Func_Process;

void* AutoSndMsgFunc(void* tParam)
4000

{
Func_Process(tParam);

}

AutoSndMsg_ThreadPool::AutoSndMsg_ThreadPool()

{

}

AutoSndMsg_ThreadPool::~AutoSndMsg_ThreadPool()

{

}

bool AutoSndMsg_ThreadPool::Init_ThreadPool(int tThreadNum)

{

    pthread_mutex_init(&m_QueueLock, NULL);  

    pthread_cond_init(&m_QueueCond, NULL);  

    Func_Process = boost::bind(&AutoSndMsg_ThreadPool::AutoSndMsgThreadPoolRun,this,_1);

    m_QueueList = NULL;  

    m_QueueListSize = 0;  

    m_MaxThreadNum = tThreadNum;

    m_PoolShutdownFlag = false;  

    m_AllThreadStopFlag = false;

 

    m_ThreadIdList = (pthread_t*)malloc(m_MaxThreadNum * sizeof (pthread_t));  

    int i = 0;  

    for (i = 0; i < m_MaxThreadNum; i++)  

    {   

        //pthread_create (&(m_ThreadIdList[i]), NULL, (boost::bind(&AutoSndMsg_ThreadPool::AutoSndMsgThreadPoolRun,this,_1)),NULL);  

        pthread_create (&(m_ThreadIdList[i]), NULL, AutoSndMsgFunc,NULL);  
}  
return true;

}

bool AutoSndMsg_ThreadPool::AddTask_ThreadPool(ProcFuncPtr tFuncPtr,void* tParam)

{

    /*构造一个新任务*/  

    CThread_worker *newworker = (CThread_worker *) malloc (sizeof (CThread_worker));  

    newworker->m_FunPtr = tFuncPtr;  

    newworker->m_Arg = tParam;  

    newworker->m_FuncStopFlag = false;

    newworker->m_next = NULL;/*别忘置空*/  

  

    pthread_mutex_lock (&m_QueueLock);  

    /*将任务加入到等待队列中*/  

    CThread_worker *member = m_QueueList;  

    if (member != NULL)  

    {  

        while (member->m_next != NULL)  

            member = member->m_next;  

        member->m_next = newworker;  

    }  

    else  

    {  

        m_QueueList = newworker;  

    }  

    assert (m_QueueList != NULL);  

    m_QueueListSize++;  

    pthread_mutex_unlock (&m_QueueLock);  

    /*好了,等待队列中有任务了,唤醒一个等待线程; 

    注意如果所有线程都在忙碌,这句没有任何作用*/  

    pthread_cond_signal (&m_QueueCond);  
return true;

}

bool AutoSndMsg_ThreadPool::StopTask_ThreadPool()

{
m_AllThreadStopFlag = true;
pthread_mutex_lock (&m_QueueLock);

    CThread_worker *thead = NULL;  

    while (m_QueueList != NULL)  

    {  

        thead = m_QueueList;  

        m_QueueList = m_QueueList->m_next;  
thead->m_FuncStopFlag = true;

    }  
pthread_mutex_unlock (&m_QueueLock);  

}

/*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直 

把任务运行完后再退出*/

bool AutoSndMsg_ThreadPool::Destory_ThreadPool()

{
if (m_PoolShutdownFlag)  

        return false;/*防止两次调用*/  

    m_PoolShutdownFlag = true;  

  

    /*唤醒所有等待线程,线程池要销毁了*/  

    pthread_cond_broadcast (&m_QueueCond);  

  

    /*阻塞等待线程退出,否则就成僵尸了*/  

    int tThreadIndx;  

    for (tThreadIndx = 0; tThreadIndx < m_MaxThreadNum; tThreadIndx++)  

        pthread_join (m_ThreadIdList[tThreadIndx], NULL);  

    free (m_ThreadIdList);  

  

    /*销毁等待队列*/  

    CThread_worker *thead = NULL;  

    while (m_QueueList != NULL)  

    {  

        thead = m_QueueList;  

        m_QueueList = m_QueueList->m_next;  

        free (thead);  

    }  

    /*条件变量和互斥量也别忘了销毁*/  

    pthread_mutex_destroy(&m_QueueLock);  

    pthread_cond_destroy(&m_QueueCond);  
return true;

}

void* AutoSndMsg_ThreadPool::AutoSndMsgThreadPoolRun(void *tParam)

{

    while (!m_AllThreadStopFlag)  

    {  

    cout<<"[AutoSndMsg_ThreadPool][AutoSndMsgThreadPoolRun] enter"<<endl;

        pthread_mutex_lock (&m_QueueLock);  

        /*如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意 

        pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/  

        while (m_QueueListSize == 0 && !m_PoolShutdownFlag)  

        {  

            printf("thread 0x%x is waiting\n", pthread_self ());  

            pthread_cond_wait (&m_QueueCond, &m_QueueLock);  

        }  

  

        /*线程池要销毁了*/  

        if (m_PoolShutdownFlag)  

        {  

            /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/  

            pthread_mutex_unlock (&m_QueueLock);  

            printf ("thread 0x%x will exit\n", pthread_self ());  

            pthread_exit (NULL);  

        }  

        /*assert是调试的好帮手*/  

        assert (m_QueueListSize != 0);  

        assert (m_QueueList != NULL);  

          

        /*等待队列长度减去1,并取出链表中的头元素*/  

        CThread_worker *worker = m_QueueList;  

        pthread_mutex_unlock (&m_QueueLock);  

  

        /*调用回调函数,执行任务*/  

        (*(worker->m_FunPtr)) (worker);

if (!m_PoolShutdownFlag)  

        {

m_QueueList = worker->m_next;  

        m_QueueListSize--;  

        free (worker);
worker = NULL;
}

    }  

    pthread_exit (NULL);  

}

以下是我们测试的demo文件

#include "AutoSndMsg_ThreadPool.h"

#include <signal.h>

#include "session.h"

using namespace std;

#include <iostream>

static AutoSndMsg_ThreadPool* g_threadPoolPtr = NULL;

//消息SIGHUP,SIGINT,SIGQUIT信号处理函数

void sigroutine(int dunno)

{
/* 信号处理例程,其中dunno将会得到信号的值 */

    switch (dunno) 
{
case 1:
cout<<"[main][sigroutine]get SIGHUP signal"<<endl;
break;

    case 2:
cout<<"[main][sigroutine]get SIGINT signal"<<endl;
break;
    case 3:
cout<<"[main][sigroutine]get SIGQUIT signal"<<endl;
    break;

      }
if(NULL != g_threadPoolPtr)
{
g_threadPoolPtr->StopTask_ThreadPool();
g_threadPoolPtr->Destory_ThreadPool();
delete(g_threadPoolPtr);
g_threadPoolPtr = NULL;
}
exit(0);

}

void* TestAutoSndMsgFunc(void *arg)

{
//防止内存的泄露,最好能智能指针
boost::shared_ptr<session> tIptrSession = boost::make_shared<session>();
intSndMsgCnt = 0;
bool m_LoginSuccFlag = false;
CThread_worker* tWorker = (CThread_worker*)arg;
while(!tWorker->m_FuncStopFlag)
{
//该线程要做什么就做什么吧
if(false == m_LoginSuccFlag)
{
//没有登录或者登录失败,便做登录操作
string ServerHost = "http://mq.snailyun.com";
int    ServerPort = 8066;
string RoomId = "1000";
string UsrId = "Xuqinglai";
if(true == tIptrSession->login(ServerHost,ServerPort,UsrId,RoomId))
{
tIptrSession->wait_login(m_LoginSuccFlag);
}
if(false == m_LoginSuccFlag)
{
sleep(1);
}
}
else
{
//登录成功后的操作发送消息
string message = "SndTestMsg for LinuxDemo";
string usrId = "allPeople";
bool RcvMsgSuccFlag = false;
if(tIptrSession->send_msg(message,usrId,NULL))
{
tIptrSession->wait_sendMsg(RcvMsgSuccFlag);
const char* tSndMsgStr = (RcvMsgSuccFlag ? "success":"failure");
SndMsgCnt++;
}
//发送消息200次以后,用户重新登录
if(200 == SndMsgCnt)
{
tIptrSession->logout();
m_LoginSuccFlag = false;
sleep(1);
SndMsgCnt = 0;
}
else
{
if(eState_Logined==tIptrSession->getloginstate())
{
m_LoginSuccFlag = true;
}
else
{
m_LoginSuccFlag = false;
sleep(1);
}
}
}
}

}

//主函数

int main(int argc, char* argv[])

{
//* 下面设置三个信号的处理方法
signal(SIGHUP, sigroutine); 

    signal(SIGINT, sigroutine);

    signal(SIGQUIT, sigroutine);

//初始化线程大小为10
g_threadPoolPtr = new AutoSndMsg_ThreadPool();
if(NULL != g_threadPoolPtr)
{
g_threadPoolPtr->Init_ThreadPool(10);
}
int Idx = 0;
for(Idx = 0;Idx<10;Idx++)
{
g_threadPoolPtr->AddTask_ThreadPool(TestAutoSndMsgFunc,NULL);
}

    while (NULL != g_threadPoolPtr) 
{
if('q' == getchar())
{
cout<<"[main][main] usr input quit cmd,so exit!"<<endl;
g_threadPoolPtr->StopTask_ThreadPool();
g_threadPoolPtr->Destory_ThreadPool();
delete(g_threadPoolPtr);
g_threadPoolPtr = NULL;
break;
}

    }

    return 0;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: