您的位置:首页 > 其它

生产者/消费者模型、读写模型、线程池

2010-07-14 16:07 211 查看
转自:------------------------http://www.javaeye.com/topic/174591

生产者/消费者模型

有了信号量这个利器,我们就可以处理比较复杂的线程同步模型了。

首先,我们来看一个比较简单的生产者/消费者模型。还是以Java代码为例。

public static final Object signal = new Object();

public static final char[] buf = new char[1024]; // 需要同步访问的共享资源

// 生产者代码

… produce() {

for(… ) { // 循环执行

synchronized(signal){

// 产生一些东西,放到 buf 共享资源中

signal.notify(); //然后通知消费者

signal.wait(); // 然后自己进入signal待召队列

}

}

}

// 消费者代码

… consume() {

for(… ) { // 循环执行

synchronized(signal){

signal.wait(); // 进入signal待召队列,等待生产者的通知

// 读取buf 共享资源里面的东西

signal.notify(); // 然后通知生产者

}

}

}

上述的生产者/消费者模型的实现非常简单,只用了一个信号量signal。这只是一段示意代码。

实际上的生产者/消费者模型的实现可能非常复杂。可以引入buf已满或者已空的判断,可以引入更多的信号量,也可以引入一个环状的buf链。但那些都是性能优化方面的工作,基本的信号量工作方式还是不变的。

生产者/消费者模型是典型的Coroutine。而且,当消费者或者生产者线程进入待召队列的时候,当前的运行栈状态就暂时保存在系统当中,这种状况又是典型的Continuation。

因此,我们完全可以用信号量机制自己实现Coroutine和Continuation。其实,那些在语法层面上支持Coroutine和Continuation的语言,内部实现原理也是采用类似的信号量同步机制。

读写模型

读写模型是一个稍微复杂一些的模型。

一份共享资源允许多个读者同时读取。但是只要有一个写者在写这份共享资源,任何其他的读者和写者都不能访问这份共享资源。

读写模型实现起来,不仅需要信号量机制,还需要额外的读者计数和写者计数。

public static final Object signal = new Object();

public static int readers = 0;

public static int writers = 0;

// 读者代码

… read() {

for(… ) { // 循环执行

synchronized(signal){

while( writers > 0 )

signal.wait(); // 如果有人在写,那么就放弃执行,进入待召队列

// 能够到达这里,说明没有人在写

readers ++ ; // 增加一个读者计数,表示本线程在读取

} // 这里出了synchronized范围,释放同步锁.以便其他线程读取.

// 进行一些读取操作

synchronized(signal){

readers --; // 读取完成,减少一个读者计数,表示本线程不在读取

signal.notifyAll(); // 通知待召队列里面的所有其他线程

}

}

}

// 写者代码

… write() {

for(… ) { // 循环执行

synchronized(signal){

while( writers > 0 || readers > 0)

signal.wait();// 如果有人在写或读,那么就放弃执行,进入待召队列

// 能够到达这里,说明没有人在写,也没有人在读

writers ++ ; // 增加一个写者计数,表示本线程在写

// 进行一些写操作

writers --; // 读取完成,减少一个读者计数,表示本线程不在写

signal.notifyAll(); // 通知待召队列里面的所有其他线程

}

}

}

上述代码只是一段示意代码。实际应用中,人们通常抽取出来一个专门的读写同步锁。

interface ReadWriteLock {

… getReadLock();

… releaseReadLock();

… getWriteLock();

… releaseWriteLock();

}

具体的实现原理也是类似的信号量同步机制。

class RWLock {

… readers, writers;

… synchronized … getReadLock() { // 相当于synchronized(this)



while( writers > 0 )

this.wait(); // 这里我们把RWLock对象本身作为信号量

readers++;

}

…synchronized … releaseReadLock(){ //相当于synchronized(this)

readers--;

this.notifyAll(); // // 这里我们把RWLock对象本身作为信号量

}

…synchronized … getWriteLock(){// 相当于synchronized(this)

while( writers > 0 || readers > 0 )

this.wait(); // 这里我们把RWLock对象本身作为信号量

writers++;

}

…synchronized … releaseWriteLock(){// 相当于synchronized(this)

writers--;

this.notifyAll(); // // 这里我们把RWLock对象本身作为信号量

}

}

具体用法是

public static final RWLock lock = new RWLock();

… read() {

lock.getReadLock();

// 读取

lock.releaseReadLock();

}

… write() {

lock.getWriteLock();

// 读取

lock.releaseWriteLock();

}

这种用法要求在执行一些处理之前,一定要执行某项特殊操作,处理之后一定也要执行某项特殊操作。这种人为的顺序性,无疑增加了代码的耦合度,降低了代码的独立性。很有可能会成为线程死锁和资源操作冲突的根源。

这点一直让我不安,可是没有找到方法避免。毕竟,死锁或者资源操作冲突,是线程的固有问题。

很巧的是,正在我惴惴不安的时候,我的一个朋友提供了一个信息。Sun公司根据JCR,决定在jdk1.5中引入关于concurrency(并发)的部分。

以下这个网址是concurrency部分的util.concurrent一个实现。非常好的信息。对于处理多线程并发问题,很有帮助。
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
里面提供了一个ReadWriteLock类,标准用法如下。

Standard usage of ReadWriteLock:

class X {

ReadWriteLock rw;

// ...

public void read() throws InterruptedException {

rw.readLock().acquire();

try {

// ... do the read

}

finally {

rw.readlock().release()

}

}

public void write() throws InterruptedException {

rw.writeLock().acquire();

try {

// ... do the write

}

finally {

rw.writelock().release()

}

}

}

我们可以看到,ReadWriteLock同样要求调用的顺序——aquire()和release()。我对自己的例子增强了一点信心。

我又查看了WriterPreferenceReadWriteLock类,看到里面成对的方法,startRead(),endRead();startWrite(),endWrite()。我的心情完全放松了下来。我的思路虽然粗糙,但大体的方向是正确的。

线程池

线程是一种比较昂贵的资源。有些系统为了重用线程,引入了线程池的机制。

线程池的工作原理如下:

首先,系统会启动一定数量的线程。这些线程就构成了一个线程池。

当有任务要做的时候,系统就从线程池里面选一个空闲的线程。然后把这个线程标记为“正在运行”。然后把任务传给这个线程执行。线程执行任务完成之后,就把自己标记为“空闲”。

这个过程并不难以理解。难以理解的是,一般来说,线程执行完成之后,运行栈等系统资源就会释放,线程对象就被回收了。一个已经完成的线程,又如何能回到线程池的空闲线程队列中呢?

秘诀就在于,线程池里面的线程永远不会执行完成。线程池里面的线程,都是一个无穷循环。具体代码如下:

Thread pooledThread {

… theTask …. // theTask成员变量,表示要执行的任务

… run() {

while( true ) { // 永不停止的循环

signal.wait(); // 等待系统的通知

theTask.run(); // 执行任务

}

}

}

系统只需要调用 signal.notify() 就可以启动一个空闲线程。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: