Java同步工具类——FutureTask
2017-04-12 07:49
405 查看
本文内容来自《Java并发编程实战》
FutureTask也可以用作闭锁。(FutureTask实现了Future语义,表示一种抽象的可生成结果的计算。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。”执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态后,它会停止在这个状态上。
Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞知道任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。
我们通过一个不断完善的例子来看看FutureTask的魅力
在下面的代码清单中,Computable(A, V)接口声明了一个函数Computable,其输入的类型为A,输出类型为V。在ExpensiveFunction中实现的Computablle,需要很长的时间来计算结果,我们将创建一个Computable包装器,帮助记住之前的计算结果,并将缓存过程封装起来
代码[初始化缓存]:
我们使用synchronized来确保不会有两个线程同时访问HashMap,但是这也导致了Memoizer1糟糕的并发性。
下面我们用ConcurrentHashmap替换HashMap
Memoizer2的问题在于可能有多个线程同时计算同一个值的结果[如果这个计算时间很长就会导致这个问题]。
现在我们用FutureTask来解决上面的问题
这个代码仍然存在问题,因为复合操作(“若没有则添加”)是在底层的Map对象上执行的,而这个对象无法通过加锁来保证原子性。我们使用ConcurrenMap中的原子方法putIfAbsent,避免Memoizer3的漏洞。
FutureTask也可以用作闭锁。(FutureTask实现了Future语义,表示一种抽象的可生成结果的计算。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。”执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态后,它会停止在这个状态上。
Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞知道任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。
我们通过一个不断完善的例子来看看FutureTask的魅力
在下面的代码清单中,Computable(A, V)接口声明了一个函数Computable,其输入的类型为A,输出类型为V。在ExpensiveFunction中实现的Computablle,需要很长的时间来计算结果,我们将创建一个Computable包装器,帮助记住之前的计算结果,并将缓存过程封装起来
代码[初始化缓存]:
public interface Computable<A, V> { V compute(A arg) throws InterruptedException; }
public class ExpensiveFunction implements Computable<String, BigInteger> { @Override public BigInteger compute(String arg) throws InterruptedException { return new BigInteger(arg); } }
public class Memoizer1<A, V> implements Computable<A, V> { private final Map<A, V> cache = new HashMap<A, V>(); private final Computable<A, V> c; public Memoizer1(Computable<A, V> c) { this.c = c; } @Override public synchronized V compute(A arg) throws InterruptedException { V result = cache.get(arg); if(result == null) { result = c.compute(arg); cache.put(arg, result); } return result; } }
我们使用synchronized来确保不会有两个线程同时访问HashMap,但是这也导致了Memoizer1糟糕的并发性。
下面我们用ConcurrentHashmap替换HashMap
public class Memoizer2<A, V> implements Computable<A, V> { private final Map<A, V> cache = new ConcurrentHashMap<A, V>(); private final Computable<A, V> c; public Memoizer2(Computable<A, V> c) { this.c = c; } @Override public synchronized V compute(A arg) throws InterruptedException { V result = cache.get(arg); if(result == null) { result = c.compute(arg); cache.put(arg, result); } return result; } }
Memoizer2的问题在于可能有多个线程同时计算同一个值的结果[如果这个计算时间很长就会导致这个问题]。
现在我们用FutureTask来解决上面的问题
public class Memoizer3<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private final Computable<A, V> c; public Memoizer3(Computable<A, V> c) { this.c = c; } @Override public V compute(A arg) throws InterruptedException { Future<V> f = cache.get(arg); if (f == null) { Callable<V> eval = new Callable<V>() { @Override public V call() throws Exception { return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = ft; cache.put(arg, ft); ft.run(); //在这里将调用c.compute } try{ return f.get(); // } catch (ExecutionException e) { } return null; } }
这个代码仍然存在问题,因为复合操作(“若没有则添加”)是在底层的Map对象上执行的,而这个对象无法通过加锁来保证原子性。我们使用ConcurrenMap中的原子方法putIfAbsent,避免Memoizer3的漏洞。
import jdk.nashorn.internal.codegen.CompilerConstants; import java.util.Map; import java.util.concurrent.*; /** * Created by hms on 2017/4/12. */ public class Memoizer3<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private final Computable<A, V> c; public Memoizer3(Computable<A, V> c) { this.c = c; } @Override public V compute(A arg) throws InterruptedException { Future<V> f = cache.get(arg); if (f == null) { Callable<V> eval = new Callable<V>() { @Override public V call() throws Exception { return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = cache.putIfAbsent(arg, ft); if(f==null) { f = ft; ft.run(); //在这里将调用c.compute } } try{ return f.get(); // } catch (CancellationException e) { cache.remove(arg, f); } catch (ExecutionException e) { e.printStackTrace(); } return null; } }
相关文章推荐
- 线程高级应用-心得6-java5线程并发库中同步工具类(synchronizers),新知识大用途
- 线程高级应用-心得8-java5线程并发库中同步集合Collections工具类的应用及案例分析
- 同步工具类一:闭锁(java.util.concurrent.CountDownLatch)
- 同步工具类 java.util.concurrent.CountDownLatch
- Java 并发编程(四)常用同步工具类
- java同步工具类之--
- java 异步转同步工具类
- 13____java线程同步工具类之线程数据交换(Exchanger)
- 12____java线程同步工具类之同步计数器(CountDownLatch)
- java同步工具类之Exchanger
- Java5 多线程(五)--CyclicBarrier同步的工具类
- java同步工具类--Semaphores
- 同步工具类之Java学习总结
- java同步工具类
- 同步工具类之 FutureTask
- 线程高级应用-心得8-java5线程并发库中同步集合Collections工具类的应用及案例分析
- Java5 多线程(六)--CountDownLatch 同步工具类
- java其他同步工具类
- 同步工具类三:计数信号量(java.util.concurrent.Semaphore)
- Java新技术---线程学习之常用同步工具类