您的位置:首页 > 编程语言 > Java开发

高性能 Java 实现:不同服务器之间的数据库迁移。

2016-07-12 18:06 651 查看
需求:公司要迁移数据库,但使用 Linux 自带的数据库导入/导出工具迁移后,发现数据不完整、没有全部导入。因此公司要求我用 Java 代码实现这一功能。

该功能参考了 http://blog.csdn.net/thinker28754/article/details/6919126 。

基本逻辑如下:首先分别建立到两台服务器的连接(Connection);然后获取源服务器上的数据库名,再根据数据库名遍历出每个表名,比较两台服务器上同一张表的行数以决定是否需要同步,并根据表名获取每个表的字段数量。

之后使用 JDBC 的预编译语句 PreparedStatement 动态地批量插入数据。

下面是相关代码:

SyDateDao:

package com.koolma.dsp.getData.SynchroForDate.dao;

import java.sql.Connection;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.util.ArrayList;

import java.util.List;

/**
 * Data-access helpers for the database migration job.
 *
 * <p>Every method takes the {@link Connection} to query and never closes it; the statements and
 * result sets a method opens are always closed before it returns, even when the query fails. The
 * SQL used here ({@code show databases}, {@code desc}, {@code information_schema.tables}) is MySQL
 * syntax.
 */
public class SyDateDao {

    /**
     * Lists the databases (schemas) visible on a server.
     *
     * @param con open connection to the server to inspect
     * @return database names in the order the server returns them; on a {@link SQLException} the
     *     error is printed and the names read so far (possibly none) are returned
     */
    public List<String> getDateBaseDao(Connection con) {
        List<String> datename = new ArrayList<>();
        try (PreparedStatement ptst = con.prepareStatement("show databases");
                ResultSet rs = ptst.executeQuery()) {
            while (rs.next()) {
                datename.add(rs.getString("Database"));
            }
        } catch (SQLException e) {
            // Best effort: report and return the partial list.
            e.printStackTrace();
        }
        return datename;
    }

    /**
     * Lists the tables of one database.
     *
     * @param con open connection to the server to inspect
     * @param dataname database (schema) name; bound as a query parameter, so it needs no escaping
     * @return table names of {@code dataname}; on a {@link SQLException} the error is printed and
     *     the names read so far (possibly none) are returned
     */
    public List<String> getTableDao(Connection con, String dataname) {
        List<String> tables = new ArrayList<>();
        String sql = "SELECT TABLE_NAME FROM information_schema.tables t WHERE t.table_schema = ?";
        try (PreparedStatement ptst = con.prepareStatement(sql)) {
            ptst.setString(1, dataname);
            try (ResultSet rs = ptst.executeQuery()) {
                while (rs.next()) {
                    tables.add(rs.getString("TABLE_NAME"));
                }
            }
        } catch (SQLException e) {
            // Best effort: report and return the partial list.
            e.printStackTrace();
        }
        return tables;
    }

    /**
     * Counts the rows of a table on the source (k121) server.
     *
     * @param con connection to the source server
     * @param table table name, optionally qualified as {@code db.table}. It is concatenated into
     *     the SQL (identifiers cannot be bound as parameters), so it must come from trusted
     *     metadata such as {@link #getTableDao}, never from user input
     * @return a one-element list holding the row count, or {@code null} if the query failed
     *     (for example because the table does not exist)
     */
    public List<Long> K121_TableCount(Connection con, String table) {
        return countRows(con, table);
    }

    /**
     * Counts the rows of a table on the target (Ali) server.
     *
     * @param con connection to the target server
     * @param table table name, optionally qualified as {@code db.table}; concatenated into the
     *     SQL, so it must be trusted (see {@link #K121_TableCount})
     * @return a one-element list holding the row count, or {@code null} if the query failed
     */
    public List<Long> Ali_TableCount(Connection con, String table) {
        return countRows(con, table);
    }

    /** Runs {@code select count(1)} on {@code table}; returns {@code null} on any SQL error. */
    private static List<Long> countRows(Connection con, String table) {
        List<Long> count = new ArrayList<>();
        try (PreparedStatement ptst = con.prepareStatement("select count(1) from " + table);
                ResultSet rs = ptst.executeQuery()) {
            while (rs.next()) {
                count.add(rs.getLong(1));
            }
        } catch (SQLException e) {
            // null (not an empty list) is kept for existing callers, which skip such tables.
            return null;
        }
        return count;
    }

    /**
     * Lists the column names of a table, in the order {@code desc} reports them.
     *
     * @param con open connection
     * @param table table name, optionally qualified as {@code db.table}; concatenated into the
     *     SQL, so it must be trusted (see {@link #K121_TableCount})
     * @return column names; on a {@link SQLException} the error is printed and the names read so
     *     far (possibly none) are returned
     */
    public List<String> Find_table_field(Connection con, String table) {
        List<String> field = new ArrayList<>();
        try (PreparedStatement ptst = con.prepareStatement("desc " + table);
                ResultSet rs = ptst.executeQuery()) {
            while (rs.next()) {
                field.add(rs.getString("field"));
            }
        } catch (SQLException e) {
            // Best effort: report and return the partial list.
            e.printStackTrace();
        }
        return field;
    }
}

以上是 DAO 层访问数据库的实现。

SyDbAction:

package com.koolma.dsp.getData.SynchroForDate.action;

import java.sql.Connection;

import java.sql.SQLException;

import java.util.ArrayList;

import java.util.List;

import com.koolma.dsp.getData.SynchroForDate.dao.SyDateDao;

/**
 * Drives the migration from the source server ("k121") to the target server ("ali").
 *
 * <p>For each database it lists the tables on the source server, compares their row counts with
 * the target server and passes the tables that differ to {@link CoreDataSyncImpl#syncData}. Both
 * connections are opened by the field initializers when an instance is created.
 */
public class SyDbAction {

    private SyDateDao dao = new SyDateDao();

    private List<String> data = new ArrayList<>(); // database names read from the source server

    private List<String> table = new ArrayList<>(); // tables of the database being processed

    private List<String> sql = new ArrayList<>(); // statements built by createSql

    private List<String> end_table = new ArrayList<>(); // tables selected for synchronization

    public List<String> getEnd_table() {
        return end_table;
    }

    public void setEnd_table(List<String> end_table) {
        this.end_table = end_table;
    }

    Connection con = new ConnectionDateBases().k121_connectionSql1(); // source server

    Connection ali_con = new ConnectionDateBases().ali_connectionSql(); // target server

    /**
     * Selects the tables whose row counts differ noticeably between the two servers.
     *
     * <p>A table is selected when {@code (k121Rows - aliRows) / k121Rows * 1000 > 1}, i.e. when the
     * target is missing more than 0.1% (1 per mille) of the source rows. A table is skipped when
     * its count cannot be read on either server or when it is empty on the source.
     *
     * @param table_name fully qualified table names ({@code db.table})
     * @return the shared {@link #getEnd_table()} list with the selected tables appended
     */
    public List<String> compareTableSize(String[] table_name) {
        for (String k121_table : table_name) {
            // Declared per table so a failed lookup can never reuse the previous table's counts.
            List<?> k121_num;
            List<?> ali_num;
            try {
                k121_num = dao.K121_TableCount(con, k121_table);
                ali_num = dao.Ali_TableCount(ali_con, k121_table);
            } catch (Exception e) {
                System.out.println("k121_num = dao.K121_TableCount(con, k121_table);ali_num = dao.Ali_TableCount(ali_con, k121_table);" + e);
                continue;
            }
            Long k121Count = firstCount(k121_num);
            Long aliCount = firstCount(ali_num);
            if (k121Count == null || aliCount == null) {
                continue;
            }
            long k121 = k121Count;
            long ali = aliCount;
            if (k121 == 0) {
                continue; // nothing to copy (and the ratio below would divide by zero)
            }
            double cha = (k121 - ali) / (double) k121 * 1000;
            if (cha > 1) {
                end_table.add(k121_table);
            }
        }
        return end_table;
    }

    /** Returns the first element of a count list as a long, or null if there is none. */
    private static Long firstCount(List<?> counts) {
        if (counts == null || counts.isEmpty()) {
            return null;
        }
        Object first = counts.get(0);
        return (first instanceof Number) ? Long.valueOf(((Number) first).longValue()) : null;
    }

    /**
     * Builds a {@code replace into db.table (`col1`, `col2`, ...)} column-list prefix for each
     * table, appends each statement to the internal SQL list and returns them all.
     *
     * @param end_table fully qualified table names
     * @return the generated statements joined with {@code ";\n"}; empty if {@code end_table} is
     *     empty
     */
    public String createSql(List<String> end_table) {
        StringBuilder all = new StringBuilder();
        for (String tables : end_table) {
            List<?> field = dao.Find_table_field(con, tables);
            // A fresh builder per table, so statements do not accumulate earlier tables.
            StringBuilder sb = new StringBuilder("replace into ").append(tables).append(" (");
            for (int i = 0; i < field.size(); i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(quoteIdentifier(String.valueOf(field.get(i))));
            }
            sb.append(")");
            sql.add(sb.toString());
            System.out.println("Tm的这个sql是什么鬼" + sb);
            if (all.length() > 0) {
                all.append(";\n");
            }
            all.append(sb);
        }
        return all.toString();
    }

    /**
     * Synchronizes one database, or every non-system database.
     *
     * @param args {@code args[0]}, when present and non-null, names the single database to sync;
     *     otherwise every database on the source server except the skipped system schemas is
     *     synced, one database at a time
     */
    @SuppressWarnings("unchecked") // SyDateDao may return raw lists
    public void createTable(String[] args) {
        if (args != null && args.length > 0 && args[0] != null) {
            SingleDb_CreateTable(args[0]);
            return;
        }
        data = dao.getDateBaseDao(con);
        for (String dataname : data) {
            if (isSkippedSchema(dataname)) {
                continue;
            }
            SingleDb_CreateTable(dataname);
        }
    }

    /**
     * Synchronizes every table of one database whose row count differs between the servers.
     *
     * @param dataname database name on the source server
     */
    @SuppressWarnings("unchecked") // SyDateDao may return raw lists
    public void SingleDb_CreateTable(String dataname) {
        table = dao.getTableDao(con, dataname); // one database -> many tables
        List<String> qualified = new ArrayList<>(table.size());
        for (String tableson : table) {
            qualified.add(quoteIdentifier(dataname) + "." + quoteIdentifier(tableson));
        }
        try {
            new CoreDataSyncImpl().syncData(
                    compareTableSize(qualified.toArray(new String[0])), con, ali_con);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("end_table size is ===" + end_table.size());
        end_table.clear();
    }

    /** Schemas that are never migrated (MySQL system schemas plus {@code test}). */
    private static boolean isSkippedSchema(String dataname) {
        return "mysql".equals(dataname)
                || "test".equals(dataname)
                || "information_schema".equals(dataname)
                || "performance_schema".equals(dataname)
                || "sys".equals(dataname);
    }

    /** Wraps a MySQL identifier in backticks, doubling any embedded backtick. */
    private static String quoteIdentifier(String name) {
        return "`" + name.replace("`", "``") + "`";
    }
}

接下来是实现数据库同步的核心类:

package com.koolma.dsp.getData.SynchroForDate.action;

import java.sql.Connection;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.List;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

import org.apache.log4j.chainsaw.Main;

import com.koolma.dsp.getData.SynchroForDate.dao.SyDateDao;

/**
 * Data synchronization: copies the rows of the given tables from the core (source) database to
 * the target database.
 *
 * <p>Each table is split into {@code syncThreadNum} LIMIT pages. A fixed thread pool copies those
 * pages with {@code REPLACE INTO} and commits every 10000 rows. {@link #syncData} returns only when
 * every page of every table has finished, or when a table's workers time out.
 *
 * <p>NOTE(review): all workers share the same two {@link Connection}s. JDBC drivers are not
 * required to make a Connection thread-safe. Also, a commit issued by one worker commits rows the
 * other workers have already executed. Keep {@code syncThreadNum} at 1 unless each worker is given
 * its own connections.
 *
 * @author yinhao
 */
public class CoreDataSyncImpl {

    /** Rows added to the JDBC batch before it is executed and committed. */
    private static final int BATCH_SIZE = 10000;

    /** Longest time syncData waits for the workers of one table. */
    private static final long TABLE_TIMEOUT_HOURS = 3;

    // Only written through setCoreTBNames; syncData receives its table list as a parameter.
    // It starts empty because creating a SyDbAction here opens two database connections
    // (its field initializers) that would never be used or closed.
    @SuppressWarnings("unused")
    private List<String> coreTBNames = new java.util.ArrayList<String>();

    private Logger log = Logger.getLogger(getClass());

    private AtomicLong currentSynCount = new AtomicLong(0L); // rows copied for the current table

    private int syncThreadNum = 1; // worker threads (= LIMIT pages) per table

    // Connections used by this instance's workers. Per instance rather than static, so
    // concurrent instances cannot overwrite each other's connections.
    private Connection coreConnection, targetConn;

    /**
     * Copies every table in {@code coreTBNames} from the source to the target connection, one
     * table at a time.
     *
     * @param coreTBNames fully qualified table names to copy
     * @param k121 source connection; a new one is opened when null
     * @param ali target connection; a new one is opened when null
     * @throws Exception if counting a source table fails or the wait for the workers is
     *     interrupted
     */
    public void syncData(List<String> coreTBNames, Connection k121, Connection ali) throws Exception {
        coreConnection = (k121 != null) ? k121 : new ConnectionDateBases().k121_connectionSql1();
        targetConn = (ali != null) ? ali : new ConnectionDateBases().ali_connectionSql();

        System.out.println("coreTBNames==" + coreTBNames + "同步表的长度 ==" + coreTBNames.size());

        for (String tmpTBName : coreTBNames) {
            syncTable(tmpTBName);
        }
    }

    /** Copies one table with a fresh worker pool and waits until all of its pages are done. */
    private void syncTable(String tmpTBName) throws SQLException, InterruptedException {
        log.info("开始同步核心库" + tmpTBName + "表数据");
        currentSynCount.set(0L);

        long totalNum;
        try (Statement coreStmt = coreConnection.createStatement();
                ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM " + tmpTBName)) {
            coreRs.next();
            totalNum = coreRs.getLong(1);
        }

        // Ceiling division, so the pages still cover every row when totalNum is not a multiple
        // of the thread count.
        long ownerRecordNum = (totalNum + syncThreadNum - 1) / syncThreadNum;

        System.out.println("共需要同步的数据量:" + totalNum);
        System.out.println("同步线程数量:" + syncThreadNum);
        System.out.println("每个线程可处理的数量:" + ownerRecordNum);
        log.info("共需要同步的数据量:" + totalNum);
        log.info("同步线程数量:" + syncThreadNum);
        log.info("每个线程可处理的数量:" + ownerRecordNum);

        final AtomicLong threadSeq = new AtomicLong();
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(syncThreadNum,
                new java.util.concurrent.ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "SyncThread-" + threadSeq.getAndIncrement());
                    }
                });
        try {
            for (int i = 0; i < syncThreadNum; i++) {
                // e.g. "Select * From db.t limit 0 , 657398", "... limit 657398 , 657398", ...
                // NOTE(review): LIMIT pages without ORDER BY rely on MySQL returning rows in the
                // same order for every query; ordering by the primary key would guarantee it.
                String query = "Select * From " + tmpTBName
                        + " limit " + (i * ownerRecordNum) + " , " + ownerRecordNum;
                fixedThreadPool.execute(new WorkerHandler(query, tmpTBName));
            }
        } finally {
            // Without shutdown(), awaitTermination below always waits for the full timeout and
            // the pool threads never exit.
            fixedThreadPool.shutdown();
        }

        try {
            if (!fixedThreadPool.awaitTermination(TABLE_TIMEOUT_HOURS, TimeUnit.HOURS)) {
                log.error("Timed out after " + TABLE_TIMEOUT_HOURS + "h syncing " + tmpTBName);
                fixedThreadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            fixedThreadPool.shutdownNow();
            throw e;
        }

        log.info("核心库" + tmpTBName + "表数据同步完成,共同步了" + currentSynCount.get() + "条数据");
        System.out.println("核心库" + tmpTBName + "表数据同步完成,共同步了" + currentSynCount.get() + "条数据");
    }

    public void setCoreTBNames(List<String> coreTBNames) {
        this.coreTBNames = coreTBNames;
    }

    /**
     * Sets how many LIMIT pages (and worker threads) each table is split into.
     *
     * @throws IllegalArgumentException if {@code syncThreadNum < 1}
     */
    public void setSyncThreadNum(int syncThreadNum) {
        if (syncThreadNum < 1) {
            throw new IllegalArgumentException("syncThreadNum must be >= 1: " + syncThreadNum);
        }
        this.syncThreadNum = syncThreadNum;
    }

    /** Copies one LIMIT page of a source table into the table of the same name on the target. */
    final class WorkerHandler implements Runnable {

        String queryStr; // SELECT for this worker's page

        String targetTBName; // table written on the target server

        /** A page taking longer than this many milliseconds is logged as slow. */
        public static final long TIMEOUT = 30000L;

        public long startTime;

        public WorkerHandler(String queryStr, String targetTBName) {
            this.queryStr = queryStr;
            this.targetTBName = targetTBName;
        }

        @Override
        public void run() {
            startTime = System.currentTimeMillis();
            try {
                launchSyncData(coreConnection, targetConn);
            } catch (Exception e) {
                log.error("Sync worker failed for " + queryStr, e);
                System.out.println(e);
            }
            long elapsed = System.currentTimeMillis() - startTime;
            if (elapsed > TIMEOUT) {
                log.warn(Thread.currentThread().getName() + " took " + elapsed + " ms for " + queryStr);
            }
        }

        /**
         * Streams this worker's page from the source and {@code REPLACE}s it into the target,
         * committing every {@link #BATCH_SIZE} rows and once more at the end. Columns are copied
         * by position, so the two tables must have the same column order. SQL errors are logged
         * and end this page only.
         */
        void launchSyncData(Connection coreConnection, Connection targetConn) {
            // Open a replacement only for the side that is missing.
            if (coreConnection == null) {
                coreConnection = new ConnectionDateBases().k121_connectionSql1();
            }
            if (targetConn == null) {
                targetConn = new ConnectionDateBases().ali_connectionSql();
            }

            List<?> field = new SyDateDao().Find_table_field(targetConn, targetTBName);
            int columnCount = field.size();
            if (columnCount == 0) {
                log.error("No columns found for target table " + targetTBName + "; skipping " + queryStr);
                return;
            }
            StringBuilder placeholders = new StringBuilder("?");
            for (int i = 1; i < columnCount; i++) {
                placeholders.append(",?");
            }
            String insertSql = "REPLACE INTO " + targetTBName + " VALUES (" + placeholders + ")";

            try (Statement coreStmt = coreConnection.createStatement()) {
                targetConn.setAutoCommit(false); // commit manually, once per batch
                try (PreparedStatement targetPstmt = targetConn.prepareStatement(insertSql);
                        ResultSet coreRs = coreStmt.executeQuery(queryStr)) {
                    log.info(Thread.currentThread().getName() + "'s Query SQL::" + queryStr);
                    System.out.println(Thread.currentThread().getName() + "'s Query SQL::" + queryStr);

                    int batchCounter = 0;
                    while (coreRs.next()) {
                        for (int c = 1; c <= columnCount; c++) {
                            targetPstmt.setObject(c, coreRs.getObject(c));
                        }
                        targetPstmt.addBatch();
                        batchCounter++;
                        currentSynCount.incrementAndGet();
                        if (batchCounter % BATCH_SIZE == 0) {
                            targetPstmt.executeBatch();
                            targetPstmt.clearBatch();
                            targetConn.commit();
                        }
                    }
                    // Flush the final partial batch.
                    targetPstmt.executeBatch();
                    targetPstmt.clearBatch();
                    targetConn.commit();
                }
            } catch (SQLException e) {
                log.error("Sync failed for " + queryStr, e);
            }
        }
    }
}

MainAction:

package com.koolma.dsp.getData.SynchroForDate.action;

import org.mortbay.log.Log;

/**
 * Command-line entry point of the migration job.
 *
 * <p>The arguments are passed unchanged to {@link SyDbAction#createTable}; {@code args[0]}, if
 * present, is the database name and also appears in the timing log.
 */
public class MainAction extends SyDbAction {

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        boolean succeeded = true;
        try {
            new SyDbAction().createTable(args);
        } catch (Exception e) {
            succeeded = false;
            System.out.println(e);
        }
        long end = System.currentTimeMillis();

        // No join is needed: createTable returns only after CoreDataSyncImpl.syncData has waited
        // on its worker pool. (The former static `Thread.join()` call did not compile, because
        // join() is an instance method.)
        if (succeeded) {
            Log.info("同步成功,。。。。。");
        }
        String target = (args != null && args.length > 0) ? args[0] : "(all)";
        Log.info("同步数据库 " + target + "消耗的时间为 :" + (end - start) / 1000 + "秒");
    }
}

实现方式很简单,但数据量大时容易出错,常见的错误一般是线程阻塞问题。

建议对方法加锁,或者让线程以阻塞的方式依次执行。

该方法适用于数据量较大的场景。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 服务器 迁移 数据