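The earlier articles mapped out the scheduled-task landscape: Timer and ScheduledExecutorService are the JDK's built-in options, Netty provides HashedWheelTimer (built on a time wheel), and beyond those there are Quartz and distributed schedulers (ElasticJob, xxl-job, and so on). For a simple Spring Boot application you can also use the task support that ships with Spring. This article walks through a Spring Task example and how it is implemented.
Spring Task is well encapsulated and very simple to use.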
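```java
import java.time.LocalDateTime;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@EnableScheduling
@Configuration
public class ScheduleDemo {

    // Assumes SLF4J is on the classpath; the original snippet used an undeclared `log`.
    private static final Logger log = LoggerFactory.getLogger(ScheduleDemo.class);

    /**
     * Runs once every minute.
     */
    @Scheduled(fixedRate = 1000 * 60 * 1)
    public void runScheduleFixedRate() {
        log.info("runScheduleFixedRate: current DateTime, {}", LocalDateTime.now());
    }

    /**
     * Runs once every hour, on the hour.
     */
    @Scheduled(cron = "0 0 */1 * * ?")
    public void runScheduleCron() {
        log.info("runScheduleCron: current DateTime, {}", LocalDateTime.now());
    }
}
```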
@Scheduled accepts the following attributes: cron, fixedDelay, fixedDelayString, fixedRate, fixedRateString, initialDelay, initialDelayString, zone.
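The difference between fixedRate and fixedDelay is easiest to see with numbers. The sketch below is a plain-JDK model, not Spring code: `ScheduleModel` and its method names are made up for illustration, and it assumes a single task running on a scheduler that never runs the same task concurrently.

```java
import java.util.ArrayList;
import java.util.List;

/** Models the start times (in ms) of a task whose every run takes runMillis. */
class ScheduleModel {

    /** fixedRate: runs are planned at initialDelay + k * period; a slow run delays the next one instead of overlapping it. */
    static List<Long> fixedRateStarts(long initialDelay, long period, long runMillis, int count) {
        List<Long> starts = new ArrayList<>();
        long previousEnd = 0;
        for (int k = 0; k < count; k++) {
            long planned = initialDelay + k * period;
            long start = Math.max(planned, previousEnd);
            starts.add(start);
            previousEnd = start + runMillis;
        }
        return starts;
    }

    /** fixedDelay: the next run starts `delay` ms after the previous run has finished. */
    static List<Long> fixedDelayStarts(long initialDelay, long delay, long runMillis, int count) {
        List<Long> starts = new ArrayList<>();
        long start = initialDelay;
        for (int k = 0; k < count; k++) {
            starts.add(start);
            start = start + runMillis + delay;
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(fixedRateStarts(0, 1000, 300, 4));   // [0, 1000, 2000, 3000]
        System.out.println(fixedRateStarts(0, 1000, 1500, 4));  // [0, 1500, 3000, 4500]
        System.out.println(fixedDelayStarts(0, 1000, 300, 4));  // [0, 1300, 2600, 3900]
    }
}
```

With a fast task, fixedRate keeps to the clock while fixedDelay drifts by the run time on every cycle; with a task slower than its period, fixedRate degrades to back-to-back runs.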
Let's work through a few questions to get a deeper understanding of how Spring Task is implemented. @pdai
On exception handling: catch and handle exceptions yourself inside the task method.
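One way to follow that advice is to wrap the task body so that a failure is recorded and never escapes to the scheduler. The sketch below is plain JDK; `SafeTask` is a hypothetical helper name, and printing to stderr stands in for whatever logging or alerting you actually use. The motivation is the JDK rule that a task submitted with ScheduledExecutorService.scheduleAtFixedRate has its subsequent executions suppressed once one execution throws.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Runs a delegate and swallows runtime failures so the schedule keeps going. */
class SafeTask implements Runnable {

    private final Runnable delegate;
    private final AtomicInteger failures = new AtomicInteger();

    SafeTask(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } catch (RuntimeException ex) {
            // Decide here what a failure means: log, alert, retry, etc.
            failures.incrementAndGet();
            System.err.println("Scheduled task failed: " + ex);
        }
    }

    int failureCount() {
        return failures.get();
    }

    public static void main(String[] args) {
        SafeTask task = new SafeTask(() -> {
            throw new IllegalStateException("remote call failed");
        });
        task.run();
        task.run();
        System.out.println("failures = " + task.failureCount()); // failures = 2
    }
}
```

In a Spring application the same effect comes from a try/catch inside the @Scheduled method itself.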
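On timeouts: this problem comes up often in practice, typically as a scheduled task that stops firing after it has been running for a while. A common cause is calling a third-party API without setting a call timeout: the call hangs, the current task blocks, and because the default scheduler shown later runs tasks on a single thread, no further executions can start.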
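When the client library offers no timeout setting of its own, the call can be bounded from the outside. The following is a minimal plain-JDK sketch; `TimeoutGuard` and `callWithTimeout` are hypothetical names, and cancelling with interruption only helps if the blocked call actually responds to interrupts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutGuard {

    // Daemon worker threads, so a stuck call cannot keep the JVM alive.
    private static final ExecutorService WORKERS = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "timeout-guard");
        t.setDaemon(true);
        return t;
    });

    /** Runs the call on a worker thread and gives up after timeoutMillis. */
    static <T> T callWithTimeout(Callable<T> call, long timeoutMillis) throws Exception {
        Future<T> future = WORKERS.submit(call);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            future.cancel(true); // interrupt the worker so it can stop early
            throw ex;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callWithTimeout(() -> "ok", 1000));
        try {
            callWithTimeout(() -> {
                Thread.sleep(5_000); // stands in for a hanging third-party call
                return "late";
            }, 100);
        } catch (TimeoutException ex) {
            System.out.println("timed out");
        }
    }
}
```

The scheduled method then fails fast with a TimeoutException instead of occupying the scheduler thread indefinitely.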
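The Spring Task source code lives in the org.springframework.scheduling package of the spring-context module.
The @EnableScheduling annotation
Adding @EnableScheduling automatically imports SchedulingConfiguration: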
```java
/**
 * @author Chris Beams
 * @author Juergen Hoeller
 * @since 3.1
 * @see Scheduled
 * @see SchedulingConfiguration
 * @see SchedulingConfigurer
 * @see ScheduledTaskRegistrar
 * @see Trigger
 * @see ScheduledAnnotationBeanPostProcessor
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}
```
SchedulingConfiguration initializes ScheduledAnnotationBeanPostProcessor
The SchedulingConfiguration class automatically creates the ScheduledAnnotationBeanPostProcessor bean:
```java
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }
}
```
(Figure: the bean lifecycle flow in the Spring container)
ScheduledTaskRegistrar registers tasks
The ScheduledAnnotationBeanPostProcessor constructor creates the ScheduledTaskRegistrar:
```java
/**
 * Create a default {@code ScheduledAnnotationBeanPostProcessor}.
 */
public ScheduledAnnotationBeanPostProcessor() {
    this.registrar = new ScheduledTaskRegistrar();
}
```
The main job of ScheduledTaskRegistrar is to register the various types of task (this approach has been deprecated in newer versions):
```java
protected void scheduleTasks() {
    if (this.taskScheduler == null) {
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    if (this.triggerTasks != null) {
        for (TriggerTask task : this.triggerTasks) {
            addScheduledTask(scheduleTriggerTask(task));
        }
    }
    if (this.cronTasks != null) {
        for (CronTask task : this.cronTasks) {
            addScheduledTask(scheduleCronTask(task));
        }
    }
    if (this.fixedRateTasks != null) {
        for (IntervalTask task : this.fixedRateTasks) {
            addScheduledTask(scheduleFixedRateTask(task));
        }
    }
    if (this.fixedDelayTasks != null) {
        for (IntervalTask task : this.fixedDelayTasks) {
            addScheduledTask(scheduleFixedDelayTask(task));
        }
    }
}
```
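The first branch of scheduleTasks() is worth dwelling on: when no TaskScheduler has been configured, everything runs on Executors.newSingleThreadScheduledExecutor(). The plain-JDK sketch below (class and method names are illustrative only) shows the practical consequence, namely that unrelated tasks share one thread and therefore queue up behind each other.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class SingleThreadFallbackDemo {

    /** Runs two independent tasks on a single-thread scheduler and returns the names of the threads that ran them. */
    static Set<String> threadsUsed() throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        Set<String> names = ConcurrentHashMap.newKeySet();
        Runnable recordThread = () -> names.add(Thread.currentThread().getName());
        try {
            ScheduledFuture<?> first = executor.schedule(recordThread, 0, TimeUnit.MILLISECONDS);
            ScheduledFuture<?> second = executor.schedule(recordThread, 0, TimeUnit.MILLISECONDS);
            first.get(1, TimeUnit.SECONDS);
            second.get(1, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        // Both tasks report the same thread name, so the set has exactly one element.
        System.out.println(threadsUsed().size()); // 1
    }
}
```

This is why one blocked task can stall every other scheduled method; supplying a scheduler backed by a larger pool removes that coupling.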
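Which tasks get registered, and how are the classes designed? The method above handles four collections: TriggerTask, CronTask, and two kinds of IntervalTask (fixed rate and fixed delay).
ScheduledAnnotationBeanPostProcessor loads the @Scheduled annotations
The @Scheduled annotations are loaded in the postProcessAfterInitialization phase of the BeanPostProcessor: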
```java
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
            bean instanceof ScheduledExecutorService) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass) &&
            AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
                });
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
            if (logger.isTraceEnabled()) {
                logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
            }
        }
        else {
            // Non-empty set of methods
            annotatedMethods.forEach((method, scheduledAnnotations) ->
                    scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isTraceEnabled()) {
                logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                        "': " + annotatedMethods);
            }
        }
    }
    return bean;
}
```
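Stripped of caching, proxies and repeatable annotations, the scan above boils down to "look at each method of the bean class and collect the ones carrying the annotation". The plain-reflection sketch below illustrates only that idea; `@Every`, `ReportJobs` and `scan` are hypothetical stand-ins and not Spring types.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

class AnnotationScanSketch {

    /** Hypothetical stand-in for @Scheduled that carries only a period. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Every {
        long millis();
    }

    /** Example bean class with two annotated methods and one plain method. */
    static class ReportJobs {
        @Every(millis = 60_000)
        public void minutelyReport() { }

        @Every(millis = 1_000)
        public void heartbeat() { }

        public void notScheduled() { }
    }

    /** Returns method name -> period (ms) for every method annotated with @Every. */
    static Map<String, Long> scan(Class<?> targetClass) {
        Map<String, Long> found = new TreeMap<>();
        for (Method method : targetClass.getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                found.put(method.getName(), every.millis());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(ReportJobs.class)); // {heartbeat=1000, minutelyReport=60000}
    }
}
```

The real post-processor additionally remembers classes without annotations (nonAnnotatedClasses) so that they are not scanned again.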
The @Scheduled annotation is applied at the method level. Its definition is as follows:
```java
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {

    /**
     * A special cron expression value that indicates a disabled trigger: {@value}.
     * <p>This is primarily meant for use with <code>${...}</code> placeholders,
     * allowing for external disabling of corresponding scheduled methods.
     * @since 5.1
     * @see ScheduledTaskRegistrar#CRON_DISABLED
     */
    String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;

    /**
     * A cron-like expression, extending the usual UN*X definition to include triggers
     * on the second, minute, hour, day of month, month, and day of week.
     * <p>For example, {@code "0 * * * * MON-FRI"} means once per minute on weekdays
     * (at the top of the minute - the 0th second).
     * <p>The fields read from left to right are interpreted as follows.
     * <ul>
     * <li>second</li>
     * <li>minute</li>
     * <li>hour</li>
     * <li>day of month</li>
     * <li>month</li>
     * <li>day of week</li>
     * </ul>
     * <p>The special value {@link #CRON_DISABLED "-"} indicates a disabled cron
     * trigger, primarily meant for externally specified values resolved by a
     * <code>${...}</code> placeholder.
     * @return an expression that can be parsed to a cron schedule
     * @see org.springframework.scheduling.support.CronExpression#parse(String)
     */
    String cron() default "";

    /**
     * A time zone for which the cron expression will be resolved. By default, this
     * attribute is the empty String (i.e. the server's local time zone will be used).
     * @return a zone id accepted by {@link java.util.TimeZone#getTimeZone(String)},
     * or an empty String to indicate the server's default time zone
     * @since 4.0
     * @see org.springframework.scheduling.support.CronTrigger#CronTrigger(String, java.util.TimeZone)
     * @see java.util.TimeZone
     */
    String zone() default "";

    /**
     * Execute the annotated method with a fixed period between the end of the
     * last invocation and the start of the next.
     * <p>The time unit is milliseconds by default but can be overridden via
     * {@link #timeUnit}.
     * @return the delay
     */
    long fixedDelay() default -1;

    /**
     * Execute the annotated method with a fixed period between the end of the
     * last invocation and the start of the next.
     * <p>The time unit is milliseconds by default but can be overridden via
     * {@link #timeUnit}.
     * @return the delay as a String value &mdash; for example, a placeholder
     * or a {@link java.time.Duration#parse java.time.Duration} compliant value
     * @since 3.2.2
     */
    String fixedDelayString() default "";

    /**
     * Execute the annotated method with a fixed period between invocations.
     * <p>The time unit is milliseconds by default but can be overridden via
     * {@link #timeUnit}.
     * @return the period
     */
    long fixedRate() default -1;

    /**
     * Execute the annotated method with a fixed period between invocations.
     * <p>The time unit is milliseconds by default but can be overridden via
     * {@link #timeUnit}.
     * @return the period as a String value &mdash; for example, a placeholder
     * or a {@link java.time.Duration#parse java.time.Duration} compliant value
     * @since 3.2.2
     */
    String fixedRateString() default "";

    /**
     * Number of units of time to delay before the first execution of a
     * {@link #fixedRate} or {@link #fixedDelay} task.
     * <p>The time unit is milliseconds by default but can be overridden via
     * {@link #timeUnit}.
     * @return the initial delay
     * @since 3.2
     */
    long initialDelay() default -1;

    /**
     * Number of units of time to delay before the first execution of a
     * {@link #fixedRate} or {@link #fixedDelay} task.
     * <p>The time unit is milliseconds by default but can be overridden via
     * {@link #timeUnit}.
     * @return the initial delay as a String value &mdash; for example, a placeholder
     * or a {@link java.time.Duration#parse java.time.Duration} compliant value
     * @since 3.2.2
     */
    String initialDelayString() default "";

    /**
     * The {@link TimeUnit} to use for {@link #fixedDelay}, {@link #fixedDelayString},
     * {@link #fixedRate}, {@link #fixedRateString}, {@link #initialDelay}, and
     * {@link #initialDelayString}.
     * <p>Defaults to {@link TimeUnit#MILLISECONDS}.
     * <p>This attribute is ignored for {@linkplain #cron() cron expressions}
     * and for {@link java.time.Duration} values supplied via {@link #fixedDelayString},
     * {@link #fixedRateString}, or {@link #initialDelayString}.
     * @return the {@code TimeUnit} to use
     * @since 5.3.10
     */
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;

}
```
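Attributes supported by @Scheduled:
cron, fixedDelay, fixedDelayString, fixedRate, fixedRateString, initialDelay, initialDelayString, zone
Once the @Scheduled annotation on a method (the task definition) has been obtained, processScheduled handles the specific type of task: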
```java
/**
 * Process the given {@code @Scheduled} method declaration on the given bean.
 * @param scheduled the {@code @Scheduled} annotation
 * @param method the method that the annotation has been declared on
 * @param bean the target bean instance
 * @see #createRunnable(Object, Method)
 */
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
        Runnable runnable = createRunnable(bean, method);
        boolean processedSchedule = false;
        String errorMessage =
                "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

        Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

        // Determine initial delay
        long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
        String initialDelayString = scheduled.initialDelayString();
        if (StringUtils.hasText(initialDelayString)) {
            Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
            if (this.embeddedValueResolver != null) {
                initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
            }
            if (StringUtils.hasLength(initialDelayString)) {
                try {
                    initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                }
            }
        }

        // Check cron expression
        String cron = scheduled.cron();
        if (StringUtils.hasText(cron)) {
            String zone = scheduled.zone();
            if (this.embeddedValueResolver != null) {
                cron = this.embeddedValueResolver.resolveStringValue(cron);
                zone = this.embeddedValueResolver.resolveStringValue(zone);
            }
            if (StringUtils.hasLength(cron)) {
                Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                processedSchedule = true;
                if (!Scheduled.CRON_DISABLED.equals(cron)) {
                    TimeZone timeZone;
                    if (StringUtils.hasText(zone)) {
                        timeZone = StringUtils.parseTimeZoneString(zone);
                    }
                    else {
                        timeZone = TimeZone.getDefault();
                    }
                    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                }
            }
        }

        // At this point we don't need to differentiate between initial delay set or not anymore
        if (initialDelay < 0) {
            initialDelay = 0;
        }

        // Check fixed delay
        long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
        if (fixedDelay >= 0) {
            Assert.isTrue(!processedSchedule, errorMessage);
            processedSchedule = true;
            tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
        }

        String fixedDelayString = scheduled.fixedDelayString();
        if (StringUtils.hasText(fixedDelayString)) {
            if (this.embeddedValueResolver != null) {
                fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
            }
            if (StringUtils.hasLength(fixedDelayString)) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                try {
                    fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                }
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
        }

        // Check fixed rate
        long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
        if (fixedRate >= 0) {
            Assert.isTrue(!processedSchedule, errorMessage);
            processedSchedule = true;
            tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
        }
        String fixedRateString = scheduled.fixedRateString();
        if (StringUtils.hasText(fixedRateString)) {
            if (this.embeddedValueResolver != null) {
                fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
            }
            if (StringUtils.hasLength(fixedRateString)) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                try {
                    fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                }
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
        }

        // Check whether we had any attribute set
        Assert.isTrue(processedSchedule, errorMessage);

        // Finally register the scheduled tasks
        synchronized (this.scheduledTasks) {
            Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
            regTasks.addAll(tasks);
        }
    }
    catch (IllegalArgumentException ex) {
        throw new IllegalStateException(
                "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
    }
}
```
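processScheduled leans on convertToMillis for the String attributes, which, per the annotation's Javadoc, may hold either a plain number in the configured timeUnit or a java.time.Duration compliant value. The sketch below approximates that conversion in plain JDK code; `DelayParsing` and its duration check are assumptions for illustration and may differ from Spring's own helper.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

class DelayParsing {

    /** Converts an ISO-8601 duration such as "PT5S", or a plain number in the given unit, to milliseconds. */
    static long toMillis(String value, TimeUnit unit) {
        String trimmed = value.trim();
        if (looksLikeDuration(trimmed)) {
            // Duration values carry their own unit, so `unit` is ignored here.
            return Duration.parse(trimmed).toMillis();
        }
        // Throws NumberFormatException (a RuntimeException) for anything else.
        return unit.toMillis(Long.parseLong(trimmed));
    }

    /** ISO-8601 durations start with 'P', optionally preceded by a sign. */
    private static boolean looksLikeDuration(String value) {
        String unsigned = (value.startsWith("-") || value.startsWith("+")) ? value.substring(1) : value;
        return unsigned.startsWith("P") || unsigned.startsWith("p");
    }

    public static void main(String[] args) {
        System.out.println(toMillis("PT5S", TimeUnit.MILLISECONDS)); // 5000
        System.out.println(toMillis("5", TimeUnit.SECONDS));         // 5000
        System.out.println(toMillis("1500", TimeUnit.MILLISECONDS)); // 1500
    }
}
```

An unparsable value surfaces as a RuntimeException, which is what the catch blocks in processScheduled turn into the "cannot parse into long" message.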
ScheduledTaskRegistrar resolves the task
Take CronTask as an example: if a taskScheduler has already been set, the task is scheduled on it right away; otherwise it is parked in unresolvedTasks.
```java
/**
 * Schedule the specified cron task, either right away if possible
 * or on initialization of the scheduler.
 * @return a handle to the scheduled task, allowing to cancel it
 * (or {@code null} if processing a previously registered task)
 * @since 4.3
 */
@Nullable
public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }
    if (this.taskScheduler != null) {
        scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    }
    else {
        addCronTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }
    return (newTask ? scheduledTask : null);
}
```
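The "schedule now or park until a scheduler exists" pattern in scheduleCronTask can be reduced to a few lines. The sketch below is a simplified plain-JDK model, not the registrar itself: `DeferredRegistrar` is a hypothetical class, it handles only fixed-delay tasks, and it is not thread-safe.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DeferredRegistrar {

    // Tasks registered before a scheduler was available, with their delay in ms.
    private final Map<Runnable, Long> unresolved = new LinkedHashMap<>();
    private ScheduledExecutorService scheduler; // may still be null at registration time

    /** Schedules right away if possible, otherwise parks the task. Returns the future, or null if parked. */
    ScheduledFuture<?> scheduleFixedDelay(Runnable task, long delayMillis) {
        if (scheduler != null) {
            return scheduler.scheduleWithFixedDelay(task, 0, delayMillis, TimeUnit.MILLISECONDS);
        }
        unresolved.put(task, delayMillis);
        return null;
    }

    /** Called once the scheduler is known; schedules everything that was parked. */
    void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
        unresolved.forEach((task, delay) ->
                scheduler.scheduleWithFixedDelay(task, 0, delay, TimeUnit.MILLISECONDS));
        unresolved.clear();
    }

    int unresolvedCount() {
        return unresolved.size();
    }

    public static void main(String[] args) throws InterruptedException {
        DeferredRegistrar registrar = new DeferredRegistrar();
        CountDownLatch ran = new CountDownLatch(1);
        registrar.scheduleFixedDelay(ran::countDown, 10);
        System.out.println("parked: " + registrar.unresolvedCount()); // parked: 1

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        registrar.setScheduler(executor);
        ran.await(2, TimeUnit.SECONDS);
        System.out.println("parked: " + registrar.unresolvedCount()); // parked: 0
        executor.shutdownNow();
    }
}
```

Parking tasks this way lets annotated beans be processed before the scheduler bean is available, which matches the "on initialization of the scheduler" wording in the Javadoc above.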
How TaskScheduler handles a task
The default is ConcurrentTaskScheduler, which handles the task as follows:
```java
@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
    try {
        if (this.enterpriseConcurrentScheduler) {
            return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
        }
        else {
            ErrorHandler errorHandler =
                    (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
            return new ReschedulingRunnable(task, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();
        }
    }
    catch (RejectedExecutionException ex) {
        throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
    }
}
```
EnterpriseConcurrentTriggerScheduler adapts a Spring Trigger to the JSR-236 Trigger standard. It handles the task as follows:
```java
/**
 * Delegate that adapts a Spring Trigger to a JSR-236 Trigger.
 * Separated into an inner class in order to avoid a hard dependency on the JSR-236 API.
 */
private class EnterpriseConcurrentTriggerScheduler {

    public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) {
        ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;
        return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {
            @Override
            @Nullable
            public Date getNextRunTime(@Nullable LastExecution le, Date taskScheduledTime) {
                return (trigger.nextExecutionTime(le != null ?
                        new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :
                        new SimpleTriggerContext()));
            }
            @Override
            public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {
                return false;
            }
        });
    }
}
```
If EnterpriseConcurrentTriggerScheduler is not used, ReschedulingRunnable is used instead; in essence the work is done by a ScheduledExecutorService:
```java
public ReschedulingRunnable(Runnable delegate, Trigger trigger, Clock clock,
        ScheduledExecutorService executor, ErrorHandler errorHandler) {

    super(delegate, errorHandler);
    this.trigger = trigger;
    this.triggerContext = new SimpleTriggerContext(clock);
    this.executor = executor;
}

@Nullable
public ScheduledFuture<?> schedule() {
    synchronized (this.triggerContextMonitor) {
        this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
        if (this.scheduledExecutionTime == null) {
            return null;
        }
        long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
        this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
        return this;
    }
}
```
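The schedule() method above submits the wrapper as a one-shot task; for trigger-based schedules such as cron to repeat, the wrapper has to ask the trigger again after each run and resubmit itself. The plain-JDK sketch below illustrates that self-rescheduling idea under simplifying assumptions: `SelfRescheduling` is a hypothetical class, and a LongSupplier returning the next delay in milliseconds (negative meaning "stop") stands in for Spring's Trigger.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

class SelfRescheduling implements Runnable {

    private final Runnable delegate;
    private final LongSupplier nextDelayMillis; // hypothetical stand-in for a Trigger
    private final ScheduledExecutorService executor;

    SelfRescheduling(Runnable delegate, LongSupplier nextDelayMillis, ScheduledExecutorService executor) {
        this.delegate = delegate;
        this.nextDelayMillis = nextDelayMillis;
        this.executor = executor;
    }

    /** Asks the trigger for the next delay and submits this wrapper as a one-shot task. */
    void schedule() {
        long delay = nextDelayMillis.getAsLong();
        if (delay < 0) {
            return; // the trigger has no further executions
        }
        executor.schedule(this, delay, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } finally {
            // Resubmit after every run, unless the executor is shutting down.
            if (!executor.isShutdown()) {
                schedule();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger asked = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        // Fire three times, 10 ms apart, then stop.
        LongSupplier trigger = () -> asked.incrementAndGet() <= 3 ? 10 : -1;
        new SelfRescheduling(threeRuns::countDown, trigger, executor).schedule();
        System.out.println(threeRuns.await(2, TimeUnit.SECONDS)); // true
        executor.shutdownNow();
    }
}
```

Because each execution is an independent one-shot submission, the interval can change from run to run, which fixed-rate and fixed-delay submissions to a ScheduledExecutorService cannot express.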