您的位置:首页 > 其它

xxl-job源码解读之任务调度器(核心)

2019-08-01 23:09 375 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/yuanshangshenghuo/article/details/98110509

一.任务调度器介绍

在xxl-job中,有两个角色,一个是执行器,另一个就是调度中心了。而任务调度器可以说是调度中心的最核心,我们发往执行器的任务,都是从任务调度器发出来的(除手动执行的)。

二.原理解读

首先我们要带着这几个问题来解读原理:

1.分布式任务调度有个很重要的点,就是怎样保证一次任务调度被消费一次,也就是怎样保证幂等性?

2.是怎样实现调度的,调度流程是什么样的?

3.出现没来的急调度的是怎么办的?

接下来我们来看下源码,从源码中寻找答案:

2.1 启动

[code]@Component
@DependsOn("xxlJobAdminConfig")
public class XxlJobScheduler implements InitializingBean, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
@Override
public void afterPropertiesSet() throws Exception {
// init i18n
initI18n();
// admin registry monitor run
JobRegistryMonitorHelper.getInstance().start();
// 失败监控线程
// admin monitor run
JobFailMonitorHelper.getInstance().start();
// admin-server
initRpcProvider();
///核心  , 任务调度器 的启动
// start-schedule
JobScheduleHelper.getInstance().start();

logger.info(">>>>>>>>> init xxl-job admin success.");
}
}

在conf个包中,有一个交由 spring 管理的 XxlJobScheduler类,实现 InitializingBean 接口的 afterPropertiesSet 方法。这个方法会在spring为其对象注入完参数执行。这个方法的最后一行  JobScheduleHelper.getInstance().start();  就是任务调度器的启动。

2.2 start 方法

由于这个方法代码有点多,我们分部分讲解。

[code]// 刷 数据库的线程
// schedule thread  调度线程
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {

try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

Connection conn = null;

// <1>
while (!scheduleThreadToStop) {

// Scan Job  扫描任务
long start = System.currentTimeMillis();
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
if (conn==null || conn.isClosed()) {  // 数据库连接处理
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
}

// 关闭事务的自动提交
conn.setAutoCommit(false);
// <2> 数据库的悲观锁   , 保证数据的幂等性
preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();

// tx start

// <3>、预读5s内调度任务
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、推送时间轮
for (XxlJobInfo jobInfo: scheduleList) {

// <4>时间轮刻度计算
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 调度时间超了5s 就放弃了  ,然后计算好下次的调度时间
// 过期超5s:本地忽略,当前时间开始计算下次触发时间

//<5> fresh next
Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date());
if (nextValidTime != null) {
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextValidTime.getTime());
} else {
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
}

} else if (nowTime > jobInfo.getTriggerNextTime()) {
//<6> 过期5s内 :立即触发一次,当前时间开始计算下次触发时间;

CronExpression cronExpression = new CronExpression(jobInfo.getJobCron());
long nextTime = cronExpression.getNextValidTimeAfter(new Date()).getTime();

// 1、trigger  立即触发一次
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
logger.debug(">>>>>>>>>>> xxl-job, shecule push trigger : jobId = " + jobInfo.getId() );

// 2、fresh next
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextTime);

// <7>下次5s内:预读一次;
if (jobInfo.getTriggerNextTime() - nowTime < PRE_READ_MS) {  // 距离下次调度 还有不到5s  就搞到 ring 上面

// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());// 放到环中

// 3、fresh next  计算出下下次的调度时间
Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()));
if (nextValidTime != null) {
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextValidTime.getTime());
} else {
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
}

}

} else {
//<8> 未过期:正常触发,递增计算下次触发时间

// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

// 2、push time ring  没有过期 就要放到时间刻度环上
pushTimeRing(ringSecond, jobInfo.getId());

// 3、fresh next  计算出下一次 ,然后进行设置
Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()));
if (nextValidTime != null) {
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextValidTime.getTime());
} else {
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
}

}

}

// 3、<9>更新trigger信息    将下次调度时间放到数据库中
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}

} else {
preReadSuc = false;
}

// tx stop

} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {

// commit 提交事务
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}

// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException ignore) {
if (!scheduleThreadToStop) {
logger.error(ignore.getMessage(), ignore);
}
}
}
}
long cost = System.currentTimeMillis()-start;

// Wait seconds, align second
if (cost < 1000) {  // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;  比较闲的话就要多等点时间了
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}

}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();

解释下为<>包裹的序号位置处的含义:

<1>:循环执行,然后后面那几行就是检查数据库连接,没有就创建

<2>:获取数据库悲观锁来保证数据消费的幂等性,该锁能够保证当前事务提交之前,其他事务不能操作数据,也就是能够实现分布式锁的作用。

<3>:从数据库中读取最近5s内要执行的调度列表。然后判空,遍历该调度列表

<4>:如果 调度时间过去5s之后(过期5s),放弃该次调度,并计算下次要调度的时间,设置到调度信息中。

<5>:这个其实就是解释cron 表达式,获取下次执行的时间戳。

<6>:如果调度时间过去5s之内(过期5s之内的),立即执行一次,然后计算下次时间,设置到调度信息中

<7>判读下次时间调度如果在5s之内,就计算在刻度轮上位置(ring second),然后将ring second 与 任务id 设置到 ringData上面(其实就是一个map,key: ring second  value: 任务id 集合),计算下下次的执行时间,设置到调度信息中

<8>:如果没有过期(也就是还差点时间才能执行),这时候也是计算刻度轮上的位置(ring second),设置到ringData里面,计算下次要执行的时间戳,设置到任务信息中。

<9>:这个就是将任务信息更新到数据库中

再看下另一个线程:

[code]
// ring thread
ringThread = new Thread(new Runnable() {
@Override
public void run() {

// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
//<1>
while (!ringThreadToStop) {

try {
// second data
List<Integer> ringItemData = new ArrayList<>();
//<2>
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
//<3>
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}

// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData!=null && ringItemData.size()>0) {
//<4> do trigger   // 进行调度
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}

// next second, align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();

我们再来看下这个线程是干什么用的,这个线程主要是用来扫描ringData 这个刻度轮里面数据的

解释下<> 包裹序号位置:

<1>: 也是一个循环,ringThreadToStop 是volatile修饰的,具有线程可见性。

<2>:获取当前秒 数。主要是找到ringData与之对应的秒数上面的数据

<3>:主要是获取当前秒 跟上一秒数 在ringData 上面的数据(也就是任务)。

[code]List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );

(nowSecond +60-i)%60  这个就是算出当前秒位置的,可以算下 ,比如说当前秒是5 (也就是nowSecond=5)这次的是0(也就是i=0)可以算出 结果是5。在ringData 上的位置就是5 (也就是key是5)。下次的i=1,计算出的结果 4(这里是为了避免上次处理耗时太长而错过下一个刻度的调度,所以就往前处理一个刻度的,比如说上次执行的是7s的,然后由于某种原因,导致到这次的时候已经是9s 了,所以防止出现这种情况,就往前校验了一个时间刻度)

<4>:将获取的需要执行的任务集合遍历调度。

2.3 toStop 方法

[code] // 停止
public void toStop(){

// 1、stop schedule
scheduleThreadToStop = true;  // 这个是直接设置停止标识的
try {
TimeUnit.SECONDS.sleep(1);  // wait
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
if (scheduleThread.getState() != Thread.State.TERMINATED){
// interrupt and wait
scheduleThread.interrupt();
try {
scheduleThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}

// if has ring data   判断时间刻度环上是否还有数据  ,如果有的话 设置标识
boolean hasRingData = false;
if (!ringData.isEmpty()) {
for (int second : ringData.keySet()) {
List<Integer> tmpData = ringData.get(second);
if (tmpData!=null && tmpData.size()>0) {
hasRingData = true;
break;
}
}
}
if (hasRingData) {  // 等8 s  再搞
try {
TimeUnit.SECONDS.sleep(8);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}

// stop ring (wait job-in-memory stop)
ringThreadToStop = true;     // 设置停止标识
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
if (ringThread.getState() != Thread.State.TERMINATED){
// interrupt and wait
ringThread.interrupt();
try {
ringThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}

logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
}

我们可以从上面代码看出:首先是要停止扫描数据库的这个线程,还使用join等待该线程执行完成。然后判断ringData 中是否还有任务没有调度完成,如果有就等待8s,之后设置停止标识。等待该线程停止。

总结:

到现在我们的任务调度核心就解析完成了,我们开始之前提出的问题现在也是有了答案。

1. xxl-job 使用数据库悲观锁的形式,来保证任务调度的幂等性

2.有两个线程来配合实现调度,一个线程扫描数据库,找到5s内要调度的任务,然后遍历调度,对于过期5s的怎么办,对于过期5秒内的怎么办,对于还没到调度时间的又怎么办。另一个线程扫描ringData刻度环上的数据(这个刻度环就跟咱们的时钟一样,有60个刻度,每个刻度上有需要调度的数据),来调度。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: