您的位置:首页 > 编程语言 > Java开发

java.util.concurrent 用法实例详解

2015-01-07 16:42 585 查看
写多线程的程序一直都是一件比较麻烦的事情,要考虑很多事情,处理不好还会出很多意想不到的麻烦。加上现在很多开发者接触到的项目都是打着企业级旗号的B/S项目,大多数人都很少涉及多线程,这又为本文的主角增加了一份神秘感。

讲到Java多线程,大多数人脑海中跳出来的是Thread、Runnable、synchronized……这些是最基本的东西,虽然已经足够强大,但想要用好还真不容易。从JDK 1.5开始,增加了java.util.concurrent包,它的引入大大简化了多线程程序的开发(要感谢一下大牛Doug Lee)。

java.util.concurrent包分成了三个部分,分别是java.util.concurrent、java.util.concurrent.atomic和java.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。

为了便于理解,本文使用一个例子来做说明,交代一下它的场景:

假设要对一套10个节点组成的环境进行检查,这个环境有两个入口点,通过节点间的依赖关系可以遍历到整个环境。依赖关系可以构成一张有向图,可能存在环。为了提高检查的效率,考虑使用多线程。

1、Executors

通过这个类能够获得多种线程池的实例,例如可以调用newSingleThreadExecutor()获得单线程的ExecutorService,调用newFixedThreadPool()获得固定大小线程池的ExecutorService。拿到ExecutorService可以做的事情就比较多了,最简单的是用它来执行Runnable对象,也可以执行一些实现了Callable<T>的对象。用Thread的start()方法没有返回值,如果该线程执行的方法有返回值那用ExecutorService就再好不过了,可以选择submit()、invokeAll()或者invokeAny(),根据具体情况选择合适的方法即可。

ThreadPoolService.java

这里要额外说明一下invokeAll()和invokeAny()方法。前者会执行给定的所有Callable<T>对象,等所有任务完成后返回一个包含了执行结果的List<Future<T>>,每个Future.isDone()都是true,可以用Future.get()拿到结果;后者只要完成了列表中的任意一个任务就立刻返回,返回值就是执行结果。

还有一个比较诡异的地方

本代码是在JDK 1.6下编译测试的,如果在JDK 1.5下测试,很可能在invokeAll和invokeAny的地方出错。明明ValidationTask实现了 Callable<Node>,可是它死活不认,类型不匹配,这时可以将参数声明由List<ValidationTask>改为 List<Callable<Node>>。

造成这个问题的主要原因是两个版本中invokeAll和invokeAny的方法签名不同,1.6里是invokeAll(Collection<? extends Callable<T>> tasks),而1.5里是invokeAll(Collection<Callable<T>> tasks)。网上也有人遇到类似的问题(invokeAll() is not willing to acept a Collection<Callable<T>> )。

和其他资源一样,线程池在使用完毕后也需要释放,用shutdown()方法可以关闭线程池,如果当时池里还有没有被执行的任务,它会等待任务执行完毕,在等待期间试图进入线程池的任务将被拒绝。也可以用shutdownNow()来关闭线程池,它会立刻关闭线程池,没有执行的任务作为返回值返回。

2、Lock

多线程编程中常常要锁定某个对象,之前会用synchronized来实现,现在又多了另一种选择,那就是java.util.concurrent.locks。通过Lock能够实现更灵活的锁定机制,它还提供了很多synchronized所没有的功能,例如尝试获得锁(tryLock())。

使用Lock时需要自己获得锁并在使用后手动释放,这一点与synchronized有所不同,所以通常Lock的使用方式是这样的:

Lock l = ...;

l.lock();

try {

// 执行操作

} finally {

l.unlock();

}

java.util.concurrent.locks中提供了几个Lock接口的实现类,比较常用的应该是ReentrantLock。以下范例中使用了ReentrantLock进行节点锁定:

Node.java

ValidationTask.java

请注意ValidationTask的call()方法,这里会先检查节点是否被锁定,如果被锁定则表示当前有另一个线程正在验证该节点,那就不用重复进行验证。第50行和第51行,那到锁后立即释放,这里只是为了等待验证结束。

讲到Lock,就不能不讲Conditon,前者代替了synchronized,而后者则代替了Object对象上的wait()、notify()和notifyAll()方法(Condition中提供了await()、signal()和signalAll()方法),当满足运行条件前挂起线程。Condition是与Lock结合使用的,通过Lock.newCondition()方法能够创建与Lock绑定的Condition实例。JDK的JavaDoc中有一个例子能够很好地说明Condition的用途及用法:

BoundedBuffer.java

说到这里,让我解释一下之前的例子里为什么没有选择Condition来等待验证结束。await()方法在调用时当前线程先要获得对应的锁,既然我都拿到锁了,那也就是说验证已经结束了。。。

3、并发集合类

集合类是大家编程时经常要使用的东西,ArrayList、HashMap什么的,java.util包中的集合类有的是线程安全的,有的则不是,在编写多线程的程序时使用线程安全的类能省去很多麻烦,但这些类的性能如何呢?java.util.concurrent包中提供了几个并发结合类,例如ConcurrentHashMap、ConcurrentLinkedQueue和CopyOnWriteArrayList等等,根据不同的使用场景,开发者可以用它们替换java.util包中的相应集合类。

CopyOnWriteArrayList是ArrayList的一个变体,比较适合用在读取比较频繁、修改较少的情况下,因为每次修改都要复制整个底层数组。ConcurrentHashMap中为Map接口增加了一些方法(例如putIfAbsenct()),同时做了些优化,总之灰常之好用,下面的代码中使用ConcurrentHashMap来作为全局节点表,完全无需考虑并发问题。ValidationService中只是声明(第17行),具体的使用是在上面的ValidationTask中。

ValidationService.java

4、AtomicInteger

对变量的读写操作都是原子操作(除了long或者double的变量),但像数值类型的++ --操作不是原子操作,像i++中包含了获得i的原始值、加1、写回i、返回原始值,在进行类似i++这样的操作时如果不进行同步问题就大了。好在java.util.concurrent.atomic为我们提供了很多工具类,可以以原子方式更新变量。

以AtomicInteger为例,提供了代替++ --的getAndIncrement()、incrementAndGet()、getAndDecrement()和decrementAndGet()方法,还有加减给定值的方法、当前值等于预期值时更新的compareAndSet()方法。

下面的例子中用AtomicInteger保存全局验证次数(第69行做了自增的操作),因为validateNode()方法会同时被多个线程调用,所以直接用int不同步是不行的,但用AtomicInteger在这种场合下就很合适。

MockNodeValidator.java

上述代码还有另一个功能,就是构造测试用的节点数据,一共10个节点,有2个入口点,通过这两个点能够遍历整个系统。每次调用会模拟远程访问,等待500ms。环境间节点依赖如下:

环境依赖

Node0 [Node1, Node2]

Node1 [Node3, Node4]

Node2 [Node5]

Node6 [Node7, Node8]

Node7 [Node5, Node9]

Node8 [Node3, Node4]

5、CountDownLatch

CountDownLatch是一个一次性的同步辅助工具,允许一个或多个线程一直等待,直到计数器值变为0。它有一个构造方法,设定计数器初始值,即在await()结束等待前需要调用多少次countDown()方法。CountDownLatch的计数器不能重置,所以说它是“一次性”的,如果需要重置计数器,可以使用CyclicBarrier。在运行环境检查的主类中,使用了CountDownLatch来等待所有验证结束,在各个并发验证的线程完成任务结束前都会调用countDown(),因为有3个并发的验证,所以将计数器设置为3。

最后将所有这些类整合起来,运行环境检查的主类如下。它会创建线程池服务和验证服务,先做一次验证(相当于是对系统做次初始化),随后并发3个验证请求。系统运行完毕会显示实际执行的节点验证次数和执行时间。如果是顺序执行,验证次数应该是13*4=52,但实际的验证次数会少于这个数字(我这里最近一次执行了33次验证),因为如果同时有两个线程要验证同一节点时只会做一次验证。关于时间,如果是顺序执行,52次验证每次等待500ms,那么验证所耗费的时间应该是26000ms,使用了多线程后的实际耗时远小于该数字(最近一次执行耗时4031ms)。

ValidationStarter.java

转自:http://blog.csdn.net/zhgflx/article/details/4485848

源码在下一篇博客
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: