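Introduction
This post continues from the previous one, Spring Batch ItemProcessor Component: Custom Processors. Having covered the ItemProcessor processing component, we now turn to the Spring Batch ItemWriter component.
Overview
Where there is input, there must be output. Earlier posts covered the input side, ItemReader; this post covers the output side, ItemWriter. The output components Spring Batch provides largely mirror its input components: for most kinds of reader there is a corresponding writer.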
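To make the writer's role concrete: in a chunk-oriented step, items are read one at a time but handed to the writer one chunk at a time. The following is a minimal plain-Java sketch of that hand-off, not Spring Batch's implementation. ChunkSketch, ChunkWriter, run, and chunkSizes are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of chunk-oriented writing; all names are hypothetical, not Spring Batch API.
class ChunkSketch {
    // Simplified stand-in for a writer: it receives one chunk of items per call.
    interface ChunkWriter<T> {
        void write(List<T> chunk);
    }

    // Buffers items until chunkSize is reached, then hands the chunk to the writer.
    static <T> void run(List<T> source, int chunkSize, ChunkWriter<T> writer) {
        List<T> buffer = new ArrayList<>();
        for (T item : source) {
            buffer.add(item);
            if (buffer.size() == chunkSize) {
                writer.write(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            writer.write(new ArrayList<>(buffer)); // last, possibly smaller, chunk
        }
    }

    // Returns the size of each chunk the writer received.
    static List<Integer> chunkSizes(List<String> source, int chunkSize) {
        List<Integer> sizes = new ArrayList<>();
        run(source, chunkSize, chunk -> sizes.add(chunk.size()));
        return sizes;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("dafei", "xiaofei", "laofei", "zhongfei", "feifei");
        System.out.println(chunkSizes(names, 2)); // [2, 2, 1]
        System.out.println(chunkSizes(names, 1)); // [1, 1, 1, 1, 1]
    }
}
```

The jobs in this article all use chunk(1), so each call to the writer receives exactly one User.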
Writing to a Flat File
To write the data that was read to a plain-text file, use the FlatFileItemWriter.
Requirement: read the data from users.txt and write it to outUser.txt.
1> Define the users.txt file
1#dafei#18
2#xiaofei#16
3#laofei#20
4#zhongfei#19
5#feifei#15
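2> Define the entity class
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

// Lombok generates the getters, setters, and toString() at compile time.
@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String name;
    private int age;
}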
3> Implementation
package com.langfeiyes.batch._31_itemwriter_flat;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.PathResource;

@SpringBootApplication
@EnableBatchProcessing
public class FlatWriteJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Reads users.txt from the classpath; each line is split on '#' and mapped to a User.
    @Bean
    public FlatFileItemReader<User> userItemReader() {
        return new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .resource(new ClassPathResource("users.txt"))
                .delimited().delimiter("#")
                .names("id", "name", "age")
                .targetType(User.class)
                .build();
    }

    @Bean
    public FlatFileItemWriter<User> itemWriter() {
        return new FlatFileItemWriterBuilder<User>()
                .name("userItemWriter")
                .resource(new PathResource("c:/outUser.txt")) // output file
                .formatted() // build each line from a format string
                .format("id: %s,姓名:%s,年龄:%s") // format of each output line
                .names("id", "name", "age") // properties to write, in this order
                .build();
    }

    @Bean
    public Step step() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("flat-writer-job")
                .start(step())
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(FlatWriteJob.class, args);
    }
}
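Explanation:
The core of the code above is the itemWriter() method, which configures the writer's target file and the format of each output line. After the job runs, outUser.txt contains: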
id: 1,姓名:dafei,年龄:18
id: 2,姓名:xiaofei,年龄:16
id: 3,姓名:laofei,年龄:20
id: 4,姓名:zhongfei,年龄:19
id: 5,姓名:feifei,年龄:15
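How does each User become one of those lines? As far as I understand the builder, formatted() selects a format-string based line aggregator, and names(...) selects which bean properties are extracted and in what order. The plain-Java sketch below imitates those two steps with String.format. FormatLineSketch, extract, and aggregate are hypothetical names, not Spring Batch classes, and the nested User is a simplified copy of the article's entity.

```java
// Plain-Java sketch of format-based line aggregation; all names are hypothetical.
class FormatLineSketch {
    // Simplified copy of the article's User entity (no Lombok, fields read directly).
    static final class User {
        final Long id;
        final String name;
        final int age;

        User(Long id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    // Step 1: pull the selected properties out of the item, in the order they were named.
    static Object[] extract(User user) {
        return new Object[] {user.id, user.name, user.age};
    }

    // Step 2: apply the format string to the extracted values to produce one output line.
    static String aggregate(String format, Object[] fields) {
        return String.format(format, fields);
    }

    public static void main(String[] args) {
        String format = "id: %s,姓名:%s,年龄:%s"; // same format string as in itemWriter()
        System.out.println(aggregate(format, extract(new User(1L, "dafei", 18))));
        // id: 1,姓名:dafei,年龄:18
    }
}
```

Because the placeholders are filled positionally, the order passed to names(...) has to match the order of the placeholders in the format string.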
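Some extras
@Bean
public FlatFileItemWriter<User> itemWriter() {
    return new FlatFileItemWriterBuilder<User>()
            .name("userItemWriter")
            .resource(new PathResource("c:/outUser.txt")) // output file
            .formatted() // build each line from a format string
            .format("id: %s,姓名:%s,年龄:%s") // format of each output line
            .names("id", "name", "age") // properties to write, in this order
            .shouldDeleteIfEmpty(true) // delete the output file if no items were written
            // The next two options are opposite choices, shown together only for reference; pick one.
            .shouldDeleteIfExists(true) // if the output file already exists, delete it first
            .append(true) // if the output file already exists, keep it and append to it
            .build();
}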
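The difference between deleting an existing file, appending to it, and removing an empty result can be pictured with ordinary java.nio file operations. This is only an analogy under my own assumptions, not how FlatFileItemWriter is implemented; OutputModeSketch and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

// java.nio analogy for the writer's file options; hypothetical names, not Spring Batch code.
class OutputModeSketch {
    // Comparable to shouldDeleteIfExists(true): earlier content is discarded.
    static void overwrite(Path file, List<String> lines) {
        try {
            Files.write(file, lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
                    StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Comparable to append(true): earlier content is kept and new lines go after it.
    static void append(Path file, List<String> lines) {
        try {
            Files.write(file, lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Comparable to shouldDeleteIfEmpty(true): remove the file when nothing was written.
    static boolean deleteIfEmpty(Path file) {
        try {
            if (Files.exists(file) && Files.size(file) == 0) {
                Files.delete(file);
                return true;
            }
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Creates an empty temporary file to experiment with.
    static Path newTempFile() {
        try {
            return Files.createTempFile("outUser", ".txt");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<String> read(Path file) {
        try {
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = newTempFile();
        overwrite(file, Arrays.asList("run 1"));
        overwrite(file, Arrays.asList("run 2"));
        System.out.println(read(file)); // [run 2]
        append(file, Arrays.asList("run 3"));
        System.out.println(read(file)); // [run 2, run 3]
    }
}
```

Seen this way, overwriting and appending are mutually exclusive strategies for an existing file, so a real writer configuration would normally set only one of them.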
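Writing to a JSON File
To write the data that was read to a JSON file, use the JsonFileItemWriter.
Requirement: read the data from users.txt and write it to outUser.json.
We reuse users.txt and the User class from above and write the data to outUser.json.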
package com.langfeiyes.batch._32_itemwriter_json;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonFileItemWriter;
import org.springframework.batch.item.json.builder.JsonFileItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.PathResource;

@SpringBootApplication
@EnableBatchProcessing
public class JsonWriteJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Reads users.txt from the classpath; each line is split on '#' and mapped to a User.
    @Bean
    public FlatFileItemReader<User> userItemReader() {
        return new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .resource(new ClassPathResource("users.txt"))
                .delimited().delimiter("#")
                .names("id", "name", "age")
                .targetType(User.class)
                .build();
    }

    // Converts each User to a JSON string using Jackson.
    @Bean
    public JacksonJsonObjectMarshaller<User> objectMarshaller() {
        return new JacksonJsonObjectMarshaller<>();
    }

    @Bean
    public JsonFileItemWriter<User> itemWriter() {
        return new JsonFileItemWriterBuilder<User>()
                .name("jsonUserItemWriter")
                .resource(new PathResource("c:/outUser.json")) // output file
                .jsonObjectMarshaller(objectMarshaller())
                .build();
    }

    @Bean
    public Step step() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("json-writer-job")
                .start(step())
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(JsonWriteJob.class, args);
    }
}
Result:
[
{"id":1,"name":"dafei","age":18},
{"id":2,"name":"xiaofei","age":16},
{"id":3,"name":"laofei","age":20},
{"id":4,"name":"zhongfei","age":19},
{"id":5,"name":"feifei","age":15}
]
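Explanation:
1> The itemWriter() method builds the JsonFileItemWriter instance; a JSON object marshaller must be specified explicitly.
2> Spring Batch ships with two marshallers, JacksonJsonObjectMarshaller and GsonJsonObjectMarshaller, backed by the Jackson and Gson libraries respectively. This example uses Jackson.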
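The division of labour between the writer and the marshaller can be sketched in plain Java: the marshaller turns one object into one JSON string, and the writer only adds the surrounding brackets and the separators. The sketch below is an assumption-laden illustration, not the library's code; JsonArraySketch, marshalUser, and writeArray are hypothetical names, and the hand-written marshaller does no string escaping, which a real library such as Jackson handles.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java sketch of assembling a JSON array file; all names are hypothetical.
class JsonArraySketch {
    // Simplified copy of the article's User entity.
    static final class User {
        final long id;
        final String name;
        final int age;

        User(long id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    // Stand-in for a marshaller: one object in, one JSON string out (no escaping, for brevity).
    static String marshalUser(User u) {
        return "{\"id\":" + u.id + ",\"name\":\"" + u.name + "\",\"age\":" + u.age + "}";
    }

    // Stand-in for the writer: wraps the marshalled items in [ ] and separates them with commas.
    static <T> String writeArray(List<T> items, Function<T, String> marshaller) {
        return items.stream()
                .map(marshaller)
                .collect(Collectors.joining(",\n", "[\n", "\n]"));
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(new User(1, "dafei", 18), new User(2, "xiaofei", 16));
        System.out.println(writeArray(users, JsonArraySketch::marshalUser));
    }
}
```

Swapping Jackson for Gson in the real job changes only the marshaller bean; the writer configuration stays the same.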
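Writing to a Database
To write the data that was read to a database, use the JdbcBatchItemWriter.
Requirement: read the data from users.txt and write it to the user table in the database.
We reuse users.txt and the User class from above and write the data to the user table.
1> Define the prepared-statement setter class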
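import org.springframework.batch.item.database.ItemPreparedStatementSetter;

import java.sql.PreparedStatement;
import java.sql.SQLException;

// Writing to the database runs an INSERT; with a prepared statement, every parameter value must be bound explicitly.
public class UserPreStatementSetter implements ItemPreparedStatementSetter<User> {
    @Override
    public void setValues(User item, PreparedStatement ps) throws SQLException {
        // Parameter indexes are 1-based and follow the order of the ? placeholders in the SQL.
        ps.setLong(1, item.getId());
        ps.setString(2, item.getName());
        ps.setInt(3, item.getAge());
    }
}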
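To see which value ends up on which placeholder without needing a database, the binding logic can be run against a recording stand-in for PreparedStatement. The sketch below uses a JDK dynamic proxy for that purpose. BindingOrderSketch, recording, and bind are hypothetical helpers written for this illustration, and the nested User is a simplified copy of the article's entity; only setValues mirrors UserPreStatementSetter.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import java.util.TreeMap;

// Sketch: record which value is bound to which placeholder index; names are hypothetical.
class BindingOrderSketch {
    // Simplified copy of the article's User entity.
    static final class User {
        final long id;
        final String name;
        final int age;

        User(long id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    // Same binding order as UserPreStatementSetter.setValues in the article.
    static void setValues(User item, PreparedStatement ps) throws SQLException {
        ps.setLong(1, item.id);
        ps.setString(2, item.name);
        ps.setInt(3, item.age);
    }

    // Fake PreparedStatement that stores every setXxx(index, value) call in the given map.
    static PreparedStatement recording(Map<Integer, Object> bound) {
        return (PreparedStatement) Proxy.newProxyInstance(
                BindingOrderSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) -> {
                    if (method.getName().startsWith("set") && args != null
                            && args.length == 2 && args[0] instanceof Integer) {
                        bound.put((Integer) args[0], args[1]);
                    }
                    return null; // only void setters are called in this sketch
                });
    }

    // Runs the binding logic and returns placeholder index -> bound value.
    static Map<Integer, Object> bind(User user) {
        Map<Integer, Object> bound = new TreeMap<>();
        try {
            setValues(user, recording(bound));
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return bound;
    }

    public static void main(String[] args) {
        System.out.println(bind(new User(1, "dafei", 18))); // {1=1, 2=dafei, 3=18}
    }
}
```

Index 1 maps to the first ? in the INSERT statement, so the setter and the column list in the SQL have to agree on the order id, name, age.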
2> Full code
package com.langfeiyes.batch._33_itemwriter_db;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import javax.sql.DataSource;

@SpringBootApplication
@EnableBatchProcessing
public class JdbcWriteJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DataSource dataSource;

    // Reads users.txt from the classpath; each line is split on '#' and mapped to a User.
    @Bean
    public FlatFileItemReader<User> userItemReader() {
        return new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .resource(new ClassPathResource("users.txt"))
                .delimited().delimiter("#")
                .names("id", "name", "age")
                .targetType(User.class)
                .build();
    }

    @Bean
    public UserPreStatementSetter preStatementSetter() {
        return new UserPreStatementSetter();
    }

    @Bean
    public JdbcBatchItemWriter<User> itemWriter() {
        return new JdbcBatchItemWriterBuilder<User>()
                .dataSource(dataSource) // connection source
                .sql("insert into user(id, name, age) values(?,?,?)") // statement run for each item
                .itemPreparedStatementSetter(preStatementSetter()) // binds the ? placeholders
                .build();
    }

    @Bean
    public Step step() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("jdbc-writer-job")
                .start(step())
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(JdbcWriteJob.class, args);
    }
}
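Explanation:
The core of the code is the itemWriter() method, which needs four things: 1> a JdbcBatchItemWriterBuilder to build the JdbcBatchItemWriter instance, 2> the DataSource, 3> the SQL statement, and 4> the parameter binder (the ItemPreparedStatementSetter).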
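Writing to Multiple Destinations
The approaches above are all one-to-one: one input, one output. Real projects are often less simple and may need one-to-many output to several destinations. For that case, Spring Batch provides CompositeItemWriter.
Requirement: read the data from users.txt and write it to outUser.txt, outUser.json, and the user table in the database.
We reuse users.txt and the User class from above and write the data to outUser.txt, outUser.json, and the user table.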
package com.langfeiyes.batch._34_itemwriter_composite;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonFileItemWriter;
import org.springframework.batch.item.json.builder.JsonFileItemWriterBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.PathResource;
import javax.sql.DataSource;
import java.util.Arrays;

@SpringBootApplication
@EnableBatchProcessing
public class CompositeWriteJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DataSource dataSource;

    // Reads users.txt from the classpath; each line is split on '#' and mapped to a User.
    @Bean
    public FlatFileItemReader<User> userItemReader() {
        return new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .resource(new ClassPathResource("users.txt"))
                .delimited().delimiter("#")
                .names("id", "name", "age")
                .targetType(User.class)
                .build();
    }

    // Destination 1: plain-text file.
    @Bean
    public FlatFileItemWriter<User> flatFileItemWriter() {
        return new FlatFileItemWriterBuilder<User>()
                .name("userItemWriter")
                .resource(new PathResource("c:/outUser.txt"))
                .formatted() // build each line from a format string
                .format("id: %s,姓名:%s,年龄:%s") // format of each output line
                .names("id", "name", "age") // properties to write, in this order
                .build();
    }

    @Bean
    public JacksonJsonObjectMarshaller<User> objectMarshaller() {
        return new JacksonJsonObjectMarshaller<>();
    }

    // Destination 2: JSON file.
    @Bean
    public JsonFileItemWriter<User> jsonFileItemWriter() {
        return new JsonFileItemWriterBuilder<User>()
                .name("jsonUserItemWriter")
                .resource(new PathResource("c:/outUser.json"))
                .jsonObjectMarshaller(objectMarshaller())
                .build();
    }

    @Bean
    public UserPreStatementSetter preStatementSetter() {
        return new UserPreStatementSetter();
    }

    // Destination 3: the user table.
    @Bean
    public JdbcBatchItemWriter<User> jdbcBatchItemWriter() {
        return new JdbcBatchItemWriterBuilder<User>()
                .dataSource(dataSource)
                .sql("insert into user(id, name, age) values(?,?,?)")
                .itemPreparedStatementSetter(preStatementSetter())
                .build();
    }

    // Combines the three writers; each chunk is passed to every delegate.
    @Bean
    public CompositeItemWriter<User> compositeItemWriter() {
        return new CompositeItemWriterBuilder<User>()
                .delegates(Arrays.asList(flatFileItemWriter(), jsonFileItemWriter(), jdbcBatchItemWriter()))
                .build();
    }

    @Bean
    public Step step() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(compositeItemWriter())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("composite-writer-job")
                .start(step())
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(CompositeWriteJob.class, args);
    }
}
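Explanation:
There is nothing technically difficult in this code: the writers from the previous sections are simply combined through CompositeItemWriter.
@Bean
public CompositeItemWriter<User> compositeItemWriter() {
    return new CompositeItemWriterBuilder<User>()
            .delegates(Arrays.asList(flatFileItemWriter(), jsonFileItemWriter(), jdbcBatchItemWriter()))
            .build();
}
Explanation: the operation is simple. Pass the list of writers to delegates(), and CompositeItemWriter hands each chunk to every delegate in the listed order.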
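The delegation idea behind a composite writer fits in a few lines of plain Java: the composite is itself a writer, and its write call forwards the same chunk to each delegate in list order. This is a minimal sketch under that assumption, not the source of CompositeItemWriter; CompositeSketch, SimpleWriter, composite, and demo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of a composite writer; all names are hypothetical, not Spring Batch API.
class CompositeSketch {
    // Simplified stand-in for a writer: receives one chunk of items per call.
    interface SimpleWriter<T> {
        void write(List<T> chunk);
    }

    // The composite is itself a writer that forwards each chunk to every delegate, in order.
    static <T> SimpleWriter<T> composite(List<SimpleWriter<T>> delegates) {
        return chunk -> {
            for (SimpleWriter<T> delegate : delegates) {
                delegate.write(chunk);
            }
        };
    }

    // Wires three logging writers behind one composite and returns what each one received.
    static List<String> demo(List<String> chunk) {
        List<String> log = new ArrayList<>();
        SimpleWriter<String> file = c -> log.add("file:" + c);
        SimpleWriter<String> json = c -> log.add("json:" + c);
        SimpleWriter<String> db = c -> log.add("db:" + c);
        composite(Arrays.asList(file, json, db)).write(chunk);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo(Arrays.asList("dafei")));
        // [file:[dafei], json:[dafei], db:[dafei]]
    }
}
```

One consequence of forwarding in sequence is that, in this sketch, an exception thrown by one delegate prevents the delegates after it from seeing that chunk.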
That is all for this post. Stay tuned for the next installment.