您的位置:首页 > 其它

quartz2.2源码分析3-线程模型

2016-05-13 00:00 513 查看
摘要: quartz2有一套机制来对用户添加的任务进行调度,它是通过调度线程和执行线程来实现这套机制的

quartz用一个线程不断轮询查找下次待执行的任务,并把任务交给线程池执行,这里涉及两种角色:调度线程和执行线程池

调度线程

看线程QuartzSchedulerThread的run()方法以while(true)的方式循环执行,以下为run()方法中的核心代码,主要分以下四步:

1. 判断线程池是否有空闲线程

[code=plain]//获取可用线程数 qsRsrcs是QuartzSchedulerResources对象
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();

ThreadPool有两个内置实现SimpleThreadPool、ZeroSizeThreadPool, ZeroSizeThreadPool 是个空实现,主要看 SimpleThreadPool , SimpleThreadPool 里面分可用线程集合和正在使用线程集合,看它的blockForAvailableThreads方法:

[code=plain]//获取可用线程个数
public int blockForAvailableThreads() {
synchronized (nextRunnableLock) {
while ((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
//返回可用线程集合的大小
return availWorkers.size();
}
}

2. 获取jobtore中下次触发的triggers

[code=plain]if (availThreadCount > 0) { // will always be true, due to  semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();

//清除调度改变的信号
clearSignaledSchedulingChange();
try {
//到JobStore中获取下次被触发的触发器
triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime,
Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if (!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.", jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if (!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException " + e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}

if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
//这里为什么triggers的第一个对象就是最早需要被执行的?
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
//如果第一条下次触发时间大于当前时间则进入等待
while (timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if (timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
//等待的过程中看看有没有收到调度信号
if (releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}

// this happens if relea
3ff0
seIfScheduleChangedSignificantly
// decided to release triggers
//这个可能在前面等待的时候被清理掉了
if (triggers.isEmpty())
continue;

取到triggers之后会判断第一条是否到执行时,如果没有会一直循环等待,等待的过程中会根据信号判断是否有必要释放当前触发器重新调度,所以这里有两点要注意:

为什么triggers集合的第一个就是最早需要被执行的?因为这个跟JobStore中存放triggers的数据结构有关,它是用TreeSet存放的,后续JobStore具体分析

这个调度信号是怎么判断的? 信号处理具体代码如下:

[code=plain]private boolean releaseIfScheduleChangedSignificantly(List<OperableTrigger> triggers, long triggerTime) {
if (isCandidateNewTimeEarlierWithinReason(triggerTime, true)) {
// above call does a clearSignaledSchedulingChange()
//释放触发器
for (OperableTrigger trigger : triggers) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(trigger);
}
triggers.clear();
return true;
}
return false;
}
private boolean isCandidateNewTimeEarlierWithinReason(long oldTime, boolean clearSignal) {
synchronized (sigLock) {

//看信号是否改变
if (!isScheduleChanged())
return false;

boolean earlier = false;

//如果信号发生改变了,则比较信号下次触发时间
if (getSignaledNextFireTime() == 0)
earlier = true;
else if (getSignaledNextFireTime() < oldTime)
earlier = true;

//下次触发时间可能比较早,但是如果本来执行时间快到了这个信号也不算
if (earlier) {
// so the new time is considered earlier, but is it enough
// earlier?
long diff = oldTime - System.currentTimeMillis();
if (diff < (qsRsrcs.getJobStore().supportsPersistence() ? 70L : 7L))
earlier = false;
}

//把本次的信号清除掉
if (clearSignal) {
clearSignaledSchedulingChange();
}

return earlier;
}
}

3. 把Trigger和JobDetail封装一下,生成执行任务shell(shellFactory获取的)

[code=plain]//TriggerFiredResult-->TriggerFiredBundle-->(job, trigger, 一堆time)
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

boolean goAhead = true;
synchronized (sigLock) {
goAhead = !halted.get();
}
if (goAhead) {
try {
// 通知jobStore开始执行了,根据trigger获取其对应的JobDetail, 封装成TriggerFiredResult
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if (res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '" + triggers + "'", se);
// QTZ-179 : a problem occurred interacting with
// the triggers from the db
// we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}

}

for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();

if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}

// it's possible to get 'null' if the triggers was
// paused,
// blocked, or other similar occurrences that
// prevent it being
// fired at this time... or if the scheduler was
// shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}

// 下面是开始执行任务
JobRunShell shell = null;
try {
//构造执行对象,JobRunShell实现了Runnable
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
//这个里面会用我们自定义的Job来new一个对象,并把相关执行Job是需要的数据传给JobExecutionContextImpl(这是我们自定义job的execute方法参数)
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),
CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}

JobStore中的triggersFired(triggers);方法会把trigger和其对应的JobStore封装到TriggerFiredBundle中,并把 TriggerFiredBundle 对象传给TriggerFiredResult,

后续的qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);会创建一个JobRunShell对象,shell.initialize(qs);方法里面就会创建我们自定义的Job对象了,并把数据传给JobExecutionContextImpl,这个就是Job的execute方法参数。

4. 把任务放到线程池中执行

[code=plain]// 这里是把任务放入到线程池中
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is
// indicative of the
// scheduler being shutdown or a bug in the
// thread pool or
// a thread pool being used concurrently - which
// the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
//放到线程池失败后,通知jobStore完成
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),
CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}


执行线程池

接着上面第4步,把 JobRunShell 放到了线程池中,看SimpleThreadPool的runInThread(Runnable runnable)方法逻辑很简单,到可用线程集合中取一个线程执行任务并放到正在使用线程集合中,如果线程成被shutdown了则创建一个额外的线程,如下:

[code=plain]private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (nextRunnableLock) {
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
//到可用线程集合里取出第一个
WorkerThread wt = (WorkerThread) availWorkers.removeFirst();
//放到该正在使用线程池中
busyWorkers.add(wt);
//运行任务
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the
// pool).
//如果线程池关闭了则创建一个额外的线程
WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio,
isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}

return true;
}

这里就可以看出执行任务的是WorkerThread线程,看run方法:

[code=plain]//添加任务
public void run(Runnable newRunnable) {
synchronized (lock) {
if (runnable != null) {
throw new IllegalStateException("Already running a Runnable!");
}

runnable = newRunnable;
lock.notifyAll();
}
}
/**
* <p>
* Loop, executing targets as they are received.
* </p>
*/
@Override
public void run() {
boolean ran = false;
while (run.get()) {
try {
synchronized (lock) {
while (runnable == null && run.get()) {
lock.wait(500);
}

if (runnable != null) {
ran = true;
//调用任务的run方法
runnable.run();
}
}
}
.....
}
}

把任务传给WorkerThread,它会调用任务的run方法,这里即是调用JobRunShell的run方法, JobRunShell 的run方法最终就是调自定义Job的execute方法。

这边可以扩展,把线程池作为woker放到不同进程上去做分布式 。

调度任务都是调用JobStore的方法,后续分析JobStore的实现。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: