您的位置:首页 > 编程语言 > Java开发

JDK8 并发新特性学习 (一) CompletableFuture

2017-12-30 15:22 671 查看

JDK8 并发新特性学习 (一) CompletableFuture

JDK 8
java.util.concurrent
新增加的两个接口和四个类

CompletableFutre.AsynchronousCompletionTask


一个没有方法的装饰接口,用来标识异步的任务

CompletionStage<T>


一个多个异步计算的接口,对多个异步计算进行组合、过滤、异常处理等操作。

ConcurrentHashMap.KeySet<K,V>


一个ConcurrentHashMap的key的view

CounterCompleter<T>


一个ForkJoinTask,当任务执行完成时候执行

CompletionException


在完成一个任务时候抛出的异常.

这次我们先学习的是CompletableFuture。首先我们看这个类实现了Future和CompletionStage接口,所以它既有

Future的功能,也有一些异步计算的功能。

个人理解CompletableFuture的功能主要分为几类:

生产一个CompletableFuture,使用
supplyAsync


转换和使用上一个结构,主要使用
accept


单个异常处理
exceptionally


组合结果
combine
compose


等待完成之后执行,主要使用
acceptEither
runEither
allOf
anyOf


创建一个异步任务,打印日志信息

private static void create() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1"));
}

private static String sendMessage(String id) {
log.info("这里是发送信息{}", id);
return "信息" + id;
}

public static void main(String[] args) throws Throwable {
log.info("启动");
create();
log.info("结束");
Thread.sleep(3 * 1000L);
}


最后的输出结果是:

10:15:37.918 [main] INFO  c.k.j.c.CompletableFutureDemo - 启动
10:15:38.004 [main] INFO  c.k.j.c.CompletableFutureDemo - 结束
10:15:38.004 [ForkJoinPool.commonPool-worker-1] INFO  c.k.j.c.CompletableFutureDemo - 这里是发送信息1


可见是异步的,如果没有提供Executor,默认使用ForkJoinPool的commonPool

我们对一个结果进行处理,处理多次

private static void run() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1"));
CompletableFuture<Void> acceptFuture = future.thenAccept(s -> log.info(s.concat("第一步的accept")));
CompletableFuture<Void> voidCompletableFuture = acceptFuture.thenAccept(aVoid -> log.info("第二步的accept, 返回值已经为void了, 返回值: {}", aVoid));
}


输出结果:

10:45:34.296 [main] INFO  c.k.j.c.CompletableFutureDemo - 启动
10:45:34.367 [ForkJoinPool.commonPool-worker-1] INFO  c.k.j.c.CompletableFutureDemo - 这里是发送信息1
10:45:34.367 [main] INFO  c.k.j.c.CompletableFutureDemo - 信息1第一步的accept
10:45:34.368 [main] INFO  c.k.j.c.CompletableFutureDemo - 第二步的accept, 返回值已经为void了, 返回值: null
10:45:34.368 [main] INFO  c.k.j.c.CompletableFutureDemo - 结束


在这里我们观察到CompletableFutre的thenApply是每次对上一个completableFuture做处理,thenApply的参数是一个Consumer,所以没有返回值。在第二次进行处理的时候已经成
Consumer<? extends Void>
了.

那假如我们使用
thenApplySync
效果会是什么呢

private static void run() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1"));
// CompletableFuture<Void> acceptFuture = future.thenAccept(s -> log.info(s.concat("第一步的accept")));
// CompletableFuture<Void> voidCompletableFuture = acceptFuture.thenAccept(aVoid -> log.info("第二步的accept, 返回值已经为void了, 返回值: {}", aVoid));
CompletableFuture<Void> acceptSyncFuture = future.thenAcceptAsync(s -> log.info(s.concat("第一步的accept")));
acceptSyncFuture.thenAcceptAsync(aVoid -> log.info("第二步的accept, 返回值已经为void了, 返回值: {}", aVoid));
}


输出结果:

10:47:07.286 [main] INFO  c.k.j.c.CompletableFutureDemo - 启动
10:47:07.361 [main] INFO  c.k.j.c.CompletableFutureDemo - 结束
10:47:07.363 [ForkJoinPool.commonPool-worker-1] INFO  c.k.j.c.CompletableFutureDemo - 这里是发送信息1
10:47:07.363 [ForkJoinPool.commonPool-worker-1] INFO  c.k.j.c.CompletableFutureDemo - 信息1第一步的accept
10:47:07.363 [ForkJoinPool.commonPool-worker-1] INFO  c.k.j.c.CompletableFutureDemo - 第二步的accept, 返回值已经为void了, 返回值: null


上述输出结果顺序可能会有所变化,如果我们使用
.thenAcceptAsync
,那么就会保持使用异步的线程执行accept,而
.thenAccept
会类似于future.get等待上一个异步的结果,最后在主线程里执行

单个异常处理

CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1"));
CompletableFuture<String> exceptionFuture = future.exceptionally(e -> {
log.info("发生了异常", e);
return e.getMessage();
});
exceptionFuture.thenAccept(s -> log.info("这是accept".concat(s)));


测试说出说明,如果future正常完成,那么将会走thenAccept,否则会进行异常处理

combine 和 compose

combine是两个CompletableFuture结合起来,将前一个和后一个结合起来。compose类似于flatmap的作用,会将一个future的返回值,组合成一个新的completableFuture

CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1"));
CompletableFuture<String> secondFuture = CompletableFuture.<String>supplyAsync(() -> sendMessage("2"));

CompletableFuture<String> combineFuture = secondFuture.thenCombine(future, (s, v) -> {
log.info("这里是combine, {}", s + v);
return s + v;
});


CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1"));
CompletableFuture<String> secondFuture = CompletableFuture.<String>supplyAsync(() -> sendMessage("2"));

CompletableFuture<String> composeFuture = future.thenCompose(s -> {
log.info("compose的参数值是{}", s);
return CompletableFuture.<String>supplyAsync(() -> "compose的future" + s);
});


任务完成之后执行,这里我们只假设一个场景,就是我们要并发做四件事,然后做完之后再打印成功日志

CompletableFuture.allOf(future, secondFuture, thirdFuture, forthFuture).thenAccept(o -> log.info("完成任务"));


现在CompletableFuture介绍的差不多了,JDK8的异步编程的确方便了很多,而且像stream一样。还有handle和complete等其他方法,可以去看看。CompletableFuture就介绍到这儿了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: