您的位置:首页 > 编程语言 > Java开发

[springboot] 基于Spring Task实现定时任务

2020-03-02 04:26 746 查看

创建定时任务
在Spring Boot的主类或配置类中加入@EnableScheduling注解,启用定时任务的配置

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Chapter1111Application {

public static void main(String[] args) {
SpringApplication.run(Chapter1111Application.class, args);
}

}
创建定时任务实现类

@Component
public class PrintTask {

private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

@Scheduled(fixedRate = 3000)
public void reportCurrentTime() {
System.out.println("NOW:" + sdf.format(new Date()));
}

}
启动项目就可以看到控制台定时打印的内容

@Scheduled注解

@Scheduled(fixedRate = 5000) :上一次执行开始时间点之后5秒再执行
@Scheduled(fixedDelay = 5000) :上一次执行完毕时间点之后5秒再执行
@Scheduled(initialDelay=1000, fixedRate=5000) :第一次延迟1秒后执行,之后按fixedRate的规则每5秒执行一次。initialDelay表示第一次调用前的延时,单位毫秒,必需配合cron、fixedDelay或者fixedRate来使用。
@Scheduled(cron="*/5 * * * * *") :通过cron表达式定义规则,与fixedDelay类似,上次执行完毕后才进行下次调度。
 多定时任务的串并行分析
定义两个定时任务task01和task02,其中任务task01以固定时间间隔每1秒执行一次,执行中延迟2秒;另外一个任务task02以固定时间间隔每2秒执行一次。

@Scheduled(fixedDelay = 1000)
public void task01() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " | task01 " + new Date().toLocaleString());
Thread.sleep(2000);
}

@Scheduled(fixedDelay = 2000)
public void task02() {
System.out.println(Thread.currentThread().getName() + " | task02 " + new Date().toLocaleString());
}

分析:

若两个任务串行,则task01每3秒打印一次,而task02会受到task01影响每3秒打印一次
若两个任务并行,则task01每3秒打印一次,而task02则不受影响每2秒打印一次
启动工程观看执行打印结果:

上图的结果,印证了默认的情况下,多个定时任务是串行执行的;如果一个任务出现阻塞,其他的任务都会受到影响。而且从图中打印的线程名称也可以看出2个任务都是由同一个线程"pool-1-thread-1"调度的。

定时任务串行执行的优先级
定义两个具有相同cron表达式的定时任务task01和task02,都是以固定时间间隔每1秒打印一次,看他们的输出是否会乱掉,如果每次都是任务1打印完再打印任务2,那就是固定优先级的,否则每次调度顺序是随机的。

@Scheduled(cron = "0/1 * * * * ?")
public void task01() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " | task01 " + new Date().toLocaleString());
}

@Scheduled(cron = "0/1 * * * * ?")
public void task02() {
System.out.println(Thread.currentThread().getName() + " | task02 " + new Date().toLocaleString());
}

从图中打印结果来看,同一个线程中调度多个定时任务的顺序是不固定的,并没有表现出明显的优先级关系。

多定时任务的并行调度
Springboot定时任务默认是由是单个线程串行调度所有任务,当定时任务很多的时候,为了提高任务执行效率,避免任务之间互相影响,可以采用并行方式执行定时任务。定义一个ScheduleConfig配置类并实现SchedulingConfigurer接口,重写configureTasks方法,配置任务调度线程池。

@Configuration
@EnableScheduling
public class ScheduleConfig implements SchedulingConfigurer {

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setTaskScheduler(taskScheduler());
}

@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);// 配置线程池大小,根据任务数量定制
taskScheduler.setThreadNamePrefix("spring-task-scheduler-thread-");// 线程名称前缀
taskScheduler.setAwaitTerminationSeconds(60);// 线程池关闭前最大等待时间,确保最后一定关闭
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);// 线程池关闭时等待所有任务完成
taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());// 任务丢弃策略
return taskScheduler;
}

}
另外,可以使用JDK提供的定时任务调度线程池ScheduledThreadPoolExecutor,在此不作赘述。

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
}

执行测试代码,观看调度线程以及多任务之间是否互相影响:

@Scheduled(cron = "0/1 * * * * ?")
public void task01() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " | task01 " + new Date().toLocaleString());
Thread.sleep(2000);
}

@Scheduled(cron = "0/2 * * * * ?")
public void task02() {
System.out.println(Thread.currentThread().getName() + " | task02 " + new Date().toLocaleString());
}

从图中打印结果可知,当存在多个调度线程时定时任务之间不会互相影响,即task01的执行延迟并没有影响task02的定期调度,而且通过打印的线程可推断出每个定时任务每次调度的线程也是不固定的。

异步并行定时任务
上述并行任务的实现是通过增加任务调度线程池的线程来实现的,也可以结合spring的异步调用来实现,这时任务调度线程池的数量使用默认的单个也可,调用关系为:任务调度线程->任务执行线程->执行任务。首先使用spring提供的异步调用默认实现org.springframework.core.task.SimpleAsyncTaskExecutor,在配置类上添加注解@EnableAsync然后在执行任务的方法上添加注解@Async即可。

@Async
@Scheduled(cron = "0/1 * * * * ?")
public void task03() {
System.out.println(Thread.currentThread().getName() + " | task03 " + new Date().toLocaleString());
}

从上面的输出,可以简单的推理,每次调度上面的任务都是新开了一个线程来做的,所以如果在定时任务中写了死循环,可能会导致无限线程直至整个进程崩掉。

@Scheduled(cron = "0/2 * * * * ?")
public void task02() {
System.out.println(Thread.currentThread().getName() + " | task02 " + new Date().toLocaleString());
}

@Async
@Scheduled(cron = "0/3 * * * * ?")
public void task03() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " | task03 " + new Date().toLocaleString());
}

运行上述测试代码,发现使用@Async注解的定时任务调度线程是SimpleAsyncTaskExecutor提供的,而没有使用@Async注解的定时任务使用的是自定义的任务调度线程池的线程,控制台打印结果如下:

2018-12-13 11:17:18.653 INFO 9616 — [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ‘’
2018-12-13 11:17:18.656 INFO 9616 — [ main] c.bo.springboot.Chapter1121Application : Started Chapter1121Application in 1.433 seconds (JVM running for 1.865)
spring-task-scheduler-thread-1 | task02 2018-12-13 11:17:20
2018-12-13 11:17:21.009 INFO 9616 — [eduler-thread-2] .s.a.AnnotationAsyncExecutionInterceptor : More than one TaskExecutor bean found within the context, and none is named ‘taskExecutor’. Mark one of them as primary or name it ‘taskExecutor’ (possibly as an alias) in order to use it for async processing: [taskScheduler, applicationTaskExecutor]
SimpleAsyncTaskExecutor-1 | task03 2018-12-13 11:17:21
spring-task-scheduler-thread-1 | task02 2018-12-13 11:17:22
spring-task-scheduler-thread-2 | task02 2018-12-13 11:17:24
SimpleAsyncTaskExecutor-2 | task03 2018-12-13 11:17:24
spring-task-scheduler-thread-3 | task02 2018-12-13 11:17:26
SimpleAsyncTaskExecutor-3 | task03 2018-12-13 11:17:27
spring-task-scheduler-thread-4 | task02 2018-12-13 11:17:28
spring-task-scheduler-thread-5 | task02 2018-12-13 11:17:30
SimpleAsyncTaskExecutor-4 | task03 2018-12-13 11:17:30
spring-task-scheduler-thread-7 | task02 2018-12-13 11:17:32
SimpleAsyncTaskExecutor-5 | task03 2018-12-13 11:17:33
发现控制台中有一处提示:

More than one TaskExecutor bean found within the context, and none is named ‘taskExecutor’. Mark one of them as primary or name it ‘taskExecutor’ (possibly as an alias) in order to use it for async processing: [taskScheduler, applicationTaskExecutor]

上面提到的配置中使用了自定义任务调度线程池以及spring默认的任务执行器SimpleAsyncTaskExecutor,所以提示有两个taskExecutor,但是并没有指定在异步注解@Async修饰的方法中使用哪一个作为主调度器,于是出现了上述结果(测试环境为springboot v2.1.0.RELEASE)。经测试在v2.0.3.RELEASE版本中,标注了@Async的定时任务其调度线程是自定义任务调度线程而不是spring默认的异步调度器SimpleAsyncTaskExecutor。无论使用哪个springboot版本我们可以使用@Primary注解或指定@Bean的"name=taskExecutor"来指定异步调用使用的调度器,或者使用自定义异步调用线程池来代替spring默认的SimpleAsyncTaskExecutor。

自定义异步调用线程池

用自定义的线程池来取代默认线程管理方式,无疑是一个更加安全和灵活的方式,可以避免大量的线程阻塞带来的系统崩溃。

配置类依旧使用上面的ScheduleConfig,需要再实现接口:AsyncConfigurer,重写getAsyncExecutor()配置线程池。

@Configuration
@EnableAsync
@EnableScheduling
public class ScheduleConfig implements SchedulingConfigurer , AsyncConfigurer {

Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setTaskScheduler(taskScheduler());
}

@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);// 配置线程池大小,根据任务数量定制
taskScheduler.setThreadNamePrefix("spring-task-scheduler-thread-");// 线程名称前缀
taskScheduler.setAwaitTerminationSeconds(60);// 线程池关闭前最大等待时间,确保最后一定关闭
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);// 线程池关闭时等待所有任务完成
taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());// 任务丢弃策略
return taskScheduler;
}
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);// 配置核心线程数
executor.setMaxPoolSize(50);// 配置最大线程数
executor.setQueueCapacity(100);// 配置缓存队列大小
executor.setKeepAliveSeconds(15);// 空闲线程存活时间
executor.setThreadNamePrefix("spring-task-executor-thread-");
// 线程池对拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// AbortPolicy()
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}

/**
* 处理异步方法的异常
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SpringAsyncExceptionHandler();
}

class SpringAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable arg0, Method arg1, Object... arg2) {
logger.error("Exception occurs in async method", arg0);
}
}

}
执行测试代码:

@Scheduled(cron = "0/2 * * * * ?")
public void task02() {
System.out.println(Thread.currentThread().getName() + " | task02 " + new Date().toLocaleString());
}

@Async
@Scheduled(cron = "0/3 * * * * ?")
public void task03() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " | task03 " + new Date().toLocaleString());
}

控制台打印:

结论:标注@Async的task03由自定义异步调用线程调度,没有标注@Async的task02由自定义任务调度线程调度。

在上面的配置类中,自定义了异步调用线程池,当执行的异步调用任务的阻塞数量超过线程池缓冲能力,即会根据拒绝策略丢弃任务,避免系统崩溃,这里配置了CallerRunsPolicy()策略 -> 由调用线程处理丢弃任务,默认策略是AbortPolicy() -> 丢弃任务并抛出异常,简单说一下用自定义线程池的好处:

合理的分配线程池参数
可选择任务上限时的拒绝策略(可以按照自己的想法来处理"负载"的任务)
线程池命名,对于以后问题排查,会有很大的帮助
spring定时任务小结

默认所有的定时任务都是串行调度的,一个线程,且即便cron完全相同的两个任务先后顺序也没法保证(具体原因需要源码分析,看下这块是怎么支持)
使用@Async注解可以使定时任务异步调度;但是需要开启配置,在启动类上添加 @EnableAsync 注解
开启并发执行时,推荐用自定义的线程池来替代默认的,理由见上面
小提示

springboot中自定义任务调度线程池也可以不实现SchedulingConfigurer或AsyncConfigurer接口,直接在配置类中提供@Bean注解的TaskScheduler或TaskExecutor即可。

动态创建定时任务
使用注解的方式,无法实现动态的修改、添加或关闭定时任务,这个时候就需要使用编程的方式进行任务的更新操作了。可直接使用ThreadPoolTaskScheduler或者SchedulingConfigurer接口进行自定义定时任务创建。

1.SchedulingConfigurer

实现SchedulingConfigurer接口,重写configureTasks方法时添加定时任务:

@Configuration
public class ScheduleConfig implements SchedulingConfigurer {

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setTaskScheduler(taskScheduler());

taskRegistrar.getScheduler().schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "|SchedulingConfigurer cron task01");
}
}, new CronTrigger("0/3 * * * * ?"));

taskRegistrar.addCronTask(new CronTask(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "|SchedulingConfigurer cron task02");
}
}, new CronTrigger("0/2 * * * * ?")));
}

@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.setThreadNamePrefix("spring-task-scheduler-thread-");
taskScheduler.setAwaitTerminationSeconds(60);
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return taskScheduler;
}

}
2.ThreadPoolTaskScheduler

ThreadPoolTaskScheduler是Spring Task的核心实现类,该类提供了大量的重载方法进行任务调度。首先配置一个自定义任务调度线程池ThreadPoolTaskScheduler,可参考上面ScheduleConfig中的配置,这里不再赘述。

编写一个控制层实现定时任务的动态创建、关闭以及修改:

@Slf4j
@RestController
@RequestMapping("/task")
public class DynamicTaskController {

@Resource
private ThreadPoolTaskScheduler taskScheduler;

private ScheduledFuture<?> scheduledFuture;

@Value("${timing-task.cron1}")
private String cronStr1;

@Value("${timing-task.cron2}")
private String cronStr2;

@RequestMapping("/start")
public String startTask() {
scheduledFuture = taskScheduler.schedule(new RunTask01(), new Trigger() {
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
return new CronTrigger(cronStr1).nextExecutionTime(triggerContext);
}
});
log.info("start timed task success ..");
return "start task suceess";
}

@RequestMapping("/stop")
public String stopTask() {
Boolean result = null;
if (scheduledFuture != null) {
result = scheduledFuture.cancel(true);
}
log.info("stop timed task result: " + result);
return "stop task result: " + result;
}

@RequestMapping("/modify")
public String modifyTask() {
Boolean stopResult = null;
// 停止定时任务
if (scheduledFuture != null) {
stopResult = scheduledFuture.cancel(true);
} else {
log.info("modify task error -> scheduledFuture is null");
return "error";
}
// 更换cron重新开启定时任务
if (stopResult) {
scheduledFuture = taskScheduler.schedule(new RunTask01(), new Trigger() {
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
return new CronTrigger(cronStr2).nextExecutionTime(triggerContext);
}
});
log.info("modify task success ..");
return "success";
}
log.info("modify task failed ..");
return "failed";
}

class RunTask01 implements Runnable {

@Override
public void run() {
log.info(Thread.currentThread().getName() + "|schedule task01" + "|" + new Date().toLocaleString());
}
}

class RunTask02 implements Runnable {
@Override
public void run() {
log.info(Thread.currentThread().getName() + "|schedule task02" + "|" + new Date().toLocaleString());
}
}

}
配置文件中定义cron表达式:

timing-task.cron1=0/1 * * * * ?
timing-task.cron2=0/5 * * * * ?
 启动工程,首先访问 http://127.0.0.1:8090/task/start 开启定时任务:

控制台每1秒打印一次定时任务信息,说明定时任务已经启动成功,再访问 http://127.0.0.1:8090/task/modify 更改定时任务:

定时任务已由每1秒打印一次信息更改为每5秒打印一次,说明定时任务更改成功!

关闭定时任务:http://127.0.0.1:8090/task/stop,如下图,发现控制台已经不再输出打印信息,说明定时任务成功关闭。

通过这种编程式的定时任务优点是灵活可拓展,缺点是不能与异步调用相结合,例如@Async注解。

ThreadPoolTaskScheduler 除了提供了schedule()还有其他方法设置定时任务,简单示例如下:

@RequestMapping("/test")
public String test(){
Date startTime = new Date();// 起始时间点
long period = 1000, delay = 2000;// 时间间隔,毫秒
// 从指定时间点开始以固定频率执行定时任务,类似于@Scheduled(fixedRate = 1000, initialDelay=0)
taskScheduler.scheduleAtFixedRate(new RunTask01(), startTime, period);
// 从指定时间点开始以固定时间间隔执行定时任务,类似于@Scheduled(fixedDelay = 2000, initialDelay=0)
taskScheduler.scheduleWithFixedDelay(new RunTask02(), startTime, delay);
return "done";
}

————————————————
版权声明:本文为CSDN博主「博さん」的原创文章,遵循CC 4.0 by-sa版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/w47_csdn/article/details/84971870

  • 点赞
  • 收藏
  • 分享
  • 文章举报
qq_25401033 发布了0 篇原创文章 · 获赞 0 · 访问量 32 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: