您的位置:首页 > 产品设计 > UI/UE

BlockingQueue 实现解析

2016-05-29 21:26 323 查看

I. 简介

在并发开发中,BlockingArray和BlockingQueue因为其实现的阻塞功能常用来实现生产者-消费者模式,本文将分析源码来说明阻塞功能是如何实现的.

II. Condition

在分析源码前,首先需要对Condition类有一定了解.

以下是JDK 对Condition类的解释.

Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to “wait”) until notified by another thread that some state condition may now be true. Because access to this shared state information occurs in different threads, it must be protected, so a lock of some form is associated with the condition. The key property that waiting for a condition provides is that it atomically releases the associated lock and suspends the current thread, just like Object.wait.

A Condition instance is intrinsically bound to a lock. To obtain a Condition instance for a particular Lock instance use its newCondition() method.

简而言之,Condition 是和Lock绑定在一起、配合Lock 实现功能的一个类;Lock 的灵活性在此体现出来了.

通过调用Condition类下的
await()
signal()
方法,可以分别阻塞线程和通知线程某个操作是否可以进行,不然就持续阻塞. 这便是BlockingQueue 的实现方法.

下面将用同样是来自JDK的例子来说明.

class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull  = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}


BoundedBuffer 有两个方法,
put()
take()
. 同时也有两个Condition类对象,都是通过Lock的
newCondition()
方法获得的,这两个对象便是这个实现阻塞功能的类的关键.

put()
为例,首先获得锁,防止多个线程同时操作。随后查看数组中有多少个值,如果值已经达到了数组的上限,则阻塞线程,也就是调用Condition的
await()
方法。那么,在阻塞后,调用
put()
方法的线程要怎么才能继续呢?

take()
方法中,有一个
signal()
方法的调用,意思是:“如果notFull 对象正在阻塞了某个线程,你(notFull对象) 可以解除阻塞继续让线程运行了”. 也就是说,调用
put()
方法,并且因为数组已满而被阻塞的线程,在另一个线程调用了
take()
方法后,可以继续完成操作了.

III. BlockingQueue

BlockingQueue 的实现其实和上述类相似. 下面为源码.

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to
preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}


此处使用了AtomicInteger 来记录有多少个变量存储;从try模块开始,可以看到,实现方法和II中的BoundedBuffer 基本一样,只是在逻辑判断上会更加的严密.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  jdk 源码 并发 java