学习互联网架构第十一课(并发类容器之Queue)
2017-06-28 23:41
232 查看
在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue。如下图所示。
首先我们来学ConcurrentLinkedQueue,ConcurrentLinkedQueue:是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的,该队列不允许null元素。
ConcurrentLinkedQueue重要方法:
add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,这两个方法没有任何区别,大家可能有疑问,既然两个没有区别为何还要弄两个方法,这是因为这两个方法都继承自父类Queue,在其它场景下是可能不一样的)
poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。
下面我们来看个例子,如下所示。这是最常用的几个方法。
第一个:ArrayBlockingQueue
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没实现读写分离,也就意味着生产和消费不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列,在很多场合非常适用。
之所以说ArrayBlockingQueue是有界队列,是因为我们在使用该队列时必须指定队列的容量大小,如下图所示,三种实例化方式都必须有"int capacity"(容量大小)。
ArrayBlockingQueue向队列添加元素有三种方法,分别是put、add、offer。这三个方法虽然都是添加元素,但是作用却不同。首先我们来看下put方法,如下所示,我们给队列设置容量为5,然后故意向容器中添加6个元素,看是什么效果。
下面我们再试试add方法,代码如下所示。
下面我们再看下offer方法,代码如下:
第二个:LinkedBlockingQueue
举个例子如下:
代码如下,其中q.drainTo(list,3)是一次性把队列中的三个元素都存放到list当中,返回值是成功从队列中取出的元素个数。我们说LinkedBlockingQueue是无界队列是因为我们可以不设置队列的长度,这样队列便是无界的。
但是如果给LinkedBlockingQueue指定长度的话,它就变成了有界队列,比如我们把LinkedBlockingQueue的长度设置为5,超出队列的话,将无法再添加元素,如下图所示。
运行结果如下图所示,q.offer()方法如果返回true表示添加成功,返回false表示添加失败。可见第6个元素并没有成功添加。
第三个:SynchronousQueue
这个队列非常特殊,它不能装任何元素。
下面看个例子,这个例子,貌似SynchronousQueue可以添加元素,如下所示。但是其实SynchronousQueue依然是没有存储元素的,这里之所以没有报错,是因为我们先启动了一个线程t1要消费SynchronousQueue这个队列中的元素,线程t2要向SynchronousQueue队列添加一个元素,这时候会发生什么呢?这时候,线程t2并不会真的把元素添加到队列中,而是直接将要添加的元素交给线程t1了。也就是说,SynchronousQueue队列还是不会真正存储元素的。
肯定有些人会有疑问,既然SynchronousQueue不能装任何元素的话,那么要它有何用?还有就是有界队列和无界队列的应用场景是什么呢?如下图所示。
第四个:PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,也就是说传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。下面我们便以一个小例子来说明。
参与比较的对象必须实现Comparable接口,如下所示,重写了compareTo方法,用id来进行比较。
第五个:DelayQueue
带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。
下面我们便来看一个网民在网吧上网的例子,首先我们来新建一个网民类,如下所示
首先我们来学ConcurrentLinkedQueue,ConcurrentLinkedQueue:是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的,该队列不允许null元素。
ConcurrentLinkedQueue重要方法:
add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,这两个方法没有任何区别,大家可能有疑问,既然两个没有区别为何还要弄两个方法,这是因为这两个方法都继承自父类Queue,在其它场景下是可能不一样的)
poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。
下面我们来看个例子,如下所示。这是最常用的几个方法。
package com.internet.queue; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; public class UseQueue { public static void main(String[] args) { //高性能无阻塞无界队列:ConcurrentLinkedQueue ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<String>(); concurrentLinkedQueue.offer("a"); concurrentLinkedQueue.add("b"); concurrentLinkedQueue.offer("c"); concurrentLinkedQueue.add("d"); System.out.println(concurrentLinkedQueue.poll());//取出第一个元素并删除 System.out.println(concurrentLinkedQueue.size());//打印队列的大小 System.out.println(concurrentLinkedQueue.peek());//取出第一个元素,不删除 System.out.println(concurrentLinkedQueue.size());//打印队列的大小 System.out.println("--------------------------------------------------------"); for (Iterator iterator = concurrentLinkedQueue.iterator(); iterator.hasNext();) { String str = (String) iterator.next(); System.out.println(str); } } }上面运行结果如下所示。
a 3 b 3 -------------------------------------------------------- b c d下面我们来验证ConcurrentLinkedQueue是线程安全的,我们向队列里添加一个元素,然后用多个线程去获取队列中的这个元素,如下所示。
package com.internet.queue; import java.util.concurrent.ConcurrentLinkedQueue; public class UseQueue { public static void main(String[] args) { //高性能无阻塞无界队列:ConcurrentLinkedQueue ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<String>(); concurrentLinkedQueue.offer("a"); Thread t1 = new Thread(new Runnable() { @Override public void run() { //不要使用.size()方法,因为那样效率非常低 if(!concurrentLinkedQueue.isEmpty()){ System.out.println("进入线程1"); String str = concurrentLinkedQueue.poll(); System.out.println("线程1取出的元素:"+str); } } },"t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { //不要使用.size()方法,因为那样效率非常低 if(!concurrentLinkedQueue.isEmpty()){ System.out.println("进入线程2"); String str = concurrentLinkedQueue.poll(); System.out.println("线程2取出的元素:"+str); } } },"t2"); Thread t3 = new Thread(new Runnable() { @Override public void run() { //不要使用.size()方法,因为那样效率非常低 if(!concurrentLinkedQueue.isEmpty()){ System.out.println("进入线程3"); String str = concurrentLinkedQueue.poll(); System.out.println("线程3取出的元素:"+str); } } },"t3"); Thread t4 = new Thread(new Runnable() { @Override public void run() { //不要使用.size()方法,因为那样效率非常低 if(!concurrentLinkedQueue.isEmpty()){ System.out.println("进入线程4"); String str = concurrentLinkedQueue.poll(); System.out.println("线程4取出的元素:"+str); } } },"t4"); Thread t5 = new Thread(new Runnable() { @Override public void run() { //不要使用.size()方法,因为那样效率非常低 if(!concurrentLinkedQueue.isEmpty()){ System.out.println("进入线程5"); String str = concurrentLinkedQueue.poll(); System.out.println("线程5取出的元素:"+str); } } },"t5"); t1.start(); t2.start(); t3.start(); t4.start(); t5.start(); } }其中一次运行结果如下所示,可以看到,能取出元素的只有一个线程,无论执行多少次,都只有一个线程能够获取到元素a,其它线程获取的都是null。注意判断队列是否为空时,不要使用.size()方法,因为.size() 是要遍历一遍集合的,因此比较慢,使用isEmpty()效率比较高。
进入线程1 进入线程4 进入线程3 进入线程2 线程3取出的元素:null 线程2取出的元素:null 线程4取出的元素:null 线程1取出的元素:a下面来学习下常见的几个阻塞队列,由于底层源码都比较难懂,我这里还是只说用法,想深入研究的同学可以去查看源码。
第一个:ArrayBlockingQueue
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没实现读写分离,也就意味着生产和消费不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列,在很多场合非常适用。
之所以说ArrayBlockingQueue是有界队列,是因为我们在使用该队列时必须指定队列的容量大小,如下图所示,三种实例化方式都必须有"int capacity"(容量大小)。
ArrayBlockingQueue向队列添加元素有三种方法,分别是put、add、offer。这三个方法虽然都是添加元素,但是作用却不同。首先我们来看下put方法,如下所示,我们给队列设置容量为5,然后故意向容器中添加6个元素,看是什么效果。
package com.internet.queue; import java.util.concurrent.ArrayBlockingQueue; public class UseQueue { public static void main(String[] args) throws Exception { ArrayBlockingQueue<String> array = new ArrayBlockingQueue<>(5); array.put("a"); array.put("b"); array.put("c"); array.put("d"); array.put("e"); array.put("f"); } }上面代码执行效果如下图,可以看到,线程一直处于running状态,这是因为put将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。现在第六个元素由于插入不到队列当中,它就在这儿等着,什么时候有元素从队列中出去了,它就插入到队列当中。
下面我们再试试add方法,代码如下所示。
package com.internet.queue; import java.util.concurrent.ArrayBlockingQueue; public class UseQueue { public static void main(String[] args) throws Exception { ArrayBlockingQueue<String> array = new ArrayBlockingQueue<>(5); array.add("a"); array.add("b"); array.add("c"); array.add("d"); array.add("e"); array.add("f"); } }运行结果如下图,可以看到抛出了异常,说队列已经满了,盛不下第六个元素了。add方法的作用便是:将指定的元素插入到此队列中(如果立即可行且不会违反容量限制),在成功时返回 true,如果当前没有可用空间,则抛出 IllegalStateException。
下面我们再看下offer方法,代码如下:
package com.internet.queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; public class UseQueue { public static void main(String[] args) throws Exception { ArrayBlockingQueue<String> array = new ArrayBlockingQueue<>(5); array.offer("a"); array.offer("b"); array.offer("c"); array.offer("d"); array.offer("e"); System.out.println(array.offer("f",3,TimeUnit.SECONDS)); } }下面是运行结果,可以看到offer返回的是bool类型的值,offer方法将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。当使用有容量限制的队列时,此方法通常要优于 add 方法,后者可能无法插入元素,而只是抛出一个异常。
第二个:LinkedBlockingQueue
举个例子如下:
代码如下,其中q.drainTo(list,3)是一次性把队列中的三个元素都存放到list当中,返回值是成功从队列中取出的元素个数。我们说LinkedBlockingQueue是无界队列是因为我们可以不设置队列的长度,这样队列便是无界的。
package com.internet.queue; import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; public class UseQueue { public static void main(String[] args) { //改队列可以是无界队列也可以是有界队列,不指定长度便是无界队列,指定长度便是有界队列 LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>(); q.offer("a"); q.offer("b"); q.offer("c"); q.offer("d"); q.offer("e"); q.offer("f"); List<String> list = new ArrayList<String>(); System.out.println(q.drainTo(list,3)); System.out.println(list.size()); for(String str : list){ System.out.println(str); } } }运行结果如下:
3 3 a b c
但是如果给LinkedBlockingQueue指定长度的话,它就变成了有界队列,比如我们把LinkedBlockingQueue的长度设置为5,超出队列的话,将无法再添加元素,如下图所示。
运行结果如下图所示,q.offer()方法如果返回true表示添加成功,返回false表示添加失败。可见第6个元素并没有成功添加。
第三个:SynchronousQueue
这个队列非常特殊,它不能装任何元素。
package com.internet.queue; import java.util.concurrent.SynchronousQueue; public class UseQueue { public static void main(String[] args) { SynchronousQueue<String> q = new SynchronousQueue<>(); System.out.println(q.offer("a")); } }运行结果如下图所示
下面看个例子,这个例子,貌似SynchronousQueue可以添加元素,如下所示。但是其实SynchronousQueue依然是没有存储元素的,这里之所以没有报错,是因为我们先启动了一个线程t1要消费SynchronousQueue这个队列中的元素,线程t2要向SynchronousQueue队列添加一个元素,这时候会发生什么呢?这时候,线程t2并不会真的把元素添加到队列中,而是直接将要添加的元素交给线程t1了。也就是说,SynchronousQueue队列还是不会真正存储元素的。
package com.internet.queue; import java.util.concurrent.SynchronousQueue; public class UseQueue { public static void main(String[] args) { final SynchronousQueue<String> q = new SynchronousQueue<String>(); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { System.out.println(q.take()); } catch (Exception e) { e.printStackTrace(); } } }); t1.start(); Thread t2 = new Thread(new Runnable() { @Override public void run() { q.add("ffasss"); } }); t2.start(); } }
肯定有些人会有疑问,既然SynchronousQueue不能装任何元素的话,那么要它有何用?还有就是有界队列和无界队列的应用场景是什么呢?如下图所示。
第四个:PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,也就是说传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。下面我们便以一个小例子来说明。
参与比较的对象必须实现Comparable接口,如下所示,重写了compareTo方法,用id来进行比较。
package com.internet.queue; public class Task implements Comparable<Task>{ private int id; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public int compareTo(Task task) { return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0); } }下面我们使用PriorityBlockingQueue ,如下所示。
package com.internet.queue; import java.util.Iterator; import java.util.concurrent.PriorityBlockingQueue; public class UsePriorityBlockingQueue { public static void main(String[] args) { PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>(); Task t1 = new Task(); t1.setId(3); t1.setName("任务1"); Task t2 = new Task(); t2.setId(6); t2.setName("任务2"); Task t3 = new Task(); t3.setId(1); t3.setName("任务3"); q.add(t1); q.add(t2); q.add(t3); //添加到队列里面的元素还是没有顺序的 for (Iterator iterator = q.iterator(); iterator.hasNext();) { Task task = (Task) iterator.next(); System.out.println(task.getName()); } //只有当往外取数据的时候才有顺序 try { System.out.println(q.take().getId()); System.out.println(q.take().getId()); System.out.println(q.take().getId()); } catch (InterruptedException e) { e.printStackTrace(); } } }我们运行main方法,可以看到结果如下所示,可以看到,添加到队列里的对象其实是没有顺序的(任务3对应的对象的id是1,任务2对应的对象的id是6,任务1对应的对象的id是3),而我们往外取的时候可以看到取出的顺序是1、3、6,符合排序规则。
任务3 任务2 任务1 1 3 6
第五个:DelayQueue
带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。
下面我们便来看一个网民在网吧上网的例子,首先我们来新建一个网民类,如下所示
package com.internet.queue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * 网民类 * @author wanghaijie * */ public class Wangmin implements Delayed{ //网名 private String name; //身份证号 private String id; //截止时间 private long endTime; //定义时间工具类,以秒为单位 private TimeUnit timeUnit = TimeUnit.SECONDS; public Wangmin(String name,String id,long endTime){ this.name = name; this.id = id; this.endTime = endTime; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getId() { return id; } public void setId(String id) { this.id = id; } public long getEndTime() { return endTime; } public void setEndTime(long endTime) { this.endTime = endTime; } @Override public int compareTo(Delayed delayed) { Wangmin w = (Wangmin)delayed; return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0; } @Override public long getDelay(TimeUnit unit) { return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); } }下面我们便来使用DelayQueue,如下所示
package com.internet.queue; import java.util.concurrent.DelayQueue; public class WangBa implements Runnable{ //延迟队列 private DelayQueue<Wangmin> queue = new DelayQueue<>(); //是否营业的标志 public boolean yingye = true; //上机方法,为了测试方便,规定交1块钱只能上1秒网。 public void shangji(String name, String id, int money){ //第三个参数是下机时间,上网时长加上当前时间就是下机时间 Wangmin man = new Wangmin(name, id, 1000*money + System.currentTimeMillis()); System.out.println("网名"+man.getName()+" 身份证"+man.getId()+" 交钱"+money+"块,开始上机..."); this.queue.add(man); } public void xiaji(Wangmin man){ System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机..."); } @Override public void run() { while(yingye){ try { Wangmin man = queue.take(); xiaji(man); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args){ try { System.out.println("网吧开始营业"); WangBa wangBa = new WangBa(); Thread shangwang = new Thread(wangBa); shangwang.start(); wangBa.shangji("路人甲", "123", 1); wangBa.shangji("路人乙", "234", 10); wangBa.shangji("路人丙", "345", 5); } catch (Exception e) { e.printStackTrace(); } } }运行main方法,结果如下所示,可见,DelayQueue在处理网吧上网的问题上还是非常方便的。
网吧开始营业 网名路人甲 身份证123 交钱1块,开始上机... 网名路人乙 身份证234 交钱10块,开始上机... 网名路人丙 身份证345 交钱5块,开始上机... 网名路人甲 身份证123时间到下机... 网名路人丙 身份证345时间到下机... 网名路人乙 身份证234时间到下机...
相关文章推荐
- 学习互联网架构第十课(并发类容器)
- 学习互联网架构第九课(同步类容器)
- nginx 源码学习笔记(九)——基本容器——queue
- STL学习(五)queue容器学习
- 学习互联网架构第一课(线程基础)
- 学习互联网架构第六课(使用wait/notify模拟Queue)
- 学习互联网架构第五课(多线程通信---wait和notify)
- 标准模板库(STL)学习探究之Queue容器
- 架构之路--同步类容器和并发类容器
- 互联网架构学习篇
- 学习互联网架构第八课(单例和多线程)
- 互联网架构学习相关资料(转)
- 【Java】容器类学习之路(二)Collection详解:List、Set和Queue
- STL学习系列五:Queue容器
- C++queue容器学习(详解)
- ACM学习历程19——queue队列容器与priority_queue优先队列容器
- C++ STL--queue队列容器学习笔记
- nginx 源码学习笔记(九)——基本容器——queue
- STL学习笔记— —特殊容器priority_queue
- 学习互联网架构第三课(synchronized重入锁)