Quartz原理分析
2015-08-12 13:04
239 查看
quartz简介
quartz是常用的作业调度框架,常用于定时任务等场景。如定时获取api数据,每周生成数据报表等等。
说明
很多项目中使用到quartz,但一直没有研究过quartz的原理,最近看了源码,分享一下心得体会。
本版为当前最新的2.2.1
简单的demo
QuartzTest.java
execute job at Wed Aug 12 12:45:30 CST 2015
execute job at Wed Aug 12 12:45:40 CST 2015
execute job at Wed Aug 12 12:45:50 CST 2015
//**********执行结果结束
每隔10秒钟执行一次job。
那么quartz是如何做到在我们指定的时间触发任务?
方案1:
一定时间间隔(比如0.5秒)不断的轮询,轮询的内容为是否到达指定的触发时间,如果是的话,就执行job,否的话继续轮询。
伪代码:
计算出job下次的执行时间,比如是在2015-08-12 12:00:00。而当前时间是在2015-08-12 10:00:00。也就是2小时(3600000ms)之后,会执行job。那么我们没必要,频繁地判断是否达到指定的执行时间,直接等待3600000ms,在执行job,就ok。
方案1,需要频繁地判断,过多的占用资源,而且是否到达指定时间,容易出现判断错误,重复执行等情况,所以方案二更好,quartz也是采用方案二。
quartz相关源码分析:
1.
quartz是常用的作业调度框架,常用于定时任务等场景。如定时获取api数据,每周生成数据报表等等。
说明
很多项目中使用到quartz,但一直没有研究过quartz的原理,最近看了源码,分享一下心得体会。
本版为当前最新的2.2.1
简单的demo
QuartzTest.java
`` package demo.quartz; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerFactory; import org.quartz.TriggerBuilder; import org.quartz.impl.StdSchedulerFactory; public class QuartzTest { public static void main(String[] args) throws Exception { JobDetail jobDetail=JobBuilder.newJob(SimpleJob.class).withIdentity("myjob", "job-group").build(); CronTrigger cronTrigger=TriggerBuilder.newTrigger().withIdentity("cronTrigger", "trigger-group").withSchedule(CronScheduleBuilder.cronSchedule("0/10* * * * ?")).build(); SchedulerFactory sFactory=new StdSchedulerFactory(); Scheduler scheduler=sFactory.getScheduler(); scheduler.scheduleJob(jobDetail, cronTrigger); scheduler.start(); } }SimpleJob.java
package demo.quartz; import java.util.Date; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; public class SimpleJob implements Job{ @Override public void execute(JobExecutionContext arg0) throws JobExecutionException { System.out.println("execute job at "+new Date()); } }//**********执行结果开始:
execute job at Wed Aug 12 12:45:30 CST 2015
execute job at Wed Aug 12 12:45:40 CST 2015
execute job at Wed Aug 12 12:45:50 CST 2015
//**********执行结果结束
每隔10秒钟执行一次job。
那么quartz是如何做到在我们指定的时间触发任务?
方案1:
一定时间间隔(比如0.5秒)不断的轮询,轮询的内容为是否到达指定的触发时间,如果是的话,就执行job,否的话继续轮询。
伪代码:
while(true){ Date now = new Date(); if(now==指定的时间){//如果到达指定时间则执行job 执行job... } Thread.sleep(500);//sleep500ms,释放cpu }方案2:
计算出job下次的执行时间,比如是在2015-08-12 12:00:00。而当前时间是在2015-08-12 10:00:00。也就是2小时(3600000ms)之后,会执行job。那么我们没必要,频繁地判断是否达到指定的执行时间,直接等待3600000ms,在执行job,就ok。
方案1,需要频繁地判断,过多的占用资源,而且是否到达指定时间,容易出现判断错误,重复执行等情况,所以方案二更好,quartz也是采用方案二。
quartz相关源码分析:
1.
@Override public void run() { boolean lastAcquireFailed = false; while (!halted.get()) {//如果任务没有被设置成停止状态,则一直轮询执行。 System.out.println(this.getId()); try { // check if we're supposed to pause... synchronized (sigLock) { while (paused && !halted.get()) { try { // wait until togglePause(false) is called... sigLock.wait(1000L); } catch (InterruptedException ignore) { } } if (halted.get()) { break; } } int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads... List<OperableTrigger> triggers = null; long now = System.currentTimeMillis(); clearSignaledSchedulingChange(); try { triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); lastAcquireFailed = false; if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers"); } catch (JobPersistenceException jpe) { if(!lastAcquireFailed) { qs.notifySchedulerListenersError( "An error occurred while scanning for the next triggers to fire.", jpe); } lastAcquireFailed = true; continue; } catch (RuntimeException e) { if(!lastAcquireFailed) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " +e.getMessage(), e); } lastAcquireFailed = true; continue; } if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; while(timeUntilTrigger > 2) { synchronized (sigLock) { if (halted.get()) { break; } if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // we could have blocked a long while // on 'synchronize', so we must recompute now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now;//job下次触发的时间-当前时间 if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger);//等待(<span style="font-family: Arial, Helvetica, sans-serif;">job下次触发的时间-当前时间</span>),则线程苏醒时刚好到达了,job指定的触发时间 } catch (InterruptedException ignore) { } } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } // this happens if releaseIfScheduleChangedSignificantly decided to release triggers if(triggers.isEmpty()) continue; // set triggers to 'executing' List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>(); boolean goAhead = true; synchronized(sigLock) { goAhead = !halted.get(); } if(goAhead) { try { List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occurred while firing triggers '" + triggers + "'", se); //QTZ-179 : a problem occurred interacting with the triggers from the db //we release them and loop again for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } continue; } } for (int i = 0; i < bndles.size(); i++) { TriggerFiredResult result = bndles.get(i); TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); if (exception instanceof RuntimeException) { getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception); qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } // it's possible to get 'null' if the triggers was paused, // blocked, or other similar occurrences that prevent it being // fired at this time... or if the scheduler was shutdown (halted) if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; } if (qsRsrcs.getThreadPool().runInThread(shell) == false) { // this case should never happen, as it is indicative of the // scheduler being shutdown or a bug in the thread pool or // a thread pool being used concurrently - which the docs // say not to do... getLog().error("ThreadPool.runInThread() return false!"); qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); } } continue; // while (!halted) } } else { // if(availThreadCount > 0) // should never happen, if threadPool.blockForAvailableThreads() follows contract continue; // while (!halted) } long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; synchronized(sigLock) { try { if(!halted.get()) { // QTZ-336 A job might have been completed in the mean time and we might have // missed the scheduled changed signal by not waiting for the notify() yet // Check that before waiting for too long in case this very job needs to be // scheduled very soon if (!isScheduleChanged()) { sigLock.wait(timeUntilContinue); } } } catch (InterruptedException ignore) { } } } catch(RuntimeException re) { getLog().error("Runtime error occurred in main trigger firing loop.", re); } } // while (!halted) // drop references to scheduler stuff to aid garbage collection... qs = null; qsRsrcs = null; }
相关文章推荐
- Windows 回调监控 <一>
- 通过若干个 Demo 进行讲解 Auto Layout
- Xcode配置SVN详细步骤
- 绘制一个简单的实现接口盘
- this compilation unit is not on the build of a java project
- va_list、va_start、va_end 宏用法小结
- jQuery(4)JQuery框架操作元素的属性与样式
- memset函数详解
- HDU5366 The mook jong (DP)
- ORACLE学习之scott用户的表结构sql
- IOS应用开发版本控制工具之Versions使用,iosversions
- leetcode 103: Binary Tree Zigzag Level Order Traversal
- TCL: LIST命令-lsearch, lsort, lrange
- Struts2 OGNL中value stack contents 和 stack contex解释简化版
- LNMP--Nginx代理详解
- STM32 GPIO 配置之ODR, BSRR, BRR 详解
- 使用localResizeIMG3+WebAPI实现手机端图片上传
- 生产者消费者模型实现<二>真实实现BlockingQueue
- Dynamic Adaptive Streaming ove HTTP(DASH) Design Principles and Standards
- HDU 4917 Permutation