Whether Thread Pool is Needed for You?
2016-04-25 22:53
579 查看
你到底需不需要线程池?
先问自己两个问题:是否有很多请求需要重复性的进行处理?
而且每个请求是相互独立的?
你是否需要等待IO操作,或是文件操作?
如果你回答YES,那么你需要一个线程池来帮助你。
我们为什么需要线程池?
通常情况下,IO操作都需要很长的一段时间才能完成。所以,在一个单线程的应用程序中,在IO操作期间,系统资源都只能处于等待状态。如果使用多线程,效率就会大大地提高。所以我们需要线程池来更高效地完成多线程操作。
线程池会有什么样的作用?
一个应用程序应该避免频繁地创建和销毁线程。线程池可以帮助管理线程,避免频繁的创建和销毁,从而更加高效。
线程池的设计
线程池会创建一些特定的线程,并且等待被请求。一旦有请求到达了线程池,一个线程就会被激活并且开始执行。当这个请求完成的时候,这个线程会退回到等待请求的状态。当用户请求destroy的时候,线程池中所有的线程都会退出。设计图如下:
ThreadPool
这个类将会创建、管理、析构线程池。所以,你的应用程序需要创建ThreadPool类的一个对象。
AbstractRequest
这表示线程池的一个请求。客户端应用程序需要继承这个类,并且根据自己的情况实现线程中要执行的函数。
Logger
记录一些日志,错误等信息。
源代码:
.h#ifndef _THREAD_POOL_MGR_H_ #define _THREAD_POOL_MGR_H_ #include <windows.h> #include <list> namespace TP { /** * Logger - This is base class for the error logger class and it is polymorphic. * The users of the ThreadPool create a class which derived from this * and override LogError() and LogInfo() for their own error logging mechanism. * The default error logging will be in output window. */ class Logger { public: // Constructor Logger(){}; // Destructor virtual ~Logger(){}; // Log error description. void LogError( const long lActiveReq_i, const std::wstring& wstrError_i ); // Log information. void LogInfo( const long lActiveReq_i, const std::wstring& wstrInfo_i ); // Override this function to log errors. Default log will be in output window. virtual void LogError( const std::wstring& wstrError_i ); // Override this function to log informations. Default log will be in output window. virtual void LogInfo( const std::wstring& wstrInfo_i ); private: // Log thread ID, Active thread count and last error. void PrepareLog( const long lActiveReq_i, std::wstring& wstrLog_io ); }; /** * SyncObject - The class is a wrapper of Critical section object to provide * synchronization for thread pool. */ class SyncObject { public: // Constructor SyncObject() { ::InitializeCriticalSection( &m_stCriticalSection ); } // Destructor ~SyncObject() { ::DeleteCriticalSection( &m_stCriticalSection ); } // Lock critical section. bool Lock() { ::EnterCriticalSection( &m_stCriticalSection ); return true; } // Unlock critical section. bool Unlock() { ::LeaveCriticalSection( &m_stCriticalSection ); return true; } private: SyncObject( const SyncObject& ); SyncObject& operator = ( const SyncObject& ); private: // Critical section object. CRITICAL_SECTION m_stCriticalSection; }; /** * AutoLock - This class own synchronization object during construction and * release the ownership during the destruction. 
*/ class AutoLock { public: /** * Parameterized constructor * * @param LockObj_i - Synchronization object. * @return Nil * @exception Nil * @see Nil * @since 1.0 */ AutoLock( SyncObject& LockObj_i ) : m_pSyncObject( &LockObj_i ) { if( NULL != m_pSyncObject ) { m_pSyncObject->Lock(); } } /** * Destructor. * * @param Nil * @return Nil * @exception Nil * @see Nil * @since 1.0 */ ~AutoLock() { if( NULL != m_pSyncObject ) { m_pSyncObject->Unlock(); m_pSyncObject = NULL; } } private: SyncObject* m_pSyncObject; }; /** * AbstractRequest - This is abstract base class for the request to be processed in thread pool. * and it is polymorphic. The users of the ThreadPool must create a class * which derived from this and override Execute() function. */ class AbstractRequest { public: // Constructor AbstractRequest() : m_bAborted( false ), m_usRequestID( 0u ){} // Destructor virtual ~AbstractRequest(){} // Thread procedure to be override in derived class. This function should return if request aborted. // Abort request can check by calling IsAborted() function during time consuming operation. virtual long Execute() = 0; // Set request ID. void SetRequestID( unsigned short uRequestID_i ) { AutoLock LockRequest( m_LockWorkerThread ); m_usRequestID = uRequestID_i; } // Get request ID. unsigned short GetRequestID() { AutoLock LockRequest( m_LockWorkerThread ); return m_usRequestID; } // Abort the processing of the request. void Abort() { AutoLock LockRequest( m_LockWorkerThread ); m_bAborted = true; } // Clear abort flag for re-posting the same request. void ClearAbortFlag() { AutoLock LockRequest( m_LockWorkerThread ); m_bAborted = false; } protected: // Check for the abort request bool IsAborted() { AutoLock LockRequest( m_LockWorkerThread ); return m_bAborted; } // Prepare error or information log. void PrepareLog( std::wstring& wstrLog_io ); protected: // Synchronization object for resource locking. SyncObject m_LockWorkerThread; private: // Abort flag. 
bool m_bAborted; // Request Identifier. unsigned short m_usRequestID; }; /** * AutoCounter - Increment and decrement counter */ class AutoCounter { public: // Constructor. AutoCounter( unsigned short& usCount_io, SyncObject& Lock_io ) : m_usCount( usCount_io ), m_LockThread( Lock_io ) { AutoLock Lock( m_LockThread ); m_usCount++; } // Destructor. ~AutoCounter() { AutoLock Lock( m_LockThread ); m_usCount--; } private: // Counter variable. unsigned short& m_usCount; // Synchronization object for resource locking. SyncObject& m_LockThread; }; typedef std::list<AbstractRequest*> REQUEST_QUEUE; /** * ThreadPool - This class create and destroy thread pool based on the request. * The requested to be processed can be post to pool as derived object of * AbstractRequest. Also a class can be derive from Logger to error and * information logging. */ class ThreadPool { public: // Constructor. ThreadPool(); // Destructor. ~ThreadPool(); // Create thread pool with specified number of threads. bool Create( const unsigned short usThreadCount_i, Logger* pLogger_io = NULL ); // Destroy the existing thread pool. bool Destroy(); // Post request to thread pool for processing. bool PostRequest( AbstractRequest* pRequest_io ); private: AbstractRequest* PopRequest( REQUEST_QUEUE& RequestQueue_io ); bool AddThreads(); bool NotifyThread(); bool ProcessRequests(); bool WaitForRequest(); bool DestroyPool(); bool IsDestroyed(); void SetDestroyFlag( const bool bFlag_i ); void CancelRequests(); void LogError( const std::wstring& wstrError_i ); void LogInfo( const std::wstring& wstrInfo_i ); static UINT WINAPI ThreadProc( LPVOID pParam_i ); private: ThreadPool( const ThreadPool& ); ThreadPool& operator = ( const ThreadPool& ); private: // Used for thread pool destruction. bool m_bDestroyed; // Hold thread count in the pool. unsigned short m_usThreadCount; // Released semaphore count. unsigned short m_usSemaphoreCount; // Active thread count. unsigned short m_lActiveThread; // Active thread count. 
unsigned short m_usPendingReqCount; // Manage active thread count in pool. HANDLE m_hSemaphore; // Hold thread handles. HANDLE* m_phThreadList; // Request queue. REQUEST_QUEUE m_RequestQueue; // Synchronization object for resource locking. SyncObject m_LockWorkerThread; // User defined error and information logger class. Logger* m_pLogger; // Default error and information logger. Logger m_Logger; }; } // namespace TP #endif // #ifndef _THREAD_POOL_MGR_H_
.cpp
/**
 * @author : Suresh
 */

#include "ThreadPool.h"
#include <sstream>
#include <iomanip>

namespace TP
{
    /**
     * Log error description.
     *
     * @param lActiveReq_i - Count of active requests.
     * @param wstrError_i  - Error message.
     */
    void Logger::LogError( const long lActiveReq_i, const std::wstring& wstrError_i )
    {
        std::wstring wstrLog( wstrError_i );
        PrepareLog( lActiveReq_i, wstrLog );
        LogError( wstrLog );
    }

    /**
     * Log information.
     *
     * @param lActiveReq_i - Count of active requests.
     * @param wstrInfo_i   - Information message.
     */
    void Logger::LogInfo( const long lActiveReq_i, const std::wstring& wstrInfo_i )
    {
        std::wstring wstrLog( wstrInfo_i );
        PrepareLog( lActiveReq_i, wstrLog );
        LogInfo( wstrLog );
    }

    /**
     * Override this function to log errors. Default log will be in output window.
     *
     * @param wstrError_i - Error description
     */
    void Logger::LogError( const std::wstring& wstrError_i )
    {
        // Explicit wide-character API: the text is a std::wstring, so the
        // TCHAR macro would not compile in a non-UNICODE build.
        ::OutputDebugStringW( wstrError_i.c_str());
    }

    /**
     * Override this function to log information. Default log will be in output window.
     *
     * @param wstrInfo_i - Information description.
     */
    void Logger::LogInfo( const std::wstring& wstrInfo_i )
    {
        ::OutputDebugStringW( wstrInfo_i.c_str());
    }

    /**
     * Prefix the message with thread ID, active request count and last error.
     *
     * @param lActiveReq_i - Active request count.
     * @param wstrLog_io   - In: error or information description.
     *                       Out: decorated log line.
     */
    void Logger::PrepareLog( const long lActiveReq_i, std::wstring& wstrLog_io )
    {
        // Read the last-error code before doing anything else so that nothing
        // executed while formatting can overwrite it.
        const DWORD dwLastError = ::GetLastError();
        const DWORD dwThreadID = ::GetCurrentThreadId();

        std::wstringstream wstrmLog;
        wstrmLog << std::setfill( L'0' )
                 << L"##TP## [TID=" << std::setw( 8 ) << dwThreadID
                 << L"] [ACTIVE REQUEST=" << std::setw( 4 ) << lActiveReq_i
                 << L"] [LAST ERROR=" << std::setw( 4 ) << dwLastError
                 << L"] " << wstrLog_io;
        wstrLog_io = wstrmLog.str();
    }

    /**
     * Prefix the message with the request ID.
     *
     * @param wstrLog_io - In: log information. Out: decorated log line.
     */
    void AbstractRequest::PrepareLog( std::wstring& wstrLog_io )
    {
        std::wstringstream wstrmLog;
        wstrmLog << std::setfill( L'0' )
                 << L"##RQ## [RID=" << std::setw( 8 ) << GetRequestID()
                 << L"] [Desc=" << wstrLog_io << L"]";
        wstrLog_io = wstrmLog.str();
    }

    /**
     * Constructor
     */
    ThreadPool::ThreadPool()
        : m_bDestroyed( false ),
          m_usThreadCount( 0u ),
          m_usSemaphoreCount( 0u ),
          m_lActiveThread( 0u ),
          m_usPendingReqCount( 0u ),
          m_hSemaphore( NULL ),
          m_phThreadList( NULL ),
          m_pLogger( &m_Logger )
    {
    }

    /**
     * Destructor. Destroys the pool if it is still alive.
     */
    ThreadPool::~ThreadPool()
    {
        if( NULL != m_phThreadList )
        {
            if( !Destroy())
            {
                LogError( L"Destroy() failed" );
            }
        }
    }

    /**
     * Create thread pool with specified number of threads.
     *
     * @param usThreadCount_i - Thread count, 1 to 64.
     * @param pLogger_i       - Logger instance to log errors and information.
     *                          When NULL the default logger is used.
     * @return true when the pool was created, false otherwise.
     */
    bool ThreadPool::Create( const unsigned short usThreadCount_i, Logger* pLogger_i )
    {
        try
        {
            // Check thread pool is initialized already. This is done before the
            // logger is replaced so that a rejected call does not swap the
            // logger of a pool whose workers are still using it.
            if( NULL != m_phThreadList )
            {
                LogError( L"ThreadPool already created" );
                return false;
            }

            // Assign logger object. If user not provided then use the default
            // one and errors will be logged in output window.
            m_pLogger = ( NULL != pLogger_i ) ? pLogger_i : &m_Logger;

            // Validate thread count.
            if( 0 == usThreadCount_i )
            {
                LogError( L"Minimum allowed thread count is one" );
                return false;
            }

            // DestroyPool() waits for all threads with WaitForMultipleObjects,
            // which accepts at most MAXIMUM_WAIT_OBJECTS (64) handles.
            if( usThreadCount_i > MAXIMUM_WAIT_OBJECTS )
            {
                LogError( L"Maximum allowed thread count is 64" );
                return false;
            }

            LogInfo( L"Thread pool creation requested" );

            // Initialize values.
            m_lActiveThread = 0u;
            m_usSemaphoreCount = 0u;
            m_usPendingReqCount = 0u;
            m_usThreadCount = usThreadCount_i;

            // Clear the destroy flag before any worker exists, so that a pool
            // re-created after Destroy() never starts with the flag still set.
            SetDestroyFlag( false );

            // Create semaphore for thread count management.
            m_hSemaphore = CreateSemaphore( NULL, 0, m_usThreadCount, NULL );
            if( NULL == m_hSemaphore )
            {
                LogError( L"Semaphore creation failed" );
                m_usThreadCount = 0u;
                return false;
            }

            // Create worker threads and make pool active
            if( !AddThreads())
            {
                LogError( L"Threads creation failed" );
                if( NULL != m_phThreadList )
                {
                    // Some workers may already be running: stop them through
                    // the regular destruction path.
                    Destroy();
                }
                else
                {
                    // The handle array was never allocated, so Destroy() would
                    // refuse to run; release the semaphore here instead.
                    CloseHandle( m_hSemaphore );
                    m_hSemaphore = NULL;
                    m_usThreadCount = 0u;
                }
                return false;
            }

            LogInfo( L"Thread pool created successfully" );
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in Create()" );
            return false;
        }
    }

    /**
     * Destroy thread pool. Queued requests that have not started are dropped.
     * This call blocks until every worker thread has exited, so it must not be
     * called from inside a request's Execute().
     *
     * @return true when the pool was destroyed, false otherwise.
     */
    bool ThreadPool::Destroy()
    {
        try
        {
            // Check whether thread pool already destroyed.
            if( NULL == m_phThreadList )
            {
                LogError( L"ThreadPool is already destroyed or not created yet" );
                return false;
            }

            // Cancel all requests.
            CancelRequests();

            // Set destroyed flag to true for exiting threads.
            SetDestroyFlag( true );

            // Top the semaphore up to its maximum so that every worker wakes
            // once and sees the destroy flag.
            {
                AutoLock LockThread( m_LockWorkerThread );

                // m_usSemaphoreCount is never smaller than the real semaphore
                // count, so this release cannot exceed the maximum. Basing the
                // release on the active thread count could, and then
                // ReleaseSemaphore would fail without waking anyone.
                if( m_usSemaphoreCount < m_usThreadCount )
                {
                    const LONG lWakeCount =
                        static_cast<LONG>( m_usThreadCount - m_usSemaphoreCount );
                    if( !ReleaseSemaphore( m_hSemaphore, lWakeCount, NULL ))
                    {
                        LogError( L"Failed to release Semaphore" );
                        return false;
                    }
                    m_usSemaphoreCount = m_usThreadCount;
                }
            }

            // Wait for destroy completion and clean the thread pool.
            if( !DestroyPool())
            {
                LogError( L"Thread pool destruction failed" );
                return false;
            }

            LogInfo( L"Thread Pool destroyed successfully" );
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in Destroy()" );
            return false;
        }
    }

    /**
     * Post request to thread pool for processing. The pool stores the pointer
     * only; the request must stay alive until it has been executed or the pool
     * has been destroyed.
     *
     * @param pRequest_io - Request to be processed. Must not be NULL.
     * @return true when the request was queued. When false is returned the
     *         request is not in the queue.
     */
    bool ThreadPool::PostRequest( AbstractRequest* pRequest_io )
    {
        try
        {
            if( NULL == pRequest_io )
            {
                LogError( L"Invalid request" );
                return false;
            }

            AutoLock LockThread( m_LockWorkerThread );

            // Also reject requests while a destroy is in progress; they would
            // be dropped silently when the queue is cleared.
            if( NULL == m_phThreadList || m_bDestroyed )
            {
                LogError( L"ThreadPool is destroyed or not created yet" );
                return false;
            }

            m_RequestQueue.push_back( pRequest_io );

            if( m_usSemaphoreCount < m_usThreadCount )
            {
                // Thread available to process, so notify thread.
                if( !NotifyThread())
                {
                    LogError( L"NotifyThread failed" );
                    // Take the request back out so that "false" always means
                    // "not queued" and the caller may retry or free it.
                    m_RequestQueue.pop_back();
                    return false;
                }
            }
            else
            {
                // Thread not available to process. A worker signals the
                // semaphore for it once it finishes its current request.
                m_usPendingReqCount++;
            }

            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in PostRequest()" );
            return false;
        }
    }

    /**
     * Pop request from queue for processing.
     *
     * @param RequestQueue_io - Request queue.
     * @return AbstractRequest* - Oldest queued request, or NULL when the queue
     *                            is empty.
     */
    AbstractRequest* ThreadPool::PopRequest( REQUEST_QUEUE& RequestQueue_io )
    {
        AutoLock LockThread( m_LockWorkerThread );
        if( RequestQueue_io.empty())
        {
            return NULL;
        }

        AbstractRequest* pRequest = RequestQueue_io.front();
        // Remove only the head element. list::remove( value ) would scan the
        // whole list and also drop later postings of the same request.
        RequestQueue_io.pop_front();
        return pRequest;
    }

    /**
     * Create specified number of threads. Initial status of threads will be waiting.
     * On partial failure m_usThreadCount is reduced to the number of threads
     * that were actually created, so that the pool can still be destroyed.
     *
     * @return true when all threads were created, false otherwise.
     */
    bool ThreadPool::AddThreads()
    {
        try
        {
            // Allocate the handle array; "()" value-initializes every slot to
            // NULL. A failed allocation throws and is handled below.
            m_phThreadList = new HANDLE[m_usThreadCount]();

            // Create worker threads.
            for( unsigned short usIdx = 0u; usIdx < m_usThreadCount; usIdx++ )
            {
                DWORD dwThreadID = 0;
                m_phThreadList[usIdx] = CreateThread( 0,
                                                      0,
                                                      reinterpret_cast<LPTHREAD_START_ROUTINE>( ThreadPool::ThreadProc ),
                                                      this,
                                                      0,
                                                      &dwThreadID );
                if( NULL == m_phThreadList[usIdx] )
                {
                    LogError( L"CreateThread failed" );
                    // Keep only the threads that really exist.
                    AutoLock LockThread( m_LockWorkerThread );
                    m_usThreadCount = usIdx;
                    return false;
                }
            }

            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in AddThreads()" );
            return false;
        }
    }

    /**
     * Release the semaphore by one so that one waiting thread wakes up.
     * The request queue is not touched here.
     *
     * @return true when the semaphore was released, false otherwise.
     */
    bool ThreadPool::NotifyThread()
    {
        try
        {
            AutoLock LockThread( m_LockWorkerThread );

            // Release semaphore by one to process this request.
            if( !ReleaseSemaphore( m_hSemaphore, 1, NULL ))
            {
                LogError( L"ReleaseSemaphore failed" );
                return false;
            }

            m_usSemaphoreCount++;
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in NotifyThread()" );
            return false;
        }
    }

    /**
     * Worker loop: wait for a request, execute it, then hand a pending request
     * (if any) to the next free thread. Returns when the pool is destroyed.
     *
     * @return Always true.
     */
    bool ThreadPool::ProcessRequests()
    {
        for( ;; )
        {
            try
            {
                LogInfo( L"Thread WAITING" );

                // Wait for request.
                if( !WaitForRequest())
                {
                    LogError( L"WaitForRequest() failed" );
                    if( IsDestroyed())
                    {
                        // Do not keep retrying a failing wait on a pool that
                        // is being torn down.
                        LogInfo( L"Thread EXITING" );
                        break;
                    }
                    continue;
                }

                // Thread counter.
                AutoCounter Counter( m_lActiveThread, m_LockWorkerThread );
                LogInfo( L"Thread ACTIVE" );

                // Check thread pool destroy request.
                if( IsDestroyed())
                {
                    LogInfo( L"Thread EXITING" );
                    break;
                }

                // Get request from request queue.
                AbstractRequest* pRequest = PopRequest( m_RequestQueue );
                if( NULL == pRequest )
                {
                    LogError( L"PopRequest failed" );
                }
                else
                {
                    // A failing or throwing request must not prevent the
                    // pending-request hand-over below, otherwise queued
                    // requests could wait forever.
                    try
                    {
                        if( 0 != pRequest->Execute())
                        {
                            LogError( L"Request execution failed" );
                        }
                    }
                    catch( ... )
                    {
                        LogError( L"Exception occurred in request execution" );
                    }
                }

                // Check thread pool destroy request.
                if( IsDestroyed())
                {
                    LogInfo( L"Thread EXITING" );
                    break;
                }

                AutoLock LockThread( m_LockWorkerThread );

                // Inform thread if any pending request.
                if( m_usPendingReqCount > 0 && m_usSemaphoreCount < m_usThreadCount )
                {
                    // Thread available to process, so notify thread.
                    if( NotifyThread())
                    {
                        m_usPendingReqCount--;
                    }
                    else
                    {
                        LogError( L"NotifyThread failed" );
                    }
                }
            }
            catch( ... )
            {
                LogError( L"Exception occurred in ProcessRequests()" );
            }
        }

        return true;
    }

    /**
     * Wait for request queuing to thread pool.
     *
     * @return true when the semaphore was acquired, false otherwise.
     */
    bool ThreadPool::WaitForRequest()
    {
        try
        {
            // Wait released when request queued.
            const DWORD dwReturn = WaitForSingleObject( m_hSemaphore, INFINITE );
            if( WAIT_OBJECT_0 != dwReturn )
            {
                LogError( L"WaitForSingleObject failed" );
                return false;
            }

            AutoLock LockThread( m_LockWorkerThread );
            m_usSemaphoreCount--;

            // Clear previous error.
            ::SetLastError( 0 );
            return true;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in WaitForRequest()" );
            return false;
        }
    }

    /**
     * Wait for all worker threads to exit, then release every handle and the
     * request queue.
     *
     * @return true when everything was released without error. When the wait
     *         itself fails nothing is released and false is returned.
     */
    bool ThreadPool::DestroyPool()
    {
        try
        {
            // Wait for the exit of threads. Skipped for an empty pool because
            // WaitForMultipleObjects rejects a handle count of zero.
            if( m_usThreadCount > 0 )
            {
                const DWORD dwReturn = WaitForMultipleObjects( m_usThreadCount,
                                                               m_phThreadList,
                                                               TRUE,
                                                               INFINITE );
                // With bWaitAll any value in [WAIT_OBJECT_0, WAIT_OBJECT_0 + count)
                // means that all threads are signalled.
                if( dwReturn >= WAIT_OBJECT_0 + m_usThreadCount )
                {
                    LogError( L"WaitForMultipleObjects failed" );
                    return false;
                }
            }

            bool bAllClosed = true;

            // Close all threads. Keep going on failure so that the remaining
            // handles and the array are not leaked.
            for( USHORT uIdx = 0u; uIdx < m_usThreadCount; uIdx++ )
            {
                // CloseHandle returns non-zero (not necessarily TRUE) on success.
                if( !CloseHandle( m_phThreadList[uIdx] ))
                {
                    LogError( L"CloseHandle failed for threads" );
                    bAllClosed = false;
                }
            }

            // Clear memory allocated for threads.
            delete[] m_phThreadList;
            m_phThreadList = NULL;
            m_usThreadCount = 0u;

            // Close the semaphore
            if( !CloseHandle( m_hSemaphore ))
            {
                LogError( L"CloseHandle failed for semaphore" );
                bAllClosed = false;
            }
            m_hSemaphore = NULL;

            // Clear request queue.
            m_RequestQueue.clear();
            return bAllClosed;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in DestroyPool()" );
            return false;
        }
    }

    /**
     * Check for destroy request.
     *
     * @return true when destruction of the pool has been requested.
     */
    inline bool ThreadPool::IsDestroyed()
    {
        // Avoid synchronization issues if destroy requested after validation.
        AutoLock LockThread( m_LockWorkerThread );

        // During thread pool destruction all semaphores are released
        // to exit all threads.
        return m_bDestroyed;
    }

    /**
     * Set destroy flag
     *
     * @param bFlag_i - New value of the destroy flag.
     */
    inline void ThreadPool::SetDestroyFlag( const bool bFlag_i )
    {
        AutoLock LockThread( m_LockWorkerThread );
        m_bDestroyed = bFlag_i;
    }

    /**
     * Cancel all queued requests in pool. Requests that are already executing
     * are not affected.
     */
    void ThreadPool::CancelRequests()
    {
        try
        {
            // Avoid synchronization issues if destroy requested after validation.
            AutoLock LockThread( m_LockWorkerThread );
            LogInfo( L"Thread pool destroy requested" );

            // Clear main queue. No request is pending any more after that.
            m_RequestQueue.clear();
            m_usPendingReqCount = 0u;
        }
        catch( ... )
        {
            LogError( L"Exception occurred in CancelRequests()" );
        }
    }

    /**
     * Log error in thread pool.
     *
     * @param wstrError_i - Error description.
     */
    void ThreadPool::LogError( const std::wstring& wstrError_i )
    {
        if( NULL == m_pLogger )
        {
            return;
        }

        // Read the shared counter under the lock, but call the logger without
        // holding it.
        long lActiveThread = 0;
        {
            AutoLock LockThread( m_LockWorkerThread );
            lActiveThread = m_lActiveThread;
        }
        m_pLogger->LogError( lActiveThread, wstrError_i );
    }

    /**
     * Log information in thread pool.
     *
     * @param wstrInfo_i - Information description.
     */
    void ThreadPool::LogInfo( const std::wstring& wstrInfo_i )
    {
        if( NULL == m_pLogger )
        {
            return;
        }

        long lActiveThread = 0;
        {
            AutoLock LockThread( m_LockWorkerThread );
            lActiveThread = m_lActiveThread;
        }
        m_pLogger->LogInfo( lActiveThread, wstrInfo_i );
    }

    /**
     * Worker thread procedure.
     *
     * @param pParam_i - ThreadPool instance.
     * @return UINT - Return 0 on success, 1 on failure.
     */
    UINT WINAPI ThreadPool::ThreadProc( LPVOID pParam_i )
    {
        // Declared once, outside the try block, so that the handler below sees
        // the real pointer instead of a shadowed NULL.
        ThreadPool* pThreadPool = static_cast<ThreadPool*>( pParam_i );
        if( NULL == pThreadPool )
        {
            return 1;
        }

        try
        {
            if( !pThreadPool->ProcessRequests())
            {
                pThreadPool->LogError( L"ProcessRequests() failed" );
                return 1;
            }

            return 0;
        }
        catch( ... )
        {
            // Nothing may escape from a thread entry point, not even a failure
            // of the logger itself.
            try
            {
                pThreadPool->LogError( L"Exception occurred in ThreadProc()" );
            }
            catch( ... )
            {
            }
            return 1;
        }
    }
} // namespace TP
相关文章推荐
- Whether Thread Pool is Needed for You?
- 快速求幂
- 团队项目-个人博客-4.25
- SpringMVC环境下实现的Ajax异步请求(JSON格式数据) 推荐
- 【剑指offer-Java版】14调整数组顺序使奇数位于偶数前面
- Javascript获取页面元素相对和绝对位置
- Python学习-机器学习实战-ch06 支持向量机
- 【Linux】lsof 命令,记一次端口占用查询
- 第三百八十八天 how can i 坚持
- Cocos2d-x 3.x 声音播放
- Android SDK 无法更新或更新慢解决方法
- springMVC(4)------@RequestParam映射请求参数
- win防火墙阻止程序
- 第一阶段冲刺(第八天)
- BZOJ 2440 完全平方数
- Leetcode Maximum Depth of Binary Tree 104
- 模式识别之特征评估
- 【剑指offer-Java版】13O(1)时间删除链表结点
- JS实现全屏页面切换
- [Drools]JAVA规则引擎2 -- Drools实例