java.util.concurrent.LinkedBlockingQueue

2012-07-11 13:36, 513 views

1. Introduction

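LinkedBlockingQueue is a linked-list implementation of BlockingQueue. It guards the head and the tail of the list (that is, the take and put operations) with two separate locks, which typically gives it higher throughput than ArrayBlockingQueue. It is a blocking container and is well suited to implementing the producer-consumer pattern.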
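As a rough illustration of the producer-consumer use mentioned above, the following sketch passes integers from one thread to another through a small bounded queue. The class name `ProducerConsumerDemo`, the method `sumThroughQueue`, and the `POISON` sentinel are assumptions made for this example only; they are not part of the JDK.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerDemo {
    // Sentinel telling the consumer to stop (an assumption of this sketch).
    private static final int POISON = -1;

    /** Produces 1..n on the calling thread, sums them on a consumer thread, returns the sum. */
    public static long sumThroughQueue(int n) throws InterruptedException {
        // A capacity of 2 forces the producer to block whenever the consumer falls behind.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        long[] sum = new long[1];

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int value = queue.take();   // blocks while the queue is empty
                    if (value == POISON) {
                        break;
                    }
                    sum[0] += value;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 1; i <= n; i++) {
            queue.put(i);                       // blocks while the queue is full
        }
        queue.put(POISON);
        consumer.join();                        // join makes the consumer's writes visible here
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumThroughQueue(100)); // prints 5050
    }
}
```

Neither thread polls or sleeps: all waiting happens inside `put` and `take`.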

2. Implementation

The underlying fields of LinkedBlockingQueue are defined as follows:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    // Current element count; atomic because put and take update it under different locks
    private final AtomicInteger count = new AtomicInteger(0);

    // Head and tail of the linked list
    private transient Node<E> head;
    private transient Node<E> last;

    // The two locks for the take and put operations, each with its own condition
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    ...
}
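The put code shown later compares `count` against a `capacity` value that the excerpt above elides. From the caller's side, the bound is fixed at construction time. The following is a small sketch of how it can be observed through the public API; the class `CapacityDemo` and its method names are made up for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class CapacityDemo {
    /** Remaining capacity of a queue built with the no-argument constructor. */
    public static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    /** Remaining capacity of a bounded queue after one element has been added. */
    public static int remainingAfterOneAdd(int capacity) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        queue.add("a");
        return queue.remainingCapacity();
    }

    public static void main(String[] args) {
        // The no-argument constructor bounds the queue at Integer.MAX_VALUE.
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // prints true
        System.out.println(remainingAfterOneAdd(3));                // prints 2
    }
}
```

With the default constructor the queue is effectively unbounded, so `put` will in practice never wait on `notFull`; pass an explicit capacity when back-pressure on producers is wanted.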

The put (insert) operation of LinkedBlockingQueue:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * The calling thread must hold the lock associated with a Condition
     * when it calls signal() on that Condition.
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // Acquire putLock
        putLock.lockInterruptibly();
        try {
            try {
                // While the queue is full, wait on the notFull condition
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            // getAndIncrement returns the previous count, then adds 1
            c = count.getAndIncrement();
            // Still room left: wake another put thread waiting on notFull
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // Release putLock before signalling notEmpty; acquiring takeLock
        // while still holding putLock would risk a deadlock
        if (c == 0)
            signalNotEmpty();
    }

    ...
}
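The blocking behaviour of `put` described above can be observed from the outside. The sketch below fills a queue of capacity 1, starts a second `put` on another thread, and checks that it completes only after a `take` frees a slot. The class `BlockingPutDemo`, the method `putWaitsForSpace`, and the 200 ms wait are assumptions chosen for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingPutDemo {
    /** Returns true if put on a full queue waited until a take freed a slot. */
    public static boolean putWaitsForSpace() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.put("first");

        // The non-blocking offer reports failure instead of waiting on a full queue.
        boolean rejected = !queue.offer("second");

        Thread producer = new Thread(() -> {
            try {
                queue.put("second");            // waits for space: the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Give the producer time to reach put; it cannot finish while the queue is full.
        producer.join(200);
        boolean stillBlocked = producer.isAlive();

        String head = queue.take();             // frees the slot and wakes the producer
        producer.join();
        String next = queue.take();

        return rejected && stillBlocked && "first".equals(head) && "second".equals(next);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(putWaitsForSpace()); // prints true
    }
}
```

The `take` call here corresponds to the `c == capacity` case in the take code: the queue was full before the element was removed, so the taker signals `notFull`.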


The take (remove) operation of LinkedBlockingQueue:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // Acquire takeLock
        takeLock.lockInterruptibly();
        try {
            try {
                // While the queue is empty, wait on the notEmpty condition
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            // getAndDecrement returns the previous count, then subtracts 1
            c = count.getAndDecrement();
            // Elements remain: wake another take thread waiting on notEmpty
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this take, so wake a waiting put via notFull
        if (c == capacity)
            signalNotFull();
        return x;
    }

    ...
}
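The waiting side of `take` can be checked in the same way. In this sketch a consumer thread calls `take` on an empty queue and stays blocked until the main thread puts an element. The class `BlockingTakeDemo`, the method `takeWaitsForElement`, and the 200 ms wait are hypothetical names and values chosen for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingTakeDemo {
    /** Returns the element received by a take that first had to wait, or null otherwise. */
    public static String takeWaitsForElement() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String[] received = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();     // waits: the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // The consumer cannot finish while the queue stays empty.
        consumer.join(200);
        boolean wasWaiting = consumer.isAlive();

        queue.put("hello");                     // count goes 0 -> 1, so put signals notEmpty
        consumer.join();

        return wasWaiting ? received[0] : null;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(takeWaitsForElement()); // prints hello
    }
}
```

The `put` call here corresponds to the `c == 0` case in the put code: the queue was empty before the insert, so the producer takes `takeLock` briefly to signal `notEmpty`.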