您的位置:首页 > 数据库 > MySQL

MySQL在ROW模式下通过binlog提取SQ…

2016-01-29 10:31 513 查看

Linux


基于row模式的binlog,生成DML(insert/update/delete)的rollback语句

通过mysqlbinlog -v 解析binlog生成可读的sql文件

提取需要处理的有效sql

  "### "开头的行.如果输入的start-position位于某个event
group中间,则会导致"无法识别event"错误

将INSERT/UPDATE/DELETE 的sql反转,并且1个完整sql只能占1行

  INSERT: INSERT INTO => DELETE FROM, SET =>
WHERE

  UPDATE: WHERE => SET, SET =>
WHERE

  DELETE: DELETE FROM => INSERT INTO, WHERE
=> SET

用列名替换位置@{1,2,3}

  通过desc table获得列顺序及对应的列名

  特殊列类型value做特别处理

逆序

注意:

  表结构与现在的表结构必须相同[谨记]

 
由于row模式是幂等的,并且恢复是一次性,所以只提取sql,不提取BEGIN/COMMIT

  只能对INSERT/UPDATE/DELETE进行处理

mysql> select * from yoon;

+----------+------------+-----------+---------------------+

| actor_id | first_name | last_name | last_update
     
  |

+----------+------------+-----------+---------------------+

|      
 1 | HANK    
  | YOON    
 | 2006-02-15 04:34:33 |

|      
 2 | HANK    
  | YOON    
 | 2006-02-15 04:34:33 |

|      
 3 | HANK    
  | YOON    
 | 2006-02-15 04:34:33 |

|      
 4 | HANK    
  | YOON    
 | 2006-02-15 04:34:33 |

|      
 5 | HANK    
  | YOON    
 | 2006-02-15 04:34:33 |

|      
 6 | HANK    
  | YOON    
 | 2006-02-15 04:34:33 |

|      
 7 | HANK    
  | YOON    
 | 2006-02-15 04:34:33 |

|      
 8 | HANK    
  | YOON    
 | 2006-02-15 04:34:33 |

|      
 9 | HANK    
  | YOON    
 | 2006-02-15 04:34:33 |

|       10 |
HANK       |
YOON      |
2006-02-15 04:34:33 |

|       11 |
HANK       |
YOON      |
2006-02-15 04:34:33 |

+----------+------------+-----------+---------------------+

11 rows in set (0.00 sec)

mysql> delete from yoon;

Query OK, 11 rows affected (1.03 sec)

mysql> select * from yoon;

Empty set (0.00 sec)

命令之间的空格一定要注意,否则就会无法提取SQL语句:

[root@hank-yoon data]# perl
binlog-rollback.pl -f 'mysql-bin.000001' -o
'/export/data/mysql/data/yoon.sql' -u 'root' -p
'yoon'

Warning: Using a password on the command line interface can be
insecure.

[root@hank-yoon data]# ls

auto.cnf      
     hank
    ibdata2  
   ib_logfile1
 modify.pl  mysql-bin.000001
 performance_schema  test
 yoon.sql

binlog-rollback.pl  ibdata1
 ib_logfile0  ib_logfile2
 mysql    
 mysql-bin.index   sakila
     
     
 yoon

[root@hank-yoon data]# cat
yoon.sql 

INSERT INTO `yoon`.`yoon` SET `actor_id`=11, `first_name`='HANK',
`last_name`='YOON',
`last_update`=from_unixtime(1139949273);

INSERT INTO `yoon`.`yoon` SET `actor_id`=10, `first_name`='HANK',
`last_name`='YOON',
`last_update`=from_unixtime(1139949273);

INSERT INTO `yoon`.`yoon` SET `actor_id`=9, `first_name`='HANK',
`last_name`='YOON',
`last_update`=from_unixtime(1139949273);

INSERT INTO `yoon`.`yoon` SET `actor_id`=8, `first_name`='HANK',
`last_name`='YOON',
`last_update`=from_unixtime(1139949273);

INSERT INTO `yoon`.`yoon` SET `actor_id`=7, `first_name`='HANK',
`last_name`='YOON',
`last_update`=from_unixtime(1139949273);

INSERT INTO `yoon`.`yoon` SET `actor_id`=6, `first_name`='HANK',
`last_name`='YOON',
`last_update`=from_unixtime(1139949273);

INSERT INTO `yoon`.`yoon` SET `actor_id`=5, `first_name`='HANK',
`last_name`='YOON',
`last_update`=from_unixtime(1139949273);

INSERT INTO `yoon`.`yoon` SET `actor_id`=4, `first_name`='HANK',
`last_name`='YOON',
`last_update`=from_unixtime(1139949273);

INSERT INTO `yoon`.`yoon` SET `actor_id`=3, `first_name`='HANK',
`last_name`='YOON',
`last_update`=from_unixtime(1139949273);

INSERT INTO `yoon`.`yoon` SET `actor_id`=2, `first_name`='HANK',
`last_name`='YOON',
`last_update`=from_unixtime(1139949273);

INSERT INTO `yoon`.`yoon` SET `actor_id`=1, `first_name`='HANK',
`last_name`='YOON',
`last_update`=from_unixtime(1139949273);

mysql> INSERT INTO `yoon`.`yoon` SET `actor_id`=11,
`first_name`='HANK', `last_name`='YOON',
`last_update`=from_unixtime(1139949273);

Query OK, 1 row affected (0.01 sec)

mysql> select * from yoon;

+----------+------------+-----------+---------------------+

| actor_id | first_name | last_name | last_update
     
  |

+----------+------------+-----------+---------------------+

|       11 |
HANK       |
YOON      |
2006-02-15 04:34:33 |

+----------+------------+-----------+---------------------+

脚本:

#!/usr/lib/perl
-w

use strict;
use warnings;

use Class::Struct;
use Getopt::Long qw(:config no_ignore_case); # GetOption
# register handler system signals
use sigtrap 'handler', \&sig_int, 'normal-signals';

# catch signal
sub sig_int(){
my ($signals) = @_;
print STDERR "# Caught SIG$signals.\n";
exit 1;
}

my %opt;
my $srcfile;
my $host = '127.0.0.1';
my $port = 3306;
my ($user,$pwd);
my ($MYSQL, $MYSQLBINLOG, $ROLLBACK_DML);
my $outfile = '/dev/null';
my (%do_dbs,%do_tbs);

# tbname=>tbcol, tbcol: @n=>colname,type
my %tbcol_pos;

my $SPLITER_COL = ',';
my $SQLTYPE_IST = 'INSERT';
my $SQLTYPE_UPD = 'UPDATE';
my $SQLTYPE_DEL = 'DELETE';
my $SQLAREA_WHERE = 'WHERE';
my $SQLAREA_SET = 'SET';

my $PRE_FUNCT = '========================== ';

#
=========================================================
# 基于row模式的binlog,生成DML(insert/update/delete)的rollback语句
# 通过mysqlbinlog -v 解析binlog生成可读的sql文件
# 提取需要处理的有效sql
# "### "开头的行.如果输入的start-position位于某个event
group中间,则会导致"无法识别event"错误
#
# 将INSERT/UPDATE/DELETE 的sql反转,并且1个完整sql只能占1行
# INSERT: INSERT INTO => DELETE FROM, SET => WHERE
# UPDATE: WHERE => SET, SET => WHERE
# DELETE: DELETE FROM => INSERT INTO, WHERE => SET
# 用列名替换位置@{1,2,3}
# 通过desc table获得列顺序及对应的列名
# 特殊列类型value做特别处理
# 逆序

# 注意:
# 表结构与现在的表结构必须相同[谨记]
# 由于row模式是幂等的,并且恢复是一次性,所以只提取sql,不提取BEGIN/COMMIT
# 只能对INSERT/UPDATE/DELETE进行处理
#
========================================================
sub main{

# get input option
&get_options();


&init_tbcol();

#
&do_binlog_rollback();
}

&main();

#
----------------------------------------------------------------------------------------
# Func : get options and set option
flag 
#
----------------------------------------------------------------------------------------
sub get_options{
#Get options info
GetOptions(\%opt,
'help', # OUT : print help info
  
'f|srcfile=s', # IN  : binlog file
'o|outfile=s', # out : output sql file
'h|host=s', # IN  :
 host
'u|user=s',    
     
  # IN  :
 user
'p|password=s',    
    # IN  :
 password
'P|port=i', # IN  :
 port
'start-datetime=s', # IN  :
 start datetime
'stop-datetime=s', # IN  :
 stop datetime
'start-position=i', # IN  :
 start position
'stop-position=i', # IN  :
 stop position
'd|database=s', # IN  :
 database, split comma
'T|table=s', # IN  :  table,
split comma
'i|ignore', # IN  :  ignore
binlog check ddl and so on
'debug', # IN  :  print
debug information
 ) or print_usage();

if (!scalar(%opt)) {
&print_usage();
}

# Handle for options
if ($opt{'f'}){
$srcfile = $opt{'f'};
}else{
&merror("please input binlog file");
}

$opt{'h'} and $host = $opt{'h'};
$opt{'u'} and $user = $opt{'u'};
$opt{'p'} and $pwd = $opt{'p'};
$opt{'P'} and $port = $opt{'P'};
if ($opt{'o'}) {
$outfile = $opt{'o'};
# 清空 outfile
`echo '' > $outfile`;
}


$MYSQL = qq{mysql -h$host -u$user -p'$pwd' -P$port};
&mdebug("get_options::MYSQL\n\t$MYSQL");

# 提取binlog,不需要显示列定义信息,用-v,而不用-vv
$MYSQLBINLOG = qq{mysqlbinlog -v};
$MYSQLBINLOG .= " --start-position=".$opt{'start-position'} if
$opt{'start-position'};
$MYSQLBINLOG .= " --stop-position=".$opt{'stop-position'} if
$opt{'stop-postion'};
$MYSQLBINLOG .= "
--start-datetime='".$opt{'start-datetime'}."'" if
$opt{'start-datetime'};
$MYSQLBINLOG .= " --stop-datetime='$opt{'stop-datetime'}'" if
$opt{'stop-datetime'};
$MYSQLBINLOG .= " $srcfile";
&mdebug("get_options::MYSQLBINLOG\n\t$MYSQLBINLOG");

# 检查binlog中是否含有 ddl sql: CREATE|ALTER|DROP|RENAME
&check_binlog() unless ($opt{'i'});

# 不使用mysqlbinlog过滤,USE
dbname;方式可能会漏掉某些sql,所以不在mysqlbinlog过滤
# 指定数据库
if ($opt{'d'}){
my @dbs = split(/,/,$opt{'d'});
foreach my $db (@dbs){
$do_dbs{$db}=1;
}
}

# 指定表
if ($opt{'T'}){
my @tbs = split(/,/,$opt{'T'});
foreach my $tb (@tbs){
$do_tbs{$tb}=1;
}
}

# 提取有效DML SQL
$ROLLBACK_DML = $MYSQLBINLOG." | grep '^### '";
# 去掉注释: '### ' -> ''
# 删除首尾空格
$ROLLBACK_DML .= " | sed 's/###\\s*//g;s/\\s*\$//g'";
&mdebug("rollback dml\n\t$ROLLBACK_DML");
# 检查内容是否为空
my $cmd = "$ROLLBACK_DML | wc -l";
&mdebug("check contain dml sql\n\t$cmd");
my $size = `$cmd`;
chomp($size);
unless ($size >0){
&merror("binlog DML is empty:$ROLLBACK_DML");
};

}

#
----------------------------------------------------------------------------------------
# Func :  check binlog contain DDL
#
----------------------------------------------------------------------------------------
sub check_binlog{
&mdebug("$PRE_FUNCT check_binlog");
my $cmd = "$MYSQLBINLOG ";
$cmd .= " | grep -E -i '^(CREATE|ALTER|DROP|RENAME)' ";
&mdebug("check binlog has DDL cmd\n\t$cmd");
my $ddlcnt = `$cmd`;
chomp($ddlcnt);

my $ddlnum = `$cmd | wc -l`;
chomp($ddlnum);
my $res = 0;
if ($ddlnum>0){
# 在ddl sql前面加上前缀
$ddlcnt = `echo '$ddlcnt' | sed 's/^//g'`;
&merror("binlog contain $ddlnum DDL:$MYSQLBINLOG. ddl
sql:\n$ddlcnt");
}

return $res;
}

#
----------------------------------------------------------------------------------------
# Func : init all table column order
# if input --database --table params, only get set table
column order
#
----------------------------------------------------------------------------------------
sub init_tbcol{
&mdebug("$PRE_FUNCT init_tbcol");
# 提取DML语句
my $cmd .= "$ROLLBACK_DML | grep -E
'^(INSERT|UPDATE|DELETE)'";
# 提取表名,并去重
#$cmd .= " | awk '{if (\$1 ~ \"^UPDATE\") {print \$2}else
{print \$3}}' | uniq ";
$cmd .= " | awk '{if (\$1 ~ \"^UPDATE\") {print \$2}else
{print \$3}}' | sort | uniq ";
&mdebug("get table name cmd\n\t$cmd");
open ALLTABLE, "$cmd | " or die "can't open
file:$cmd\n";

while (my $tbname = ){
chomp($tbname);
#if (exists $tbcol_pos{$tbname}){
# next;
#}
&init_one_tbcol($tbname) unless
(&ignore_tb($tbname));
}
close ALLTABLE or die "can't close file:$cmd\n";

# init tb col
foreach my $tb (keys %tbcol_pos){
&mdebug("tbname->$tb");
my %colpos = %{$tbcol_pos{$tb}};
foreach my $pos (keys %colpos){
my $col = $colpos{$pos};
my ($cname,$ctype) = split(/$SPLITER_COL/, $col);

&mdebug("\tpos->$pos,cname->$cname,ctype->$ctype");
}
}
};

#
----------------------------------------------------------------------------------------
# Func : init one table column order
#
----------------------------------------------------------------------------------------
sub init_one_tbcol{
my $tbname = shift;
&mdebug("$PRE_FUNCT init_one_tbcol");
# 获取表结构及列顺序
my $cmd = $MYSQL." --skip-column-names --silent -e 'desc
$tbname'";
# 提取列名,并拼接
$cmd .= " | awk -F\'\\t\' \'{print
NR\"$SPLITER_COL`\"\$1\"`$SPLITER_COL\"\$2}'";
&mdebug("get table column infor cmd\n\t$cmd");
open TBCOL,"$cmd | " or die "can't open desc $tbname;";

my %colpos;
while (my $line = ){
chomp($line);
my ($pos,$col,$coltype) = split(/$SPLITER_COL/,$line);

&mdebug("linesss=$line\n\t\tpos=$pos\n\t\tcol=$col\n\t\ttype=$coltype");
$colpos{$pos} = $col.$SPLITER_COL.$coltype;
}
close TBCOL or die "can't colse desc $tbname";

$tbcol_pos{$tbname} = \%colpos;
}

#
----------------------------------------------------------------------------------------
# Func :  rollback sql:
INSERT/UPDATE/DELETE
#
----------------------------------------------------------------------------------------
sub do_binlog_rollback{
my $binlogfile = "$ROLLBACK_DML ";
&mdebug("$PRE_FUNCT do_binlog_rollback");

# INSERT|UPDATE|DELETE
my $sqltype;
# WHERE|SET
my $sqlarea;
my ($tbname, $sqlstr) = ('', '');
my ($notignore, $isareabegin) = (0,0);

# output sql file
open SQLFILE, ">> $outfile" or die "Can't open sql
file:$outfile";

# binlog file
open BINLOG, "$binlogfile |" or die "Can't open file:
$binlogfile";
while (my $line = ){
chomp($line);
if ($line =~ /^(INSERT|UPDATE|DELETE)/){
# export sql
if ($sqlstr ne ''){
$sqlstr .= ";\n";
print SQLFILE $sqlstr;
&mdebug("export sql\n\t".$sqlstr);
$sqlstr = '';
}

if ($line =~ /^INSERT/){
$sqltype = $SQLTYPE_IST;
$tbname = `echo '$line' | awk '{print \$3}'`;
chomp($tbname);
$sqlstr = qq{DELETE FROM $tbname};
}elsif ($line =~ /^UPDATE/){
$sqltype = $SQLTYPE_UPD;
$tbname = `echo '$line' | awk '{print \$2}'`;
chomp($tbname);
$sqlstr = qq{UPDATE $tbname};
}elsif ($line =~ /^DELETE/){
$sqltype = $SQLTYPE_DEL;
$tbname = `echo '$line' | awk '{print \$3}'`;
chomp($tbname);
$sqlstr = qq{INSERT INTO $tbname};
}

# check ignore table
if(&ignore_tb($tbname)){
$notignore = 0;
&mdebug("#IGNORE#:line:".$line);
$sqlstr = '';
}else{
$notignore = 1;
&mdebug("#DO#:line:".$line);
}
}else {
if($notignore){
&merror("can't get tbname") unless
(defined($tbname));
if ($line =~ /^WHERE/){
$sqlarea = $SQLAREA_WHERE;
$sqlstr .= qq{ SET};
$isareabegin = 1;
}elsif ($line =~ /^SET/){
$sqlarea = $SQLAREA_SET;
$sqlstr .= qq{ WHERE};
$isareabegin = 1;
}elsif ($line =~ /^\@/){
$sqlstr .= &deal_col_value($tbname, $sqltype, $sqlarea,
$isareabegin, $line);
$isareabegin = 0;
}else{
&mdebug("::unknown sql:".$line);
}
}
}
}
# export last sql
if ($sqlstr ne ''){
$sqlstr .= ";\n";
print SQLFILE $sqlstr;
&mdebug("export sql\n\t".$sqlstr);
}
close BINLOG or die "Can't close binlog file:
$binlogfile";

close SQLFILE or die "Can't close out sql file:
$outfile";

# 逆序
# 1!G: 只有第一行不执行G, 将hold space中的内容append回到pattern space
# h: 将pattern space 拷贝到hold space
# $!d: 除最后一行都删除
my $invert = "sed -i '1!G;h;\$!d' $outfile";
my $res = `$invert`;
&mdebug("inverter order sqlfile :$invert");
}

#
----------------------------------------------------------------------------------------
# Func :  transfer column pos to name
# deal column value
#
#  &deal_col_value($tbname, $sqltype,
$sqlarea, $isareabegin, $line);
#
----------------------------------------------------------------------------------------
sub deal_col_value($$$$$){
my ($tbname, $sqltype, $sqlarea, $isareabegin, $line) =
@_;
&mdebug("$PRE_FUNCT deal_col_value");

&mdebug("input:tbname->$tbname,type->$sqltype,area->$sqlarea,areabegin->$isareabegin,line->$line");
my @vals = split(/=/, $line);
my $pos = substr($vals[0],1);
my $valstartpos = length($pos)+2;
my $val = substr($line,$valstartpos);
my %tbcol = %{$tbcol_pos{$tbname}};
my ($cname,$ctype) = split(/$SPLITER_COL/,$tbcol{$pos});
&merror("can't get $tbname column $cname type") unless
(defined($cname) || defined($ctype));
&mdebug("column
infor:cname->$cname,type->$ctype");

# join str
my $joinstr;
if ($isareabegin){
$joinstr = ' ';
}else{
# WHERE 被替换为 SET, 使用 ,  连接
if ($sqlarea eq $SQLAREA_WHERE){
$joinstr = ', ';
# SET 被替换为 WHERE 使用 AND 连接
}elsif ($sqlarea eq $SQLAREA_SET){
$joinstr = ' AND ';
}else{
&merror("!!!!!!The scripts error");
}
}

my $newline = $joinstr;

# NULL value
if (($val eq 'NULL') && ($sqlarea eq
$SQLAREA_SET)){
$newline .= qq{ $cname IS NULL};
}else{
# timestamp: record seconds
if ($ctype eq 'timestamp'){
$newline .= qq{$cname=from_unixtime($val)};
# datetime: @n=yyyy-mm-dd hh::ii::ss
}elsif ($ctype eq 'datetime'){
$newline .= qq{$cname='$val'};
}else{
$newline .= qq{$cname=$val};
}
}
&mdebug("\told>$line\n\tnew>$newline");
return $newline;
}

#
----------------------------------------------------------------------------------------
# Func :  check is ignore table
# params: IN table full name #
 format:`dbname`.`tbname`
# RETURN:
# 0 not ignore
# 1 ignore
#
----------------------------------------------------------------------------------------
sub ignore_tb($){
my $fullname = shift;
# 删除`
$fullname =~ s/`//g;
my ($dbname,$tbname) = split(/\./,$fullname);
my $res = 0;
# 指定了数据库
if ($opt{'d'}){
# 与指定库相同
if ($do_dbs{$dbname}){
# 指定表
if ($opt{'T'}){
# 与指定表不同
unless ($do_tbs{$tbname}){
$res = 1;
}
}
# 与指定库不同
}else{
$res = 1;
}
}
#&mdebug("Table check ignore:$fullname->$res");
return $res;
}

#
----------------------------------------------------------------------------------------
# Func :  print debug msg
#
----------------------------------------------------------------------------------------
sub mdebug{
my (@msg) = @_;
print "@msg\n" if ($opt{'debug'});
}

#
----------------------------------------------------------------------------------------
# Func :  print error msg and exit
#
----------------------------------------------------------------------------------------
sub merror{
my (@msg) = @_;
print ":@msg\n";
&print_usage();
exit(1);
}

#
----------------------------------------------------------------------------------------
# Func :  print usage
#
----------------------------------------------------------------------------------------
sub print_usage{
print <<EOF;

==========================================================================================
Command line options :
--help # OUT : print help info
  
-f, --srcfile # IN  : binlog file.
[required]
-o, --outfile # OUT : output sql file. [required]
-h, --host # IN  : host. default
'127.0.0.1'
-u, --user # IN  : user. [required]
-p, --password # IN  : password.
[required] 
-P, --port # IN  : port. default '3306'
--start-datetime # IN  : start datetime
--stop-datetime # IN  : stop datetime
--start-position # IN  : start position
--stop-position # IN  : stop position
-d, --database # IN  : database, split
comma
-T, --table # IN  : table, split comma.
[required] set -d
-i, --ignore # IN  : ignore binlog check
contain DDL(CREATE|ALTER|DROP|RENAME)
--debug # IN  :  print debug
information

Sample :
   shell> perl
binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user'
-p 'pwd' 
   shell> perl
binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user'
-p 'pwd' -i
   shell> perl
binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user'
-p 'pwd' --debug
   shell> perl
binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -h
'192.168.1.2' -u 'user' -p 'pwd' -P 3307
   shell> perl
binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user'
-p 'pwd' --start-position=107
   shell> perl
binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user'
-p 'pwd' --start-position=107 --stop-position=10000
   shell> perl
binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user'
-p 'pwd' -d 'db1,db2'
   shell> perl
binlog-rollback.pl -f 'mysql-bin.0000*' -o '/tmp/t.sql' -u 'user'
-p 'pwd' -d 'db1,db2' -T 'tb1,tb2'

==========================================================================================
EOF
exit;   
}

1;

参考:http://www.oschina.net/code/snippet_617180_34028
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: