java-并发-并发容器(4)
2016-07-23 19:30
471 查看
Set类型的ConcurrentSkipListSet和CopyOnWriteArraySet
对应的非并发容器:HashSet目标:代替synchronizedSet
原理:基于CopyOnWriteArrayList实现,其唯一的不同是在add时调用的是CopyOnWriteArrayList的addIfAbsent方法,其遍历当前Object数组,如Object数组中已有了当前元素,则直接返回,如果没有则放入Object数组的尾部,并返回。
基于CopyOnWriteArrayList实现,其唯一的不同是在add时调用的是CopyOnWriteArrayList的addIfAbsent方法,其遍历当前Object数组,如Object数组中已有了当前元素,则直接返回,如果没有则放入Object数组的尾部,并返回。
Copy-On-Write简称COW,是一种用于程序设计中的优化策略。其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容Copy出去形成一个新的内容然后再改,这是一种延时懒惰策略。从JDK1.5开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。CopyOnWrite容器非常有用,可以在非常多的并发场景中使用到。
什么是CopyOnWrite容器
CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
它是线程安全的无序的集合,可以将它理解成线程安全的HashSet。 有意思的是,CopyOnWriteArraySet和HashSet虽然都继承于共同的父类AbstractSet;但是,HashSet是通过“散列表(HashMap)”实现的,而CopyOnWriteArraySet则是通过“ 动态数组(CopyOnWriteArrayList) ”实现的,并不是散列表。
和CopyOnWriteArrayList类似,CopyOnWriteArraySet具有以下特性:
1. 它最适合于具有以下特征的应用程序:Set 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。
2. 它是线程安全的。
3. 因为通常需要复制整个基础数组,所以可变操作(add()、set() 和 remove() 等等)的开销很大。
4. 迭代器支持hasNext(), next()等不可变操作,但不支持可变 remove()等 操作。
5. 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。
CopyOnWriteArraySet继承于AbstractSet,这就意味着它是一个集合。
2. CopyOnWriteArraySet包含CopyOnWriteArrayList对象,它是通过CopyOnWriteArrayList实现的。而CopyOnWriteArrayList本质是个动态数组队列,
所以CopyOnWriteArraySet相当于通过通过动态数组实现的“集合”! CopyOnWriteArrayList中允许有重复的元素;但是,CopyOnWriteArraySet是一个集合,所以它不能有重复集合。因此,CopyOnWriteArrayList额外提供了addIfAbsent()和addAllAbsent()这两个添加元素的API,通过这些API来添加元素时,只有当元素不存在时才执行添加操作! 至于CopyOnWriteArraySet的“线程安全”机制,和CopyOnWriteArrayList一样,是通过volatile和互斥锁来实现的。这个在前一章节介绍CopyOnWriteArrayList时数据结构时,已经进行了说明,这里就不再重复叙述了。
// 创建一个空 set。 CopyOnWriteArraySet() // 创建一个包含指定 collection 所有元素的 set。 CopyOnWriteArraySet(Collection<? extends E> c) // 如果指定元素并不存在于此 set 中,则添加它。 boolean add(E e) // 如果此 set 中没有指定 collection 中的所有元素,则将它们都添加到此 set 中。 boolean addAll(Collection<? extends E> c) // 移除此 set 中的所有元素。 void clear() // 如果此 set 包含指定元素,则返回 true。 boolean contains(Object o) // 如果此 set 包含指定 collection 的所有元素,则返回 true。 boolean containsAll(Collection<?> c) // 比较指定对象与此 set 的相等性。 boolean equals(Object o) // 如果此 set 不包含任何元素,则返回 true。 boolean isEmpty() // 返回按照元素添加顺序在此 set 中包含的元素上进行迭代的迭代器。 Iterator<E> iterator() // 如果指定元素存在于此 set 中,则将其移除。 boolean remove(Object o) // 移除此 set 中包含在指定 collection 中的所有元素。 boolean removeAll(Collection<?> c) // 仅保留此 set 中那些包含在指定 collection 中的元素。 boolean retainAll(Collection<?> c) // 返回此 set 中的元素数目。 int size() // 返回一个包含此 set 所有元素的数组。 Object[] toArray() // 返回一个包含此 set 所有元素的数组;返回数组的运行时类型是指定数组的类型。 <T> T[] toArray(T[] a)
import java.util.*; import java.util.concurrent.*; /* * CopyOnWriteArraySet是“线程安全”的集合,而HashSet是非线程安全的。 * * 下面是“多个线程同时操作并且遍历集合set”的示例 * (01) 当set是CopyOnWriteArraySet对象时,程序能正常运行。 * (02) 当set是HashSet对象时,程序会产生ConcurrentModificationException异常。 * * @author skywang */ public class CopyOnWriteArraySetTest1 { // TODO: set是HashSet对象时,程序会出错。 //private static Set<String> set = new HashSet<String>(); private static Set<String> set = new CopyOnWriteArraySet<String>(); public static void main(String[] args) { // 同时启动两个线程对set进行操作! new MyThread("ta").start(); new MyThread("tb").start(); } private static void printAll() { String value = null; Iterator iter = set.iterator(); while(iter.hasNext()) { value = (String)iter.next(); System.out.print(value+", "); } System.out.println(); } private static class MyThread extends Thread { MyThread(String name) { super(name); } @Override public void run() { int i = 0; while (i++ < 10) { // “线程名” + "-" + "序号" String val = Thread.currentThread().getName() + "-" + (i%6); set.add(val); // 通过“Iterator”遍历set。 printAll(); } } } }
由于set是集合对象,因此它不会包含重复的元素。
如果将源码中的set改成HashSet对象时,程序会产生ConcurrentModificationException异常。
队列Queue类型的BlockingQueue和ConcurrentLinkedQueue
Java中的队列接口就是Queue,它有会抛出异常的add、remove方法,在队尾插入元素以及对头移除元素,还有不会抛出异常的,对应的offer、poll方法。2.1 LinkedList
List实现了deque接口以及List接口,可以将它看做是这两种的任何一种。
Queue queue=new LinkedList();
queue.offer(“testone”);
queue.offer(“testtwo”);
queue.offer(“testthree”);
queue.offer(“testfour”);
System.out.println(queue.poll()); //testone
2.2 PriorityQueue
一个基于优先级堆(简单的使用链表的话,可能插入的效率会比较低O(N))的无界优先级队列。优先级队列的元素按照其自然顺序进行排序,或者根据构造队列时提供的 Comparator 进行排序,具体取决于所使用的构造方法。优先级队列不允许使用 null 元素。依靠自然顺序的优先级队列还不允许插入不可比较的对象。
queue=new PriorityQueue();
queue.offer(“testone”);
queue.offer(“testtwo”);
queue.offer(“testthree”);
queue.offer(“testfour”);
System.out.println(queue.poll()); //testfour
2.3 ConcurrentLinkedQueue
基于链节点的,线程安全的队列。并发访问不需要同步。在队列的尾部添加元素,并在头部删除他们。所以只要不需要知道队列的大小,并发队列就是比较好的选择
生产者和消费者模式,生产者不需要知道消费者的身份或者数量,甚至根本没有消费者,他们只负责把数据放入队列。类似地,消费者也不需要知道生产者是谁,以及是谁给他们安排的工作。
而Java知道大家清楚这个模式的并发复杂性,于是乎提供了阻塞队列(BlockingQueue)来满足这个模式的需求。阻塞队列说起来很简单,就是当队满的时候写线程会等待,直到队列不满的时候;当队空的时候读线程会等待,直到队不空的时候。实现这种模式的方法很多,其区别也就在于谁的消耗更低和等待的策略更优。
以LinkedBlockingQueue的具体实现为例,它的put源码如下: public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { try { while (count.get() == capacity) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to a non-interrupted thread throw ie; } insert(e); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
撇开其锁的具体实现,其流程就是我们在操作系统课上学习到的标准生产者模式,看来那些枯燥的理论还是有用武之地的。其中,最核心的还是Java的锁实现,有兴趣的朋友可以再进一步深究一下。
阻塞队列Blocking queue,提供了可阻塞的put和take方法,他们与可定时的offer和poll方法是等价。Put方法简化了处理,如果是有界队列,那么当队列满的时候,生成者就会阻塞,从而改消费者更多的追赶速度。
ArrayBlockingQueue和LinkedBlockingQueue
FIFO的队列,与LinkedList(由链节点支持,无界)和ArrayList(由数组支持,有界)相似(Linked有更好的插入和移除性能,Array有更好的查找性能,考虑到阻塞队列的特性,移除头部,加入尾部,两个都区别不大),但是却拥有比同步List更好的并发性能。
另外,LinkedList永远不会等待,因为他是无界的
BlockingQueue<String> queue=new ArrayBlockingQueue<String>(5); Producer p=new Producer(queue); Consumer c1=new Consumer(queue); Consumer c2=new Consumer(queue); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); /** * 生产者 * @author Administrator * */ class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { for(int i=0;i<100;i++){ queue.put(produce()); } } catch (InterruptedException ex) {} } String produce() { String temp=""+(char)('A'+(int)(Math.random()*26)); System.out.println("produce"+temp); return temp; } } /** * 消费者 * @author Administrator * */ class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { for(int i=0;i<100;i++){ consume(queue.take()); } } catch (InterruptedException ex) {} } void consume(Object x) { System.out.println("cousume"+x.toString()); } } 输出: produceK cousumeK produceV cousumeV produceQ cousumeQ produceI produceD produceI produceG produceA produceE cousumeD
PriorityBlockingQueue
一个按优先级堆支持的无界优先级队列队列,如果不希望按照FIFO的顺序进行处理,它非常有用。它可以比较元素本身的自然顺序,也可以使用一个Comparator排序。
DelayQueue
一个优先级堆支持的,基于时间的调度队列。加入到队列中的元素必须实现新的Delayed接口(只有一个方法,Long getDelay(java.util.concurrent.TimeUnit unit)),添加可以理立即返回,但是在延迟时间过去之前,不能从队列中取出元素,如果多个元素的延迟时间已到,那么最早失效链接/失效时间最长的元素将第一个取出。
static class NanoDelay implements Delayed{ long tigger; NanoDelay(long i){ tigger=System.nanoTime()+i; } public boolean equals(Object other){ return ((NanoDelay)other).tigger==tigger; } /** * 返回此对象相关的剩余延迟时间,零或负值指示延迟时间已经用尽 */ public long getDelay(TimeUnit unit) { long n=tigger-System.nanoTime(); return unit.convert(n, TimeUnit.NANOSECONDS); } public long getTriggerTime(){ return tigger; } /** * 相互比较,看谁的实效时间最长,谁先出去 */ public int compareTo(Delayed o) { long i=tigger; long j=((NanoDelay)o).tigger; if(i<j){ return -1; } if(i>j) return 1; return 0; } } public static void main(String[] args) throws InterruptedException{ Random random=new Random(); DelayQueue<NanoDelay> queue=new DelayQueue<NanoDelay>(); for(int i=0;i<5;i++){ queue.add(new NanoDelay(random.nextInt(1000))); } long last=0; for(int i=0;i<5;i++){ NanoDelay delay=(NanoDelay)(queue.take()); long tt=delay.getTriggerTime(); System.out.println("Trigger time:"+tt); if(i!=0){ System.out.println("Data: "+(tt-last)); } last=tt; } }
SynchronousQueue
不是一个真正的队列,因为它不会为队列元素维护任何存储空间,不过它维护一个排队的线程清单,这些线程等待把元素加入(enqueue)队列或者移出(dequeue)队列。也就是说,它非常直接的移交工作,减少了生产者和消费者之间移动数据的延迟时间,另外,也可以更快的知道反馈信息,当移交被接受时,它就知道消费者已经得到了任务。
因为SynChronousQueue没有存储的能力,所以除非另一个线程已经做好准备,否则put和take会一直阻止。它只有在消费者比较充足的时候比较合适。
双端队列(Deque)
JAVA6中新增了两个容器Deque和BlockingDeque,他们分别扩展了Queue和BlockingQueue。Deque它是一个双端队列,允许高效的在头和尾分别进行插入和删除,它的实现分别是ArrayDeque和LinkedBlockingQueue。
双端队列使得他们能够工作在一种称为“窃取工作”的模式上面。
同步的(synchronized)+HashMap,如果不存在,则计算,然后加入,该方法需要同步。
HashMap cache=new HashMap(); public synchronized V compute(A arg){ V result=cace.get(arg); Ii(result==null){ result=c.compute(arg); Cache.put(result); } Return result; }
用ConcurrentHashMap代替HashMap+同步.,这样的在get和set的时候也基本能保证原子性。但是会带来重复计算的问题.
Map<A,V> cache=new ConcurrentHashMap<A,V>(); public V compute(A arg){ V result=cace.get(arg); Ii(result==null){ result=c.compute(arg); Cache.put(result); } Return result; }
采用FutureTask代替直接存储值,这样可以在一开始创建的时候就将Task加入
Map<A,FutureTask<V>> cache=new ConcurrentHashMap<A,FutureTask<V>>(); public V compute(A arg){ FutureTask <T> f=cace.get(arg); //检查再运行的缺陷 Ii(f==null){ Callable<V> evel=new Callable(){ Public V call() throws ..{ return c.compute(arg); } }; FutureTask <T> ft=new FutureTask<T>(evel); f=ft; cache.put(arg,ft; ft.run(); } Try{ //阻塞,直到完成 return f.get(); }cach(){ } }
上面还有检查再运行的缺陷,在高并发的情况下啊,双方都没发现FutureTask,并且都放入Map(后一个被前一个替代),都开始了计算。
这里的解决方案在于,当他们都要放入Map的时候,如果可以有原子方法,那么已经有了以后,后一个FutureTask就加入,并且启动。
public V compute(A arg){ FutureTask <T> f=cace.get(arg); //检查再运行的缺陷 Ii(f==null){ Callable<V> evel=new Callable(){ Public V call() throws ..{ return c.compute(arg); } }; FutureTask <T> ft=new FutureTask<T>(evel); f=cache.putIfAbsent(args,ft); //如果已经存在,返回存在的值,否则返回null if(f==null){f=ft;ft.run();} //以前不存在,说明应该开始这个计算 else{ft=null;} //取消该计算 } Try{ //阻塞,直到完成 return f.get(); }cach(){ } }
上面的程序上来看已经完美了,不过可能带来缓存污染的可能性。如果一个计算被取消或者失败,那么这个键以后的值永远都是失败了;一种解决方案是,发现取消或者失败的task,就移除它,如果有Exception,也移除。另外,如果考虑缓存过期的问题,可以为每个结果关联一个过去时间,并周期性的扫描,清除过期的缓存。(过期时间可以用Delayed接口实现,参考DelayQueue,给他一个大于当前时间XXX的时间,,并且不断减去当前时间,直到返回负数,说明延迟时间已到了。
相关文章推荐
- Java三大框架之struts的上传文件
- java-并发-并发容器(3)
- java设计模式_简单工厂模式
- Java面向对象的三个基本特征:封装、继承、多态
- java bean对象转换json对象时过滤空值
- java 读取文件
- java-并发-ConcurrentHashMap高并发机制-jdk1.8
- java中对列表的添加或删除操作
- java-并发-ConcurrentHashMap高并发机制-jdk1.6
- java-并发-并发容器(2)
- eclipseli离线快捷安装插件
- JAVA面向对象-----final关键字
- JAVA面向对象-----final关键字
- java-并发-并发容器(1)
- Spring Boot的一个测试用例
- JAVA面向对象-----instanceof 关键字
- JAVA面向对象-----instanceof 关键字
- Java中字符串拼接的一些细节分析
- 使用spring websocket 和stomp实现消息功能
- Java OCR 图像智能字符识别技术,可识别中文