您的位置:首页 > 编程语言 > Java开发

Spring Batch 实现数据库大数据量读写

2016-05-03 11:48 393 查看
1. data-source-context.xml

Xml代码


<?xml version="1.0" encoding="UTF-8"?>
<!--
  Shared infrastructure for the ledger batch: annotation scanning, the pooled
  Oracle DataSource, a JdbcTemplate and annotation-driven transactions.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

  <!-- 1) Discover and wire the annotated beans under this package. -->
  <context:component-scan base-package="net.etongbao.vasp.ac" />

  <!-- 2) DataSource, JdbcTemplate and transaction manager. -->

  <!--
    c3p0 connection pool. The bean is a non-abstract singleton, which is the
    Spring default, so those attributes are not spelled out.
    NOTE(review): URL, user and password are hard-coded sample values; move
    them out of version control before using this outside a demo.
  -->
  <bean id="dataSource"
        class="com.mchange.v2.c3p0.ComboPooledDataSource"
        destroy-method="close">
    <!-- Legacy driver class name, for reference: oracle.jdbc.driver.oracleDriver -->
    <property name="driverClass" value="oracle.jdbc.OracleDriver" />
    <property name="jdbcUrl" value="jdbc:oracle:thin:@192.168.1.23:1521:orcl01" />
    <property name="user" value="USR_DEV01" />
    <property name="password" value="2AF0829C" />

    <!-- Pool sizing and timeouts (see the c3p0 configuration reference for units). -->
    <property name="checkoutTimeout" value="30000" />
    <property name="maxIdleTime" value="120" />
    <property name="maxPoolSize" value="100" />
    <property name="minPoolSize" value="2" />
    <property name="initialPoolSize" value="2" />

    <!-- Both statement-cache sizes are set to 0. -->
    <property name="maxStatements" value="0" />
    <property name="maxStatementsPerConnection" value="0" />

    <property name="idleConnectionTestPeriod" value="30" />
  </bean>

  <!-- JdbcTemplate injected into the DAO layer. -->
  <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <property name="dataSource" ref="dataSource" />
  </bean>

  <!-- Local JDBC transaction manager bound to the same DataSource. -->
  <bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
  </bean>

  <!-- Enables @Transactional processing with the manager above. -->
  <tx:annotation-driven transaction-manager="transactionManager" />

</beans>

2. quartz-context.xml

commit-interval="10000" 表示每个 chunk(即每个事务)处理的数据条数:数值越大,提交次数越少,吞吐量通常越高,但单个事务占用的内存也越大;可在此处(tasklet 的 transaction-manager 属性)添加事务处理,

出错时回滚的就是当前这一批数据,即最多 commit-interval 条。

Xml代码


<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring Batch job definition (ledgerJob) plus the Quartz trigger that
  launches it. Imports data-source-context.xml for the DataSource and the
  transaction manager.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

  <import resource="data-source-context.xml"/>

  <!-- JOB REPOSITORY - WE USE IN-MEMORY REPOSITORY FOR OUR EXAMPLE -->
  <bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <property name="transactionManager" ref="transactionManager" />
  </bean>

  <!-- Batch config: launcher used by QuartzLedgerJob to start the job. -->
  <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
  </bean>

  <!-- FINALLY OUR JOB DEFINITION. THIS IS A 1 STEP JOB -->
  <batch:job id="ledgerJob">
    <batch:listeners>
      <batch:listener ref="appJobExecutionListener" />
    </batch:listeners>
    <batch:step id="step1">
      <!--
        A single tasklet element: the article showed the plain tasklet and the
        variant with transaction-manager nested inside each other, which is
        not valid. The transactional variant is kept.
      -->
      <batch:tasklet transaction-manager="transactionManager">
        <batch:listeners>
          <batch:listener ref="itemFailureLoggerListener" />
        </batch:listeners>
        <!-- Commit once every 10,000 items. -->
        <batch:chunk reader="ledgerReader" writer="ledgerWriter"
                     commit-interval="10000" />
      </batch:tasklet>
    </batch:step>
  </batch:job>

  <!-- READER: streams every row of the ledger table through ledgerRowMapper. -->
  <bean id="ledgerReader"
        class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="sql" value="select * from ledger" />
    <property name="rowMapper" ref="ledgerRowMapper" />
  </bean>

  <!--
    A Spring Batch job instance that has completed successfully may not be
    executed again (whether a failed instance may be re-run is controlled by
    the job's restartable attribute, which defaults to true). As a workaround,
    a JobParametersBuilder is provided so the current time can be added as a
    parameter: with all other parameters equal, each launch is still a
    different job instance.
  -->
  <bean id="jobParameterBulider" class="org.springframework.batch.core.JobParametersBuilder" />

  <!-- Scheduled task: begin -->

  <bean id="ledgerJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
    <property name="targetObject">
      <!-- Bean whose method is invoked on schedule. -->
      <ref bean="quartzLedgerJob" />
    </property>
    <property name="targetMethod">
      <!-- Method invoked on schedule. -->
      <value>execute</value>
    </property>
  </bean>

  <bean id="ledgerCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
    <!--
      jobDetail cannot reference the task bean directly because it requires a
      JobDetail object, so MethodInvokingJobDetailFactoryBean is used as an
      adapter.
    -->
    <property name="jobDetail">
      <ref bean="ledgerJobDetail" />
    </property>
    <!--
      Fires once a day at 10:45:00.
      Quartz field order: second minute hour day-of-month month day-of-week [year].
    -->
    <property name="cronExpression">
      <!-- Alternative for testing, fires every minute from 15:00 to 15:59: <value>0 * 15 * * ?</value> -->
      <value>0 45 10 * * ? *</value>
    </property>
  </bean>

  <!-- Scheduler factory: every trigger that should run is registered here. -->
  <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <property name="triggers">
      <list>
        <!-- Register the trigger defined above; more triggers can be listed here. -->
        <ref local="ledgerCronTrigger" />
      </list>
    </property>
  </bean>

  <!-- Scheduled task: end -->

</beans>

3.定时调度job类 QuartzLedgerJob.java

package net.etongbao.vasp.ac.quartz;

Java代码


import java.util.Date;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.batch.core.Job;

import org.springframework.batch.core.JobParametersBuilder;

import org.springframework.batch.core.launch.JobLauncher;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import org.springframework.util.StopWatch;

/**
 * Scheduled entry point: launches the Spring Batch {@code ledgerJob} each time
 * Quartz invokes {@link #execute()}.
 *
 * @author Fu Wei
 */
@Component("quartzLedgerJob")
public class QuartzLedgerJob {

    private static final Logger LOG = LoggerFactory.getLogger(QuartzLedgerJob.class);

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job ledgerJob;

    @Autowired
    JobParametersBuilder jobParameterBulider;

    /** Number of launches attempted so far; only reported in the debug log. */
    private static long counter = 0L;

    /**
     * Runs the ledger job once and logs how long the launch took.
     *
     * @throws Exception whatever the job launcher throws; the elapsed time is
     *                   still logged before the exception propagates
     */
    public void execute() throws Exception {
        LOG.debug("start...");
        StopWatch sw = new StopWatch();
        sw.start();
        try {
            /*
             * A job instance that completed successfully may not be executed
             * again (re-running a failed one is governed by the job's
             * restartable setting, default true). Adding the current time as
             * a parameter makes every launch a distinct job instance even
             * when all other parameters are equal.
             */
            jobParameterBulider.addDate("date", new Date());
            jobLauncher.run(ledgerJob, jobParameterBulider.toJobParameters());
        } finally {
            // Stop and report in finally so failed launches are timed and counted too.
            sw.stop();
            LOG.debug("Time elapsed:{},Execute quartz ledgerJob:{}", sw.prettyPrint(), ++counter);
        }
    }
}

4.程序启动类 StartQuartz.java

package net.etongbao.vasp.ac.quartz;

Java代码


import java.io.FileNotFoundException;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Bootstraps the application: loading the Spring context creates the Quartz
 * scheduler, which then triggers the ledger job.
 *
 * @author Fu Wei
 */
public class StartQuartz {

    /**
     * Loads {@code quartz-context.xml} from the classpath.
     *
     * @param args unused
     * @throws FileNotFoundException declared by the original signature; kept for compatibility
     */
    public static void main(String[] args) throws FileNotFoundException {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("/net/etongbao/vasp/ac/resources/quartz-context.xml");
        // Close the context when the JVM exits so destroy callbacks
        // (e.g. the connection pool's close method) actually run.
        context.registerShutdownHook();
    }
}

5.pojo类 Ledger.java

Java代码


package net.etongbao.vasp.ac.pojo;

import java.io.Serializable;

import java.util.Date;

public class Ledger implements Serializable {

private int id;

private Date receiptDate;

private String memberName;

private String checkNumber;

private Date checkDate;

private String paymentType;

private double depositAmount;

private double paymentAmount;

private String comments;

public Ledger() {

super();

}

public Ledger(int id, Date receiptDate, String memberName, String checkNumber, Date checkDate, String paymentType,

double depositAmount, double paymentAmount, String comments) {

super();

this.id = id;

this.receiptDate = receiptDate;

this.memberName = memberName;

this.checkNumber = checkNumber;

this.checkDate = checkDate;

this.paymentType = paymentType;

this.depositAmount = depositAmount;

this.paymentAmount = paymentAmount;

this.comments = comments;

}

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public Date getReceiptDate() {

return receiptDate;

}

public void setReceiptDate(Date receiptDate) {

this.receiptDate = receiptDate;

}

public String getMemberName() {

return memberName;

}

public void setMemberName(String memberName) {

this.memberName = memberName;

}

public String getCheckNumber() {

return checkNumber;

}

public void setCheckNumber(String checkNumber) {

this.checkNumber = checkNumber;

}

public Date getCheckDate() {

return checkDate;

}

public void setCheckDate(Date checkDate) {

this.checkDate = checkDate;

}

public String getPaymentType() {

return paymentType;

}

public void setPaymentType(String paymentType) {

this.paymentType = paymentType;

}

public double getDepositAmount() {

return depositAmount;

}

public void setDepositAmount(double depositAmount) {

this.depositAmount = depositAmount;

}

public double getPaymentAmount() {

return paymentAmount;

}

public void setPaymentAmount(double paymentAmount) {

this.paymentAmount = paymentAmount;

}

public String getComments() {

return comments;

}

public void setComments(String comments) {

this.comments = comments;

}

}

6. LedgerDaoImpl.java

package net.etongbao.vasp.ac.dao.impl;

Java代码


import java.sql.PreparedStatement;

import java.sql.SQLException;

import net.etongbao.vasp.ac.dao.LedgerDao;

import net.etongbao.vasp.ac.pojo.Ledger;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jdbc.core.JdbcTemplate;

import org.springframework.jdbc.core.PreparedStatementSetter;

import org.springframework.stereotype.Repository;

/**
 * JDBC implementation of {@link LedgerDao}: inserts ledger records into the
 * {@code ledger_temp} table through {@link JdbcTemplate}.
 *
 * @author Fu Wei
 */
@Repository
public class LedgerDaoImpl implements LedgerDao {

    private static final String SAVE_SQL = "insert into ledger_temp (rcv_dt, mbr_nm, chk_nbr, chk_dt, pymt_typ, dpst_amt, pymt_amt, comments) values(?,?,?,?,?,?,?,?)";

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * Inserts one record. The record's id is not part of the insert statement.
     *
     * @param item record to store; its date fields may be {@code null}, in
     *             which case SQL NULL is written for that column
     */
    @Override
    public void save(final Ledger item) {
        jdbcTemplate.update(SAVE_SQL, new PreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement stmt) throws SQLException {
                bindDate(stmt, 1, item.getReceiptDate());
                stmt.setString(2, item.getMemberName());
                stmt.setString(3, item.getCheckNumber());
                bindDate(stmt, 4, item.getCheckDate());
                stmt.setString(5, item.getPaymentType());
                stmt.setDouble(6, item.getDepositAmount());
                stmt.setDouble(7, item.getPaymentAmount());
                stmt.setString(8, item.getComments());
            }
        });
    }

    /**
     * Binds a date parameter, writing SQL NULL instead of dereferencing a
     * {@code null} value (the row mapper yields {@code null} for NULL columns).
     */
    private static void bindDate(PreparedStatement stmt, int index, java.util.Date value) throws SQLException {
        if (value == null) {
            stmt.setNull(index, java.sql.Types.DATE);
        } else {
            stmt.setDate(index, new java.sql.Date(value.getTime()));
        }
    }
}

7. 接口 LedgerDao.java

Java代码


package net.etongbao.vasp.ac.dao;

import net.etongbao.vasp.ac.pojo.Ledger;

/**
 * Persistence contract for {@link Ledger} records.
 */
public interface LedgerDao {

    /**
     * Stores a single ledger record.
     *
     * @param item the record to store
     */
    void save(Ledger item);
}

8. ledgerReader(JdbcCursorItemReader)需要的 LedgerRowMapper.java

package net.etongbao.vasp.ac.batch.writer;

Java代码


import java.sql.ResultSet;

import java.sql.SQLException;

import net.etongbao.vasp.ac.pojo.Ledger;

import org.springframework.jdbc.core.RowMapper;

import org.springframework.stereotype.Component;

/**
 * Maps one row of the ledger query result to a {@link Ledger} object.
 * Parameterized as {@code RowMapper<Ledger>} so callers get a typed result
 * instead of {@code Object}.
 *
 * @author Administrator
 */
@Component("ledgerRowMapper")
public class LedgerRowMapper implements RowMapper<Ledger> {

    /**
     * Copies the columns of the current row into a new {@link Ledger}.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum index of the current row (not used)
     * @return the populated ledger record
     * @throws SQLException if a column cannot be read
     */
    public Ledger mapRow(ResultSet rs, int rowNum) throws SQLException {
        Ledger ledger = new Ledger();
        ledger.setId(rs.getInt("id"));
        ledger.setReceiptDate(rs.getDate("rcv_dt"));
        ledger.setMemberName(rs.getString("mbr_nm"));
        ledger.setCheckNumber(rs.getString("chk_nbr"));
        ledger.setCheckDate(rs.getDate("chk_dt"));
        ledger.setPaymentType(rs.getString("pymt_typ"));
        ledger.setDepositAmount(rs.getDouble("dpst_amt"));
        ledger.setPaymentAmount(rs.getDouble("pymt_amt"));
        ledger.setComments(rs.getString("comments"));
        return ledger;
    }
}

9. 关键类 LedgerWriter.java,写入数据,负责数据的添加

Java代码


package net.etongbao.vasp.ac.batch.writer;

import java.util.List;

import net.etongbao.vasp.ac.dao.LedgerDao;

import net.etongbao.vasp.ac.pojo.Ledger;

import org.springframework.batch.item.ItemWriter;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

/**
 * Item writer of the ledger step: hands every record of a chunk to
 * {@link LedgerDao} for insertion.
 *
 * @author Fu Wei
 */
@Component("ledgerWriter")
public class LedgerWriter implements ItemWriter<Ledger> {

    @Autowired
    private LedgerDao ledgerDao;

    /**
     * Saves the records of one chunk, one at a time and in list order.
     *
     * @param ledgers records of the current chunk
     * @throws Exception if saving a record fails
     */
    public void write(List<? extends Ledger> ledgers) throws Exception {
        final java.util.Iterator<? extends Ledger> pending = ledgers.iterator();
        while (pending.hasNext()) {
            final Ledger current = pending.next();
            ledgerDao.save(current);
        }
    }
}

10. Eclipse 工程的 .classpath(项目依赖的 jar 包列表):

<?xml version="1.0" encoding="UTF-8"?>

Xml代码


<classpath>
  <!-- Project sources and the JRE container. -->
  <classpathentry kind="src" path="src"/>
  <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/jrockit-jdk1.6.0_24-R28.1.3-4.0.1"/>

  <!-- General libraries: AOP alliance, connection pool, commons, logging, Oracle driver. -->
  <classpathentry kind="lib" path="lib/aopalliance-1.0.jar"/>
  <classpathentry kind="lib" path="lib/c3p0-0.9.1.2.jar"/>
  <classpathentry kind="lib" path="lib/commons-collections-3.2.1.jar"/>
  <classpathentry kind="lib" path="lib/commons-lang-2.3.jar"/>
  <classpathentry kind="lib" path="lib/commons-logging-1.1.1.jar"/>
  <classpathentry kind="lib" path="lib/etb-log4j-1.2.16.jar"/>
  <classpathentry kind="lib" path="lib/etb-slf4j-api-1.5.8.jar"/>
  <classpathentry kind="lib" path="lib/etb-slf4j-log4j12-1.5.8.jar"/>
  <classpathentry kind="lib" path="lib/ojdbc6.jar"/>

  <!-- Spring Framework 3.0.5 modules. -->
  <classpathentry kind="lib" path="lib/org.springframework.aop-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.asm-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.aspects-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.beans-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.context-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.context.support-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.core-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.expression-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.instrument-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.instrument.tomcat-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.jdbc-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.jms-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.orm-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.oxm-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.test-3.0.5.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/org.springframework.transaction-3.0.5.RELEASE.jar"/>

  <!-- Scheduling and batch: Quartz 1.6.5 and Spring Batch 2.1.6. -->
  <classpathentry kind="lib" path="lib/quartz-all-1.6.5.jar"/>
  <classpathentry kind="lib" path="lib/spring-batch-core-2.1.6.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/spring-batch-infrastructure-2.1.6.RELEASE.jar"/>
  <classpathentry kind="lib" path="lib/spring-batch-test-2.1.6.RELEASE.jar"/>

  <!-- JUnit 4 container and the compiler output folder. -->
  <classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
  <classpathentry kind="output" path="bin"/>
</classpath>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: