用C++简单实现的——BlockingQueue类(java)
2017-05-12 15:23
239 查看
最近,简单整理完Linux线程方面的知识点,突发奇想的想写个线程池来玩玩。经过多方的考虑,决定仿java的线程池来写。由于没有学过java,最后的结果可能差距很大,好像扯远了回到正题来。Java线程池中使用到了BlockingQueue类那就先来试着用C++来实现它吧!
最终只实现了BlockingQueue中的ArrayBlockingQueue(基于数组的阻塞队列实现)和LinkedBlockingQueue(基于链表的阻塞队列)也只实现了BlockingQueue中4个核心的方法。好像对线程池来说够用了。
1.Mutext.h
#ifndef __BLOCKING_QUEUE_H__
#define __BLOCKING_QUEUE_H__
//在使用队列的时候需要注意,是否需要定义拷贝构造和赋值函数
template<typename T>
class CBlockingQueue
{
public:
CBlockingQueue() { }
virtual ~CBlockingQueue() { }
//放入数据的方法
virtual bool offer(const T &anObject) = 0; //不阻塞当前执行方法的线程
virtual void put(const T &anObject) = 0; //会阻塞当前执行方法的线程
//获取数据的方法(注:如果队列为空调用poll则返回一个初始化的T,当做未定义)
virtual T poll(void) = 0; //取走BlockingQueue首位的对象,调用改方法必须确保队列不为空
virtual T take(void) = 0; //取走BlockingQueue首位的对象,若队列为空则阻塞
//添加一个方法,用来判断队列是否为空
virtual bool empty(void) = 0;
//添加一个方法,用来查询队列中还有多少数据
virtual int size(void) = 0;
//添加一个方法,用来判断队列是否是满的
virtual bool full(void) = 0;
};
#endif //#ifndef __BLOCKING_QUEUE_H__
4.ArrayBlockingQueue.h
#ifndef __ARRAY_BLOCKING_QUEUE_H__
#define __ARRAY_BLOCKING_QUEUE_H__
#include "BlockingQueue.h"
#include "Mutex.h"
#include "Condition.h"
//目前觉得这个类存在问题:在取出元素的时候,是否需要显示的调用析构函数。
//这需要看T的定义以及深拷贝构造和赋值函数了。
template<typename T>
class CArrayBlockingQueue : public CBlockingQueue<T>
{
public:
CArrayBlockingQueue(int iSize)
:m_iSize(iSize)
,m_iCount(0)
,m_MutexLock()
,m_Condition(m_MutexLock)
{
m_pArray = new T[iSize];
m_pStart = m_pArray;
m_pEnd = m_pArray;
}
~CArrayBlockingQueue()
{
delete[] m_pArray;
}
virtual bool offer(const T &anObject)
{
//如果有可能将anObject添加到队列中,如果可以容纳,则返回
//成功,否则返回失败。本方法不阻塞当前执行方法的线程
CMutexLockPart PartLock(m_MutexLock);
if (m_iCount < m_iSize)
{
//队列还未满的情况
*m_pEnd = anObject;
m_iCount++;
m_pEnd++;
if (m_pEnd == m_pArray + m_iSize)
{
m_pEnd = m_pArray;
}
m_Condition.signal();
return true;
}
return false;
}
virtual void put(const T &anObject)
{
//添加元素到队列里,如果容量满了会阻塞到容量不满
m_MutexLock.lock();
if(m_iCount == m_iSize)
{
m_Condition.wait(); //队列满就阻塞
}
//队列还未满的情况
*m_pEnd = anObject;
m_pEnd++;
if (m_pEnd == m_pArray + m_iSize)
{
m_pEnd = m_pArray;
}
m_iCount++;
m_MutexLock.unlock();
m_Condition.signal();
}
virtual T poll(void)
{
//删除队列头部元素,如果队列为空,则是未定义的行为,否则返回元素.
CMutexLockPart PartLock(m_MutexLock);
if (m_iCount <= 0)
{
//队列为空
T tNull;
return tNull;
}
T tRet = *m_pStart;
m_pStart++;
if (m_pStart == m_pArray + m_iSize)
{
m_pStart = m_pArray;
}
m_iCount--;
return tRet;
}
virtual T take(void)
{
//删除队列头部元素,如果队列为空,则阻塞。
m_MutexLock.lock();
if (m_iCount <= 0)
{
m_Condition.wait(); //队列空就阻塞
}
T tRet = *m_pStart;
m_pStart++;
if (m_pStart == m_pArray + m_iSize)
{
m_pStart = m_pArray;
}
m_iCount--;
m_MutexLock.unlock();
return tRet;
}
virtual bool empty(void)
{
//如果m_iCount == 0那么队列就为空
if (m_iCount == 0)
return true;
return false;
}
virtual int size(void)
{
return m_iCount;
}
virtual bool full(void)
{
return m_iSize == m_iCount;
}
private:
T *m_pArray;
int m_iSize;
int m_iCount;
T *m_pStart;
T *m_pEnd;
CMutexLock m_MutexLock;
CCondition m_Condition;
};
#endif //#ifndef __ARRAY_BLOCKING_QUEUE_H__
5.LinkedBlockingQueue.h
#ifndef __LINKED_BLOCKING_QUEUE_H__
#define __LINKED_BLOCKING_QUEUE_H__
#include <list>
#include <limits.h>
#include "Mutex.h"
#include "Condition.h"
#include "BlockingQueue.h"
template<typename T>
class CLinkedBlockingQueue : public CBlockingQueue<T>
{
public:
CLinkedBlockingQueue(int iSize = INT_MAX)
:m_listQueue()
,m_iSize(iSize)
,m_MutexLock()
,m_Condition(m_MutexLock)
{
}
~CLinkedBlockingQueue()
{
m_listQueue.clear();
}
bool offer(const T &anObject)
{
//如果有可能将anObject添加到队列中,如果可以容纳,则返回
//成功,否则返回失败。本方法不阻塞当前执行方法的线程
CMutexLockPart PartLock(m_MutexLock);
if (static_cast<int>(m_listQueue.size()) >= m_iSize)
{
//超过了队列的最大值 ֵ
return false;
}
m_listQueue.push_back(anObject);
return true;
}
void put(const T &anObject)
{
//添加元素到队列里,如果容量满了会阻塞到容量不满
m_MutexLock.lock();
if (static_cast<int>(m_listQueue.size()) >= m_iSize)
{
//超过了队列的最大值,阻塞等待
m_Condition.wait();
}
m_listQueue.push_back(anObject);
m_MutexLock.unlock();
m_Condition.signal();
}
T poll(void)
{
//删除队列头部元素,如果队列为空,则该行为未定义。否则返回元素.
CMutexLockPart PartLock(m_MutexLock);
if (m_listQueue.empty())
{
//队列为空
T tNULL;
return tNULL;
}
T tRet = m_listQueue.front();
m_listQueue.pop_front();
return tRet;
}
T take(void)
{
//删除队列头部元素,如果队列为空,则阻塞。
m_MutexLock.lock();
if (m_listQueue.empty())
{
m_Condition.wait(); //队列空就阻塞
}
T tRet = m_listQueue.front();
m_listQueue.pop_front();
m_MutexLock.unlock();
return tRet;
}
bool empty(void)
{
//直接调用STL提供的方法就行了
return m_listQueue.empty();
}
int size(void)
{
return m_listQueue.size();
}
bool full(void)
{
return m_iSize == static_cast<int>(m_listQueue.size());
}
private:
//这里直接使用STL的链表,后面会有数据结构专题
//这边就直接使用现成的,嘿嘿
std::list<T> m_listQueue;
int m_iSize;
CMutexLock m_MutexLock;
CCondition m_Condition;
};
#endif //#ifndef __LINKED_BLOCKING_QUEUE_H__
6.main.cpp
最终只实现了BlockingQueue中的ArrayBlockingQueue(基于数组的阻塞队列实现)和LinkedBlockingQueue(基于链表的阻塞队列)也只实现了BlockingQueue中4个核心的方法。好像对线程池来说够用了。
1.Mutext.h
#ifndef __MUTEX_H__ #define __MUTEX_H__ #include <pthread.h> class CMutexLock { public: explicit CMutexLock() { pthread_mutex_init(&m_Mutex, NULL); } ~CMutexLock() { pthread_mutex_destroy(&m_Mutex); } void lock() { pthread_mutex_lock(&m_Mutex); } void unlock() { pthread_mutex_unlock(&m_Mutex); } pthread_mutex_t *get() { return &m_Mutex; } private: friend class CCondition; pthread_mutex_t m_Mutex; }; class CMutexLockPart { public: explicit CMutexLockPart(CMutexLock &MutexLock) :m_MutexLock(MutexLock) { m_MutexLock.lock(); } ~CMutexLockPart() { m_MutexLock.unlock(); } private: CMutexLock &m_MutexLock; }; #endif //#ifndef __MUTEX_H__2.Condition.h
#ifndef __CONDITION_H__ #define __CONDITION_H__ #include "Mutex.h" class CCondition { public: explicit CCondition(CMutexLock &MutexLock) :m_MutexLock(MutexLock) { pthread_cond_init(&m_Condition, NULL); } ~CCondition() { pthread_cond_destroy(&m_Condition); } void wait() { pthread_cond_wait(&m_Condition, m_MutexLock.get()); } void signal() { pthread_cond_signal(&m_Condition); } void broadcast() { pthread_cond_broadcast(&m_Condition); } private: CMutexLock &m_MutexLock; pthread_cond_t m_Condition; }; #endif //#ifndef __CONDITION_H__3.BlockingQueue.h
#ifndef __BLOCKING_QUEUE_H__
#define __BLOCKING_QUEUE_H__
//在使用队列的时候需要注意,是否需要定义拷贝构造和赋值函数
template<typename T>
class CBlockingQueue
{
public:
CBlockingQueue() { }
virtual ~CBlockingQueue() { }
//放入数据的方法
virtual bool offer(const T &anObject) = 0; //不阻塞当前执行方法的线程
virtual void put(const T &anObject) = 0; //会阻塞当前执行方法的线程
//获取数据的方法(注:如果队列为空调用poll则返回一个初始化的T,当做未定义)
virtual T poll(void) = 0; //取走BlockingQueue首位的对象,调用改方法必须确保队列不为空
virtual T take(void) = 0; //取走BlockingQueue首位的对象,若队列为空则阻塞
//添加一个方法,用来判断队列是否为空
virtual bool empty(void) = 0;
//添加一个方法,用来查询队列中还有多少数据
virtual int size(void) = 0;
//添加一个方法,用来判断队列是否是满的
virtual bool full(void) = 0;
};
#endif //#ifndef __BLOCKING_QUEUE_H__
4.ArrayBlockingQueue.h
#ifndef __ARRAY_BLOCKING_QUEUE_H__
#define __ARRAY_BLOCKING_QUEUE_H__
#include "BlockingQueue.h"
#include "Mutex.h"
#include "Condition.h"
//目前觉得这个类存在问题:在取出元素的时候,是否需要显示的调用析构函数。
//这需要看T的定义以及深拷贝构造和赋值函数了。
template<typename T>
class CArrayBlockingQueue : public CBlockingQueue<T>
{
public:
CArrayBlockingQueue(int iSize)
:m_iSize(iSize)
,m_iCount(0)
,m_MutexLock()
,m_Condition(m_MutexLock)
{
m_pArray = new T[iSize];
m_pStart = m_pArray;
m_pEnd = m_pArray;
}
~CArrayBlockingQueue()
{
delete[] m_pArray;
}
virtual bool offer(const T &anObject)
{
//如果有可能将anObject添加到队列中,如果可以容纳,则返回
//成功,否则返回失败。本方法不阻塞当前执行方法的线程
CMutexLockPart PartLock(m_MutexLock);
if (m_iCount < m_iSize)
{
//队列还未满的情况
*m_pEnd = anObject;
m_iCount++;
m_pEnd++;
if (m_pEnd == m_pArray + m_iSize)
{
m_pEnd = m_pArray;
}
m_Condition.signal();
return true;
}
return false;
}
virtual void put(const T &anObject)
{
//添加元素到队列里,如果容量满了会阻塞到容量不满
m_MutexLock.lock();
if(m_iCount == m_iSize)
{
m_Condition.wait(); //队列满就阻塞
}
//队列还未满的情况
*m_pEnd = anObject;
m_pEnd++;
if (m_pEnd == m_pArray + m_iSize)
{
m_pEnd = m_pArray;
}
m_iCount++;
m_MutexLock.unlock();
m_Condition.signal();
}
virtual T poll(void)
{
//删除队列头部元素,如果队列为空,则是未定义的行为,否则返回元素.
CMutexLockPart PartLock(m_MutexLock);
if (m_iCount <= 0)
{
//队列为空
T tNull;
return tNull;
}
T tRet = *m_pStart;
m_pStart++;
if (m_pStart == m_pArray + m_iSize)
{
m_pStart = m_pArray;
}
m_iCount--;
return tRet;
}
virtual T take(void)
{
//删除队列头部元素,如果队列为空,则阻塞。
m_MutexLock.lock();
if (m_iCount <= 0)
{
m_Condition.wait(); //队列空就阻塞
}
T tRet = *m_pStart;
m_pStart++;
if (m_pStart == m_pArray + m_iSize)
{
m_pStart = m_pArray;
}
m_iCount--;
m_MutexLock.unlock();
return tRet;
}
virtual bool empty(void)
{
//如果m_iCount == 0那么队列就为空
if (m_iCount == 0)
return true;
return false;
}
virtual int size(void)
{
return m_iCount;
}
virtual bool full(void)
{
return m_iSize == m_iCount;
}
private:
T *m_pArray;
int m_iSize;
int m_iCount;
T *m_pStart;
T *m_pEnd;
CMutexLock m_MutexLock;
CCondition m_Condition;
};
#endif //#ifndef __ARRAY_BLOCKING_QUEUE_H__
5.LinkedBlockingQueue.h
#ifndef __LINKED_BLOCKING_QUEUE_H__
#define __LINKED_BLOCKING_QUEUE_H__
#include <list>
#include <limits.h>
#include "Mutex.h"
#include "Condition.h"
#include "BlockingQueue.h"
template<typename T>
class CLinkedBlockingQueue : public CBlockingQueue<T>
{
public:
CLinkedBlockingQueue(int iSize = INT_MAX)
:m_listQueue()
,m_iSize(iSize)
,m_MutexLock()
,m_Condition(m_MutexLock)
{
}
~CLinkedBlockingQueue()
{
m_listQueue.clear();
}
bool offer(const T &anObject)
{
//如果有可能将anObject添加到队列中,如果可以容纳,则返回
//成功,否则返回失败。本方法不阻塞当前执行方法的线程
CMutexLockPart PartLock(m_MutexLock);
if (static_cast<int>(m_listQueue.size()) >= m_iSize)
{
//超过了队列的最大值 ֵ
return false;
}
m_listQueue.push_back(anObject);
return true;
}
void put(const T &anObject)
{
//添加元素到队列里,如果容量满了会阻塞到容量不满
m_MutexLock.lock();
if (static_cast<int>(m_listQueue.size()) >= m_iSize)
{
//超过了队列的最大值,阻塞等待
m_Condition.wait();
}
m_listQueue.push_back(anObject);
m_MutexLock.unlock();
m_Condition.signal();
}
T poll(void)
{
//删除队列头部元素,如果队列为空,则该行为未定义。否则返回元素.
CMutexLockPart PartLock(m_MutexLock);
if (m_listQueue.empty())
{
//队列为空
T tNULL;
return tNULL;
}
T tRet = m_listQueue.front();
m_listQueue.pop_front();
return tRet;
}
T take(void)
{
//删除队列头部元素,如果队列为空,则阻塞。
m_MutexLock.lock();
if (m_listQueue.empty())
{
m_Condition.wait(); //队列空就阻塞
}
T tRet = m_listQueue.front();
m_listQueue.pop_front();
m_MutexLock.unlock();
return tRet;
}
bool empty(void)
{
//直接调用STL提供的方法就行了
return m_listQueue.empty();
}
int size(void)
{
return m_listQueue.size();
}
bool full(void)
{
return m_iSize == static_cast<int>(m_listQueue.size());
}
private:
//这里直接使用STL的链表,后面会有数据结构专题
//这边就直接使用现成的,嘿嘿
std::list<T> m_listQueue;
int m_iSize;
CMutexLock m_MutexLock;
CCondition m_Condition;
};
#endif //#ifndef __LINKED_BLOCKING_QUEUE_H__
6.main.cpp
#include <stdio.h> #include <unistd.h> #include <pthread.h> #include "ArrayBlockingQueue.h" #include "LinkedBlockingQueue.h" using namespace std; void *pthreadFunc1(void *pArg) { printf("pthreadFunc1 start runing ...\n"); CBlockingQueue<int> *pBlockingQueue = static_cast<CBlockingQueue<int> *>(pArg); bool bRet; for (int i = 0; i < 25; i++) { bRet = pBlockingQueue->offer(i); if (bRet == false) printf("offer(%d) false, queue fill\n", i); else printf("offer(%d) true\n", i); usleep(1000); } printf("pthreadFunc1 end runing ...\n"); pthread_exit((void *)1); } void *pthreadFunc2(void *pArg) { printf("pthreadFunc2 start runing ...\n"); CBlockingQueue<int> *pBlockingQueue = static_cast<CBlockingQueue<int> *>(pArg); int iValue; bool bRet; for (int i = 0; i < 50; i++) { iValue = pBlockingQueue->poll(bRet); if (bRet == false) printf("poll() false, queue empty\n"); else printf("poll() value = %d\n", iValue); usleep(500); } printf("pthreadFunc2 end runing ...\n"); pthread_exit((void *)2); } void *pthreadFunc3(void *pArg) { printf("pthreadFunc3 start runing ...\n"); CBlockingQueue<int> *pBlockingQueue = static_cast<CBlockingQueue<int> *>(pArg); for (int i = 0; i < 50; i++) { pBlockingQueue->put(i); printf("put(%d) true\n", i); usleep(100); } printf("pthreadFunc3 end runing ...\n"); pthread_exit((void *)1); } void *pthreadFunc4(void *pArg) { printf("pthreadFunc4 start runing ...\n"); CBlockingQueue<int> *pBlockingQueue = static_cast<CBlockingQueue<int> *>(pArg); int iValue; for (int i = 0; i < 50; i++) { iValue = pBlockingQueue->take(); printf("take() value = %d\n", iValue); usleep(70); } printf("pthreadFunc4 end runing ...\n"); pthread_exit((void *)2); } void testOfferAndPoll(void) { //CBlockingQueue<int> *pBlockingQueue = new CArrayBlockingQueue<int>(10); CBlockingQueue<int> *pBlockingQueue = new CLinkedBlockingQueue<int>(); pthread_t pthreadId1; pthread_t pthreadId2; pthread_create(&pthreadId1, NULL, &pthreadFunc1, pBlockingQueue); pthread_create(&pthreadId2, NULL, &pthreadFunc2, pBlockingQueue); pthread_join(pthreadId1, NULL); pthread_join(pthreadId2, NULL); delete pBlockingQueue; } void testPutAndTake(void) { //CBlockingQueue<int> *pBlockingQueue = new CArrayBlockingQueue<int>(10); CBlockingQueue<int> *pBlockingQueue = new CLinkedBlockingQueue<int>(); pthread_t pthreadId1; pthread_t pthreadId2; pthread_create(&pthreadId1, NULL, &pthreadFunc3, pBlockingQueue); pthread_create(&pthreadId2, NULL, &pthreadFunc4, pBlockingQueue); pthread_join(pthreadId1, NULL); pthread_join(pthreadId2, NULL); delete pBlockingQueue; } int main(void) { testOfferAndPoll(); testPutAndTake(); return 0; }测试的结果好像还行。
相关文章推荐
- c++和java类的简单实现
- java调用c/c++代码简单实现以及遇见的坑
- 简单的C++加载jvm实现--简单的日志输出--JAVA端程序
- JNI简单实现Java调用C++/C的HelloWorld
- JNI简单实现Java调用C++/C的HelloWorld
- Java与C++实现相同的MD5加密算法简单实例
- JNI实现最简单的JAVA调用C/C++代码
- JNI简单实现Java调用C++/C的HelloWorld
- I/O流的设计(Java的InputStream/OuputStream和Reader/Writer的C++实现)
- [Java] 多线程的简单实现
- 用Java简单实现文件分割与合并
- 用C++实现简单的文件I/O操作
- 初学者看过来:简单谈谈 C/C++ 递归的思想,实现,以及和循环的关系。
- 用Java实现的字符串简单加密
- 标准纯C++实现简单的词法分析器(一)
- 使用异或进行简单的密码加密(JAVA实现)
- IP层的封装(Java的InetAddress类的C++实现)
- 用java实现简单的多线程下载
- 时间和日历类的设计(Java的Date和Calendar的C++实现)
- 一个简单的用JAVA实现的屏幕抓图(源代码)