您的位置:首页 > 编程语言 > Java开发

Java异步编程——深入源码分析FutureTask

2018-10-09 11:51 330 查看
版权声明:版权归Zack说码所有 https://blog.csdn.net/zaskkk/article/details/84076551

Java的异步编程是一项非常常用的多线程技术。

之前通过源码详细分析了ThreadPoolExecutor《你真的懂ThreadPoolExecutor线程池技术吗?看了源码你会有全新的认识》。通过创建一个ThreadPoolExecutor,往里面丢任务就可以实现多线程异步执行了。

但之前的任务主要倾向于线程池,并没有讲到异步编程方面的内容。本文将通过介绍Executor+Future框架(FutureTask是实现的核心),来深入了解下Java的异步编程。

万事从示例开始,我们先通过示例Demo有一个直观的印象,再深入去了解概念与原理。

使用示例

Demo:



使用上比较简单,

运行结果:

任务1异步执行:0
任务2异步执行:0
任务2异步执行:1
...
任务2异步执行:45
同步代码
任务2异步执行:24
...
任务1异步执行:199
任务1:执行完成
...
任务2异步执行:199
任务2:执行完成

假若你多次执行这个程序,会发现结果大大的不一样,因为两个任务和同步代码是异步由多条线程执行的,打印的结果当然是随机的。

回顾这个Demo做了什么,

  1. 构建了一个线程池
  2. 往线程池里面丢两个需要执行的任务
  3. 最后获取这两个任务的结果

其中第二点是异步执行两个任务,这两个任务和主线程分别是用了三个线程并发执行的,第三点是在主线程中同步等待两个任务的结果。

很容易看出来,异步编程的好处就在于可以让不相干的任务异步执行,不阻塞主线程。若是主线程需要异步执行的结果,此时再去等待结果会更加高效,提高程序的执行效率。

下面来看看整个流程的实现原理。

源码分析

一般在实际项目中,都会有配置有自己的线程池,建议大家在用异步编程时,配置一个专用的线程池,做好线程隔离,避免异步线程影响到其他模块的工作。Demo中为了方便,直接调用Exectors的方法生成一个临时的线程池,日常不建议使用。

我们从这个

ExecutorService.submit()
方法入手,看看整体实现。


ExecutorService.submit()
定义一个接口。这个接口接收一个Callable参数(执行的任务),返回一个Future(计算结果)。

Callable
,相当于一个需要执行的任务。它不接收任何参数,可以返回结果,可以抛出异常。相类似的还有
Runnable
,它也是不接收,不同点在于它不返回结果,也不抛异常,异常需要在任务内部处理。总结来说
Callable
更像一个方法的调用,
Runnable
则是一个不需要理会结果的调用。在JDK 8以后,它们都可以通过Lamda表达式写法去替代内部类的写法(详见Demo)。

Future
,一个异步计算的结果。调用
get()
方法可以得到对应的计算结果,如果调用时没有异步计算完,会阻塞等待计算的结果。同时它还提供方法可以尝试取消任务的执行。

看回

ExecutorService.submit()
的实现,代码在实现类
AbstractExecutorService
中。


除了它接口的实现,还提供了两种变形。原来接口只接收
Callable
参数,实现类中还新增了接收
Runnable
参数的。

如果看过之前写的《你真的懂ThreadPoolExecutor线程池技术吗?看了源码你会有全新的认识》,应该了解

ThreadPoolExecutor
执行任务是可以调用
execute()
方法的。而这里面
submit()
方法则是为
Callable/Runnable
加多一层
FutureTask
,从而
使执行结果有一个存放的地方,同时也添加一个可以取消的功能。原本的
execute()
只能执行任务,不会返回结果的,具体实现原理可以看看之前的文章分析。

FutureTask
RunnableFuture
的实现。而
RunnableFuture
是继承
Future
Runnable
接口的,定义
run()
接口。


因为
FutureTask
run()
接口,所以可以直接用一个
Callable/Runnable
创建一个
FutureTask
单独执行。但这样并没有异步的效果,因为没有启用新的线程去跑,而是在原来的线程阻塞执行的。

到这里我们清楚知道了,

submit()
方法重点是利用
Callable/Runnable
创建一个
FutureTask
,然后多线程执行
run()
方法,达到异步处理并且得到结果的效果。而
FutureTask
的重点则是
run()
方法如何持有保存计算的结果。

FutureTask.run()


首先判断
futureTask
对象的
state
状态,如果不是NEW的话,证明已经开始运行过了,则退出执行。同时
futureTask
对象通过CAS,把当前线程赋值给变量
runner
(是Thread类型,说明对象使用哪个线程执行的),如果CAS失败则退出。

外层

try{}
代码块中,对
callable
判空和
state
状态必须是NEW。内层
try{}
代码真正调用
callable
,开始执行任务。若执行成功,则把
ran
变量设为true,保存结果在
result
变量中,证明已跑成功过了;若抛异常了,则设为false,
result
为空,并且调用
setException()
保存异常。最后如果
ran
为true的话,则调用
set()
保存
result
结果。

看下

setException()
set()
的实现。


两者的基本流程一样,CAS置换状态,保存结果在
outcome
变量道中,但
setException()
保存的结果类型固定是
Throwable
。另外一个不同在于最终
state
状态,一个是EXCEPTION,一个是NORMAL。

这两个方法最后都调用了

finishCompletion()
。这个方法主要是配合线程池唤醒下一个任务。

FutureTask.get()

从上面

run()
方法得知,最后执行的结果放在了
outcome
变量中。那最终怎么从其中取出结果来,我们来看看
get()
方法。


从源码可知,
get()
方法分两步。第一步,先判断状态,如果计算为完成,则需要阻塞地等待完成。第二步,如果完成了,则调用
report()
方法获取结果并返回。

先看看

awaitDone()
阻塞等待完成。该方法可以选用超时功能。


在自旋的for()循环中,

  • 先判断是否线程被中断,中断的话抛异常退出。
  • 然后开始判断运行的
    state
    值,如果
    state
    大于
    COMPLETING
    ,证明计算已经是终态了,此时返回终态变量。
  • state
    等于
    COMPLETING
    ,证明已经开始计算,并且还在计算中。此时为了避免过多的CPU时间放在这个for循环的自旋上,程序执行
    Thread.yield()
    ,把线程从运行态降为就绪态,让出CPU时间。
  • 若以上状态都不是,则证明
    state
    NEW
    ,还没开始执行。那么程序在当前循环现在会新增一个
    WaitNode
    ,在下一个循环里面调用
    LockSupport.park()
    把当前线程阻塞。当
    run()
    方法结束的时候,会再次唤醒此线程,避免自旋消耗CPU时间。
  • 如果选用了超时功能,在阻塞和自旋过程中超时了,则会返回当前超时的状态。

第二步的

report()
方法比较简单。

  • 如果状态是
    NORMAL
    ,正常结束的话,则把
    outcome
    变量返回;
  • 如果是取消或者中断状态的,则抛出取消异常;
  • 如果是
    EXCEPTION
    ,则把
    outcome
    当作异常抛出(之前
    setException()
    保存的类型就是
    Throwable
    )。从而整个
    get()
    会有一个异常抛出。

总结

至此我们已经比较完整地了解Executor+Future的框架原理了,而FutureTask则是该框架的主要实现。下面总结下要点

  1. Executor.sumbit()
    方法异步执行一个任务,并且返回一个Future结果。
  2. submit()
    的原理是利用
    Callable
    创建一个
    FutureTask
    对象,然后执行对象的
    run()
    方法,把结果保存在
    outcome
    中。
  3. 调用
    get()
    获取
    outcome
    时,如果任务未完成,会阻塞线程,等待执行完毕。
  4. 异常和正常结果都放在
    outcome
    中,调用
    get()
    获取结果或抛出异常。

更多技术文章、精彩干货,请关注
博客:zackku.com
微信公众号:Zack说码

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: