SpringBoot之Spring Batch的支持
2018-01-03 16:39
369 查看
一、简介
Spring Batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。Spring Batch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使的已经使用 Spring 框架的开发者或者企业更容易访问和利用企业服务。二、Spring Batch主要组成
SpringBatch主要由以下几部分组成:以上Spring Batch的主要组成部分只需注册成Spring的Bean即可。若想开启批处理的支持还需在配置类上使用@EnableBatchProcessing。
一个示意的Spring Batch的配置如下:
@Configuration @EnableBatchProcessing public class BatchConfig { @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("oracle"); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; } @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) .end() .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person,Person> processor) { return stepBuilderFactory .get("step1") .<Person, Person>chunk(65000) .reader(reader) .processor(processor) .writer(writer) .build(); } @Bean public ItemReader<Person> reader() throws Exception { //新建ItemReader接口的实现类返回 return reader; } @Bean public ItemProcessor<Person, Person> processor() { //新建ItemProcessor接口的实现类返回 return processor; } @Bean public ItemWriter<Person> writer(DataSource dataSource) { //新建ItemWriter接口的实现类返回 return writer; } }
三、Job监听
若需要监听我们的Job的执行情况,则定义个一个类实现JobExecutionListener,并在定义Job的Bean上绑定该监听器。监听器的定义如下:
public class MyJobListener implements JobExecutionListener{ @Override public void beforeJob(JobExecution jobExecution) { //Job开始前 } @Override public void afterJob(JobExecution jobExecution) { //Job完成后 } }
注册并绑定监听器到Job:
@Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) .end() .listener(csvJobListener()) .build(); } @Bean public MyJobListener myJobListener() { return new MyJobListener(); }
四、数据处理及校验
Spring Batch为我们提供了大量的ItemReader的实现,用来读取不同的数据来源。(1)数据处理
数据处理只需实现ItemProcessor接口,重写其process方法。方法输入的参数是从ItemReader读取到的数据,返回的数据给ItemWriter。public class MyItemProcessor implements ItemProcessor<Person, Person> { @Override public Person process(Person person){ String name = person.getName().toUpperCase(); person.setName(name); return person; } }
(2)数据校验
我们可以JSR-303(主要实现有hibernatevalidator)的注解,来校验temReader读取到的数据是否满足要求。我们可以让我们的ItemProcessor实现ValidatingItemProcessor接口:
public class MyItemProcessor extends ValidatingItemProcessor<Person>{ @Override public Person process(Person item) throws ValidationException { super.process(item); return item; } }
定义我们的校验器,实现的Validator接口来自于Spring,我们将使用JSR-303的Validator来校验。
Spring Batch为我们提供了大量的ItemWriter的实现,用来将数据输出到不同的目的地。
Spring
Batch的任务是通过JobLauncher的run方法来执行的,因此我们只需在普通的计划任务方法中执行JobLauncher的run方法即可。
我们在ItemReader和ItemWriter的Bean定义的时候,参数已经硬编码在Bean的初始化中。
五、SpringBoot的支持
Spring Boot对Spring Batch支持的源码位于org.springframework.boot.autoconfigure.batch下。Spring Boot为我们自动初始化了Spring Batch存储批处理记录的数据库,且当我们程序启动时,会自动执行我们定义的Job的Bean。
Spring Boot提供如下属性来定制Spring Batch:
SpringBoot Properties常用应用属性配置列表
# SPRING BATCH (BatchProperties) spring.batch.initializer.enabled= # Create the required batch tables on startup if necessary. Enabled automatically if no custom table prefix is set or if a custom schema is configured. spring.batch.job.enabled=true # Execute all Spring Batch jobs in the context on startup. spring.batch.job.names= # Comma-separated list of job names to execute on startup (For instance `job1,job2`). By default, all Jobs found in the context are executed. spring.batch.schema=classpath:org/springframework/batch/core/schema-@@platform@@.sql # Path to the SQL file to use to initialize the database schema. spring.batch.table-prefix= # Table prefix for all the batch meta-data tables.
六、Springboot实战
(1)pom.xml<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> </dependency>
(2)application.yml
server: port: 5000 spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&characterSetResults=utf8 username: root password: password jpa: hibernate: ddl-auto: update show-sql: true
(3)建表语句,放在/resources/包下schema.sql(mysql)
create table person ( `id` int(11) NOT NULL AUTO_INCREMENT , `name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL , `age` int(11) NULL DEFAULT NULL , `nation` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL , `address` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL , PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci AUTO_INCREMENT=1 ROW_FORMAT=COMPACT ;
(4)csv文件,放在/resources/包下people.csv
汪某某,11,汉族,合肥 张某某,12,汉族,上海 李某某,13,非汉族,武汉 刘某,14,非汉族,南京 欧阳某某,15,汉族,北京
(5)Person类
public class Person { @Size(max = 4, min = 2) //1此处使用JSR-303注解来校验数据。 private String name; private int age; private String nation; private String address; //省略get、set方法 }
(6)数据处理CsvItemProcessor
public class CsvItemProcessor extends ValidatingItemProcessor<Person> { @Override public Person process(Person item) throws ValidationException { super.process(item); //1需执行super.proces:(item)才会调用自定义校验器。 if (item.getNation().equals("汉族")) { //2对数据做简单的处理,若民族为汉族,则数据转换成01,其余转换成02。 item.setNation("01"); } else { item.setNation("02"); } return item; } }
(7)数据校验CsvBeanValidator
public class CsvBeanValidator<T> implements Validator<T>, InitializingBean { private javax.validation.Validator validator; @Override public void afterPropertiesSet() throws Exception { //1使用JSR-303的Validator来校验我们的数据,在此处进行JSR-303的Validator的初始化。 ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); validator = validatorFactory.usingContext().getValidator(); } @Override public void validate(T value) throws ValidationException { Set<ConstraintViolation<T>> constraintViolations = validator.validate(value); //2使用Validator的validate方法校验数据。 if (constraintViolations.size() > 0) { StringBuilder message = new StringBuilder(); for (ConstraintViolation<T> constraintViolation : constraintViolations) { message.append(constraintViolation.getMessage() + "\n"); } throw new ValidationException(message.toString()); } } }
(8)Job监听CsvJobListener
public class CsvJobListener implements JobExecutionListener { long startTime; long endTime; //监听器实现JobExecutionListener接口,并重写其beforeJob、afterJob方法即可。 @Override public void beforeJob(JobExecution jobExecution) { startTime = System.currentTimeMillis(); System.out.println("任务处理开始"); } @Override public void afterJob(JobExecution jobExecution) { endTime = System.currentTimeMillis(); System.out.println("任务处理结束"); System.out.println("耗时:" + (endTime - startTime) + "ms"); } }
(9)配置CsvBatchConfig(注意datasource方式)
@Configuration @EnableBatchProcessing public class CsvBatchConfig { @Bean public ItemReader<Person> reader() throws Exception { FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();//1使用FlatFileItemReader读取文件。 reader.setResource(new ClassPathResource("people.csv"));//2使用FlatFileItemReader的setResource方法设置csv文件的路径。 reader.setLineMapper(new DefaultLineMapper<Person>() {{//3在此处对cvs文件的数据和领域模型类做对应映射。 setLineTokenizer(new DelimitedLineTokenizer() {{ setNames(new String[]{"name", "age", "nation", "address"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{ setTargetType(Person.class); }}); }}); return reader; } @Bean public ItemProcessor<Person, Person> processor() { CsvItemProcessor processor = new CsvItemProcessor();//1使用我们自己定义的ItemProcessor的实现CsvItemProcessor。 processor.setValidator(csvBeanValidator());//2为processor指定校验器为CsvBeanValidator; return processor; } @Bean public ItemWriter<Person> writer(DataSource dataSource) {//1Spring能让容器中已有的Bean以参数的形式注入,Spring Boot已为我们定义了dataSource。 JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();//2我们使用JDBC批处理的JdbcBatchItemWriter来写数据到数据库。 writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>()); String sql = "insert into person " + " (name,age,nation,address) " + "values( :name, :age, :nation,:address)"; writer.setSql(sql);//3在此设置要执行批处理的SQL语句。 writer.setDataSource(dataSource); return writer; } @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { //jobRepository的定义需要dataSource和transactioManager,Spring Boot已为我们自动配置了这两个类,Spring可通过方法注入已有的Bean。 JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("mysql"); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; } @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) //1为Job指定Step。 .end() .listener(csvJobListener()) //2绑定监听器csvJobListener。 .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) { return stepBuilderFactory .get("step1") .<Person, Person>chunk(65000)//1批处理每次提交65000条数据。 .reader(reader)//2给step绑定reader。 .processor(processor)//3给step绑定processor。 .writer(writer)//4给step绑定writer。 .build(); } @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Validator<Person> csvBeanValidator() { return new CsvBeanValidator<Person>(); } }
七、测试
(1)启动项目(数据已经处理):(2)为我们初始化的Spring Batch数据库表:
(3)处理数据入库
备注:这是通过启动后自动执行的方式,很多时候需要人为去触发。暂且不举例。
参考资料《JavaEE开发的颠覆者 Spring Boot》
新手一枚,欢迎拍砖~ ~ ~
相关文章推荐
- springboot springjpa 支持多个数据源
- springboot笔记(2)springboot-添加对jsp支持
- 【原】spring boot source 1.5 中不支持 diamond 运算符
- Java 9 和Spring Boot 2.0纷纷宣布支持的HTTP/2到底是什么?
- 代码生成器CodeGenerator(目前只支持SpringBoot/SpringDataJpa)
- Spring Boot 之FilterRegistrationBean --支持web Filter 排序的使用
- Spring Boot对Ajax跨域访问的支持实现
- 详解Spring Boot 添加JSP支持
- spring boot 学习笔记(二) 构建web支持jsp
- Spring Boot 之web Filter --支持排序的使用扩展
- Spring Boot 添加JSP支持【从零开始学Spring Boot】
- SpringBoot学习-支持Jersey
- SpringCloud SpringBoot mybatis 分布式微服务(十一)Spring Boot多数据源配置与使用(Spring-data-jpa支持)
- SpringBoot学习-支持PageHelp插件使用
- Spring Boot 集成 批处理框架Spring batch
- springboot支持处理DELETE!
- SpringBoot对SpringSecurity的支持
- spring-boot 添加跨域支持
- 19. Spring Boot 添加JSP支持【从零开始学Spring Boot】
- java鬼混笔记:springboot 6、springboot整合mybatis(支持多数源)