一个简单的线程池的实现
2016-08-03 16:45
447 查看
转自http://blog.csdn.net/yongshengfree/article/details/4536695#
线程池的原理大家都知道,直接上代码了^_^
Thread.h
[cpp] view
plain copy
#ifndef __THREAD_H
#define __THREAD_H
#include <vector>
#include <string>
#include <pthread.h>
using namespace std;
/**
* 执行任务的类,设置任务数据并执行
*/
class CTask
{
protected:
string m_strTaskName; /** 任务的名称 */
void* m_ptrData; /** 要执行的任务的具体数据 */
public:
CTask(){}
CTask(string taskName)
{
m_strTaskName = taskName;
m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data); /** 设置任务数据 */
public:
virtual ~CTask(){}
};
/**
* 线程池管理类的实现
*/
class CThreadPool
{
private:
static vector<CTask*> m_vecTaskList; /** 任务列表 */
static bool shutdown; /** 线程退出标志 */
int m_iThreadNum; /** 线程池中启动的线程数 */
pthread_t *pthread_id;
static pthread_mutex_t m_pthreadMutex; /** 线程同步锁 */
static pthread_cond_t m_pthreadCond; /** 线程同步的条件变量 */
protected:
static void* ThreadFunc(void * threadData); /** 新线程的线程回调函数 */
static int MoveToIdle(pthread_t tid); /** 线程执行结束后,把自己放入到空闲线程中 */
static int MoveToBusy(pthread_t tid); /** 移入到忙碌线程中去 */
int Create(); /** 创建线程池中的线程 */
public:
CThreadPool(int threadNum = 10);
int AddTask(CTask *task); /** 把任务添加到任务队列中 */
int StopAll(); /** 使线程池中的线程退出 */
int getTaskSize(); /** 获取当前任务队列中的任务数 */
};
#endif
实现文件
Thread.cpp
[cpp] view
plain copy
#include "Thread.h"
#include <iostream>
void CTask::SetData(void * data)
{
m_ptrData = data;
}
vector<CTask*> CThreadPool::m_vecTaskList; //任务列表
bool CThreadPool::shutdown = false;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
/**
* 线程池管理类构造函数
*/
CThreadPool::CThreadPool(int threadNum)
{
this->m_iThreadNum = threadNum;
cout << "I will create " << threadNum << " threads" << endl;
Create();
}
/**
* 线程回调函数
*/
void* CThreadPool::ThreadFunc(void* threadData)
{
pthread_t tid = pthread_self();
while (1)
{
pthread_mutex_lock(&m_pthreadMutex);
while (m_vecTaskList.size() == 0 && !shutdown)
{
pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
}
if (shutdown)
{
pthread_mutex_unlock(&m_pthreadMutex);
printf("thread %lu will exit/n", pthread_self());
pthread_exit(NULL);
}
printf("tid %lu run/n", tid);
vector<CTask*>::iterator iter = m_vecTaskList.begin();
/**
* 取出一个任务并处理之
*/
CTask* task = *iter;
if (iter != m_vecTaskList.end())
{
task = *iter;
m_vecTaskList.erase(iter);
}
pthread_mutex_unlock(&m_pthreadMutex);
task->Run(); /** 执行任务 */
printf("tid:%lu idle/n", tid);
}
return (void*)0;
}
/**
* 往任务队列里边添加任务并发出线程同步信号
*/
int CThreadPool::AddTask(CTask *task)
{
pthread_mutex_lock(&m_pthreadMutex);
this->m_vecTaskList.push_back(task);
pthread_mutex_unlock(&m_pthreadMutex);
pthread_cond_signal(&m_pthreadCond);
return 0;
}
/**
* 创建线程
*/
int CThreadPool::Create()
{
pthread_id = (pthread_t*)malloc(sizeof(pthread_t) * m_iThreadNum);
for(int i = 0; i < m_iThreadNum; i++)
{
pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL);
}
return 0;
}
/**
* 停止所有线程
*/
int CThreadPool::StopAll()
{
/** 避免重复调用 */
if (shutdown)
{
return -1;
}
printf("Now I will end all threads!!/n");
/** 唤醒所有等待线程,线程池要销毁了 */
shutdown = true;
pthread_cond_broadcast(&m_pthreadCond);
/** 阻塞等待线程退出,否则就成僵尸了 */
for (int i = 0; i < m_iThreadNum; i++)
{
pthread_join(pthread_id[i], NULL);
}
free(pthread_id);
pthread_id = NULL;
/** 销毁条件变量和互斥体 */
pthread_mutex_destroy(&m_pthreadMutex);
pthread_cond_destroy(&m_pthreadCond);
return 0;
}
/**
* 获取当前队列中任务数
*/
int CThreadPool::getTaskSize()
{
return m_vecTaskList.size();
}
main函数文件
[cpp] view
plain copy
#include "Thread.h"
#include <iostream>
class CMyTask: public CTask
{
public:
CMyTask(){}
inline int Run()
{
printf("%s/n", (char*)this->m_ptrData);
sleep(10);
return 0;
}
};
int main()
{
CMyTask taskObj;
char szTmp[] = "this is the first thread running";
taskObj.SetData((void*)szTmp);
CThreadPool threadPool(10);
for(int i = 0; i < 20; i++)
{
threadPool.AddTask(&taskObj);
}
while(1)
{
printf("there are still %d tasks need to handle/n", threadPool.getTaskSize());
if (threadPool.getTaskSize() == 0)
{
if (threadPool.StopAll() == -1)
{
printf("Now I will exit from main/n");
exit(0);
}
}
sleep(2);
}
return 0;
}
CMakeLists.txt
1 cmake_minimum_required(VERSION 2.8)
2
3 project(Thread_pool)
4
5 find_package(Threads)#调用pthread库需要的语句1
6
7 aux_source_directory(. DIR_SRCS)
8
9 add_executable(main ${DIR_SRCS})
10
11 target_link_libraries(main ${CMAKE_THREAD_LIBS_INIT})#调用pthread库需要的语句2
线程池的原理大家都知道,直接上代码了^_^
Thread.h
[cpp] view
plain copy
#ifndef __THREAD_H
#define __THREAD_H
#include <vector>
#include <string>
#include <pthread.h>
using namespace std;
/**
* 执行任务的类,设置任务数据并执行
*/
class CTask
{
protected:
string m_strTaskName; /** 任务的名称 */
void* m_ptrData; /** 要执行的任务的具体数据 */
public:
CTask(){}
CTask(string taskName)
{
m_strTaskName = taskName;
m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data); /** 设置任务数据 */
public:
virtual ~CTask(){}
};
/**
* 线程池管理类的实现
*/
class CThreadPool
{
private:
static vector<CTask*> m_vecTaskList; /** 任务列表 */
static bool shutdown; /** 线程退出标志 */
int m_iThreadNum; /** 线程池中启动的线程数 */
pthread_t *pthread_id;
static pthread_mutex_t m_pthreadMutex; /** 线程同步锁 */
static pthread_cond_t m_pthreadCond; /** 线程同步的条件变量 */
protected:
static void* ThreadFunc(void * threadData); /** 新线程的线程回调函数 */
static int MoveToIdle(pthread_t tid); /** 线程执行结束后,把自己放入到空闲线程中 */
static int MoveToBusy(pthread_t tid); /** 移入到忙碌线程中去 */
int Create(); /** 创建线程池中的线程 */
public:
CThreadPool(int threadNum = 10);
int AddTask(CTask *task); /** 把任务添加到任务队列中 */
int StopAll(); /** 使线程池中的线程退出 */
int getTaskSize(); /** 获取当前任务队列中的任务数 */
};
#endif
实现文件
Thread.cpp
[cpp] view
plain copy
#include "Thread.h"
#include <iostream>
void CTask::SetData(void * data)
{
m_ptrData = data;
}
vector<CTask*> CThreadPool::m_vecTaskList; //任务列表
bool CThreadPool::shutdown = false;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
/**
* 线程池管理类构造函数
*/
CThreadPool::CThreadPool(int threadNum)
{
this->m_iThreadNum = threadNum;
cout << "I will create " << threadNum << " threads" << endl;
Create();
}
/**
* 线程回调函数
*/
void* CThreadPool::ThreadFunc(void* threadData)
{
pthread_t tid = pthread_self();
while (1)
{
pthread_mutex_lock(&m_pthreadMutex);
while (m_vecTaskList.size() == 0 && !shutdown)
{
pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
}
if (shutdown)
{
pthread_mutex_unlock(&m_pthreadMutex);
printf("thread %lu will exit/n", pthread_self());
pthread_exit(NULL);
}
printf("tid %lu run/n", tid);
vector<CTask*>::iterator iter = m_vecTaskList.begin();
/**
* 取出一个任务并处理之
*/
CTask* task = *iter;
if (iter != m_vecTaskList.end())
{
task = *iter;
m_vecTaskList.erase(iter);
}
pthread_mutex_unlock(&m_pthreadMutex);
task->Run(); /** 执行任务 */
printf("tid:%lu idle/n", tid);
}
return (void*)0;
}
/**
* 往任务队列里边添加任务并发出线程同步信号
*/
int CThreadPool::AddTask(CTask *task)
{
pthread_mutex_lock(&m_pthreadMutex);
this->m_vecTaskList.push_back(task);
pthread_mutex_unlock(&m_pthreadMutex);
pthread_cond_signal(&m_pthreadCond);
return 0;
}
/**
* 创建线程
*/
int CThreadPool::Create()
{
pthread_id = (pthread_t*)malloc(sizeof(pthread_t) * m_iThreadNum);
for(int i = 0; i < m_iThreadNum; i++)
{
pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL);
}
return 0;
}
/**
* 停止所有线程
*/
int CThreadPool::StopAll()
{
/** 避免重复调用 */
if (shutdown)
{
return -1;
}
printf("Now I will end all threads!!/n");
/** 唤醒所有等待线程,线程池要销毁了 */
shutdown = true;
pthread_cond_broadcast(&m_pthreadCond);
/** 阻塞等待线程退出,否则就成僵尸了 */
for (int i = 0; i < m_iThreadNum; i++)
{
pthread_join(pthread_id[i], NULL);
}
free(pthread_id);
pthread_id = NULL;
/** 销毁条件变量和互斥体 */
pthread_mutex_destroy(&m_pthreadMutex);
pthread_cond_destroy(&m_pthreadCond);
return 0;
}
/**
* 获取当前队列中任务数
*/
int CThreadPool::getTaskSize()
{
return m_vecTaskList.size();
}
main函数文件
[cpp] view
plain copy
#include "Thread.h"
#include <iostream>
class CMyTask: public CTask
{
public:
CMyTask(){}
inline int Run()
{
printf("%s/n", (char*)this->m_ptrData);
sleep(10);
return 0;
}
};
int main()
{
CMyTask taskObj;
char szTmp[] = "this is the first thread running";
taskObj.SetData((void*)szTmp);
CThreadPool threadPool(10);
for(int i = 0; i < 20; i++)
{
threadPool.AddTask(&taskObj);
}
while(1)
{
printf("there are still %d tasks need to handle/n", threadPool.getTaskSize());
if (threadPool.getTaskSize() == 0)
{
if (threadPool.StopAll() == -1)
{
printf("Now I will exit from main/n");
exit(0);
}
}
sleep(2);
}
return 0;
}
CMakeLists.txt
1 cmake_minimum_required(VERSION 2.8)
2
3 project(Thread_pool)
4
5 find_package(Threads)#调用pthread库需要的语句1
6
7 aux_source_directory(. DIR_SRCS)
8
9 add_executable(main ${DIR_SRCS})
10
11 target_link_libraries(main ${CMAKE_THREAD_LIBS_INIT})#调用pthread库需要的语句2
相关文章推荐
- 一个线程池的简单的实现
- 一个简单的线程池实现
- 分享:一个简单的线程池的实现
- 一个简单线程池的实现
- 使用common-pool实现的一个简单的线程池
- 一个简单的线程池实现
- 一个简单线程池的实现
- 一个简单线程池的实现---需进一步完善
- 一个简单的线程池的实现(C++)
- 一个通用简单线程池实现的初步封装(C语言)
- 一个简单线程池的实现
- 一个简单的线程池实现(java版)
- 一个简单的线程池的实现(C++)
- 一个简单的线程池实现(java版)
- 一个简单的线程池实现(java版)
- java线程池学习(二) —— 实现一个简单的线程池
- 一个简单的线程池工具类——可以实现对单个线程的控制
- Linux C++ 一个线程池的简单实现(附代码)
- Quartz SimpleThreadPool的源码,一个简单的线程池的实现原理