您的位置:首页 > 编程语言 > Java开发

Quartz(二)整合Spring容器中bean及动态调度任务

2015-08-20 15:44 627 查看



Quartz 是开源任务调度框架中的翘首,它提供了强大任务调度机制,同时保持了使用的简单性。

Quartz 允许开发人员灵活地定义触发器的调度时间表,并可以对触发器和任务进行关联映射。

此外,Quartz提供了调度运行环境的持久化机制,可以保存并恢复调度现场,即使系统因故障关闭,任务调度现场数据并不会丢失。

此外,Quartz还提供了组件式的侦听器、各种插件、线程池等功能。

Spring中使用Quartz的3种方法(MethodInvokingJobDetailFactoryBean,implements Job,extends QuartzJobBean)

以下介绍一下实现job接口的方法,通过此方法可以动态启动,暂定,添加,删除定时功能,可传参数。

所有数据全部持久化到数据表中,不再需要XML配置文件存储数据。quartz已经封装了持久化方法。数据表用的MYSQL见附件

Java代码


package com.berheley.bi.basic.timer;

import java.util.Map;

import org.quartz.JobExecutionContext;

import com.berheley.bi.basic.exp.BusinessException;

/**

* 类功能描述:定时任务需要实现此接口

*

* @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>

* Create: 2010-1-3 上午09:18:51

* Description:

*/

public interface ITimerJob

{

/**

* 定时器执行的任务

* @param context

* @throws Exception

*/

void jtaTimerExecute(JobExecutionContext context, Map<?, ?> parameters)throws BusinessException;

}

package com.berheley.bi.basic.timer;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.quartz.Job;

import org.quartz.JobDataMap;

import org.quartz.JobExecutionContext;

import org.quartz.JobExecutionException;

import org.springframework.beans.factory.NoSuchBeanDefinitionException;

import org.webframe.web.util.WebFrameUtils;

import com.berheley.bi.basic.exp.BusinessException;

/**

* 类功能描述:定时执行的任务,发布表单

*

* @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>

* Create: 2009-12-31 上午12:31:02

* Description:

*/

public class TimerDataJob implements Job {

private ITimerJob timerJob;

protected Log log = LogFactory.getLog(getClass());

@Override

public void execute(JobExecutionContext context) throws JobExecutionException {

JobDataMap parameters = context.getJobDetail().getJobDataMap();

try {

Object beanName = parameters.get("beanName");

if (beanName == null) {

log.error("JobDataMap 中'beanName'属性值为null!");

return;

}

timerJob = (ITimerJob) WebFrameUtils.getBean(beanName.toString());

timerJob.jtaTimerExecute(context, parameters);

} catch (NoSuchBeanDefinitionException e) {

log.error("任务ITimerJob接口的实现类(" + e.getBeanName() + ")不存在!");

} catch (BusinessException e) {

log.error(e.getMessage(), e);

}

}

}

import java.util.Map;

import org.quartz.JobDetail;

import org.quartz.Scheduler;

import org.quartz.Trigger;

import com.berheley.bi.basic.exp.BusinessException;

/**

* 类功能描述:定时调度后台管理

*

* @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a> Create: 2010-1-3 上午09:18:51

* Description:

*/

public interface ITimerManageService {

/**

* 获得scheduler

*/

Scheduler getScheduler();

/**

*

* @function:获得任务

* @param jobName

* @param jobGroup

* @return

* @throws BusinessException

* @author: mengqingyu 2012-10-22 下午03:29:03

*/

JobDetail getJob(String jobName, String jobGroup) throws BusinessException;

/**

*

* @function:添加任务

* @param jobDetail

* @throws BusinessException

* @author: mengqingyu 2012-10-22 下午03:29:15

*/

void addJob(JobDetail jobDetail) throws BusinessException;

/**

*

* @function:删除任务

* @param jobName

* @param jobGroup

* @throws BusinessException

* @author: mengqingyu 2012-10-22 下午03:29:31

*/

void deleteJob(String jobName, String jobGroup) throws BusinessException;

/**

*

* @function:立即运行

* @param jobName

* @param jobGroup

* @author: mengqingyu 2012-5-24 上午11:00:20

*/

void addRunNowTrigger(String jobName, String jobGroup) throws BusinessException ;

/**

*

* @function:重置定时器

* @param jobName

* @param jobGroup

* @param trigger

* @throws BusinessException

* @author: mengqingyu 2012-11-30 下午05:22:48

*/

public void rescheduleJob(String triggerName, String triggerGroup, Trigger trigger) throws BusinessException;

/**

*

* @function:获得定时器

* @param jobName

* @param jobGroup

* @return

* @throws BusinessException

* @author: mengqingyu 2012-11-30 下午05:17:09

*/

Trigger getTrigger(String jobName, String jobGroup) throws BusinessException;

/**

*

* @function:添加定时

* @param trigger

* @throws BusinessException

* @author: mengqingyu 2012-10-22 下午03:29:41

*/

void addTrigger(Trigger trigger) throws BusinessException;

/**

* 删除定时

*

* @param triggerName

* @param group

*/

void deleteTrigger(String triggerName, String group) throws BusinessException ;

/**

* 暂停定时

*

* @param triggerName

* @param group

*/

void updatePauseTrigger(String triggerName, String group) throws BusinessException ;

/**

* 恢复Trigger

*

* @param triggerName

* @param group

*/

void updateResumeTrigger(String triggerName, String group) throws BusinessException ;

/**

*

* @function:保存、修改任务

* @param jobName

* @param jobGroup

* @param description

* @throws BusinessException

* @author: mengqingyu 2012-10-22 下午03:32:35

*/

void saveOrUpdateJob(String jobName, String jobGroup, String description) throws BusinessException;

/**

*

* @function:批量删除任务

* @param jobNames

* @param jobGroup

* @throws BusinessException

* @author: mengqingyu 2012-10-22 下午03:33:11

*/

void deleteJobs(String jobNames, String jobGroup) throws BusinessException;

/**

*

* @function:查询任务参数

* @param jobName

* @param jobGroup

* @return

* @throws BusinessException

* @author: mengqingyu 2012-10-22 下午03:34:20

*/

String findJobParams(String jobName, String jobGroup) throws BusinessException;

/**

*

* @function:保存任务参数

* @param jobName

* @param jobGroup

* @param key

* @param value

* @return

* @throws BusinessException

* @author: mengqingyu 2012-10-22 下午03:38:17

*/

String saveJobParam(String jobName, String jobGroup, String key, String value) throws BusinessException;

/**

*

* @function:删除任务参数

* @param jobName

* @param jobGroup

* @param keys

* @return

* @throws BusinessException

* @author: mengqingyu 2012-10-22 下午03:38:26

*/

String deleteJobParams(String jobName, String jobGroup, String keys) throws BusinessException;

/**

*

* @function:保存cron

* @param params

* @throws BusinessException

* @author: mengqingyu 2012-10-22 下午03:38:41

*/

void saveCron(Map<String, Object> params) throws BusinessException;

/**

*

* @function:保存simple

* @param params

* @throws BusinessException

* @author: mengqingyu 2012-10-22 下午03:40:39

*/

void saveSimple(Map<String, Object> params) throws BusinessException;

/**

*

* @function:批量删除定时

* @param jobName

* @param group

* @param triggerNames

* @throws BusinessException

* @author: mengqingyu 2012-10-22 下午03:41:42

*/

void deleteTriggers(String jobName, String group, String triggerNames) throws BusinessException;

/**

* 根据名称和组别启动和暂停Tigger

*

* @param triggerName

* @param group

*/

String updateStartOrStop(String jobName, String jobGroup) throws BusinessException ;

}

import java.text.ParseException;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;

import org.quartz.CronTrigger;

import org.quartz.JobDataMap;

import org.quartz.JobDetail;

import org.quartz.Scheduler;

import org.quartz.SchedulerException;

import org.quartz.SimpleTrigger;

import org.quartz.Trigger;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import org.webframe.core.util.DateUtils;

import com.berheley.bi.basic.exp.BusinessException;

import com.berheley.bi.basic.util.UUIDGenerator;

/**

* 类功能描述:通用定时功能

*

* @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>

* @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp $ Create: 2011-6-1

* 下午05:03:06

*/

@Service

public class TimerManageService implements ITimerManageService {

@Autowired

protected Scheduler scheduler;

public Scheduler getScheduler() {

return scheduler;

}

@Override

public JobDetail getJob(String jobName, String jobGroup) throws BusinessException{

JobDetail jobDetail;

try {

jobDetail = scheduler.getJobDetail(jobName, jobGroup);

} catch (SchedulerException e) {

throw new BusinessException("获取任务失败", e);

}

return jobDetail;

}

@Override

public void addJob(JobDetail jobDetail) throws BusinessException{

try {

scheduler.addJob(jobDetail, true);

} catch (SchedulerException e) {

throw new BusinessException("添加任务失败", e);

}

}

@Override

public void deleteJob(String jobName, String jobGroup) throws BusinessException{

try {

scheduler.deleteJob(jobName, jobGroup);

} catch (SchedulerException e) {

throw new BusinessException("删除任务失败", e);

}

}

@Override

public void addRunNowTrigger(String jobName, String jobGroup) throws BusinessException{

try {

scheduler.triggerJob(jobName, jobGroup);

} catch (SchedulerException e) {

throw new BusinessException("立即运行任务失败", e);

}

}

@Override

public void rescheduleJob(String triggerName, String triggerGroup, Trigger trigger) throws BusinessException{

try {

scheduler.rescheduleJob(triggerName,triggerGroup,trigger);

} catch (SchedulerException e) {

throw new BusinessException("重置定时器失败", e);

}

}

@Override

public Trigger getTrigger(String triggerName, String triggerGroup) throws BusinessException{

Trigger trigger;

try {

trigger = scheduler.getTrigger(triggerName,triggerGroup);

} catch (SchedulerException e) {

throw new BusinessException("获取定时器失败", e);

}

return trigger;

}

@Override

public void addTrigger(Trigger trigger) throws BusinessException {

try {

trigger.setVolatility(false);

scheduler.scheduleJob(trigger);

} catch (SchedulerException e) {

throw new BusinessException("添加定时器失败", e);

}

}

@Override

public void deleteTrigger(String triggerName, String group) throws BusinessException{

this.updatePauseTrigger(triggerName, group);

try {

scheduler.unscheduleJob(triggerName, group);

} catch (SchedulerException e) {

throw new BusinessException("删除定时任务失败", e);

}

}

@Override

public void updatePauseTrigger(String triggerName, String group) throws BusinessException{

try {

scheduler.pauseTrigger(triggerName, group);

} catch (SchedulerException e) {

throw new BusinessException("停止定时任务失败", e);

}

}

@Override

public void updateResumeTrigger(String triggerName, String group) throws BusinessException{

try {

scheduler.resumeTrigger(triggerName, group);

} catch (SchedulerException e) {

throw new BusinessException("恢复定时任务失败", e);

}

}

@Override

public void saveOrUpdateJob(String jobName, String jobGroup, String description) throws BusinessException {

JobDetail jobDetail;

if(StringUtils.isBlank(jobName)){

jobName = UUIDGenerator.getUUID();

try {

jobDetail = new JobDetail(jobName, jobGroup, Class.forName("com.berheley.bi.basic.timer.TimerDataJob"));

} catch (ClassNotFoundException e) {

throw new BusinessException("添加修改任务失败", e);

}

}

else {

jobDetail = this.getJob(jobName, jobGroup);

}

jobDetail.setDescription(description);

this.addJob(jobDetail);

}

@Override

public void deleteJobs(String jobNames, String group) throws BusinessException {

String[] jobs = jobNames.split(",");

String[] groups = group.split(",");

for(int i=0;i<jobs.length;i++){

this.deleteJob(jobs[i], groups[i]);

}

}

public String paramMapToList(JobDataMap params) {

ArrayList<List<String>> listJobData = new ArrayList<List<String>>();

for (Object e: params.entrySet()) {

Map.Entry<?, ?> entry = (Entry<?, ?>) e;

ArrayList<String> list = new ArrayList<String>();

list.add("'" + entry.getKey() + "'");

list.add("'" + entry.getValue() + "'");

listJobData.add(list);

}

return listJobData.toString();

}

@Override

public String findJobParams(String jobName, String jobGroup) throws BusinessException {

JobDetail jobDetail = this.getJob(jobName, jobGroup);

return jobDetail == null?"{}":paramMapToList(jobDetail.getJobDataMap());

}

@Override

public String saveJobParam(String jobName, String jobGroup, String key, String value) throws BusinessException {

JobDetail jobDetail = this.getJob(jobName, jobGroup);

JobDataMap params = jobDetail.getJobDataMap();

params.put(key, value);

this.addJob(jobDetail);

return paramMapToList(params);

}

@Override

public String deleteJobParams(String jobName, String jobGroup, String keys) throws BusinessException {

JobDetail jobDetail = this.getJob(jobName, jobGroup);

JobDataMap params = jobDetail.getJobDataMap();

String[] key = keys.split(",");

for(int i=0;i<key.length;i++){

params.remove(key[i]);

}

this.addJob(jobDetail);

return paramMapToList(params);

}

public void initTrigger(Trigger trigger, Map<String, Object> params) {

trigger.setDescription(params.get("label").toString());

trigger.setJobName(params.get("jobName").toString());

trigger.setJobGroup(params.get("group").toString());

String startTime = params.get("startTime")==null?"":params.get("startTime").toString();

String stopTime = params.get("stopTime")==null?"":params.get("stopTime").toString();

if (!StringUtils.isBlank(startTime)) {

trigger.setStartTime(DateUtils.parseStringToDate(startTime,"yyyy-MM-dd HH:mm:ss"));

}

if (!StringUtils.isBlank(stopTime)) {

trigger.setEndTime(DateUtils.parseStringToDate(stopTime,"yyyy-MM-dd HH:mm:ss"));

}

}

@Override

public void saveCron(Map<String, Object> params) throws BusinessException {

Trigger trigger = null;

try {

trigger = new CronTrigger(UUIDGenerator.getUUID(), params.get("group").toString(), params.get("cronExpression").toString());

} catch (ParseException e) {

throw new BusinessException("添加cron失败", e);

}

this.initTrigger(trigger, params);

this.addTrigger(trigger);

}

@Override

public void saveSimple(Map<String, Object> params) throws BusinessException {

String countStr = params.get("repeatCount").toString();

int repeatCount = countStr.equals("")?SimpleTrigger.REPEAT_INDEFINITELY:Integer.parseInt(countStr);

long repeatInterval = Long.parseLong(params.get("repeatInterval").toString());

Trigger trigger = new SimpleTrigger(UUIDGenerator.getUUID(), params.get("group").toString(), repeatCount, repeatInterval);

this.initTrigger(trigger, params);

this.addTrigger(trigger);

}

@Override

public void deleteTriggers(String jobName, String group, String triggerNames) throws BusinessException {

String[] trigger = triggerNames.split(",");

for(int i=0;i<trigger.length;i++){

this.deleteTrigger(trigger[i], group);

}

}

@Override

public String updateStartOrStop(String triggerName, String group) throws BusinessException {

int flag = -1;

try {

flag = scheduler.getTriggerState(triggerName,group);

switch (flag) {

case 0 :

scheduler.pauseTrigger(triggerName, group);

flag = 1;

break;

case 1 :

scheduler.resumeTrigger(triggerName, group);

flag = 0;

break;

}

} catch (SchedulerException e) {

throw new BusinessException("修改定时状态失败", e);

}

return "{state:'"+flag+"'}";

}

}

这种配置就是对quartz的一种简单的使用了,调度任务会在spring启动的时候加载到内存中,按照bjcronTrigger中定义的crontrigger定义的时间按时触发调度任务。

但是这是quartz使用“内存”方式的一种配置,也比较常见,当然对于不使用spring的项目,也可以单独整合quartz。

方法也比较简单,可以从quartz的doc中找到配置方式,或者看一下《Quartz Job Scheduling Framework 》(附件中可下载)这本书中的例子。

但是对于想持久化调度任务的状态,并且灵活调整调度时间的方式来说,上面的内存方式就不能满足要求了,正如本文开始我遇到的情况,需要采用数据库方式集成quartz,

这部分集成其实在《Quartz Job Scheduling Framework 》中也有较为详细的介绍,当然doc文档中也有,

但是缺乏和spring集成的实例,我在这里把我在项目中在spring配置quartz数据库存储方式的配置也写一下:

Xml代码


<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:mvc="http://www.springframework.org/schema/mvc"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

<bean name="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">

<property name="dataSource">

<ref bean="dataSource" />

</property>

<property name="applicationContextSchedulerContextKey" value="BI-Scheduler" />

<property name="configLocation" value="classpath:quartz.properties" />

</bean>

</beans>

属性说明:

dataSource:项目中用到的数据源,里面包含了quartz用到的12张数据库表;

schedulerName:调度器名,我理解主要在调度集群的时候会有用,如果出现多个调度器实例的时候可以用来进行区分,详细看一下《Quartz Job Scheduling Framework 》;

configLocation:用于指明quartz的配置文件的位置,如果不用spring配置quartz的话,本身quartz是通过一个配置文件进行配置的

,默认名称是quartz.properties,里面配置的参数在quartz的doc文档中都有介绍,可以调整quartz,我在项目中也用这个文件部分的配置了一些属性,代码如下:

Xml代码


#============================================================================

# Configure Main Scheduler Properties

#============================================================================

org.quartz.scheduler.instanceName = TestScheduler

org.quartz.scheduler.instanceId = AUTO

#============================================================================

# Configure ThreadPool

#============================================================================

orgorg.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool

org.quartz.threadPool.threadCount = 5

org.quartz.threadPool.threadPriority = 4

#============================================================================

# Configure JobStore

#============================================================================

org.quartz.jobStore.misfireThreshold = 60000

#orgorg.quartz.jobStore.class = org.quartz.simpl.RAMJobStore

orgorg.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

##orgorg.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.oracle.OracleDelegate

#orgorg.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.PostgreSQLDelegate

#org.quartz.jobStore.dataSource = myDS

org.quartz.jobStore.tablePrefix = QRTZ_

org.quartz.jobStore.isClustered = false

#============================================================================

# Configure Plugins

#============================================================================

orgorg.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin

#orgorg.quartz.plugin.jobInitializer.class = org.quartz.plugins.xml.JobInitializationPlugin

# init plugin will load jobs.xml as a classpath resource i.e. /jobs.xml if not found on file system

#org.quartz.plugin.jobInitializer.fileName=jobs.xml

#org.quartz.plugin.jobInitializer.overWriteExistingJobs = false

#org.quartz.plugin.jobInitializer.failOnFileNotFound = false

quartz.rar (1.3 KB)
下载次数: 171
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: