Java集合源码学习(15)_Queue接口的实现PriorityQueue和PriorityBlockingQueue
2014-07-21 21:14
751 查看
一:PriorityQueue
继承自AbstractQueue类,队列里面的数据根据指定的Comparator或者自然排序(元素的compareTo())排序;1:无界的队列(可以动态扩展)
2:内部实现基于堆
3:队列的头元素,是按照排序规则最小的元素;
一:类变量:
private static final int DEFAULT_INITIAL_CAPACITY = 11;//默认的初始化容量 //实际的存放元素的数组,其实实现的是一个最小堆。 private transient Object[] queue; //queue里面的元素的实际数量 private int size = 0; //指定的比较器 private final Comparator<? super E> comparator;
二:几个方法
1:容量扩展
private void grow(int minCapacity) { if (minCapacity < 0) // overflow throw new OutOfMemoryError(); int oldCapacity = queue.length; // Double size if small; else grow by 50% int newCapacity = ((oldCapacity < 64) ? ((oldCapacity + 1) * 2) : ((oldCapacity / 2) * 3)); if (newCapacity < 0) // overflow newCapacity = Integer.MAX_VALUE; if (newCapacity < minCapacity) newCapacity = minCapacity; queue = Arrays.copyOf(queue, newCapacity); }
2:元素入队列
public boolean offer(E e) { if (e == null) throw new NullPointerException(); modCount++; int i = size; if (i >= queue.length) grow(i + 1);//如果空间不足,扩容 size = i + 1; if (i == 0) queue[0] = e; else siftUp(i, e);//将数据放置到堆的最后位置,自下而上的调整堆 return true; }自下而上调整的算法如下:
private void siftUp(int k, E x) { if (comparator != null) siftUpUsingComparator(k, x); else siftUpComparable(k, x); } private void siftUpComparable(int k, E x) { Comparable<? super E> key = (Comparable<? super E>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (key.compareTo((E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = key; } private void siftUpUsingComparator(int k, E x) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (comparator.compare(x, (E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = x; }
3:元素出队列
public E poll() { if (size == 0) return null; int s = --size; modCount++; E result = (E) queue[0]; E x = (E) queue[s]; queue[s] = null; if (s != 0) siftDown(0, x);//移除堆最上面的元素,然后自上而下调整堆 return result; }自上而下调整的算法如下:
private void siftDown(int k, E x) { if (comparator != null) siftDownUsingComparator(k, x); else siftDownComparable(k, x); } private void siftDownComparable(int k, E x) { Comparable<? super E> key = (Comparable<? super E>) x; int half = size >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = queue[child]; int right = child + 1; if (right < size && ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0) c = queue[child = right]; if (key.compareTo((E) c) <= 0) break; queue[k] = c; k = child; } queue[k] = key; } private void siftDownUsingComparator(int k, E x) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = queue[child]; int right = child + 1; if (right < size && comparator.compare((E) c, (E) queue[right]) > 0) c = queue[child = right]; if (comparator.compare(x, (E) c) <= 0) break; queue[k] = c; k = child; } queue[k] = x; }
学习过数据结构的话,如果对堆的算法有了解,理解上面这几个方法应该比较容易
二:PriorityBlockingQueue
继承自AbstractQueue,实现了BlockingQueue接口;其实是,包装了PriorityQueue,对相应的方法进行了加锁的控制;
当然,也是一个无界队列;
一:类的变量
private final PriorityQueue<E> q;//实际存储数据的队列 private final ReentrantLock lock = new ReentrantLock(true);//公平的锁 private final Condition notEmpty = lock.newCondition();//因为是无解的队列,所以不需要notFull的条件变量
二:关键方法
1:offer方法(立即返回,成功true)
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { boolean ok = q.offer(e); assert ok; notEmpty.signal();//唤醒一个等待线程 return true; } finally { lock.unlock(); } }
2:add()(抛异常)方法和put()方法(阻塞)和offer(time)(阻塞)
入队列都不会阻塞,直接调用offer方法的public boolean add(E e) { return offer(e); }
public void put(E e) { offer(e); // never need to block }
public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); // never need to block }
3:poll()方法,不阻塞
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.poll();//如果不存在返回null } finally { lock.unlock(); } }
4:take()阻塞式获取
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (q.size() == 0) notEmpty.await(); } catch (InterruptedException ie) {//不太理解为何有signal了一次? notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = q.poll(); assert x != null; return x; } finally { lock.unlock(); } }
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E x = q.poll(); if (x != null) return x; if (nanos <= 0) return null; try { nanos = notEmpty.awaitNanos(nanos); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } }
相关文章推荐
- Java集合源码学习(17)_BlockingQueue接口的实现LinkedBlockingQueue
- Java集合源码学习(16)_BlockingQueue接口的实现ArrayBlockingQueue
- Java集合源码学习(13)_Queue接口以及基础实现AbstractQueue
- Java集合源码学习(10)_Set接口的实现HashSet
- Java集合源码学习(11)_Set接口的实现LinkedHashSet
- 深入Java集合学习系列:ArrayBlockingQueue及其实现原理
- Java集合源码学习(5)_List接口的基础实现AbstractList
- Java集合源码学习(8)_List接口的实现_CopyOnWriteArrayList
- Java常见集合框架(十六):Queue之DelayQueue、PriorityQueue、PriorityBlockingQueue
- Java集合源码学习(9)_Set接口的基础实现AbstractSet
- Java集合源码学习(6)_List接口的实现_ArrayList_Vector
- Java集合源码学习(7)_List接口的实现_LinkedList
- Java集合源码学习(3)_Collection接口的基础实现AbstractCollection
- Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueu
- 《java.util.concurrent 包源码阅读》19 PriorityBlockingQueue
- Java集合源码学习(24)_ConcurrentMap的实现类ConcurrentHashMap
- JDK源码分析之主要阻塞队列实现类ArrayBlockingQueue -- java消息队列/java并发编程/阻塞队列
- Java 集合框架分析:PriorityBlockingQueue java1.8
- 深入Java集合学习系列:ConcurrentLinkedQueue及其实现原理
- java并发学习之BlockingQueue实现生产者消费者