quartz2.2源码分析3-线程模型
2016-05-13 00:00
513 查看
摘要: quartz2有一套机制来对用户添加的任务进行调度,它是通过调度线程和执行线程来实现这套机制的
quartz用一个线程不断轮询查找下次待执行的任务,并把任务交给线程池执行,这里涉及两种角色:调度线程和执行线程池
1. 判断线程池是否有空闲线程
ThreadPool有两个内置实现SimpleThreadPool、ZeroSizeThreadPool, ZeroSizeThreadPool 是个空实现,主要看 SimpleThreadPool , SimpleThreadPool 里面分可用线程集合和正在使用线程集合,看它的blockForAvailableThreads方法:
2. 获取jobtore中下次触发的triggers
取到triggers之后会判断第一条是否到执行时,如果没有会一直循环等待,等待的过程中会根据信号判断是否有必要释放当前触发器重新调度,所以这里有两点要注意:
为什么triggers集合的第一个就是最早需要被执行的?因为这个跟JobStore中存放triggers的数据结构有关,它是用TreeSet存放的,后续JobStore具体分析
这个调度信号是怎么判断的? 信号处理具体代码如下:
3. 把Trigger和JobDetail封装一下,生成执行任务shell(shellFactory获取的)
JobStore中的triggersFired(triggers);方法会把trigger和其对应的JobStore封装到TriggerFiredBundle中,并把 TriggerFiredBundle 对象传给TriggerFiredResult,
后续的qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);会创建一个JobRunShell对象,shell.initialize(qs);方法里面就会创建我们自定义的Job对象了,并把数据传给JobExecutionContextImpl,这个就是Job的execute方法参数。
4. 把任务放到线程池中执行
这里就可以看出执行任务的是WorkerThread线程,看run方法:
把任务传给WorkerThread,它会调用任务的run方法,这里即是调用JobRunShell的run方法, JobRunShell 的run方法最终就是调自定义Job的execute方法。
这边可以扩展,把线程池作为woker放到不同进程上去做分布式 。
调度任务都是调用JobStore的方法,后续分析JobStore的实现。
quartz用一个线程不断轮询查找下次待执行的任务,并把任务交给线程池执行,这里涉及两种角色:调度线程和执行线程池
调度线程
看线程QuartzSchedulerThread的run()方法以while(true)的方式循环执行,以下为run()方法中的核心代码,主要分以下四步:1. 判断线程池是否有空闲线程
[code=plain]//获取可用线程数 qsRsrcs是QuartzSchedulerResources对象 int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
ThreadPool有两个内置实现SimpleThreadPool、ZeroSizeThreadPool, ZeroSizeThreadPool 是个空实现,主要看 SimpleThreadPool , SimpleThreadPool 里面分可用线程集合和正在使用线程集合,看它的blockForAvailableThreads方法:
[code=plain]//获取可用线程个数 public int blockForAvailableThreads() { synchronized (nextRunnableLock) { while ((availWorkers.size() < 1 || handoffPending) && !isShutdown) { try { nextRunnableLock.wait(500); } catch (InterruptedException ignore) { } } //返回可用线程集合的大小 return availWorkers.size(); } }
2. 获取jobtore中下次触发的triggers
[code=plain]if (availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads... List<OperableTrigger> triggers = null; long now = System.currentTimeMillis(); //清除调度改变的信号 clearSignaledSchedulingChange(); try { //到JobStore中获取下次被触发的触发器 triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); lastAcquireFailed = false; if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers"); } catch (JobPersistenceException jpe) { if (!lastAcquireFailed) { qs.notifySchedulerListenersError( "An error occurred while scanning for the next triggers to fire.", jpe); } lastAcquireFailed = true; continue; } catch (RuntimeException e) { if (!lastAcquireFailed) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " + e.getMessage(), e); } lastAcquireFailed = true; continue; } if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); //这里为什么triggers的第一个对象就是最早需要被执行的? long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; //如果第一条下次触发时间大于当前时间则进入等待 while (timeUntilTrigger > 2) { synchronized (sigLock) { if (halted.get()) { break; } if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; if (timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); } catch (InterruptedException ignore) { } } } //等待的过程中看看有没有收到调度信号 if (releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } // this happens if relea 3ff0 seIfScheduleChangedSignificantly // decided to release triggers //这个可能在前面等待的时候被清理掉了 if (triggers.isEmpty()) continue;
取到triggers之后会判断第一条是否到执行时,如果没有会一直循环等待,等待的过程中会根据信号判断是否有必要释放当前触发器重新调度,所以这里有两点要注意:
为什么triggers集合的第一个就是最早需要被执行的?因为这个跟JobStore中存放triggers的数据结构有关,它是用TreeSet存放的,后续JobStore具体分析
这个调度信号是怎么判断的? 信号处理具体代码如下:
[code=plain]private boolean releaseIfScheduleChangedSignificantly(List<OperableTrigger> triggers, long triggerTime) { if (isCandidateNewTimeEarlierWithinReason(triggerTime, true)) { // above call does a clearSignaledSchedulingChange() //释放触发器 for (OperableTrigger trigger : triggers) { qsRsrcs.getJobStore().releaseAcquiredTrigger(trigger); } triggers.clear(); return true; } return false; } private boolean isCandidateNewTimeEarlierWithinReason(long oldTime, boolean clearSignal) { synchronized (sigLock) { //看信号是否改变 if (!isScheduleChanged()) return false; boolean earlier = false; //如果信号发生改变了,则比较信号下次触发时间 if (getSignaledNextFireTime() == 0) earlier = true; else if (getSignaledNextFireTime() < oldTime) earlier = true; //下次触发时间可能比较早,但是如果本来执行时间快到了这个信号也不算 if (earlier) { // so the new time is considered earlier, but is it enough // earlier? long diff = oldTime - System.currentTimeMillis(); if (diff < (qsRsrcs.getJobStore().supportsPersistence() ? 70L : 7L)) earlier = false; } //把本次的信号清除掉 if (clearSignal) { clearSignaledSchedulingChange(); } return earlier; } }
3. 把Trigger和JobDetail封装一下,生成执行任务shell(shellFactory获取的)
[code=plain]//TriggerFiredResult-->TriggerFiredBundle-->(job, trigger, 一堆time) List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>(); boolean goAhead = true; synchronized (sigLock) { goAhead = !halted.get(); } if (goAhead) { try { // 通知jobStore开始执行了,根据trigger获取其对应的JobDetail, 封装成TriggerFiredResult List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); if (res != null) bndles = res; } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occurred while firing triggers '" + triggers + "'", se); // QTZ-179 : a problem occurred interacting with // the triggers from the db // we release them and loop again for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } continue; } } for (int i = 0; i < bndles.size(); i++) { TriggerFiredResult result = bndles.get(i); TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); if (exception instanceof RuntimeException) { getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception); qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } // it's possible to get 'null' if the triggers was // paused, // blocked, or other similar occurrences that // prevent it being // fired at this time... or if the scheduler was // shutdown (halted) if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } // 下面是开始执行任务 JobRunShell shell = null; try { //构造执行对象,JobRunShell实现了Runnable shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); //这个里面会用我们自定义的Job来new一个对象,并把相关执行Job是需要的数据传给JobExecutionContextImpl(这是我们自定义job的execute方法参数) shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; }
JobStore中的triggersFired(triggers);方法会把trigger和其对应的JobStore封装到TriggerFiredBundle中,并把 TriggerFiredBundle 对象传给TriggerFiredResult,
后续的qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);会创建一个JobRunShell对象,shell.initialize(qs);方法里面就会创建我们自定义的Job对象了,并把数据传给JobExecutionContextImpl,这个就是Job的execute方法参数。
4. 把任务放到线程池中执行
[code=plain]// 这里是把任务放入到线程池中 if (qsRsrcs.getThreadPool().runInThread(shell) == false) { // this case should never happen, as it is // indicative of the // scheduler being shutdown or a bug in the // thread pool or // a thread pool being used concurrently - which // the docs // say not to do... getLog().error("ThreadPool.runInThread() return false!"); //放到线程池失败后,通知jobStore完成 qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); }
执行线程池
接着上面第4步,把 JobRunShell 放到了线程池中,看SimpleThreadPool的runInThread(Runnable runnable)方法逻辑很简单,到可用线程集合中取一个线程执行任务并放到正在使用线程集合中,如果线程成被shutdown了则创建一个额外的线程,如下:[code=plain]private List<WorkerThread> workers; private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>(); private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>(); public boolean runInThread(Runnable runnable) { if (runnable == null) { return false; } synchronized (nextRunnableLock) { handoffPending = true; // Wait until a worker thread is available while ((availWorkers.size() < 1) && !isShutdown) { try { nextRunnableLock.wait(500); } catch (InterruptedException ignore) { } } if (!isShutdown) { //到可用线程集合里取出第一个 WorkerThread wt = (WorkerThread) availWorkers.removeFirst(); //放到该正在使用线程池中 busyWorkers.add(wt); //运行任务 wt.run(runnable); } else { // If the thread pool is going down, execute the Runnable // within a new additional worker thread (no thread from the // pool). //如果线程池关闭了则创建一个额外的线程 WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable); busyWorkers.add(wt); workers.add(wt); wt.start(); } nextRunnableLock.notifyAll(); handoffPending = false; } return true; }
这里就可以看出执行任务的是WorkerThread线程,看run方法:
[code=plain]//添加任务 public void run(Runnable newRunnable) { synchronized (lock) { if (runnable != null) { throw new IllegalStateException("Already running a Runnable!"); } runnable = newRunnable; lock.notifyAll(); } } /** * <p> * Loop, executing targets as they are received. * </p> */ @Override public void run() { boolean ran = false; while (run.get()) { try { synchronized (lock) { while (runnable == null && run.get()) { lock.wait(500); } if (runnable != null) { ran = true; //调用任务的run方法 runnable.run(); } } } ..... } }
把任务传给WorkerThread,它会调用任务的run方法,这里即是调用JobRunShell的run方法, JobRunShell 的run方法最终就是调自定义Job的execute方法。
这边可以扩展,把线程池作为woker放到不同进程上去做分布式 。
调度任务都是调用JobStore的方法,后续分析JobStore的实现。
相关文章推荐
- java技术路线
- Cocos2d-JS Hello world(新人贴,大神请指教或者绕路)
- 【整理贴】企业网站系统大全,拿好不谢
- 建企业网站之前必须要准备些什么?
- Mysql5.7支持Json数据类型
- ActionBar之Share的用法
- Windows环境下安装Cassandra1.0.2
- H5 页面适配所有 iPhone 和安卓机型的六个技巧
- centos7常用命令
- setTimeout() ie8以下 参数无效之解决方法
- Linux下Beego怎么部署在后台运行
- Beego如何在Linux下执行shell
- Beego下如何使用captcha生成验证码
- Android中文API:Android 如何保持屏幕长亮?
- sqlserver去重复数据
- Python生成图片验证码
- struts2中关于传值问题
- json+servlet
- json+struts2
- springMvc小案例