阅读ArrayBlockingQueue源码了解如何利用锁实现BlockingQueue
2014-06-11 15:29
615 查看
BlockingQueue是多线程里面一个非常重要的数据结构。在面试的时候,也常会被问到怎么实现BlockingQueue。本篇根据Java7里ArrayBlockingQueue的源码,简单介绍一下如何实现一个BlockingQueue。
要实现BlockingQueue,首先得了解最主要的方法:
add()和remove()是最原始的方法,也是最不常用的。原因是,当队列满了或者空了的时候,会抛出IllegalStateException("Queue full")/NoSuchElementException(),并不符合我们对阻塞队列的要求;因此,ArrayBlockingQueue里,这两个方法的实现,直接继承自java.util.AbstractQueue:
有上述源码可知,add()和remove()实现的关键,是来自java.util.Queue接口的offer()和poll()方法。
offer():在队列尾插入一个元素。若成功便返回true,若队列已满则返回false。(This method is generally preferable to method
poll():同理,取出并删除队列头的一个元素。若成功便返回true,若队列为空则返回false。
这里使用的是ReentrantLock,在插入或者取出前,都必须获得队列的锁,以保证同步。
由于offer()/poll()是非阻塞方法,一旦队列已满或者已空,均会马上返回结果,也不能达到阻塞队列的目的。因此有了put()/take()这两个阻塞方法:
put()/take()的实现,比起offer()/poll()复杂了一些,尤其有两个地方值得注意:
1. 取得锁以后,循环判断队列是否已满或者已空,并加上Condition的await()方法将当前正在调用put()的线程挂起,直至notFull.signal()唤起。
2. 这里使用的是lock.lockInterruptibly()而不是lock.lock()。原因在这里。lockInterruptibly()这个方法,优先考虑响应中断,而不是响应普通获得锁或重入获得锁。简单来说就是,由于put()/take()是阻塞方法,一旦有interruption发生,必须马上做出反应,否则可能会一直阻塞。
最后,无论是offer()/poll()还是put()/take(),都要靠insert()/extract()这个私有方法去完成真正的工作:
insert()/extract(),是真正将元素放进数组或者将元素从数组取出并删除的方法。由于ArrayBlockingQueue是有界限的队列(Bounded Queue),因此inc()/dec()方法保证元素不超出队列的界限。另外,每当insert()后,要使用notEmpty.signal()唤起因队列空而等待取出的线程;每当extract()后,同理要使用notFull.signal()唤起因队列满而等待插入的线程。
到此,便将ArrayBlockingQueue的主要的方法粗略介绍了一遍。假设面试时,需要我们自己实现BlockingQueue时,可参考以上的做法,重点放在put()/take()和insert()/extract()方法上,也可将其结合在一起:
最后,由于此文的启示,列举一些使用队列时的错误做法:
1. 忽略offer()的返回值。offer()作为有返回值的方法,可以在判断的时候十分有作用(例如add()的实现)。因此,千万不要忽略offer()方法的返回值。
2. 在循环里使用isEmpty()和阻塞方法:
take()是阻塞方法,无需做isEmpty()的判断,直接使用即可。而这种情况很有可能导致死锁,因为由于不断循环,锁会一直被isEmpty()取得(因为size()方法会取得锁),而生产者无法获得锁。
3. 频繁使用size()方法去记录。size()方法是要取得锁的,意味着这不是一个廉价的方法。可以使用原子变量代替。
本文完
要实现BlockingQueue,首先得了解最主要的方法:
add()和remove()是最原始的方法,也是最不常用的。原因是,当队列满了或者空了的时候,会抛出IllegalStateException("Queue full")/NoSuchElementException(),并不符合我们对阻塞队列的要求;因此,ArrayBlockingQueue里,这两个方法的实现,直接继承自java.util.AbstractQueue:
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); }
有上述源码可知,add()和remove()实现的关键,是来自java.util.Queue接口的offer()和poll()方法。
offer():在队列尾插入一个元素。若成功便返回true,若队列已满则返回false。(This method is generally preferable to method
add(java.lang.Object), which can fail to insert an element only by throwing an exception.)
poll():同理,取出并删除队列头的一个元素。若成功便返回true,若队列为空则返回false。
这里使用的是ReentrantLock,在插入或者取出前,都必须获得队列的锁,以保证同步。
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : extract(); } finally { lock.unlock(); } }
由于offer()/poll()是非阻塞方法,一旦队列已满或者已空,均会马上返回结果,也不能达到阻塞队列的目的。因此有了put()/take()这两个阻塞方法:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } }
put()/take()的实现,比起offer()/poll()复杂了一些,尤其有两个地方值得注意:
1. 取得锁以后,循环判断队列是否已满或者已空,并加上Condition的await()方法将当前正在调用put()的线程挂起,直至notFull.signal()唤起。
2. 这里使用的是lock.lockInterruptibly()而不是lock.lock()。原因在这里。lockInterruptibly()这个方法,优先考虑响应中断,而不是响应普通获得锁或重入获得锁。简单来说就是,由于put()/take()是阻塞方法,一旦有interruption发生,必须马上做出反应,否则可能会一直阻塞。
最后,无论是offer()/poll()还是put()/take(),都要靠insert()/extract()这个私有方法去完成真正的工作:
private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } final int inc(int i) { return (++i == items.length) ? 0 : i; } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; } final int dec(int i) { return ((i == 0) ? items.length : i) - 1; }
insert()/extract(),是真正将元素放进数组或者将元素从数组取出并删除的方法。由于ArrayBlockingQueue是有界限的队列(Bounded Queue),因此inc()/dec()方法保证元素不超出队列的界限。另外,每当insert()后,要使用notEmpty.signal()唤起因队列空而等待取出的线程;每当extract()后,同理要使用notFull.signal()唤起因队列满而等待插入的线程。
到此,便将ArrayBlockingQueue的主要的方法粗略介绍了一遍。假设面试时,需要我们自己实现BlockingQueue时,可参考以上的做法,重点放在put()/take()和insert()/extract()方法上,也可将其结合在一起:
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
最后,由于此文的启示,列举一些使用队列时的错误做法:
1. 忽略offer()的返回值。offer()作为有返回值的方法,可以在判断的时候十分有作用(例如add()的实现)。因此,千万不要忽略offer()方法的返回值。
2. 在循环里使用isEmpty()和阻塞方法:
while(!queue.isEmpty()) { T element = queue.take(); //Process element. }
take()是阻塞方法,无需做isEmpty()的判断,直接使用即可。而这种情况很有可能导致死锁,因为由于不断循环,锁会一直被isEmpty()取得(因为size()方法会取得锁),而生产者无法获得锁。
3. 频繁使用size()方法去记录。size()方法是要取得锁的,意味着这不是一个廉价的方法。可以使用原子变量代替。
本文完
相关文章推荐
- Java集合源码学习(16)_BlockingQueue接口的实现ArrayBlockingQueue
- JDK源码分析之主要阻塞队列实现类ArrayBlockingQueue -- java消息队列/java并发编程/阻塞队列
- J.U.C并发框架源码阅读(八)ArrayBlockingQueue
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析
- 源码解析:ArrayBlockingQueue和LinkedBlockingQueue的区别
- 利用ArrayBlockingQueue实现大量计时任务计时
- java源码阅读之ArrayBlockingQueue
- JDK 1.8 ArrayBlockingQueue 源码阅读(二)获取
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- ArrayBlockingQueue 源码阅读 问题(一)
- ArrayBlockingQueue源码阅读
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- java1.7集合源码阅读:ArrayBlockingQueue
- Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- ArrayBlockingQueue源码阅读心得
- 《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue
- ArrayBlockingQueue源码阅读与理解
- ArrayBlockingQueue 源码阅读与分析
- JDK 1.8 ArrayBlockingQueue 源码阅读(一)插入