您的位置:首页 > 编程语言

简单线程池的代码实现

2012-04-13 16:27 369 查看
参考 ACE 的线程池框架,并借助 omnithread 库,自己编写了一个简单的线程池实现。

ACE的线程池框架,参见 http://www.cppblog.com/jerryma/archive/2009/11/02/99997.html
这里给出的只是一个简单的示例代码,其中还有很多细节需要进一步考虑。

代码如下:

#include "stdafx.h"
#include <assert.h> //assert
#include <iostream> //cout
// #include <vector>
#include <list>
#include <atlcomcli.h> //ATL
#include "omnithread/omnithread.h"
#include "FSUnknownImpl.h"
#include "IFSUnknown.h"

using namespace std;
using namespace ATL;

class CTaskThread;
/*
 * Callback interface that a task thread uses to report back to its manager.
 *
 * In this file it is implemented by CThreadManager and invoked from
 * CWorkerThread::run().
 */
class IManager
{
public:
    // Virtual destructor so an implementation can be destroyed safely through
    // an IManager pointer.
    virtual ~IManager() {}

    /*
     * Reports that pTaskThread has handled a message.
     *
     *   pTaskThread - the thread making the report.
     *   msg         - the message that thread was given; the caller keeps
     *                 ownership of the buffer.
     */
    virtual void return_to_work(CTaskThread *pTaskThread, WCHAR *msg) = 0;
};

/*
满足先进先出FIFO

插入情况分类:
1:  空进行插入
2: 满进行插入
3: 一般情况插入
enqueue
{
//避免出现满的情况
if(isFull)
do nothing

if(空)
{
插入数据
head = 0;
tail = 0;
}
else
{
if(tail == MAXSIZE-1)//当前tail 等于最大下标值
{
tail =  0;
}
else
{
tail++;
}
}
wcscpy(tail, data);
++datalen;
}

出队情况分类:
1:空出队
2:满出队
3:一般情况出队

dequeue
{
//避免出现空的情况
if(empty)
do nothing

if(datalen==MAXSIZE)
{
//满有两种情况,[head.....tail] 和 [...tail head...]
//其中第二种情况还包含一种特例,[......tail head] , head当前指向的是数组最大下标值
wcscpy(recMsg, head);
if(head+1==MAXSIZE)
{
head = 0;
}
else
{
head++;
}
}
else
{
//未满时 head 也可能指向数组最大下标值,同样需要回绕到 0
wcscpy(recMsg, head);
if(head+1==MAXSIZE)
{
head = 0;
}
else
{
head++;
}
}
--datalen;
}
//*/

const UINT MSG_QUEUE_SIZE = 5;
class CMsgQueue
{
public:
CMsgQueue(UINT nMaxSize = MSG_QUEUE_SIZE):m_nHeadIndex(-1),m_nTailIndex(-1),m_nValidRecNums(0)
{
m_nMaxSize = nMaxSize;
m_p_not_empty_cond = new omni_condition(&m_mutex);
m_p_not_full_cond = new omni_condition(&m_mutex);
}

virtual ~CMsgQueue()
{
if(m_p_not_empty_cond!=NULL)
{
delete m_p_not_empty_cond;
m_p_not_empty_cond = NULL;
}
if(m_p_not_full_cond != NULL)
{
delete m_p_not_full_cond;
m_p_not_full_cond = NULL;
}
}

void enqueue(WCHAR *msg)
{
assert(m_nMaxSize != 0);
m_mutex.lock();

while(isFull_i())
{
m_p_not_full_cond->wait();
}

if(-1 == m_nHeadIndex)
{
m_nHeadIndex = 0;
m_nTailIndex = 0;
}
else
{
if(m_nMaxSize-1 == m_nTailIndex)//当前tail 等于最大下标值
{
m_nTailIndex = 0;
}
else
{
++m_nTailIndex;
}
}

UINT nLen = wcslen(msg)+1;
m_pMsg[m_nTailIndex] = new WCHAR[nLen];
wmemset(m_pMsg[m_nTailIndex], 0, nLen);
wcscpy(m_pMsg[m_nTailIndex], msg);
++m_nValidRecNums;

m_p_not_empty_cond->signal();
m_mutex.unlock();
}

bool dequeue(WCHAR *&msg)
{
assert(m_nMaxSize != 0);
// assert(m_nHeadIndex != -1 && m_nTailIndex != -1);

m_mutex.lock();
while(isEmpty_i())
{
m_p_not_empty_cond->wait();
}

if(m_nValidRecNums == m_nMaxSize)
{
//满有两种情况,[head.....tail] 和 [...tail head...]
//其中第二种情况还包含一种特例,[......tail head] , head当前指向的是数组最大下标值
UINT nLen = wcslen(m_pMsg[0])+1;
msg = new WCHAR[nLen];
wmemset(msg, 0, nLen);
wcscpy(msg, m_pMsg[m_nHeadIndex]);
delete[] m_pMsg[m_nHeadIndex];
m_pMsg[m_nHeadIndex] = NULL;
if(m_nHeadIndex+1 == m_nMaxSize)
{
m_nHeadIndex = 0;
}
else
{
++m_nHeadIndex;
}
}
else
{
UINT nLen = wcslen(m_pMsg[m_nHeadIndex])+1;
msg = new WCHAR[nLen];
wmemset(msg, 0, nLen);
wcscpy(msg, m_pMsg[m_nHeadIndex]);
delete[] m_pMsg[m_nHeadIndex];
m_pMsg[m_nHeadIndex] = NULL;

++m_nHeadIndex;
}

--m_nValidRecNums;

m_p_not_full_cond->signal();
m_mutex.unlock();
return TRUE;
}

BOOL isFull()
{
m_mutex.lock();
BOOL bFull = isFull_i();
m_mutex.unlock();
return bFull;
}
BOOL isEmpty()
{
m_mutex.lock();
BOOL bEmpty = isEmpty_i();
m_mutex.unlock();
return bEmpty;
}

private:
BOOL isFull_i()
{
return m_nValidRecNums == m_nMaxSize;
}
BOOL isEmpty_i()
{
return 0==m_nValidRecNums;
}

private:
omni_condition* m_p_not_empty_cond;
omni_condition* m_p_not_full_cond;
omni_mutex m_mutex;

//队列的头,尾下标志值,有效值从 0 开始,无效值为-1.
int m_nHeadIndex; //取记录索引值
int m_nTailIndex; //存记录索引值

//有效记录数及最大存放数
int m_nValidRecNums;
int m_nMaxSize;

WCHAR *m_pMsg[256];
};

/*
 * Abstract omni_thread that carries its own message queue.
 *
 * Other threads hand work to it with putq(); the thread's run() implementation
 * fetches that work with getq().
 */
class CTaskThread : public omni_thread
{
public:
    // Copies msg into this thread's queue (forwards to CMsgQueue::enqueue).
    void putq(WCHAR *msg) { m_queue.enqueue(msg); }

    // Fetches the next message from this thread's queue into msg and returns
    // the result of CMsgQueue::dequeue.
    bool getq(WCHAR *&msg)
    {
        const bool bFetched = m_queue.dequeue(msg);
        return bFetched;
    }

    // Thread body, supplied by concrete subclasses.
    virtual void run(void*) = 0;

protected:
    // Construction and destruction are reserved for derived classes.
    CTaskThread() {}
    virtual ~CTaskThread() {}

protected:
    CMsgQueue m_queue;
};

/*
 * Pool worker: repeatedly takes a message from its own queue and reports it to
 * the manager through IManager::return_to_work().
 */
class CWorkerThread : public CTaskThread
{
public:
    // pManager - manager notified after each message; stored, not owned.
    CWorkerThread(IManager *pManager)
    {
        m_pManager = pManager;
    }

    // Thread body: loops forever handling one message per iteration.
    virtual void run(void*)
    {
        while(1)
        {
            // Start from NULL so a failed fetch can never hand a garbage
            // pointer to the manager or to delete[].
            WCHAR *msg = NULL;
            if (!this->getq(msg) || NULL == msg)
            {
                continue;
            }

            m_pManager->return_to_work(this, msg);

            // getq() handed over a new[]-allocated buffer; release it here.
            delete[] msg;
            msg = NULL;
        }
    }
public:
    IManager *m_pManager;
};

const UINT POOL_SIZE = 2;
class CThreadManager  : public CTaskThread, public IManager
{
public:
CThreadManager(UINT nPoolSize=POOL_SIZE):m_nPoolSize(nPoolSize)
{
m_pCond = new omni_condition(&m_lockWorkingThreadQueue);
}

virtual void run(void*)
{
createThreadPool();

while(1)
{
WCHAR *msg = NULL;
if (this->getq(msg))
{
// break;
}

//获取到了消息,然后在线程队列中选择空闲的工作者线程
{
m_lockWorkingThreadQueue.lock();
if(m_vecThread.empty())
m_pCond->wait();

(*(m_vecThread.begin()))->putq(msg);
delete[] msg;
msg = NULL;
m_vecThread.pop_front();
m_lockWorkingThreadQueue.unlock();
}
}
}

virtual void return_to_work(CTaskThread *pTaskThread, WCHAR *msg)
{
m_lockWorkingThreadQueue.lock();
std::wcout<<L"the work thread "<<this->self()<<L" get the message is ["<<msg<<L"]"<<std::endl;
CTaskThread* t = pTaskThread;
m_vecThread.push_back(t);
m_pCond->signal();
m_lockWorkingThreadQueue.unlock();
}

void createThreadPool()
{
for(UINT i=0; i<m_nPoolSize; ++i)
{
CTaskThread* wt = new CWorkerThread(this);
m_vecThread.push_back(wt);
wt->start();
}
}

public:
omni_mutex m_lockWorkingThreadQueue;
omni_condition *m_pCond;
UINT m_nPoolSize;
std::list<CTaskThread*> m_vecThread;
};

int main (int argc, char **argv)
{
CThreadManager tm;
tm.start();

WCHAR *pMsgWorker[5] = {{L"123"},{L"abc"},{L"!@#"},{L"_()"},{L"<>?"}};
for(int i=0; i<5; ++i)
{
tm.putq(pMsgWorker[i]);
}

//阻塞主线程
std::wcout<<L"please input some message at will, which makes the main thread exit"<<std::endl;
int n;
std::cin>>n;

return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息