《Java in Action》-1 第11章CompletableFuture:组合式异步编程(需重读)
2016-10-18 00:37
411 查看
第11 章 CompletableFuture:组合式异步编程
第7章中介绍的分支/合并框架以及并行流是实现并行处理的宝贵工具;他们将一个操作切分为多个子操作,在多个不同的核、CpU甚至是机器上并行地执行这些子操作。
以此相反,如果你的意图是实现并发,而非并行,或者你的主要目标是在同一个CpU上执行几个松耦合的任务,充分利用CpU的核,让其足够忙碌,从而最大化程序的吞吐量,那么你其实真正想做的是避免因为等待远程服务的返回,或者对数据库的查询,而阻塞线程的执行,浪费宝贵的计算资源,因为这种等待的时间很可能相当长。通过Future接口,尤其是它的新版实现CompletableFuture,是处理这种情况的利器。
11.1 Future接口
Future接口是对将来某个时刻会发生的结果进行建模,它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Futrue中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不需要呆呆等待耗时的操作完成。
使用Future,只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService,就万事大吉了。
使用Future的一个例子:
ExecutorService executor = Executors.newCachedThreadPool(); //创建ExecutorService,通过它你可以向线程池提交任务
Future<Double> future = executor.submit(new Callable<Double>(){//向ExecutorService提交一个Callable对象
public Double call() {
return doSomeLongComputation();//以异步方式在新的线程中执行耗时的操作
}});
doSomethingElse();//异步操作进行的同时,你可以做其他的事情
try {
Double result = future.get(1,TimeUnit.SECONDS); //获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟之后退出
}catch() {
}
11.1.1 Future接口的局限性
不足以编写简洁的并发代码。
11.1.2 使用CompletableFuture构建异步应用
11.2 实现异步API
同步方法:
public double getPrice(String
product) {
return calculatePrice(product);
}
11.2.1 将同步方法转换为异步方法
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();//创建CompletableFuture对象,它会包含计算的结果
new Thread( () -> {//在另一个线程中以异步方式执行计算
doubleprice = calculatePrice(product);
futurePrice.complete(price);//需长时间计算的任务结束并得到结果时,设置Future的返回值
}).start();
return futurePrice;//无需等待还没结束的计算,直接返回Future对象
}
11.2.2 错误处理
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();//创建CompletableFuture对象,它会包含计算的结果
new Thread( () -> {//在另一个线程中以异步方式执行计算
try{
double price = calculatePrice(product);
futurePrice.complete(price);//需长时间计算的任务结束并得到结果时,设置Future的返回值
} catch (Exception ex) {
futurePrice.completeExceptionally(ex);
}
}).start();
return futurePrice;//无需等待还没结束的计算,直接返回Future对象
}
*使用工厂方法supplyAsync创建CompletableFuture
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(()
-> calculatePrice(product));
}
supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行生产者方法。
11.3 让你的代码免受阻塞之苦
有一个商家的列表:
List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll");
采用顺序查询所有商店的方式实现findPrices方法:
public List<String> findPricesSequential(String product) {
return shops.stream()
.map(shop -> shop.getName() + " price is " + shop.getPrice(product))
.collect(Collectors.toList());
}
11.3.1 使用并行流对请求进行并行操作
public List<String> findPricesParallel(String product) {
return shops.parallelStream()
.map(shop -> shop.getName() + " price is " + shop.getPrice(product))
.collect(Collectors.toList());
}
11.3.2 使用CompletableFuture发起异步请求
public List<String> findPricesFuture(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is "
+ shop.getPrice(product)))//使用CompletableFuture以异步方式计算每种商品的价格
.collect(Collectors.toList());
List<String> prices = priceFutures.stream()
.map(CompletableFuture::join) //等待所有异步操作结束
.collect(Collectors.toList());
return prices;
}
11.3.3 寻找更好的方案
11.3.4 使用定制的执行器
定制的执行器
private final Executor executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
现在可以按照下面的方式创建一个可查询指定商品价格的CompletableFuture对象:
CompletableFuture.supplyAsync(() -> shop.getName() + " price is "
+ shop.getPrice(product), executor)
*并行——使用流还是completableFutures的建议
1)如果进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的
2)如果并行的工作单元还涉及等待I/O的操作,那么使用CompletableFuture灵活性更好。
11.4 对多个异步任务进行流水线操作
11.5 响应CompletableFuture的completion事件
11.6 小结
*执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度。
*你应该尽可能地为客户提供异步ApI。使用CompletableFuture类提供的特性,你能够轻松地实现这一目标。
*CompletableFuture类还提供了异常管理的机制,让你有机会抛出和管理异步任务执行中发生的异常
*将同步ApI的调用封装到一个CompletableFuture中,你能够以异步的方式使用其结果。
*如果异步任务之间相互独立,或者他们之间某一些的结果是另一些的输入,你可以将这些异步任务构造或者合并成一个
*你可以为CompletableFuture注册一个回调函数,在Future执行完毕或者他们计算的结果可用时,针对性地执行一些程序
*你可以决定在什么时候结束程序的运行,是等待有CompletableFuture对象构成的列表中所有的对象都执行完毕,还是只要其中任何一个首先完成就中止程序的运行。
第7章中介绍的分支/合并框架以及并行流是实现并行处理的宝贵工具;他们将一个操作切分为多个子操作,在多个不同的核、CpU甚至是机器上并行地执行这些子操作。
以此相反,如果你的意图是实现并发,而非并行,或者你的主要目标是在同一个CpU上执行几个松耦合的任务,充分利用CpU的核,让其足够忙碌,从而最大化程序的吞吐量,那么你其实真正想做的是避免因为等待远程服务的返回,或者对数据库的查询,而阻塞线程的执行,浪费宝贵的计算资源,因为这种等待的时间很可能相当长。通过Future接口,尤其是它的新版实现CompletableFuture,是处理这种情况的利器。
11.1 Future接口
Future接口是对将来某个时刻会发生的结果进行建模,它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Futrue中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不需要呆呆等待耗时的操作完成。
使用Future,只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService,就万事大吉了。
使用Future的一个例子:
ExecutorService executor = Executors.newCachedThreadPool(); //创建ExecutorService,通过它你可以向线程池提交任务
Future<Double> future = executor.submit(new Callable<Double>(){//向ExecutorService提交一个Callable对象
public Double call() {
return doSomeLongComputation();//以异步方式在新的线程中执行耗时的操作
}});
doSomethingElse();//异步操作进行的同时,你可以做其他的事情
try {
Double result = future.get(1,TimeUnit.SECONDS); //获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟之后退出
}catch() {
}
11.1.1 Future接口的局限性
不足以编写简洁的并发代码。
11.1.2 使用CompletableFuture构建异步应用
11.2 实现异步API
同步方法:
public double getPrice(String
product) {
return calculatePrice(product);
}
11.2.1 将同步方法转换为异步方法
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();//创建CompletableFuture对象,它会包含计算的结果
new Thread( () -> {//在另一个线程中以异步方式执行计算
doubleprice = calculatePrice(product);
futurePrice.complete(price);//需长时间计算的任务结束并得到结果时,设置Future的返回值
}).start();
return futurePrice;//无需等待还没结束的计算,直接返回Future对象
}
11.2.2 错误处理
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();//创建CompletableFuture对象,它会包含计算的结果
new Thread( () -> {//在另一个线程中以异步方式执行计算
try{
double price = calculatePrice(product);
futurePrice.complete(price);//需长时间计算的任务结束并得到结果时,设置Future的返回值
} catch (Exception ex) {
futurePrice.completeExceptionally(ex);
}
}).start();
return futurePrice;//无需等待还没结束的计算,直接返回Future对象
}
*使用工厂方法supplyAsync创建CompletableFuture
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(()
-> calculatePrice(product));
}
supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行生产者方法。
11.3 让你的代码免受阻塞之苦
有一个商家的列表:
List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll");
采用顺序查询所有商店的方式实现findPrices方法:
public List<String> findPricesSequential(String product) {
return shops.stream()
.map(shop -> shop.getName() + " price is " + shop.getPrice(product))
.collect(Collectors.toList());
}
11.3.1 使用并行流对请求进行并行操作
public List<String> findPricesParallel(String product) {
return shops.parallelStream()
.map(shop -> shop.getName() + " price is " + shop.getPrice(product))
.collect(Collectors.toList());
}
11.3.2 使用CompletableFuture发起异步请求
public List<String> findPricesFuture(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is "
+ shop.getPrice(product)))//使用CompletableFuture以异步方式计算每种商品的价格
.collect(Collectors.toList());
List<String> prices = priceFutures.stream()
.map(CompletableFuture::join) //等待所有异步操作结束
.collect(Collectors.toList());
return prices;
}
11.3.3 寻找更好的方案
11.3.4 使用定制的执行器
定制的执行器
private final Executor executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
现在可以按照下面的方式创建一个可查询指定商品价格的CompletableFuture对象:
CompletableFuture.supplyAsync(() -> shop.getName() + " price is "
+ shop.getPrice(product), executor)
*并行——使用流还是completableFutures的建议
1)如果进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的
2)如果并行的工作单元还涉及等待I/O的操作,那么使用CompletableFuture灵活性更好。
11.4 对多个异步任务进行流水线操作
11.5 响应CompletableFuture的completion事件
11.6 小结
*执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度。
*你应该尽可能地为客户提供异步ApI。使用CompletableFuture类提供的特性,你能够轻松地实现这一目标。
*CompletableFuture类还提供了异常管理的机制,让你有机会抛出和管理异步任务执行中发生的异常
*将同步ApI的调用封装到一个CompletableFuture中,你能够以异步的方式使用其结果。
*如果异步任务之间相互独立,或者他们之间某一些的结果是另一些的输入,你可以将这些异步任务构造或者合并成一个
*你可以为CompletableFuture注册一个回调函数,在Future执行完毕或者他们计算的结果可用时,针对性地执行一些程序
*你可以决定在什么时候结束程序的运行,是等待有CompletableFuture对象构成的列表中所有的对象都执行完毕,还是只要其中任何一个首先完成就中止程序的运行。
相关文章推荐
- Java in Action - Struts分页显示
- JavaServer Faces in Action
- 《Java in Action》-1 第2章 通过行为参数化传递代码
- 《Java in Action》-1 第3章 Lambda表达式
- Spring in Action : MVC 配置(JAVA方式)
- 《Java in Action 》
- 《Java in Action》-1 第7章 并行处理数据与性能
- Spring in action 02 -- 装配 Bean(JavaConfig)遇到的问题
- Netty in Action (二)第一章节 第一部分 java网络编程
- 《Java in Action》-1 第5章使用
- Java Reflection in Action
- Quick to Redis,Java Client:Jedis in Action
- Java数据结构 in action——————1、简介
- 《Thinkinginjava》第11章-持有对象
- SPRING IN ACTION 第4版笔记-第二章WIRING BEANS-008-在Java配置文件中引入xml配置文件@Import、@ImportResource
- Struts in Action: Building Web Applications with the Leading Java Framework
- Spring and Enterprise JavaBeans(Chapter 11 of Spring In Action)
- 重读《Struts In Action》
- Spring in Action : MVC 配置(JAVA方式)
- 《Java in Action》-1 第6章用流收集数据