java多线程-BlockingQueue
2016-02-28 12:05
393 查看
BlockingQueue简介
ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE,每次插入后都将动态地创建链接节点。
PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素,依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。
BlockingQueue内容
BlockingQueue主要方法:
对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法却不能达到这样的效果。注意,非阻塞队列中的方法都没有进行同步措施。
BlockingQueue实现原理
以ArrayBlockingQueue为例,查看其源代码,其中主要包含以下对象:
下面主要介绍下put()和take()方法,来观察其同步的实现:
大家应该明白了阻塞队列的实现原理,事实它和我们用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者的思路类似,只不过它把这些工作一起集成到了阻塞队列中实现。并且在前面Condition中我们也模拟实现了一个阻塞队列,实现与其大同小异。
BlockingQueue应用
1:启动两个线程实现互斥等待:
2:前面介绍传统线程通信中,主线程和子线程交替运行,现在以阻塞队列来实现。
3:在API中有一个阻塞对象实现生产者和消费者的例子
使用阻塞队列代码要简单得多,不需要再单独考虑同步和线程间通信的问题。
在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。
阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。
参考资料:http://www.cnblogs.com/dolphin0520/p/3932906.html
javaAPI
ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE,每次插入后都将动态地创建链接节点。
PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素,依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。
BlockingQueue内容
BlockingQueue主要方法:
抛出异常 | 特殊值 | 阻塞 | 超时 | |
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | 不可用 | 不可用 |
BlockingQueue实现原理
以ArrayBlockingQueue为例,查看其源代码,其中主要包含以下对象:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; /** 数组对象,用于放置对象 */ final Object[] items; /** put, offer, or add方法放入数组的索引 */ int putIndex; /** take, poll, peek or remove方法取出数据的数组索引 */ int takeIndex; /** queue队列的总数 */ int count; /**可重入锁,控制并发*/ final ReentrantLock lock; /** 非空信号量,可以取数*/ private final Condition notEmpty; /** 非满信号量,可以放数 */ private final Condition notFull; }
下面主要介绍下put()和take()方法,来观察其同步的实现:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } }
大家应该明白了阻塞队列的实现原理,事实它和我们用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者的思路类似,只不过它把这些工作一起集成到了阻塞队列中实现。并且在前面Condition中我们也模拟实现了一个阻塞队列,实现与其大同小异。
BlockingQueue应用
1:启动两个线程实现互斥等待:
public class BlockingQueueTest { public static void main(String[] args) { final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3); for (int i = 0; i < 2; i++) { new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println("Thread "+Thread.currentThread().getName()+"正在准备放入数据"); try { //模拟线程的放数速度 Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { queue.put(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("Thread "+Thread.currentThread().getName()+"放入数据,此时队列中的数据为:"+queue.size()); } } }).start(); new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println("Thread "+Thread.currentThread().getName()+"正在取得数据"); try { //模拟线程的去数速度 Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { queue.take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("Thread "+Thread.currentThread().getName()+"取得数据,此时队列中的数据为:"+queue.size()); } } }).start(); } } }
2:前面介绍传统线程通信中,主线程和子线程交替运行,现在以阻塞队列来实现。
public class BlockingQueueCommunication { public static void main(String[] args) { final Business business = new Business(); new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub for (int i = 0; i < 50; i++) { try { business.sub(i); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }).start(); for (int i = 0; i < 50; i++) { try { business.main(i); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } static class Business{ BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1); BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1); { try { queue2.put(1);//保证queue2阻塞 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void main(int i) throws InterruptedException{ queue1.put(1);//阻塞queue1 for (int j = 0; j < 100; j++) { System.out.println("main thread is looping of "+j +" in " + i); } queue2.take();//唤醒queue2 } public void sub(int i) throws InterruptedException{ queue2.put(1);//阻塞queue2 for (int j = 0; j < 10; j++) { System.out.println("sub thread is looping of "+j +" in " + i); } queue1.take();//唤醒queue1 } } }
BlockingQueue实现了线程同步,不可在方法中再次加入同步限制,否则会出现死锁。
3:在API中有一个阻塞对象实现生产者和消费者的例子
class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { queue.put(produce()); } } catch (InterruptedException ex) { ... handle ...} } Object produce() { ... } } class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { consume(queue.take()); } } catch (InterruptedException ex) { ... handle ...} } void consume(Object x) { ... } } class Setup { void main() { BlockingQueue q = new SomeQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }
使用阻塞队列代码要简单得多,不需要再单独考虑同步和线程间通信的问题。
在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。
阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。
参考资料:http://www.cnblogs.com/dolphin0520/p/3932906.html
javaAPI
相关文章推荐
- API Guides(五)——<Activity> To Dialogs
- 优先级队列priority_queue
- Android:代码设置UI
- knockoutjs十一 value绑定
- html textarea标签的innerHTML属性和value属性的区别
- queue队列
- 15.UIScrollView
- Android Studio 运行 Gradle Build Running卡死的解决办法
- hdu-1242Rescue(优先队列+bfs)
- Arduino可穿戴开发入门教程(大学霸内部资料)
- 4321: queue2 思路题 DP
- Android UI 使用HTML布局(直接打开server网页)
- 2802: [Poi2012]Warehouse Store 贪心 priority_queue
- druid参考配置
- [Hapi.js] Replying to Requests
- 【CodeForces 297C】Splitting the Uniqueness
- poj 2785 4 Values whose Sum is 0
- 232. Implement Queue using Stacks
- 2016-02-28 00:53:21 version 与build
- build-tools\21.1.2\aapt.exe finished with non- zero exit value 1错误