spring batch + spring boot 配置
2015-11-06 16:08
507 查看
1. spring batch 批处理配置
BlackListDOItemProcessor 处理类
BlackListItemWriter 写入数据库类
JobCompletionNotificationListener 监听任务
RetryFailuireItemListener 重试监听
2. spring boot 配置
4. mybatis 配置
5. pom.xml文件
import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ParseException; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.LineMapper; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.transaction.PlatformTransactionManager; /** * spring batch 配置 * @author */ @Configuration @EnableBatchProcessing public class BlackListBatchConfiguration { private static final Logger logger = LoggerFactory.getLogger(BlackListBatchConfiguration.class); /** * 读取外部文件方法 * @return * @throws IOException */ @Bean @StepScope public ItemReader<BlackListDO> reader(@Value("#{jobParameters[inputFileBlack]}") String inputFile) throws IOException { logger.info("inputFile:"+new ClassPathResource(inputFile).getURL().getPath()); if(inputFile == null){ logger.error("The blacklist reader file is null"); return null; } FlatFileItemReader<BlackListDO> reader = new FlatFileItemReader<BlackListDO>(); reader.setResource(new ClassPathResource(inputFile)); reader.setLineMapper(lineMapper()); reader.setLinesToSkip(1); reader.open(JobCompletionNotificationListener.jobExecution.getExecutionContext()); return reader; } /** * 读取文本行映射POJO * @return */ @Bean @StepScope public LineMapper<BlackListDO> lineMapper() { DefaultLineMapper<BlackListDO> lineMapper = new DefaultLineMapper<BlackListDO>(); DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); lineTokenizer.setDelimiter(","); lineTokenizer.setStrict(false); lineTokenizer.setNames(new String[] { "type","value","fraudType"}); BeanWrapperFieldSetMapper<BlackListDO> fieldSetMapper = new BeanWrapperFieldSetMapper<BlackListDO>(); fieldSetMapper.setTargetType(BlackListDO.class); lineMapper.setLineTokenizer(lineTokenizer); lineMapper.setFieldSetMapper(new BlackListFieldSetMapper()); return lineMapper; } /** * 处理过程 * @return */ @Bean @StepScope public ItemProcessor<BlackListDO, BlackListDO> processor(@Value("#{jobParameters[inputFileBlack]}") String inputFile) { return new BlackListDOItemProcessor(inputFile); } /** * 写出内容 * @return */ @Bean @StepScope public ItemWriter<BlackListDO> writer() { return new BlackListItemWriter(); } /** * 构建job * @param jobs * @param s1 * @param listener * @return */ @Bean public Job importFileJob(JobBuilderFactory jobs, Step step1,JobExecutionListener listener,JobRepository jobRepository) { return jobs.get("importFileJob") .incrementer(new RunIdIncrementer()) .repository(jobRepository) .listener(listener) .flow(step1) .end() .build(); } /** * 声明step * @param stepBuilderFactory * @param reader * @param writer * @param processor * @return */ @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<BlackListDO> reader, ItemWriter<BlackListDO> writer, ItemProcessor<BlackListDO, BlackListDO> processor,PlatformTransactionManager transactionManager) { logger.error("step1"); return stepBuilderFactory.get("step1") .<BlackListDO, BlackListDO> chunk(500) .reader(reader) .processor(processor) .writer(writer) .faultTolerant() .retry(Exception.class) // 重试 .noRetry(ParseException.class) .retryLimit(1) //每条记录重试一次 .listener(new RetryFailuireItemListener()) .skip(Exception.class) .skipLimit(500) //一共允许跳过200次异常 .taskExecutor(new SimpleAsyncTaskExecutor()) //设置并发方式执行 .throttleLimit(10) //并发任务数为 10,默认为4 .transactionManager(transactionManager) .build(); } }
BlackListDOItemProcessor 处理类
import java.sql.Timestamp; import java.util.Date; import java.util.UUID; import org.springframework.batch.item.ItemProcessor; import com.BlackListDO; /** * @author zhengyong * */ public class BlackListDOItemProcessor implements ItemProcessor<BlackListDO, BlackListDO> { public String inputFile; public BlackListDOItemProcessor() { } public BlackListDOItemProcessor(String inputFile) { this.inputFile = inputFile; } // 数据处理 public BlackListDO process(BlackListDO blackListDO) throws Exception { blackListDO.setDeleteFlag(0); blackListDO.setUuid(UUID.randomUUID().toString().replaceAll("-", "")); return blackListDO; } }
BlackListItemWriter 写入数据库类
import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; import com.BlackListDO; public class BlackListItemWriter implements ItemWriter<BlackListDO> { @Override public void write(List<? extends BlackListDO> blackList) throws Exception { // 插入数据库操作 } }
JobCompletionNotificationListener 监听任务
import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.listener.JobExecutionListenerSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class JobCompletionNotificationListener extends JobExecutionListenerSupport { @Override public void afterJob(JobExecution jobExecution) { } @Override public void beforeJob(JobExecution jobExecution) { super.beforeJob(jobExecution); } }
RetryFailuireItemListener 重试监听
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; import org.springframework.retry.RetryListener; public class RetryFailuireItemListener implements RetryListener{ private static final Logger logger = LoggerFactory.getLogger(RetryFailuireItemListener.class); @Override public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) { return true; } @Override public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { } @Override public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { logger.error("【重试异常】:"+throwable.getMessage()); } }
2. spring boot 配置
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.context.web.SpringBootServletInitializer; import org.springframework.context.annotation.ComponentScan; @ComponentScan("com.syncclient") @SpringBootApplication public class SpringBootJspApplication extends SpringBootServletInitializer{ /** * 500一批 * oracle : 单条插入基本每分钟2.5W条(50W,19.5min) ,批量插入基本每分钟10W条(50W,4.7mim) * mysql : 单条插入基本每分钟2.5W条(50W,11.4min) ,批量插入基本每分钟40W条(50W,1.3min) */ @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { return application.sources(SpringBootJspApplication.class); } public static void main(String[] args) throws Exception { SpringApplication.run(SpringBootJspApplication.class,new String[]{"appStart=true"}); } }3. 配置文件
# mysql config #spring.boot.database = mysql #spring.datasource.url = jdbc:mysql://127.0.0.1:3306/spring_batch?useUnicode=true&characterEncoding=utf8 #spring.datasource.username = admin #spring.datasource.password = 123456 #spring.datasource.driverClassName = com.mysql.jdbc.Driver #spring.batch.schema = classpath:/org/springframework/batch/core/schema-mysql.sql #spring.batch.drop = classpath:/org/springframework/batch/core/schema-drop-mysql.sql # oracle config spring.boot.database = oracle spring.datasource.url = jdbc:oracle:thin:@127.0.0.1:1521:spring_batch spring.datasource.username = admin spring.datasource.password = 123456 spring.datasource.driverClassName = oracle.jdbc.driver.OracleDriver spring.batch.schema = classpath:org/springframework/batch/core/schema-oracle10g.sql spring.batch.drop = classpath:org/springframework/batch/core/schema-drop-oracle10g.sql # batch config spring.batch.job.names = importFileJob spring.batch.job.enabled = true spring.batch.initializer.enabled=true
4. mybatis 配置
import javax.sql.DataSource; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.SqlSessionTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; import com.alibaba.druid.pool.DruidDataSource; import com.BlackListDao; import com.ProperitesUtil; /** * mybatis配置 * */ @Configuration public class MyBatisConfiguration { private static final Logger logger = LoggerFactory.getLogger(MyBatisConfiguration.class); @Autowired SqlSessionFactory sqlSessionFactory; @Autowired SqlSessionTemplate sessionTemplate; @Bean public SqlSessionTemplate sqlSessionTemplate() { return new SqlSessionTemplate(sqlSessionFactory); } @Bean public BlackListDao blackListMapper() { return sessionTemplate.getMapper(BlackListDao.class); } @Bean @Primary @ConfigurationProperties(prefix = "spring.datasource") public DataSource dataSource() { logger.debug("初始化dataSource"); return new DruidDataSource(); } @Bean public PlatformTransactionManager transactionManager(DataSource dataSource) { PlatformTransactionManager transactionManager = new DataSourceTransactionManager(dataSource); return transactionManager; } @Bean public SqlSessionFactory sqlSessionFactory() throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSource()); PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); // 获取数据库类型 String dataBaseType = ProperitesUtil.getPropertyValue("spring.boot.database") == null ? "mysql" : ProperitesUtil.getPropertyValue("spring.boot.database"); String directory = "classpath:/mapper/" + dataBaseType.trim().toLowerCase() + "/*.xml"; sqlSessionFactoryBean.setMapperLocations(resolver.getResources(directory)); return sqlSessionFactoryBean.getObject(); } }
5. pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.spring.batch</groupId> <artifactId>batch-test</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>batch-test</name> <url>http://maven.apache.org</url> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.2.5.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-core</artifactId> <version>3.0.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>jstl</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project>
相关文章推荐
- java事件处理机制
- ZooKeeper集群安装与配置(ZooKeeper3.4.6)
- myeclipse 8.6 software and workspace center is currently not available
- spring mvc常用配置
- Java语言基础之Scanner类
- Struts2 验证码图片生成实例
- java对象篇
- springmvc-servlet.xml
- javaWEB中前后台中文乱码问题解决方法
- Caused by: java.lang.UnsupportedOperationException
- eclipse导入java和android sdk源码,帮助文档
- Java基础——集合Collection
- JAVA执行顺序
- 详细介绍Java内存泄露原因
- Java 7 Fork/Join 并行计算框架概览
- Java语言基础之String类
- 设计模式案列
- Fork and Join: Java也可以轻松地编写并发程序
- Ubuntu上安装JAVA JDK
- java基本数据类型转换