您的位置:首页 > 编程语言 > Java开发

java.util.concurrent并发集合

2015-04-21 14:47 330 查看
在 Java 编程的早期阶段,位于 Oswego 市的纽约州立大学(SUNY) 的一位教授决定创建一个简单的库,以帮助开发人员构建可以更好地处理多线程情况的应用程序。这并不是说用现有的库就不能实现,但是就像有了标准网络库一样,用经过调试的、可信任的库更容易自己处理多线程。在 Addision-Wesley 的一本相关书籍的帮助下,这个库变得越来越流行了。最终,作者 Doug Lea 决定设法让它成为 Java 平台的标准部分 —— JSR-166。这个库最后变成了
Tiger 版本的
java.util.concurrent
包。在这篇新的 驯服 Tiger 技巧中,我们将探讨 Collection Framework 中新的
Queue
接口、这个接口的非并发和并发实现、并发
Map
实现和专用于读操作大大超过写操作这种情况的并发
List
Set
实现。

介绍 Queue 接口

java.util
包为集合提供了一个新的基本接口:
java.util.Queue
。虽然肯定可以在相对应的两端进行添加和删除而将
java.util.List
作为队列对待,但是这个新的
Queue
接口提供了支持添加、删除和检查集合的更多方法,如下所示:

public boolean offer(Object element)
public Object remove()
public Object poll()
public Object element()
public Object peek()

 

基本上,一个队列就是一个先入先出(FIFO)的数据结构。一些队列有大小限制,因此如果想在一个满的队列中加入一个新项,多出的项就会被拒绝。这时新的
offer
方法就可以起作用了。它不是对调用
add()
方法抛出一个 unchecked 异常,而只是得到由
offer()
返回的 false。
remove()

poll()
方法都是从队列中删除第一个元素(head)。
remove()
的行为与
Collection
接口的版本相似,但是新的
poll()
方法在用空集合调用时不是抛出异常,只是返回 null。因此新的方法更适合容易出现异常条件的情况。后两个方法
element()
peek()
用于在队列的头部查询元素。与
remove()
方法类似,在队列为空时,
element()
抛出一个异常,而
peek()
返回 null。

 



使用基本队列

在 Tiger 中有两组
Queue
实现:实现了新
BlockingQueue
接口的和没有实现这个接口的。我将首先分析那些没有实现的。

在最简单的情况下,原来有的
java.util.LinkedList
实现已经改造成不仅实现
java.util.List
接口,而且还实现
java.util.Queue
接口。可以将集合看成这两者中的任何一种。清单 1 显示将
LinkedList
作为
Queue
使用的一种方法:

清单 1. 使用 Queue 实现

   Queue queue = new LinkedList();
   queue.offer("One");
   queue.offer("Two");
   queue.offer("Three");
   queue.offer("Four");
   // Head of queue should be One
   System.out.println("Head of queue is: " + queue.poll());

 

再复杂一点的是新的
java.util.AbstractQueue
类。这个类的工作方式类似于
java.util.AbstractList
java.util.AbstractSet
类。在创建自定义集合时,不用自己实现整个接口,只是继承抽象实现并填入细节。使用
AbstractQueue
时,必须为方法
offer()

poll()
peek()
提供实现。像
add()
addAll()
这样的方法修改为使用
offer()
,而
clear()

remove()
使用
poll()
。最后,
element()
使用
peek()
。当然可以在子类中提供这些方法的优化实现,但是不是必须这么做。而且,不必创建自己的子类,可以使用几个内置的实现, 其中两个是不阻塞队列:
PriorityQueue
ConcurrentLinkedQueue


PriorityQueue

ConcurrentLinkedQueue
类在 Collection Framework 中加入两个具体集合实现。
PriorityQueue
类实质上维护了一个有序列表。加入到
Queue
中的元素根据它们的天然排序(通过其
java.util.Comparable
实现)或者根据传递给构造函数的
java.util.Comparator
实现来定位。将清单 2 中的
LinkedList
改变为
PriorityQueue
将会打印出 Four 而不是 One,因为按字母排列 —— 字符串的天然顺序 —— Four 是第一个。
ConcurrentLinkedQueue
是基于链接节点的、线程安全的队列。并发访问不需要同步。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大小,
ConcurrentLinkedQueue
对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。

 



使用阻塞队列

新的
java.util.concurrent
包在 Collection Framework 中可用的具体集合类中加入了
BlockingQueue
接口和五个阻塞队列类。假如不熟悉阻塞队列概念,它实质上就是一种带有一点扭曲的 FIFO 数据结构。不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者元素可用。
BlockingQueue
接口的 Javadoc 给出了阻塞队列的基本用法,如清单 2 所示。生产者中的
put()
操作会在没有空间可用时阻塞,而消费者的
take()
操作会在队列中没有任何东西时阻塞。

清单 2. 使用 BlockingQueue

class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { queue = q; }
    public void run() {
      try {
        while(true) { queue.put(produce()); }
      } catch (InterruptedException ex) { ... handle ...}
    }
    Object produce() { ... }
}
class Consumer implements Runnable {
    private final BlockingQueue queue;
    Consumer(BlockingQueue q) { queue = q; }
    public void run() {
      try {
        while(true) { consume(queue.take()); }
      } catch (InterruptedException ex) { ... handle ...}
    }
    void consume(Object x) { ... }
}
class Setup {
    void main() {
      BlockingQueue q = new SomeQueueImplementation();
      Producer p = new Producer(q);
      Consumer c1 = new Consumer(q);
      Consumer c2 = new Consumer(q);
      new Thread(p).start();
      new Thread(c1).start();
      new Thread(c2).start();
    }
}

 

五个队列所提供的各有不同:

ArrayBlockingQueue
:一个由数组支持的有界队列。

LinkedBlockingQueue
:一个由链接节点支持的可选有界队列。

PriorityBlockingQueue
:一个由优先级堆支持的无界优先级队列。

DelayQueue
:一个由优先级堆支持的、基于时间的调度队列。

SynchronousQueue
:一个利用
BlockingQueue
接口的简单聚集(rendezvous)机制。
前两个类
ArrayBlockingQueue

LinkedBlockingQueue
几乎相同,只是在后备存储器方面有所不同,
LinkedBlockingQueue
并不总是有容量界限。无大小界限的
LinkedBlockingQueue
类在添加元素时永远不会有阻塞队列的等待(至少在其中有
Integer.MAX_VALUE
元素之前不会)。

PriorityBlockingQueue
是具有无界限容量的队列,它利用所包含元素的
Comparable
排序顺序来以逻辑顺序维护元素。可以将它看作
TreeSet
的可能替代物。例如,在队列中加入字符串 One、Two、Three 和 Four 会导致 Four 被第一个取出来。对于没有天然顺序的元素,可以为构造函数提供一个
Comparator
。不过对
PriorityBlockingQueue
有一个技巧。从
iterator()
返回的
Iterator
实例不需要以优先级顺序返回元素。如果必须以优先级顺序遍历所有元素,那么让它们都通过
toArray()
方法并自己对它们排序,像
Arrays.sort(pq.toArray())


新的
DelayQueue
实现可能是其中最有意思(也是最复杂)的一个。加入到队列中的元素必须实现新的
Delayed
接口(只有一个方法 ——
long getDelay(java.util.concurrent.TimeUnit unit)
)。因为队列的大小没有界限,使得添加可以立即返回,但是在延迟时间过去之前,不能从队列中取出元素。如果多个元素完成了延迟,那么最早失效/失效时间最长的元素将第一个取出。实际上没有听上去这样复杂。清单 3 演示了这种新的阻塞队列集合的使用:

清单 3. 使用 DelayQueue 实现

import java.util.*;
import java.util.concurrent.*;
public class Delay {
   /**
    * Delayed implementation that actually delays
    */
   static class NanoDelay implements Delayed {
     long trigger;
     NanoDelay(long i) {
       trigger = System.nanoTime() + i;
     }
     public int compareTo(Object y) {
       long i = trigger;
       long j = ((NanoDelay)y).trigger;
       if (i < j) return -1;
       if (i > j) return 1;
       return 0;
     }
     public boolean equals(Object other) {
       return ((NanoDelay)other).trigger == trigger;
     }
     public boolean equals(NanoDelay other) {
       return ((NanoDelay)other).trigger == trigger;
     }
     public long getDelay(TimeUnit unit) {
       long n = trigger - System.nanoTime();
       return unit.convert(n, TimeUnit.NANOSECONDS);
     }
     public long getTriggerTime() {
       return trigger;
     }
     public String toString() {
       return String.valueOf(trigger);
     }
   }
   public static void main(String args[]) throws InterruptedException {
     Random random = new Random();
     DelayQueue queue = new DelayQueue();
     for (int i=0; i < 5; i++) {
       queue.add(new NanoDelay(random.nextInt(1000)));
     }
     long last = 0;
     for (int i=0; i < 5; i++) {
       NanoDelay delay = (NanoDelay)(queue.take());
       long tt = delay.getTriggerTime();
       System.out.println("Trigger time: " + tt);
       if (i != 0) {
         System.out.println("Delta: " + (tt - last));
       }
       last = tt;
     }
   }
}

 

这个例子首先是一个内部类
NanoDelay
,它实质上将暂停给定的任意纳秒(nanosecond)数,这里利用了
System
的新
nanoTime()
方法。然后
main()
方法只是将
NanoDelay
对象放到队列中并再次将它们取出来。如果希望队列项做一些其他事情,就需要在
Delayed
对象的实现中加入方法,并在从队列中取出后调用这个新方法。(请随意扩展
NanoDelay
以试验加入其他方法做一些有趣的事情。)显示从队列中取出元素的两次调用之间的时间差。如果时间差是负数,可以视为一个错误,因为永远不会在延迟时间结束后,在一个更早的触发时间从队列中取得项。

SynchronousQueue
类是最简单的。它没有内部容量。它就像线程之间的手递手机制。在队列中加入一个元素的生产者会等待另一个线程的消费者。当这个消费者出现时,这个元素就直接在消费者和生产者之间传递,永远不会加入到阻塞队列中。

 



使用 ConcurrentMap 实现

新的
java.util.concurrent.ConcurrentMap
接口和
ConcurrentHashMap
实现只能在键不存在时将元素加入到 map 中,只有在键存在并映射到特定值时才能从 map 中删除一个元素。

有一个新的
putIfAbsent()
方法用于在 map 中进行添加。这个方法以要添加到
ConcurrentMap
实现中的键的值为参数,就像普通的
put()
方法,但是只有在 map 不包含这个键时,才能将键加入到 map 中。如果 map 已经包含这个键,那么这个键的现有值就会保留。
putIfAbsent()
方法是原子的。如果不调用这个原子操作,就需要从适当的同步块中调用清单 4 中的代码:

清单 4. 等价的 putIfAbsent() 代码

   if (!map.containsKey(key)) {
     return map.put(key, value);
   } else {
     return map.get(key);
   }

 

putIfAbsent()
方法一样,重载后的
remove()
方法有两个参数 —— 键和值。在调用时,只有当键映射到指定的值时才从 map 中删除这个键。如果不匹配,那么就不删除这个键,并返回 false。如果值匹配键的当前映射内容,那么就删除这个键。清单 5 显示了这种操作的等价源代码:

清单 5. 等价的 remove() 代码

   if (map.get(key).equals(value)) {
     map.remove(key);
     return true;
   } else {
     return false;
   }



使用 CopyOnWriteArrayList 和 CopyOnWriteArraySet

在 Doug Lea 的 Concurrent Programming in Java一书的第 2 章第 2.4.4 节(请参阅
参考资料)中,对 copy-on-write 模式作了最好的描述。实质上,这个模式声明了,为了维护对象的一致性快照,要依靠不可变性(immutability)来消除在协调读取不同的但是相关的属性时需要的同步。对于集合,这意味着如果有大量的读(即
get()
) 和迭代,不必同步操作以照顾偶尔的写(即
add()
)调用。对于新的
CopyOnWriteArrayList
CopyOnWriteArraySet
类,所有可变的(mutable)操作都首先取得后台数组的副本,对副本进行更改,然后替换副本。这种做法保证了在遍历自身更改的集合时,永远不会抛出
ConcurrentModificationException
。遍历集合会用原来的集合完成,而在以后的操作中使用更新后的集合。

这些新的集合,
CopyOnWriteArrayList

CopyOnWriteArraySet
,最适合于读操作通常大大超过写操作的情况。一个最常提到的例子是使用监听器列表。已经说过,Swing 组件还没有改为使用新的集合。相反,它们继续使用
javax.swing.event.EventListenerList
来维护它们的监听器列表。

如清单 6 所示,集合的使用与它们的非 copy-on-write 替代物完全一样。只是创建集合并在其中加入或者删除元素。即使对象加入到了集合中,原来的
Iterator
也可以进行,继续遍历原来集合中的项。

清单 6. 展示一个 copy-on-write 集合

import java.util.*;
import java.util.concurrent.*;
public class CopyOnWrite {
   public static void main(String args[]) {
     List list1 = new CopyOnWriteArrayList(Arrays.asList(args));
     List list2 = new ArrayList(Arrays.asList(args));
     Iterator itor1 = list1.iterator();
     Iterator itor2 = list2.iterator();
     list1.add("New");
     list2.add("New");
     try {
       printAll(itor1);
     } catch (ConcurrentModificationException e) {
       System.err.println("Shouldn't get here");
     }
     try {
       printAll(itor2);
     } catch (ConcurrentModificationException e) {
       System.err.println("Will get here.");
     }
   }
   private static void printAll(Iterator itor) {
     while (itor.hasNext()) {
       System.out.println(itor.next());
     }
   }
}

 

这个示例程序用命令行参数创建
CopyOnWriteArrayList

ArrayList
这两个实例。在得到每一个实例的
Iterator
后,分别在其中加入一个元素。当
ArrayList
迭代因一个
ConcurrentModificationException
问题而立即停止时,
CopyOnWriteArrayList
迭代可以继续,不会抛出异常,因为原来的集合是在得到 iterator 之后改变的。如果这种行为(比如通知原来一组事件监听器中的所有元素)是您需要的,那么最好使用 copy-on-write 集合。如果不使用的话,就还用原来的,并保证在出现异常时对它进行处理。


 

结束语

在 J2SE 平台的 Tiger 版中有许多重要的增加。除了语言级别的改变,如一般性支持,这个库也许是最重要的增加了,因为它会被最广泛的用户使用。不要忽视加入到平台中的其他包,像 Java Management Extensions (JMX),但是大多数其他重要的库增强只针对范围很窄的开发人员。但是这个库不是。除了用于锁定和原子操作的其他并发实用程序,这些类也会经常使用。尽早学习它们并利用它们所提供的功能。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐