Java并发----ArrayBlockingQueue
2016-11-06 22:46
633 查看
Java中有些多线程编程模式在很大程序上都依赖于Queue实现的线程安全性,所以非常有必要认识,首先来看一下接口定义,如下:
public interface Queue<E> extends Collection<E> {
// 向队列中添加元素
boolean add(E e);
boolean offer(E e);
// 删除队列元素
E remove();
E poll();
// 检查队列元素
E element();
E peek();
} BlockingQueue类继承了如上的接口,定义如下:
阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。
Java提供了BlockingQueue接口的两个基本实现:LinkedBlockingQueue和ArrayBlockingQueue。他们都是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。他们的用法之间稍有区别,如已知队列的大小而能确定合适的边界时,用ArrayBlockingQueue非常高效。
下面来看ArrayBlockingQueue类的最主要的一个构造函数,如下:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
} Lock的作用是提供独占锁机制,来保护竞争资源;而Condition是为了更加精细的对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。notEmpty表示锁的非空条件。当某线程想从队列中取数据时,而此时又没有数据,则该线程通过notEmpty.await()进行等待;当其它线程向队列中插入了元素之后,就调用notEmpty.signal()唤醒之前通过notEmpty.await()进入等待状态的线程。同理,notFull表示“锁的满条件”。当某线程想向队列中插入元素,而此时队列已满时,该线程等待;当其它线程从队列中取出元素之后,就唤醒该等待的线程。
1、添加元素
public boolean add(E e) {
return super.add(e);
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
insert(e);
return true;
} finally {
lock.unlock();
}
} (1)add(E e)方法会调用AbstractQueue类中的方法,代码如下:
(2)offer(E e)方法如果队列满了,则返回false,否则调用insert()方法进行元素的插入,这个方法的源代码如下:
理解如上代码,首先要认识两个变量的含义:takeIndex和putIndex。takeIndex表示下一个被取出元素的索引,putIndex表示下一个被添加元素的索引。它们的定义如下:
(3) put(E e)方法当加入元素时,如果队列已经满了,则阻塞等待;直到检测到不满时调用insert()方法进行插入。
(4)offer(E e, long timeout, TimeUnit unit) 如果在指定的时间内还无法插入队列,则返回false,表示插入失败。否则让插入队列等待一定的时间。如果插入成功,则返回true。
2、删除元素
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return extract();
} finally {
lock.unlock();
}
} (1)remove(Object o)方法会移除元素值相同的元素。在移除过程中需要使用removeAt()方法,如下:
(3)take()方法调用时,如果此时队列为空,则阻塞等待;否则调用extract()方法返回元素值。
(4)poll(long timeout, TimeUnit unit) 在指定的时间内队列仍然为空则阻塞,超过指定时间返回null;队列不空直接调用extract()方法返回元素值。
3、查找元素
这个类还继承了AbstractQueue中的一个element()方法,如下:
4、遍历元素
to be returned by next // 下一次调用next()返回的元素 private E nextItem; // Element to be returned by next call to next // 上一次调用next()返回的元素 private E lastItem; // Element returned by last call to next // 上一次调用next()返回的元素的索引 private int lastRet; // Index of last element
returned, or -1 if none Itr() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { lastRet = -1; if ((remaining = count) > 0) nextItem = itemAt(nextIndex = takeIndex); } finally { lock.unlock(); } } public boolean hasNext() { return
remaining > 0; } public E next() { // 获取阻塞队列的锁 final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { // 若“剩余元素<=0”,则抛出异常。 if (remaining <= 0) throw new NoSuchElementException(); lastRet = nextIndex; // 获取第nextIndex位置的元素 E x = itemAt(nextIndex);
// check for fresher value if (x == null) { x = nextItem; // we are forced to report old value lastItem = null; // but ensure remove fails } else lastItem = x; while (--remaining > 0 && // skip over nulls (nextItem = itemAt(nextIndex = inc(nextIndex))) ==
null) ; return x; } finally { lock.unlock(); } } public void remove() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { int i = lastRet; if (i == -1) throw new IllegalStateException(); lastRet = -1; E x = lastItem; lastItem = null;
// only remove if item still at index if (x != null && x == items[i]) { boolean removingHead = (i == takeIndex); removeAt(i); if (!removingHead) nextIndex = dec(nextIndex); } } finally { lock.unlock(); } } }
本文参考:http://blog.csdn.net/mazhimazh/article/details/19239033
public interface Queue<E> extends Collection<E> {
// 向队列中添加元素
boolean add(E e);
boolean offer(E e);
// 删除队列元素
E remove();
E poll();
// 检查队列元素
E element();
E peek();
} BlockingQueue类继承了如上的接口,定义如下:
public interface BlockingQueue<E> extends Queue<E> { boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; E take() throws InterruptedException; E poll(long timeout, TimeUnit unit) throws InterruptedException; int remainingCapacity(); boolean remove(Object o); public boolean contains(Object o); int drainTo(Collection<? super E> c); int drainTo(Collection<? super E> c, int maxElements); }这个接口中本身定义的方法,加上从Queue接口中继承的方法后,可以将BlockingQueue方法大概分为4种形式。
阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。
Java提供了BlockingQueue接口的两个基本实现:LinkedBlockingQueue和ArrayBlockingQueue。他们都是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。他们的用法之间稍有区别,如已知队列的大小而能确定合适的边界时,用ArrayBlockingQueue非常高效。
下面来看ArrayBlockingQueue类的最主要的一个构造函数,如下:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
} Lock的作用是提供独占锁机制,来保护竞争资源;而Condition是为了更加精细的对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。notEmpty表示锁的非空条件。当某线程想从队列中取数据时,而此时又没有数据,则该线程通过notEmpty.await()进行等待;当其它线程向队列中插入了元素之后,就调用notEmpty.signal()唤醒之前通过notEmpty.await()进入等待状态的线程。同理,notFull表示“锁的满条件”。当某线程想向队列中插入元素,而此时队列已满时,该线程等待;当其它线程从队列中取出元素之后,就唤醒该等待的线程。
1、添加元素
public boolean add(E e) {
return super.add(e);
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
insert(e);
return true;
} finally {
lock.unlock();
}
} (1)add(E e)方法会调用AbstractQueue类中的方法,代码如下:
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }还是调用offer()方法添加元素,成功返回true,失败则抛出异常,表示队列已经满了。
(2)offer(E e)方法如果队列满了,则返回false,否则调用insert()方法进行元素的插入,这个方法的源代码如下:
private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; // 元素数量加1 notEmpty.signal(); // 唤醒取元素的线程 } <pre name="code" class="java"> final int inc(int i) { return (++i == items.length) ? 0 : i; //当i加1后如果队列已经满了,则设置下一个被添加元素的索引为0. }
理解如上代码,首先要认识两个变量的含义:takeIndex和putIndex。takeIndex表示下一个被取出元素的索引,putIndex表示下一个被添加元素的索引。它们的定义如下:
int takeIndex; // 下一个被取出元素的索引 int putIndex; // 下一个被添加元素的索引
(3) put(E e)方法当加入元素时,如果队列已经满了,则阻塞等待;直到检测到不满时调用insert()方法进行插入。
(4)offer(E e, long timeout, TimeUnit unit) 如果在指定的时间内还无法插入队列,则返回false,表示插入失败。否则让插入队列等待一定的时间。如果插入成功,则返回true。
2、删除元素
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return extract();
} finally {
lock.unlock();
}
} (1)remove(Object o)方法会移除元素值相同的元素。在移除过程中需要使用removeAt()方法,如下:
void removeAt(int i) { // 移除索引处理的元素 final Object[] items = this.items; // if removing front item, just advance if (i == takeIndex) { items[takeIndex] = null; takeIndex = inc(takeIndex); // 下一个要取出元素的索引 } else { // slide over all others up through putIndex. for (;;) { int nexti = inc(i); if (nexti != putIndex) { items[i] = items[nexti]; i = nexti; } else { items[i] = null; putIndex = i; break; } } } --count; notFull.signal(); // 通知生产线程 }
(2)使用poll()方法时调用exact()方法,取出takeIndex索引处的值后删除这个元素,源代码如下:
private E extract() { final Object[] items = this.items; // 强制将元素转换为泛型E E x = this.<E>cast(items[takeIndex]); // 将第takeIndex元素设为null,即删除。同时,帮助GC回收 items[takeIndex] = null; // 设置下一个被取出元素的索引 takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
(3)take()方法调用时,如果此时队列为空,则阻塞等待;否则调用extract()方法返回元素值。
(4)poll(long timeout, TimeUnit unit) 在指定的时间内队列仍然为空则阻塞,超过指定时间返回null;队列不空直接调用extract()方法返回元素值。
3、查找元素
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { // 如果队列为空,则返回null,否则调用itemAt()方法获取元素 return (count == 0) ? null : itemAt(takeIndex); } finally { lock.unlock(); } } <pre name="code" class="java"> final E itemAt(int i) { return this.<E>cast(items[i]); }
这个类还继承了AbstractQueue中的一个element()方法,如下:
public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); }调用peek()方法查找,如果元素存在,则返回,否则抛出异常。
4、遍历元素
<pre name="code" class="java"> public Iterator<E> iterator() { return new Itr(); }private class Itr implements Iterator<E> { // 队列中剩余元素的个数 private int remaining; // Number of elements yet to be returned // 下一次调用next()返回的元素的索引 private int nextIndex; // Index of element
to be returned by next // 下一次调用next()返回的元素 private E nextItem; // Element to be returned by next call to next // 上一次调用next()返回的元素 private E lastItem; // Element returned by last call to next // 上一次调用next()返回的元素的索引 private int lastRet; // Index of last element
returned, or -1 if none Itr() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { lastRet = -1; if ((remaining = count) > 0) nextItem = itemAt(nextIndex = takeIndex); } finally { lock.unlock(); } } public boolean hasNext() { return
remaining > 0; } public E next() { // 获取阻塞队列的锁 final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { // 若“剩余元素<=0”,则抛出异常。 if (remaining <= 0) throw new NoSuchElementException(); lastRet = nextIndex; // 获取第nextIndex位置的元素 E x = itemAt(nextIndex);
// check for fresher value if (x == null) { x = nextItem; // we are forced to report old value lastItem = null; // but ensure remove fails } else lastItem = x; while (--remaining > 0 && // skip over nulls (nextItem = itemAt(nextIndex = inc(nextIndex))) ==
null) ; return x; } finally { lock.unlock(); } } public void remove() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { int i = lastRet; if (i == -1) throw new IllegalStateException(); lastRet = -1; E x = lastItem; lastItem = null;
// only remove if item still at index if (x != null && x == items[i]) { boolean removingHead = (i == takeIndex); removeAt(i); if (!removingHead) nextIndex = dec(nextIndex); } } finally { lock.unlock(); } } }
本文参考:http://blog.csdn.net/mazhimazh/article/details/19239033
相关文章推荐
- JDK源码分析之主要阻塞队列实现类ArrayBlockingQueue -- java消息队列/java并发编程/阻塞队列
- 线程并发集合实现java生成消费模型(ArrayBlockingQueue和ConcurrentMap)
- 黑马程序员——高新技术—java5并发库之ArrayBlockingQueue
- java 并发工具包 BlockingQueue-ArrayBlockingQueue
- Java concurrent Framework并发容器之ArrayBlockingQueue(1.6)源码分析
- Java并发集合——ArrayBlockingQueue ,LinkedBlockingQueue,ConcurrentHashMap
- Java并发学习笔记(七)-ArrayBlockingQueue
- 详细分析Java并发集合ArrayBlockingQueue的用法
- (十五)java多线程之并发集合ArrayBlockingQueue
- JAVA并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue
- Java多线程--并发中集合的使用之ArrayBlockingQueue
- 【死磕Java并发】-----J.U.C之阻塞队列:ArrayBlockingQueue
- java并发编程-同步类容器-ArrayBlockingQueue
- 深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue
- Java多线程与并发应用-(10)-java阻塞队列实现ArrayBlockingQueue
- 【死磕Java并发】-----分析 ArrayBlockingQueue 构造函数加锁问题
- 【Java并发编程】—–“J.U.C”:ArrayBlockingQueue
- Java并发学习(二十二)-ArrayBlockingQueue分析
- java 5并发中的阻塞队列ArrayBlockingQueue的使用以及案例实现
- Java 并发 --- 阻塞队列之ArrayBlockingQueue源码分析