SpringBoot 2 批量服务

开篇词

该指南将引导你完成创建基本的批处理驱动的解决方案。
 

你将创建的应用

我们将构建一个服务,该服务从 CSV 电子表格导入数据,使用自定义代码对其进行转换,然后将最终结果存储在数据库中。
 

你将需要的工具

如何完成这个指南

像大多数的 Spring 入门指南一样,你可以从头开始并完成每个步骤,也可以绕过你已经熟悉的基本设置步骤。如论哪种方式,你最终都有可以工作的代码。

  • 要从头开始,移步至从 Spring Initializr 开始
  • 要跳过基础,执行以下操作:
    • 下载并解压缩该指南将用到的源代码,或借助 Git 来对其进行克隆操作:git clone https://github.com/spring-guides/gs-batch-processing.git
    • 切换至 gs-batch-processing/initial 目录;
    • 跳转至该指南的创建业务类

待一切就绪后,可以检查一下 gs-batch-processing/complete 目录中的代码。
 

业务数据

通常,我们的客户或业务分析师提供电子表格。对于这个简单示例,我们可以在 src/main/resources/sample-data.csv 中找到一些测试数据:

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

该电子表格每行上都包含一个名字和姓氏,用逗号分隔。这是 Spring 无需定制即可处理的普遍模式。

接下来,我们需要编写一个 SQL 脚本来创建一个表来存储数据。我们可以在 src/main/resources/schema-all.sql 中找到该脚本:

DROP TABLE people IF EXISTS;

CREATE TABLE people  (
    person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);

Spring Boot 在启动期间会自动运行 schema-@@platform@@.sql-all 是默认全部平台。

从 Spring Initializr 开始

对于所有的 Spring 应用来说,你应该从 Spring Initializr 开始。Initializr 提供了一种快速的方法来提取应用程序所需的依赖,并为你完成许多设置。该示例需要 Spring Batch 和 HyperSQL 数据库依赖。下图显示了此示例项目的 Initializr 设置:

上图显示了选择 Maven 作为构建工具的 Initializr。你也可以使用 Gradle。它还将 com.examplebatch-processing 的值分别显示为 Group 和 Artifact。在本示例的其余部分,将用到这些值。

以下清单显示了选择 Maven 时创建的 pom.xml 文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.2.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>batch-processing</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>batch-processing</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>

		<dependency>
			<groupId>org.hsqldb</groupId>
			<artifactId>hsqldb</artifactId>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

以下清单显示了在选择 Gradle 时创建的 build.gradle 文件:

plugins {
	id 'org.springframework.boot' version '2.2.2.RELEASE'
	id 'io.spring.dependency-management' version '1.0.8.RELEASE'
	id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
	mavenCentral()
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-batch'
	runtimeOnly 'org.hsqldb:hsqldb'
	testImplementation('org.springframework.boot:spring-boot-starter-test') {
		exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
	}
	testImplementation 'org.springframework.batch:spring-batch-test'
}

test {
	useJUnitPlatform()
}

创建业务类

现在我们可以看到数据输入和输出的格式,可以编写代码来代表一行数据,如以下示例(来自 src/main/java/com/example/batchprocessing/Person.java)所示:

package com.example.batchprocessing;

public class Person {

  private String lastName;
  private String firstName;

  public Person() {
  }

  public Person(String firstName, String lastName) {
    this.firstName = firstName;
    this.lastName = lastName;
  }

  public void setFirstName(String firstName) {
    this.firstName = firstName;
  }

  public String getFirstName() {
    return firstName;
  }

  public String getLastName() {
    return lastName;
  }

  public void setLastName(String lastName) {
    this.lastName = lastName;
  }

  @Override
  public String toString() {
    return "firstName: " + firstName + ", lastName: " + lastName;
  }

}

我们可以通过构造函数或通过设置属性来使用姓和名实例化 Person 类。
 

创建中间处理器

批处理中的常见范例是摄取数据,对其进行转换,然后将其通过管道传输到其他地方。在这里,我们需要编写一个简单的转换器,将名称转换为大写。以下清单(来自 src/main/java/com/example/batchprocessing/PersonItemProcessor.java)显示了如何执行该操作:

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

  private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

  @Override
  public Person process(final Person person) throws Exception {
    final String firstName = person.getFirstName().toUpperCase();
    final String lastName = person.getLastName().toUpperCase();

    final Person transformedPerson = new Person(firstName, lastName);

    log.info("Converting (" + person + ") into (" + transformedPerson + ")");

    return transformedPerson;
  }

}

PersonItemProcessor 实现 Spring Batch 的 ItemProcessor 接口。这样可以很容易地将代码连接到批处理作业中,我们将在该指南后面定义。根据该接口,我们会收到一个传入的 Person 对象,然后将其转换为大写的 Person

输入和输出类型不必相同。实际上,在读取一个数据源之后,有时应用的数据流需要另一种数据类型。
 

汇总批处理作业

现在,我们需要整理实际的批处理作业。Spring Batch 提供了许多实用程序类,这些实用程序类减少了编写自定义代码的需求。相反,我们可以专注于业务逻辑。

要配置我们的作业,我们必须首先在 src/main/java/com/exampe/batchprocessing/BatchConfiguration.java 中创建一个 Spring @Configuration 类,如以下示例所示:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

  @Autowired
  public JobBuilderFactory jobBuilderFactory;

  @Autowired
  public StepBuilderFactory stepBuilderFactory;

    ...

}

对于初学者来说,@EnableBatchProcessing 注解添加了许多关键的 bean,这些 bean 支持作业并为我们节省了很多工作。该示例使用基于内存的数据库(由 @EnableBatchProcessing 提供),这意味着完成之后数据将会消失。它还会自动识别下面需要的几个工厂。现在,将以下 bean 添加到我们的 BatchConfiguration 类中,以定义读取器、处理器以及写入器:

@Bean
public FlatFileItemReader<Person> reader() {
  return new FlatFileItemReaderBuilder<Person>()
    .name("personItemReader")
    .resource(new ClassPathResource("sample-data.csv"))
    .delimited()
    .names(new String[]{"firstName", "lastName"})
    .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
      setTargetType(Person.class);
    }})
    .build();
}

@Bean
public PersonItemProcessor processor() {
  return new PersonItemProcessor();
}

@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
  return new JdbcBatchItemWriterBuilder<Person>()
    .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
    .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
    .dataSource(dataSource)
    .build();
}

第一部分代码定义了输入、处理器以及输出:

  • readder() 创建了一个 ItemReader。它会查找一个名为 sample-data.csv 的文件,并分析每个项并提供足够的信息以将其转换为 Person
  • processor() 创建我们先前定义的 PersonItemProcessor 实例,旨在将数据转换为大写;
  • write(DataSource) 创建了一个 ItemWriter。这针对的是 JDBC 目标,并自动获取由 @EnableBatchProcessing 创建的 dataSource 副本。它包含由 Java bean 属性驱动的插入单个 Person 所需的 SQL 语句。

最后一块(来自 src/main/java/com/example/batchprocessing/BatchConfiguration.java)显示了实际的作业配置:

@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
  return jobBuilderFactory.get("importUserJob")
    .incrementer(new RunIdIncrementer())
    .listener(listener)
    .flow(step1)
    .end()
    .build();
}

@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
  return stepBuilderFactory.get("step1")
    .<Person, Person> chunk(10)
    .reader(reader())
    .processor(processor())
    .writer(writer)
    .build();
}

第一个方法定义了作业,第二个方法定义了一个步骤。作业是按步骤构建的,其中每个步骤都会涉及到阅读器、处理器以及编写器。

在该作业定义中,我们需要一个增量器,因为作业使用数据库来维护执行状态。然后,我们列出每个步骤(尽管该作业只有一个步骤)。作业结束,并且 Java API 产生了配置完美的作业。

在步骤定义中,我们定义一次要写入多少数据。在这种情况下,它一次最多写入 10 条记录。接下来,使用前面注入的位来匹配读取器、处理器以及写入器。

chunk() 的前缀为 <Person,Person>,因为它是一个范型方法。这表示每个 “块” 处理的输入和输出类型,并与 ItemReader<Person> 以及 ItemWriter<Person> 对齐。

批处理配置的最后一个位是在作业完成时获得通知的方法。以下示例(来自 src/main/java/com/example/batchprocessing/JobCompletionNotificationListener.java)显示了该类:

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

  private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

  private final JdbcTemplate jdbcTemplate;

  @Autowired
  public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  @Override
  public void afterJob(JobExecution jobExecution) {
    if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
      log.info("!!! JOB FINISHED! Time to verify the results");

      jdbcTemplate.query("SELECT first_name, last_name FROM people",
        (rs, row) -> new Person(
          rs.getString(1),
          rs.getString(2))
      ).forEach(person -> log.info("Found <" + person + "> in the database."));
    }
  }
}

JobCompletionNotificationListener 监听处在 BatchStatus.COMPLETED 的作业并使用 JdbcTemplate 来检查结果。
 

使应用可执行

尽管批处理可以嵌入到 Web 应用和 WAR 文件中,但下面演示的方法以更简单的方式创建了一个独立的应用。我们将所有内容打包在一个可执行的 JAR 文件中,由一个老而好的 Java main() 方法来驱动。

Spring Initializr 创建了一个应用类。对于这个简单的示例,它无需进一步修改即可工作。以下清单(来自 src/main/java/com/example/batchprocessing/BatchProcessingApplication.java)显示了应用类:

package com.example.batchprocessing;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchProcessingApplication {

  public static void main(String[] args) throws Exception {
    SpringApplication.run(BatchProcessingApplication.class, args);
  }
}

@SpringBootApplication 是一个便利的注解,它添加了以下所有内容:

  • @Configuration:将类标记为应用上下文 Bean 定义的源;
  • @EnableAutoConfiguration:告诉 Spring Boot 根据类路径配置、其他 bean 以及各种属性的配置来添加 bean。
  • @ComponentScan:告知 Spring 在 com/example 包中寻找他组件、配置以及服务。

main() 方法使用 Spring Boot 的 SpringApplication.run() 方法启动应用。

出于演示目的,有些代码可创建 JdbcTemplate,查询数据库并打印出批处理作业插入的人员姓名。
 

构建可执行 JAR

我们可以结合 Gradle 或 Maven 来从命令行运行该应用。我们还可以构建一个包含所有必须依赖项、类以及资源的可执行 JAR 文件,然后运行该文件。在整个开发生命周期中,跨环境等等情况下,构建可执行 JAR 可以轻松地将服务作为应用进行发布、版本化以及部署。

如果使用 Gradle,则可以借助 ./gradlew bootRun 来运行应用。或通过借助 ./gradlew build 来构建 JAR 文件,然后运行 JAR 文件,如下所示:

java -jar build/libs/gs-batch-processing-0.1.0.jar

由官网提供的以上这条命令的执行结果与我本地的不一样,我需要这样才能运行:java -jar build/libs/batch-processing-0.0.1-SNAPSHOT.jar

如果使用 Maven,则可以借助 ./mvnw spring-boot:run 来运行该用。或可以借助 ./mvnw clean package 来构建 JAR 文件,然后运行 JAR 文件,如下所示:

java -jar target/gs-batch-processing-0.1.0.jar

由官网提供的以上这条命令的执行结果与我本地的不一样,我需要这样才能运行:java -jar target/batch-processing-0.0.1-SNAPSHOT.jar

我们还可以构建一个经典的 WAR 文件

该作业为每个要转变的人打印一行。作业运行后,我们还可以看到数据库查询的输出。它应该类似于以下输出:

Converting (firstName: Jill, lastName: Doe) into (firstName: JILL, lastName: DOE)
Converting (firstName: Joe, lastName: Doe) into (firstName: JOE, lastName: DOE)
Converting (firstName: Justin, lastName: Doe) into (firstName: JUSTIN, lastName: DOE)
Converting (firstName: Jane, lastName: Doe) into (firstName: JANE, lastName: DOE)
Converting (firstName: John, lastName: Doe) into (firstName: JOHN, lastName: DOE)
Found <firstName: JILL, lastName: DOE> in the database.
Found <firstName: JOE, lastName: DOE> in the database.
Found <firstName: JUSTIN, lastName: DOE> in the database.
Found <firstName: JANE, lastName: DOE> in the database.
Found <firstName: JOHN, lastName: DOE> in the database.

概述

恭喜你!我们构建了一个批处理作业,改作业从电子表格中提取数据,对其进行处理,然后将其写入数据库。
 

参见

以下指南也可能会有所帮助:

想看指南的其他内容?请访问该指南的所属专栏:《Spring 官方指南

发布了132 篇原创文章 · 获赞 6 · 访问量 7980

猜你喜欢

转载自blog.csdn.net/stevenchen1989/article/details/104141393