您的位置:首页 > 编程语言 > C语言/C++

用C++简单实现的——BlockingQueue类(java)

2017-05-12 15:23 239 查看
最近,简单整理完Linux线程方面的知识点,突发奇想的想写个线程池来玩玩。经过多方的考虑,决定仿java的线程池来写。由于没有学过java,最后的结果可能差距很大,好像扯远了回到正题来。Java线程池中使用到了BlockingQueue类那就先来试着用C++来实现它吧!

最终只实现了BlockingQueue中的ArrayBlockingQueue(基于数组的阻塞队列实现)和LinkedBlockingQueue(基于链表的阻塞队列)也只实现了BlockingQueue中4个核心的方法。好像对线程池来说够用了。

1.Mutext.h

#ifndef __MUTEX_H__
#define __MUTEX_H__

#include <pthread.h>

class CMutexLock
{
public:
explicit CMutexLock()
{
pthread_mutex_init(&m_Mutex, NULL);
}

~CMutexLock()
{
pthread_mutex_destroy(&m_Mutex);
}

void lock()
{
pthread_mutex_lock(&m_Mutex);
}

void unlock()
{
pthread_mutex_unlock(&m_Mutex);
}

pthread_mutex_t *get()
{
return &m_Mutex;
}

private:
friend class CCondition;

pthread_mutex_t m_Mutex;
};

class CMutexLockPart
{
public:
explicit CMutexLockPart(CMutexLock &MutexLock)
:m_MutexLock(MutexLock)
{
m_MutexLock.lock();
}

~CMutexLockPart()
{
m_MutexLock.unlock();
}

private:
CMutexLock &m_MutexLock;
};

#endif  //#ifndef __MUTEX_H__
2.Condition.h

#ifndef __CONDITION_H__
#define __CONDITION_H__

#include "Mutex.h"

class CCondition
{
public:
explicit CCondition(CMutexLock &MutexLock)
:m_MutexLock(MutexLock)
{
pthread_cond_init(&m_Condition, NULL);
}

~CCondition()
{
pthread_cond_destroy(&m_Condition);
}

void wait()
{
pthread_cond_wait(&m_Condition, m_MutexLock.get());
}

void signal()
{
pthread_cond_signal(&m_Condition);
}

void broadcast()
{
pthread_cond_broadcast(&m_Condition);
}

private:
CMutexLock &m_MutexLock;
pthread_cond_t m_Condition;
};

#endif	//#ifndef __CONDITION_H__
3.BlockingQueue.h
#ifndef __BLOCKING_QUEUE_H__
#define __BLOCKING_QUEUE_H__

//在使用队列的时候需要注意,是否需要定义拷贝构造和赋值函数
template<typename T>
class CBlockingQueue
{
public:
CBlockingQueue() { }
virtual ~CBlockingQueue() { }

//放入数据的方法
virtual bool offer(const T &anObject) = 0; //不阻塞当前执行方法的线程
virtual void put(const T &anObject) = 0; //会阻塞当前执行方法的线程

//获取数据的方法(注:如果队列为空调用poll则返回一个初始化的T,当做未定义)
virtual T poll(void) = 0; //取走BlockingQueue首位的对象,调用改方法必须确保队列不为空
virtual T take(void) = 0; //取走BlockingQueue首位的对象,若队列为空则阻塞

//添加一个方法,用来判断队列是否为空
virtual bool empty(void) = 0;

//添加一个方法,用来查询队列中还有多少数据
virtual int size(void) = 0;

//添加一个方法,用来判断队列是否是满的
virtual bool full(void) = 0;
};

#endif //#ifndef __BLOCKING_QUEUE_H__
4.ArrayBlockingQueue.h
#ifndef __ARRAY_BLOCKING_QUEUE_H__
#define __ARRAY_BLOCKING_QUEUE_H__

#include "BlockingQueue.h"
#include "Mutex.h"
#include "Condition.h"

//目前觉得这个类存在问题:在取出元素的时候,是否需要显示的调用析构函数。
//这需要看T的定义以及深拷贝构造和赋值函数了。
template<typename T>
class CArrayBlockingQueue : public CBlockingQueue<T>
{
public:
CArrayBlockingQueue(int iSize)
:m_iSize(iSize)
,m_iCount(0)
,m_MutexLock()
,m_Condition(m_MutexLock)
{
m_pArray = new T[iSize];
m_pStart = m_pArray;
m_pEnd = m_pArray;
}

~CArrayBlockingQueue()
{
delete[] m_pArray;
}

virtual bool offer(const T &anObject)
{
//如果有可能将anObject添加到队列中,如果可以容纳,则返回
//成功,否则返回失败。本方法不阻塞当前执行方法的线程
CMutexLockPart PartLock(m_MutexLock);
if (m_iCount < m_iSize)
{
//队列还未满的情况
*m_pEnd = anObject;
m_iCount++;
m_pEnd++;
if (m_pEnd == m_pArray + m_iSize)
{
m_pEnd = m_pArray;
}
m_Condition.signal();
return true;
}

return false;
}

virtual void put(const T &anObject)
{
//添加元素到队列里,如果容量满了会阻塞到容量不满
m_MutexLock.lock();
if(m_iCount == m_iSize)
{
m_Condition.wait(); //队列满就阻塞
}

//队列还未满的情况
*m_pEnd = anObject;
m_pEnd++;
if (m_pEnd == m_pArray + m_iSize)
{
m_pEnd = m_pArray;
}
m_iCount++;
m_MutexLock.unlock();

m_Condition.signal();
}

virtual T poll(void)
{
//删除队列头部元素,如果队列为空,则是未定义的行为,否则返回元素.
CMutexLockPart PartLock(m_MutexLock);
if (m_iCount <= 0)
{
//队列为空
T tNull;
return tNull;
}

T tRet = *m_pStart;
m_pStart++;
if (m_pStart == m_pArray + m_iSize)
{
m_pStart = m_pArray;
}
m_iCount--;
return tRet;
}

virtual T take(void)
{
//删除队列头部元素,如果队列为空,则阻塞。
m_MutexLock.lock();
if (m_iCount <= 0)
{
m_Condition.wait(); //队列空就阻塞
}

T tRet = *m_pStart;
m_pStart++;
if (m_pStart == m_pArray + m_iSize)
{
m_pStart = m_pArray;
}
m_iCount--;
m_MutexLock.unlock();

return tRet;
}

virtual bool empty(void)
{
//如果m_iCount == 0那么队列就为空
if (m_iCount == 0)
return true;

return false;
}

virtual int size(void)
{
return m_iCount;
}

virtual bool full(void)
{
return m_iSize == m_iCount;
}

private:
T *m_pArray;
int m_iSize;
int m_iCount;
T *m_pStart;
T *m_pEnd;
CMutexLock m_MutexLock;
CCondition m_Condition;
};

#endif //#ifndef __ARRAY_BLOCKING_QUEUE_H__
5.LinkedBlockingQueue.h
#ifndef __LINKED_BLOCKING_QUEUE_H__
#define __LINKED_BLOCKING_QUEUE_H__

#include <list>
#include <limits.h>
#include "Mutex.h"
#include "Condition.h"
#include "BlockingQueue.h"

template<typename T>
class CLinkedBlockingQueue : public CBlockingQueue<T>
{
public:
CLinkedBlockingQueue(int iSize = INT_MAX)
:m_listQueue()
,m_iSize(iSize)
,m_MutexLock()
,m_Condition(m_MutexLock)
{

}

~CLinkedBlockingQueue()
{
m_listQueue.clear();
}

bool offer(const T &anObject)
{
//如果有可能将anObject添加到队列中,如果可以容纳,则返回
//成功,否则返回失败。本方法不阻塞当前执行方法的线程
CMutexLockPart PartLock(m_MutexLock);
if (static_cast<int>(m_listQueue.size()) >= m_iSize)
{
//超过了队列的最大值 ֵ
return false;
}

m_listQueue.push_back(anObject);
return true;
}

void put(const T &anObject)
{
//添加元素到队列里,如果容量满了会阻塞到容量不满
m_MutexLock.lock();
if (static_cast<int>(m_listQueue.size()) >= m_iSize)
{
//超过了队列的最大值,阻塞等待
m_Condition.wait();
}

m_listQueue.push_back(anObject);

m_MutexLock.unlock();

m_Condition.signal();
}

T poll(void)
{
//删除队列头部元素,如果队列为空,则该行为未定义。否则返回元素.
CMutexLockPart PartLock(m_MutexLock);
if (m_listQueue.empty())
{
//队列为空
T tNULL;
return tNULL;
}

T tRet = m_listQueue.front();
m_listQueue.pop_front();

return tRet;
}

T take(void)
{
//删除队列头部元素,如果队列为空,则阻塞。
m_MutexLock.lock();
if (m_listQueue.empty())
{
m_Condition.wait(); //队列空就阻塞
}

T tRet = m_listQueue.front();
m_listQueue.pop_front();

m_MutexLock.unlock();
return tRet;
}

bool empty(void)
{
//直接调用STL提供的方法就行了
return m_listQueue.empty();
}

int size(void)
{
return m_listQueue.size();
}

bool full(void)
{
return m_iSize == static_cast<int>(m_listQueue.size());
}

private:
//这里直接使用STL的链表,后面会有数据结构专题
//这边就直接使用现成的,嘿嘿
std::list<T> m_listQueue;
int m_iSize;
CMutexLock m_MutexLock;
CCondition m_Condition;
};

#endif //#ifndef __LINKED_BLOCKING_QUEUE_H__
6.main.cpp

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include "ArrayBlockingQueue.h"
#include "LinkedBlockingQueue.h"

using namespace std;

void *pthreadFunc1(void *pArg)
{
printf("pthreadFunc1 start runing ...\n");
CBlockingQueue<int> *pBlockingQueue = static_cast<CBlockingQueue<int> *>(pArg);

bool bRet;
for (int i = 0; i < 25; i++)
{
bRet = pBlockingQueue->offer(i);
if (bRet == false)
printf("offer(%d) false, queue fill\n", i);
else
printf("offer(%d) true\n", i);

usleep(1000);
}

printf("pthreadFunc1 end runing ...\n");

pthread_exit((void *)1);
}

void *pthreadFunc2(void *pArg)
{
printf("pthreadFunc2 start runing ...\n");
CBlockingQueue<int> *pBlockingQueue = static_cast<CBlockingQueue<int> *>(pArg);

int iValue;
bool bRet;
for (int i = 0; i < 50; i++)
{
iValue = pBlockingQueue->poll(bRet);
if (bRet == false)
printf("poll() false, queue empty\n");
else
printf("poll() value = %d\n", iValue);

usleep(500);
}

printf("pthreadFunc2 end runing ...\n");

pthread_exit((void *)2);
}

void *pthreadFunc3(void *pArg)
{
printf("pthreadFunc3 start runing ...\n");
CBlockingQueue<int> *pBlockingQueue = static_cast<CBlockingQueue<int> *>(pArg);

for (int i = 0; i < 50; i++)
{
pBlockingQueue->put(i);
printf("put(%d) true\n", i);
usleep(100);
}

printf("pthreadFunc3 end runing ...\n");

pthread_exit((void *)1);
}

void *pthreadFunc4(void *pArg)
{
printf("pthreadFunc4 start runing ...\n");
CBlockingQueue<int> *pBlockingQueue = static_cast<CBlockingQueue<int> *>(pArg);

int iValue;
for (int i = 0; i < 50; i++)
{
iValue = pBlockingQueue->take();
printf("take() value = %d\n", iValue);

usleep(70);
}

printf("pthreadFunc4 end runing ...\n");

pthread_exit((void *)2);
}

void testOfferAndPoll(void)
{
//CBlockingQueue<int> *pBlockingQueue = new CArrayBlockingQueue<int>(10);
CBlockingQueue<int> *pBlockingQueue = new CLinkedBlockingQueue<int>();

pthread_t pthreadId1;
pthread_t pthreadId2;
pthread_create(&pthreadId1, NULL, &pthreadFunc1, pBlockingQueue);
pthread_create(&pthreadId2, NULL, &pthreadFunc2, pBlockingQueue);

pthread_join(pthreadId1, NULL);
pthread_join(pthreadId2, NULL);

delete pBlockingQueue;
}

void testPutAndTake(void)
{
//CBlockingQueue<int> *pBlockingQueue = new CArrayBlockingQueue<int>(10);
CBlockingQueue<int> *pBlockingQueue = new CLinkedBlockingQueue<int>();

pthread_t pthreadId1;
pthread_t pthreadId2;
pthread_create(&pthreadId1, NULL, &pthreadFunc3, pBlockingQueue);
pthread_create(&pthreadId2, NULL, &pthreadFunc4, pBlockingQueue);

pthread_join(pthreadId1, NULL);
pthread_join(pthreadId2, NULL);

delete pBlockingQueue;
}

int main(void)
{
testOfferAndPoll();

testPutAndTake();

return 0;
}
测试的结果好像还行。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: