您的位置:首页 > 理论基础 > 数据结构算法

一个简单的Thread缓冲池的实现

2009-11-13 19:54 411 查看
 
在应用中,我们常常需要
Thread

缓冲池来做一些事以提高程序的效率和并发性。本文演示了如何利用
Queue

这种数据结构实现一个简单的
Thread

缓冲池。

 

一个
Thread

缓冲池可以设计成以下这样:缓冲池由几个工作
Thread

和一个
Queue

组成,
Client

负责把任务放到
Queue

里面(
put

方法),而工作
Thread

就依次取出这些任务并执行它们(
get

方法)。

 

Queue

的一个经典实现是使用一个循环数组(这个实现在很多数据结构的书上都有介绍),如下图所示:



图示是一个大小为
size

的数组,这个循环数组可以被想象成首尾相连的一个环。
oldest

指向
Queue

中最老的数据所在的位置,
next

指向下一个可以放新数据的位置。

放入一个新数据到
next

的位置后,需要更新
next


next = (next + 1) % size;


oldest

位置取出一个数据后,需要更新
oldest


oldest = (oldest + 1) % size;


oldest == next

的时候,
Queue

为空,


(next + 1) % size == oldest

的时候,
Queue

为满。

(注意:为了区分
Queue

为空和为满的情况,实际上
Queue

里面最多能放
size-1

个数据。)

 

因为这个
Queue

会同时被多个线程访问,需要考虑在这种情况下
Queue

如何工作。首先,
Queue

需要是线程安全的,可以用
Java

里的
synchronized

关键字来确保同时只有一个
Thread

在访问
Queue.

 

我们还可以注意到当
Queue

为空的时候,
get

操作是无法进行的;当
Queue

为满的时候,
put

操作又是无法进行的。在多线程访问遇到这种情况时,一般希望执行操作的线程可以等待(
block

)直到该操作可以进行下去。比如,但一个
Thread

在一个空
Queue

上执行
get

方法的时候,这个
Thread

应当等待
(block)

,直到另外的
Thread

执行该
Queue


put

方法后,再继续执行下去。在
Java

里面,
Object

对象的
wait(),notify()

方法提供了这样的功能。

 
把上面的内容结合起来,就是一个
SyncQueue

的类:

 
public

class

SyncQueue
{

   

   
public

SyncQueue(
int

size)
{

      
_array
=
new

Object[size];

      
_size
=
size;

      
_oldest
=
0;

      
_next
=
0;

   
}

   

   
public

synchronized

void

put(Object
o)
{

      
while

(full())
{

          
try

{

             
wait();

          
}
catch

(InterruptedException
ex)
{

             
throw

new

ExceptionAdapter(ex);

          
}

      
}

      
_array[_next]
=
o;

      
_next
=
(_next
+
1)
%
_size;

      
notify();

   
}

   

   
public

synchronized

Object
get()
{

      
while

(empty())
{

          
try

{

             
wait();

          
}
catch

(InterruptedException
ex)
{

             
throw

new

ExceptionAdapter(ex);

          
}

      
}

      
Object
ret
=
_array[_oldest];

      
_oldest
=
(_oldest
+
1)
%
_size;

      
notify();

      
return

ret;

   
}

   

   
protected

boolean

empty()
{

      
return

_next
==
_oldest;

   
}

   

   
protected

boolean

full()
{

      
return

(_next
+
1)
%
_size
==
_oldest;

   
}

   

   
protected

Object
[]
_array;

   
protected

int

_next;

   
protected

int

_oldest;

   
protected

int

_size;

}

可以注意一下
get


put

方法中
while

的使用,如果换成
if

是会有问题的。这是个很容易犯的错误。

;-)
在以上代码中使用了
ExceptionAdapter
这个类,它的作用是把一个
checked Exception
包装成
RuntimeException
。详细的说明可以参考我的
避免在
Java中使用
Checked Exception


一文。

 

接下来我们需要一个对象来表现
Thread
缓冲池所要执行的任务。可以发现
JDK
中的
Runnable interface
非常合适这个角色。

 

最后,剩下工作线程的实现就很简单了:从
SyncQueue
里取出一个
Runnable

对象并执行它。

 
public

class

Worker
implements

Runnable
{

   

   
public

Worker(SyncQueue
queue)
{

      
_queue
=
queue;

   
}

   

   
public

void

run()
{

      
while

(
true

)
{

          
Runnable
task
=
(Runnable)
_queue.get();

          
task.run();

      
}

   
}

   

   
protected

SyncQueue
_queue
=
null

;

}

 

下面是一个使用这个
Thread

缓冲池的例子:

 
      
//

构造
Thread
缓冲池

      
SyncQueue
queue
=
new

SyncQueue(10);

      
for

(
int

i
=
0;
i
<
5;
i
++)
{

          
new

Thread(
new

Worker(queue)).start();

      
}

      

      
//

使用
Thread
缓冲池

      
Runnable
task
=
new

MyTask();

      
queue.put(task);

 

为了使本文中的代码尽可能简单,这个
Thread
缓冲池的实现是一个基本的框架。当使用到实际中时,一些其他功能也可以在这一基础上添加,比如异常处理,动态调整缓冲池大小等等。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息