您的位置:首页 > 编程语言 > Java开发

《Java in Action》-1 第11章CompletableFuture:组合式异步编程(需重读)

2016-10-18 00:37 411 查看
第11 章 CompletableFuture:组合式异步编程



第7章中介绍的分支/合并框架以及并行流是实现并行处理的宝贵工具;他们将一个操作切分为多个子操作,在多个不同的核、CpU甚至是机器上并行地执行这些子操作。

以此相反,如果你的意图是实现并发,而非并行,或者你的主要目标是在同一个CpU上执行几个松耦合的任务,充分利用CpU的核,让其足够忙碌,从而最大化程序的吞吐量,那么你其实真正想做的是避免因为等待远程服务的返回,或者对数据库的查询,而阻塞线程的执行,浪费宝贵的计算资源,因为这种等待的时间很可能相当长。通过Future接口,尤其是它的新版实现CompletableFuture,是处理这种情况的利器。

11.1 Future接口

Future接口是对将来某个时刻会发生的结果进行建模,它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Futrue中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不需要呆呆等待耗时的操作完成。

使用Future,只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService,就万事大吉了。

使用Future的一个例子:

ExecutorService executor = Executors.newCachedThreadPool();   //创建ExecutorService,通过它你可以向线程池提交任务

Future<Double> future = executor.submit(new Callable<Double>(){//向ExecutorService提交一个Callable对象

public Double call() {

return doSomeLongComputation();//以异步方式在新的线程中执行耗时的操作

}});

doSomethingElse();//异步操作进行的同时,你可以做其他的事情

try {

Double result = future.get(1,TimeUnit.SECONDS); //获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟之后退出

}catch() {

}

11.1.1 Future接口的局限性

不足以编写简洁的并发代码。

11.1.2 使用CompletableFuture构建异步应用

11.2 实现异步API

同步方法:

public double getPrice(String
product) {

return calculatePrice(product);

}

11.2.1 将同步方法转换为异步方法

public Future<Double> getPriceAsync(String product) {

CompletableFuture<Double> futurePrice = new CompletableFuture<>();//创建CompletableFuture对象,它会包含计算的结果

new Thread( () -> {//在另一个线程中以异步方式执行计算

doubleprice = calculatePrice(product);

futurePrice.complete(price);//需长时间计算的任务结束并得到结果时,设置Future的返回值

}).start();

return futurePrice;//无需等待还没结束的计算,直接返回Future对象

}

11.2.2 错误处理

public Future<Double> getPriceAsync(String product) {

CompletableFuture<Double> futurePrice = new CompletableFuture<>();//创建CompletableFuture对象,它会包含计算的结果

new Thread( () -> {//在另一个线程中以异步方式执行计算

try{

double price = calculatePrice(product);
futurePrice.complete(price);//需长时间计算的任务结束并得到结果时,设置Future的返回值
} catch (Exception ex) {

futurePrice.completeExceptionally(ex);

}

}).start();

return futurePrice;//无需等待还没结束的计算,直接返回Future对象

}

*使用工厂方法supplyAsync创建CompletableFuture
public Future<Double> getPriceAsync(String product) {

return CompletableFuture.supplyAsync(()
-> calculatePrice(product));
}
supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行生产者方法。

11.3 让你的代码免受阻塞之苦

有一个商家的列表:

List<Shop> shops = Arrays.asList(new Shop("BestPrice"),

                                                   new Shop("LetsSaveBig"),

                                                   new Shop("MyFavoriteShop"),

                                                   new Shop("BuyItAll");

采用顺序查询所有商店的方式实现findPrices方法:

public List<String> findPricesSequential(String product) {

        return shops.stream()

                .map(shop -> shop.getName() + " price is " + shop.getPrice(product))

                .collect(Collectors.toList());

    }

11.3.1 使用并行流对请求进行并行操作

    public List<String> findPricesParallel(String product) {

        return shops.parallelStream()

                .map(shop -> shop.getName() + " price is " + shop.getPrice(product))

                .collect(Collectors.toList());

    }

11.3.2 使用CompletableFuture发起异步请求

 public List<String> findPricesFuture(String product) {

        List<CompletableFuture<String>> priceFutures =

                shops.stream()

                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is "

                        + shop.getPrice(product)))//使用CompletableFuture以异步方式计算每种商品的价格

                .collect(Collectors.toList());

        List<String> prices = priceFutures.stream()

                .map(CompletableFuture::join) //等待所有异步操作结束

                .collect(Collectors.toList());

        return prices;

    }

11.3.3 寻找更好的方案

11.3.4 使用定制的执行器

定制的执行器

 private final Executor executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() {

        @Override

        public Thread newThread(Runnable r) {

            Thread t = new Thread(r);

            t.setDaemon(true);

            return t;

        }

    });

现在可以按照下面的方式创建一个可查询指定商品价格的CompletableFuture对象:

CompletableFuture.supplyAsync(() -> shop.getName() + " price is "

                        + shop.getPrice(product), executor)

*并行——使用流还是completableFutures的建议

1)如果进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的

2)如果并行的工作单元还涉及等待I/O的操作,那么使用CompletableFuture灵活性更好。

11.4 对多个异步任务进行流水线操作

11.5 响应CompletableFuture的completion事件

11.6 小结

*执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度。

*你应该尽可能地为客户提供异步ApI。使用CompletableFuture类提供的特性,你能够轻松地实现这一目标。

*CompletableFuture类还提供了异常管理的机制,让你有机会抛出和管理异步任务执行中发生的异常

*将同步ApI的调用封装到一个CompletableFuture中,你能够以异步的方式使用其结果。

*如果异步任务之间相互独立,或者他们之间某一些的结果是另一些的输入,你可以将这些异步任务构造或者合并成一个

*你可以为CompletableFuture注册一个回调函数,在Future执行完毕或者他们计算的结果可用时,针对性地执行一些程序

*你可以决定在什么时候结束程序的运行,是等待有CompletableFuture对象构成的列表中所有的对象都执行完毕,还是只要其中任何一个首先完成就中止程序的运行。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: