您的位置:首页 > 编程语言 > Java开发

Java Callable Future接口执行机制解密

2017-10-07 14:45 459 查看
在Java中,我们执行异步任务的代码可以这么写。ExecutorServiceexecutorService=Executors.newSingleThreadExecutor();Future<String>stringFuture=executorService.submit(()->{System.out.println("你好,世界");return"helloworld";});System.out.println(stringFuture.getClass().getName());System.out.println(stringFuture.get());在submit方法中传入一个callable接口对象。里面承载的就是异步执行的Java代码。 我们可以看到异步执行的任务很简单,打印一行话 你好,世界。然后将helloworld字符串返回给Future<T>接口对象接受执行的结果。当我们执行  stringFuture.get()方法的时候,如果异步的任务还没有执行完。那么future.get()方法所在的线程是会阻塞的。像我们平常自己起一个线程去执行,执行的结果一般是那不会来的。但是Callable接口和Future接口这两个接口却可以做到将线程执行完的计算结果给拿到。所以带着这样一个问题我们去查看下jdk的源码是怎样实现的。首先第一行代码ExecutorServiceexecutorService=Executors.newSingleThreadExecutor();
我们可以知道,创建了一个线程池。具体的代码是这样的。
publicstaticExecutorServicenewSingleThreadExecutor(){returnnewFinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>()));}
Executors类新建了一个ExecutorService接口的实现类。
该类的继承结构是这样的。
其中新建的ExecutorService接口的实现类主要使用的是DelegatedExecutorService这个类。该类中封装又封装了一个ExecutorService接口的实现类。
staticclassDelegatedExecutorServiceextendsAbstractExecutorService{privatefinalExecutorServicee;DelegatedExecutorService(ExecutorServiceexecutor){e=executor;}publicvoidexecute(Runnablecommand){e.execute(command);}publicvoidshutdown(){e.shutdown();}publicList<Runnable>shutdownNow(){returne.shutdownNow();}publicbooleanisShutdown(){returne.isShutdown();}publicbooleanisTerminated(){returne.isTerminated();}publicbooleanawaitTermination(longtimeout,TimeUnitunit)throwsInterruptedException{returne.awaitTermination(timeout,unit);}publicFuture<?>submit(Runnabletask){returne.submit(task);}public<T>Future<T>submit(Callable<T>task){returne.submit(task);}public<T>Future<T>submit(Runnabletask,Tresult){returne.submit(task,result);}public<T>List<Future<T>>invokeAll(Collection<?extendsCallable<T>>tasks)throwsInterruptedException{returne.invokeAll(tasks);}public<T>List<Future<T>>invokeAll(Collection<?extendsCallable<T>>tasks,longtimeout,TimeUnitunit)throwsInterruptedException{returne.invokeAll(tasks,timeout,unit);}public<T>TinvokeAny(Collection<?extendsCallable<T>>tasks)throwsInterruptedException,ExecutionException{returne.invokeAny(tasks);}public<T>TinvokeAny(Collection<?extendsCallable<T>>tasks,longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{returne.invokeAny(tasks,timeout,unit);}}
我们可以看到,该类的所有的方法实现全都是调用可内部封装的ExecutorService接口的实现类。
有读者可能会问了,这个被包装的接口的类是哪个类。这里应该是ThreadPoolExecutor这个类。
我们可以看下集成关系图
ThraedPoolExecutor类也是继承了AbstractExecutorService这个类。这就好分析了。
接下来我们分析下面的代码。
以后笔者直接将分析的内容以注解的形式写在代码旁边
Future<String>stringFuture=executorService.submit(()->{System.out.println("你好,世界");return"helloworld";});
public<T>Future<T>submit(Callable<T>task){if(task==null)thrownewNullPointerException();
//实际上这里是new了一个FutureTask类的对象。
//FutureTask类实现了Runnable,Future<T>和RunnableFuture<V>三个接口,说明之后返回的FutureTask类对象可以
//当做Future<T>接口的实现类来用RunnableFuture<T>ftask=newTaskFor(task);
//Callable<T>接口里的异步代码是在这行代码被调用的。
	//execute方法接收了刚才那个FutureTask类的对象做为参数去执行。我们到这个execute()方法里去看看execute(ftask);returnftask;}
我们可以看到该方法实现在ThreadPoolExecutor类里面。当然execute(Runnabler)这个方法是声明在Executor接口里面的。
好,我们到execute()方法的实现里面去分析分析
publicvoidexecute(Runnablecommand){if(command==null)thrownewNullPointerException();/**Proceedin3steps:**1.IffewerthancorePoolSizethreadsarerunning,tryto*startanewthreadwiththegivencommandasitsfirst*task.ThecalltoaddWorkeratomicallychecksrunStateand*workerCount,andsopreventsfalsealarmsthatwouldadd*threadswhenitshouldn't,byreturningfalse.**2.Ifataskcanbesuccessfullyqueued,thenwestillneed*todouble-checkwhetherweshouldhaveaddedathread*(becauseexistingonesdiedsincelastchecking)orthat*thepoolshutdownsinceentryintothismethod.Sowe*recheckstateandifnecessaryrollbacktheenqueuingif*stopped,orstartanewthreadiftherearenone.**3.Ifwecannotqueuetask,thenwetrytoaddanew*thread.Ifitfails,weknowweareshutdownorsaturated*andsorejectthetask.*/intc=ctl.get();if(workerCountOf(c)<corePoolSize){
	//我们点入到这行代码里面去,其他的太复杂了,笔者这里就先不分析了,因为与今天的主题不是太相关if(addWorker(command,true))return;c=ctl.get();}if(isRunning(c)&&workQueue.offer(command)){intrecheck=ctl.get();if(!isRunning(recheck)&&remove(command))reject(command);elseif(workerCountOf(recheck)==0)addWorker(null,false);}elseif(!addWorker(command,false))reject(command);}
//这个方法的形参就是刚才new出来的FutureTask类的对象。因为其也实现了Runnable接口,所以可以这样传递
privatebooleanaddWorker(RunnablefirstTask,booleancore){retry:for(;;){intc=ctl.get();intrs=runStateOf(c);//Checkifqueueemptyonlyifnecessary.if(rs>=SHUTDOWN&&!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))returnfalse;for(;;){intwc=workerCountOf(c);if(wc>=CAPACITY||wc>=(core?corePoolSize:maximumPoolSize))returnfalse;if(compareAndIncrementWorkerCount(c))breakretry;c=ctl.get();//Re-readctlif(runStateOf(c)!=rs)continueretry;//elseCASfailedduetoworkerCountchange;retryinnerloop}}booleanworkerStarted=false;booleanworkerAdded=false;Workerw=null;try{w=newWorker(firstTask);finalThreadt=w.thread;if(t!=null){finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{//Recheckwhileholdinglock.//BackoutonThreadFactoryfailureorif//shutdownbeforelockacquired.intrs=runStateOf(ctl.get());if(rs<SHUTDOWN||(rs==SHUTDOWN&&firstTask==null)){if(t.isAlive())//precheckthattisstartablethrownewIllegalThreadStateException();workers.add(w);ints=workers.size();if(s>largestPoolSize)largestPoolSize=s;workerAdded=true;}}finally{mainLock.unlock();}if(workerAdded){
//这行代码启动了Callable接口里的线程代码。
		//既然线程被启动了,里面我们就要去看看Runnable接口定义的run方法是如何实现的,我们进入到FutureTask类的Run方法实现类中去t.start();workerStarted=true;}}}finally{if(!workerStarted)addWorkerFailed(w);}returnworkerStarted;}
publicvoidrun(){if(state!=NEW||!UNSAFE.compareAndSwapObject(this,runnerOffset,null,Thread.currentThread()))return;try{Callable<V>c=callable;if(c!=null&&state==NEW){Vresult;booleanran;try{
//这行代码调用了Callable接口实现类的call方法。得到的返回值会返回给一个result变量
		//下面的代码将这个变量设置到FutureTask对象的成员变量outcome上。result=c.call();ran=true;}catch(Throwableex){result=null;ran=false;setException(ex);}if(ran)
//就是这行代码将Callable接口的call方法的执行结果设置到FutureTask类对象的outcome成员变量上
		//我们可以点进入看一下是如何实现的set(result);}}finally{//runnermustbenon-nulluntilstateissettledto//preventconcurrentcallstorun()runner=null;//statemustbere-readafternullingrunnertoprevent//leakedinterruptsints=state;if(s>=INTERRUPTING)handlePossibleCancellationInterrupt(s);}}
protectedvoidset(Vv){if(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){
	//将Callable接口的call方法的计算结果放置到outcome变量上。outcome=v;UNSAFE.putOrderedInt(this,stateOffset,NORMAL);//finalstatefinishCompletion();}}
到这里我们可以知道,执行完的结果是保存在FutureTask类对象的outcome成员变量上的。
我们再去看下Future<T>接口定义的get方法
/***@throwsCancellationException{@inheritDoc}*/publicVget()throwsInterruptedException,ExecutionException{ints=state;if(s<=COMPLETING)s=awaitDone(false,0L);
	//发现这里有个report方法,点进去看一下returnreport(s);}
@SuppressWarnings("unchecked")privateVreport(ints)throwsExecutionException{
//我们可以发现果然拿的是outcome成员变量的值Objectx=outcome;if(s==NORMAL)return(V)x;if(s>=CANCELLED)thrownewCancellationException();thrownewExecutionException((Throwable)x);}
到此文章的结束了

                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息