您的位置:首页 > 产品设计 > UI/UE

Java 集合框架分析:PriorityBlockingQueue java1.8

2016-08-09 23:02 447 查看
相关文章:

Java 集合框架分析:Set

http://blog.csdn.net/youyou1543724847/article/details/52733723

Java 集合框架分析:LinkedList

http://blog.csdn.net/youyou1543724847/article/details/52734935

Java 集合框架分析:DelayQueue

http://blog.csdn.net/youyou1543724847/article/details/52176504

Java 集合框架分析:ArrayBlockingQueue

http://blog.csdn.net/youyou1543724847/article/details/52174308

Java 集合框架分析:ArrayDeque

http://blog.csdn.net/youyou1543724847/article/details/52170026

Java 集合框架分析:PriorityBlockingQueue

http://blog.csdn.net/youyou1543724847/article/details/52166985

Java 集合框架分析:JAVA Queue源码分析

http://blog.csdn.net/youyou1543724847/article/details/52164895

Java 集合框架分析:关于Set,Map集合中元素判等的方式

http://blog.csdn.net/youyou1543724847/article/details/52733766

Java 集合框架分析:ConcurrentModificationException

http://blog.csdn.net/youyou1543724847/article/details/52733780

Java 集合框架分析:线程安全的集合

http://blog.csdn.net/youyou1543724847/article/details/52734876

Java 集合框架分析:JAVA集合中的一些边边角角的知识

http://blog.csdn.net/youyou1543724847/article/details/52734918

哈哈,终于有了第二篇博客了,终于知道编辑一个博客需要注意什么了,希望坚持下去,每天看点小源码!

目录

1.简述PriorityBlockingQueue

2.主要方法及实现

3.使用过程中需要注意的地方

4.和其他的相关容器的比较

5.总结

简述PriorityBlockingQueue

特点:

1.属于并发安全的集合。(什么是并发安全的集合:即多线程的情况下,不会出现不确定的状态)。

2.无界的队列。

3.它的blocking表现在取元素时,如果队列为空,则取元素的线程会阻塞。

4.不允许null元素

主要方法及实现

1.主要的成员

同Priorityqueue,使用底层数组保存数据,拥有两把锁,一个可重入锁,一个自旋锁。

private final ReentrantLock lock;

/**
* Condition for blocking when empty
*/
private final Condition notEmpty;

/**
* Spinlock for allocation, acquired via CAS.
*/
private transient volatile int allocationSpinLock;
private transient Object[] queue;

/**
* The number of elements in the priority queue.
*/
private transient int size;


2.插入

步骤:

a.null检测

b.因为priorityblockingQueue是线程安全的,所以,插入删除都是需要加锁的。这里先进行加锁。

c.如果需要扩容,则先扩容。关于扩容操作,一会再说。

d.插入元素,调整顺序(和priorityqueue是一样的)

e.发送信息,激活阻塞的取数据的线程

f.释放锁(必须的,因为不是用的Synchronized,而是用的lock进行控制的)

public boolean offer(E e)
{
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try
{
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
}
finally { lock.unlock(); }
return true;
}


3.扩容操作

扩容发生在你要插入元素时,发现底层数组大小不够,则需要扩容。这里在判断时,已经获取的reentrantlock锁,因为要扩容,说明queue不为空。另一方面,扩容时,需要发生底层数组的重新复制到新数组中,而取数据的线程当前还不会读到新加入的数据(先取你之间加入的,这是happen-before原则,你新数据还没有插入成功,别人是看不到的),所以为了提高并发量,这里需要先释放reentrantlock,让其他的读线程能够进行(写线程还是会阻塞,因为要写,还是要进行扩容,会在获取扩容锁时阻塞)。突然想到一个问题,如果A写入时,发现满了,因此要扩容,所示扩容期间,释放了锁。B poll操作,然后C要写入,但是此时容量是允许的,这不就让C在A之前了么?这个问题是什么情况?

这里进行扩容使用的是CAS锁,当获取锁不成功时,说明有其他线程在扩容,则等待。在成功扩容之后,需要重新获取主锁(即reentrantlock),然后修改queue底层数组引用。

private void tryGrow(Object[] array, int oldCap)
{
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1))
{
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0)
{    // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array)
{
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}


4.删除元素

删除元素有多个版本:

不阻塞的:poll()(如果没有元素可用,则直接返回null)

阻塞的:take()

等待一段时间的:poll(long timeout, TimeUnit unit)

实现都大概相同:

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}


使用过程中需要注意的地方

1.priorityblockingqueue在操作队列时,都是共用的一把锁(在扩容时,用到了自旋锁,会释放一段主锁,然后重新获取)

2.peek,offer,poll,size等都是要获取同一把锁的,效率不是很高

3.在序列化时,为了提供效率,会先将数据放入到priorityqueue中,然后一次性加入到阻塞队列中,增加操作效率(不用每次都获取锁)

和其他的相关容器的比较

和priorityqueue基本相同,处理有加锁的操作外。

总结

除非是在多线程中,否则不要使用,几乎所有操作都要竞争同一把锁。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 多线程