JDK8 并发新特性学习 (一) CompletableFuture
2017-12-30 15:22
671 查看
JDK8 并发新特性学习 (一) CompletableFuture
JDK 8java.util.concurrent新增加的两个接口和四个类
CompletableFutre.AsynchronousCompletionTask
一个没有方法的装饰接口,用来标识异步的任务
CompletionStage<T>
一个多个异步计算的接口,对多个异步计算进行组合、过滤、异常处理等操作。
ConcurrentHashMap.KeySet<K,V>
一个ConcurrentHashMap的key的view
CounterCompleter<T>
一个ForkJoinTask,当任务执行完成时候执行
CompletionException
在完成一个任务时候抛出的异常.
这次我们先学习的是CompletableFuture。首先我们看这个类实现了Future和CompletionStage接口,所以它既有
Future的功能,也有一些异步计算的功能。
个人理解CompletableFuture的功能主要分为几类:
生产一个CompletableFuture,使用
supplyAsync
转换和使用上一个结构,主要使用
accept
单个异常处理
exceptionally
组合结果
combine和
compose
等待完成之后执行,主要使用
acceptEither、
runEither、
allOf和
anyOf
创建一个异步任务,打印日志信息
private static void create() throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1")); } private static String sendMessage(String id) { log.info("这里是发送信息{}", id); return "信息" + id; } public static void main(String[] args) throws Throwable { log.info("启动"); create(); log.info("结束"); Thread.sleep(3 * 1000L); }
最后的输出结果是:
10:15:37.918 [main] INFO c.k.j.c.CompletableFutureDemo - 启动 10:15:38.004 [main] INFO c.k.j.c.CompletableFutureDemo - 结束 10:15:38.004 [ForkJoinPool.commonPool-worker-1] INFO c.k.j.c.CompletableFutureDemo - 这里是发送信息1
可见是异步的,如果没有提供Executor,默认使用ForkJoinPool的commonPool
我们对一个结果进行处理,处理多次
private static void run() throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1")); CompletableFuture<Void> acceptFuture = future.thenAccept(s -> log.info(s.concat("第一步的accept"))); CompletableFuture<Void> voidCompletableFuture = acceptFuture.thenAccept(aVoid -> log.info("第二步的accept, 返回值已经为void了, 返回值: {}", aVoid)); }
输出结果:
10:45:34.296 [main] INFO c.k.j.c.CompletableFutureDemo - 启动 10:45:34.367 [ForkJoinPool.commonPool-worker-1] INFO c.k.j.c.CompletableFutureDemo - 这里是发送信息1 10:45:34.367 [main] INFO c.k.j.c.CompletableFutureDemo - 信息1第一步的accept 10:45:34.368 [main] INFO c.k.j.c.CompletableFutureDemo - 第二步的accept, 返回值已经为void了, 返回值: null 10:45:34.368 [main] INFO c.k.j.c.CompletableFutureDemo - 结束
在这里我们观察到CompletableFutre的thenApply是每次对上一个completableFuture做处理,thenApply的参数是一个Consumer,所以没有返回值。在第二次进行处理的时候已经成
Consumer<? extends Void>了.
那假如我们使用
thenApplySync效果会是什么呢
private static void run() throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1")); // CompletableFuture<Void> acceptFuture = future.thenAccept(s -> log.info(s.concat("第一步的accept"))); // CompletableFuture<Void> voidCompletableFuture = acceptFuture.thenAccept(aVoid -> log.info("第二步的accept, 返回值已经为void了, 返回值: {}", aVoid)); CompletableFuture<Void> acceptSyncFuture = future.thenAcceptAsync(s -> log.info(s.concat("第一步的accept"))); acceptSyncFuture.thenAcceptAsync(aVoid -> log.info("第二步的accept, 返回值已经为void了, 返回值: {}", aVoid)); }
输出结果:
10:47:07.286 [main] INFO c.k.j.c.CompletableFutureDemo - 启动 10:47:07.361 [main] INFO c.k.j.c.CompletableFutureDemo - 结束 10:47:07.363 [ForkJoinPool.commonPool-worker-1] INFO c.k.j.c.CompletableFutureDemo - 这里是发送信息1 10:47:07.363 [ForkJoinPool.commonPool-worker-1] INFO c.k.j.c.CompletableFutureDemo - 信息1第一步的accept 10:47:07.363 [ForkJoinPool.commonPool-worker-1] INFO c.k.j.c.CompletableFutureDemo - 第二步的accept, 返回值已经为void了, 返回值: null
上述输出结果顺序可能会有所变化,如果我们使用
.thenAcceptAsync,那么就会保持使用异步的线程执行accept,而
.thenAccept会类似于future.get等待上一个异步的结果,最后在主线程里执行
单个异常处理
CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1")); CompletableFuture<String> exceptionFuture = future.exceptionally(e -> { log.info("发生了异常", e); return e.getMessage(); }); exceptionFuture.thenAccept(s -> log.info("这是accept".concat(s)));
测试说出说明,如果future正常完成,那么将会走thenAccept,否则会进行异常处理
combine 和 compose
combine是两个CompletableFuture结合起来,将前一个和后一个结合起来。compose类似于flatmap的作用,会将一个future的返回值,组合成一个新的completableFuture
CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1")); CompletableFuture<String> secondFuture = CompletableFuture.<String>supplyAsync(() -> sendMessage("2")); CompletableFuture<String> combineFuture = secondFuture.thenCombine(future, (s, v) -> { log.info("这里是combine, {}", s + v); return s + v; });
CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1")); CompletableFuture<String> secondFuture = CompletableFuture.<String>supplyAsync(() -> sendMessage("2")); CompletableFuture<String> composeFuture = future.thenCompose(s -> { log.info("compose的参数值是{}", s); return CompletableFuture.<String>supplyAsync(() -> "compose的future" + s); });
任务完成之后执行,这里我们只假设一个场景,就是我们要并发做四件事,然后做完之后再打印成功日志
CompletableFuture.allOf(future, secondFuture, thirdFuture, forthFuture).thenAccept(o -> log.info("完成任务"));
现在CompletableFuture介绍的差不多了,JDK8的异步编程的确方便了很多,而且像stream一样。还有handle和complete等其他方法,可以去看看。CompletableFuture就介绍到这儿了。
相关文章推荐
- Guava-Monitor-Future并发特性
- 共同学习Java源代码-多线程与并发-Future、RunnableFuture接口
- JDK8 并发包学习 Executor/ExecutorService(一)
- Java8学习计划--关于多核多线程并发编程-Java8-CompletableFuture 3的介绍
- 【Java高并发学习】Future模式
- Java8学习计划--关于多核多线程并发编程-Java8-CompletableFuture 4的介绍
- Python学习(二十)—— __future__模块引入新特性
- 《Java高并发程序设计》学习 --6.5 增强的Future:CompletableFuture
- [Java并发包学习四]Future和FutureTask
- [Java并发包学习四]Future和FutureTask
- [Java并发包学习四]Future和FutureTask
- Java 8新特性之CompletableFuture:组合式异步编程
- java学习——java高级特性,线程,并发 笔记
- java并发包学习系列:future模式
- 并发调试和JDK8新特性
- [Java并发包学习四]Future和FutureTask
- JDK8新特性学习(一) Lambda表达式和函数式接口
- Java多线程编程--(8)学习Java5.0 并发编程包--线程池、Callable & Future 简介
- 学习实战全笔记--JavaSE--包装类的特性--用法示例(JDK8)
- Java多线程编程--(8)学习Java5.0 并发编程包--线程池、Callable & Future 简介