您的位置:首页 > 数据库 > Oracle

使用 Disruptor 将 500 多万条数据从 MySQL 导入到 Oracle

2014-04-02 14:36 639 查看
知道 Disruptor 已经一年多了，但一直没有用武之地。这次正好要迁移数据：由于新旧表结构完全不同，导数据时还需要做一些计算，于是果断用 Disruptor 试试。

/**
 * Migrates user accounts from MySQL to Oracle through a Disruptor ring buffer.
 *
 * <p>{@link #run()} is the single producer: it reads the joined MySQL user tables in pages of
 * {@code limit} rows and publishes each page as one {@link ValueEvent}. A {@link WorkerPool} of
 * {@link WorkerValueEventHandler}s consumes those events and writes them to Oracle. Once every
 * published page has been consumed, the JVM is stopped with {@code System.exit(0)}.
 */
public class TransferProcessor implements Runnable, InitializingBean {

    /** Ring buffer capacity, i.e. how many pages may be queued for the writers at once. */
    private static final int BUFFER_SIZE = 32;

    /** Number of writer threads; the executor and the handler array are sized together. */
    private static final int WORKER_COUNT = 2;

    private JdbcTemplate mysqlJdbcTemplate;

    private PlatformTransactionManager txManager;

    private JdbcTemplate oracleJdbcTemplate;

    public PlatformTransactionManager getTxManager() {
        return txManager;
    }

    public void setTxManager(PlatformTransactionManager txManager) {
        this.txManager = txManager;
    }

    public JdbcTemplate getMysqlJdbcTemplate() {
        return mysqlJdbcTemplate;
    }

    public void setMysqlJdbcTemplate(JdbcTemplate mysqlJdbcTemplate) {
        this.mysqlJdbcTemplate = mysqlJdbcTemplate;
    }

    public JdbcTemplate getOracleJdbcTemplate() {
        return oracleJdbcTemplate;
    }

    public void setOracleJdbcTemplate(JdbcTemplate oracleJdbcTemplate) {
        this.oracleJdbcTemplate = oracleJdbcTemplate;
    }

    private final ExecutorService EXECUTOR = Executors.newFixedThreadPool(WORKER_COUNT);

    /** Row offset of the first page; later pages are located through {@link #lastId}. */
    private Integer offset = 0;

    /** uid of the last row published so far; the next page starts after it. */
    private Integer lastId = 0;

    /** Maximum number of rows per page, and therefore per event. */
    private Integer limit = 10000;

    private final WorkerValueEventHandler[] handlers = new WorkerValueEventHandler[WORKER_COUNT];

    private RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE);
    private WorkerPool<ValueEvent> workerPool = null;

    private String sql = "select t.uid as uid,t.tel,t.email,t.password,t.username,t3.extcredits2 as credits,1 as status,t.regdate,t2.realname,t2.gender,t2.birthyear,t2.birthmonth,t2.birthday,t2.constellation,t2.birthcity ,t2.residecity,t2.bloodtype,t2.qq,t2.msn,t2.taobao,t2.bio,t2.occupation,t.salt from sz_ucenter_members t left join sz_common_member_profile t2 on t.uid=t2.uid left join sz_common_member_count t3 on t.uid=t3.uid";

    /**
     * Starts the writer pool, publishes every page of source rows, then waits until the
     * writers have consumed all of them and exits the JVM.
     */
    public void run() {
        RingBuffer<ValueEvent> buffer = workerPool.start(EXECUTOR);
        buffer.addGatingSequences(workerPool.getWorkerSequences());

        long t1 = System.currentTimeMillis();
        List<Result> list = query(sql + "  order by uid limit ?,?", offset, limit);
        long t2 = System.currentTimeMillis();
        int id = 1;
        int total = list.size();
        System.out.println("查询第" + id + "批数据,耗时" + (t2 - t1) + "ms");
        // The loop condition also covers an empty first page (nothing to migrate), so the
        // last element is only ever read from a non-empty list.
        while (!list.isEmpty()) {
            // Keyset pagination cursor: the next page is fetched with "uid > lastId".
            lastId = list.get(list.size() - 1).getUid();
            long sequence = buffer.next();
            ValueEvent event = buffer.claimAndGetPreallocated(sequence);
            event.setValue(list);
            event.setId(id);
            buffer.publish(sequence);
            id++;
            long t3 = System.currentTimeMillis();
            list = query(sql + " where t.uid>?  order by uid limit ?", lastId, limit);
            long t4 = System.currentTimeMillis();
            total += list.size();
            System.out.println("查询第" + id + "批数据,耗时" + (t4 - t3) + "ms,RingBuffer剩余空间:" + buffer.remainingCapacity());
        }
        awaitDrainAndExit(buffer, total);
    }

    /**
     * Polls every 500 ms until {@code remainingCapacity()} reports the whole buffer free
     * again, then prints the migrated row count and terminates the JVM with status 0.
     */
    private void awaitDrainAndExit(RingBuffer<ValueEvent> buffer, int total) {
        while (true) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Read the capacity once so the printed value and the exit check agree.
            long pending = BUFFER_SIZE - buffer.remainingCapacity();
            System.out.println("RingBuffer中剩余数据量:" + pending);
            if (pending == 0) {
                System.out.println("数据迁移结束!总共" + total + "条记录");
                System.exit(0);
            }
        }
    }

    /**
     * Runs one page query against MySQL and maps each row to a {@link Result}.
     *
     * @param sql full SQL text with exactly two positional parameters
     * @param p1  first parameter (the start offset, or the last uid already published)
     * @param p2  second parameter (the page size)
     * @return the mapped rows of this page
     */
    private List<Result> query(String sql, Object p1, Object p2) {
        return mysqlJdbcTemplate.query(sql, new Object[] { p1, p2 }, new RowMapper<Result>() {
            @Override
            public Result mapRow(ResultSet rs, int rowNum) throws SQLException {
                Result r = new Result();
                r.setUid(rs.getInt("uid"));
                String tel = rs.getString("tel");
                // Empty phone numbers become null; longer ones keep only their first 11 characters.
                r.setTel(StringUtils.isEmpty(tel) ? null : tel.length() > 11 ? tel.substring(0, 11) : tel);
                r.setEmail(rs.getString("email"));
                r.setPassword(rs.getString("password"));
                String username = rs.getString("username");
                // An empty username is replaced by a single space. NOTE(review): presumably because
                // Oracle treats '' as NULL and the target column is NOT NULL; confirm.
                r.setUsername(StringUtils.isEmpty(username) ? " " : username);
                r.setCredits(rs.getInt("credits"));
                r.setStatus(rs.getInt("status"));
                r.setRegdate(rs.getString("regdate"));
                r.setRealname(rs.getString("realname"));
                r.setGender(rs.getInt("gender"));
                r.setBirthyear(rs.getInt("birthyear"));
                r.setBirthmonth(rs.getInt("birthmonth"));
                r.setBirthday(rs.getInt("birthday"));
                r.setConstellation(rs.getString("constellation"));
                r.setBirthcity(rs.getString("birthcity"));
                r.setResidecity(rs.getString("residecity"));
                r.setBloodtype(rs.getString("bloodtype"));
                r.setQq(rs.getString("qq"));
                r.setMsn(rs.getString("msn"));
                r.setTaobao(rs.getString("taobao"));
                r.setBio(rs.getString("bio"));
                r.setOccupation(rs.getString("occupation"));
                r.setSalt(rs.getString("salt"));
                return r;
            }
        });
    }

    /** Loads the level-2 rows of the Oracle ADDRESS table (name and id only). */
    private List<Address> queryAddress() {
        return oracleJdbcTemplate.query("SELECT t.NAME,t.ID from ADDRESS t where t.LVL=2", new RowMapper<Address>() {
            @Override
            public Address mapRow(ResultSet rs, int rowNum) throws SQLException {
                Address address = new Address();
                address.setId(rs.getInt("ID"));
                address.setName(rs.getString("NAME"));
                return address;
            }
        });
    }

    /**
     * Builds one writer handler per worker thread (all sharing the same address list and
     * transaction manager) and wires them into a {@link WorkerPool} over the ring buffer.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        List<Address> addresss = queryAddress();
        for (int i = 0; i < handlers.length; i++) {
            handlers[i] = new WorkerValueEventHandler(oracleJdbcTemplate, addresss);
            handlers[i].setTxManager(txManager);
        }
        workerPool = new WorkerPool<ValueEvent>(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), handlers);
    }

}


public class WorkerValueEventHandler implements WorkHandler<ValueEvent> {

private JdbcTemplate jdbcTemplate;

private List<Address> address;

private PlatformTransactionManager txManager;

public PlatformTransactionManager getTxManager() {
return txManager;
}

public void setTxManager(PlatformTransactionManager txManager) {
this.txManager = txManager;
}

private final String SQL = "insert into ACCOUNT(ID,PHONE,NICKNAME,EMAIL,PASSWORD,CREATETIME,UPDATETIME,CREDITS,CP_ID,BIRTHDAY,SEX,BLOOD,WORK,INTRODUCTION,CONSTELLATION,QQ,MSN,ALWW,STATE,ADDRESS_ID,HOMETOWN_ID,VERSION,REALNAME,SALT) values(?,?,?,?,?,?,?,?,1,?,?,?,?,?,?,?,?,?,?,?,?,0,?,?)";

public WorkerValueEventHandler(JdbcTemplate jdbcTemplate, List<Address> address) {
this.jdbcTemplate = jdbcTemplate;
this.address = address;
}

@Override
public void onEvent(ValueEvent event) throws InterruptedException {
final List<Result> value = event.getValue();
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
TransactionStatus status = txManager.getTransaction(def);
try {
int[] num = jdbcTemplate.batchUpdate(SQL, new BatchPreparedStatementSetter() {

@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Result result = value.get(i);
ps.setLong(1, result.getUid());
String tel = result.getTel();
ps.setString(2, tel);
ps.setString(3, result.getUsername());
ps.setString(4, result.getEmail());
if (tel != null && tel.startsWith("111")) {//马甲帐号
ps.setString(5, "4b213b65fa4f12d5416354d3df6e5089");
ps.setString(22, "654cba");
} else {
ps.setString(5, result.getPassword());
ps.setString(22, result.getSalt());
}
Timestamp time = getCreateTime(result.getRegdate());
ps.setTimestamp(6, time);
ps.setTimestamp(7, time);
ps.setInt(8, result.getCredits());
Date birthDate = getBirthDate(result.getBirthyear(), result.getBirthmonth(), result.getBirthday());
ps.setDate(9, birthDate == null ? null : new java.sql.Date(birthDate.getTime()));
Integer sex = result.getGender();
ps.setInt(10, sex == 0 ? -1 : sex == 2 ? 0 : 1);
ps.setString(11, result.getBloodtype());
ps.setString(12, result.getOccupation());
ps.setString(13, result.getBio());
ps.setString(14, result.getConstellation());
ps.setString(15, result.getQq());
ps.setString(16, result.getMsn());
ps.setString(17, result.getTaobao());
ps.setInt(18, 1);
ps.setInt(19, getAddressId(result.getResidecity()));
ps.setInt(20, getAddressId(result.getBirthcity()));
ps.setString(21, result.getRealname());
if (i % 2000 == 0) {
ps.executeBatch();
}

}

@Override
public int getBatchSize() {
return value.size();
}

});
txManager.commit(status);
System.out.println("线程:" + Thread.currentThread().getName() + ">>>结束处理序列号为" + event.getId() + "的数据");
} catch (Throwable e) {
txManager.rollback(status);
System.out.println(e);
System.exit(0);
}

}

private Date getBirthDate(Integer year, Integer month, Integer day) {
if (year == 0 || month == 0 || day == 0) {
return null;
}
Calendar c = Calendar.getInstance();
c.set(Calendar.YEAR, year);
c.set(Calendar.MONTH, month - 1);
c.set(Calendar.DATE, day);
return c.getTime();
}

private Timestamp getCreateTime(String regDate) {
Long time = Long.parseLong(regDate + "000");
return new Timestamp(time);
}

private Integer getAddressId(String name) {
if (StringUtils.isEmpty(name)) {
return -1;
}
Iterator<Address> it = address.iterator();
Integer id = -1;
while (it.hasNext()) {
Address address = it.next();
String n = address.getName();
if (n.equals(name)) {
id = address.getId();
break;
}
}
return id;
}


/**
 * Ring-buffer slot that carries one page of {@link Result} rows together with its batch number.
 *
 * <p>Slots are created through {@link #EVENT_FACTORY}; the producer fills {@code id} and
 * {@code value} through the setters before publishing a slot.
 */
public class ValueEvent {

    /** Factory handed to the ring buffer to create empty slots. */
    public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
        public ValueEvent newInstance() {
            return new ValueEvent();
        }
    };

    /** Batch number assigned by the producer. */
    private Integer id;

    /** Rows belonging to this batch. */
    private List<Result> value;

    public void setId(Integer batchId) {
        id = batchId;
    }

    public Integer getId() {
        return id;
    }

    public void setValue(List<Result> rows) {
        value = rows;
    }

    public List<Result> getValue() {
        return value;
    }
}


最终速度还是比较快的，5 分钟就完成了：1 个线程读，2 个线程写，运行在双核 CPU 上。

接下来又把一个包含 700 万条数据的 CSV 文件导入到库中，只用了 3 分钟。对应的读取程序如下（这一版使用的 ValueEvent 和 WorkerValueEventHandler 与上文不同，文中未列出）：

/**
 * Imports phone numbers from a CSV file into Oracle through a Disruptor ring buffer.
 *
 * <p>{@link #run()} is the single producer: it reads the file line by line, normalises each
 * line and publishes batches of {@code BATCH_SIZE} values as one {@link ValueEvent}. A
 * {@link WorkerPool} of {@link WorkerValueEventHandler}s consumes the batches. Once all
 * published batches have been consumed, the JVM is stopped with {@code System.exit(0)}.
 */
public class TransferProcessor implements Runnable, InitializingBean {

    /** Ring buffer capacity, i.e. how many batches may be queued for the writers at once. */
    private static final int BUFFER_SIZE = 32;

    /** Number of writer threads; the executor and the handler array are sized together. */
    private static final int WORKER_COUNT = 2;

    /** Number of values published per event. */
    private static final int BATCH_SIZE = 10000;

    /** Maximum kept length of a value; longer values keep their trailing characters. */
    private static final int PHONE_LENGTH = 11;

    private PlatformTransactionManager txManager;

    private JdbcTemplate oracleJdbcTemplate;

    /** Input file; defaults to the original hard-coded location. */
    private String csvPath = "d:\\black.csv";

    public PlatformTransactionManager getTxManager() {
        return txManager;
    }

    public void setTxManager(PlatformTransactionManager txManager) {
        this.txManager = txManager;
    }

    public JdbcTemplate getOracleJdbcTemplate() {
        return oracleJdbcTemplate;
    }

    public void setOracleJdbcTemplate(JdbcTemplate oracleJdbcTemplate) {
        this.oracleJdbcTemplate = oracleJdbcTemplate;
    }

    public String getCsvPath() {
        return csvPath;
    }

    public void setCsvPath(String csvPath) {
        this.csvPath = csvPath;
    }

    private final ExecutorService EXECUTOR = Executors.newFixedThreadPool(WORKER_COUNT);

    private final WorkerValueEventHandler[] handlers = new WorkerValueEventHandler[WORKER_COUNT];

    private RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE);
    private WorkerPool<ValueEvent> workerPool = null;

    /**
     * Starts the writer pool, publishes the whole CSV file in batches, then waits until the
     * writers have consumed everything and exits the JVM. A read error exits with status 1.
     */
    public void run() {
        int total = 0;
        RingBuffer<ValueEvent> buffer = workerPool.start(EXECUTOR);
        buffer.addGatingSequences(workerPool.getWorkerSequences());
        int id = 1;
        BufferedReader reader = null;
        try {
            List<String> values = new ArrayList<String>(BATCH_SIZE);
            reader = new BufferedReader(new FileReader(csvPath));
            // Skip the first line. NOTE(review): presumably a CSV header row; confirm.
            reader.readLine();
            String line;
            while ((line = reader.readLine()) != null) {
                String phone = normalizePhone(line);
                if (phone.length() == 0) {
                    continue; // blank line (e.g. a trailing newline): nothing to import
                }
                total += 1;
                values.add(phone);
                if (values.size() == BATCH_SIZE) {
                    // The list now belongs to a consumer thread; start a fresh one instead of
                    // clearing and reusing it.
                    publishEvent(buffer, id, values);
                    values = new ArrayList<String>(BATCH_SIZE);
                    id++;
                }
            }
            if (!values.isEmpty()) {
                publishEvent(buffer, id, values);
            }
        } catch (IOException e) {
            e.printStackTrace();
            // A failed read aborts the import; a non-zero status marks the run as failed.
            System.exit(1);
        } finally {
            try {
                if (reader != null) {
                    reader.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        awaitDrainAndExit(buffer, total);
    }

    /**
     * Removes double quotes and surrounding whitespace, then keeps at most the last
     * {@code PHONE_LENGTH} characters. NOTE(review): keeping the tail presumably drops a
     * leading country prefix; confirm against the source file.
     */
    private static String normalizePhone(String line) {
        String s = line.replace("\"", "").trim();
        return s.length() > PHONE_LENGTH ? s.substring(s.length() - PHONE_LENGTH) : s;
    }

    /**
     * Polls every 500 ms until {@code remainingCapacity()} reports the whole buffer free
     * again, then prints the imported line count and terminates the JVM with status 0.
     */
    private void awaitDrainAndExit(RingBuffer<ValueEvent> buffer, int total) {
        while (true) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Read the capacity once so the printed value and the exit check agree.
            long pending = BUFFER_SIZE - buffer.remainingCapacity();
            System.out.println("RingBuffer中剩余数据量:" + pending);
            if (pending == 0) {
                System.out.println("数据迁移结束!总共" + total + "条记录");
                System.exit(0);
            }
        }
    }

    /** Claims the next slot, fills it with this batch and publishes it to the writers. */
    protected void publishEvent(RingBuffer<ValueEvent> ringBuffer, Integer id, List<String> values) {
        long sequence = ringBuffer.next();
        ValueEvent event = ringBuffer.claimAndGetPreallocated(sequence);
        event.setId(id);
        event.setValues(values);
        ringBuffer.publish(sequence);
    }

    /** Builds one writer handler per worker thread and wires them into a {@link WorkerPool}. */
    @Override
    public void afterPropertiesSet() throws Exception {
        for (int i = 0; i < handlers.length; i++) {
            handlers[i] = new WorkerValueEventHandler(oracleJdbcTemplate);
            handlers[i].setTxManager(txManager);
        }
        workerPool = new WorkerPool<ValueEvent>(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), handlers);
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: