不惑JAVA之JAVA基础 - 阻塞队列
2016-05-05 15:21
615 查看
在学习线程池前,先要了解一下阻塞队列,这个知识点是线程池的核心。
LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
take方法用来从队首取元素,如果队列为空,则等待;
offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;
ArrayBlockingQueue中用来存储元素的实际上是一个数组,
takeIndex和putIndex分别表示队首元素和队尾元素的下标,
count表示队列中元素的个数,
lock是一个可重入锁,notEmpty和notFull是等待条件。
构造方法方面:
第一个构造器只有一个参数用来指定容量,
第二个构造器可以指定容量和公平性,
第三个构造器可以指定容量、公平性以及用另外一个集合进行初始化。
从put方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,
然后判断当前元素个数是否等于数组的长度(数组长度为定义的最大数组阻塞数)。
如果相等,则调用notFull.await()进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。
当被其他线程唤醒时,通过insert(e)方法插入元素,最后解锁。
来看一下inc:
移动队尾下标值;
添加count数;
通过notEmpty唤醒正在等待取元素的线程。
跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。
在take方法中,如果可以取元素,则通过extract方法取得元素,下面是extract方法的实现:
Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition1的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,阻塞队列实际上是使用了Condition来模拟线程间协作。
Condition是个接口,基本的方法就是await()和signal()方法;
Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()
调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用
Conditon中的await()对应Object的wait();
Condition中的signal()对应Object的notify();
Condition中的signalAll()对应Object的notifyAll()。
参考:
Java并发编程:阻塞队列
/article/4719078.html
阻塞队列的作用
使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。主要队列
ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
常用方法
put方法用来向队尾存入元素,如果队列满,则等待;take方法用来从队首取元素,如果队列为空,则等待;
offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;
实现原理:(ArrayBlockingQueue为例)
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; /** The queued items */ private final E[] items; /** items index for next take, poll or remove */ private int takeIndex; /** items index for next put, offer, or add. */ private int putIndex; /** Number of items in the queue */ private int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ private final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; }
ArrayBlockingQueue中用来存储元素的实际上是一个数组,
takeIndex和putIndex分别表示队首元素和队尾元素的下标,
count表示队列中元素的个数,
lock是一个可重入锁,notEmpty和notFull是等待条件。
public ArrayBlockingQueue(int capacity) { } public ArrayBlockingQueue(int capacity, boolean fair) { } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { }
构造方法方面:
第一个构造器只有一个参数用来指定容量,
第二个构造器可以指定容量和公平性,
第三个构造器可以指定容量、公平性以及用另外一个集合进行初始化。
put方法
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }
从put方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,
然后判断当前元素个数是否等于数组的长度(数组长度为定义的最大数组阻塞数)。
如果相等,则调用notFull.await()进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。
当被其他线程唤醒时,通过insert(e)方法插入元素,最后解锁。
private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
来看一下inc:
final int inc(int i) { return (++i == items.length) ? 0 : i; }
移动队尾下标值;
添加count数;
通过notEmpty唤醒正在等待取元素的线程。
take方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } }
跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。
在take方法中,如果可以取元素,则通过extract方法取得元素,下面是extract方法的实现:
private E extract() { final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
实例
经典非阻塞队列实现生产者-消费者模式
通过阻塞队列和Object.wait()和Object.notify()实现,wait()和notify()主要用来实现线程间通信。public class Test { private int queueSize = 10; private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{ @Override public void run() { consume(); } private void consume() { while(true){ synchronized (queue) { while(queue.size() == 0){ try { System.out.println("队列空,等待数据"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.poll(); //每次移走队首元素 queue.notify(); System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素"); } } } } class Producer extends Thread{ @Override public void run() { produce(); } private void produce() { while(true){ synchronized (queue) { while(queue.size() == queueSize){ try { System.out.println("队列满,等待有空余空间"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.offer(1); //每次插入一个元素 queue.notify(); System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size())); } } } } }
阻塞队列实现生产者-消费者模式
public class Test { private int queueSize = 10; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{ @Override public void run() { consume(); } private void consume() { while(true){ try { queue.take(); System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread{ @Override public void run() { produce(); } private void produce() { while(true){ try { queue.put(1); System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
Condition
在阻塞队列中,使用到了Condition,这里搜了个blog简单介绍一下。/** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition1的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,阻塞队列实际上是使用了Condition来模拟线程间协作。
Condition是个接口,基本的方法就是await()和signal()方法;
Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()
调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用
Conditon中的await()对应Object的wait();
Condition中的signal()对应Object的notify();
Condition中的signalAll()对应Object的notifyAll()。
参考:
Java并发编程:阻塞队列
/article/4719078.html
相关文章推荐
- 【自用】【Windows】下【maven】及其在【eclipse】中的配置
- Java 8 中新的 Date 和 Time 类入门详解, DateUtil ,以后可以少写几个了,关键是线程安全了
- Spring Security 4 基于角色的登录例子(带源码)
- win10 JDK安装图解
- eclipse里svn的合并操作
- spring定时任务时间格式cronExpression设置
- Struts2注解Convention扫描jar中的Action的设置
- Spring 3.1中对JSR-330的支持
- myeclipse启动报“java was started but returned exit code=13”
- 使用Maven搭建Struts2框架的开发环境
- Spring 使用@ComponentScan扫描注解包
- 关于在Struts2的Action中使用domain模型接收参数的问题
- JAVA闰年的0229日期问题
- java 的Exception和Error
- MyEclipse下要给主函数传递参数问题
- Java 流(Stream)、文件(File)和IO
- 谈谈对Spring IOC的理解
- java 计算中缀表达式结果
- Spring 对数据库操作
- 张孝祥java.concurrent线程并发学习笔记 - concurrent简介