使用 Disruptor 将五百多万条数据从 MySQL 导入 Oracle
2014-04-02 14:36
639 次查看
知道 Disruptor 已经一年多了,却一直没有用武之地。这次正好要迁移数据,而且新旧表结构完全不同,导数据时还需要做一些计算,于是果断用 Disruptor 试试。
最终速度还是比较快的:5 分钟完成(1 个线程读、2 个线程写,双核 CPU)。
接下来又将一个包含七百万条数据的 CSV 文件导入数据库,3 分钟就完成了。
public class TransferProcessor implements Runnable,InitializingBean { private JdbcTemplate mysqlJdbcTemplate; private PlatformTransactionManager txManager; private JdbcTemplate oracleJdbcTemplate; public PlatformTransactionManager getTxManager() { return txManager; } public void setTxManager(PlatformTransactionManager txManager) { this.txManager = txManager; } public JdbcTemplate getMysqlJdbcTemplate() { return mysqlJdbcTemplate; } public void setMysqlJdbcTemplate(JdbcTemplate mysqlJdbcTemplate) { this.mysqlJdbcTemplate = mysqlJdbcTemplate; } public JdbcTemplate getOracleJdbcTemplate() { return oracleJdbcTemplate; } public void setOracleJdbcTemplate(JdbcTemplate oracleJdbcTemplate) { this.oracleJdbcTemplate = oracleJdbcTemplate; } private final ExecutorService EXECUTOR = Executors.newFixedThreadPool(2); private Integer offset = 0; private Integer lastId = 0; private Integer limit = 10000; private final WorkerValueEventHandler[] handlers = new WorkerValueEventHandler[2]; private RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY, 32); private WorkerPool<ValueEvent> workerPool = null; private String sql = "select t.uid as uid,t.tel,t.email,t.password,t.username,t3.extcredits2 as credits,1 as status,t.regdate,t2.realname,t2.gender,t2.birthyear,t2.birthmonth,t2.birthday,t2.constellation,t2.birthcity ,t2.residecity,t2.bloodtype,t2.qq,t2.msn,t2.taobao,t2.bio,t2.occupation,t.salt from sz_ucenter_members t left join sz_common_member_profile t2 on t.uid=t2.uid left join sz_common_member_count t3 on t.uid=t3.uid"; public void run() { RingBuffer<ValueEvent> ringBuffer = workerPool.start(EXECUTOR); ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); long t1 = System.currentTimeMillis(); List<Result> list = query(sql + " order by uid limit ?,?", offset, limit); long t2 = System.currentTimeMillis(); Result result = list.get(list.size() - 1); lastId = result.getUid(); Integer id = 1; Integer total = list.size(); 
System.out.println("查询第" + id + "批数据,耗时" + (t2 - t1) + "ms"); while (list != null && list.size() > 0) { Result r = list.get(list.size() - 1); lastId = r.getUid(); long sequence = ringBuffer.next(); ValueEvent event = ringBuffer.claimAndGetPreallocated(sequence); event.setValue(list); event.setId(id); ringBuffer.publish(sequence); id++; long t3 = System.currentTimeMillis(); list = query(sql + " where t.uid>? order by uid limit ?", lastId, limit); long t4 = System.currentTimeMillis(); total += list.size(); System.out.println("查询第" + id + "批数据,耗时" + (t4 - t3) + "ms,RingBuffer剩余空间:" + ringBuffer.remainingCapacity()); } while (true) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("RingBuffer中剩余数据量:" + (32 - ringBuffer.remainingCapacity())); if (ringBuffer.remainingCapacity() == 32) { System.out.println("数据迁移结束!总共" + total + "条记录"); System.exit(0); } } } private List<Result> query(String sql, Object p1, Object p2) { List<Result> list = mysqlJdbcTemplate.query(sql, new Object[] { p1, p2 }, new RowMapper<Result>() { @Override public Result mapRow(ResultSet rs, int rowNum) throws SQLException { Result r = new Result(); r.setUid(rs.getInt("uid")); String tel = rs.getString("tel"); r.setTel(StringUtils.isEmpty(tel) ? null : tel.length() > 11 ? tel.substring(0, 11) : tel); r.setEmail(rs.getString("email")); r.setPassword(rs.getString("password")); r.setUsername(StringUtils.isEmpty(rs.getString("username")) ? 
" " : rs.getString("username")); r.setCredits(rs.getInt("credits")); r.setStatus(rs.getInt("status")); r.setRegdate(rs.getString("regdate")); r.setRealname(rs.getString("realname")); r.setGender(rs.getInt("gender")); r.setBirthyear(rs.getInt("birthyear")); r.setBirthmonth(rs.getInt("birthmonth")); r.setBirthday(rs.getInt("birthday")); r.setConstellation(rs.getString("constellation")); r.setBirthcity(rs.getString("birthcity")); r.setResidecity(rs.getString("residecity")); r.setBloodtype(rs.getString("bloodtype")); r.setQq(rs.getString("qq")); r.setMsn(rs.getString("msn")); r.setTaobao(rs.getString("taobao")); r.setBio(rs.getString("bio")); r.setOccupation(rs.getString("occupation")); r.setSalt(rs.getString("salt")); return r; } }); return list; } private List<Address> queryAddress() { List<Address> addresss = oracleJdbcTemplate.query("SELECT t.NAME,t.ID from ADDRESS t where t.LVL=2", new RowMapper<Address>() { @Override public Address mapRow(ResultSet rs, int rowNum) throws SQLException { Address address = new Address(); address.setId(rs.getInt("ID")); address.setName(rs.getString("NAME")); return address; } }); return addresss; } @Override public void afterPropertiesSet() throws Exception { List<Address> addresss = queryAddress(); for (int i = 0; i < 2; i++) { handlers[i] = new WorkerValueEventHandler(oracleJdbcTemplate, addresss); handlers[i].setTxManager(txManager); } workerPool = new WorkerPool<ValueEvent>(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), handlers); } }
public class WorkerValueEventHandler implements WorkHandler<ValueEvent> { private JdbcTemplate jdbcTemplate; private List<Address> address; private PlatformTransactionManager txManager; public PlatformTransactionManager getTxManager() { return txManager; } public void setTxManager(PlatformTransactionManager txManager) { this.txManager = txManager; } private final String SQL = "insert into ACCOUNT(ID,PHONE,NICKNAME,EMAIL,PASSWORD,CREATETIME,UPDATETIME,CREDITS,CP_ID,BIRTHDAY,SEX,BLOOD,WORK,INTRODUCTION,CONSTELLATION,QQ,MSN,ALWW,STATE,ADDRESS_ID,HOMETOWN_ID,VERSION,REALNAME,SALT) values(?,?,?,?,?,?,?,?,1,?,?,?,?,?,?,?,?,?,?,?,?,0,?,?)"; public WorkerValueEventHandler(JdbcTemplate jdbcTemplate, List<Address> address) { this.jdbcTemplate = jdbcTemplate; this.address = address; } @Override public void onEvent(ValueEvent event) throws InterruptedException { final List<Result> value = event.getValue(); DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); TransactionStatus status = txManager.getTransaction(def); try { int[] num = jdbcTemplate.batchUpdate(SQL, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { Result result = value.get(i); ps.setLong(1, result.getUid()); String tel = result.getTel(); ps.setString(2, tel); ps.setString(3, result.getUsername()); ps.setString(4, result.getEmail()); if (tel != null && tel.startsWith("111")) {//马甲帐号 ps.setString(5, "4b213b65fa4f12d5416354d3df6e5089"); ps.setString(22, "654cba"); } else { ps.setString(5, result.getPassword()); ps.setString(22, result.getSalt()); } Timestamp time = getCreateTime(result.getRegdate()); ps.setTimestamp(6, time); ps.setTimestamp(7, time); ps.setInt(8, result.getCredits()); Date birthDate = getBirthDate(result.getBirthyear(), result.getBirthmonth(), result.getBirthday()); 
ps.setDate(9, birthDate == null ? null : new java.sql.Date(birthDate.getTime())); Integer sex = result.getGender(); ps.setInt(10, sex == 0 ? -1 : sex == 2 ? 0 : 1); ps.setString(11, result.getBloodtype()); ps.setString(12, result.getOccupation()); ps.setString(13, result.getBio()); ps.setString(14, result.getConstellation()); ps.setString(15, result.getQq()); ps.setString(16, result.getMsn()); ps.setString(17, result.getTaobao()); ps.setInt(18, 1); ps.setInt(19, getAddressId(result.getResidecity())); ps.setInt(20, getAddressId(result.getBirthcity())); ps.setString(21, result.getRealname()); if (i % 2000 == 0) { ps.executeBatch(); } } @Override public int getBatchSize() { return value.size(); } }); txManager.commit(status); System.out.println("线程:" + Thread.currentThread().getName() + ">>>结束处理序列号为" + event.getId() + "的数据"); } catch (Throwable e) { txManager.rollback(status); System.out.println(e); System.exit(0); } } private Date getBirthDate(Integer year, Integer month, Integer day) { if (year == 0 || month == 0 || day == 0) { return null; } Calendar c = Calendar.getInstance(); c.set(Calendar.YEAR, year); c.set(Calendar.MONTH, month - 1); c.set(Calendar.DATE, day); return c.getTime(); } private Timestamp getCreateTime(String regDate) { Long time = Long.parseLong(regDate + "000"); return new Timestamp(time); } private Integer getAddressId(String name) { if (StringUtils.isEmpty(name)) { return -1; } Iterator<Address> it = address.iterator(); Integer id = -1; while (it.hasNext()) { Address address = it.next(); String n = address.getName(); if (n.equals(name)) { id = address.getId(); break; } } return id; }
/**
 * Mutable ring-buffer slot that carries one batch of query results and its batch number.
 * The ring buffer pre-allocates slots through {@link #EVENT_FACTORY}.
 */
public class ValueEvent {

    /** Creates the empty slots the ring buffer is pre-filled with. */
    public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
        public ValueEvent newInstance() {
            return new ValueEvent();
        }
    };

    /** Sequential batch number assigned by the producer. */
    private Integer id;

    /** Rows carried by this slot. */
    private List<Result> value;

    public Integer getId() {
        return this.id;
    }

    public void setId(final Integer newId) {
        this.id = newId;
    }

    public List<Result> getValue() {
        return this.value;
    }

    public void setValue(final List<Result> rows) {
        this.value = rows;
    }
}
以上从 MySQL 迁移的版本 5 分钟完成(1 个线程读、2 个线程写,双核 CPU)。下面是从一个包含七百万条数据的 CSV 文件导入数据库的版本,3 分钟就完成了。注意:该版本使用的 ValueEvent(含 setValues 方法)和 WorkerValueEventHandler(单参数构造器)是另一套实现,文中没有列出。
public class TransferProcessor implements Runnable, InitializingBean { private PlatformTransactionManager txManager; private JdbcTemplate oracleJdbcTemplate; public PlatformTransactionManager getTxManager() { return txManager; } public void setTxManager(PlatformTransactionManager txManager) { this.txManager = txManager; } public JdbcTemplate getOracleJdbcTemplate() { return oracleJdbcTemplate; } public void setOracleJdbcTemplate(JdbcTemplate oracleJdbcTemplate) { this.oracleJdbcTemplate = oracleJdbcTemplate; } private final ExecutorService EXECUTOR = Executors.newFixedThreadPool(2); private final WorkerValueEventHandler[] handlers = new WorkerValueEventHandler[2]; private RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY, 32); private WorkerPool<ValueEvent> workerPool = null; public void run() { Integer total = 0; RingBuffer<ValueEvent> ringBuffer = workerPool.start(EXECUTOR); ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); Integer id = 1; BufferedReader reader = null; try { List<String> values = new ArrayList<String>(); reader = new BufferedReader(new FileReader("d:\\black.csv")); reader.readLine(); String line = null; while ((line = reader.readLine()) != null) { total += 1; line = line.replace("\"", ""); if (line.length() > 11) { line = line.substring(line.length() - 11); } values.add(line); if (values.size() % 10000 == 0) { publishEvent(ringBuffer, id, new ArrayList<String>(values)); values = new ArrayList<String>(); id++; } } if (values.size() != 0) { publishEvent(ringBuffer, id, new ArrayList<String>(values)); } } catch (IOException e) { e.printStackTrace(); System.exit(0); } finally { try { if (reader != null) reader.close(); } catch (IOException e) { e.printStackTrace(); } } while (true) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("RingBuffer中剩余数据量:" + (32 - ringBuffer.remainingCapacity())); if (ringBuffer.remainingCapacity() == 32) { 
System.out.println("数据迁移结束!总共" + total + "条记录"); System.exit(0); } } } protected void publishEvent(RingBuffer<ValueEvent> ringBuffer, Integer id, List<String> values) { long sequence = ringBuffer.next(); ValueEvent event = ringBuffer.claimAndGetPreallocated(sequence); event.setId(id); event.setValues(values); ringBuffer.publish(sequence); } @Override public void afterPropertiesSet() throws Exception { for (int i = 0; i < 2; i++) { handlers[i] = new WorkerValueEventHandler(oracleJdbcTemplate); handlers[i].setTxManager(txManager); } workerPool = new WorkerPool<ValueEvent>(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), handlers); } }
相关文章推荐
- Sqoop_详细总结 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- 使用MySQL Migration Toolkit快速导入Oracle数据
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- Sqoop_详细总结 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL[转]
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- 使用navicat工具将oracle数据导入到mysql
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- 使用MySQL Migration Toolkit快速将Oracle数据导入MySQL
- Sqoop_具体总结 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出