您的位置:首页 > 编程语言 > Java开发

Java同步工具类——FutureTask

2017-04-12 07:49 405 查看
本文内容来自《Java并发编程实战》

FutureTask也可以用作闭锁。(FutureTask实现了Future语义,表示一种抽象的可生成结果的计算。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。”执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态后,它会停止在这个状态上。

Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞知道任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。

我们通过一个不断完善的例子来看看FutureTask的魅力

在下面的代码清单中,Computable(A, V)接口声明了一个函数Computable,其输入的类型为A,输出类型为V。在ExpensiveFunction中实现的Computablle,需要很长的时间来计算结果,我们将创建一个Computable包装器,帮助记住之前的计算结果,并将缓存过程封装起来

代码[初始化缓存]:

public interface Computable<A, V> {
V compute(A arg) throws InterruptedException;
}


public class ExpensiveFunction implements Computable<String, BigInteger> {
@Override
public BigInteger compute(String arg) throws InterruptedException {
return new BigInteger(arg);
}
}


public class Memoizer1<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new HashMap<A, V>();
private final Computable<A, V> c;
public Memoizer1(Computable<A, V> c) {
this.c = c;
}
@Override
public synchronized V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if(result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}


我们使用synchronized来确保不会有两个线程同时访问HashMap,但是这也导致了Memoizer1糟糕的并发性。

下面我们用ConcurrentHashmap替换HashMap

public class Memoizer2<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
private final Computable<A, V> c;
public Memoizer2(Computable<A, V> c) {
this.c = c;
}
@Override
public synchronized V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if(result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}


Memoizer2的问题在于可能有多个线程同时计算同一个值的结果[如果这个计算时间很长就会导致这个问题]。

现在我们用FutureTask来解决上面的问题

public class Memoizer3<A, V> implements Computable<A, V> {
private final Map<A, Future<V>> cache =
new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer3(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(A arg) throws InterruptedException {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = ft;
cache.put(arg, ft);
ft.run(); //在这里将调用c.compute
}
try{
return f.get(); //
} catch (ExecutionException e) {

}
return null;
}
}


这个代码仍然存在问题,因为复合操作(“若没有则添加”)是在底层的Map对象上执行的,而这个对象无法通过加锁来保证原子性。我们使用ConcurrenMap中的原子方法putIfAbsent,避免Memoizer3的漏洞。

import jdk.nashorn.internal.codegen.CompilerConstants;

import java.util.Map;
import java.util.concurrent.*;

/**
* Created by hms on 2017/4/12.
*/
public class Memoizer3<A, V> implements Computable<A, V> {
private final Map<A, Future<V>> cache =
new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer3(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(A arg) throws InterruptedException {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(arg, ft);
if(f==null) {
f = ft;
ft.run(); //在这里将调用c.compute
}
}
try{
return f.get(); //
}
catch (CancellationException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息