您的位置:首页 > 其它

Extended Thread Pool(可扩展的线程池)

2009-06-04 12:22 357 查看
Extended Thread Pool

的特征


IoC
支持(使用了
Unity


扩展队列

扩展任务项

限制了工作线程的最大数量

动态线程分配

线程优先级支持

扩展日志

Extended Thread Pool

的设计


ITaskItem
表示任务

ITaskQueue
表示任务的队列逻辑

ITaskQueueController
表示消费者和生产者之间的通信逻辑(线程安全)

WorkThread
表示线程执行

ExtendedThreadPool
控制线程的执行

让我们来深入了解每个类:

ITaskItem
表示要完成工作


public

interface
ITaskItem

{

ThreadPriority Priority {
get
; }


void
DoWork();

}

ThreadPriority – WorkThread
的优先级,可以在每个任务中指定

ITaskQueue
另一个管理任务队列的接口


public

interface
ITaskQueue

{


int
Count {
get
; }


void
Enqueue(ITaskItem item);

ITaskItem Dequeue();

}

ITaskQueueController
提供了消费者和生产者之间的通信逻辑


public

interface
ITaskQueueController : IDisposable

{


int
ConsumersWaiting {
get
; }


void
Enqueue(ITaskItem item);

ITaskItem Dequeue();

}


ITaskQueueController
接口派生了两个队列控制对象:

DefaultTaskQueueController


BoundedTaskQueueController


Default Task Queue Controller




DefaultTaskQueueController
对与
ITaskQueue
是线程安全的封装:

public

class
DefaultTaskQueueController : TaskQueueControllerBase

{


public
DefaultTaskQueueController(ITaskQueue taskQueue)

:
base
(taskQueue)

{

}

#region Overrides of TaskQueueControllerBase


public

override

void
Enqueue(ITaskItem item)

{


lock
(_locker)

{

_taskQueue.Enqueue(item);


if
(_consumersWaiting > 0)

Monitor.PulseAll(_locker);

}

}


public

override
ITaskItem Dequeue()

{

ITaskItem taskItem;


lock
(_locker)

{


while
(_taskQueue.Count == 0 && !_isDispose)

{

_consumersWaiting++;

Monitor.Wait(_locker);

_consumersWaiting--;

}


if
(_isDispose)


return

null
;

taskItem = _taskQueue.Dequeue();

}


return
taskItem;

}

#endregion

}

Bounded Task Queue Controller


BoundedTaskQueueController
(线程安全)如果生产者任务或者创建项的任务比消费者的处理速度快,系统将会无限的消耗内存。
BoundedTaskQueueController
允许限制队列的大小来限制生产者的越界。


public

class
BoundedTaskQueueController : TaskQueueControllerBase

{


private

readonly

int
_maxTasksCount;


private

int
_producersWaiting;


public
BoundedTaskQueueController(ITaskQueue taskQueue,
int
maxTasksCount)

:
base
(taskQueue)

{


if
(maxTasksCount < 1)


throw

new
ArgumentException(
"MaxTasksCount should be greater 0"
);

_maxTasksCount = maxTasksCount;

}


public

override

void
Enqueue(ITaskItem item)

{


lock
(_locker)

{


while
(_taskQueue.Count == (_maxTasksCount - 1) && !_isDispose)

{

_producersWaiting++;

Monitor.Wait(_locker);

_producersWaiting--;

}

_taskQueue.Enqueue(item);


if
(_consumersWaiting > 0)

Monitor.PulseAll(_locker);

}

}


public

override
ITaskItem Dequeue()

{

ITaskItem taskItem;


lock
(_locker)

{


while
(_taskQueue.Count == 0 && !_isDispose)

{

_consumersWaiting++;

Monitor.Wait(_locker);

_consumersWaiting--;

}


if
(_isDispose)


return

null
;

taskItem = _taskQueue.Dequeue();


if
(_producersWaiting > 0)

Monitor.PulseAll(_locker);

}


return
taskItem;

}

}

Extended Thread Pool




ExtendedThreadPool
类管理接口
ITaskQueueController

AddTask
方法用来添加任务到任务通道中去。如果最大的线程限制未达到并且
ConsamersWaitin =
0

WorkThread
类的新实例将被创建。


public

void
AddTask(ITaskItem item)

{


if
(item.IsNull())


throw

new
ArgumentNullException(
"item"
);


if
(!(Enum.IsDefined(
typeof
(ThreadPriority), item.Priority)))


throw

new
ArgumentException(
"priority"
);

TaskQueueController.Enqueue(item);


if
(IsStartNewWorker())

CreateWorkThread();

}

Work Thread




WorkThread
类,执行任务项和提供日志记录的。


public

void
Start()

{


while
(_isRun)

{


try

{

ITaskItem item = _taskQueueController.Dequeue();


if
(item.IsNull())


continue
;

DoWork(item);

}


catch
(Exception ex)

{

_logger.Error(ex.Message);

}

}

}

Thread Pool Extensibility




ExtendedThreadPool
类提供了
Ioc
支持,
ITaskQueueController
接口标记了
Dependency
属性。


[Dependency]

public
ITaskQueueController TaskQueueController {
private

get
;
set
; }

如果你需要更加强大的任务队列,就必须实现
ITaskQueue
接口,并不需要担心线程安全的问题;也可以创建
ITaskQueueController
。我使用
Unity
来配置
ExtendedThreadPool













例子




在其中一个项目中,我使用了
multi-threading
进行
MSMQ
交互。
SampleTask
派生了接口
ITaskItem
,可以从项目
CoreDefaultMsmqSample
中了解详情。


public

void
DoWork()

{


using
(var transaction =
new
MessageQueueTransaction())

{


try

{

transaction.Begin();

Message msmqMessage =

_queue.Receive(TimeSpan.FromMilliseconds(500), transaction);


if
(msmqMessage !=
null
)

{

var message = (ExternalMessage) msmqMessage.Body;

LogManager.GetLogger(GetType()).Info(
"Task has been "
+


"done, info {0}"
.FormatWith(message.Data));


//Do work

Thread.Sleep(1000);

}

}


catch
(Exception ex)

{

transaction.Abort();

LogManager.GetLogger(GetType()).Error(ex.Message);


return
;

}

transaction.Commit();

}

}

总结

作者考虑到了线程池的可配置性和扩展性,在引用这些代码后,写出的线程应用会非常简洁,且不需要考虑线程之间的复杂操作.如果,你觉得有写模块是你
不需要的,比如日志模块或者IoC模块(这是什么模块我也不清楚),可以自行把代码删除掉,希望原作者会继续他的思路完善这个框架.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: