您的位置:首页 > 编程语言 > Java开发

Quartz与Spring结合动态控制任务RAM版

2015-06-10 10:55 351 查看
使用quart 动态的创建控制任务,任务数据是放置到数据库中的。自己建立了一个任务表,和一个历史表

下面代码只开放部分代码。(工具类中差不多是全的)工具类中操作参数和数据库的。可以忽略不计。由于某些原因。暂时不能全部放代码,自己看懂了就添加就行。

ScheduleJob 是保存任务信息的bean, ScheduleJobMapper 是mybatis的dao接口。

比较简单。把 mapper去掉。这个主要的工具类就可以直接运行测试

在 quartz.properties 中可以配置使用数据库版还是内存RAM版(该文件在quartz中有一个默认的配置文件)

实现思路

根据quartz Api 实现主要工具类中的一些常用操作。

单机main方法测试正常之后,与spring结合

首先要知道怎么启动一个入口任务,去每隔一定的时间去获取数据库中配置的任务,查看是否有新增任务,或则任务中的状态是否有手动控制更改的,然后进行相应的控制任务。 为了方便获取spring容器中的服务对象,没有使用一个job来定时跑任务,而是
implements ApplicationListener<ContextRefreshedEvent>
接口,让项目启动完成之后,触发我们自定义的一个线程。

需要注意实现中的坑:

在单独main方法中测试启动、创建、控制任务都ok,但是开启循环抓取任务信息根据状态进行对应的操作的时候,发现暂停等一些控制操作无效了。

问题出现的原因:


调度工厂start 会把负面状态(比如暂停)都给变成启动状态。

更新tigger时间,会让暂停的任务重新运行

解决方案:
在循环遍历之前调用调度工厂的start方法,循环中有条件的判断 是否需要更新时间。(比如本实现中就专门增加了一个状态叫做更新世间)

在spring集合中使用quarts自带的实现工厂(比如:Scheduler scheduler = new StdSchedulerFactory().getScheduler();),会出现tomcat关掉之后,还有一个独立的进程跑任务

解决方案:
使用spring的调度工厂实现org.springframework.scheduling.quartz.SchedulerFactoryBean,该jar包在spring-context-support中

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>


pom.xml

<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.2.1</version>
</dependency>


主要的工具类

import org.apache.log4j.Logger;
import org.zq.beans.parameter.system.ScheduleJob;
import org.zq.core.parameter.system.dao.mapper.ScheduleJobMapper;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.ParseException;

/**
* Created by zq on 2015/5/26.
*/
@Component
public class ScheduleUtils {
private static Logger log = Logger.getLogger(ScheduleUtils.class);
/** 自动状态:任务创建失败 **/
public static final String RUN_STATUS_0 = "0";
/** 自动状态:准备运行 **/
public static final String RUN_STATUS_1 = "1";
/** 自动状态:运行中 **/
public static final String RUN_STATUS_2 = "2";
/** 自动状态:手动暂停 **/
public static final String RUN_STATUS_3 = "3";
/** 自动状态:更新任务失败 **/
public static final String RUN_STATUS_4 = "4";
/** 自动状态:运行完成 **/
public static final String RUN_STATUS_5 = "5";

/** 自动状态: -1:由运行状态产生的错误导致任务未运行等(此状态由系统维护)**/
public static final String STATUS_1_1 = "-1";
/** 手动控制状态:删除 **/
public static final String STATUS_0 = "0";
/** 手动控制状态:自动执行**/
public static final String STATUS_1 = "1";
/** 手动控制状态:暂停 **/
public static final String STATUS_2 = "2";
/** 手动控制状态:恢复 **/
public static final String STATUS_3 = "3";
/** 手动控制状态:更新任务:只能更新时间 **/
public static final String STATUS_4 = "4";
/** 手动控制状态:立即执行,并且只执行一次 **/
public static final String STATUS_5 = "5";

/** 默认job group 名称**/
public static final String DEFAULT_JOB_GROUP = "DEFAULT_JOB_GROUP_zq";
/** 默认 trigger 名称 **/
public static final String DEFAULT_TRIGGER = "DEFAULT_TRIGGER_zq";

/**
* 任务信息,默认的参数名称
*/
public static final String JOB_PARAM_KEY = "JOB_PARAM_KEY";

@Autowired
private static ScheduleJobMapper scheduleJobMapper;
/**
* 获取表达式触发器
*
* @param scheduler scheduler调度器
* @param jobName the job name名称
* @param jobGroup the job group组名称
* @return cron trigger
*/
public static CronTrigger getCronTrigger(Scheduler scheduler, String jobName, String jobGroup) {
try {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
return (CronTrigger) scheduler.getTrigger(triggerKey);
} catch (SchedulerException e) {
log.error("获取定时任务CronTrigger出现异常", e);
}
return null;
}

/**
* 创建任务
*
* @param scheduler the scheduler
* @param scheduleJob the schedule job
*/
public static boolean createScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) {
return createScheduleJob(scheduler, scheduleJob.getJobName(), scheduleJob.getJobGroup(),
scheduleJob.getCronExpression(), false, scheduleJob);
}

/**
* 创建定时任务
*
* @param scheduler the scheduler
* @param jobName the job name
* @param jobGroup the job group
* @param cronExpression the cron expression
* @param isSync the is sync
* @param param the param
*/
public static boolean createScheduleJob(Scheduler scheduler, String jobName, String jobGroup,
String cronExpression, boolean isSync, ScheduleJob param) {
Class<? extends Job> jobClass = null;
Class clazz = parth(param);
if(clazz == null){
return false;
}else{
jobClass = clazz;
}

//构建job信息
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroup).build();

//放入参数,运行时的方法可以获取
jobDetail.getJobDataMap().put(JOB_PARAM_KEY, param);

//表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);

//按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup)
.withSchedule(scheduleBuilder).build();

try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
log.error("创建定时任务失败", e);
param.setStatus(STATUS_1_1);
param.setRunStatus(RUN_STATUS_0);
return false;
}
param.setRunStatus(RUN_STATUS_1);
return true;
}

/**
* 更新定时任务
*
* @param scheduler the scheduler
* @param scheduleJob the schedule job
*/
public static boolean updateScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) {
return updateScheduleJob(scheduler, scheduleJob.getJobName(), scheduleJob.getJobGroup(),
scheduleJob.getCronExpression(), false, scheduleJob);
}

/**
* 更新定时任务
*
* @param scheduler the scheduler
* @param jobName the job name
* @param jobGroup the job group
* @param cronExpression the cron expression
* @param isSync the is sync
* @param param the param
*/
public static boolean updateScheduleJob(Scheduler scheduler, String jobName, String jobGroup,
String cronExpression, boolean isSync, ScheduleJob param) {
Class<? extends Job> jobClass = null;
Class clazz = parth(param);
if(clazz == null){
return false;
}else{
jobClass = clazz;
}
try {
JobDetail jobDetail = scheduler.getJobDetail(getJobKey(jobName, jobGroup));
jobDetail = jobDetail.getJobBuilder().ofType(jobClass).build();
//            //更新参数 实际测试中发现无法更新
//            JobDataMap jobDataMap = jobDetail.getJobDataMap();
//            jobDataMap.put(JOB_PARAM_KEY, param);
//            jobDetail.getJobBuilder().usingJobData(jobDataMap);
TriggerKey triggerKey = ScheduleUtils.getTriggerKey(jobName, jobGroup);
//表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
//按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
//按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);

} catch (SchedulerException e) {
param.setStatus(STATUS_3); //更新失败,处理任务暂停
param.setRunStatus(RUN_STATUS_5);
log.error("更新定时任务失败", e);
return false;
}
return true;
}

/**
* 获取class
* @param param
* @return
*/
public static Class parth(ScheduleJob param){
Class clazz = null;
try {
clazz  =  Class.forName(param.getJobClassName());
} catch (Exception e) {
log.error("JobClassName : " + param.getJobClassName() + ",未找到该类·");
param.setStatus(STATUS_1);
param.setRunStatus(RUN_STATUS_0);
}
return clazz;
}

/**
* 获取jobKey
*
* @param jobName the job name
* @param jobGroup the job group
* @return the job key
*/
public static JobKey getJobKey(String jobName, String jobGroup) {
return JobKey.jobKey(jobName, jobGroup);
}
/**
* 获取触发器key,创建的时候 就是以 jobName 和 job group 命名的
*
* @param jobName
* @param jobGroup
* @return
*/
public static TriggerKey getTriggerKey(String jobName, String jobGroup) {
return TriggerKey.triggerKey(jobName, jobGroup);
}

/**
* 暂停任务
*
* @param scheduler
* @param jobName
* @param jobGroup
*/
public static void pauseJob(Scheduler scheduler, String jobName, String jobGroup) {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
try {
scheduler.pauseJob(jobKey);
scheduler.pauseTrigger(getTriggerKey(jobName, jobGroup));//暂停触发器
} catch (SchedulerException e) {
log.error("暂停定时任务失败", e);
}
}

/**
* 恢复一个定时任务
* @param scheduler
* @param jobName
* @param jobGroup
*/
public static void resumeJob(Scheduler scheduler, String jobName, String jobGroup) {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
try {
scheduler.resumeJob(jobKey);
scheduler.resumeTrigger(getTriggerKey(jobName, jobGroup));
} catch (SchedulerException e) {
log.error("恢复定时任务失败", e);
}
}

/**
* 删除定时任务
*
* @param scheduler
* @param jobName
* @param jobGroup
*/
public static boolean deleteScheduleJob(Scheduler scheduler, String jobName, String jobGroup) {
try {
scheduler.deleteJob(getJobKey(jobName, jobGroup));
scheduler.pauseTrigger(getTriggerKey(jobName, jobGroup));//停止触发器
scheduler.unscheduleJob(getTriggerKey(jobName, jobGroup));//移除触发器
} catch (SchedulerException e) {
log.error("删除定时任务失败", e);
return false;
}
return true;
}

public static void main(String[] args) throws SchedulerException, ParseException, InterruptedException {
Scheduler scheduler = new StdSchedulerFactory().getScheduler();
ScheduleJob scheduleJob = new ScheduleJob();
scheduleJob.setScheduleJobId(1);
scheduleJob.setStatus(STATUS_2);
scheduleJob.setJobName("test");
scheduleJob.setJobGroup("testGroup");
scheduleJob.setJobClassName("org.zq.core.common.util.schedule.job.TestJob");
scheduleJob.setCronExpression("0/2 * * * * ?");
ScheduleUtils.createScheduleJob(scheduler,scheduleJob);
//启动
if (!scheduler.isShutdown()) {
scheduler.start();
}
Thread.sleep(10000);
ScheduleUtils.deleteScheduleJob(scheduler,scheduleJob.getJobName(),scheduleJob.getJobGroup());
}
}


与spring结合,初始化任务的一个类

import org.apache.log4j.Logger;
import org.zq.beans.parameter.system.ScheduleJob;
import org.zq.beans.parameter.system.ScheduleJobHistory;
import org.zq.core.common.util.BeanUtils;
import org.zq.core.parameter.system.dao.mapper.ScheduleJobMapper;
import org.zq.core.parameter.system.dao.mapper.ScheduleJobHistoryMapper;
import org.quartz.CronTrigger;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.util.CollectionUtils;

import java.util.Date;
import java.util.List;

/**
* Created by zq on 2015/5/26.
* 在xml中初始化该类,方便控制是否需要启动定时任务处理
*/
public class InItJob implements ApplicationListener<ContextRefreshedEvent> {
private static Logger log = Logger.getLogger(InItJob.class);
@Autowired
private ScheduleJobMapper scheduleJobMapper;
@Autowired
private Scheduler scheduler;  // 该对象是spring工厂配置自动产生的一个调度对象
@Autowired
private ScheduleJobHistoryMapper scheduleJobHistoryMapper;

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() == null) { //spring mvc  中有两个容器
init();
}
}

public void init() {
log.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 启动初始化任务" + scheduler.toString());
try {
if (!scheduler.isShutdown()) {
scheduler.start();
}
} catch (Exception e) {

}

new Thread(new Init()).start();
}

class Init implements Runnable {
@Override
public void run() {
while (true) {
try {
running();
log.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 任务运行中");
Thread.sleep(20000);  //每20秒 重新抓去一次数据库任务信息
} catch (Exception e) {
e.printStackTrace();
}
}
}

public void running() throws SchedulerException {
List<ScheduleJob> list = scheduleJobMapper.selectAll(); //从数据库获取所有的任务信息
if (CollectionUtils.isEmpty(list)) {
return;
}
for (ScheduleJob job : list) {
if (handleStatus(scheduler, job)) continue;
CronTrigger cronTrigger = ScheduleUtils.getCronTrigger(scheduler, job.getJobName(), job.getJobGroup());
//不存在,创建一个
if (cronTrigger == null) {
ScheduleUtils.createScheduleJob(scheduler, job);
}
job.setModifyById("system");
job.setModifyDate(new Date());
scheduleJobMapper.updateByPrimaryKey(job);
}

}

/**
* 处理状态,由于某些原因,以下代码不能全部公开。但是很简单逻辑就是,根据读取到的任务状态 调用工具类中的各种操作,达到能动态控制任务的一个功能
*
* @param scheduler
* @param job
* @return 返回true。不继续执行循环下面的代码
*/
private boolean handleStatus(Scheduler scheduler, ScheduleJob job) {
String status = job.getStatus();
String runStatus = job.getRunStatus();

CronTrigger cronTrigger = ScheduleUtils.getCronTrigger(scheduler, job.getJobName(), job.getJobGroup());

//判断运行状态
//如果是已经运行完成:则移动到历史表
if (ScheduleUtils.RUN_STATUS_5.equals(runStatus)) {
scheduleJobMapper.deleteByPrimaryKey(job.getScheduleJobId());
ScheduleJobHistory sjh = new ScheduleJobHistory();
BeanUtils.copyProperties(sjh, job);
scheduleJobHistoryMapper.insertSelective(sjh);
}
//如果是暂停
if (ScheduleUtils.STATUS_2.equals(status)) { //如果是暂停状态
if (cronTrigger == null) { //状态异常
log.error("暂停状态异常:任务调度中没有找到对应的job!");
return true;
}
ScheduleUtils.pauseJob(scheduler, job.getJobName(), job.getJobGroup());
job.setRunStatus(ScheduleUtils.RUN_STATUS_3);
}
return false;
}
}
}


另外一种启动方式

public class ScheduleJobInit {
@Autowired
InItJob inItJob;
/**
* 项目启动时初始化
*/
@PostConstruct  //该注解是在项目启动完成之后触发该任务。同样也需要在xml中配置该类的bean
public void init() {
inItJob.init();
}
}


spring.xml中配置bean

<!-- 调度任务入口类 -->
<bean id="inItJob" class="org.zq.core.common.util.schedule.util.InItJob"/>
<bean id="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" />
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: