学习路线指引(点击解锁) | 知识定位 | 人群定位 |
---|---|---|
🧡 Python实战微信订餐小程序 🧡 | 进阶级 | 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。 |
💛Python量化交易实战💛 | 入门级 | 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统 |
注:本文中的demo代码节选自图书《Spring Batch批处理框架》的配套源代码,并针对升级后的Spring Boot版本做了适配,完全开源。
SpringBatch的背景和用法,就不再赘述了,默认本文受众都使用过batch框架。
本文仅讨论普通的ChunkStep,分片/异步处理等功能暂不讨论。
Spring系列的框架代码,大多又臭又长,让人头晕。先列出整体流程,再去看源码。顺带也可以了解存储表结构。
框架使用的表:
- `batch_job_instance`
- `batch_job_execution`
- `batch_job_execution_context`
- `batch_job_execution_params`
- `batch_step_execution`
- `batch_step_execution_context`
- `batch_job_seq`
- `batch_step_execution_seq`
- `batch_job_execution_seq`
先看看怎么调用启动Job的API。看起来非常简单,传入Job和JobParameters即可:
| | @Autowired |
| | @Qualifier("billJob") |
| | private Job job; |
| | |
| | @Test |
| | public void billJob() throws Exception { |
| | JobParameters jobParameters = new JobParametersBuilder() |
| | .addLong("currentTimeMillis", System.currentTimeMillis()) |
| | .addString("batchNo","2022080402") |
| | .toJobParameters(); |
| | JobExecution result = jobLauncher.run(job, jobParameters); |
| | System.out.println(result.toString()); |
| | |
| | Thread.sleep(6000); |
| | } |
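<!-- NOTE: the opening part of this XML job definition (the "billJob" job with its step, tasklet and chunk elements and their attributes) was lost when this article was extracted; only the closing tags below survived. -->
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>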
org.springframework.batch.core.launch.support.SimpleJobLauncher#run
| | // 简化部分代码(参数检查、log日志) |
| | @Override |
| | public JobExecution run(final Job job, final JobParameters jobParameters){ |
| | final JobExecution jobExecution; |
| | JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters); |
| | // 上次执行存在,说明本次请求是重启job,先做检查 |
| | if (lastExecution != null) { |
| | if (!job.isRestartable()) { |
| | throw new JobRestartException("JobInstance already exists and is not restartable"); |
| | } |
| | /* 检查stepExecutions的状态 |
| | * validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING |
| | * retrieve the previous execution and check |
| | */ |
| | for (StepExecution execution : lastExecution.getStepExecutions()) { |
| | BatchStatus status = execution.getStatus(); |
| | if (status.isRunning() || status == BatchStatus.STOPPING) { |
| | throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: " |
| | + lastExecution); |
| | } else if (status == BatchStatus.UNKNOWN) { |
| | throw new JobRestartException( |
| | "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "); |
| | } |
| | } |
| | } |
| | // Check jobParameters |
| | job.getJobParametersValidator().validate(jobParameters); |
| | // 创建JobExecution 同一个job+参数,只能有一个Execution执行器 |
| | jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters); |
| | try { |
| | // SyncTaskExecutor 看似是异步,实际是同步执行(可扩展) |
| | taskExecutor.execute(new Runnable() { |
| | @Override |
| | public void run() { |
| | try { |
| | // 关键入口,请看[org.springframework.batch.core.job.AbstractJob#execute] |
| | job.execute(jobExecution); |
| | if (logger.isInfoEnabled()) { |
| | Duration jobExecutionDuration = BatchMetrics.calculateDuration(jobExecution.getStartTime(), jobExecution.getEndTime()); |
| | } |
| | } |
| | catch (Throwable t) { |
| | rethrow(t); |
| | } |
| | } |
| | private void rethrow(Throwable t) { |
| | // 省略各类抛异常 |
| | throw new IllegalStateException(t); |
| | } |
| | }); |
| | } |
| | catch (TaskRejectedException e) { |
| | // 更新job\_execution的运行状态 |
| | jobExecution.upgradeStatus(BatchStatus.FAILED); |
| | if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) { |
| | jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e)); |
| | } |
| | jobRepository.update(jobExecution); |
| | } |
| | return jobExecution; |
| | } |
| | |
简单看看API入口,子类划分较多,继续往后看
总体代码流程
JobExecution的处理过程
org.springframework.batch.core.job.AbstractJob#execute
| | |
| | /** 运行给定的job,处理全部listener和DB存储的调用 |
| | * Run the specified job, handling all listener and repository calls, and |
| | * delegating the actual processing to {@link #doExecute(JobExecution)}. |
| | * |
| | * @see Job#execute(JobExecution) |
| | * @throws StartLimitExceededException |
| | * if start limit of one of the steps was exceeded |
| | */ |
| | @Ovrride |
| | public final void execute(JobExecution execution) { |
| | |
| | // 同步控制器,防并发执行 |
| | JobSynchronizationManager.register(execution); |
| | // 计时器,记录耗时 |
| | LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer("job.active", "Active jobs", |
| | Tag.of("name", execution.getJobInstance().getJobName())); |
| | LongTaskTimer.Sample longTaskTimerSample = longTaskTimer.start(); |
| | Timer.Sample timerSample = BatchMetrics.createTimerSample(); |
| | |
| | try { |
| | // 参数再次进行校验 |
| | jobParametersValidator.validate(execution.getJobParameters()); |
| | |
| | if (execution.getStatus() != BatchStatus.STOPPING) { |
| | |
| | // 更新db中任务状态 |
| | execution.setStartTime(new Date()); |
| | updateStatus(execution, BatchStatus.STARTED); |
| | // 回调所有listener的beforeJob方法 |
| | listener.beforeJob(execution); |
| | |
| | try { |
| | doExecute(execution); |
| | } catch (RepeatException e) { |
| | throw e.getCause(); // 搞不懂这里包一个RepeatException 有啥用 |
| | } |
| | } else { |
| | // 任务状态时BatchStatus.STOPPING,说明任务已经停止,直接改成STOPPED |
| | // The job was already stopped before we even got this far. Deal |
| | // with it in the same way as any other interruption. |
| | execution.setStatus(BatchStatus.STOPPED); |
| | execution.setExitStatus(ExitStatus.COMPLETED); |
| | } |
| | |
| | } catch (JobInterruptedException e) { |
| | // 任务被打断 STOPPED |
| | execution.setExitStatus(getDefaultExitStatusForFailure(e, execution)); |
| | execution.setStatus(BatchStatus.max(BatchStatus.STOPPED, e.getStatus())); |
| | execution.addFailureException(e); |
| | } catch (Throwable t) { |
| | // 其他原因失败 FAILED |
| | logger.error("Encountered fatal error executing job", t); |
| | execution.setExitStatus(getDefaultExitStatusForFailure(t, execution)); |
| | execution.setStatus(BatchStatus.FAILED); |
| | execution.addFailureException(t); |
| | } finally { |
| | try { |
| | if (execution.getStatus().isLessThanOrEqualTo(BatchStatus.STOPPED) |
| | && execution.getStepExecutions().isEmpty()) { |
| | ExitStatus exitStatus = execution.getExitStatus(); |
| | ExitStatus newExitStatus = |
| | ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job."); |
| | execution.setExitStatus(exitStatus.and(newExitStatus)); |
| | } |
| | |
| | // 计时器 计算总耗时 |
| | timerSample.stop(BatchMetrics.createTimer("job", "Job duration", |
| | Tag.of("name", execution.getJobInstance().getJobName()), |
| | Tag.of("status", execution.getExitStatus().getExitCode()) |
| | )); |
| | longTaskTimerSample.stop(); |
| | execution.setEndTime(new Date()); |
| | |
| | try { |
| | // 回调所有listener的afterJob方法 调用失败也不影响任务完成 |
| | listener.afterJob(execution); |
| | } catch (Exception e) { |
| | logger.error("Exception encountered in afterJob callback", e); |
| | } |
| | // 写入db |
| | jobRepository.update(execution); |
| | } finally { |
| | // 释放控制 |
| | JobSynchronizationManager.release(); |
| | } |
| | |
| | } |
| | |
| | } |
在SimpleChunkProvider#provide中会分次调用reader,并将结果包装为Chunk返回。
其中有几个细节,此处不再赘述。
| | org.springframework.batch.core.step.item.SimpleChunkProcessor#process |
| | |
| | @Nullable |
| | @Override |
| | public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { |
| | |
| | @SuppressWarnings("unchecked") |
| | Chunk *inputs = (Chunk*) chunkContext.getAttribute(INPUTS\_KEY);** |
| | if (inputs == null) { |
| | inputs = chunkProvider.provide(contribution); |
| | if (buffering) { |
| | chunkContext.setAttribute(INPUTS\_KEY, inputs); |
| | } |
| | } |
| | |
| | chunkProcessor.process(contribution, inputs); |
| | chunkProvider.postProcess(contribution, inputs); |
| | |
| | // Allow a message coming back from the processor to say that we |
| | // are not done yet |
| | if (inputs.isBusy()) { |
| | logger.debug("Inputs still busy"); |
| | return RepeatStatus.CONTINUABLE; |
| | } |
| | |
| | chunkContext.removeAttribute(INPUTS\_KEY); |
| | chunkContext.setComplete(); |
| | |
| | if (logger.isDebugEnabled()) { |
| | logger.debug("Inputs not busy, ended: " + inputs.isEnd()); |
| | } |
| | return RepeatStatus.continueIf(!inputs.isEnd()); |
| | |
| | } |
在RepeatTemplate和外围事务模板的包装下,通过SimpleChunkProcessor进行处理:
| | // 忽略无关代码... |
| | @Override |
| | public final void process(StepContribution contribution, Chunk *inputs)* throws Exception { |
| | |
| | // 输入为空,直接返回If there is no input we don't have to do anything more |
| | if (isComplete(inputs)) { |
| | return; |
| | } |
| | |
| | // Make the transformation, calling remove() on the inputs iterator if |
| | // any items are filtered. Might throw exception and cause rollback. |
| | Chunk outputs = transform(contribution, inputs); |
| | |
| | // Adjust the filter count based on available data |
| | contribution.incrementFilterCount(getFilterCount(inputs, outputs)); |
| | |
| | // Adjust the outputs if necessary for housekeeping purposes, and then |
| | // write them out... |
| | write(contribution, inputs, getAdjustedOutputs(inputs, outputs)); |
| | |
| | } |
| | |
| | // 遍历items,逐个item调用processor |
| | protected Chunk transform(StepContribution contribution, Chunk *inputs)* throws Exception { |
| | Chunk outputs = new Chunk<>(); |
| | for (Chunk*.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {* |
| | final I item = iterator.next(); |
| | O output; |
| | String status = BatchMetrics.STATUS\_SUCCESS; |
| | try { |
| | output = doProcess(item); |
| | } |
| | catch (Exception e) { |
| | /* |
| | * For a simple chunk processor (no fault tolerance) we are done here, so prevent any more processing of these inputs. |
| | */ |
| | inputs.clear(); |
| | status = BatchMetrics.STATUS\_FAILURE; |
| | throw e; |
| | } |
| | if (output != null) { |
| | outputs.add(output); |
| | } |
| | else { |
| | iterator.remove(); |
| | } |
| | } |
| | return outputs; |
| | } |
| | |
在TaskletStep#doExecute中会使用TransactionTemplate,包装事务操作
标准的事务操作,通过函数式编程风格,从action的CallBack调用实际处理方法
| | // org.springframework.batch.core.step.tasklet.TaskletStep#doExecute |
| | result = new TransactionTemplate(transactionManager, transactionAttribute) |
| | .execute(new ChunkTransactionCallback(chunkContext, semaphore)); |
| | |
| | // 事务启用过程 |
| | // org.springframework.transaction.support.TransactionTemplate#execute |
| | @Override |
| | @Nullable |
| | public T execute(TransactionCallback action) throws TransactionException { |
| | Assert.state(this.transactionManager != null, "No PlatformTransactionManager set"); |
| | |
| | if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) { |
| | return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action); |
| | } |
| | else { |
| | TransactionStatus status = this.transactionManager.getTransaction(this); |
| | T result; |
| | try { |
| | result = action.doInTransaction(status); |
| | } |
| | catch (RuntimeException | Error ex) { |
| | // Transactional code threw application exception -> rollback |
| | rollbackOnException(status, ex); |
| | throw ex; |
| | } |
| | catch (Throwable ex) { |
| | // Transactional code threw unexpected exception -> rollback |
| | rollbackOnException(status, ex); |
| | throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception"); |
| | } |
| | this.transactionManager.commit(status); |
| | return result; |
| | } |
| | } |
在配置任务时,有一个step级别的参数`commit-interval`,用于控制每个事务窗口(即每次提交)所处理的item数量。
RepeatTemplate#executeInternal 在处理单条item后,会查看已处理完的item数量,与配置的chunk数量做比较,如果满足chunk数,则不再继续,准备提交事务。
StepBean在初始化时,会新建SimpleCompletionPolicy(chunkSize会优先使用配置值,默认是5)
在每个chunk处理开始时,都会调用SimpleCompletionPolicy#start新建RepeatContextSupport#count用于计数。
源码(简化) org.springframework.batch.repeat.support.RepeatTemplate#executeInternal
| | |
| | /** |
| | * Internal convenience method to loop over interceptors and batch |
| | * callbacks. |
| | * @param callback the callback to process each element of the loop. |
| | */ |
| | private RepeatStatus executeInternal(final RepeatCallback callback) { |
| | // Reset the termination policy if there is one... |
| | // 此处会调用completionPolicy.start方法,更新chunk的计数器 |
| | RepeatContext context = start(); |
| | // Make sure if we are already marked complete before we start then no processing takes place. |
| | // 通过running字段来判断是否继续处理next |
| | boolean running = !isMarkedComplete(context); |
| | // 省略listeners处理.... |
| | // Return value, default is to allow continued processing. |
| | RepeatStatus result = RepeatStatus.CONTINUABLE; |
| | RepeatInternalState state = createInternalState(context); |
| | try { |
| | while (running) { |
| | /* |
| | * Run the before interceptors here, not in the task executor so |
| | * that they all happen in the same thread - it's easier for |
| | * tracking batch status, amongst other things. |
| | */ |
| | // 省略listeners处理.... |
| | if (running) { |
| | try { |
| | // callback是实际处理方法,类似函数式编程 |
| | result = getNextResult(context, callback, state); |
| | executeAfterInterceptors(context, result); |
| | } |
| | catch (Throwable throwable) { |
| | doHandle(throwable, context, deferred); |
| | } |
| | // 检查当前chunk是否处理完,决策出是否继续处理下一条item |
| | // N.B. the order may be important here: |
| | if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty() { |
| | running = false; |
| | } |
| | } |
| | } |
| | result = result.and(waitForResults(state)); |
| | // 省略throwables处理.... |
| | // Explicitly drop any references to internal state... |
| | state = null; |
| | } |
| | finally { |
| | // 省略代码... |
| | } |
| | return result; |
| | } |
JSR-352标准定义了Java批处理的基本模型,包含批处理的元数据像 JobExecutions,JobInstances,StepExecutions 等等。通过此类模型,提供了许多基础组件与扩展点:
2013年, JSR-352标准包含在 JavaEE7中发布,到2022年已近10年,Spring也在探索新的批处理模式, 如Spring Attic /Spring Cloud Data Flow。 https://docs.spring.io/spring-batch/docs/current/reference/html/jsr-352.html
整个Job在运行时,会将运行信息保存在JobContext中。 类似的,Step运行时也有StepContext。可以在Context中保存一些参数,在任务或者步骤中传递使用。
查看JobContext/StepContext源码,发现仅用了普通成员变量保存Execution,这个类本身并不是线程安全的。而生产环境中常常出现多个任务并发处理的情况。
SpringBatch用了几种方式来保障并发安全:
此处能看到,如果在JobExecution中存储大量的业务数据,这些数据在任务运行期间会一直被引用、无法被GC回收,最终可能导致OOM。所以在上下文中,只应保存精简的数据。
在源码中,使用了各类同步控制和加锁、oldVersion版本拷贝,整体比较复杂(org.springframework.batch.core.step.tasklet.TaskletStep.ChunkTransactionCallback#doInTransaction)
| | // 节选部分代码 |
| | oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution()); |
| | copy(stepExecution, oldVersion); |
| | |
| | private void copy(final StepExecution source, final StepExecution target) { |
| | target.setVersion(source.getVersion()); |
| | target.setWriteCount(source.getWriteCount()); |
| | target.setFilterCount(source.getFilterCount()); |
| | target.setCommitCount(source.getCommitCount()); |
| | target.setExecutionContext(new ExecutionContext(source.getExecutionContext())); |
| | } |
| | // 省略无关代码 |
| | try { |
| | try { |
| | // 执行w-p-r模型方法 |
| | result = tasklet.execute(contribution, chunkContext); |
| | if (result == null) { |
| | result = RepeatStatus.FINISHED; |
| | } |
| | } |
| | catch (Exception e) { |
| | // 省略... |
| | } |
| | } |
| | finally { |
| | // If the step operations are asynchronous then we need to synchronize changes to the step execution (at a |
| | // minimum). Take the lock *before* changing the step execution. |
| | try { |
| | // 获取锁 |
| | semaphore.acquire(); |
| | locked = true; |
| | } |
| | catch (InterruptedException e) { |
| | logger.error("Thread interrupted while locking for repository update"); |
| | stepExecution.setStatus(BatchStatus.STOPPED); |
| | stepExecution.setTerminateOnly(); |
| | Thread.currentThread().interrupt(); |
| | } |
| | stepExecution.apply(contribution); |
| | } |
| | stepExecutionUpdated = true; |
| | stream.update(stepExecution.getExecutionContext()); |
| | try { |
| | // 更新上下文、DB中的状态 |
| | // Going to attempt a commit. If it fails this flag will stay false and we can use that later. |
| | getJobRepository().updateExecutionContext(stepExecution); |
| | stepExecution.incrementCommitCount(); |
| | getJobRepository().update(stepExecution); |
| | } |
| | catch (Exception e) { |
| | // If we get to here there was a problem saving the step execution and we have to fail. |
| | String msg = "JobRepository failure forcing rollback"; |
| | logger.error(msg, e); |
| | throw new FatalStepExecutionException(msg, e); |
| | } |
| | |