您的位置:首页 > 编程语言 > Python开发

基于阿里云RDS数据误删除的回滚方案

2017-06-27 09:52 483 查看
场景是这样子的:

如果有人不小心删了数据库,怎么办?

       主从?恐怕不行,数据实时同步,备库的数据也被删了。

       那从库延迟同步,如何?嗯嗯,应该可以。那问题来了。如果延迟同步的情况下,发生数据库误删除,运维人员赶紧切到从库上,终究是能尽快的恢复业务,只是有一部分数据会丢失,那么怎样让服务继续运行的情况下,补回那一部分丢失的数据呢?

      这种情况下,我的脚本就派上用场了!

我的思路是这样子的:

     忘记一件事,强调一下,在切换到从库之前,请将关键表中的主键如果是自增ID,那请将自增ID向后增大一定数值,这样是为了让丢失的数据能补回!

     alter table users AUTO_INCREMENT=xxxxx; 这个语句应该是可帮助你的.

    有些请款,可能需要暂时关闭外键set foreign_key_checks=0;

     言归正传的讲,我们当时的场景是这样的,我们用的阿里云的RDS,所以恢复之前还得先下载binlog日志。

    所以准备条件就这样的:

1. 需要在阿里云上创建一个子账号,

2. 需要aliyuncli工具调用RDS接口,得到binlog日志内网下载地址。

3. 需要在灾备RDS上创建一个binlog的账号

4. 需要一个下载binlog日志并能解包的脚本

5. 需要一个将binlog日志转换成可用sql的脚本

开始干活写脚本:

1. 利用aliyuncli工具,输入如下命令:

aliyuncli rds DescribeBinlogFiles--DBInstanceId rdsid --StartTime 2017-02-15T14:00:00Z --EndTime2017-02-15T14:30:00Z

脚本: 

#!/usr/bin/python
#coding:utf-8
import os,sys
import json
import time

def get_url():
#urlContent = os.popen("aliyuncli rds DescribeBinlogFiles--DBInstanceId rdsid --StartTime 2017-02-17T10:00:00Z --EndTime2017-02-17T12:00:00Z")
DBId = ''
starttime = '2017-02-20 09:00:00'
endtime = '2017-02-20 12:00:00'
starttime = conv_time(starttime,8)
endtime = conv_time(endtime,8)

urlContent = os.popen("aliyuncli rds DescribeBinlogFiles--DBInstanceId %s --StartTime %s --EndTime%s"%(DBId,starttime,endtime)).read()
urlContent = json.loads(urlContent)
urlfile = file('urlfile','w+')
for url in urlContent['Items']['BinLogFile']:
if url['HostInstanceID'] == 1669377:
print url
urlfile.write('%s\n'%url["IntranetDownloadLink"])
urlfile.close()

def conv_time(n_time,n=8):
s_time = time.mktime(time.strptime(n_time,"%Y-%m-%d%H:%M:%S"))
addtime = 3600 * n
f_time = s_time -addtime
n_time =time.strftime("%Y-%m-%dT%H:%M:%SZ",time.localtime(f_time))
return n_time

if __name__ == '__main__':
get_url()

2. 得到的内网地址,放入urlfie文件

 

#!/bin/bash

wget -c -i /opt/binlog/rds_binlog/urlfile-P /opt/binlog/rds_tar
if [ $? != 0 ];then
echo "wget error"
fi
cd /opt/binlog/rds_sql
ls /opt/binlog/rds_tar/*.tar* |xargs -n1tar xvf

3. 利用python脚本对binlog日志进行转换

  脚本思路:

 1. 利用binlog工具进行转换成sql,但是sql有些需要去转换成我们可识别的形式

  2. 转换sql,查询mysql的表字段,利用正则表达式匹配insert,delete,update的语句,将所有@1这种类型的字段进行转换

#encoding:UTF-8
#!/usr/bin/python

import os,sys,re,getopt
import MySQLdb
import time,datetime

host =''
user = ''
password = ''
port = 3306
start_datetime = ''
stop_datetime = ''
start_position = ''
stop_position = ''
database = ''
mysqlbinlog_bin = 'mysqlbinlog -v'
binlog = ''
fileContent = ''
output='rollback.sql'
only_primary = 1
tmp_binlog_file ="tmp_binlog_file"

field_list = ''

#----------------------------------------------------------------------------------------
# 功能:获取参数,生成相应的binlog解析文件
#----------------------------------------------------------------------------------------
def getopts_parse_binlog():
globalhost
globaluser
globalpassword
globalport
globalfileContent
globaloutput
globalbinlog
globalstart_datetime
globalstop_datetime
globalstart_position
globalstop_position
globaldatabase
globalonly_primary
try:
options,args= getopt.getopt(sys.argv[1:],"f:o:h:u:p:P:d:",["help","binlog=","output=","host=","user=",\
"password=","port=","start-datetime=","stop-datetime=","start-position=","stop-position=","database=","only-primary="])
exceptgetopt.GetoptError:
print"参数输入有误!!!"
options= []
ifoptions == [] or options[0][0] in ("--help"): #如果输入错误或者--help则提示帮助信息
usage() #显示帮助信息
sys.exit()
print"正在获取参数......"
printoptions
forname, value in options:
ifname == "-f" or name == "--binlog":
binlog= value
elifname == "-o" or name == "--output":
output= value
elifname == "-h" or name == "--host":
host= value
elifname == "-u" or name == "--user":
user= value
elifname == "-p" or name == "--password":
password= value
elifname == "-P" or name == "--port":
port= value
elifname == "--start-datetime":
start_datetime= value
elifname == "--stop-datetime":
stop_datetime = value
elifname == "--start-position":
start_position= value
elifname == "--stop-position":
stop_position= value
elifname == "-d" or name == "--database":
database= value
elifname == "--only-primary":
only_primary= value
printstart_datetime

ifbinlog == '' :
print"错误:请指定binlog文件名!"
usage()
ifuser == '' :
print"错误:请指定用户名!"
usage()
ifpassword == '' :
print"错误:请指定密码!"
usage()
ifstart_datetime <> '':
co_start_datetime= "--start-datetime='%s'"%start_datetime

4000
else:
co_start_datetime= ''
ifstop_datetime <> '':
co_stop_datetime= "--stop-datetime='%s'"%stop_datetime
else:
co_stop_datetime= ''
ifstart_position <> '':
co_start_position= "--start-position='%s'"%start_position
else:
co_start_position= ''
ifstop_position <> '':
co_stop_position= "--stop-position='%s'"%stop_position
else:
co_stop_position= ''
ifdatabase <> '':
co_database= "--database='%s'"%database
else:
co_database= ''
print"正在解析binlog....."

starttime= datetime.datetime.now()
print("\nConverting binlog to text file...")
#os.popen('mysqlbinlog-v --base64-output=DECODE-ROWS %s %s %s > %s'%(start_datetime,stop_datetime,binlog,tmp_binlog_file))
fshell= "%s %s --base64-output=DECODE-ROWS%s %s %s %s %s\
>%s " \
%(mysqlbinlog_bin,binlog,co_start_datetime,co_stop_datetime,co_start_position,co_stop_position,co_database,tmp_binlog_file)
printfshell
os.popen(fshell)

#fileContent=os.popen("cat%s|grep '###' -B 2|sed -e 's/### //g' -e 's/^INSERT/##INSERT/g' -e's/^UPDATE/##UPDATE/g' -e 's/^DELETE/##DELETE/g' >tmp_binlog.sql"%(tmp_binlog_file))
fileContent=os.popen("cat%s|grep '###' -B 2|sed -e 's/### //g' -e 's/^INSERT/##INSERT/g' -e's/^UPDATE/##UPDATE/g' -e 's/^DELETE/##DELETE/g'"%(tmp_binlog_file)).read()

print("File converting complete.")
endtime= datetime.datetime.now()
timeinterval= endtime - starttime
print("Convertingelapsed :" + str(timeinterval.seconds) + '.' +str(timeinterval.microseconds) + " seconds")

print"解析完成......"

#------------------------------------------------
# 功能:初始化binlog里的所有表名和列表,用全局字典result_dict来存储每个表有哪些列
#------------------------------------

def init_col_name():
globalresult_dict
globalpri_dict
globalfileContent
#fileContent= file("aaa.txt","r+").read()
result_dict= {} #存储表和表的字段
pri_dict= {} #存储表对应的主键字段
table_list= re.findall('`.*`\\.`.*`',fileContent) #正则去找出所有的数据库和表
table_list= list(set(table_list)) #将得到的列表数据去重,重新生成列表
print"正在初始化列名....."

for table in table_list:
sname= table.split('.')[0].replace('`','') #将得到的表名去引号
tname= table.split('.')[1].replace('`','')

#连接数据库获取列和列id
try:
conn = MySQLdb.connect(host=host,user=user,passwd=password,port=int(port))
cursor = conn.cursor()
cursor.execute("selectordinal_position,column_name \
frominformation_schema.columns \
wheretable_schema='%s' and table_name='%s' " %(sname,tname))
result=cursor.fetchall()
ifresult == ():
print'Warning:'+sname+'.'+tname+'已删除'
#sys.exit()
result_dict[sname+'.'+tname]=result
cursor.execute("selectordinal_position,column_name \
frominformation_schema.columns \
wheretable_schema='%s' and table_name='%s' and column_key='PRI' "%(sname,tname))
pri= cursor.fetchall()
printpri
pri_dict[sname+'.'+tname]= pri
cursor.close()
conn.close()
excep tMySQLdb.Error, e:
try:
print"Error %d:%s" % (e.args[0], e.args[1])
exceptIndexError:
print "MySQLError:%s" % str(e)
sys.exit()

#-------------------------------------------------------------------------------------------------------
# 功能:拼凑回滚sql
#---------------------------------------------------------------------------------------------------------

def gen_rollback_sql():
starttime= datetime.datetime.now()
global only_primary
globalfield_list
fileOutput = open(output,'w')
#先将文件根据'--'分块,每块代表一个sql
area_list = fileContent.split('--\n')
#print area_list
#读取分块
print "正在开始拼凑sql......"
for area in area_list:
#由于一条sql可能影响多行,每个sql又可以逐条执行的sql
sql_list= area.split('##')
#先将pos点和timestamp传入输出文件中
for sql_head in sql_list[0].splitlines():
sql_head= '#'+sql_head+'\n'
fileOutput.write(sql_head)
#逐条sql进行替换更新
rollback_sql= sql_list[1]
try:
#INSERT模型为INSERTINTO TABLE(filed1,filed2,filed3....) VALUES(A,B,C....)
if rollback_sql.split()[0] == 'INSERT':
try:
rollback_sql= re.sub('SET\n','VALUES(\n',rollback_sql,1)
tablename_pos= 2
table_name= rollback_sql.split()[tablename_pos].replace('`','') #将得到表名
#找出表的对应字段
filed_list= ''
forfiled in result_dict[table_name][:-1]:

if 'Date'in filed[1]: #替换时间戳为标准时间
tmp= '@%d=(\d+)'%(filed[0])
init_time= re.search(tmp,rollback_sql)
if init_time: #如果有值
init_time= float(init_time.group().split('=')[1])
final_time= time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(init_time))
tmp_1= "@%d=%d\n"%(filed[0],init_time)
tmp_2= "@%d='%s'\n"%(filed[0],final_time)
rollback_sql= re.sub(tmp_1,tmp_2,rollback_sql)

filed_list= filed_list+filed[1]+','
filed_list= '('+filed_list+result_dict[table_name][-1][1]+')'
#获取该sql中所有的列
col_list= sorted(list(set(re.findall('@\d+=',rollback_sql))))
#因为第一个列前面没有逗号或者and,所以单独替换
rollback_sql= rollback_sql.replace('@1=','')
forcol in col_list:
ifcol[:-1] <> result_dict[table_name][0][1]: #判断不为第一个字段时,则进行替换
i= int(col[1:-1]) -1
rollback_sql= rollback_sql.replace(col, ',',1)
rollback_sql= re.sub('\n$',');\n',rollback_sql)
except IndexError,e:
print"INSERT----Error-------------------------------------------"
print"Error:%s"% str(e)

#UPDATE的模型为UPDATETABLE SET FILED1=A,FIED2=B... WHERE FILED1=''..;
elifrollback_sql.split()[0] == 'UPDATE':
try:
#原始rollback_sql
init_sql= rollback_sql
#得到主键的值
pri_id= re.search('@1=(.*?)\n',rollback_sql)
pri_id= pri_id.group().split('=')[1].strip()
#将rollback_sql进行切割,删除WHERE后的语句
update_sql= rollback_sql.split('\nWHERE\n',1)[0]+'SET'
set_sql= rollback_sql.split('\nSET\n',1)[1]
#拼接sql为 updatetable set .....
rollback_sql= update_sql+set_sql
tablename_pos= 1
table_name= rollback_sql.split()[tablename_pos].replace('`','') #将得到表名
#找出表的对应字段
filed_list= ''
forfiled in result_dict[table_name][:-1]:

if'Date' in filed[1]: #替换时间戳为标准时间
tmp= '@%d=(\d+)'%(filed[0])
init_time= re.search(tmp,rollback_sql)
ifinit_time: #如果有值
init_time= float(init_time.group().split('=')[1])
final_time= time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(init_time))
tmp_1= "@%d=%d\n"%(filed[0],init_time)
tmp_2= "@%d='%s'\n"%(filed[0],final_time)
rollback_sql= re.sub(tmp_1,tmp_2,rollback_sql)
#exit()

filed_list= filed_list+filed[1]+','
filed_list= '('+filed_list+result_dict[table_name][-1][1]+')'
#获取该sql中所有的列
#col_list= sorted(list(set(re.findall('@\d+=',rollback_sql))))
col_list= re.findall('@\d+=',rollback_sql)
#因为第一个列前面没有逗号或者and,所以单独替换,同时防止值中有@相关数据,需要加=
rollback_sql= rollback_sql.replace('@1=',result_dict[table_name][0][1]+'=',1)
forcol in col_list:
ifcol[:-1] <> result_dict[table_name][0][1]: #判断不为第一个字段时,则进行替换
i= int(col[1:-1]) -1
rollback_sql= rollback_sql.replace(col,','+result_dict[table_name][i][1]+'=',1)
#rollback_sql= rollback_sql.replace(col+'=', ',',1)
#如果only_primary开启且存在主键,where条件就只列出主键字段
ifint(only_primary) == 1 and pri_dict[table_name] <> ():
sub_where= ''
forprimary in pri_dict[table_name]:
primary_name= primary[1]
sub_where= "WHERE %s=%s"%(primary_name,pri_id)
else:
#没有主键时,条件判断,切割rollback_sql
where_sql= init_sql.split('SET\n',1)[0].split('WHERE\n',1)[1].strip()
col_list= re.findall('@\d+=',where_sql)
sub_where= ''
sub_where= where_sql.replace('@1=',result_dict[table_name][0][1]+'=',1)
forcol in col_list:
ifcol <> result_dict[table_name][0][1]: #判断不为第一个字段时,则进行替换
i= int(col[1:-1]) -1
sub_where= sub_where.replace(col,','+result_dict[table_name][i][1]+'=',1)
sub_where= "WHERE\n"+sub_where
rollback_sql= rollback_sql+sub_where+';\n'
except IndexError,e:
print"UPDATE----Error-------------------------------------------"
printtable_name
printcol_list
printrollback_sql.split()
print"Error:%s"% str(e)
sys.exit()

#如果为DELETE,则模板为DELETEFROM TABLE WHERE.....;
elifrollback_sql.split()[0] == 'DELETE':
try:
#原始rollback_sql
init_sql= rollback_sql
#得到主键的值
pri_id= re.search('@1=(.*?)\n',rollback_sql)
pri_id= pri_id.group().split('=')[1].strip()
#将rollback_sql进行切割,删除WHERE后的语句
delete_sql= rollback_sql.split('WHERE\n',1)[0]
#set_sql= rollback_sql.split('SET',1)[1]
#rollback_sql= update_sql+set_sql
tablename_pos= 2
table_name= rollback_sql.split()[tablename_pos].replace('`','') #将得到表名
#找出表的对应字段
'''
filed_list= ''
forfiled in result_dict[table_name][:-1]:
filed_list= filed_list+filed[1]+','
filed_list= '('+filed_list+result_dict[table_name][-1][1]+')'
'''
#获取该sql中所有的列
col_list= sorted(list(set(re.findall('@\d+=',rollback_sql))))
#因为第一个列前面没有逗号或者and,所以单独替换
rollback_sql= rollback_sql.replace('@1=',result_dict[table_name][0][1]+'=',1)
forcol in col_list:
ifcol[:-1] <> result_dict[table_name][0][1]: #判断不为第一个字段时,则进行替换

abf7
i= int(col[1:-1]) -1
rollback_sql= rollback_sql.replace(col,','+result_dict[table_name][i][1]+'=',1)
#rollback_sql= rollback_sql.replace(col+'=', ',',1)
#如果only_primary开启且存在主键,where条件就只列出主键字段
ifint(only_primary) == 1 and pri_dict[table_name] <> ():
sub_where= ''
forprimary in pri_dict[table_name]:
primary_name= primary[1]
sub_where= "WHERE %s=%s"%(primary_name,pri_id)
rollback_sql= delete_sql+sub_where+"\n"
rollback_sql= re.sub('\n$',';\n',rollback_sql)
except IndexError,e:
print"DELETE----Error-------------------------------------------"
print"Error:%s"% str(e)

#将sql语句写入文本
fileOutput.write(rollback_sql)

exceptIndexError,e:
print"Error-------------------------------------------"
print"Error:%s"% str(e)
sys.exit()
print"done!"
endtime= datetime.datetime.now()
timeinterval= endtime - starttime
print("Convertingelapsed :" + str(timeinterval.seconds) + '.' +str(timeinterval.microseconds) + " seconds")

def conv_time(init_time):
init_time= 1475903984
init_time= float(init_time)
final_time= time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(init_time))
printfinal_time
returnfinal_time

def usage():
help_info="""==========================================================================================
Command line options :
--help # OUT :print help info
-f, --binlog # IN : binlog file. (required)
-o, --outfile # OUT :output rollback sql file. (default 'rollback.sql')
-h, --host # IN : host. (default '127.0.0.1')
-u, --user # IN : user. (required)
-p,--password # IN : password. (required)
-P, --port # IN : port. (default 3306)
--start-datetime # IN : start datetime. (default '1970-01-0100:00:00')
--stop-datetime # IN : stop datetime. default '2070-01-0100:00:00'
--start-position # IN : start position. (default '4')
--stop-position # IN : stop position. (default'18446744073709551615')
-d, --database # IN : List entries for just this database (Nodefault value).
--only-primary # IN : Only list primary key in where condition(default 0)

Sample :
shell> python binlog_rollback.py -f 'mysql-bin.000001' -o'/tmp/rollback.sql' -h 192.168.0.1 -u 'user' -p 'pwd' -P 3307 -d dbname
=========================================================================================="""

print help_info
sys.exit()

if __name__ == '__main__':

result= getopts_parse_binlog()
init_col_name()
gen_rollback_sql()

有了上面的脚本,那问题就简单了。一个一个使用就得了。
步骤是这样的:

1.使用第一个脚本得到binlog日志下载地址

2.使用第二个脚本进行下载解压

3.开始使用转换脚本进行转化sql

4.最后进行sql恢复

再写个脚本搞定这个事情:

我命名为go.sh

#!/bin/bash
# author: liya
# recover rds

user=''
passwd=''
host=''
database=''

function download(){
wget -c -i /opt/binlog/rds_sql/urlfile -P /opt/binlog/rds_tar
if [ $? != 0 ];then
echo "wget error"
fi
cd /opt/binlog/rds_bin
ls /opt/binlog/rds_tar/*.tar* |xargs -n1 tar xvf
}

function conver(){
sql_path=/opt/binlog/rds_sql/sqls
cd /opt/binlog/rds_bin
ls /opt/binlog/rds_bin/mysql-bin* | while read line
do
num=`echo $line | cut -d . -f 2`
python /opt/binlog/rds_sql/binlog_conv.py -f "$line" -o "$sql_path/rollback_$num.sql" -d esign
done

}

function get_data(){
ls /opt/binlog/rds_sql/sqls/rollback* | while read sql
do
echo $sql
mysql -u$user -p$passwd -h$host -D$database < $sql
if [ $? != 0 ];then
echo "eroor"
echo $sql
exit
fi
mv $sql bak_$sql
done



好了,大功告成!
最后再总结下:

假设出事故了,前提是

1.你要有个延迟从库(数据比主库少一定时间的数据)

2.binlog日志下载权限的账号(如果不是云RDS请忽略)

3.一个日志恢复主句,上面已经下载好了aliyun-python-sdk

那你要做的事情就是:

1. 运行第一个脚本 

python binlog_conv.py -f  'mysql-bin' -o 'rooback_3716.sql' -d esign --start-datetime '2017-02-17 10:00:00'

得到内网地址urlfile文件

2. 运行最后一个脚本go.sh

   1. 通过urlfile得到数据

   2. 转换数据

   3. 恢复数据

    
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  mysql binlog python 灾备