您的位置:首页 > 理论基础 > 数据结构算法

3.Java数据结构原理解析-Queue系列

2017-11-28 09:21 781 查看
Queue,也就是队列,满足FIFO的特性。

在Java中,Queue是一个接口,它的实现类有很多,其中非线程安全的代表是LinkedList,线程安全的有阻塞和非阻塞的,阻塞的大都实现了Queue的子接口BlockingQueue(阻塞队列),例如:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue等。非阻塞的有ConcurrentLinkedQueue。

Queue接口方法定义:

//添加元素,成功返回true,容量不够抛IllegalStateException
boolean add(E e)
//添加元素,成功返回true,容量不足返回false
boolean offer(E e)

//移除队首元素,队列为空时抛NoSuchElementException
E remove()
//移除队首元素,队列为空时返回null
E poll()

//查看队首元素,队列为空时抛NoSuchElementException
E element()
//查看队首元素,队列为空时返回null
E peek()


BlockingQueue接口定义(BlockingQueue除了继承Queue定义的方法外,还加入了自己的阻塞方法):

//添加元素,容量不足阻塞
void put(E e) throws InterruptedException
//添加元素,容量不足时等待指定时间
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

//移除队首元素,队列为空时阻塞
E take() throws InterruptedException
//移除队首元素,队列为空时等待指定时间
E poll(long timeout, TimeUnit unit) throws InterruptedException


队列大多数是在多线程环境下使用的,生产者线程往队列中添加元素,消费者线程从队列中取出元素。所以,下面重点讨论ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue是采用什么样的数据结构和算法来保证队列的线程安全性的。

1.阻塞队列 ArrayBlockingQueue

ArrayBlockingQueue底层的数据结构是数组和循环队列,使用一个可重入锁和这个锁的两个条件对象进行并发控制。

首先,来看看ArrayBlockingQueue的属性。

//存放元素的数组
final Object[] items;
//循环队列头指针,起始值为0
int takeIndex;
//循环队列尾指针,指向下一个元素插入的位置,起始值为0
int putIndex;
//元素的个数
int count;

//可重入锁(被final修饰,之所以没有初始化,是因为所有的构造方法里面都对lock进行了初始化)
final ReentrantLock lock;
//队列非空条件
private final Condition notEmpty;
//队列未满条件
private final Condition notFull;


ArrayBlockingQueue的长度是固定的,无法扩容,所以创建一个ArrayBlockingQueue对象时,必须指定队列的容量,并且ArrayBlockingQueue不允许原始为null。从构造函数上可以看出这一点。

//创建一个指定容量的队列,锁默认是非公平的
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//创建一个指定容量、指定锁的公平性的队列
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//创建锁
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull =  lock.newCondition();
}

//用现有的集合创建一个指定容量、指定锁的公平性的队列
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
//创建队列
this(capacity, fair);

final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion(锁定只用于可见性,而不是互斥)
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
//尾指针
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}


(1)插入元素add、offer、put

public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}


public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}

private void insert(E x) {
items[putIndex] = x;
//队列尾指针+1
putIndex = inc(putIndex);
++count;
//通知在notEmpty上等待的线程
notEmpty.signal();
}

//循环加。循环队列的实现就体现在这里
final int inc(int i) {
return (++i == items.length) ? 0 : i;
}

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//在锁上等待,直到获取锁,但是会响应中断,优先考虑响应中断,而不是响应锁的普通获取或重入获取。
//不明白为什么add和offer方法使用lock,而put方法使用lockInterruptibly?
lock.lockInterruptibly();
try {
//队列已满,在notFull对象上等待
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}


(2)取出元素remove、poll、take

public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//队列为空时返回null
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}

private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
//队列头指针+1
takeIndex = inc(takeIndex);
--count;
//通知在notFull对象上等待的线程
notFull.signal();
return x;
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列为空时,在notEmpty上等待
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}


2.阻塞队列 LinkedBlockingQueueue

LinkedBlockingQueueue底层的数据结构是单向链表,使用两个可重入锁(放锁和拿锁)和对象的条件对象来进行并发控制。

LinkedBlockingQueueue由于使用了两个锁,所以允许同时添加和取出元素。这一点是和ArrayBlockingQueue最大的区别。

一个类的属性体现了这个类的数据结构,我们首先看看LinkedBlockingQueueue的属性

//链表的节点。从节点可以看出该链表只有一个next指针,是单向的,
static class Node<E> {
E item;

Node<E> next;

Node(E x) { item = x; }
}

//队列的容量,定义为final,说明所有的构造方法必须初始化容量
private final int capacity;

//元素的个数,因为使用了放锁和拿锁两个锁,所以同时添加和取出元素时存在并发问题,使用原子操作来保证元素的个数的准确性
private final AtomicInteger count = new AtomicInteger(0);

//单向链表头指针,head.item永远为null。(定义为transient说明不能序列化)
private transient Node<E> head;

//单向链表尾指针,last.next永远为null。(定义为transient说明不能序列化)
private transient Node<E> last;

//拿锁(控制remove、poll、take方法等)
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();

//放锁(控制add、offer、put方法等)
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();


照常理来说,取出一个元素后,队列应该是notFull,那么拿锁控制的是应该是notFull的条件变量,但是因为此处存在两把锁,可能在取出元素后,又有元素加入了。所有此处拿锁控制的是notEmpty,取出元素后,只要判断剩下的元素是否大于1就可以了,因为不可能有两个线程同时执行取操作。

(1)插入元素add、offer、put

//add方法是在AbstractQueue实现了,所以跟ArrayBlockingQueue一样
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }

public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node(e);
//锁定放锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//队列未满
if (count.get() < capacity) {
enqueue(node);
//队列长度+1
c = count.getAndIncrement();
//插入之后,队列还是未满,通知在notFull对象上的等待的线程(例如:put方法)
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
//c==0表示插入之前队列为空,队列为空说明可能有读线程在阻塞,如果c>0,说明肯定没有读线程在阻塞
if (c == 0)
signalNotEmpty();
return c >= 0;
}

//signalNotEmpty虽然用在offder/put中,但是从不在putLock的同步区内。这样就保证同一时刻只持有一个锁,这样就不会出现死锁问题。
//???关于此处为什么加锁的问题,暂时就是这样理解的
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

//入队。入队操作很简单,就是将链表尾指针的next节点指向当前节点,并把当前节点设置为尾指针
private void enqueue(Node<E> node) {
last = last.next = node;
}


(2)取出操作remove、poll、take

//remove()方法是在AbstractQueue中实现了,跟ArrayBlockingQueue一样
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
//出队
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

//出队操作比较简单,就是将单链表的头指针指向下一个元素
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
//不是很明白这个,如果要帮助GC,直接将h.next=null不是更好吗?
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
//在notEmpty条件上等待
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}


3.阻塞队列 SynchronousQueue

  SynchronousQueue跟上面两个阻塞队列不同,它内部没有容器,一个生产线程put的时候,如果当前没有消费线程执行take,此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的数据(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。

可以参考:https://zhuanlan.zhihu.com/p/29227508

https://www.jianshu.com/p/376d368cb44f?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io

4.非阻塞队列 ConcurrentLinkedQueue

ArrayBlockingQueue和LinkedBlockingQueue都是阻塞的,阻塞体现在入队和出队的时候需要加锁。

下面介绍的ConcurrentLinkedQueue是非阻塞的,ConcurrentLinkedQueue底层的数据结构和LinkedBlockingQueue相同,也是使用单链表,不同的是ConcurrentLinkedQueue通过
sun.misc.Unsafe
类的CAS操作来保证线程安全的。

Unsafe类提供了硬件级别的原子操作,主要compareAndSwapXXX方法实现。

关于Unsafe,网上有很多资源,请自行查阅。

我们首先来看看ConcurrentLinkedQueue的成员变量。

//单链表头节点
private transient volatile Node<E> head;

//单链表尾节点
private transient volatile Node<E> tail;

//节点类型。与LinkedBlockingQueue不同的是,所有的赋值操作都是通过Unsafe对象的CAS来完成的,所以是线程安全的
private static class Node<E> {
volatile E item;
volatile Node<E> next;

Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}

//为item赋值
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}

//为next指针赋值
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;

static {
try {
//获取item属性和next属性的内存地址
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}


(1)入队add、offer



需要注意的是,每次入队之后,tail并不是总指向最后一个节点。奇数时是倒数第二个节点,偶数时是第一个节点。

public boolean add(E e) {
return offer(e);
}

public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
//获得p的下一个节点
Node<E> q = p.next;
//如果下一个节点是null,也就是p节点就是尾节点
if (q == null) {
//将单链表的尾节点的next指针指向新节点
if (p.casNext(null, newNode)) {
if (p != t)
//如果tail不是尾节点则将入队节点设置为tail。
// 如果失败了,那么说明有其他线程已经把tail移动过
casTail(t, newNode);
return true;
}
// Lost CAS race to another thread; re-read next
}
// 如果p节点等于p的next节点,则说明p节点和q节点都为空,表示队列刚初始化,所以返回
else if (p == q)
// We have fallen off list.  If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable.  Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}

//为队列的尾节点赋值
private boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}


可参考:http://blog.csdn.net/u013991521/article/details/53068549
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  queue