How Quartz Works: An Analysis

2015-08-12 13:04
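Introduction to Quartz

Quartz is a widely used job scheduling framework, typically used for timed tasks such as fetching API data at fixed intervals or generating a weekly data report.

Notes

Many projects use Quartz, but I had never looked into how it works. I recently read the source code, and here are my takeaways.

The version covered here is 2.2.1, the latest at the time of writing.

A simple demo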

QuartzTest.java

```java
package demo.quartz;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class QuartzTest {

    public static void main(String[] args) throws Exception {
        // The job to run, identified by a name and a group.
        JobDetail jobDetail = JobBuilder.newJob(SimpleJob.class)
                .withIdentity("myjob", "job-group")
                .build();
        // Fire at seconds 0, 10, 20, ... of every minute.
        CronTrigger cronTrigger = TriggerBuilder.newTrigger()
                .withIdentity("cronTrigger", "trigger-group")
                .withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?"))
                .build();
        SchedulerFactory sFactory = new StdSchedulerFactory();
        Scheduler scheduler = sFactory.getScheduler();
        scheduler.scheduleJob(jobDetail, cronTrigger);
        scheduler.start();
    }
}
```

SimpleJob.java

```java
package demo.quartz;

import java.util.Date;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class SimpleJob implements Job {

    @Override
    public void execute(JobExecutionContext context)
            throws JobExecutionException {
        // Print the time of each execution so the schedule is visible.
        System.out.println("execute job at " + new Date());
    }

}
```
Output:

    execute job at Wed Aug 12 12:45:30 CST 2015
    execute job at Wed Aug 12 12:45:40 CST 2015
    execute job at Wed Aug 12 12:45:50 CST 2015

The job runs once every 10 seconds.
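To make the schedule concrete, here is a minimal sketch of how the fire times in the output can be derived for this one cron expression. It is not Quartz code: the class `EveryTenSeconds` and the method `nextFireTime` are made-up names, and the logic only handles the "seconds 0, 10, 20, ... of every minute" case rather than general cron parsing.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of Quartz: models only the schedule "0/10 * * * * ?".
class EveryTenSeconds {

    /** Returns the first instant strictly after 'after' whose seconds field is a multiple of 10. */
    static LocalDateTime nextFireTime(LocalDateTime after) {
        // Drop the sub-second part, then step forward to the next multiple of ten seconds.
        LocalDateTime t = after.truncatedTo(ChronoUnit.SECONDS);
        int step = 10 - (t.getSecond() % 10); // always between 1 and 10
        return t.plusSeconds(step);
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2015, 8, 12, 12, 45, 25);
        // Print three consecutive fire times, mirroring the three lines of output above.
        for (int i = 0; i < 3; i++) {
            t = nextFireTime(t);
            System.out.println("next fire time: " + t);
        }
    }
}
```

Starting from 12:45:25, the three computed times fall on seconds 30, 40 and 50, which matches the demo output.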

So how does Quartz manage to fire a job at exactly the time we specify?

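Approach 1:
Poll continuously at a fixed interval (say 0.5 seconds). Each poll checks whether the specified fire time has been reached: if so, run the job; if not, keep polling.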

Pseudocode:

    while (true) {
        Date now = new Date();
        if (now == the specified time) { // run the job once the specified time is reached
            run the job...
        }
        Thread.sleep(500); // sleep 500 ms to release the CPU
    }
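A runnable version of this pseudocode might look like the sketch below. The class `PollingScheduler` and the method `runWhenDue` are illustrative names of my own, and the check uses `>=` instead of an exact match, because a loop that only looks at the clock every few hundred milliseconds would almost never hit the exact instant.

```java
// Hypothetical sketch of approach 1 (fixed-interval polling); not Quartz code.
class PollingScheduler {

    /**
     * Checks the clock every pollIntervalMs and runs the job once fireTimeMs has passed.
     * Returns the number of clock checks that were performed.
     */
    static int runWhenDue(long fireTimeMs, long pollIntervalMs, Runnable job) {
        int polls = 0;
        while (true) {
            polls++;
            // ">=" rather than "==": the exact millisecond is easy to miss between polls.
            if (System.currentTimeMillis() >= fireTimeMs) {
                job.run();
                return polls;
            }
            try {
                Thread.sleep(pollIntervalMs); // give up the CPU between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop polling if interrupted
                return polls;
            }
        }
    }

    public static void main(String[] args) {
        long fireTime = System.currentTimeMillis() + 300;
        int polls = runWhenDue(fireTime, 50, () -> System.out.println("job executed"));
        System.out.println("clock checks needed: " + polls);
    }
}
```

Even in this tiny example the thread wakes up several times just to find that nothing is due yet, which is the waste approach 2 avoids.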
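Approach 2:
Compute the job's next fire time, say 2015-08-12 12:00:00, while the current time is 2015-08-12 10:00:00. The job will therefore run 2 hours (7200000 ms) from now. There is no need to keep checking whether the fire time has arrived: just wait 7200000 ms and then run the job.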

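Approach 1 has to check constantly, which wastes resources, and testing whether the exact time has been reached is error-prone: the check can miss the instant or run the job more than once. Approach 2 is therefore the better choice, and it is the one Quartz uses.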
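Approach 2 can be sketched in a few lines of plain Java. This is only an illustration under my own assumptions: `SleepUntilDue`, `delayMillis` and `runAt` are made-up names, and the wait is done with `Object.wait(timeout)` on a private lock, re-checking the clock after every wake-up because a wait may return early.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical sketch of approach 2 (compute the delay, then wait once); not Quartz code.
class SleepUntilDue {

    /** Milliseconds from 'now' until 'nextFire', never negative. */
    static long delayMillis(LocalDateTime now, LocalDateTime nextFire) {
        return Math.max(0L, Duration.between(now, nextFire).toMillis());
    }

    /** Blocks until fireTimeMs (epoch milliseconds) and then runs the job. */
    static void runAt(long fireTimeMs, Runnable job) {
        Object lock = new Object();
        synchronized (lock) {
            long remaining = fireTimeMs - System.currentTimeMillis();
            while (remaining > 0) {
                try {
                    lock.wait(remaining); // one long wait instead of many short polls
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // give up without running the job
                }
                // A wait can end early, so recompute what is left before deciding.
                remaining = fireTimeMs - System.currentTimeMillis();
            }
        }
        job.run();
    }

    public static void main(String[] args) {
        long delay = delayMillis(
                LocalDateTime.of(2015, 8, 12, 10, 0, 0),
                LocalDateTime.of(2015, 8, 12, 12, 0, 0));
        System.out.println("delay in ms: " + delay);

        runAt(System.currentTimeMillis() + 300, () -> System.out.println("job executed"));
    }
}
```

For the 10:00:00 to 12:00:00 example, `delayMillis` gives the 2-hour delay (7200000 ms) discussed above.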

Analysis of the relevant Quartz source:
1. The run() method of QuartzSchedulerThread (org.quartz.core):
    @Override
    public void run() {
        boolean lastAcquireFailed = false;
        while (!halted.get()) { // keep looping until the scheduler thread is told to halt
            System.out.println(this.getId());
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (halted.get()) {
                        break;
                    }
                }

                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                    List<OperableTrigger> triggers = null;

                    long now = System.currentTimeMillis();

                    clearSignaledSchedulingChange();
                    try {
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        lastAcquireFailed = false;
                        if (log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if(!lastAcquireFailed) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        lastAcquireFailed = true;
                        continue;
                    } catch (RuntimeException e) {
                        if(!lastAcquireFailed) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        lastAcquireFailed = true;
                        continue;
                    }

                    if (triggers != null && !triggers.isEmpty()) {

                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        while(timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    try {
                                        // we could have blocked a long while
                                        // on 'synchronize', so we must recompute
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now; // the job's next fire time minus the current time
                                        if(timeUntilTrigger >= 1)
                                            sigLock.wait(timeUntilTrigger); // wait for (next fire time - current time), so the thread wakes up right at the job's fire time

                                    } catch (InterruptedException ignore) {
                                    }
                                }
                            }
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                        }

                        // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                        if(triggers.isEmpty())
                            continue;

                        // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occurred while firing triggers '"
                                                + triggers + "'", se);
                                //QTZ-179 : a problem occurred interacting with the triggers from the db
                                //we release them and loop again
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }

                        }

                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }

                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                        if(!halted.get()) {
                            // QTZ-336 A job might have been completed in the mean time and we might have
                            // missed the scheduled changed signal by not waiting for the notify() yet
                            // Check that before waiting for too long in case this very job needs to be
                            // scheduled very soon
                            if (!isScheduleChanged()) {
                                sigLock.wait(timeUntilContinue);
                            }
                        }
                    } catch (InterruptedException ignore) {
                    }
                }

            } catch(RuntimeException re) {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        } // while (!halted)

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }
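The key step in the listing is the `sigLock.wait(timeUntilTrigger)` call: the thread waits on a lock rather than sleeping, and the surrounding checks (`isCandidateNewTimeEarlierWithinReason`, `releaseIfScheduleChangedSignificantly`) suggest that a scheduling change can wake it before the timeout expires. The sketch below is a heavily simplified model of that idea, not the Quartz implementation. `MiniSchedulerLoop`, `reschedule` and `awaitAndFire` are made-up names, and the model tracks a single fire time instead of a batch of triggers.

```java
// Hypothetical model of "wait on a lock until the fire time, wake early if the schedule changes".
class MiniSchedulerLoop {

    private final Object sigLock = new Object();
    private long nextFireTimeMs; // guarded by sigLock

    MiniSchedulerLoop(long firstFireTimeMs) {
        this.nextFireTimeMs = firstFireTimeMs;
    }

    /** Changes the fire time and wakes the waiting thread so it can recompute its wait. */
    void reschedule(long newFireTimeMs) {
        synchronized (sigLock) {
            nextFireTimeMs = newFireTimeMs;
            sigLock.notifyAll();
        }
    }

    /** Blocks until the current fire time, runs the job, and returns when it fired (-1 if interrupted). */
    long awaitAndFire(Runnable job) {
        synchronized (sigLock) {
            long remaining = nextFireTimeMs - System.currentTimeMillis();
            while (remaining > 0) {
                try {
                    sigLock.wait(remaining); // ends on timeout or on reschedule()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1L;
                }
                // The fire time may have moved while we were waiting, so recompute.
                remaining = nextFireTimeMs - System.currentTimeMillis();
            }
        }
        long firedAt = System.currentTimeMillis();
        job.run();
        return firedAt;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        MiniSchedulerLoop loop = new MiniSchedulerLoop(start + 5000); // due in 5 s at first

        // Another thread pulls the fire time forward shortly after the wait begins.
        new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                return;
            }
            loop.reschedule(System.currentTimeMillis() + 100);
        }).start();

        long firedAt = loop.awaitAndFire(() -> System.out.println("job executed"));
        System.out.println("fired after ms: " + (firedAt - start));
    }
}
```

With a plain `Thread.sleep`, the waiting thread would not notice the earlier fire time until the original 5 seconds had passed. Waiting on a shared lock lets the rescheduling thread cut the wait short.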