您的位置:首页 > 其它

【优化技术专题】「温故而知新」基于Quartz系列的任务调度框架的动态化任务实现分析

2021-09-18 22:56 721 查看

不提XXLJOB或者其他的调度框架,就看我接触的第一个任务调度框架Quartz(温故而知新)

Quartz的动态暂停 恢复 修改和删除任务

实现动态添加定时任务,先来看一下我们初步要实现的目标效果图,这里我们只在内存中操作,并没有把quartz的任何信息保存到数据库,即使用的是RAMJobStore,

当然如果你有需要,可以实现成JDBCJobStore,那样任务信息将会更全面。

例如,我们要先列出计划中的定时任务以及正在执行中的定时任务,这里的正在执行中指的是任务已经触发线程还没执行完的情况。

  • 比如每天2点执行一个数据导入操作,这个操作执行时间需要5分钟,在这5分钟之内这个任务才是运行中的任务。
  • 当任务正常时可以使用暂停按钮,任务暂停时可以使用恢复按钮。

trigger各状态说明:

  • None:Trigger已经完成且不会在执行,或找不到该触发器,或Trigger已经被删除.
  • NORMAL:正常状态
  • PAUSED:暂停状态
  • COMPLETE:触发器完成,但是任务可能还正在执行中
  • BLOCKED:线程阻塞状态
  • ERROR:出现错误

定时任务运行工厂类

任务运行入口,即Job实现类,在这里我把它看作工厂类:

/**
* 定时任务运行工厂类
*
* User: liyd
* Date: 14-1-3
* Time: 上午10:11
*/
public class QuartzJobFactory implements Job {

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("任务成功运行");
ScheduleJob scheduleJob
= (ScheduleJob)context.getMergedJobDataMap().get("scheduleJob");
System.out.println("任务名称 = [" + scheduleJob.getJobName() + "]");
}
}

这里我们实现的是无状态的Job,如果要实现有状态的Job在以前是实现StatefulJob接口,在我使用的quartz 2.2.1中,StatefulJob接口已经不推荐使用了,换成了注解的方式,只需要给你实现的Job类加上注解@DisallowConcurrentExecution即可实现有状态:

/**
* 定时任务运行工厂类
* <p/>
* User: liyd
* Date: 14-1-3
* Time: 上午10:11
*/
@DisallowConcurrentExecution
public class QuartzJobFactory implements Job {...}

创建任务

既然要动态的创建任务,我们的任务信息当然要保存在某个地方了,这里我们新建一个保存任务信息对应的实体类:

/**
* 计划任务信息
* User: liyd
* Date: 14-1-3
* Time: 上午10:24
*/
public class ScheduleJob {
/** 任务id */
private String jobId;

1044
/** 任务名称 */
private String jobName;
/** 任务分组 */
private String jobGroup;
/** 任务状态 0禁用 1启用 2删除*/
private String jobStatus;
/** 任务运行时间表达式 */
private String cronExpression;
/** 任务描述 */
private String desc;
getter and setter ....
}

接下来我们创建测试数据,实际应用中该数据可以保存在数据库等地方,我们把任务的分组名+任务名作为任务的唯一key,和quartz中的实现方式一致:

/** 计划任务map */
private static Map<String, ScheduleJob> jobMap = new HashMap<String, ScheduleJob>();
static {
for (int i = 0; i < 5; i++) {
ScheduleJob job = new ScheduleJob();
job.setJobId("10001" + i);
job.setJobName("data_import" + i);
job.setJobGroup("dataWork");
job.setJobStatus("1");
job.setCronExpression("0/5 * * * * ?");
job.setDesc("数据导入任务");
addJob(job);
}
}

/**
* 添加任务
* @param scheduleJob
*/
public static void addJob(ScheduleJob scheduleJob) {
jobMap.put(scheduleJob.getJobGroup() + "_" + scheduleJob.getJobName(), scheduleJob);
}

有了调度工厂,有了任务运行入口实现类,有了任务信息,接下来就是创建我们的定时任务了,在这里我把它设计成一个Job对应一个trigger,两者的分组及名称相同,方便管理,条理也比较清晰,在创建任务时如果不存在新建一个,如果已经存在则更新任务,主要代码如下

schedulerFactoryBean 由spring创建注入

Scheduler scheduler = schedulerFactoryBean.getScheduler();

//这里获取任务信息数据
List<ScheduleJob> jobList = DataWorkContext.getAllJob();
for (ScheduleJob job : jobList) {
TriggerKey T = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());
//获取trigger,即在spring配置文件中定义的 bean id="myTrigger"
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
//不存在,创建一个
if (null == trigger) {
JobDetail jobDetail = JobBuilder.newJob(QuartzJobFactory.class)
.withIdentity(job.getJobName(), job.getJobGroup()).build();
jobDetail.getJobDataMap().put("scheduleJob", job);
//表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job
.getCronExpression());
//按新的cronExpression表达式构建一个新的trigger
trigger = TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup()).withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, trigger);
} else {
// Trigger已存在,那么更新相应的定时设置
//表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job
.getCronExpression());
//按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
.withSchedule(scheduleBuilder).build();
//按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
}
}
  • 如此,可以说已经完成了我们的动态任务创建,大功告成了。有了上面的代码,添加和修改任务是不是也会了,顺道解决了?

  • 上面我们创建的5个测试任务,都是5秒执行一次,都将调用QuartzJobFactory的execute方法,但是传入的任务信息参数不同,execute方法中的如下代码就是得到具体的任务信息,包括任务分组和任务名:

ScheduleJob scheduleJob = (ScheduleJob)context.getMergedJobDataMap().get("scheduleJob");
  • 有了任务分组和任务名即确定了该任务 56c 的唯一性,接下来需要什么操作实现起来是不是就很容易了?
  • 以后需要添加新的定时任务只需要在任务信息列表中加入记录即可,然后在execute方法中通过判断任务分组和任务名来实现你具体的操作。
  • 以上已经初始实现了我们需要的功能,增加和修改也已经可以通过源代码举一反三出来,但是我们在实际开发的时候需要进行测试,如果一个任务是1个小时运行一次的,测试起来是不是很不方便?当然你可以修改任务的运行时间表达式,但相信这不是最好的方法,接下来我们就要实现在不对当前任务信息做任何修改的情况下触发任务,并且该触发只会运行一次作测试用。

计划中的任务

**主要是已经添加到quartz调度器的任务,因为quartz并没有直接提供这样的查询接口,所以我们需要结合JobKey和Trigger来实现,核心代码: **

Scheduler scheduler = schedulerFactoryBean.getScheduler();
GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
List<ScheduleJob> jobList = new ArrayList<ScheduleJob>();
for (JobKey jobKey : jobKeys) {

ad0
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger trigger : triggers) {
ScheduleJob job = new ScheduleJob();
job.setJobName(jobKey.getName());
job.setJobGroup(jobKey.getGroup());
job.setDesc("触发器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
job.setJobStatus(triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCronExpression(cronExpression);
}
jobList.add(job);
}
}
  • jobList就是我们需要的计划中的任务列表,需要注意一个job可能会有多个trigger的情况,在下面讲到的立即运行一次任务的时候,会生成一个临时的trigger也会出现在这。

  • 这里把一个Job有多个trigger的情况看成是多个任务。包括在实际项目中一般用到的都是CronTrigger ,所以这里我们着重处理了下CronTrigger的情况。

运行中的任务

实现和计划中的任务类似,核心代码:

Scheduler scheduler = schedulerFactoryBean.getScheduler();
List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(executingJobs.size());
for (JobExecutionContext executingJob : executingJobs) {
ScheduleJob job = new ScheduleJob();
JobDetail jobDetail = executingJob.getJobDetail();
JobKey jobKey = jobDetail.getKey();
Trigger trigger = executingJob.getTrigger();
job.setJobName(jobKey.getName());
job.setJobGroup(jobKey.getGroup());
job.setDesc("触发器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
job.setJobStatus(triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCronExpression(cronExpression);
}
jobList.add(job);
}

暂停任务机制

比较简单,核心代码:

Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.pauseJob(jobKey);

恢复任务

暂停任务相对,核心代码:

Scheduler scheduler = schedulerFactoryBean.getScheduler();
Jo
2088
bKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.resumeJob(jobKey);

删除任务

删除任务后,所对应的trigger也将被删除

Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.deleteJob(jobKey);

立即运行任务

  • 这里的立即运行,只会运行一次,方便测试时用。quartz是通过临时生成一个trigger的方式来实现的,这个trigger将在本次任务运行完成之后自动删除。

  • trigger的key是随机生成的,例如:DEFAULT.MT_4k9fd10jcn9mg。

  • 在我的测试中,前面的DEFAULT.MT是固定的,后面部分才随机生成。

Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.triggerJob(jobKey);

更新任务的时间表达式

更新之后,任务将立即按新的时间表达式执行:

Scheduler scheduler = schedulerFactoryBean.getScheduler();

TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(),
scheduleJob.getJobGroup());

//获取trigger,即在spring配置文件中定义的 bean id="myTrigger"
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);

//表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob
.getCronExpression());

//按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
.withSchedule(scheduleBuilder).build();

//按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);

cronExpression表达式:

  • 字段   允许值   允许的特殊字符

    秒    0-59    , - * /
  • 分    0-59    , - * /
  • 小时    0-23    , - * /
  • 日期    1-31    , - * ? / L W C
  • 月份    1-12 或者 JAN-DEC    , - * /
  • 星期    1-7 或者 SUN-SAT    , - * ? / L C #
  • 年(可选)    留空, 1970-2099    , - * /
  • 特殊字符   意义

      *
         表示所有值;
    • ?    表示未说明的值,即不关心它为何值;
    • -
         表示一个指定的范围;
    • ,    表示附加一个可能值;
    • /    符号前表示开始时间,符号后表示每次递增的值;
  • L W C

      L("last")    ("last") "L" 用在day-of-month字段意思是 "这个月最后一天";用在 day-of-week字段, 它简单意思是 "7" or "SAT"。如果在day-of-week字段里和数字联合使用,它的意思就是 "这个月的最后一个星期几" 例如: "6L" means "这个月的最后一个星期五". 当我们用“L”时,不指明一个列表值或者范围是很重要的,不然的话,我们会得到一些意想不到的结果。
  • W("weekday")    只能用在day-of-month字段。用来描叙最接近指定天的工作日(周一到周五)。
      例如:在day-of-month字段用“15W”指“最接近这个月第15天的工作日”,即如果这个月第15天是周六,那么触发器将会在这个月第14天即周五触发;如果这个月第15天是周日,那么触发器将会在这个月第16 天即周一触发;如果这个月第15天是周二,那么就在触发器这天触发。
    • 注意一点:这个用法只会在当前月计算值,不会越过当前月。“W”字符仅能在day- of-month指明一天,不能是一个范围或列表。也可以用“LW”来指定这个月的最后一个工作日。
  • #
    只能用在day-of-week字段。用来指定这个月的第几个周几。例:在day-of-week字段用"6#3"指这个月第3个周五(6指周五,3指第3个)。如果指定的日期不存在,触发器就不会触发。
  • C    指和calendar联系后计算过的值。
      例:在day-of-month 字段用“5C”指在这个月第5天或之后包括calendar的第一天;在day-of-week字段用“1C”指在这周日或之后包括calendar的第一天。

    - 星期的简写:
    - 周一 MON
    - 周二 TUE
    - 周三 WED
    - 周四 THU
    - 周五 FRI
    - 周六 SAT
    - 周日 SUN

    在MONTH和Day Of Week字段里对字母大小写不敏感

    • 表达式   意义 每天中午12点触发 "0 0 12 * * ?"
  • 每天上午10:15触发
      "0 15 10 ? * *"
    • "0 15 10 * * ?"
    • "0 15 10 * * ? *" (此处最后一项 年是可选的)
  • 2005年的每天上午10:15触发
      "0 15 10 * * ? 2005"
  • 每天下午2点到下午2:59期间的每1分钟触发
      "0 * 14 * * ?"
  • 每天下午2点到下午2:55期间的每5分钟触发
      "0 0/5 14 * * ?"
  • 每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发
      "0 0/5 14,18 * * ?"
  • 每天下午2点到下午2:05期间的每1分钟触发
      "0 0-5 14 * * ?"
  • 每年三月的星期三的下午2:10和2:44触发
      "0 10,44 14 ? 3 WED" / "0 10,44 14 ? 3 WED * "
  • 周一至周五的上午10:15触发
      "0 15 10 ? * MON-FRI"    / "0 15 10 ? * MON-FRI * "
  • 每月15日上午10:15触发
      "0 15 10 15 * ?"
  • 每月最后一日的上午10:15触发
      "0 15 10 L * ?"
  • 每月的最后一个星期五上午10:15触发
      "0 15 10 ? * 6L"
  • 2002年至2005年的每月的最后一个星期五上午10:15触发
      "0 15 10 ? * 6L 2002-2005"
  • 每月的第三个星期五上午10:15触发
      "0 15 10 ? * 6#3"
  • 每两个小时
      0 */2 * * *
  • 内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: