您的位置:首页 > 产品设计 > UI/UE

并发容器学习—LinkedBlockingQueue和LinkedBlockingDueue

2019-05-02 10:54 453 查看
一、LinkedBlockingQueue并发容器 1.Linked BlockingQueue的底层实现     与 ArrayBlockingQueue相似,Linked BlockingQueue也是一个阻塞队列,不过Linked BlockingQueue的底层数据结构不是数组,而是链表 ,不过底层实现虽然是链表,但Linked BlockingQueue中规定链表必须有界,即若内存足够,链表也不能是无限大的,链表最大只能 是 Integer. MAX_VALUE(默认容量)。 其底层结点的定义:  
//从这可知,LinkedBlockingQueue是个单链表
static class Node<E> {
E item;    //数据

Node<E> next;    //后继结点

Node(E x) { item = x; }
}

2.LinkedBlockingQueue的继承体系

    LinkedBlockingQueue的继承关系如下图所示,由继承关系可知LinkedBlockingQueue与ArrayBlockingQueue实现的功能是相同的,只在存储数据结构上不同。其父类及实现的接口在之前的学习中都已分析过,这里不在多说。

 

3.重要的属性以及构造方法

    在LinkedBlockingQueue中保证线程并发安全所使用的的方式与ArrayBlockingQueue相似,都是通过使用重入锁ReentrantLock来实现的,不同的是ArrayBlockingQueue不论出队还是入队使用的都是同一把锁,因此ArrayBlockingQueue在实际使用使是不能出入队并发执行的,而LinkedBlockingQueue在这边方面则不同,LinkedBlockingQueue的出入队是各一把锁,分别控制出入队操作。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

//阻塞队列的容量,不能超过int类型的最大值
private final int capacity;

//队列中元素的数量,是个原子计数类型
private final AtomicInteger count = new AtomicInteger();

//底层链表的头结点,即队首
transient Node<E> head;

//底层链表的尾结点,即队尾
private transient Node<E> last;

//重入锁,用于出队操作
private final ReentrantLock takeLock = new ReentrantLock();

//队列允许出队条件
private final Condition notEmpty = takeLock.newCondition();

//重入锁,用于入地操作
private final ReentrantLock putLock = new ReentrantLock();

//队列允许入队条件
private final Condition notFull = putLock.newCondition();

//默认构造方法,队列容量为最大值
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

//指定容量的构造方法
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);    //初始化链表
}

//拥有初始数据的阻塞队列
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
}

4.入队过程

    与ArrayBlockingQueue相同,LinkedBlockingQueue中的入队方法也是put、add和offer三种,不过相比ArrayBlockingQueue中方法,它们更简单一些。

//父类AbstractQueue中的添加方法,新增失败就抛异常
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

//入队方法,容量不足就放弃入队
public boolean offer(E e) {
//从这可以知道,LinkedBlockingQueue中也是不允许null元素的
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;    //获取元素计数器

//判断队列是否已满,已满不允许在添加元素到队列中
if (count.get() == capacity)
return false;
int c = -1;    //用于记录队列中原有元素的数量
Node<E> node = new Node<E>(e);    //新建e元素接地
final ReentrantLock putLock = this.putLock;
putLock.lock();    //入队锁
try {

//获取锁后再次判断队列容量是否已满,已满放弃入队
if (count.get() < capacity) {
enqueue(node);    //真正执行入队的方法
c = count.getAndIncrement();    //元素计数+1,并且将原来元素的个数返回

//判断入队后,队列是否还有剩余容量,若有就唤醒其他某个入队操作线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();    //释放锁
}

//判断队列原来是不是空队列,即未新增元素之前是不是为空
//若为空,那么出队操作此时应该都在等待状态,需要唤醒某个出队操作的线程
if (c == 0)
signalNotEmpty();    //唤醒一个出队操作线程
return c >= 0;
}

//向队列中添加队尾元素
private void enqueue(Node<E> node) {
last = last.next = node;    //直接添加元素到链表尾结点
}

//唤醒出队操作的线程
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;    //获取出队锁
takeLock.lock();
try {
notEmpty.signal();    //随机唤醒某个出队操作线程
} finally {
takeLock.unlock();
}
}

//在一定时间内执行入队操作,若超过指定时间仍无法入队,那就放弃入队,可被中断
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

//从这可以知道,LinkedBlockingQueue中也是不允许null元素的
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);    //换算超时时间单位
int c = -1;
final ReentrantLock putLock = this.putLock;    //获取入队锁
final AtomicInteger count = this.count;    //获取计数器
putLock.lockInterruptibly();    //可被中断的加锁
try {

//判断队列容量是否已满,使用循环是为了防止虚假唤醒
//队列若是已满,则等待一定时间在尝试入队
while (count.get() == capacity) {
if (nanos <= 0)    //判断等待时间是否还有剩余
return false;    //超时,返回入队失败
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}

//向队列中添加元素,若队列容量不足,则等待直到队列有空间后继续添加
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();

int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;    //获取入队锁
final AtomicInteger count = this.count;    //计数器
putLock.lockInterruptibly();
try {

//判断队列是否已满,队列已满则,入队操作线程进入等待状态
//唤醒后要再次判断队列是否有空间,防止虚假唤醒
while (count.get() == capacity) {
//队列已满,线程进入条件队列等待
notFull.await();
}
enqueue(node);    //入队
c = count.getAndIncrement();    //获取原数量,并增加队列中的元素数量
if (c + 1 < capacity)
//到此,说明队列中至少有一个元素,那么就能进行出队操作
//因此可以唤醒一个等待出队的线程执行出队操作
notFull.signal();
} finally {
putLock.unlock();
}

/判断队列原来是不是空队列,即未新增元素之前是不是为空
//若为空,那么出队操作此时应该都在等待状态,需要唤醒某个出队操作的线程
if (c == 0)
signalNotEmpty();
}

5.出队过程

    在LinkedBlockingQueue中的出队方法也是只有两种poll和take,一个不等待,一个等待。

//移除并返回队首元素,若队列为空,则返回null
public E poll() {
final AtomicInteger count = this.count;    //获取队列元素个数
if (count.get() == 0)    //判断是不是空队列,空队列直接返回null
return null;
E x = null;    //用于记录移除出队的结点
int c = -1;    //用于记录队列原来结点数量
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {

//判断队列是否为空,不为空才能移除并获取队首结点
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();    //记录旧计数,并结点更新计数

//出队后,队列仍不为空,那么久可以继续唤醒一个出队操作的线程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

//在一定时间内尝试出队操作,若超时仍未成功,则返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)    //判断是否超时
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

//移除并返回队首元素,若队列为空,则等待
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//判断队列是否是空队列,若是空队列那么当前出队操作进入等待状态
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();

//判断队列是否为空,队列不空,则可以继续唤醒其他的出队操作线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}

//判断未进行本次出队操作前队列是否已满,队列若是已满,说明所有的
//入队操作要么处于等待状态,要么不能成功,而现在至少队列执行过一次
//出队操作,此时队列必然还有容量可以执行入队操作,因此可以唤醒任意一个
//执行入队操作的线程
if (c == capacity)
signalNotFull();
return x;
}

6.peek方法

//获取但不移除队首元素
public E peek() {
//队列中没有元素的话,就返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;    //head.next结点就是队首元素对应的结点

//判断队首是否为null,为null说明队列是空队列
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}

7.remove方法

//遍历队列查找o元素对应的结点并将其从队列中移除
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();    //获取出入队锁,防止并发问题
try {

//遍历队列对应的链表,查找要移除的元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//判断结点是否是要删除的结点,若是,那就要将该结点从
//链表中移除
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();    //将出入队锁都释放
}
}

//删除队列中元素时,要对其他的出入队操作进行同步
//因此删除操作要将出队锁和入队锁都获取到
void fullyLock() {
putLock.lock();
takeLock.lock();
}

//将结点p从链表中移除
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;    //trail的后继结点变为p的后继结点

//若是p是尾结点,那么将last变为trail(trail是p的前驱结点)
if (last == p)
last = trail;

//移除一个结点,那么结点计数要-1,并且可以唤醒入队操作的线程了
if (count.getAndDecrement() == capacity)
notFull.signal();
}

8.size的统计

public int size() {
return count.get();    //队列中的元素个数直接返回计数器值
}

二、LinkedBlockingDueue并发容器

1.LinkedBlockingDueue的底层实现

    LinkedBlockingDueue可以看做是LinkedBlockingQueue的升级版,LinkedBlockingQueue能做的LinkedBlockingDueue也能做,不能做的LinkedBlockingDueue还能做,其底层数据结构也是链表,不过与LinkedBlockingQueue的单链表不同,LinkedBlockingDueue是双向链表,并且还可以做堆栈使用。

    结点的定义如下:

static final class Node<E> {
//存储数据
E item;

//前驱结点
Node<E> prev;

//后继结点
Node<E> next;

Node(E x) {
item = x;
}
}

2.LinkedBlockingDueue的继承关系

    LinkedBlockingDueue的继承关系如下所示,相比LinkedBlockingQueue多实现了一个双端队列的接口。

    接下来看看BlockingDeque中定义了哪些方法:

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
//双端队列中若还有容量,将元素添加到队首,否则抛出异常
void addFirst(E e);

//双端队列中若还有容量,将元素添加到队尾,否则抛出异常
void addLast(E e);

//双端队列中若还有容量,将元素添加到队首,否则返回false
boolean offerFirst(E e);

//双端队列中若还有容量,将元素添加到队尾,否则返回false
boolean offerLast(E e);

//双端队列中若还有容量,立即将元素添加到队首,否则等待容量有空闲在添加
void putFirst(E e) throws InterruptedException;

//双端队列中若还有容量,立即将元素添加到队尾,否则等待容量有空闲在添加
void putLast(E e) throws InterruptedException;

//在一定时间内尝试将元素添加到队首,若到指定时间还没添加成功,则返回false
boolean offerFirst(E e, long timeout, TimeUnit unit)
throws InterruptedException;

//在一定时间内尝试将元素添加到队尾,若到指定时间还没添加成功,则返回false
boolean offerLast(E e, long timeout, TimeUnit unit)
throws InterruptedException;

//若队列不空,立即获取并移除队首元素,若队列已空则等待到队列中有元素在执行
E takeFirst() throws InterruptedException;

//若队列不空,立即获取并移除队尾元素,若队列已空则等待到队列中有元素在执行
E takeLast() throws InterruptedException;

//在一定时间内尝试获取并移除队首元素,若到达指定时间队列中仍没有元素,直接放弃尝试,返回null
E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException;

//在一定时间内尝试获取并移除队尾元素,若到达指定时间队列中仍没有元素,直接放弃尝试,返回null
E pollLast(long timeout, TimeUnit unit)
throws InterruptedException;

//移除队列中第一次出现的o元素
boolean removeFirstOccurrence(Object o);

//移除队列中最后一个出现的o元素
boolean removeLastOccurrence(Object o);

//双端队列中若还有容量,将元素添加到队尾,否则抛出异常
//该方法等同于addLast
boolean add(E e);

//双端队列中若还有容量,将元素添加到队尾,否则返回false
//该方法等同于offerLast
boolean offer(E e);

//若队列不满,则立即向队尾添加元素,否则等待队列有空间后在添加
void put(E e) throws InterruptedException;

//在一定时间内尝试将元素添加到队尾,若到指定时间还没添加成功,则返回false
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

//获取并移除队首,若队列为空,则抛异常
E remove();

//获取并移除队首,若队列为空,则返回null
E poll();

//获取并移除队首,若队列为空,则等待队列不为空在执行
E take() throws InterruptedException;

//在一定时间内尝试获取并移除队尾元素,若到达指定时间队列中仍没有元素,直接放弃尝试,返回null
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

//获取但不移除队首元素,若队列为空,那么抛异常
E element();

//获取但不移除队首元素,若队列为空,那么返回null
E peek();

//删除队里中第一次出现的o元素
boolean remove(Object o);

//判断队列中是否含有o元素
public boolean contains(Object o);

//队列中元素的数量
public int size();

//获取队列的迭代器
Iterator<E> iterator();

//向队列中压入一个元素,即向队首添加一个元素,若队列没有
//容量,则抛出异常,等同于addFirst
void push(E e);
}

3.重要属性及构造方法

    LinkedBlockingDueue也是个容量可选(最大为Integer.MAX_VALUE)的阻塞队列,且线程安全。与LinkedBlockingQueue相似,其线程安全也是通过ReentrantLock来实现的,不过略微不同的似,LinkedBlockingDueue的底层只有一个重入锁,而LinkedBlockingQueue则有两个。

public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
//队列的头结点
transient Node<E> first;

//队尾结点
transient Node<E> last;

//队列中的结点计数
private transient int count;

//队列容量
private final int capacity;

//重入锁
final ReentrantLock lock = new ReentrantLock();

//队列允许出队条件
private final Condition notEmpty = lock.newCondition();

//队列允许入队条件
private final Condition notFull = lock.newCondition();

//使用默认容量的队列
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}

//指定容量的队列
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}

//带有初始元素的队列
public LinkedBlockingDeque(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock lock = this.lock;
lock.lock(); // Never contended, but necessary for visibility
try {
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (!linkLast(new Node<E>(e)))
throw new IllegalStateException("Deque full");
}
} finally {
lock.unlock();
}
}
}

4.入队过程

    由对BlockingDeque的分析可知,LinkedBlockingDueue中存在着大量的入队方法,这里就不一一分析了,因为实现基本都差不多,只挑选个别来看看。

//addFirst方法的本质其实还是调用offerFirst
//向队首新增元素,若队列容量不足,则抛异常
public void addFirst(E e) {
if (!offerFirst(e))
throw new IllegalStateException("Deque full");
}

//向队首新增元素,若队列容量不足,则返回false
public boolean offerFirst(E e) {

//队列中不允许null元素存在
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);    //新建对应结点
final ReentrantLock lock = this.lock;
lock.lock();    //加锁
try {
return linkFirst(node);    //真正执行添加队首结点的方法
} finally {
lock.unlock();
}
}

//向队首新增结点
private boolean linkFirst(Node<E> node) {
//判断队列是否已满
if (count >= capacity)
return false;    //队列已满直接返回失败
Node<E> f = first;    //获取队首结点
node.next = f;    //将原队首结点设为新增结点的后继结点
first = node;    //将新增结点设为队首结点

//判断原队列中是否为空队列
if (last == null)
last = node;    //原队列若为空队列,那么此时队首队尾都是同一个结点
else
f.prev = node;    //设置原来的队首结点的前驱结点为新增结点
++count;    //队列中的结点数量+1
notEmpty.signal();    //唤醒执行出队操作的线程
return true;
}

//向队尾新增元素,若队列容量不足,则抛异常
public void addLast(E e) {
if (!offerLast(e))
throw new IllegalStateException("Deque full");
}

//向队尾新增元素,若队列容量不足,则返回false
public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkLast(node);    //真正实现添加队尾结点的方法
} finally {
lock.unlock();
}
}

//向队尾新增结点
private boolean linkLast(Node<E> node) {
//判断队列是否已满
if (count >= capacity)
return false;
Node<E> l = last;    //获取当前队尾结点
node.prev = l;    //将新增结点的前驱设为l
last = node;    //新增节点设为队尾

//判断队列原本是否为空
//若为空,则新增结点既是队首也是队尾
if (first == null)
first = node;
else
l.next = node;    //将l节点的后继设为新增结点
++count;    //计数+1
notEmpty.signal();    //唤醒执行出队操作的线程
return true;
}

//向队首新增结点,若队列已满,则等待
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))    //若新增队首结点失败,则线程进入等待状态
notFull.await();
} finally {
lock.unlock();
}
}

//向队尾新增结点,若队列已满,则等待
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))    //若新增队尾结点失败,则线程进入等待状态
notFull.await();
} finally {
lock.unlock();
}
}

5.出队过程

    同入队一样,出队的方法也很多,这里也只选个别来分析:

//获取并移除队首元素,若队列为空,则返回null
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkFirst();    //将队首结点从队列中移除
} finally {
lock.unlock();
}
}

//移除队首节点的方法
private E unlinkFirst() {
//获取队首结点
Node<E> f = first;
//判断队列是否为空队列,空队列直接返回null
if (f == null)
return null;

//获取队首结点的后继结点,要作为新的队首结点
Node<E> n = f.next;
E item = f.item;    //获取队首节点的数据,用作返回值
f.item = null;    //清空队首结点,方便GC回收
f.next = f;     //队首出队的后继设为自己
first = n;    //设置新的队首为n

//判断队列是否还有结点
if (n == null)
last = null;    //队列若是空了,那么队尾也设为null
else
n.prev = null;    //新队首的前驱设为null
--count;    //队列中的结点计数-1

//唤醒执行入队操作的线程,队列刚执行一次出队操作,必然有剩余空间
//因此可以执行入队操作
notFull.signal();
return item;
}

//获取并移除队尾元素,若队列为空,则返回null
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast();    //将队尾结点从队列中移除
} finally {
lock.unlock();
}
}

//移除队尾结点的方法
private E unlinkLast() {

//获取队尾引用
Node<E> l = last;

//判断队列是否是空队列,空队列直接返回null
if (l == null)
return null;
Node<E> p = l.prev;    //获取队尾的前驱结点,用作新的队尾结点
E item = l.item;
l.item = null;
l.prev = l;     //队尾结点出队后的前驱设为自身,方便GC回收
last = p;    //设置新队尾

//判断队尾出队后队列中是否还是有结点,即队列是否成了空队列
if (p == null)
first = null;    //空队列的队首也是null
else
p.next = null;    //新队尾的后继设为null
--count;    //结点计数-1
notFull.signal();    //唤醒入队操作的线程
return item;
}

//将队首移除并返回,若队列已空则等待
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;

//判断移除队首结点是否成功,失败则等待
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}

//将队尾移除并返回,若队列已空则等待
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;

//判断移除队尾结点是否成功,失败则等待
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}

6.总结

    LinkedBlockingDueue中其他的方法就不一一分析了,都比较简单。

    与LinkedBlockingQueue对比,LinkedBlockingDueue的线程安全以及阻塞等待的实现基本没有区别,两个阻塞队列基本可以通用(LinkedBlockingDueue用作栈时除外)。两个队列基本上只有两点不同:一个是底层数据结构的细微区别,LinkedBlockingQueue是单向链表,而LinkedBlockingDueue则是双向链表;另一个是重入锁的使用有些区别,LinkedBlockingDueue不论出入队都使用的是同一个锁对象,而LinkedBlockingQueue的出入队锁是分开的。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Peek