您的位置:首页 > 编程语言 > Java开发

使用JDK1.8 CompletableFuture异步化任务处理

2018-02-27 17:41 197 查看

0.概述

服务端编程的一个经典场景是在接收和处理客户端请求时,为了避免对每一个请求都分配线程而带来的资源开销,服务一般会预先分配一个固定大小的线程池(比如TomcatconnectormaxThreads),当客户端请求到来时,从线程池里寻找空闲状态的线程来处理请求,请求处理完毕后会回到线程池,继续服务下一个请求。当线程池内的线程都处于繁忙状态时,新来的请求需要排队直到线程池内有可用的线程,或者当超出队列容量后(TomcatconnectoracceptCount属性)请求被拒绝(connectionrefusederror)。

为了提高服务的吞吐量,我们应当确保主线程尽快处理尽快返回,尽量使服务端的任务处理线程池始终有可分配的线程来处理新的客户端请求。

当主线程执行一个任务时,如果该任务较耗时,通常的做法是利用Future/Promise来异步化处理任务。从JDK1.5开始,J.U.C中提供了Future来代表一个异步操作。JDK1.8中则新增了lambda表达式和CompletableFuture,可以帮助我们更好的用函数式编程风格来实现任务的异步处理。

1.Future

代码例子:

ExecutorServiceexecutor=Executors.newFixedThreadPool(1);
Future<String>future=executor.submit(()->{
//longrunningtask
return"taskfinish.";
});


Future实在是太鸡肋了,仅暴露了get/cancel/isDone/isCancelled方法。我们无法通过Future去手动结束一个任务,也无法非阻塞的去获取Future的任务结果,因为future.get()方法是阻塞的。假设有下面这个场景,当前有两个任务,后一个任务依赖了前一个任务的处理结果,这种场景也无法通过Future来实现异步流程任务处理。

2.CompletableFuture

CompletableFuture实现了Future和CompletionStage两个接口,CompletionStage可以看做是一个异步任务执行过程的抽象。我们可以基于CompletableFuture方便的创建任务和链式处理多个任务。下面我们通过实例来介绍它的用法。

2.1创建任务

可以使用runAsync方法新建一个线程来运行Runnable对象(无返回值)

CompletableFuture<Void>futureAsync=CompletableFuture.runAsync(()->{
//longrunningtaskwithoutreturnvalue
System.out.println("taskfinish.");
});


也可以使用supplyAysnc方法新建线程来运行Supplier<T>对象(有返回值)

CompletableFuture<String>future=CompletableFuture.supplyAsync(()->{
//longrunningtask
return"taskresult";
});


这里执行任务的线程来自于ForkJoinPool.commonPool(),也可以自定义线程池

ExecutorServiceexector=Executors.newFixedThreadPool(5);
CompletableFuture<String>future=CompletableFuture.supplyAsync(()->{
//longrunningtask
return"taskresult";
},executor);



2.2任务的异步处理

不论Future.get()方法还是CompletableFuture.get()方法都是阻塞的,为了获取任务的结果同时不阻塞当前线程的执行,我们可以使用CompletionStage提供的方法结合callback来实现任务的异步处理。

2.2.1使用callback基于特定任务的结果进行异步处理

程序中经常需要主线程创建新的线程来处理某一任务,然后基于任务的完成结果(返回值或者exception)来执行特定逻辑,对于这种场景我们可以很方面的使用whenComplete或者whenCompleteAsync来注册callback方法

CompletableFuture<String>future=CompletableFuture.supplyAsync(()->{
//longrunningtask
return"taskresult";
});

future.whenComplete((result,exception)->{
if(null==exception){
System.out.println("resultfromprevioustask:"+result);
}
});


对于任务执行中抛错的情况:


CompletableFuture<String>future=CompletableFuture.supplyAsync(()->{
//longrunningtask
thrownewRuntimeException("error!");
});

future.whenComplete((result,exception)->{
if(null==exception){
System.out.println("resultfromprevioustask:"+result);
}else{
System.err.println("Exceptionthrownfromprevioustask:"+exception.getMessage());
}
});


也可以用exceptionally来显示的处理错误:

CompletableFuture.supplyAsync(()->{
thrownewIllegalArgumentException("error");
}).exceptionally(ex->{
System.out.println("Exceptioncaught:"+ex.getMessage());
returnex.getMessage();
}).thenAccept(result->{
System.out.println("result:"+result);
});


如果不需关心任务执行中是否有exception,则可以使用thenAccept方法,需要注意的是如果执行中抛了exception,则thenAccept里面的回调方法不会被执行

CompletableFuture.supplyAsync(()->{
//longrunningtask
return"taskresult";
}).thenAccept((result)->{
System.out.println("resultfromprevioustask:"+result);
});



2.2.2任务的链式处理

在应用中经常会遇到任务的pipeline处理,任务A执行完后触发任务B,任务B执行完后触发任务C,上一个任务的结果是下一个任务的输入,对于这种场景,我们可以使用thenApply方法。

CompletableFuture.supplyAsync(()->{
//longrunningtask
return"task1";
}).thenApply(previousResult->{
returnpreviousResult+"task2";
}).thenApply(previousResult->{
returnpreviousResult+"task3";
}).thenAccept(previousResult->{
System.out.println(previousResult);
});
output:task1task2task3


让我们再看下面这个例子,某一应用需要先根据accountId从数据库找到对应的账号信息,然后对该账号执行特定的处理逻辑:

CompletableFuture<Account>getAccount(StringaccountId){
returnCompletableFuture.supplyAsync(()->{
returnaccountService.findAccount(accountId);
});
}

CompletableFuture<String>processAccount(Accountaccount){
returnCompletableFuture.supplyAsync(()->{
returnaccountService.updateAccount(account);
});
}


如果使用thenApply方法,其返回的结果是一个嵌套的CompletableFuture对象:

CompletableFuture<CompletableFuture<String>>res=getAccount("123").thenApply(account->{
returnprocessAccount(account);
});


如果不希望结果是嵌套的CompletableFuture,我们可以使用thenCompose方法来替代thenApply

CompletableFuture<String>res=getAccount("123").thenCompose(account->{
returnprocessAccount(account);
});


2.2.3多任务的并行处理

另一种常见的场景是将一个大的任务切分为数个子任务,并行处理所有子任务,当所有子任务都成功结束时再继续处理后面的逻辑。以前的做法是利用CountDownLatch,主线程构造countDownLatch对象,latch的大小为子任务的总数,每一个任务持有countDownLatch的引用,任务完成时对latch减1,主线程阻塞在countDownLatch.await方法上,当所有子任务都成功执行完后,latch=0,主线程继续执行。

intsize=5;
CountDownLatchlatch=newCountDownLatch(size);
for(inti=0;i<size;i++){
Executors.newFixedThreadPool(size).submit(()->{
try{
//longrunningtask
System.out.println(Thread.currentThread().getName()+""+latch.getCount());
}finally{
latch.countDown();
}
});
}
try{
latch.await();
}catch(InterruptedExceptione){
e.printStackTrace();
}

//continue...
System.out.println(Thread.currentThread().getName());


这样的代码繁琐且很容易出错,我们可以用CompletableFuture.allOf来方便的处理上述场景。直接贴例子,根据一组账户ID并行查找对应账户:

CompletableFuture<String>findAccount(StringaccountId){
returnCompletableFuture.supplyAsync(()->{
//mockfindingaccountfromdatabase
return"account"+accountId;
});
}

publicvoidbatchProcess(List<String>accountIdList){
//并行根据accountId查找对应account
List<CompletableFuture<String>>accountFindingFutureList=
accountIdList.stream().map(accountId->findAccount(accountId)).collect(Collectors.toList());

//使用allOf方法来表示所有的并行任务
CompletableFuture<Void>allFutures=
CompletableFuture
.allOf(accountFindingFutureList.toArray(newCompletableFuture[accountFindingFutureList.size()]));

//下面的方法可以帮助我们获得所有子任务的处理结果
CompletableFuture<List<String>>finalResults=allFutures.thenApply(v->{
returnaccountFindingFutureList.stream().map(accountFindingFuture->accountFindingFuture.join())
.collect(Collectors.toList());
});
}


如果后续逻辑没有必要等待所有子任务全部结束,而是只要任一一个任务成功结束就可以继续执行,我们可以使用CompletableFuture.anyOf方法:

CompletableFuture<Object>anyOfFutures=CompletableFuture.anyOf(taskFutureA,taskFutureB,taskFutureC);


假设三个任务中taskFutureA最先执行完毕并成功返回,则anyOfFutures里得到的是taskFutureA的执行结果.

3.展望

基于JDK1.8的lambda表达式和CompletableFuture,我们可以写出更具有函数式编程风格的代码,可以更方便的实现任务的异步处理,只用很少的代码便可以实现任务的异步pipeline和并行调用。在异步开发模型(nodeJs/Vert.x)越来越火的今天,我们就从今天开始使用lambda+CompletableFuture来改造我们的Java应用吧。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: