xxl-job源码解读之任务调度器(核心)
一.任务调度器介绍
在xxl-job中,有两个角色,一个是执行器,另一个就是调度中心了。而任务调度器可以说是调度中心的最核心,我们发往执行器的任务,都是从任务调度器发出来的(除手动执行的)。
二.原理解读
首先我们要带着这几个问题来解读原理:
1.分布式任务调度有个很重要的点,就是怎样保证一次任务调度被消费一次,也就是怎样保证幂等性?
2.是怎样实现调度的,调度流程是什么样的?
3.出现没来的急调度的是怎么办的?
接下来我们来看下源码,从源码中寻找答案:
2.1 启动
[code]@Component @DependsOn("xxlJobAdminConfig") public class XxlJobScheduler implements InitializingBean, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class); @Override public void afterPropertiesSet() throws Exception { // init i18n initI18n(); // admin registry monitor run JobRegistryMonitorHelper.getInstance().start(); // 失败监控线程 // admin monitor run JobFailMonitorHelper.getInstance().start(); // admin-server initRpcProvider(); ///核心 , 任务调度器 的启动 // start-schedule JobScheduleHelper.getInstance().start(); logger.info(">>>>>>>>> init xxl-job admin success."); } }
在conf个包中,有一个交由 spring 管理的 XxlJobScheduler类,实现 InitializingBean 接口的 afterPropertiesSet 方法。这个方法会在spring为其对象注入完参数执行。这个方法的最后一行 JobScheduleHelper.getInstance().start(); 就是任务调度器的启动。
2.2 start 方法
由于这个方法代码有点多,我们分部分讲解。
[code]// 刷 数据库的线程 // schedule thread 调度线程 scheduleThread = new Thread(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>> init xxl-job admin scheduler success."); Connection conn = null; // <1> while (!scheduleThreadToStop) { // Scan Job 扫描任务 long start = System.currentTimeMillis(); PreparedStatement preparedStatement = null; boolean preReadSuc = true; try { if (conn==null || conn.isClosed()) { // 数据库连接处理 conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); } // 关闭事务的自动提交 conn.setAutoCommit(false); // <2> 数据库的悲观锁 , 保证数据的幂等性 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute(); // tx start // <3>、预读5s内调度任务 long nowTime = System.currentTimeMillis(); List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS); if (scheduleList!=null && scheduleList.size()>0) { // 2、推送时间轮 for (XxlJobInfo jobInfo: scheduleList) { // <4>时间轮刻度计算 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 调度时间超了5s 就放弃了 ,然后计算好下次的调度时间 // 过期超5s:本地忽略,当前时间开始计算下次触发时间 //<5> fresh next Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date()); if (nextValidTime != null) { jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); jobInfo.setTriggerNextTime(nextValidTime.getTime()); } else { jobInfo.setTriggerStatus(0); jobInfo.setTriggerLastTime(0); jobInfo.setTriggerNextTime(0); } } else if (nowTime > jobInfo.getTriggerNextTime()) { //<6> 过期5s内 :立即触发一次,当前时间开始计算下次触发时间; CronExpression cronExpression = new CronExpression(jobInfo.getJobCron()); long nextTime = cronExpression.getNextValidTimeAfter(new Date()).getTime(); // 1、trigger 立即触发一次 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null); logger.debug(">>>>>>>>>>> xxl-job, shecule push trigger : jobId = " + jobInfo.getId() ); // 2、fresh next jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); jobInfo.setTriggerNextTime(nextTime); // <7>下次5s内:预读一次; if (jobInfo.getTriggerNextTime() - nowTime < PRE_READ_MS) { // 距离下次调度 还有不到5s 就搞到 ring 上面 // 1、make ring second int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring pushTimeRing(ringSecond, jobInfo.getId());// 放到环中 // 3、fresh next 计算出下下次的调度时间 Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime())); if (nextValidTime != null) { jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); jobInfo.setTriggerNextTime(nextValidTime.getTime()); } else { jobInfo.setTriggerStatus(0); jobInfo.setTriggerLastTime(0); jobInfo.setTriggerNextTime(0); } } } else { //<8> 未过期:正常触发,递增计算下次触发时间 // 1、make ring second int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring 没有过期 就要放到时间刻度环上 pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next 计算出下一次 ,然后进行设置 Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime())); if (nextValidTime != null) { jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); jobInfo.setTriggerNextTime(nextValidTime.getTime()); } else { jobInfo.setTriggerStatus(0); jobInfo.setTriggerLastTime(0); jobInfo.setTriggerNextTime(0); } } } // 3、<9>更新trigger信息 将下次调度时间放到数据库中 for (XxlJobInfo jobInfo: scheduleList) { XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); } } else { preReadSuc = false; } // tx stop } catch (Exception e) { if (!scheduleThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); } } finally { // commit 提交事务 try { conn.commit(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } // close PreparedStatement if (null != preparedStatement) { try { preparedStatement.close(); } catch (SQLException ignore) { if (!scheduleThreadToStop) { logger.error(ignore.getMessage(), ignore); } } } } long cost = System.currentTimeMillis()-start; // Wait seconds, align second if (cost < 1000) { // scan-overtime, not wait try { // pre-read period: success > scan each second; fail > skip this period; 比较闲的话就要多等点时间了 TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } } if (conn != null) { try { conn.close(); } catch (SQLException e) { } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop"); } }); scheduleThread.setDaemon(true); scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread"); scheduleThread.start();
解释下为<>包裹的序号位置处的含义:
<1>:循环执行,然后后面那几行就是检查数据库连接,没有就创建
<2>:获取数据库悲观锁来保证数据消费的幂等性,该锁能够保证当前事务提交之前,其他事务不能操作数据,也就是能够实现分布式锁的作用。
<3>:从数据库中读取最近5s内要执行的调度列表。然后判空,遍历该调度列表
<4>:如果 调度时间过去5s之后(过期5s),放弃该次调度,并计算下次要调度的时间,设置到调度信息中。
<5>:这个其实就是解释cron 表达式,获取下次执行的时间戳。
<6>:如果调度时间过去5s之内(过期5s之内的),立即执行一次,然后计算下次时间,设置到调度信息中
<7>判读下次时间调度如果在5s之内,就计算在刻度轮上位置(ring second),然后将ring second 与 任务id 设置到 ringData上面(其实就是一个map,key: ring second value: 任务id 集合),计算下下次的执行时间,设置到调度信息中
<8>:如果没有过期(也就是还差点时间才能执行),这时候也是计算刻度轮上的位置(ring second),设置到ringData里面,计算下次要执行的时间戳,设置到任务信息中。
<9>:这个就是将任务信息更新到数据库中
再看下另一个线程:
[code] // ring thread ringThread = new Thread(new Runnable() { @Override public void run() { // align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } //<1> while (!ringThreadToStop) { try { // second data List<Integer> ringItemData = new ArrayList<>(); //<2> int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; //<3> for (int i = 0; i < 2; i++) { List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) { ringItemData.addAll(tmpData); } } // ring trigger logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) ); if (ringItemData!=null && ringItemData.size()>0) { //<4> do trigger // 进行调度 for (int jobId: ringItemData) { // do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null); } // clear ringItemData.clear(); } } catch (Exception e) { if (!ringThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); } } // next second, align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop"); } }); ringThread.setDaemon(true); ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread"); ringThread.start();
我们再来看下这个线程是干什么用的,这个线程主要是用来扫描ringData 这个刻度轮里面数据的
解释下<> 包裹序号位置:
<1>: 也是一个循环,ringThreadToStop 是volatile修饰的,具有线程可见性。
<2>:获取当前秒 数。主要是找到ringData与之对应的秒数上面的数据
<3>:主要是获取当前秒 跟上一秒数 在ringData 上面的数据(也就是任务)。
[code]List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
(nowSecond +60-i)%60 这个就是算出当前秒位置的,可以算下 ,比如说当前秒是5 (也就是nowSecond=5)这次的是0(也就是i=0)可以算出 结果是5。在ringData 上的位置就是5 (也就是key是5)。下次的i=1,计算出的结果 4(这里是为了避免上次处理耗时太长而错过下一个刻度的调度,所以就往前处理一个刻度的,比如说上次执行的是7s的,然后由于某种原因,导致到这次的时候已经是9s 了,所以防止出现这种情况,就往前校验了一个时间刻度)
<4>:将获取的需要执行的任务集合遍历调度。
2.3 toStop 方法
[code] // 停止 public void toStop(){ // 1、stop schedule scheduleThreadToStop = true; // 这个是直接设置停止标识的 try { TimeUnit.SECONDS.sleep(1); // wait } catch (InterruptedException e) { logger.error(e.getMessage(), e); } if (scheduleThread.getState() != Thread.State.TERMINATED){ // interrupt and wait scheduleThread.interrupt(); try { scheduleThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } // if has ring data 判断时间刻度环上是否还有数据 ,如果有的话 设置标识 boolean hasRingData = false; if (!ringData.isEmpty()) { for (int second : ringData.keySet()) { List<Integer> tmpData = ringData.get(second); if (tmpData!=null && tmpData.size()>0) { hasRingData = true; break; } } } if (hasRingData) { // 等8 s 再搞 try { TimeUnit.SECONDS.sleep(8); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } // stop ring (wait job-in-memory stop) ringThreadToStop = true; // 设置停止标识 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } if (ringThread.getState() != Thread.State.TERMINATED){ // interrupt and wait ringThread.interrupt(); try { ringThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop"); }
我们可以从上面代码看出:首先是要停止扫描数据库的这个线程,还使用join等待该线程执行完成。然后判断ringData 中是否还有任务没有调度完成,如果有就等待8s,之后设置停止标识。等待该线程停止。
总结:
到现在我们的任务调度核心就解析完成了,我们开始之前提出的问题现在也是有了答案。
1. xxl-job 使用数据库悲观锁的形式,来保证任务调度的幂等性
2.有两个线程来配合实现调度,一个线程扫描数据库,找到5s内要调度的任务,然后遍历调度,对于过期5s的怎么办,对于过期5秒内的怎么办,对于还没到调度时间的又怎么办。另一个线程扫描ringData刻度环上的数据(这个刻度环就跟咱们的时钟一样,有60个刻度,每个刻度上有需要调度的数据),来调度。
- xxl-job 源码解读 (二)
- 分布式任务调度系统xxl-job源码探究(二、服务中心)
- xxl-job任务操作源码分析(四)
- 分布式任务调度系统xxl-job源码探究(一、客户端)
- OkHttp源码解读总结(五)--->OkHttp核心调度器Dispatcher类源码总结
- azkaban源码解读(二):观察者模式在job执行过程中事件监听应用
- 分布式任务调度平台XXL-JOB
- spark源码之Job执行(2)任务调度taskscheduler
- Spark源码解读-JOB的提交与执行
- 拷贝map任务输出源码解读
- 分布式任务调度平台XXL-Job集群版搭建
- 努力的小胖学习记录(四)---xxl-job核心详解
- Tomcat源码解读系列(二)——Tomcat的核心组成和启动过程
- 【原】Spark中Job的提交源码解读
- [Hadoop源码解读](三)MapReduce篇之Job类
- Spark 定制版:006~Spark Streaming源码解读之Job动态生成和深度思考
- 第6课:Spark Streaming源码解读之Job动态生成和深度思考
- spark源码之Job执行(2)任务调度taskscheduler
- Java HashMap 核心源码解读
- Spring boot下,集成任务调度中心(XXL-JOB)