您的位置:首页 > 编程语言 > Java开发

LinkedBlockingQueue原理分析---基于JDK8

2018-01-26 16:06 363 查看
1.常用的阻塞队列 

1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.

2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的

3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.

4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.

其中LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue

2.LinkedBlockingQueue原理

基于链表实现,线程安全的阻塞队列。
使用锁分离方式提高并发,双锁(ReentrantLock):takeLock、putLock,允许读写并行,remove(e)和contain()、clear()需要同时获取2个锁。
FIFO先进先出模式。
在大部分并发场景下,LinkedBlockingQueue的吞吐量比ArrayBlockingQueue更好,双锁,入队和出队同时进行
根据构造传入的容量大小决定有界还是无界,默认不传的话,大小Integer.Max

3.LinkedBlockingQueue的几个关键属性

static class Node<E> {
E item;

/**后继节点
*/
Node<E> next;

Node(E x) { item = x; }
}

/** 队列容量,默认最大,可指定大小 */
private final int capacity;

/** 当前容量 */
private final AtomicInteger count = new AtomicInteger();

/**
* 头节点.
* Invariant: head.item == null
*/
transient Node<E> head;

/**
* 尾节点.
* Invariant: last.next == null
*/
private transient Node<E> last;

/** 定义的出队和入队分离锁,2个队列空和满的出队和入队条件 Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();


构造函数:默认是队列,可指定为有界,或初始给于一个初始集合数据

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

/**
* 指定有界大小,同时初始化head和tail节点
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
*         than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

/**
* 遍历集合元素,放到队列进行初始化  ---  无界队列
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
*         of its elements are null
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}

4.BlockingQueue源码分析

//入队,将元素添加到对尾等价  last.next = node; last = last.next
private void enqueue(Node<E> node) {
last = last.next = node;
}

/**
* 出队,从头部出
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}


// 队列已满:false
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity) // 队列容量达到最大值,添加失败
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 获取插入锁putLock
try {
if (count.get() < capacity) { // 加锁后再次判断队列是否已满
enqueue(node); // 入队
c = count.getAndIncrement(); // 返回Inc之前的值
if (c + 1 < capacity) // 插入节点后队列未满
notFull.signal(); // 唤醒notFull上的等待线程
}
} finally {
putLock.unlock(); // 释放插入锁
}
if (c == 0)
signalNotEmpty(); // 如果offer前队列为空,则唤醒notEmpty上的等待线程
return c >= 0;
}


public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException 方法和offer(E e)代码和功能均相似,但是如果在指定时间内未插入成功则会返回false。

比offer(E e)多的部分代码分析:

long nanos = unit.toNanos(timeout);  //将指定的时间长度转换为毫秒来进行处理
while (count.get() == capacity) {
if (nanos <= 0) // 等待的剩余时间小于等于0,那么直接返回false
return false;
nanos = notFull.awaitNanos(nanos); // 最多等待时间(纳秒)
}


//插入节点:\n线程入队操作前会获取putLock锁,插入数据完毕后释放;
队列未满将新建Node节点,添加到队列末尾;
队列已满则阻塞线程(notFull.await())或返回false;若线程B取出数据,则会调用notFull.signal()唤醒notFull上的等待线程(线程A继续插数据)。
若入队前队列为空,则唤醒notEmpty上等待的获取数据的线程

// 一直阻塞直到插入成功
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 可中断的锁获取操作(优先考虑响应中断),如果线程由于获取锁而处于Blocked状态时,线程将被中断而不再继续等待(throws InterruptedException),可避免死锁。
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// 队列若满线程将处于等待状态。while循环可避免“伪唤醒”(线程被唤醒时队列大小依旧达到最大值)
while (count.get() == capacity) {
notFull.await(); // notFull:入队条件
}
enqueue(node); // 将node链接到队列尾部
c = count.getAndIncrement(); // 元素入队后队列元素总和
if (c + 1 < capacity) // 队列未满
notFull.signal(); // 唤醒其他执行入队列的线程
} finally {
putLock.unlock(); // 释放锁
}
// c=0说明队列之前为空,出队列线程均处于等待状态。添加一个元素后,队列已不为空,于是唤醒等待获取元素的线程
if (c == 0)
signalNotEmpty();
}


获取方法

先看几个重要方法:

[java]
view plain
copy

/** 

 * 唤醒等待插入数据的线程. Called only from take/poll. 

 */  

private void signalNotFull() {  

    final ReentrantLock putLock = this.putLock;  

    putLock.lock();  

    try {  

        notFull.signal();  

    } finally {  

        putLock.unlock();  

    }  

}  

/** 

* 队列头部元素出队. 



* @return the node 

*/  

private E dequeue() {  

    // assert takeLock.isHeldByCurrentThread();  

    // assert head.item == null;  

    Node<E> h = head; // 临时变量h  

    Node<E> first = h.next;  

    h.next = h; // 形成环引用help GC  

    head = first;  

    E x = first.item;  

    first.item = null;  

    return x;  

}  

4.1、poll()

[java]
view plain
copy

// 队列为空返回null而不是抛异常  

public E poll() {  

    final AtomicInteger count = this.count;  

    if (count.get() == 0)  

        return null;  

    E x = null;  

    int c = -1;  

    final ReentrantLock takeLock = this.takeLock;  

    takeLock.lock();  

    try {  

        if (count.get() > 0) {  

            x = dequeue();  

            c = count.getAndDecrement(); // 减1并返回旧值  

            if (c > 1)  

                notEmpty.signal(); // 唤醒其他取数据的线程  

        }  

    } finally {  

        takeLock.unlock();  

    }  

 // c等于capacity说明poll之前队列已满,poll一个元素后便可唤醒其他等待插入数据的线程  

    if (c == capacity)  

        signalNotFull();  

    return x;  

}  

衍生方法:

// 为poll方法增加了时间限制,指定时间未取回数据则返回null

[java]
view plain
copy

public E poll(long timeout, TimeUnit unit)throws InterruptedException{}  

4.2、take()

// 一直阻塞直到取回数据

[java]
view plain
copy

public E take() throws InterruptedException {  

    E x;  

    int c = -1;  

    final AtomicInteger count = this.count;  

    final ReentrantLock takeLock = this.takeLock;  

    takeLock.lockInterruptibly();  

    try {  

        while (count.get() == 0) { // 队列为空,一直等待  

            notEmpty.await();  

        }  

        x = dequeue(); // 出队  

        c = count.getAndDecrement();  

        if (c > 1) // take数据前队列大小大于1,则take后队列至少还有1个元素  

            notEmpty.signal(); // 唤醒其他取数据的线程  

    } finally {  

        takeLock.unlock();  

    }  

    if (c == capacity)  

        signalNotFull(); //唤醒其他等待插入数据的线程  

    return x;  

}  

4.3、drainTo(Collection<? super E> c, int maxElements)

// 移除最多maxElements个元素并将其加入集合

[java]
view plain
copy

public int drainTo(Collection<? super E> c, int maxElements) {  

    if (c == null)  

        throw new NullPointerException();  

    if (c == this)  

        throw new IllegalArgumentException();  

    if (maxElements <= 0)  

        return 0;  

    boolean signalNotFull = false;  

    final ReentrantLock takeLock = this.takeLock;  

    takeLock.lock();  

    try {  

        int n = Math.min(maxElements, count.get());//转移元素数量不能超过队列总量   

        // count.get provides visibility to first n Nodes  

        Node<E> h = head;  

        int i = 0;  

        try {  

            while (i < n) {  

                Node<E> p = h.next;//从队首获取元素  

                c.add(p.item);  

                p.item = null;//p为临时变量,置null方便GC  

                h.next = h;  

                h = p;  

                ++i;  

            }  

            return n;  

        } finally {  

            // Restore invariants even if c.add() threw  

            if (i > 0) { // 有数据被转移到集合c中  

                // assert h.item == null;  

                head = h;  

 //如果转移前的队列大小等于队列容量,则说明现在队列未满  

 // 更新count为队列实际大小(减去i得到)  

                signalNotFull = (count.getAndAdd(-i) == capacity);  

            }  

        }  

    } finally {  

        takeLock.unlock();  

        if (signalNotFull)  

            signalNotFull(); // 唤醒其他等待插入数据的线程  

    }  

}  

衍生方法:

// 将[所有]可用元素加入集合c

[java]
view plain
copy

 public int drainTo(Collection<? super E> c) {  

    return drainTo(c, Integer.MAX_VALUE);  

}  

4.4、boolean retainAll(Collection<?> c)

// 仅保留集合c中包含的元素,队列因此请求而改变则返回true

[java]
view plain
copy

public boolean retainAll(Collection<?> c) {  

    Objects.requireNonNull(c); // 集合为null则throw NPE  

    boolean modified = false;  

    Iterator<E> it = iterator();  

    while (it.hasNext()) {  

        if (!c.contains(it.next())) {  

            it.remove();  

            modified = true; // 队列因此请求而改变则返回true  

        }  

    }  

    return modified;  

}  

LinkedBlockingQueue取数据小结:

线程A取数据前会获取takeLock锁,取完数据后释放锁。

队列有数据则(通常)返回队首数据;

若队列为空,则阻塞线程(notEmpty.await())或返回null等;当线程B插入数据后,会调用notEmpty.signal()唤醒notEmpty上的等待线程(线程A继续取数据)。

若取数据前队列已满,则通过notFull.signal()唤醒notFull上等待插入数据的线程。

5、检测方法(取回但不移除)

5.1、E peek()

// 返回队列头,队列为空返回null

[java]
view plain
copy

public E peek() {  

    if (count.get() == 0)  

        return null;  

    final ReentrantLock takeLock = this.takeLock;  

    takeLock.lock();  

    try {  

        Node<E> first = head.next;  

        if (first == null)  

            return null;  

        else  

            return first.item;  

    } finally {  

        takeLock.unlock();  

    }  

}  

6、综述

6.1、LinkedBlockingQueue通过对 插入、取出数据 使用不同的锁,实现多线程对竞争资源的互斥访问;

6.2、(之前队列为空)添加数据后调用signalNotEmpty()方法唤醒等待取数据的线程;(之前队列已满)取数据后调用signalNotFull()唤醒等待插入数据的线程。这种唤醒模式可节省线程等待时间。

6.3、个别操作需要调用方法fullyLock()同时获取putLock、takeLock两把锁(如方法:clear()、contains(Object o)、remove(Object o)、toArray()、toArray(T[] a)、toString()),注意fullyLock和fullyUnlock获取锁和解锁的顺序刚好相反,避免死锁。

[java]
view plain
copy

/** 

 * Locks to prevent both puts and takes. 

 */  

void fullyLock() {  

    putLock.lock();  

    takeLock.lock();  

}  

/** 

 * Unlocks to allow both puts and takes. 

 */  

void fullyUnlock() {  

    takeLock.unlock();  

    putLock.unlock();  

}  

6.4、线程唤醒signal()

值得注意的是,对notEmpty和notFull的唤醒操作均使用的是signal()而不是signalAll()。

signalAll() 虽然能唤醒Condition上所有等待的线程,但却并不见得会节省资源,相反,唤醒操作会带来上下文切换,且会有锁的竞争。此外,由于此处获取的锁均是同一个(putLock或takeLock),同一时刻被锁的线程只有一个,也就无从谈起唤醒多个线程了。

6.5、LinkedBlockingQueue与ArrayBlockingQueue简要比较

ArrayBlockingQueue底层基于数组,创建时必须指定队列大小,“有界”;LinkedBlockingQueue“无界”,节点动态创建,节点出队后可被GC,故伸缩性较好;

ArrayBlockingQueue入队和出队使用同一个lock(但数据读写操作已非常简洁),读取和写入操作无法并行,LinkedBlockingQueue使用双锁可并行读写,其吞吐量更高。

ArrayBlockingQueue在插入或删除元素时直接放入数组指定位置(putIndex、takeIndex),不会产生或销毁任何额外的对象实例;而LinkedBlockingQueue则会生成一个额外的Node对象,在高效并发处理大量数据时,对GC的影响存在一定的区别。

参考、感谢:
http://blog.csdn.net/u010887744/article/details/73010691
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: