Java集合源码学习(16)_BlockingQueue接口的实现ArrayBlockingQueue
2014-07-22 00:18
791 查看
ArrayBlockingQueue继承了AbstractQueue,实现了BlockingQueue接口;
1:内部使用数组来存储队列元素
2:元素的排序是按照FIFO的顺序,队列的第一个元素是入队列时间最久的那个元素;
3:是有界队列,初始化时设置队列大小,之后不可再次设置;
4:不允许null值
5:在初始化时可设置等待该队列的被阻塞的线程唤醒的策略,fairness设置为true,按照请求该队列的顺序唤醒;默认是false也就是随机的唤醒
1:内部使用数组来存储队列元素
2:元素的排序是按照FIFO的顺序,队列的第一个元素是入队列时间最久的那个元素;
3:是有界队列,初始化时设置队列大小,之后不可再次设置;
4:不允许null值
5:在初始化时可设置等待该队列的被阻塞的线程唤醒的策略,fairness设置为true,按照请求该队列的顺序唤醒;默认是false也就是随机的唤醒
1:成员变量
/** The queued items */ private final E[] items;//队列元素的存放数组 /** items index for next take, poll or remove */ private int takeIndex;//出队列的元素index /** items index for next put, offer, or add. */ private int putIndex;//如队列的元素index /** Number of items in the queue */ private int count; /** Main lock guarding all access */ private final ReentrantLock lock;//操作都要首先获取锁 /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
2:重要方法
1:offer()方法
入队列,满了返回false,不阻塞public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } }
private void insert(E x) {//实际插入元素的方法,使用该方法前应该获得锁 items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
final int inc(int i) {//循环增加index的值 return (++i == items.length) ? 0 : i; }
2:put()方法
一直阻塞public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); //因为在await之后,放弃了锁,被中断之后需要重新获取锁,要抛出异常了,需要唤醒其他在等待notFull的线程 throw ie; } insert(e); } finally { lock.unlock(); } }
3:offer(time)方法
阻塞固定时间public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (count != items.length) { insert(e); return true; } if (nanos <= 0) return false; try { nanos = notFull.awaitNanos(nanos);//返回还需要等待的时间 } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } }
4:poll()方法
不阻塞,出队列public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == 0) return null; E x = extract(); return x; } finally { lock.unlock(); } }
private E extract() { final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
5:take()方法
阻塞式获取方法public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } }
6:poll(time)方法
阻塞指定时间段public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (count != 0) { E x = extract(); return x; } if (nanos <= 0) return null; try { nanos = notEmpty.awaitNanos(nanos); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } }
7:peek()方法
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : items[takeIndex]; } finally { lock.unlock(); } }
相关文章推荐
- Java集合源码学习(17)_BlockingQueue接口的实现LinkedBlockingQueue
- Java集合源码学习(15)_Queue接口的实现PriorityQueue和PriorityBlockingQueue
- Java集合源码学习(13)_Queue接口以及基础实现AbstractQueue
- 深入Java集合学习系列:ArrayBlockingQueue及其实现原理
- 阅读ArrayBlockingQueue源码了解如何利用锁实现BlockingQueue
- Java集合源码学习(7)_List接口的实现_LinkedList
- Java集合源码学习(11)_Set接口的实现LinkedHashSet
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析
- Java集合源码学习(9)_Set接口的基础实现AbstractSet
- Java并发集合——ArrayBlockingQueue ,LinkedBlockingQueue,ConcurrentHashMap
- Java集合源码学习(6)_List接口的实现_ArrayList_Vector
- Java集合源码学习(8)_List接口的实现_CopyOnWriteArrayList
- Java Thread&Concurrency(16): 深入理解ArrayBlockingQueue及其实现原理
- Java集合源码学习(10)_Set接口的实现HashSet
- Java集合源码学习(5)_List接口的基础实现AbstractList
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析(还没看,先马)
- 线程并发集合实现java生成消费模型(ArrayBlockingQueue和ConcurrentMap)
- JDK源码分析之主要阻塞队列实现类ArrayBlockingQueue -- java消息队列/java并发编程/阻塞队列
- Java集合, ArrayBlockingQueue源码解析(常用于并发编程)