您的位置:首页 > 数据库 > MySQL

sqlserver数据导入mysql五:多线程导数据脚本(读取前面拆分的表名进行数据导入)

2014-04-09 16:14 603 查看
#!/usr/bin/perl
use Encode;
use Encode::CN;
use DBI;
use Switch;
use strict;
use Net::HandlerSocket;
use threads;
use Time::HiRes 'time';

# ---------------------------------------------------------------------------
# Configuration and start-up.
#
# Command line:
#   $ARGV[0]  number of worker threads           (default 10)
#   $ARGV[1]  id span copied per batch           (default 3400)
#   $ARGV[2]  suffix selecting which table-list file / log files to use
# ---------------------------------------------------------------------------

# Target MySQL server settings.
my $aim_ip="192.168.0.208";
my $aim_db_name = "mysqldb";
my $hs_port = 9999;

# Source SQL Server settings (ODBC DSN name and credentials).
my $source_name = "sqldb";
my $source_user_name = "sa";
my $source_user_psd = "123";

# MySQL connection details used by the copy routines.
my $db_name="mysqldb";
my $location="192.168.0.208";
my $port="3306";
my $db_user="zoe";
my $db_pass="123";

# Fail with a readable message instead of "Can't call method on undef"
# when the source connection cannot be established.
my $dbh=DBI->connect("dbi:ODBC:$source_name",$source_user_name,$source_user_psd)
    or die "cannot connect to ODBC source $source_name: $DBI::errstr\n";

# Alternative query kept for reference: skips tables that contain a column
# whose system_type_id is 240.
# my $sth=$dbh->prepare("select name,object_id from sys.all_objects ao where type='U' and not exists(
# select 1 from  sys.all_columns col where col.object_id=ao.object_id and system_type_id=240)");

# All user tables that are not shipped by Microsoft, except sysdiagrams.
my $sth=$dbh->prepare("select name,object_id from sys.all_objects where type='U' and is_ms_shipped=0 and name <>'sysdiagrams'")
    or die "cannot prepare table listing: ".$dbh->errstr."\n";
$sth->execute()
    or die "cannot list source tables: ".$sth->errstr."\n";

my $threads_cnt=defined $ARGV[0] ? $ARGV[0] : 10;
my $per_records=defined $ARGV[1] ? $ARGV[1] : 3400;
my @data;          # current (name, object_id) row from $sth
my $datacount;     # row count of the table being copied (set by export_data_in)
my $n=0;           # running number of the table being processed

my $ok=0;          # set to 1 by get_columns when the table has an identity column
my $geo=0;         # geometry flag tested by the main loop

# The run suffix used to be read interactively; it now comes from $ARGV[2].
#print "请输入数字确认运行第几份表的操作";
#my $var=0;
#$var=<STDIN>;
#chop ($var);
my $var=$ARGV[2];

# Input list and output/log files, all keyed by the run suffix.
my $openname="alltablename_exportname_".$var.".txt";
my $losetxtname="alltablename_loseprimary_".$var.".txt";
my $bigtxtname="alltablename_bigcount_".$var.".txt";
my $geotxtname='alltablename_geo_'.$var.'.txt';
my $okname="alltablename_ok_".$var.".txt";
my $logname="alltablename_errorlog_".$var.".txt";
my $repairname="alltablename_repair_".$var.".txt";
my $linename="alltablename_line_".$var.".txt";

# Append a heading line to the "missing primary key" and "huge table" reports.
for my $report ([$losetxtname, "缺少主键的表有:\n"], [$bigtxtname, "超过千万条的表有:\n"])
{
    my ($path, $heading) = @$report;
    open(my $out, '>>', $path) or die "cannot append to $path: $!\n";
    syswrite($out, $heading);
    close($out);
}

# Shared scratch variables for timestamp formatting.
my($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst);
my $format_time;

# Names of the tables assigned to this run, one per line in $openname.
my @a;
open(my $list_fh, "<", $openname) or die "IN: $!";
while (my $line = <$list_fh>)
{
    chomp $line;
    $line =~ s/\r\z//;          # tolerate CRLF line endings
    next if $line eq '';        # a blank line can never match a table name
    push @a, $line;
}
close($list_fh);

# ---------------------------------------------------------------------------
# Main loop: visit every user table of the source database and copy the ones
# that are listed in the table-name file for this run.
#
#   * table without identity column  -> recorded in $losetxtname, not copied
#   * table with identity column     -> copied through export_data_in, then
#                                       recorded in $okname and $linename
# ---------------------------------------------------------------------------

# Lookup set of the table names assigned to this run (replaces the
# deprecated smartmatch operator with an exact string comparison).
my %listed_tables = map { $_ => 1 } @a;

# Refresh the shared timestamp variables and return "Y-M-D h:m:s".
# localtime() reports years since 1900, hence the +1900.
my $stamp_now = sub {
    ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime(time());
    $format_time=sprintf("%d-%d-%d %d:%d:%d",$year+1900,$mon+1,$mday,$hour,$min,$sec);
    return $format_time;
};

# Append the given strings to a report file; a report that cannot be opened
# is reported on STDERR but does not abort the migration.
my $append_lines = sub {
    my ($path, @chunks) = @_;
    my $out;
    unless (open($out, '>>', $path))
    {
        warn "cannot append to $path: $!\n";
        return;
    }
    syswrite($out, $_) for @chunks;
    close($out);
    return;
};

while (@data=$sth->fetchrow_array())
{
    my ($table, $object_id) = @data;

    # Reset the per-table globals; get_columns sets $ok, export_data_in
    # sets $datacount.
    $datacount=0;
    $ok=0;
    $geo=0;
    my ($select_columns,$insert_columns,$column_count,$sort_column,$column_types)
        = get_columns($table,$object_id);
    $n+=1;

    my $is_listed = exists $listed_tables{$table};

    # No identity column: the range-based copy cannot work, only report it.
    if($ok != 1)
    {
        if($is_listed)
        {
            print '该表无主键'."\n";
            $append_lines->($losetxtname, "$n\n", "$table\n", $stamp_now->()."\n");
        }
        next;
    }

    print '该表有主键'."\n";

    if($geo == 1)
    {
        next unless $is_listed;
        print '该表有地理值'."\n";
        $append_lines->($geotxtname, "$n\n", "$table\n", $stamp_now->()."\n");
    }
    else
    {
        unless($is_listed)
        {
            print '该表不在表名单内'."\n";
            next;
        }
        print '该表在表名单内'."\n";
        $append_lines->($linename, $table.'开始复制表'."\n", $stamp_now->()."\n");
    }

    # Copy the table, then record the completion in both report files.
    my $relt = export_data_in ($select_columns,$insert_columns,$column_count,$sort_column,$table,$column_types);

    my $finished_at = $stamp_now->();
    $append_lines->($okname, $n."\n", $table."\n", $datacount."\n", $finished_at."\n");
    $append_lines->($linename, '数量:'.$datacount."\n", '完成'.$table.'复制'."\n", $finished_at."\n");
}
$sth->finish;
$dbh->disconnect;
print '所有表的入库执行完毕!!!!'."\n";

# Copy one table from SQL Server to MySQL in id-range batches.
#
# Parameters (positional):
#   $select_columns  column expression list for the SQL Server SELECT
#   $insert_columns  column list for the MySQL INSERT
#   $columns_count   number of columns (passed through to export_data)
#   $sort_column     name of the identity column used to split the table
#   $table_name      table to copy
#   $column_types    array ref of SQL Server type names, one per column
#
# Reads the file-level connection settings, $per_records and $threads_cnt,
# and stores the source row count in the file-level $datacount.
#
# The copy resumes after the highest id already present in MySQL.  Tables
# with fewer rows than $per_records are copied in the calling thread; larger
# tables are copied by up to $threads_cnt worker threads running export_data.
# Dies when a connection or one of the counting queries fails.
sub export_data_in
{
    my($select_columns,$insert_columns,$columns_count,$sort_column,$table_name,$column_types) = @_;

    # ---- How much of the table is already in MySQL? ----
    print '开始读取mysql中的数量------------'."\n";
    my $data_base = "DBI:mysql:$aim_db_name:$aim_ip:$port";
    my $dbhmysql=DBI->connect($data_base,$db_user,$db_pass)
        or die 'ERROR::'.$table_name.'::'.$DBI::errstr;
    $dbhmysql->do("SET character_set_client = 'utf8'");
    $dbhmysql->do("SET character_set_connection = 'utf8'");

    my $mysql="select count(1),max($sort_column) from $aim_db_name\.$table_name";
    print "$mysql\n";

    my $mysqlsth=$dbhmysql->prepare($mysql)
        or die 'ERROR::'.$table_name.'::'.$dbhmysql->errstr;
    # The method call must be outside the string to be evaluated.
    $mysqlsth->execute()
        or die 'ERROR::'.$table_name.'::'.$mysqlsth->errstr;

    my ($mysqlcount,$mymaxid)=$mysqlsth->fetchrow_array();
    $mysqlsth->finish;
    $dbhmysql->disconnect;

    print "mysql中已有数量:$mysqlcount\n目前id:$mymaxid\n";

    # SQL Server identifier quoting for the remaining statements.
    $sort_column="[$sort_column]";

    # ---- Row count and id range on the SQL Server side ----
    print '开始读取sqlserver中的数量------------'."\n";
    my $dbh2=DBI->connect("dbi:ODBC:$source_name",$source_user_name,$source_user_psd)
        or die 'ERROR::'.$table_name.'::'.$DBI::errstr;
    my $sth_sc=$dbh2->prepare("select count(1),min($sort_column),max($sort_column) from $table_name")
        or die 'ERROR::'.$table_name.'::'.$dbh2->errstr;
    $sth_sc->execute()
        or die 'ERROR::'.$table_name.'::'.$sth_sc->errstr;
    my @source_stats=$sth_sc->fetchrow_array();
    # Nothing below uses this connection, so release it before any worker
    # thread is created.
    $sth_sc->finish;
    $dbh2->disconnect;

    $datacount=$source_stats[0];
    my $minid=$source_stats[1];
    my $maxid=$source_stats[2];
    print 'sqlserver中数量为:'. $datacount."\n";

    # Tables with ten million rows or more are listed in a separate report.
    if($datacount>=10000000)
    {
        if(open(my $big_fh, '>>', $bigtxtname))
        {
            syswrite($big_fh,"$n\n");
            syswrite($big_fh,"$table_name\n");
            close($big_fh);
        }
        else
        {
            warn "cannot append to $bigtxtname: $!\n";
        }
    }

    # Start at the smallest source id for an empty target, otherwise right
    # after the largest id that was already copied.
    if($mysqlcount==0)
    {
        $mymaxid=$minid;
    }
    else
    {
        $mymaxid=$mymaxid+1;
    }

    # Target already holds at least as many rows as the source: nothing to do.
    return unless $mysqlcount<$datacount;

    # Each batch covers the inclusive id range [$begin_cnt, $end_cnt].
    my $begin_cnt = $mymaxid;
    my $end_cnt =$begin_cnt+$per_records ;

    # Original author's note on the loop bound: without the "-1" an
    # error "21" was reported.
    while($begin_cnt-1-$per_records < $maxid)
    {
        if($datacount<$per_records)
        {
            # Small table: copy in the current thread.
            my $sql_select="SELECT $select_columns  FROM $table_name  where $sort_column BETWEEN $begin_cnt and $end_cnt";
            export_data($table_name,$sql_select,$insert_columns,$columns_count,$column_types,$datacount,$maxid,$begin_cnt,$end_cnt);
            $begin_cnt = $begin_cnt + $per_records+1;
            $end_cnt = $end_cnt + $per_records+1;
            next;
        }

        # Fill the worker pool, but never start a batch beyond the id range
        # (same bound as the non-threaded branch above).
        while(scalar(threads->list())<$threads_cnt
              and $begin_cnt-1-$per_records < $maxid)
        {
            my $sql_select="SELECT $select_columns  FROM $table_name  where $sort_column BETWEEN $begin_cnt and $end_cnt";
            threads->new(\&export_data,   $table_name,$sql_select,$insert_columns,$columns_count,$column_types,$datacount,$maxid,$begin_cnt,$end_cnt);
            $begin_cnt = $begin_cnt + $per_records+1;
            $end_cnt = $end_cnt + $per_records+1;
        }

        # Reap the workers that have finished so new ones can be started.
        foreach my $worker (threads->list(threads::all))
        {
            $worker->join() if $worker->is_joinable();
        }

        # Give running workers a chance instead of spinning at full speed.
        threads->yield();
    }

    # Wait for the remaining workers before reporting the table as done.
    foreach my $worker (threads->list(threads::all))
    {
        $worker->join();
    }

    return;
}

# Current local time as "Y-M-D h:m:s" for console output and log files.
sub _export_timestamp
{
    my @t = localtime(time());
    # localtime() yields years since 1900 and a zero-based month.
    return sprintf("%d-%d-%d %d:%d:%d",$t[5]+1900,$t[4]+1,$t[3],$t[2],$t[1],$t[0]);
}

# Turn one fetched value into the text used inside the MySQL VALUES list.
#   $type   SQL Server type name of the column
#   $value  value as fetched (undef is written as an empty string)
sub _export_sql_literal
{
    my ($type, $value) = @_;
    $type  = '' unless defined $type;
    $value = '' unless defined $value;

    # Double embedded apostrophes so they cannot terminate the literal.
    # Backslash escaping is not an option: all backslashes are stripped
    # from the statement before it is sent.
    (my $quoted = $value) =~ s/'/''/g;

    if($type eq "geometry")
    {
        return " GeomFromText('$quoted',4326)";
    }
    if($type eq "int" and $value>4200000000)
    {
        # Value arrived as an unsigned 32-bit number: map it back to the
        # negative signed value.
        my $magnitude=4294967295-$value+1;
        return '-'.$magnitude;
    }
    # The placeholder dates produced by the SELECT for NULL values are
    # replaced by 1000-01-01 on the MySQL side.
    if($type eq "date" and $value eq '1900-01-01')
    {
        return "'1000-01-01'";
    }
    if($type eq "datetime" and $value eq '1900-01-01 00:00:00')
    {
        return "'1000-01-01 00:00:00'";
    }
    return "'$quoted'";
}

# Copy one batch of rows from SQL Server to MySQL.
#
# Parameters (positional):
#   $table_name      table being copied
#   $sql_select      complete SELECT statement for this batch
#   $insert_columns  column list for the INSERT
#   $columns_count   number of columns (not used here)
#   $column_types    array ref of SQL Server type names, one per column
#   $datacount       total number of source rows (progress output only)
#   $maxid           largest source id (progress output only)
#   $begin_cnt       first id of the batch (progress output only)
#   $end_cnt         last id of the batch (progress output only)
#
# Runs either directly or as a thread body.  Every failure is caught: it is
# printed, appended to $logname, and the failed statement is appended to
# $repairname.
sub export_data
{
    my ($table_name,$sql_select,$insert_columns,$columns_count,$column_types,$datacount,$maxid,$begin_cnt,$end_cnt) = @_;
    my $insert_sql;

    eval{
        my $startTime=time;

        print 'exporting data '.$table_name.'; total:'.$datacount.'; maxid:'.$maxid.'; nowid:'.$begin_cnt.' to '.$end_cnt."\n";
        print  _export_timestamp()."\n";
        print '开始读取sqlserver数据------------'."\n";

        # RaiseError makes every failure on this handle die into the eval.
        my $dbh_mssql=DBI->connect("dbi:ODBC:$source_name",$source_user_name,$source_user_psd,{RaiseError =>1});
        $dbh_mssql->{LongTruncOk}=1;
        $dbh_mssql->{LongReadLen}=1048576;

        my $sth_select=$dbh_mssql->prepare($sql_select);
        $sth_select->execute() or die "Cannot execute: ". $sth_select->errstr();

        # fetchrow_arrayref reuses its array, so every row is copied.
        my @rows;
        while(my $row=$sth_select->fetchrow_arrayref())
        {
            push @rows,[@$row];
        }
        printf("读出时间%.1f seconds.\n",time-$startTime);

        # Release the source connection whether or not rows were found.
        $sth_select->finish;
        $dbh_mssql->disconnect;

        my $col=@rows;
        print '提出行数为:'.$col."\n";

        if($col !=0)
        {
            print '开始组合字符串------------'."\n";
            $startTime=time;

            # One "(v1 ,v2 ,...)" group per row.
            my @tuples;
            foreach my $row (@rows)
            {
                my @literals = map { _export_sql_literal($column_types->[$_],$row->[$_]) } 0 .. $#$row;
                push @tuples,'('.join(' ,',@literals).")\n";
            }
            my $data_str=join(' ,',@tuples);

            printf("组合字符串时间%.1f seconds.\n",time-$startTime);

            # The data is treated as GBK and re-encoded as UTF-8 for MySQL.
            $data_str=encode("utf8",decode("gbk",$data_str));

            $startTime=time;
            if($data_str ne "")
            {
                print '开始写入mysql------------'."\n";
                my $data_base = "DBI:mysql:$aim_db_name:$aim_ip:$port";
                my $dbh_mysql=DBI->connect($data_base,$db_user,$db_pass)
                    or die "Cannot connect to $data_base: $DBI::errstr\n";
                $dbh_mysql->do("SET character_set_client = 'utf8'");
                $dbh_mysql->do("SET character_set_connection = 'utf8'");

                # Backslashes would act as escape characters in MySQL.
                $data_str=~s/\\//g;
                $insert_sql = 'INSERT '.$table_name.'('.$insert_columns.') values '.$data_str .';';

                my $sth_mysql=$dbh_mysql->prepare($insert_sql);
                $sth_mysql->execute() or die $dbh_mysql->errstr()."\n" ;
                $sth_mysql->finish();
                $dbh_mysql->disconnect;
            }

            printf("写入时间%.1f seconds.\n",time-$startTime);
        }
        else
        {
            print '数目为0,不执行'."\n";
        }
    };

    # Keep the error text before anything else can overwrite $@.
    my $error=$@;
    return unless $error;

    print "An error occurred: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!";

    # Full error record.
    if(open(my $log_fh,'>>',$logname))
    {
        syswrite($log_fh,"$n\n");
        syswrite($log_fh,"$table_name\n");
        syswrite($log_fh,"$sql_select\n");
        syswrite($log_fh,"$error\n");
        syswrite($log_fh,_export_timestamp()."\n");
        close($log_fh);
    }
    else
    {
        warn "cannot append to $logname: $!\n";
    }

    # Table and statement of the failed batch, so it can be re-run later.
    if(open(my $repair_fh,'>>',$repairname))
    {
        syswrite($repair_fh,"$table_name\:\:$sql_select\:\:");
        close($repair_fh);
    }
    else
    {
        warn "cannot append to $repairname: $!\n";
    }

    print "An error occurred: $error";
    return;
}

# Describe the columns of one SQL Server table.
#
# Parameters (positional):
#   table name   used for progress output only
#   object id    the table's object_id in sys.all_objects
#
# Returns the list
#   ($cols_select, $cols_insert, $cols_count, $sort_column, \@cols_types)
# where
#   $cols_select  column expressions for the SQL Server SELECT; NULLs are
#                 replaced by a default that depends on the column type
#   $cols_insert  backtick-quoted column names for the MySQL INSERT
#   $cols_count   number of columns
#   $sort_column  name of the identity column ('' when there is none)
#   @cols_types   SQL Server type name of every column, in column order
#
# Side effects: sets the file-level $ok to 1 when an identity column exists
# and refreshes the shared timestamp variables.
sub get_columns
{
    my ($table_name,$object_id) = @_;

    # localtime() reports years since 1900.
    ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime(time());
    $format_time=sprintf("%d-%d-%d %d:%d:%d",$year+1900,$mon+1,$mday,$hour,$min,$sec);
    print "loading columns of $table_name \n";
    print   "$format_time\n";

    my $sql="select col.name,tp.name,col.[is_identity] from sys.all_columns col
inner join sys.types tp on col.system_type_id=tp.system_type_id  and col.user_type_id=tp.user_type_id
where object_id=$object_id";
    my $dbh2=DBI->connect("dbi:ODBC:$source_name",$source_user_name,$source_user_psd)
        or die "cannot connect to ODBC source $source_name: $DBI::errstr\n";
    my $cols=$dbh2->prepare($sql)
        or die "cannot prepare column query for $table_name: ".$dbh2->errstr."\n";
    $cols->execute()
        or die "cannot read columns of $table_name: ".$cols->errstr."\n";

    # Types whose NULLs are exported as 0.
    my %zero_default = map { $_ => 1 } ("int","numeric","bit","money");

    my @select_parts;
    my @insert_parts;
    my @cols_types;
    my $sort_column="";

    while(my ($col_name,$type_name,$is_identity)=$cols->fetchrow_array())
    {
        push @cols_types,$type_name;

        my $select_expr;
        if($type_name eq "hierarchyid")
        {
            $select_expr=" [$col_name].ToString() as [$col_name]";
        }
        elsif($type_name eq "nvarchar" or $type_name eq "varchar")
        {
            # Strips spaces, CR and LF, replaces NULL by '' and casts to TEXT.
            $select_expr="CAST((ISNULL(replace(replace(replace(replace([$col_name],'',''),' ',''),char(10),''),char(13),''),'')) as TEXT) as [$col_name]";
        }
        elsif($zero_default{$type_name})
        {
            $select_expr=" ISNULL([$col_name],0) as [$col_name]";
        }
        elsif($type_name eq "datetime")
        {
            $select_expr=" ISNULL(CONVERT(VARCHAR(24),[$col_name],20),'1900-01-01 00:00:00') as [$col_name]";
        }
        elsif($type_name eq "date")
        {
            $select_expr=" ISNULL([$col_name],'1900-01-01') as [$col_name]";
        }
        elsif($type_name eq "geometry")
        {
            # $geo=1;
            # Exported as WKT text; the alias is bracketed like all others.
            $select_expr=" isnull([$col_name],'POINT (0 0)').STAsText() as [$col_name]";
        }
        else
        {
            $select_expr=" [$col_name]";
        }
        push @select_parts,$select_expr;

        # Backticks keep reserved words usable as MySQL column names.
        push @insert_parts,"`$col_name`";

        # The identity column drives the range-based copy.
        if($is_identity == 1)
        {
            $ok=1;
            $sort_column="$col_name";
        }
    }
    $cols->finish;
    $dbh2->disconnect;

    my $cols_select=join(" ,",@select_parts);
    my $cols_insert=join(",",@insert_parts);
    my $cols_count=scalar @cols_types;

    return ($cols_select,$cols_insert,$cols_count,$sort_column,\@cols_types);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐