
Spring Boot: Support for Spring Batch

2018-01-03 16:39

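1. Introduction

Spring Batch is a lightweight, comprehensive batch-processing framework designed to help enterprises build robust, efficient batch applications. It is a Spring subproject, written in Java and built on the Spring Framework, which makes it easier for developers and enterprises already using Spring to access and use enterprise services.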

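2. Main Components of Spring Batch

Spring Batch is made up mainly of the following components:

- JobRepository: stores the metadata of batch runs (job instances, job executions, step executions)
- JobLauncher: the interface used to launch a Job
- Job: the task that is actually executed; it consists of one or more Steps
- Step: one stage of a Job; a chunk-oriented Step combines an ItemReader, an ItemProcessor, and an ItemWriter
- ItemReader: the interface for reading data
- ItemProcessor: the interface for processing data
- ItemWriter: the interface for writing data

Each of these components only needs to be registered as a Spring bean. To enable batch processing, also annotate a configuration class with @EnableBatchProcessing, which additionally provides JobRepository, JobLauncher, JobBuilderFactory, and StepBuilderFactory beans out of the box (so the explicit jobRepository and jobLauncher beans below are optional).

A schematic Spring Batch configuration looks like this (it uses the Spring Batch 3.x/4.x API current when this was written; JobBuilderFactory and StepBuilderFactory were deprecated in Spring Batch 5):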

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType("oracle");
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(s1)
                .end()
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
            ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(65000)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public ItemReader<Person> reader() throws Exception {
        // Create and return an ItemReader implementation here (MyItemReader is a hypothetical placeholder)
        return new MyItemReader();
    }

    @Bean
    public ItemProcessor<Person, Person> processor() {
        // Create and return an ItemProcessor implementation here (see MyItemProcessor below)
        return new MyItemProcessor();
    }

    @Bean
    public ItemWriter<Person> writer(DataSource dataSource) {
        // Create and return an ItemWriter implementation here (MyItemWriter is a hypothetical placeholder)
        return new MyItemWriter(dataSource);
    }
}
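To make the chunk-oriented step above more concrete, here is a minimal JDK-only sketch of what a step configured with chunk(n) does: read up to n items, process each one, then hand the whole chunk to the writer in a single call (in Spring Batch, one transaction commit per chunk). ChunkStepSketch and its run method are hypothetical names; this is not the Spring Batch API and it ignores transactions, retries, and skips.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// JDK-only sketch of a chunk-oriented step; hypothetical names, not the Spring Batch API.
final class ChunkStepSketch {
    private ChunkStepSketch() {
    }

    /**
     * Reads up to chunkSize items, processes each of them, then passes the whole
     * chunk to the writer in one call. Repeats until the reader is exhausted and
     * returns the number of chunks written (one commit per chunk in Spring Batch).
     */
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        int chunks = 0;
        while (reader.hasNext()) {
            // Read phase: collect up to chunkSize input items
            List<I> inputs = new ArrayList<>(chunkSize);
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // Process phase: transform every item of the chunk
            List<O> outputs = new ArrayList<>(inputs.size());
            for (I item : inputs) {
                outputs.add(processor.apply(item));
            }
            // Write phase: the writer receives the chunk as a single list
            writer.accept(outputs);
            chunks++;
        }
        return chunks;
    }
}
```

With chunk(65000), an input of fewer than 65000 records is therefore read, processed, and written as one chunk.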


3. Job Listeners

To monitor how our Job runs, define a class that implements JobExecutionListener and bind it to the Job bean.

The listener looks like this:

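import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class MyJobListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {
        // Called before the Job starts
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // Called after the Job finishes, whether it completed or failed
    }
}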


Register the listener as a bean and bind it to the Job:

@Bean
public Job importJob(JobBuilderFactory jobs, Step s1) {
    return jobs.get("importJob")
            .incrementer(new RunIdIncrementer())
            .flow(s1)
            .end()
            .listener(myJobListener()) // bind the listener bean defined below
            .build();
}

@Bean
public MyJobListener myJobListener() {
    return new MyJobListener();
}
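The contract a JobExecutionListener relies on can be sketched in plain Java: beforeJob runs first, and afterJob runs whether the job body succeeds or fails (the Spring Batch reference documentation says the same of afterJob). JobListenerSketch and ListenedJobRunner are hypothetical JDK-only stand-ins, not Spring Batch types; the failure is reported through the return value, loosely mirroring how a step failure usually ends up as a FAILED job status rather than an exception.

```java
// Hypothetical JDK-only stand-ins; not Spring Batch types.
interface JobListenerSketch {
    void beforeJob(String jobName);

    void afterJob(String jobName, boolean succeeded);
}

final class ListenedJobRunner {
    private ListenedJobRunner() {
    }

    /** Runs the job between the two listener callbacks and returns whether it succeeded. */
    static boolean run(String jobName, Runnable job, JobListenerSketch listener) {
        listener.beforeJob(jobName);
        boolean succeeded = false;
        try {
            job.run();
            succeeded = true;
        } catch (RuntimeException e) {
            // Record the failure instead of propagating it (compare a FAILED status)
        } finally {
            // afterJob is invoked on success and on failure alike
            listener.afterJob(jobName, succeeded);
        }
        return succeeded;
    }
}
```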


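4. Data Processing and Validation

Spring Batch provides many ItemReader implementations for reading data from different sources.

(1) Data processing

To process data, implement the ItemProcessor interface and override its process method. The method's argument is an item read by the ItemReader, and the value it returns is passed on to the ItemWriter.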

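import org.springframework.batch.item.ItemProcessor;

public class MyItemProcessor implements ItemProcessor<Person, Person> {
    @Override
    public Person process(Person person) {
        // Upper-case the name; the returned object is handed to the ItemWriter
        String name = person.getName().toUpperCase();
        person.setName(name);
        return person;
    }
}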
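One detail of the ItemProcessor contract worth knowing: if process returns null, the item is filtered out and never reaches the ItemWriter. A JDK-only sketch of that rule follows; the names are hypothetical, and the blank-name check is an illustrative assumption that MyItemProcessor above does not make.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

// JDK-only sketch of the ItemProcessor "null means filtered" rule; hypothetical names.
final class ProcessorFilterSketch {
    private ProcessorFilterSketch() {
    }

    /** Applies the processor to every item; null results are dropped and never reach the writer. */
    static <I, O> List<O> processAll(List<I> items, Function<I, O> processor) {
        List<O> toWrite = new ArrayList<>();
        for (I item : items) {
            O result = processor.apply(item);
            if (result != null) {
                toWrite.add(result);
            }
        }
        return toWrite;
    }

    /** Upper-cases a name like MyItemProcessor, but (an assumption for illustration) filters blank names. */
    static String upperCaseOrFilter(String name) {
        if (name == null || name.trim().isEmpty()) {
            return null; // filtered: skipped silently rather than treated as an error
        }
        return name.toUpperCase(Locale.ROOT);
    }
}
```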


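(2) Data validation

We can use JSR-303 annotations (whose main implementation is Hibernate Validator) to check whether the data read by the ItemReader meets our requirements.

To do so, let our ItemProcessor extend the ValidatingItemProcessor class: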

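import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;

public class MyItemProcessor extends ValidatingItemProcessor<Person> {
    @Override
    public Person process(Person item) throws ValidationException {
        // super.process runs the configured Validator and throws ValidationException for invalid data
        super.process(item);
        // Further processing of the valid item goes here
        return item;
    }
}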
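ValidatingItemProcessor can react to an invalid item in two ways: by default the ValidationException propagates, and with setFilter(true) the invalid item is filtered (processed to null) instead. The sketch below models both behaviours in plain Java; ValidatingProcessorSketch is a hypothetical class, and IllegalArgumentException stands in for Spring Batch's ValidationException.

```java
import java.util.function.Predicate;

// JDK-only sketch of a validating processor with an optional filter mode; hypothetical names.
// IllegalArgumentException stands in for Spring Batch's ValidationException.
final class ValidatingProcessorSketch<T> {
    private final Predicate<T> rule;
    private final String message;
    private final boolean filter;

    ValidatingProcessorSketch(Predicate<T> rule, String message, boolean filter) {
        this.rule = rule;
        this.message = message;
        this.filter = filter;
    }

    /** Returns valid items unchanged; invalid ones are filtered (null) or rejected with an exception. */
    T process(T item) {
        if (rule.test(item)) {
            return item;
        }
        if (filter) {
            return null; // like setFilter(true): drop the item and carry on
        }
        throw new IllegalArgumentException(message + ": " + item); // default: the failure propagates
    }
}
```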


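Next, define our validator. The Validator interface it implements comes from Spring Batch (org.springframework.batch.item.validator.Validator), and the actual checks are delegated to a JSR-303 Validator.

Spring Batch provides many ItemWriter implementations for writing data to different destinations.

Spring Batch jobs are executed through the run method of JobLauncher, so to run a job on a schedule we only need to call JobLauncher's run method from an ordinary scheduled-task method.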
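Two details matter when calling JobLauncher.run(job, jobParameters) yourself, for example from a scheduled method: Spring Batch identifies a job instance by the job name plus its identifying parameters, and launching an instance that has already completed is rejected with a JobInstanceAlreadyCompleteException. That is why scheduled launches usually pass a fresh parameter such as a timestamp, and why the Job beans in this article use RunIdIncrementer, which adds an increasing run.id parameter. The JDK-only sketch below models that bookkeeping; JobInstanceSketch and its methods are hypothetical and keep everything in memory instead of in the JobRepository.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// JDK-only sketch of job-instance identity; hypothetical names, not the Spring Batch API.
final class JobInstanceSketch {
    private final Set<Map<String, Object>> completedInstances = new HashSet<>();
    private long lastRunId = 0;

    /** Runs the job for these parameters unless an instance with equal parameters already completed. */
    void launch(Map<String, Object> params, Runnable job) {
        Map<String, Object> key = new TreeMap<>(params); // copy used as the instance identity
        if (completedInstances.contains(key)) {
            throw new IllegalStateException("Job instance already complete for " + key);
        }
        job.run();
        // Only successful runs are recorded, so a failed instance could be launched again
        completedInstances.add(key);
    }

    /** Copies the parameters and adds run.id = previous + 1, so each launch is a new instance. */
    Map<String, Object> nextParameters(Map<String, Object> params) {
        Map<String, Object> next = new TreeMap<>(params);
        next.put("run.id", ++lastRunId);
        return next;
    }
}
```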

So far, the parameters of the ItemReader and ItemWriter beans are hard-coded when the beans are initialized.

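5. Spring Boot Support

Spring Boot's support for Spring Batch lives in the org.springframework.boot.autoconfigure.batch package.

Spring Boot automatically initializes the database tables in which Spring Batch stores its batch execution records, and when the application starts it automatically runs the Job beans we have defined.

Spring Boot provides the following properties for customizing Spring Batch. They are taken from Spring Boot's list of common application properties and use the Spring Boot 1.x names; Spring Boot 2.0 replaced spring.batch.initializer.enabled with spring.batch.initialize-schema.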

# SPRING BATCH (BatchProperties)
spring.batch.initializer.enabled= # Create the required batch tables on startup if necessary. Enabled automatically if no custom table prefix is set or if a custom schema is configured.
spring.batch.job.enabled=true # Execute all Spring Batch jobs in the context on startup.
spring.batch.job.names= # Comma-separated list of job names to execute on startup (For instance `job1,job2`). By default, all Jobs found in the context are executed.
spring.batch.schema=classpath:org/springframework/batch/core/schema-@@platform@@.sql # Path to the SQL file to use to initialize the database schema.
spring.batch.table-prefix= # Table prefix for all the batch meta-data tables.


6. Spring Boot in Practice

(1)pom.xml

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>


(2)application.yml

server:
  port: 5000

spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&characterSetResults=utf8
    username: root
    password: password

  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true


(3) Table definition (MySQL), saved as schema.sql under src/main/resources

create table if not exists person
(
`id`  int(11) NOT NULL AUTO_INCREMENT ,
`name`  varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
`age`  int(11) NULL DEFAULT NULL ,
`nation`  varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
`address`  varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
PRIMARY KEY (`id`)
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
AUTO_INCREMENT=1
ROW_FORMAT=COMPACT
;


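(4) CSV file, saved as people.csv under src/main/resources (columns: name, age, nation, address; the names are anonymized, 汉族 means Han and 非汉族 means non-Han, and the last column is a city)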

汪某某,11,汉族,合肥
张某某,12,汉族,上海
李某某,13,非汉族,武汉
刘某,14,非汉族,南京
欧阳某某,15,汉族,北京


(5) Person class

import javax.validation.constraints.Size;

public class Person {
    @Size(max = 4, min = 2) // JSR-303 annotation: the name must be 2 to 4 characters long
    private String name;
    private int age;
    private String nation;
    private String address;
    // getters and setters omitted
}
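Roughly speaking, the FlatFileItemReader configured later in CsvBatchConfig turns each CSV line into a Person in two steps: DelimitedLineTokenizer splits the line on commas and names the columns, and BeanWrapperFieldSetMapper copies the named values into a new Person. The plain-Java sketch below does the same for a single line; PersonRow is a hypothetical immutable stand-in for Person, and unlike DelimitedLineTokenizer it does not handle quoted fields.

```java
// Hypothetical, JDK-only stand-in for mapping one CSV line onto a Person-like object.
final class PersonRow {
    final String name;
    final int age;
    final String nation;
    final String address;

    PersonRow(String name, int age, String nation, String address) {
        this.name = name;
        this.age = age;
        this.nation = nation;
        this.address = address;
    }

    /** Splits on commas (no quote handling) and maps the columns name, age, nation, address in order. */
    static PersonRow parse(String line) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty columns
        if (fields.length != 4) {
            throw new IllegalArgumentException("expected 4 columns but got " + fields.length + ": " + line);
        }
        // A non-numeric age fails fast with NumberFormatException (an IllegalArgumentException)
        return new PersonRow(fields[0], Integer.parseInt(fields[1].trim()), fields[2], fields[3]);
    }
}
```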


(6) Data processing: CsvItemProcessor

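import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;

public class CsvItemProcessor extends ValidatingItemProcessor<Person> {
    @Override
    public Person process(Person item) throws ValidationException {
        // super.process(item) must be called, otherwise the custom validator is never invoked
        super.process(item);
        // Simple transformation: nation "汉族" (Han) becomes code "01", anything else becomes "02"
        if ("汉族".equals(item.getNation())) {
            item.setNation("01");
        } else {
            item.setNation("02");
        }
        return item;
    }
}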


(7) Data validation: CsvBeanValidator

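import java.util.Set;

import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;

public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {
    private javax.validation.Validator validator;

    @Override
    public void afterPropertiesSet() throws Exception {
        // Initialize the JSR-303 Validator that performs the actual checks
        ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }

    @Override
    public void validate(T value) throws ValidationException {
        // Check the bean against its JSR-303 annotations (e.g. @Size on Person.name)
        Set<ConstraintViolation<T>> constraintViolations = validator.validate(value);
        if (!constraintViolations.isEmpty()) {
            // Collect every violation message into a single exception
            StringBuilder message = new StringBuilder();
            for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message.append(constraintViolation.getMessage()).append('\n');
            }
            throw new ValidationException(message.toString());
        }
    }
}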
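CsvBeanValidator reports all constraint violations of an item in one exception instead of stopping at the first. The same aggregation pattern, sketched without JSR-303 so it runs on the JDK alone: each rule is a hypothetical function that returns an error message or null, and IllegalArgumentException stands in for ValidationException. In the test, the 2-to-4 length rule mirrors @Size(min = 2, max = 4) on Person.name; the second rule is an invented example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// JDK-only sketch of CsvBeanValidator's "collect every violation" pattern; hypothetical names.
// Each rule returns an error message, or null when the value satisfies it.
final class RuleValidatorSketch<T> {
    private final List<Function<T, String>> rules = new ArrayList<>();

    RuleValidatorSketch<T> rule(Function<T, String> rule) {
        rules.add(rule);
        return this;
    }

    /** Runs every rule and, if any fail, throws once with all messages joined by newlines. */
    void validate(T value) {
        List<String> messages = new ArrayList<>();
        for (Function<T, String> rule : rules) {
            String message = rule.apply(value);
            if (message != null) {
                messages.add(message);
            }
        }
        if (!messages.isEmpty()) {
            throw new IllegalArgumentException(String.join("\n", messages));
        }
    }
}
```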


(8) Job listener: CsvJobListener

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

// The listener implements JobExecutionListener and overrides its beforeJob and afterJob methods
public class CsvJobListener implements JobExecutionListener {
    private long startTime;
    private long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        System.out.println("Job started");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        System.out.println("Job finished");
        System.out.println("Elapsed: " + (endTime - startTime) + " ms");
    }
}
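CsvJobListener keeps its start time in instance fields, which works for one run at a time; a singleton listener shared by overlapping executions would let them overwrite each other's start time. One way around that, sketched in plain Java with hypothetical names, is to key the start times by execution id and to measure with System.nanoTime, which is monotonic and unaffected by wall-clock adjustments.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// JDK-only sketch of a timing listener that tolerates overlapping executions; hypothetical names.
final class TimingListenerSketch {
    private final Map<Long, Long> startNanos = new ConcurrentHashMap<>();

    void beforeJob(long executionId) {
        startNanos.put(executionId, System.nanoTime()); // monotonic clock for elapsed time
    }

    /** Returns the elapsed milliseconds for this execution and forgets its start time. */
    long afterJob(long executionId) {
        Long start = startNanos.remove(executionId);
        if (start == null) {
            throw new IllegalStateException("beforeJob was not called for execution " + executionId);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Number of executions that have started but not finished. */
    int pending() {
        return startNanos.size();
    }
}
```

In a real JobExecutionListener the key would typically come from jobExecution.getId().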


(9) Configuration: CsvBatchConfig (note how the DataSource is injected)

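import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.validator.Validator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableBatchProcessing
public class CsvBatchConfig {

    @Bean
    public ItemReader<Person> reader() throws Exception {
        // Read the file with FlatFileItemReader; setResource points it at the CSV file on the classpath
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        reader.setResource(new ClassPathResource("people.csv"));
        // Map the CSV data onto the domain class: split each line on commas and name the columns,
        // then copy the named values into a new Person through its setters
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"name", "age", "nation", "address"});
        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<Person>();
        fieldSetMapper.setTargetType(Person.class);
        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<Person>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public ItemProcessor<Person, Person> processor() {
        // Use our own ItemProcessor implementation and give it CsvBeanValidator as its validator
        CsvItemProcessor processor = new CsvItemProcessor();
        processor.setValidator(csvBeanValidator());
        return processor;
    }

    @Bean
    public ItemWriter<Person> writer(DataSource dataSource) {
        // Existing beans can be injected as method parameters; Spring Boot has already defined the DataSource.
        // JdbcBatchItemWriter writes the items to the database with JDBC batch updates.
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
        // Bind :name, :age, :nation and :address to the matching Person properties
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        String sql = "insert into person (name, age, nation, address) "
                + "values (:name, :age, :nation, :address)";
        writer.setSql(sql); // the SQL statement executed in batch
        writer.setDataSource(dataSource);
        return writer;
    }

    // Note: @EnableBatchProcessing already provides a JobRepository and a JobLauncher. On Spring Boot 2.1+,
    // which disables bean definition overriding by default, these explicit beans may cause a startup error;
    // removing them or setting spring.main.allow-bean-definition-overriding=true avoids it.
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        // jobRepository needs a DataSource and a PlatformTransactionManager; Spring Boot auto-configures both,
        // and Spring injects these existing beans through the method parameters
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType("mysql");
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(s1)                   // the Step this Job runs
                .end()
                .listener(csvJobListener()) // bind the csvJobListener
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
            ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(65000) // commit every 65000 items
                .reader(reader)               // bind the reader to the step
                .processor(processor)         // bind the processor to the step
                .writer(writer)               // bind the writer to the step
                .build();
    }

    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener();
    }

    @Bean
    public Validator<Person> csvBeanValidator() {
        return new CsvBeanValidator<Person>();
    }
}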


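7. Testing

(1) Start the application (the data has been processed):

(2) The Spring Batch tables initialized for us:

(3) The processed data stored in the database:

Note: here the job runs automatically at startup, but in many cases it needs to be triggered manually instead (the automatic run can be switched off with spring.batch.job.enabled=false). That case is not covered here.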

Reference: the book 《JavaEE开发的颠覆者 Spring Boot》

I'm new to this, so corrections and criticism are welcome!
