Quartz与Spring结合动态控制任务RAM版
2015-06-10 10:55
351 查看
使用quart 动态的创建控制任务,任务数据是放置到数据库中的。自己建立了一个任务表,和一个历史表
下面代码只开放部分代码。(工具类中差不多是全的)工具类中操作参数和数据库的。可以忽略不计。由于某些原因。暂时不能全部放代码,自己看懂了就添加就行。
ScheduleJob 是保存任务信息的bean, ScheduleJobMapper 是mybatis的dao接口。
比较简单。把 mapper去掉。这个主要的工具类就可以直接运行测试
在 quartz.properties 中可以配置使用数据库版还是内存RAM版(该文件在quartz中有一个默认的配置文件)
单机main方法测试正常之后,与spring结合
首先要知道怎么启动一个入口任务,去每隔一定的时间去获取数据库中配置的任务,查看是否有新增任务,或则任务中的状态是否有手动控制更改的,然后进行相应的控制任务。 为了方便获取spring容器中的服务对象,没有使用一个job来定时跑任务,而是
调度工厂start 会把负面状态(比如暂停)都给变成启动状态。
更新tigger时间,会让暂停的任务重新运行
在spring集合中使用quarts自带的实现工厂(比如:Scheduler scheduler = new StdSchedulerFactory().getScheduler();),会出现tomcat关掉之后,还有一个独立的进程跑任务
下面代码只开放部分代码。(工具类中差不多是全的)工具类中操作参数和数据库的。可以忽略不计。由于某些原因。暂时不能全部放代码,自己看懂了就添加就行。
ScheduleJob 是保存任务信息的bean, ScheduleJobMapper 是mybatis的dao接口。
比较简单。把 mapper去掉。这个主要的工具类就可以直接运行测试
在 quartz.properties 中可以配置使用数据库版还是内存RAM版(该文件在quartz中有一个默认的配置文件)
实现思路
根据quartz Api 实现主要工具类中的一些常用操作。单机main方法测试正常之后,与spring结合
首先要知道怎么启动一个入口任务,去每隔一定的时间去获取数据库中配置的任务,查看是否有新增任务,或则任务中的状态是否有手动控制更改的,然后进行相应的控制任务。 为了方便获取spring容器中的服务对象,没有使用一个job来定时跑任务,而是
implements ApplicationListener<ContextRefreshedEvent>接口,让项目启动完成之后,触发我们自定义的一个线程。
需要注意实现中的坑:
在单独main方法中测试启动、创建、控制任务都ok,但是开启循环抓取任务信息根据状态进行对应的操作的时候,发现暂停等一些控制操作无效了。问题出现的原因:
调度工厂start 会把负面状态(比如暂停)都给变成启动状态。
更新tigger时间,会让暂停的任务重新运行
解决方案:在循环遍历之前调用调度工厂的start方法,循环中有条件的判断 是否需要更新时间。(比如本实现中就专门增加了一个状态叫做更新世间)
在spring集合中使用quarts自带的实现工厂(比如:Scheduler scheduler = new StdSchedulerFactory().getScheduler();),会出现tomcat关掉之后,还有一个独立的进程跑任务
解决方案:使用spring的调度工厂实现org.springframework.scheduling.quartz.SchedulerFactoryBean,该jar包在spring-context-support中
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency>
pom.xml
<dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz-jobs</artifactId> <version>2.2.1</version> </dependency>
主要的工具类
import org.apache.log4j.Logger; import org.zq.beans.parameter.system.ScheduleJob; import org.zq.core.parameter.system.dao.mapper.ScheduleJobMapper; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.ParseException; /** * Created by zq on 2015/5/26. */ @Component public class ScheduleUtils { private static Logger log = Logger.getLogger(ScheduleUtils.class); /** 自动状态:任务创建失败 **/ public static final String RUN_STATUS_0 = "0"; /** 自动状态:准备运行 **/ public static final String RUN_STATUS_1 = "1"; /** 自动状态:运行中 **/ public static final String RUN_STATUS_2 = "2"; /** 自动状态:手动暂停 **/ public static final String RUN_STATUS_3 = "3"; /** 自动状态:更新任务失败 **/ public static final String RUN_STATUS_4 = "4"; /** 自动状态:运行完成 **/ public static final String RUN_STATUS_5 = "5"; /** 自动状态: -1:由运行状态产生的错误导致任务未运行等(此状态由系统维护)**/ public static final String STATUS_1_1 = "-1"; /** 手动控制状态:删除 **/ public static final String STATUS_0 = "0"; /** 手动控制状态:自动执行**/ public static final String STATUS_1 = "1"; /** 手动控制状态:暂停 **/ public static final String STATUS_2 = "2"; /** 手动控制状态:恢复 **/ public static final String STATUS_3 = "3"; /** 手动控制状态:更新任务:只能更新时间 **/ public static final String STATUS_4 = "4"; /** 手动控制状态:立即执行,并且只执行一次 **/ public static final String STATUS_5 = "5"; /** 默认job group 名称**/ public static final String DEFAULT_JOB_GROUP = "DEFAULT_JOB_GROUP_zq"; /** 默认 trigger 名称 **/ public static final String DEFAULT_TRIGGER = "DEFAULT_TRIGGER_zq"; /** * 任务信息,默认的参数名称 */ public static final String JOB_PARAM_KEY = "JOB_PARAM_KEY"; @Autowired private static ScheduleJobMapper scheduleJobMapper; /** * 获取表达式触发器 * * @param scheduler scheduler调度器 * @param jobName the job name名称 * @param jobGroup the job group组名称 * @return cron trigger */ public static CronTrigger getCronTrigger(Scheduler scheduler, String jobName, String jobGroup) { try { TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); return (CronTrigger) scheduler.getTrigger(triggerKey); } catch (SchedulerException e) { log.error("获取定时任务CronTrigger出现异常", e); } return null; } /** * 创建任务 * * @param scheduler the scheduler * @param scheduleJob the schedule job */ public static boolean createScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) { return createScheduleJob(scheduler, scheduleJob.getJobName(), scheduleJob.getJobGroup(), scheduleJob.getCronExpression(), false, scheduleJob); } /** * 创建定时任务 * * @param scheduler the scheduler * @param jobName the job name * @param jobGroup the job group * @param cronExpression the cron expression * @param isSync the is sync * @param param the param */ public static boolean createScheduleJob(Scheduler scheduler, String jobName, String jobGroup, String cronExpression, boolean isSync, ScheduleJob param) { Class<? extends Job> jobClass = null; Class clazz = parth(param); if(clazz == null){ return false; }else{ jobClass = clazz; } //构建job信息 JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroup).build(); //放入参数,运行时的方法可以获取 jobDetail.getJobDataMap().put(JOB_PARAM_KEY, param); //表达式调度构建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression); //按新的cronExpression表达式构建一个新的trigger CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup) .withSchedule(scheduleBuilder).build(); try { scheduler.scheduleJob(jobDetail, trigger); } catch (SchedulerException e) { log.error("创建定时任务失败", e); param.setStatus(STATUS_1_1); param.setRunStatus(RUN_STATUS_0); return false; } param.setRunStatus(RUN_STATUS_1); return true; } /** * 更新定时任务 * * @param scheduler the scheduler * @param scheduleJob the schedule job */ public static boolean updateScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) { return updateScheduleJob(scheduler, scheduleJob.getJobName(), scheduleJob.getJobGroup(), scheduleJob.getCronExpression(), false, scheduleJob); } /** * 更新定时任务 * * @param scheduler the scheduler * @param jobName the job name * @param jobGroup the job group * @param cronExpression the cron expression * @param isSync the is sync * @param param the param */ public static boolean updateScheduleJob(Scheduler scheduler, String jobName, String jobGroup, String cronExpression, boolean isSync, ScheduleJob param) { Class<? extends Job> jobClass = null; Class clazz = parth(param); if(clazz == null){ return false; }else{ jobClass = clazz; } try { JobDetail jobDetail = scheduler.getJobDetail(getJobKey(jobName, jobGroup)); jobDetail = jobDetail.getJobBuilder().ofType(jobClass).build(); // //更新参数 实际测试中发现无法更新 // JobDataMap jobDataMap = jobDetail.getJobDataMap(); // jobDataMap.put(JOB_PARAM_KEY, param); // jobDetail.getJobBuilder().usingJobData(jobDataMap); TriggerKey triggerKey = ScheduleUtils.getTriggerKey(jobName, jobGroup); //表达式调度构建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); //按新的cronExpression表达式重新构建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); //按新的trigger重新设置job执行 scheduler.rescheduleJob(triggerKey, trigger); } catch (SchedulerException e) { param.setStatus(STATUS_3); //更新失败,处理任务暂停 param.setRunStatus(RUN_STATUS_5); log.error("更新定时任务失败", e); return false; } return true; } /** * 获取class * @param param * @return */ public static Class parth(ScheduleJob param){ Class clazz = null; try { clazz = Class.forName(param.getJobClassName()); } catch (Exception e) { log.error("JobClassName : " + param.getJobClassName() + ",未找到该类·"); param.setStatus(STATUS_1); param.setRunStatus(RUN_STATUS_0); } return clazz; } /** * 获取jobKey * * @param jobName the job name * @param jobGroup the job group * @return the job key */ public static JobKey getJobKey(String jobName, String jobGroup) { return JobKey.jobKey(jobName, jobGroup); } /** * 获取触发器key,创建的时候 就是以 jobName 和 job group 命名的 * * @param jobName * @param jobGroup * @return */ public static TriggerKey getTriggerKey(String jobName, String jobGroup) { return TriggerKey.triggerKey(jobName, jobGroup); } /** * 暂停任务 * * @param scheduler * @param jobName * @param jobGroup */ public static void pauseJob(Scheduler scheduler, String jobName, String jobGroup) { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); try { scheduler.pauseJob(jobKey); scheduler.pauseTrigger(getTriggerKey(jobName, jobGroup));//暂停触发器 } catch (SchedulerException e) { log.error("暂停定时任务失败", e); } } /** * 恢复一个定时任务 * @param scheduler * @param jobName * @param jobGroup */ public static void resumeJob(Scheduler scheduler, String jobName, String jobGroup) { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); try { scheduler.resumeJob(jobKey); scheduler.resumeTrigger(getTriggerKey(jobName, jobGroup)); } catch (SchedulerException e) { log.error("恢复定时任务失败", e); } } /** * 删除定时任务 * * @param scheduler * @param jobName * @param jobGroup */ public static boolean deleteScheduleJob(Scheduler scheduler, String jobName, String jobGroup) { try { scheduler.deleteJob(getJobKey(jobName, jobGroup)); scheduler.pauseTrigger(getTriggerKey(jobName, jobGroup));//停止触发器 scheduler.unscheduleJob(getTriggerKey(jobName, jobGroup));//移除触发器 } catch (SchedulerException e) { log.error("删除定时任务失败", e); return false; } return true; } public static void main(String[] args) throws SchedulerException, ParseException, InterruptedException { Scheduler scheduler = new StdSchedulerFactory().getScheduler(); ScheduleJob scheduleJob = new ScheduleJob(); scheduleJob.setScheduleJobId(1); scheduleJob.setStatus(STATUS_2); scheduleJob.setJobName("test"); scheduleJob.setJobGroup("testGroup"); scheduleJob.setJobClassName("org.zq.core.common.util.schedule.job.TestJob"); scheduleJob.setCronExpression("0/2 * * * * ?"); ScheduleUtils.createScheduleJob(scheduler,scheduleJob); //启动 if (!scheduler.isShutdown()) { scheduler.start(); } Thread.sleep(10000); ScheduleUtils.deleteScheduleJob(scheduler,scheduleJob.getJobName(),scheduleJob.getJobGroup()); } }
与spring结合,初始化任务的一个类
import org.apache.log4j.Logger; import org.zq.beans.parameter.system.ScheduleJob; import org.zq.beans.parameter.system.ScheduleJobHistory; import org.zq.core.common.util.BeanUtils; import org.zq.core.parameter.system.dao.mapper.ScheduleJobMapper; import org.zq.core.parameter.system.dao.mapper.ScheduleJobHistoryMapper; import org.quartz.CronTrigger; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.util.CollectionUtils; import java.util.Date; import java.util.List; /** * Created by zq on 2015/5/26. * 在xml中初始化该类,方便控制是否需要启动定时任务处理 */ public class InItJob implements ApplicationListener<ContextRefreshedEvent> { private static Logger log = Logger.getLogger(InItJob.class); @Autowired private ScheduleJobMapper scheduleJobMapper; @Autowired private Scheduler scheduler; // 该对象是spring工厂配置自动产生的一个调度对象 @Autowired private ScheduleJobHistoryMapper scheduleJobHistoryMapper; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext().getParent() == null) { //spring mvc 中有两个容器 init(); } } public void init() { log.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 启动初始化任务" + scheduler.toString()); try { if (!scheduler.isShutdown()) { scheduler.start(); } } catch (Exception e) { } new Thread(new Init()).start(); } class Init implements Runnable { @Override public void run() { while (true) { try { running(); log.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 任务运行中"); Thread.sleep(20000); //每20秒 重新抓去一次数据库任务信息 } catch (Exception e) { e.printStackTrace(); } } } public void running() throws SchedulerException { List<ScheduleJob> list = scheduleJobMapper.selectAll(); //从数据库获取所有的任务信息 if (CollectionUtils.isEmpty(list)) { return; } for (ScheduleJob job : list) { if (handleStatus(scheduler, job)) continue; CronTrigger cronTrigger = ScheduleUtils.getCronTrigger(scheduler, job.getJobName(), job.getJobGroup()); //不存在,创建一个 if (cronTrigger == null) { ScheduleUtils.createScheduleJob(scheduler, job); } job.setModifyById("system"); job.setModifyDate(new Date()); scheduleJobMapper.updateByPrimaryKey(job); } } /** * 处理状态,由于某些原因,以下代码不能全部公开。但是很简单逻辑就是,根据读取到的任务状态 调用工具类中的各种操作,达到能动态控制任务的一个功能 * * @param scheduler * @param job * @return 返回true。不继续执行循环下面的代码 */ private boolean handleStatus(Scheduler scheduler, ScheduleJob job) { String status = job.getStatus(); String runStatus = job.getRunStatus(); CronTrigger cronTrigger = ScheduleUtils.getCronTrigger(scheduler, job.getJobName(), job.getJobGroup()); //判断运行状态 //如果是已经运行完成:则移动到历史表 if (ScheduleUtils.RUN_STATUS_5.equals(runStatus)) { scheduleJobMapper.deleteByPrimaryKey(job.getScheduleJobId()); ScheduleJobHistory sjh = new ScheduleJobHistory(); BeanUtils.copyProperties(sjh, job); scheduleJobHistoryMapper.insertSelective(sjh); } //如果是暂停 if (ScheduleUtils.STATUS_2.equals(status)) { //如果是暂停状态 if (cronTrigger == null) { //状态异常 log.error("暂停状态异常:任务调度中没有找到对应的job!"); return true; } ScheduleUtils.pauseJob(scheduler, job.getJobName(), job.getJobGroup()); job.setRunStatus(ScheduleUtils.RUN_STATUS_3); } return false; } } }
另外一种启动方式
public class ScheduleJobInit { @Autowired InItJob inItJob; /** * 项目启动时初始化 */ @PostConstruct //该注解是在项目启动完成之后触发该任务。同样也需要在xml中配置该类的bean public void init() { inItJob.init(); } }
spring.xml中配置bean
<!-- 调度任务入口类 --> <bean id="inItJob" class="org.zq.core.common.util.schedule.util.InItJob"/> <bean id="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" />
相关文章推荐
- JAVA异常机制简述
- 学习使用第三方控件IECapt截取网页图片
- Java凝视Override、Deprecated、SuppressWarnings详细解释
- Eclipse更改项目包名
- Spring3 MVC详解一
- springmvc初步搭建+freemarker集成
- java多线程(二)线程的实现及生命周期
- java如何获得JVM可能的总内存,最大内存,以及空闲内存?
- HDU-1042-N!(Java大法好 && HDU大数水题)
- 关于Spring中的<context:annotation-config/>配置
- Java基础---java线程unchecked异常的处理UncaughtExceptionHandler
- java基础之标签、按钮和按钮事件简介
- 如何给SAP打补丁(ABAP&amp;JAVA)
- Java里的static import使用小结
- java 调用webservice的各种方法总结
- java图形界面之布局设计
- 请用Java设计一个Least Recently Used (LRU) 缓存
- redhat 安装 jdk1.7 问题
- struts2中的拦截器的配置
- spring mvc + freemarker优雅的实现邮件定时发送