Extended Thread Pool(可扩展的线程池)
2009-06-04 12:22
357 查看
Extended Thread Pool
的特征
IoC
支持(使用了
Unity
)
扩展队列
扩展任务项
限制了工作线程的最大数量
动态线程分配
线程优先级支持
扩展日志
Extended Thread Pool
的设计
ITaskItem
表示任务
ITaskQueue
表示任务的队列逻辑
ITaskQueueController
表示消费者和生产者之间的通信逻辑(线程安全)
WorkThread
表示线程执行
ExtendedThreadPool
控制线程的执行
让我们来深入了解每个类:
ITaskItem
表示要完成工作
public
interface
ITaskItem
{
ThreadPriority Priority {
get
; }
void
DoWork();
}
ThreadPriority – WorkThread
的优先级,可以在每个任务中指定
ITaskQueue
另一个管理任务队列的接口
public
interface
ITaskQueue
{
int
Count {
get
; }
void
Enqueue(ITaskItem item);
ITaskItem Dequeue();
}
ITaskQueueController
提供了消费者和生产者之间的通信逻辑
public
interface
ITaskQueueController : IDisposable
{
int
ConsumersWaiting {
get
; }
void
Enqueue(ITaskItem item);
ITaskItem Dequeue();
}
从
ITaskQueueController
接口派生了两个队列控制对象:
DefaultTaskQueueController
BoundedTaskQueueController
Default Task Queue Controller
DefaultTaskQueueController
对与
ITaskQueue
是线程安全的封装:
public
class
DefaultTaskQueueController : TaskQueueControllerBase
{
public
DefaultTaskQueueController(ITaskQueue taskQueue)
:
base
(taskQueue)
{
}
#region Overrides of TaskQueueControllerBase
public
override
void
Enqueue(ITaskItem item)
{
lock
(_locker)
{
_taskQueue.Enqueue(item);
if
(_consumersWaiting > 0)
Monitor.PulseAll(_locker);
}
}
public
override
ITaskItem Dequeue()
{
ITaskItem taskItem;
lock
(_locker)
{
while
(_taskQueue.Count == 0 && !_isDispose)
{
_consumersWaiting++;
Monitor.Wait(_locker);
_consumersWaiting--;
}
if
(_isDispose)
return
null
;
taskItem = _taskQueue.Dequeue();
}
return
taskItem;
}
#endregion
}
Bounded Task Queue Controller
BoundedTaskQueueController
(线程安全)如果生产者任务或者创建项的任务比消费者的处理速度快,系统将会无限的消耗内存。
BoundedTaskQueueController
允许限制队列的大小来限制生产者的越界。
public
class
BoundedTaskQueueController : TaskQueueControllerBase
{
private
readonly
int
_maxTasksCount;
private
int
_producersWaiting;
public
BoundedTaskQueueController(ITaskQueue taskQueue,
int
maxTasksCount)
:
base
(taskQueue)
{
if
(maxTasksCount < 1)
throw
new
ArgumentException(
"MaxTasksCount should be greater 0"
);
_maxTasksCount = maxTasksCount;
}
public
override
void
Enqueue(ITaskItem item)
{
lock
(_locker)
{
while
(_taskQueue.Count == (_maxTasksCount - 1) && !_isDispose)
{
_producersWaiting++;
Monitor.Wait(_locker);
_producersWaiting--;
}
_taskQueue.Enqueue(item);
if
(_consumersWaiting > 0)
Monitor.PulseAll(_locker);
}
}
public
override
ITaskItem Dequeue()
{
ITaskItem taskItem;
lock
(_locker)
{
while
(_taskQueue.Count == 0 && !_isDispose)
{
_consumersWaiting++;
Monitor.Wait(_locker);
_consumersWaiting--;
}
if
(_isDispose)
return
null
;
taskItem = _taskQueue.Dequeue();
if
(_producersWaiting > 0)
Monitor.PulseAll(_locker);
}
return
taskItem;
}
}
Extended Thread Pool
ExtendedThreadPool
类管理接口
ITaskQueueController
。
AddTask
方法用来添加任务到任务通道中去。如果最大的线程限制未达到并且
ConsamersWaitin =
0
,
WorkThread
类的新实例将被创建。
public
void
AddTask(ITaskItem item)
{
if
(item.IsNull())
throw
new
ArgumentNullException(
"item"
);
if
(!(Enum.IsDefined(
typeof
(ThreadPriority), item.Priority)))
throw
new
ArgumentException(
"priority"
);
TaskQueueController.Enqueue(item);
if
(IsStartNewWorker())
CreateWorkThread();
}
Work Thread
WorkThread
类,执行任务项和提供日志记录的。
public
void
Start()
{
while
(_isRun)
{
try
{
ITaskItem item = _taskQueueController.Dequeue();
if
(item.IsNull())
continue
;
DoWork(item);
}
catch
(Exception ex)
{
_logger.Error(ex.Message);
}
}
}
Thread Pool Extensibility
ExtendedThreadPool
类提供了
Ioc
支持,
ITaskQueueController
接口标记了
Dependency
属性。
[Dependency]
public
ITaskQueueController TaskQueueController {
private
get
;
set
; }
如果你需要更加强大的任务队列,就必须实现
ITaskQueue
接口,并不需要担心线程安全的问题;也可以创建
ITaskQueueController
。我使用
Unity
来配置
ExtendedThreadPool
。
例子
在其中一个项目中,我使用了
multi-threading
进行
MSMQ
交互。
SampleTask
派生了接口
ITaskItem
,可以从项目
CoreDefaultMsmqSample
中了解详情。
public
void
DoWork()
{
using
(var transaction =
new
MessageQueueTransaction())
{
try
{
transaction.Begin();
Message msmqMessage =
_queue.Receive(TimeSpan.FromMilliseconds(500), transaction);
if
(msmqMessage !=
null
)
{
var message = (ExternalMessage) msmqMessage.Body;
LogManager.GetLogger(GetType()).Info(
"Task has been "
+
"done, info {0}"
.FormatWith(message.Data));
//Do work
Thread.Sleep(1000);
}
}
catch
(Exception ex)
{
transaction.Abort();
LogManager.GetLogger(GetType()).Error(ex.Message);
return
;
}
transaction.Commit();
}
}
总结
作者考虑到了线程池的可配置性和扩展性,在引用这些代码后,写出的线程应用会非常简洁,且不需要考虑线程之间的复杂操作.如果,你觉得有写模块是你
不需要的,比如日志模块或者IoC模块(这是什么模块我也不清楚),可以自行把代码删除掉,希望原作者会继续他的思路完善这个框架.
的特征
IoC
支持(使用了
Unity
)
扩展队列
扩展任务项
限制了工作线程的最大数量
动态线程分配
线程优先级支持
扩展日志
Extended Thread Pool
的设计
ITaskItem
表示任务
ITaskQueue
表示任务的队列逻辑
ITaskQueueController
表示消费者和生产者之间的通信逻辑(线程安全)
WorkThread
表示线程执行
ExtendedThreadPool
控制线程的执行
让我们来深入了解每个类:
ITaskItem
表示要完成工作
public
interface
ITaskItem
{
ThreadPriority Priority {
get
; }
void
DoWork();
}
ThreadPriority – WorkThread
的优先级,可以在每个任务中指定
ITaskQueue
另一个管理任务队列的接口
public
interface
ITaskQueue
{
int
Count {
get
; }
void
Enqueue(ITaskItem item);
ITaskItem Dequeue();
}
ITaskQueueController
提供了消费者和生产者之间的通信逻辑
public
interface
ITaskQueueController : IDisposable
{
int
ConsumersWaiting {
get
; }
void
Enqueue(ITaskItem item);
ITaskItem Dequeue();
}
从
ITaskQueueController
接口派生了两个队列控制对象:
DefaultTaskQueueController
BoundedTaskQueueController
Default Task Queue Controller
DefaultTaskQueueController
对与
ITaskQueue
是线程安全的封装:
public
class
DefaultTaskQueueController : TaskQueueControllerBase
{
public
DefaultTaskQueueController(ITaskQueue taskQueue)
:
base
(taskQueue)
{
}
#region Overrides of TaskQueueControllerBase
public
override
void
Enqueue(ITaskItem item)
{
lock
(_locker)
{
_taskQueue.Enqueue(item);
if
(_consumersWaiting > 0)
Monitor.PulseAll(_locker);
}
}
public
override
ITaskItem Dequeue()
{
ITaskItem taskItem;
lock
(_locker)
{
while
(_taskQueue.Count == 0 && !_isDispose)
{
_consumersWaiting++;
Monitor.Wait(_locker);
_consumersWaiting--;
}
if
(_isDispose)
return
null
;
taskItem = _taskQueue.Dequeue();
}
return
taskItem;
}
#endregion
}
Bounded Task Queue Controller
BoundedTaskQueueController
(线程安全)如果生产者任务或者创建项的任务比消费者的处理速度快,系统将会无限的消耗内存。
BoundedTaskQueueController
允许限制队列的大小来限制生产者的越界。
public
class
BoundedTaskQueueController : TaskQueueControllerBase
{
private
readonly
int
_maxTasksCount;
private
int
_producersWaiting;
public
BoundedTaskQueueController(ITaskQueue taskQueue,
int
maxTasksCount)
:
base
(taskQueue)
{
if
(maxTasksCount < 1)
throw
new
ArgumentException(
"MaxTasksCount should be greater 0"
);
_maxTasksCount = maxTasksCount;
}
public
override
void
Enqueue(ITaskItem item)
{
lock
(_locker)
{
while
(_taskQueue.Count == (_maxTasksCount - 1) && !_isDispose)
{
_producersWaiting++;
Monitor.Wait(_locker);
_producersWaiting--;
}
_taskQueue.Enqueue(item);
if
(_consumersWaiting > 0)
Monitor.PulseAll(_locker);
}
}
public
override
ITaskItem Dequeue()
{
ITaskItem taskItem;
lock
(_locker)
{
while
(_taskQueue.Count == 0 && !_isDispose)
{
_consumersWaiting++;
Monitor.Wait(_locker);
_consumersWaiting--;
}
if
(_isDispose)
return
null
;
taskItem = _taskQueue.Dequeue();
if
(_producersWaiting > 0)
Monitor.PulseAll(_locker);
}
return
taskItem;
}
}
Extended Thread Pool
ExtendedThreadPool
类管理接口
ITaskQueueController
。
AddTask
方法用来添加任务到任务通道中去。如果最大的线程限制未达到并且
ConsamersWaitin =
0
,
WorkThread
类的新实例将被创建。
public
void
AddTask(ITaskItem item)
{
if
(item.IsNull())
throw
new
ArgumentNullException(
"item"
);
if
(!(Enum.IsDefined(
typeof
(ThreadPriority), item.Priority)))
throw
new
ArgumentException(
"priority"
);
TaskQueueController.Enqueue(item);
if
(IsStartNewWorker())
CreateWorkThread();
}
Work Thread
WorkThread
类,执行任务项和提供日志记录的。
public
void
Start()
{
while
(_isRun)
{
try
{
ITaskItem item = _taskQueueController.Dequeue();
if
(item.IsNull())
continue
;
DoWork(item);
}
catch
(Exception ex)
{
_logger.Error(ex.Message);
}
}
}
Thread Pool Extensibility
ExtendedThreadPool
类提供了
Ioc
支持,
ITaskQueueController
接口标记了
Dependency
属性。
[Dependency]
public
ITaskQueueController TaskQueueController {
private
get
;
set
; }
如果你需要更加强大的任务队列,就必须实现
ITaskQueue
接口,并不需要担心线程安全的问题;也可以创建
ITaskQueueController
。我使用
Unity
来配置
ExtendedThreadPool
。
例子
在其中一个项目中,我使用了
multi-threading
进行
MSMQ
交互。
SampleTask
派生了接口
ITaskItem
,可以从项目
CoreDefaultMsmqSample
中了解详情。
public
void
DoWork()
{
using
(var transaction =
new
MessageQueueTransaction())
{
try
{
transaction.Begin();
Message msmqMessage =
_queue.Receive(TimeSpan.FromMilliseconds(500), transaction);
if
(msmqMessage !=
null
)
{
var message = (ExternalMessage) msmqMessage.Body;
LogManager.GetLogger(GetType()).Info(
"Task has been "
+
"done, info {0}"
.FormatWith(message.Data));
//Do work
Thread.Sleep(1000);
}
}
catch
(Exception ex)
{
transaction.Abort();
LogManager.GetLogger(GetType()).Error(ex.Message);
return
;
}
transaction.Commit();
}
}
总结
作者考虑到了线程池的可配置性和扩展性,在引用这些代码后,写出的线程应用会非常简洁,且不需要考虑线程之间的复杂操作.如果,你觉得有写模块是你
不需要的,比如日志模块或者IoC模块(这是什么模块我也不清楚),可以自行把代码删除掉,希望原作者会继续他的思路完善这个框架.
相关文章推荐
- Extended Thread Pool(可扩展的线程池)
- boost扩展工具-线程池(threadpool)
- ThreadPool 之 线程池概览
- 线程池学习之ThreadPool Class
- ThreadPool 线程池的作用
- MinerThreadPool.java 线程池
- newCachedThreadPool线程池
- SPRING中的线程池ThreadPoolTaskExecutor
- Windows核心编程——》第十一章 线程池(The Windows Thread Pool)
- Java四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor
- 实现Spring整合线程池ThreadPoolTaskExecutor
- Nodejs事件引擎libuv源码剖析之:高效线程池(threadpool)的实现
- 线程池ThreadPool的常用方法介绍
- C 线程池 thread_pool
- Spring的线程池ThreadPoolTaskExecutor
- newScheduledThreadPool延时任务线程池,实现原理
- Android通过AsyncTask与ThreadPool(线程池)两种方式异步加载大量数据的分析与对比
- ThreadPool(线程池) in .Net
- SPRING中的线程池ThreadPoolTaskExecutor
- Android 应用开发 之通过AsyncTask与ThreadPool(线程池)两种方式异步加载大量数据的分析与对比