目录
3.5 运行
1 前言
Spring Batch是一个轻量级的、完善的批处理框架,作为Spring体系中的一员,它拥有灵活、方便、生产可用的特点。在应对高效处理大量信息、定时处理大量数据等场景十分简便。
结合调度框架能更大地发挥Spring Batch的作用。
2 Spring Batch的概念知识
2.1 分层架构
Spring Batch的分层架构图如下:
通过例子讲解Spring Batch入门,优秀的批处理框架
可以看到它分为三层,分别是:
-
Application应用层:包含了所有任务batch jobs和开发人员自定义的代码,主要是根据项目需要开发的业务流程等。
-
Batch Core核心层:包含启动和管理任务的运行环境类,如JobLauncher等。
-
Batch Infrastructure基础层:上面两层是建立在基础层之上的,包含基础的读入reader和写出writer、重试框架等。
2.2 关键概念
理解下图所涉及的概念至关重要,不然很难进行后续开发和问题分析。
通过例子讲解Spring Batch入门,优秀的批处理框架
2.2.1 JobRepository
专门负责与数据库打交道,对整个批处理的新增、更新、执行进行记录。所以Spring Batch是需要依赖数据库来管理的。
2.2.2 任务启动器JobLauncher
负责启动任务Job。
2.2.3 任务Job
Job是封装整个批处理过程的单位,跑一个批处理任务,就是跑一个Job所定义的内容。
通过例子讲解Spring Batch入门,优秀的批处理框架
上图介绍了Job的一些相关概念:
-
Job:封装处理实体,定义过程逻辑。
-
JobInstance:Job的运行实例,不同的实例,参数不同,所以定义好一个Job后可以通过不同参数运行多次。
-
JobParameters:与JobInstance相关联的参数。
-
JobExecution:代表Job的一次实际执行,可能成功、可能失败。
所以,开发人员要做的事情,就是定义Job。
2.2.4 步骤Step
Step是对Job某个过程的封装,一个Job可以包含一个或多个Step,一步步的Step按特定逻辑执行,才代表Job执行完成。
通过例子讲解Spring Batch入门,优秀的批处理框架
通过定义Step来组装Job可以更灵活地实现复杂的业务逻辑。
2.2.5 输入——处理——输出
所以,定义一个Job关键是定义好一个或多个Step,然后把它们组装好即可。而定义Step有多种方法,但有一种常用的模型就是输入——处理——输出,即Item Reader、Item Processor和Item Writer。比如通过Item Reader从文件输入数据,然后通过Item Processor进行业务处理和数据转换,最后通过Item Writer写到数据库中去。
Spring Batch为我们提供了许多开箱即用的Reader和Writer,非常方便。
3 代码实例
理解了基本概念后,就直接通过代码来感受一下吧。整个项目的功能是从多个csv文件中读数据,处理后输出到一个csv文件。
3.1添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
需要添加Spring Batch的依赖,同时使用H2作为内存数据库比较方便,实际生产肯定是要使用外部的数据库,如Oracle、PostgreSQL、mysql等
入口主类:
@SpringBootApplication
@EnableBatchProcessing
public class PkslowBatchJobMain {
public static void main(String[] args) {
SpringApplication.run(PkslowBatchJobMain.class, args);
}
}
也很简单,只是在Springboot的基础上添加注解@EnableBatchProcessing。
领域实体类Employee:
package com.sf.gis.boot.rcboot.entity;
import lombok.*;
import lombok.experimental.Accessors;
/**
* @author 80004819
* @ClassName:
* @Description:
* @date 2020年11月03日 10:56:45
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@Builder
public class Employee {
String id;
String firstName;
String lastName;
}
在classpath下的input文件夹下放置一个inputData.csv文件
对应的csv文件内容如下:
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
3.2读取csv文件-处理数据-写入csv文件代码实现
(1)编写公共类:
package com.sf.gis.boot.rcboot.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.LongAdder;
/**
* @author 80004819
* @ClassName:
* @Description:
* @date 2020年11月03日 15:15:16
*/
@Component
@Order(-1)
public class Common {
@Autowired
public ObjectMapper objectMapper;
@Autowired
public SqlSessionFactory sqlSessionFactory;
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
}
(2)核心配置类
package com.sf.gis.boot.rcboot.config;
import cn.hutool.core.map.MapUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sf.gis.boot.rcboot.entity.CommunityStatist;
import com.sf.gis.boot.rcboot.entity.CommunityTotal;
import com.sf.gis.boot.rcboot.entity.Employee;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.builder.MultiResourceItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.LineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import java.io.File;
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
/**
* @author 80004819
* @ClassName:
* @Description:
* @date 2020年11月03日 10:54:30
*/
@Configuration
@Slf4j
public class CsvBatchTaskConfig {
public final Resource[] inputResources = new ClassPathResource[]{new ClassPathResource("input/inputData.csv")};
public final Resource outputResource = new FileSystemResource("output/outputData.csv");
@Autowired
private Common common;
/**
* 构建多资源读取器
*
* @return
*/
@Bean
public MultiResourceItemReader<Employee> multiResourceItemReader() {
MultiResourceItemReader<Employee> itemReader = new MultiResourceItemReader<>();
itemReader.setResources(inputResources);
itemReader.setDelegate(csvflatFileItemReader());
return itemReader;
}
/**
* 构建文件读取器
*
* @return
*/
@Bean
public FlatFileItemReader<Employee> csvflatFileItemReader() {
FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
//跳过csv文件第一行,为表头
reader.setLinesToSkip(1);
reader.setLineMapper(new DefaultLineMapper() {
{
setLineTokenizer(new DelimitedLineTokenizer() {
{
//字段名
setNames(new String[]{"id", "firstName", "lastName"});
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
{
//转换化后的目标类
setTargetType(Employee.class);
}
});
}
});
return reader;
}
/**
* 读取到的每条数据要做的操作处理
*
* @return
*/
@Bean
public ItemProcessor<Employee, Employee> csvItemProcessor() {
return employee -> employee.setLastName(employee.getLastName().toUpperCase());
}
/**
* 构建文件写入器
*
* @return
*/
@Bean
public FlatFileItemWriter<Employee> csvflatFileItemWriter() {
FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();
writer.setResource(outputResource);
//是否为追加模式
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<Employee>() {
{
//设置分割符
setDelimiter(",");
setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() {
{
//设置字段
setNames(new String[]{"id", "firstName", "lastName"});
}
});
}
});
return writer;
}
/**
* 构建csv文件读取-处理-写入的步骤
*
* @return
*/
@Bean
public Step csvStep() {
//Writer每次是处理5条记录
return common.stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
.reader(multiResourceItemReader())
.listener(new PkslowReadListener(new LongAdder(), common.objectMapper))
.processor(csvItemProcessor())
.writer(csvflatFileItemWriter())
.listener(new PkslowWriteListener(new LongAdder(), common.objectMapper))
.build();
}
@Bean(name = "csvJob")
public Job csvJob() {
return common.jobBuilderFactory
.get("csvJob")
.incrementer(new RunIdIncrementer())
.start(csvStep())
.build();
}
// class PkslowReadListener implements ItemReadListener<Employee> {
//
// @Override
// public void beforeRead() {
// log.info("start read~~!");
// }
//
// @Override
// public void afterRead(Employee item) {
// try {
// LONG_ADDER.increment();
// log.info("read {} item content :{}", LONG_ADDER.toString(), common.OBJECT_MAPPER.writeValueAsString(item));
// } catch (Exception e) {
// log.error("json translate exception", e);
// }
// }
//
// @Override
// public void onReadError(Exception ex) {
// log.error("batch read exception", ex);
// }
// }
// class PkslowWriteListener implements ItemWriteListener<Employee> {
//
// @Override
// public void beforeWrite(List<? extends Employee> items) {
// try {
// log.info("brfore write:{}" + common.OBJECT_MAPPER.writeValueAsString(items));
// } catch (Exception e) {
// log.error("json translate exception", e);
// }
// }
//
// @Override
// public void afterWrite(List<? extends Employee> items) {
// try {
// log.info("after write:{}" + common.OBJECT_MAPPER.writeValueAsString(items));
// } catch (Exception e) {
// log.error("json translate exception", e);
// }
// }
//
// @Override
// public void onWriteError(Exception exception, List<? extends Employee> items) {
// try {
// log.info("on Write Error:{}" + common.OBJECT_MAPPER.writeValueAsString(items));
// } catch (Exception e) {
// log.error("json translate exception", e);
// }
// }
// }
}
(3)自定义读取事件监听器和写入事件监听器
可以通过Listener接口对特定事件进行监听,以实现更多业务功能。比如如果处理失败,就记录一条失败日志;处理完成,就通知下游拿数据等。
我们分别对Read、Process和Write事件进行监听,对应分别要实现ItemReadListener接口、ItemProcessListener接口和ItemWriteListener接口。因为代码比较简单,就是打印一下日志,这里只贴出ItemWriteListener的实现代码:
读取事件监听器
package com.sf.gis.boot.rcboot.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sf.gis.boot.rcboot.entity.Employee;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.LongAdder;
/**
* @author 80004819
* @ClassName:
* @Description:
* @date 2020年11月03日 15:22:53
*/
@Slf4j
public class PkslowReadListener implements ItemReadListener<Object> {
private LongAdder longAdder;
private ObjectMapper objectMapper;
public PkslowReadListener(LongAdder longAdder, ObjectMapper objectMapper) {
this.longAdder = longAdder;
this.objectMapper = objectMapper;
}
@Override
public void beforeRead() {
log.info("start read~~!");
}
@Override
public void afterRead(Object item) {
try {
longAdder.increment();
log.info("read {} item content :{}", longAdder.toString(), objectMapper.writeValueAsString(item));
} catch (Exception e) {
log.error("json translate exception", e);
}
}
@Override
public void onReadError(Exception ex) {
log.error("batch read exception", ex);
}
}
写入事件监听器
package com.sf.gis.boot.rcboot.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sf.gis.boot.rcboot.entity.Employee;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
/**
* @author 80004819
* @ClassName:
* @Description:
* @date 2020年11月03日 15:30:45
*/
@Slf4j
public class PkslowWriteListener implements ItemWriteListener<Object> {
private LongAdder longAdder;
private ObjectMapper objectMapper;
public PkslowWriteListener(LongAdder longAdder, ObjectMapper objectMapper) {
this.longAdder = longAdder;
this.objectMapper = objectMapper;
}
@Override
public void beforeWrite(List<?> items) {
try {
log.info("brfore write:{}" + objectMapper.writeValueAsString(items));
} catch (Exception e) {
log.error("json translate exception", e);
}
}
@Override
public void afterWrite(List<?> items) {
try {
log.info("after write:{}" + objectMapper.writeValueAsString(items));
longAdder.add(items.size());
log.info("已写入{}条数据到csv文件", longAdder.toString());
} catch (Exception e) {
log.error("json translate exception", e);
}
}
@Override
public void onWriteError(Exception exception, List<?> items) {
try {
log.info("on Write Error:{}" + objectMapper.writeValueAsString(items));
} catch (Exception e) {
log.error("json translate exception", e);
}
}
}
3.3 运行
完成以上编码后,执行程序,结果如下:
通过例子讲解Spring Batch入门,优秀的批处理框架
成功读取数据,并将最后字段转为大写,并输出到outputData.csv文件。
步骤step里面设置了读取监听器和写入监听器之后,执行后看一下日志:
通过例子讲解Spring Batch入门,优秀的批处理框架
这里就能明显看到之前设置的chunk的作用了。Writer每次是处理5条记录,如果一条输出一次,会对IO造成压力。
3.4 mysql数据库读取-处理处理-写入csv文件
公共类Common、以及读取事件监听器PkslowReadListener 、写入事件监听器 PkslowWriteListener 上面一个例子中已经给出,这里只列出核心配置类和mapper相关代码
实体类:
package com.sf.gis.boot.rcboot.entity;
import lombok.*;
import lombok.experimental.Accessors;
import java.math.BigDecimal;
/**
* @author 80004819
* @ClassName:
* @Description:
* @date 2020年11月03日 14:42:11
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@Builder
public class CommunityTotal {
private Integer peopleTotal;
private Integer farenTotal;
private Integer eventTotal;
private Integer houseTypeTotal;
private Integer houseUseTotal;
}
mapper:
package com.sf.gis.boot.rcboot.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.sf.gis.boot.rcboot.entity.CommunityStatist;
import com.sf.gis.boot.rcboot.entity.CommunityTotal;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;
import java.util.LinkedHashMap;
import java.util.List;
/**
* <p>
* 社区管理表 Mapper 接口
* </p>
*
* @author 段朝旭
* @since 2020-09-11
*/
@Mapper
@Repository
public interface CommunityStatistMapper extends BaseMapper<CommunityStatist> {
LinkedHashMap<String, Object> staticCommunity(String aoiId);
CommunityStatist getByAoiId(String aoiId);
List<CommunityStatist> getByParentAoiId(String parentAoiId);
CommunityTotal staticCommunityTotal(String aoiId);
}
mapper.Xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.sf.gis.boot.rcboot.mapper.CommunityStatistMapper">
<sql id="resultField">
aoi_id, aoi, st_astext(shape) shape,shape_text,create_time, regis_people, no_regis_people, hkmt_people, foreign_people,
enterprise, ind_business, community, road, mark, grid, eliminate_evil, keep_event, city_manage,
labor_dispute, multiple, house, business, factory_building, store, office, public_fac, house_type_other,
self_use, lease, vacant, wait_rent, part_rent, house_use_other,coordinate,x,y,z,course,alpha,roll,pos_x,pos_y,pos_z
</sql>
<select id="staticCommunity" resultType="java.util.LinkedHashMap">
SELECT IFNULL(SUM(regis_people+no_regis_people+hkmt_people+foreign_people),0) as people_total ,
IFNULL(SUM(enterprise+ind_business),0) as faren_total,
IFNULL(SUM(eliminate_evil+keep_event+city_manage+labor_dispute),0) as event_total,
IFNULL(SUM(multiple+house+business+factory_building+store+office+public_fac+house_type_other),0) as house_type_total,
IFNULL(SUM(self_use+lease+vacant+wait_rent+part_rent+house_use_other),0) as house_use_total
from community_statist where aoi_id = #{aoiId}
</select>
<select id="getByAoiId" resultType="com.sf.gis.boot.rcboot.entity.CommunityStatist">
SELECT
<include refid="resultField"/>
from community_statist where aoi_id = #{aoiId}
</select>
<select id="getByParentAoiId" resultType="com.sf.gis.boot.rcboot.entity.CommunityStatist">
SELECT
<include refid="resultField"/>
from community_statist where parent_aoi_id = #{parentAoiId}
</select>
<select id="staticCommunityTotal" resultType="com.sf.gis.boot.rcboot.entity.CommunityTotal">
SELECT IFNULL(SUM(regis_people+no_regis_people+hkmt_people+foreign_people),0) as people_total ,
IFNULL(SUM(enterprise+ind_business),0) as faren_total,
IFNULL(SUM(eliminate_evil+keep_event+city_manage+labor_dispute),0) as event_total,
IFNULL(SUM(multiple+house+business+factory_building+store+office+public_fac+house_type_other),0) as house_type_total,
IFNULL(SUM(self_use+lease+vacant+wait_rent+part_rent+house_use_other),0) as house_use_total
from community_statist where aoi_id = #{aoiId}
</select>
</mapper>
package com.sf.gis.boot.rcboot.config;
import cn.hutool.core.map.MapUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sf.gis.boot.rcboot.entity.CommunityStatist;
import com.sf.gis.boot.rcboot.entity.CommunityTotal;
import com.sf.gis.boot.rcboot.entity.Employee;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.builder.MultiResourceItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.LineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import java.io.File;
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
/**
* @author 80004819
* @ClassName:
* @Description:
* @date 2020年11月03日 10:54:30
*/
@Configuration
@Slf4j
public class MybatisBatchTaskConfig {
public final Resource outputResource = new FileSystemResource("output/outputData.csv");
@Autowired
private Common common;
/**
* 构建mybatis分页数据读取器
*
* @return
*/
@Bean
public MyBatisPagingItemReader<CommunityTotal> myBatisPagingItemReader() {
MyBatisPagingItemReader<CommunityTotal> pagingItemReader = new MyBatisPagingItemReader<>();
//指定查询id,即:类路径+方法名
pagingItemReader.setQueryId("com.sf.gis.boot.rcboot.mapper.CommunityStatistMapper.staticCommunityTotal");
//设置sqlSessionFactory
pagingItemReader.setSqlSessionFactory(common.sqlSessionFactory);
//给将要执行的sql传参
pagingItemReader.setParameterValues(MapUtil.of("aoiId", "A0000011"));
//设置从第几条数据开始读取
// pagingItemReader.setCurrentItemCount();
//页面容量
pagingItemReader.setPageSize(10);
//读取的最大条数
pagingItemReader.setMaxItemCount(50);
//分页读取器的名称
// pagingItemReader.setName();
// pagingItemReader.setSaveState();
return pagingItemReader;
}
/**
* 将读取到的每条社区管理数据的人口总数进行加1操作
*
* @return
*/
@Bean
public ItemProcessor<CommunityTotal, CommunityTotal> mybatisItemProcessor() {
return item -> item.setPeopleTotal(item.getPeopleTotal() + 1);
}
/**
* 构建文件写入器
*
* @return
*/
@Bean
public FlatFileItemWriter<CommunityTotal> mybatisWriter() {
FlatFileItemWriter<CommunityTotal> writer = new FlatFileItemWriter<>();
writer.setResource(outputResource);
//是否为追加模式
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<CommunityTotal>() {
{
//设置分割符
setDelimiter(",");
setFieldExtractor(new BeanWrapperFieldExtractor<CommunityTotal>() {
{
//设置字段
setNames(new String[]{"peopleTotal", "farenTotal", "eventTotal", "houseTypeTotal", "houseUseTotal"});
}
});
}
});
return writer;
}
@Bean
public Step dataSourceStep() {
return common.stepBuilderFactory.get("dataSourceStep").<CommunityTotal, CommunityTotal>chunk(5)
.reader(myBatisPagingItemReader())
.listener(new PkslowReadListener(new LongAdder(), common.objectMapper))
.processor(mybatisItemProcessor())
.writer(mybatisWriter())
.listener(new PkslowWriteListener(new LongAdder(), common.objectMapper))
.build();
}
/**
* 构建读取数据库-写入到csv的job
* @return
*/
@Bean(name = "dataSourceJob")
public Job dataSourceJob() {
return common.jobBuilderFactory
.get("dataSourceJob")
.incrementer(new RunIdIncrementer())
.start(dataSourceStep())
.build();
}
// /**
// * 读取事件监听
// */
// class PkslowReadListener implements ItemReadListener<Employee> {
//
// @Override
// public void beforeRead() {
// log.info("start read~~!");
// }
//
// @Override
// public void afterRead(Employee item) {
// try {
// LONG_ADDER.increment();
// log.info("read {} item content :{}", LONG_ADDER.toString(), common.OBJECT_MAPPER.writeValueAsString(item));
// } catch (Exception e) {
// log.error("json translate exception", e);
// }
// }
//
// @Override
// public void onReadError(Exception ex) {
// log.error("batch read exception", ex);
// }
// }
//
//
// /**
// * 写入事件监听
// */
// class PkslowWriteListener implements ItemWriteListener<Employee> {
//
// @Override
// public void beforeWrite(List<? extends Employee> items) {
// try {
// log.info("brfore write:{}" + common.OBJECT_MAPPER.writeValueAsString(items));
// } catch (Exception e) {
// log.error("json translate exception", e);
// }
// }
//
// @Override
// public void afterWrite(List<? extends Employee> items) {
// try {
// log.info("after write:{}" + common.OBJECT_MAPPER.writeValueAsString(items));
// } catch (Exception e) {
// log.error("json translate exception", e);
// }
// }
//
// @Override
// public void onWriteError(Exception exception, List<? extends Employee> items) {
// try {
// log.info("on Write Error:{}" + common.OBJECT_MAPPER.writeValueAsString(items));
// } catch (Exception e) {
// log.error("json translate exception", e);
// }
// }
// }
}
3.5 运行
后结果发现从数据读取的数量统计指标也写入到了outputData.csv文件中,如下所示:
总结:
spring batch 批处理框架,还有很多优秀的特性,在批量操作,大数据量计算中都可以使用。