目录
引言
接着上篇:Spring Batch 作业启动方式,了解Spring Batch 作业启动方式后,接下来一起学习一下Spring Batch Spring Batch 作业启动方式作业停止方式,这里介绍其中3种核心方式。
概况
作业的停止,存在有3种情况:
-
一种自然结束
作业成功执行,正常停止,此时作业返回状态为:COMPLETED
-
一种异常结束 作业执行过程因为各种意外导致作业中断而停止,大多数作业返回状态为:FAILED
-
一种编程结束
某个步骤处理数据结果不满足下一步骤执行前提,手动让其停止,一般设置返回状态为:STOPED
上面1,2种情况相对简单,我们重点说下第三种:以编程方式让作业停止。
需求
模拟一个操作场景
1>有一个资源类,里面有2个属性:总数:totalCount = 100, 读取数:readCount = 0
2>设计2个步骤,step1 用于叠加readCount 模拟从数据库中读取资源, step2 用于执行逻辑
3>当totalCount == readCount 时,为正常情况,正常结束。如果不等时,为异常状态。此时不执行step2,直接停止作业。
4>修复数据,在从step1开始执行,并完成作业
public class ResouceCount {
public static int totalCount = 100; //总数
public static int readCount = 0; //读取数
}
要实现上面需求,有2种方式可以实现
方案1:Step 步骤监听器方式
监听器
public class StopStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
//不满足
if(ResouceCount.totalCount != ResouceCount.readCount){
return ExitStatus.STOPPED; //手动停止,后续可以重启
}
return stepExecution.getExitStatus();
}
}
代码
package com.langfeiyes.batch._16_job_stop;
import com.langfeiyes.batch._01_hello.HelloJob;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
@EnableBatchProcessing
public class ListenerJobStopJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
private int readCount = 50; //模拟只读取50个
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
for (int i = 1; i <= readCount; i++) {
System.out.println("---------------step1执行-"+i+"------------------");
ResouceCount.readCount ++;
}
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("step2执行了.......");
System.err.println("readCount:" + ResouceCount.readCount + ", totalCount:" + ResouceCount.totalCount);
return RepeatStatus.FINISHED;
}
};
}
@Bean
public StopStepListener stopStepListener(){
return new StopStepListener();
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
.listener(stopStepListener())
.allowStartIfComplete(true) //执行完后,运行重启
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("job-stop-job")
.start(step1())
.on("STOPPED").stopAndRestart(step1())
.from(step1()).on("*").to(step2()).end()
.build();
}
public static void main(String[] args) {
SpringApplication.run(ListenerJobStopJob.class, args);
}
}
第一次执行:tasklet1 中readCount 默认执行50次,不满足条件, stopStepListener() afterStep 返回STOPPED, job进行条件控制走.on("STOPPED").stopAndRestart(step1()) 分支,停止并允许重启--下次重启,从step1步骤开始执行
第二次执行, 修改readCount = 100, 再次启动作业,task1遍历100次,满足条件, stopStepListener() afterStep 正常返回,job条件控制走.from(step1()).on("*").to(step2()).end()分支,正常结束。
注意:step1() 方法中.allowStartIfComplete(true) 代码必须添加,因为第一次执行step1步骤,虽然不满足条件,但是它仍属于正常结束(正常执行完tasklet1的流程),状态码:COMPLETED, 第二次重启,默认情况下正常结束的step1步骤是不允许再执行的,所以必须设置:.allowStartIfComplete(true) 允许step1即使完成也可以重启。
方案2:StepExecution停止标记
package com.langfeiyes.batch._17_job_stop_sign;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
@EnableBatchProcessing
public class SignJobStopJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
private int readCount = 50; //模拟只读取50个
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
for (int i = 1; i <= readCount; i++) {
System.out.println("---------------step1执行-"+i+"------------------");
ResouceCount.readCount ++;
}
if(ResouceCount.readCount != ResouceCount.totalCount){
chunkContext.getStepContext().getStepExecution().setTerminateOnly();
}
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("step2执行了.......");
System.err.println("readCount:" + ResouceCount.readCount + ", totalCount:" + ResouceCount.totalCount);
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
.allowStartIfComplete(true)
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("job-stop-job")
.start(step1())
//.on("STOPPED").stopAndRestart(step1())
//.from(step1()).on("*").to(step2()).end()
.next(step2())
.build();
}
public static void main(String[] args) {
SpringApplication.run(SignJobStopJob.class, args);
}
}
变动的代码有2处
tasket1(), 多了下面判断
if(ResouceCount.readCount != ResouceCount.totalCount){
chunkContext.getStepContext().getStepExecution().setTerminateOnly();
}
其中的StepExecution#setTerminateOnly() 给运行中的stepExecution设置停止标记,Spring Batch 识别后直接停止步骤,进而停止流程
job() 改动
return jobBuilderFactory.get("job-stop-job")
.start(step1())
.next(step2())
.build();
正常设置步骤流程。
到这,本篇就结束了,欲知后事如何,请听下回分解~
转视频版
看文字不过瘾可以切换视频版: