QT 线程池 + TCP 小试(一)线程池的简单实现
2012-08-11 13:13
561 查看
*免分资源链接点击打开链接http://download.csdn.net/detail/goldenhawking/4492378
很久以前做过ACE + MFC/QT 的中轻量级线程池应用,大概就是利用线程池执行客户机上的运算需求,将结果返回。ACE是跨平台重量级的通信中间件,与常见的应用程序框架需要精心契合,才能不出问题。最近想到既然QT框架本身就已经具有各类功能,何不玩一玩呢
,那就开搞!这个实验的代码可以从我的资源内下载。
第一步打算实现的模式,我们需要一个设置为CPU核心数的线程池,这个线程池可以异步接受N个数据生产者传入的数据,均衡的分配处理任务,处理后的数据返回给某1个或者几个消费者。有两种均衡方法。一种是生产者粒度的均衡。同一个生产者的各批数据FIFO顺序不被打破,这需要判断,当处理线程队列中还有该生产者的数据时,不改变当前处理线程。第二种是数据粒度的并行,某个生产者传来的数据被分配到不同的线程,不保证后到的数据后被处理(也可能先到的处理的慢,后到的快)。
这种异步队列机制如果在MFC、WinAPI中,需要手工使用 Mutex 同步队列,更可恶的是分配的数据对象的生存期非常微妙,一不小心就会出红叉叉。QT首先为我们提供了信号和槽的机制,且该机制原生支持跨线程。假设我们在16核心服务器上,则使用 15个 QThread对象管理15组工作线程(留一个给主界面)。但是,如果仔细看了QT的文档,就会发现QThread的信号事件循环默认是在创建者中(很多时候就是主线程!),所以,要想让槽在子线程运行,一般是派生一个QObject的类,并把对象MoveToThread到某个QThread管理的线程上去。这样,信号和槽就是全异步FIFO了。其次,QT提供了引用计数的QByteArray封装,这个东西在参数传递的时候,速度很快,很少出现memcpy,生存期也特别容易控制。虽然C++11里有
shared_ptr<T>,但是那个东西还是需要在一开始new 一个int8型的存储区,很讨厌。
说了这么多,上关键代码。
先是线程池的封装qghthreadengine.h
实现qghthreadengine.cpp:
相应实现qghthreadobject.cpp
最后,是供用户重载的实际处理方法的纯虚基类qghthreadtaskitem.h
下次,继续写如何实现一个TCP链路,让这个线程池活起来
很久以前做过ACE + MFC/QT 的中轻量级线程池应用,大概就是利用线程池执行客户机上的运算需求,将结果返回。ACE是跨平台重量级的通信中间件,与常见的应用程序框架需要精心契合,才能不出问题。最近想到既然QT框架本身就已经具有各类功能,何不玩一玩呢
,那就开搞!这个实验的代码可以从我的资源内下载。
第一步打算实现的模式,我们需要一个设置为CPU核心数的线程池,这个线程池可以异步接受N个数据生产者传入的数据,均衡的分配处理任务,处理后的数据返回给某1个或者几个消费者。有两种均衡方法。一种是生产者粒度的均衡。同一个生产者的各批数据FIFO顺序不被打破,这需要判断,当处理线程队列中还有该生产者的数据时,不改变当前处理线程。第二种是数据粒度的并行,某个生产者传来的数据被分配到不同的线程,不保证后到的数据后被处理(也可能先到的处理的慢,后到的快)。
这种异步队列机制如果在MFC、WinAPI中,需要手工使用 Mutex 同步队列,更可恶的是分配的数据对象的生存期非常微妙,一不小心就会出红叉叉。QT首先为我们提供了信号和槽的机制,且该机制原生支持跨线程。假设我们在16核心服务器上,则使用 15个 QThread对象管理15组工作线程(留一个给主界面)。但是,如果仔细看了QT的文档,就会发现QThread的信号事件循环默认是在创建者中(很多时候就是主线程!),所以,要想让槽在子线程运行,一般是派生一个QObject的类,并把对象MoveToThread到某个QThread管理的线程上去。这样,信号和槽就是全异步FIFO了。其次,QT提供了引用计数的QByteArray封装,这个东西在参数传递的时候,速度很快,很少出现memcpy,生存期也特别容易控制。虽然C++11里有
shared_ptr<T>,但是那个东西还是需要在一开始new 一个int8型的存储区,很讨厌。
说了这么多,上关键代码。
先是线程池的封装qghthreadengine.h
#ifndef QGHTHREADENGINE_H #define QGHTHREADENGINE_H #include <QObject> #include <QThread> #include <QVector> #include <QList> #include <QMap> #include <QMutex> #include "qghthreadtaskitem.h" #include "qghthreadobject.h" //线程池引擎,帮助用户进行动态平衡 class QGHThreadEngine : public QObject { Q_OBJECT public: QGHThreadEngine(QObject *parent,QGHThreadTaskItem * pTaskItem,int nThreads = 2,bool bFIFOKeep = true); ~QGHThreadEngine(); protected: QVector<QThread *> m_ThreadPool; QVector<QGHThreadObject *> m_ThreadObjs; QGHThreadTaskItem * m_pThreadTaskItem; int m_nThreads; bool m_bFIFOKeep; private: //各个m_ThreadPool\m_ThreadObjs的任务数 QMap<QObject *,qint32> m_map_Tasks; //m_bFIFOKeep == true 时,下面两个成员将保证非空闲的单个 data_source 将始终在单一线程处理 //各个data_source 目前的处理线程 QMap<QObject *,QObject *> m_map_busy_source_task; //各个data_source 目前的排队数目 QMap<QObject *,int> m_map_busy_source_counter; public: void SetThreadTaskItem(QGHThreadTaskItem * pTaskItem); QList<qint32> CurrentLoad() { return m_map_Tasks.values(); } public slots: void append_new(QObject * data_source, const QByteArray & data); //捕获QGHThreadObject::sig_process_finished, 以便管理data_source的 FIFO 顺序 void on_sig_process_finished(QObject * data_source); signals: //************************************ // Method: do_task // FullName: QGHThreadEngine::do_task // Access: public // Returns: void // Qualifier: // Parameter: QObject * 任务来源 (相同任务源的任务,在队列非空时会被安排到同一个线程处理,以确保对相同源的FIFO) // Parameter: QByteArray 任务体 // Parameter: QObject * 处理任务的线程对象(QGHThreadObject) //************************************ void do_task(QObject *, const QByteArray &,QObject *); }; #endif // QGHTHREADENGINE_H
实现qghthreadengine.cpp:
#include "qghthreadengine.h" #include <assert.h> QGHThreadEngine::QGHThreadEngine(QObject *parent,QGHThreadTaskItem * pTaskItem,int nThreads,bool bFIFOKeep) : QObject(parent), m_nThreads(nThreads), m_pThreadTaskItem(pTaskItem), m_bFIFOKeep(bFIFOKeep) { assert(nThreads>0 && nThreads<512 && pTaskItem!=NULL); //创建固定数目的线程 for (int i=0;i<nThreads;i++) { QThread * pNewThread = new QThread(this); QGHThreadObject * pNewObject = new QGHThreadObject(0,pTaskItem); //记录下来 m_ThreadPool.push_back(pNewThread); m_ThreadObjs.push_back(pNewObject); m_map_Tasks[pNewObject] = 0; pNewThread->start(); //把QGHThreadObject的信号、曹处理搬移到子线程内 pNewObject->moveToThread(pNewThread); //连接处理完成消息 connect(pNewObject,SIGNAL(sig_process_finished(QObject *)),this,SLOT(on_sig_process_finished(QObject *))); //连接处理新任务消息 connect(this,SIGNAL(do_task(QObject *, const QByteArray &,QObject *)),pNewObject,SLOT(process(QObject *, const QByteArray &,QObject *))); } } QGHThreadEngine::~QGHThreadEngine() { foreach(QGHThreadObject * obj,m_ThreadObjs) { disconnect(obj,SIGNAL(sig_process_finished(QObject *)),this,SLOT(on_sig_process_finished(QObject *))); obj->deleteLater(); } foreach(QThread * th ,m_ThreadPool) { disconnect(this,SIGNAL(do_task(QObject *, QByteArray,QObject *)),th,SLOT(process(QObject *, QByteArray,QObject *))); th->exit(0); th->wait(); } } //负载均衡添加任务,生产者的信号要挂接到这个槽上 void QGHThreadEngine::append_new(QObject * data_source, const QByteArray & data) { QObject * pMinObj = 0; //对一批来自同一数据源的数据,使用同样的数据源处理,以免发生多线程扰乱FIFO对单个data_source的完整性 if (m_map_busy_source_counter.find(data_source)!=m_map_busy_source_counter.end()&& m_bFIFOKeep==true) { m_map_busy_source_counter[data_source]++; pMinObj = m_map_busy_source_task[data_source]; } else { qint32 nMinCost = 0x7fffffff; //寻找现在最空闲的一个线程 for (QMap<QObject *,qint32>::iterator p = m_map_Tasks.begin();p!=m_map_Tasks.end();p++) { if (p.value()< nMinCost) { nMinCost = p.value(); pMinObj = p.key(); } } if (pMinObj) { m_map_busy_source_counter[data_source] = 1; m_map_busy_source_task[data_source] = pMinObj; } } if (pMinObj) { m_map_Tasks[pMinObj]++; emit do_task(data_source,data,pMinObj); } } void QGHThreadEngine::on_sig_process_finished(QObject * data_source) { if (m_map_Tasks.find(sender())!=m_map_Tasks.end()) { m_map_Tasks[sender()]--; } if (m_map_busy_source_counter.find(data_source)!=m_map_busy_source_counter.end()) { m_map_busy_source_counter[data_source]--; if (m_map_busy_source_counter[data_source]<=0) { m_map_busy_source_counter.remove(data_source); m_map_busy_source_task.remove(data_source); } } }用于绑定的 qghthreadobject.h
#ifndef QGHTHREADOBJECT_H #define QGHTHREADOBJECT_H #include <QObject> #include "qghthreadtaskitem.h" //用于在子线程内具体承担事件循环的类,用户无需重载 class QGHThreadObject:public QObject { Q_OBJECT public: QGHThreadObject(QObject *parent,QGHThreadTaskItem * pThreadTaskItem); ~QGHThreadObject(); public: void SetThreadTaskItem(QGHThreadTaskItem * pThreadTaskItem); public slots: //************************************ // Method: process // FullName: QGHThreadObject::process // Access: public // Returns: void // Qualifier: // Parameter: QObject * 任务来源 (相同任务源的任务,在队列非空时会被安排到同一个线程处理,以确保对相同源的FIFO) // Parameter: QByteArray 任务体 // Parameter: QObject * 处理任务的线程对象(QGHThreadObject) //************************************ void process(QObject * data_source, const QByteArray &data,QObject * target); private: QGHThreadTaskItem * m_pThreadTaskItem; signals: //信号,表示一次处理已经完成。QGHThreadEngine捕获该信号,管理data_source的 FIFO 顺序 void sig_process_finished(QObject * data_source); }; #endif
相应实现qghthreadobject.cpp
#include "qghthreadobject.h" #include <assert.h> QGHThreadObject::QGHThreadObject(QObject *parent,QGHThreadTaskItem * pThreadTaskItem) : QObject(parent), m_pThreadTaskItem(pThreadTaskItem) { assert(pThreadTaskItem!=NULL); } QGHThreadObject::~QGHThreadObject() { } void QGHThreadObject::process(QObject * data_source, const QByteArray &data,QObject * target) { if (target==this) { m_pThreadTaskItem->run(data_source,data); emit sig_process_finished(data_source); } } void QGHThreadObject::SetThreadTaskItem(QGHThreadTaskItem * pThreadTaskItem) { assert(pThreadTaskItem!=NULL); m_pThreadTaskItem = pThreadTaskItem; }
最后,是供用户重载的实际处理方法的纯虚基类qghthreadtaskitem.h
#ifndef QGHTHREADTASKITEM_H #define QGHTHREADTASKITEM_H #include <QObject> //用户重载该类,实现自定义方法的线程池调用 class QGHThreadTaskItem:public QObject { Q_OBJECT public: QGHThreadTaskItem(QObject *parent); ~QGHThreadTaskItem(); public: virtual void run(QObject * task_source, const QByteArray & data_array) = 0; }; #endif
下次,继续写如何实现一个TCP链路,让这个线程池活起来
相关文章推荐
- QT 线程池 + TCP 小试(一)线程池的简单实现
- QT 线程池 + TCP 小试(一)线程池的简单实现
- QT 线程池 + TCP 小试(一)线程池的简单实现
- QT 线程池 + TCP 小试(一)线程池的简单实现
- QT 线程池 + TCP 小试(二)实现通信功能
- QT 线程池 + TCP 小试(三)实现最终功能
- QT 线程池 + TCP 小试(二)实现通信功能
- QT 线程池 + TCP 小试(二)实现通信功能
- QT 线程池 + TCP 小试(三)实现最终功能
- QT 线程池 + TCP 小试(三)实现最终功能
- QT 线程池 + TCP 小试(三)实现最终功能
- QT 线程池 + TCP 小试(二)实现通信功能
- QT 线程池 + TCP 小试(三)实现最终功能
- 【原创】TCP Socket 简单练习 --- 线程池实现并发服务器 分类: Linux --- 应用程序设计 2014-12-25 13:59 50人阅读 评论(0) 收藏
- 【原创】TCP Socket 简单练习 --- 线程池实现并发服务器
- Qt实现简单的TCP网络电子词典(带界面,能并发)
- QT之TCP通信简单实现
- Qt与Java实现Tcp网络通信,收发简单字符串。
- Socket TCP 简单聊天的实现
- Linux C++ 一个线程池的简单实现(附代码)