您的位置:首页 > 编程语言 > Java开发

Spring中ThreadPoolTaskExecutor的线程调度及问题

2016-12-23 22:47 519 查看
问题现象

原因分析

任务调度逻辑

汇总分析

解决方案

问题现象

在我们的系统中,使用了这样的配置来开启异步操作:spring配置
<
task:annotation-driven
executor
=
"executor"

scheduler
=
"scheduler"
/>

<
task:executor
id
=
"executor"
pool-size
=
"16-128"

keep-alive
=
"60"
rejection-policy
=
"CALLER_RUNS"
queue-capacity
=
"1000"
/>

客户端开启异步代码
@Async
()

public
Future<Result4Calculate>calculateByLendId(int
did){

//标记1

//调用REST服务;监控调用时间。

}

获取Future后的处理
try
{

keplerOverdue=summay4Overdue.get(
5
,TimeUnit.SECONDS);

//后续处理

}
catch
(Exceptione){

//标记2

//异常报警

}

然而在这种配置下,客户端在标记1处监控到的调用时间普遍在4s以内(平均时间不到1s,个别峰值请求会突破5s,全天超过5s的请求不到10个)。然而,在标记2处捕获到的超时异常却非常多(一天接近700+)。问题出在哪儿?

原因分析

上述问题相关代码的调用时序如下图所示。https://www.processon.com/view/link/585d381ee4b02e6c0ac86d66


其中,restclient与restserver间的交互时间可以明确监控到,用时超过5s的非常少。但是,get方法却经常抛出超时异常。经过初步分析,问题出现在ThreadPoolTaskExecutor的任务调度过程中。

任务调度逻辑

使用<task:executor>注解得到的bean是ThreadPoolTaskExecutor的实例。这个类本身并不做调度,而是将调度工作委托给了ThreadPoolExecutor。后者的任务调度代码如下:ThreadPoolExecutor任务调度代码
/**

*Executesthegiventasksometimeinthefuture.Thetask

*mayexecuteinanewthreadorinanexistingpooledthread.

*

*Ifthetaskcannotbesubmittedforexecution,eitherbecausethis

*executorhasbeenshutdownorbecauseitscapacityhasbeenreached,

*thetaskishandledbythecurrent{@codeRejectedExecutionHandler}.

*

*@paramcommandthetasktoexecute

*@throwsRejectedExecutionExceptionatdiscretionof

*{@codeRejectedExecutionHandler},ifthetask

*cannotbeacceptedforexecution

*@throwsNullPointerExceptionif{@codecommand}isnull

*/

public
void
execute(Runnablecommand){

if
(command==
null
)

throw
new
NullPointerException();

/*

*Proceedin3steps:

*

*1.IffewerthancorePoolSizethreadsarerunning,tryto

*startanewthreadwiththegivencommandasitsfirst

*task.ThecalltoaddWorkeratomicallychecksrunStateand

*workerCount,andsopreventsfalsealarmsthatwouldadd

*threadswhenitshouldn't,byreturningfalse.

*

*2.Ifataskcanbesuccessfullyqueued,thenwestillneed

*todouble-checkwhetherweshouldhaveaddedathread

*(becauseexistingonesdiedsincelastchecking)orthat

*thepoolshutdownsinceentryintothismethod.Sowe

*recheckstateandifnecessaryrollbacktheenqueuingif

*stopped,orstartanewthreadiftherearenone.

*

*3.Ifwecannotqueuetask,thenwetrytoaddanew

*thread.Ifitfails,weknowweareshutdownorsaturated

*andsorejectthetask.

*/

int
c=ctl.get();

if
(workerCountOf(c)<corePoolSize){

if
(addWorker(command,
true
))

return
;

c=ctl.get();

}

if
(isRunning(c)&&workQueue.offer(command)){

int
recheck=ctl.get();

if
(!isRunning(recheck)&&remove(command))

reject(command);

else
if
(workerCountOf(recheck)==
0
)

addWorker(
null
,
false
);

}

else
if
(!addWorker(command,
false
))

reject(command);

}

通过其中的注释,我们可以知道它的核心调度逻辑如下(省略了一些检查等方法):如果正在运行的线程数量小于corePoolSize(最小线程数),则尝试启动一个新线程,并将当入参command作为该线程的第一个task。否则进入步骤二。

如果没有按步骤1执行,那么尝试把入参command放入workQueue中。如果能成功入队,做后续处理;否则,进入步骤三。

如果没有按步骤2执行,那么将尝试创建一个新线程,然后做后续处理。

简单的说,当向ThreadPoolExecutor提交一个任务时,它会优先交给线程池中的现有线程;如果暂时没有可用的线程,那么它会将任务放到队列中;一般只有在队列满了的时候(导致无法成功入队),才会创建新线程来处理队列中的任务。顺带一说,任务入队后,在某些条件下也会创建新线程。但新线程不会立即执行当前任务,而是从队列中获取一个任务并开始执行。

汇总分析

综上所述,我们可以确定以下信息:根据系统配置,ThreadPoolExecutor中的corePoolSize=16。

当并发数超过16时,ThreadPoolExecutor会按照步骤二进行任务调度,即把任务放入队列中,但没有及时创建新线程来执行这个任务

这一点是推测。但同时,通过日志中的线程名称确认的线程池内线程数量没有增长。日志中,异步线程的id从executor-1、executor-2一直到executor-16,但17及以上的都没有出现过。

队列中的任务出现积压、时间累积,导致某一个任务超时后,后续大量任务都超时。但是超时并没有阻止任务执行;任务仍然会继续通过restclient调用restserver,并被监控代码记录下时间。
任务在队列中积压、累积,是引发一天数百次异常、报警的原因。而监控代码并未监控到任务调度的时间,因此都没有出现超时。


解决方案

初步考虑方案有三:提高初始线程数。
提高步并发的初始线程数(如将16-128调整为32-128)。以此减少新任务进入队列的几率。
但是这个方案只是降低队列积压的风险,并不解决问题。

关闭队列。
将队列大小调整为0,以此保证每一个新任务都有一个新线程来执行。
这个方案的问题在于,并发压力大时,可能导致线程不够用。此时的异步调用会根据rejection-policy="CALLER_RUNS"的配置而变为同步调用。

更换线程池。
使用优先创建新线程(而非优先入队列)的线程池。
改动最大的方案。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Pool Thread Task