基于binlog来分析mysql的行记录修改情况(python脚本分析)
2017-05-26 15:24
549 查看
最近写完mysql flashback,突然发现还有有这种使用场景:有些情况下,可能会统计在某个时间段内,MySQL修改了多少数据量?发生了多少事务?主要是哪些表格发生变动?变动的数量是怎么样的? 但是却不需要行记录的修改内容,只需要了解 行数据的 变动情况。故也整理了下。
昨晚写的脚本,因为个人python能力有限,本来想这不发这文,后来想想,没准会有哪位园友给出优化建议。
如果转载,请注明博文来源: www.cnblogs.com/xinysu/ ,版权归 博客园 苏家小萝卜 所有。望各位支持!
这些情况部分可以通过监控来大致了解,但是也可以基于binlog来全盘分析,binlog的格式是row模式。
在写flashback的时候,顺带把这个也写了个脚步,使用python编写,都差不多原理,只是这个简单些,介于个人python弱的不行,性能可能还有很大的提升空间,也希望园友能协助优化下。
先贴python脚步的分析结果图如下,分为4个部分:事务耗时情况、事务影响行数情况、DML行数情况以及操作最频繁表格情况。
创建类queryanalyse,其中有5个函数定义:_get_db、create_tab、rowrecord、binlogdesc跟closeconn。
ALL options need to assign:
-h : host, the database host,which database will store the results after analysis
-u : user, the db user
-p : password, the db user's password
-P : port, the db port
-f : file path, the binlog file
-tr : table name for record , the table name to store the row record
-tt : table name for transaction, the table name to store transactions
比如,执行脚本:python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow,该函数负责处理各个选项的参数值情况,并存储。
事务表记录内容:事务的开始时间及事务的结束时间。
行记录表的内容:库名,表名,DML类型以及事务对应事务表的编号。
每个事务的结束点,是以 'Xid = ' 来查找
事务的开始时间,是事务内的第一个 'Table_map' 行里边的时间
事务的结束时间,是以 'Xid = '所在行的 里边的时间
每个行数据是属于哪个表格,是以 'Table_map'来查找
DML的类型是按照 行记录开头的情况是否为:'### INSERT INTO' 、'### UPDATE' 、'### DELETE FROM'
注意,单个事务可以包含多个表格多种DML多行数据修改的情况。
分析修改行数据的 事务耗时情况
分析修改行数据的 事务影响行数情况
分析DML分布情况
分析 最多DML操作的表格 ,取前十个分析
然后,把要分析的binlog文件先用 mysqlbinlog 指令分析存储,具体binlog的文件说明,可以查看之前的博文:关于binary log那些事——认真码了好长一篇。mysqlbinlog的指令使用方法,可以详细查看文档:https://dev.mysql.com/doc/refman/5.7/en/mysqlbinlog.html 。
比较常用通过指定开始时间跟结束时间来分析 binlog文件。
mysqlbinlog --start-datetime='2017-04-23 00:00:03' --stop-datetime='2017-04-23 00:30:00' --base64-output=decode-rows -v /data/mysql/logs/mysql-bin.007335 > /tmp/binlog_test.log
分析后,可以把这个 binlog_test.log文件拷贝到其他空闲服务器执行分析,只需要有个空闲的DB来存储分析记录即可。
假设这个时候,拷贝 binlog_test.log到测试服务器上,测试服务器上的数据库可以用来存储分析内容,则可以执行python脚本了,注意要进入到python脚本的目录中,或者指定python脚本路径。
python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f= /tmp/binlog_test.log -tt=flashback.tbtran -tr=flashback.tbrow
没了,就等待输出吧。
性能是硬伤,在虚拟机上测试,大概500M的binlog文件需要分析2-3min,有待提高!
昨晚写的脚本,因为个人python能力有限,本来想这不发这文,后来想想,没准会有哪位园友给出优化建议。
如果转载,请注明博文来源: www.cnblogs.com/xinysu/ ,版权归 博客园 苏家小萝卜 所有。望各位支持!
1 实现内容
有些情况下,可能会统计在某个时间段内,MySQL修改了多少数据量?发生了多少事务?主要是哪些表格发生变动?变动的数量是怎么样的? 但是却不需要行记录的修改内容,只需要了解 行数据的 变动情况。这些情况部分可以通过监控来大致了解,但是也可以基于binlog来全盘分析,binlog的格式是row模式。
在写flashback的时候,顺带把这个也写了个脚步,使用python编写,都差不多原理,只是这个简单些,介于个人python弱的不行,性能可能还有很大的提升空间,也希望园友能协助优化下。
先贴python脚步的分析结果图如下,分为4个部分:事务耗时情况、事务影响行数情况、DML行数情况以及操作最频繁表格情况。
2 脚本简单描述
脚本依赖的模块中,pymysql需要自行安装。创建类queryanalyse,其中有5个函数定义:_get_db、create_tab、rowrecord、binlogdesc跟closeconn。
2.1 _get_db
该函数用来解析输入参数值,参数值一共有7个,都是必须填写的。分别为host,user,password,port,table name for transaction,table name for records,对应的简写如下:ALL options need to assign:
-h : host, the database host,which database will store the results after analysis
-u : user, the db user
-p : password, the db user's password
-P : port, the db port
-f : file path, the binlog file
-tr : table name for record , the table name to store the row record
-tt : table name for transaction, the table name to store transactions
比如,执行脚本:python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow,该函数负责处理各个选项的参数值情况,并存储。
2.2 create_tab
创建两个表格,分别用来存储 binlog file文件的分析结果。一个用来存储事务的执行开始时间跟结束时间,由选项 -tt来赋值表名;一个是用来存储每一行记录的修改情况,由选项 -tr来赋值表名。事务表记录内容:事务的开始时间及事务的结束时间。
行记录表的内容:库名,表名,DML类型以及事务对应事务表的编号。
root@localhost:mysql3310.sock 14:42:29 [flashback]>show create table tbrow \G *************************** 1. row *************************** Table: tbrow Create Table: CREATE TABLE `tbrow` ( `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT, `sqltype` int(11) NOT NULL COMMENT '1 is insert,2 is update,3 is delete', `tran_num` int(11) NOT NULL COMMENT 'the transaction number', `dbname` varchar(50) NOT NULL, `tbname` varchar(50) NOT NULL, PRIMARY KEY (`auto_id`), KEY `sqltype` (`sqltype`), KEY `dbname` (`dbname`), KEY `tbname` (`tbname`) ) ENGINE=InnoDB AUTO_INCREMENT=295151 DEFAULT CHARSET=utf8 1 row in set (0.00 sec) root@localhost:mysql3310.sock 14:42:31 [flashback]>SHOW CREATE TABLE TBTRAN \G *************************** 1. row *************************** Table: TBTRAN Create Table: CREATE TABLE `tbtran` ( `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT, `begin_time` datetime NOT NULL, `end_time` datetime NOT NULL, PRIMARY KEY (`auto_id`) ) ENGINE=InnoDB AUTO_INCREMENT=6390 DEFAULT CHARSET=utf8 1 row in set (0.00 sec)
2.3 rowrecord
重点函数,分析binlog文件内容。这里有几个规律:每个事务的结束点,是以 'Xid = ' 来查找
事务的开始时间,是事务内的第一个 'Table_map' 行里边的时间
事务的结束时间,是以 'Xid = '所在行的 里边的时间
每个行数据是属于哪个表格,是以 'Table_map'来查找
DML的类型是按照 行记录开头的情况是否为:'### INSERT INTO' 、'### UPDATE' 、'### DELETE FROM'
注意,单个事务可以包含多个表格多种DML多行数据修改的情况。
2.4 binlogdesc
描述分析结果,简单4个SQL分析。分析修改行数据的 事务耗时情况
分析修改行数据的 事务影响行数情况
分析DML分布情况
分析 最多DML操作的表格 ,取前十个分析
2.5 closeconn
关闭数据库连接。3 使用说明
首先,确保python安装了pymysql模块,把python脚本拷贝到文件 queryanalyse.py。然后,把要分析的binlog文件先用 mysqlbinlog 指令分析存储,具体binlog的文件说明,可以查看之前的博文:关于binary log那些事——认真码了好长一篇。mysqlbinlog的指令使用方法,可以详细查看文档:https://dev.mysql.com/doc/refman/5.7/en/mysqlbinlog.html 。
比较常用通过指定开始时间跟结束时间来分析 binlog文件。
mysqlbinlog --start-datetime='2017-04-23 00:00:03' --stop-datetime='2017-04-23 00:30:00' --base64-output=decode-rows -v /data/mysql/logs/mysql-bin.007335 > /tmp/binlog_test.log
分析后,可以把这个 binlog_test.log文件拷贝到其他空闲服务器执行分析,只需要有个空闲的DB来存储分析记录即可。
假设这个时候,拷贝 binlog_test.log到测试服务器上,测试服务器上的数据库可以用来存储分析内容,则可以执行python脚本了,注意要进入到python脚本的目录中,或者指定python脚本路径。
python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f= /tmp/binlog_test.log -tt=flashback.tbtran -tr=flashback.tbrow
没了,就等待输出吧。
性能是硬伤,在虚拟机上测试,大概500M的binlog文件需要分析2-3min,有待提高!
4 python脚本
import pymysql from pymysql.cursors import DictCursor import re import os import sys import datetime import time import logging import importlib importlib.reload(logging) logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') usage=''' usage: python [script's path] [option] ALL options need to assign: -h : host, the database host,which database will store the results after analysis -u : user, the db user -p : password, the db user's password -P : port, the db port -f : file path, the binlog file -tr : table name for record , the table name to store the row record -tt : table name for transaction, the table name to store transactions Example: python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow ''' class queryanalyse: def __init__(self): #初始化 self.host='' self.user='' self.password='' self.port='3306' self.fpath='' self.tbrow='' self.tbtran='' self._get_db() logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tb_for_record={},tb_for_tran={}'.format(self.host,self.user,self.port,self.fpath,self.tbrow,self.tbtran)) self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') self.cur = self.mysqlconn.cursor(cursor=DictCursor) logging.info('MySQL which userd to store binlog event connection is ok') self.begin_time='' self.end_time='' self.db_name='' self.tb_name='' def _get_db(self): #解析用户输入的选项参数值,这里对password的处理是明文输入,可以自行处理成是input格式, #由于可以拷贝binlog文件到非线上环境分析,所以password这块,没有特殊处理 logging.info('begin to assign values to parameters') if len(sys.argv) == 1: print(usage) sys.exit(1) elif sys.argv[1] == '--help': print(usage) sys.exit() elif len(sys.argv) > 2: for i in sys.argv[1:]: _argv = i.split('=') if _argv[0] == '-h': self.host = _argv[1] elif _argv[0] == '-u': self.user = _argv[1] elif _argv[0] == '-P': self.port = int(_argv[1]) elif _argv[0] == '-f': self.fpath = _argv[1] elif _argv[0] == '-tr': self.tbrow = _argv[1] elif _argv[0] == '-tt': self.tbtran = _argv[1] elif _argv[0] == '-p': self.password = _argv[1] else: print(usage) def create_tab(self): #创建两个表格:一个用户存储事务情况,一个用户存储每一行数据修改的情况 #注意,一个事务可以存储多行数据修改的情况 logging.info('creating table ...') create_tb_sql ='''CREATE TABLE IF NOT EXISTS {} ( `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT, `begin_time` datetime NOT NULL, `end_time` datetime NOT NULL, PRIMARY KEY (`auto_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE IF NOT EXISTS {} ( `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT, `sqltype` int(11) NOT NULL COMMENT '1 is insert,2 is update,3 is delete', `tran_num` int(11) NOT NULL COMMENT 'the transaction number', `dbname` varchar(50) NOT NULL, `tbname` varchar(50) NOT NULL, PRIMARY KEY (`auto_id`), KEY `sqltype` (`sqltype`), KEY `dbname` (`dbname`), KEY `tbname` (`tbname`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; truncate table {}; truncate table {}; '''.format(self.tbtran,self.tbrow,self.tbtran,self.tbrow) self.cur.execute(create_tb_sql) logging.info('created table {} and {}'.format(self.tbrow,self.tbtran)) def rowrecord(self): #处理每一行binlog #事务的结束采用 'Xid =' 来划分 #分析结果,按照一个事务为单位存储提交一次到db try: tran_num=1 #事务数 record_sql='' #行记录的insert sql tran_sql='' #事务的insert sql self.create_tab() with open(self.fpath,'r') as binlog_file: logging.info('begining to analyze the binlog file ,this may be take a long time !!!') logging.info('analyzing...') for bline in binlog_file: if bline.find('Table_map:') != -1: l = bline.index('server') n = bline.index('Table_map') begin_time = bline[:l:].rstrip(' ').replace('#', '20') if record_sql=='': self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:] self.db_name = bline[n::].split(' ')[1].replace('`', '').split('.')[0] self.tb_name = bline[n::].split(' ')[1].replace('`', '').split('.')[1] bline='' elif bline.startswith('### INSERT INTO'): record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (1,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name) elif bline.startswith('### UPDATE'): record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (2,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name) elif bline.startswith('### DELETE FROM'): record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (3,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name) elif bline.find('Xid =') != -1: l = bline.index('server') end_time = bline[:l:].rstrip(' ').replace('#', '20') self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:] tran_sql=record_sql+"insert into {}(begin_time,end_time) VALUES ('{}','{}')".format(self.tbtran,self.begin_time,self.end_time) self.cur.execute(tran_sql) self.mysqlconn.commit() record_sql = '' tran_num += 1 except Exception: return 'funtion rowrecord error' def binlogdesc(self): sql='' t_num=0 r_num=0 logging.info('Analysed result printing...\n') #分析总的事务数跟行修改数量 sql="select 'tbtran' name,count(*) nums from {} union all select 'tbrow' name,count(*) nums from {};".format(self.tbtran,self.tbrow) self.cur.execute(sql) rows=self.cur.fetchall() for row in rows: if row['name']=='tbtran': t_num = row['nums'] else: r_num = row['nums'] print('This binlog file has {} transactions, {} rows are changed '.format(t_num,r_num)) # 计算 最耗时 的单个事务 # 分析每个事务的耗时情况,分为5个时间段来描述 # 这里正常应该是 以毫秒来分析的,但是binlog中,只精确时间到second sql='''select count(case when cost_sec between 0 and 1 then 1 end ) cos_1, count(case when cost_sec between 1.1 and 5 then 1 end ) cos_5, count(case when cost_sec between 5.1 and 10 then 1 end ) cos_10, count(case when cost_sec between 10.1 and 30 then 1 end ) cos_30, count(case when cost_sec >30.1 then 1 end ) cos_more, max(cost_sec) cos_max from ( select auto_id,timestampdiff(second,begin_time,end_time) cost_sec from {} ) a;'''.format(self.tbtran) self.cur.execute(sql) rows=self.cur.fetchall() for row in rows: print('The most cost time : {} '.format(row['cos_max'])) print('The distribution map of each transaction costed time: ') print('Cost time between 0 and 1 second : {} , {}%'.format(row['cos_1'],int(row['cos_1']*100/t_num))) print('Cost time between 1.1 and 5 second : {} , {}%'.format(row['cos_5'], int(row['cos_5'] * 100 / t_num))) print('Cost time between 5.1 and 10 second : {} , {}%'.format(row['cos_10'], int(row['cos_10'] * 100 / t_num))) print('Cost time between 10.1 and 30 second : {} , {}%'.format(row['cos_30'], int(row['cos_30'] * 100 / t_num))) print('Cost time > 30.1 : {} , {}%\n'.format(row['cos_more'], int(row['cos_more'] * 100 / t_num))) # 计算 单个事务影响行数最多 的行数量 # 分析每个事务 影响行数 情况,分为5个梯度来描述 sql='''select count(case when nums between 0 and 10 then 1 end ) row_1, count(case when nums between 11 and 100 then 1 end ) row_2, count(case when nums between 101 and 1000 then 1 end ) row_3, count(case when nums between 1001 and 10000 then 1 end ) row_4, count(case when nums >10001 then 1 end ) row_5, max(nums) row_max from ( select count(*) nums from {} group by tran_num ) a;'''.format(self.tbrow) self.cur.execute(sql) rows=self.cur.fetchall() for row in rows: print('The most changed rows for each row: {} '.format(row['row_max'])) print('The distribution map of each transaction changed rows : ') print('Changed rows between 1 and 10 second : {} , {}%'.format(row['row_1'],int(row['row_1']*100/t_num))) print('Changed rows between 11 and 100 second : {} , {}%'.format(row['row_2'], int(row['row_2'] * 100 / t_num))) print('Changed rows between 101 and 1000 second : {} , {}%'.format(row['row_3'], int(row['row_3'] * 100 / t_num))) print('Changed rows between 1001 and 10000 second : {} , {}%'.format(row['row_4'], int(row['row_4'] * 100 / t_num))) print('Changed rows > 10001 : {} , {}%\n'.format(row['row_5'], int(row['row_5'] * 100 / t_num))) # 分析 各个行数 DML的类型情况 # 描述 delete,insert,update的分布情况 sql='select sqltype ,count(*) nums from {} group by sqltype ;'.format(self.tbrow) self.cur.execute(sql) rows=self.cur.fetchall() print('The distribution map of the {} changed rows : '.format(r_num)) for row in rows: if row['sqltype']==1: print('INSERT rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num))) if row['sqltype']==2: print('UPDATE rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num))) if row['sqltype']==3: print('DELETE rows :{} , {}%\n '.format(row['nums'],int(row['nums']*100/r_num))) # 描述 影响行数 最多的表格 # 可以分析是哪些表格频繁操作,这里显示前10个table name sql = '''select dbname,tbname , count(*) ALL_rows, count(*)*100/{} per, count(case when sqltype=1 then 1 end) INSERT_rows, count(case when sqltype=2 then 1 end) UPDATE_rows, count(case when sqltype=3 then 1 end) DELETE_rows from {} group by dbname,tbname order by ALL_rows desc limit 10;'''.format(r_num,self.tbrow) self.cur.execute(sql) rows = self.cur.fetchall() print('The distribution map of the {} changed rows : '.format(r_num)) print('tablename'.ljust(50), '|','changed_rows'.center(15), '|','percent'.center(10), '|','insert_rows'.center(18), '|','update_rows'.center(18), '|','delete_rows'.center(18) ) print('-------------------------------------------------------------------------------------------------------------------------------------------------') for row in rows: print((row['dbname']+'.'+row['tbname']).ljust(50), '|',str(row['ALL_rows']).rjust(15), '|',(str(int(row['per']))+'%').rjust(10), '|',str(row['INSERT_rows']).rjust(10)+' , '+(str(int(row['INSERT_rows']*100/row['ALL_rows']))+'%').ljust(5), '|',str(row['UPDATE_rows']).rjust(10)+' , '+(str(int(row['UPDATE_rows']*100/row['ALL_rows']))+'%').ljust(5), '|',str(row['DELETE_rows']).rjust(10)+' , '+(str(int(row['DELETE_rows']*100/row['ALL_rows']))+'%').ljust(5), ) print('\n') logging.info('Finished to analyse the binlog file !!!') def closeconn(self): self.cur.close() logging.info('release db connections\n') def main(): p = queryanalyse() p.rowrecord() p.binlogdesc() p.closeconn() if __name__ == "__main__": main()
相关文章推荐
- 基于binlog来分析mysql的行记录修改情况
- 【MySQL】binlog_format以及binlog事务记录分析
- python脚本在不中断运行的情况下,修改循环间隔
- 基于Python SimpleHTTPServer.py的修改脚本:HTTP文件服务器,修正中文目录列表,支持视频文件在线播放
- 记录特殊情况的Python脚本的内存异常与处理
- Python脚本分析CPU使用情况
- L2TP服务器,mysql5.6.33修改数据文件路径,PYTHON获取DNS记录
- C++嵌套调用 用Python 脚本写的 基于Gurobi 的解数学模型的经验记录
- python脚本修改mysql密码
- Python脚本分析CPU使用情况
- Python脚本分析CPU使用情况
- Python脚本之django---mysql-记录主机性能数据到数据库-web站点管理数据库及web显示命令执行结果
- 基于 Python 的数据结构与算法分析学习记录(6-6)—— 分析树
- python分析mysql-binlog,统计时间段内各表的操作次数_2016041302
- MySQL基于binlog文件的异地备份策略脚本--shell
- Python脚本分析CPU使用情况
- binlog文件分析与mysqlbinlog工具的修改
- MySQL性能测试分析 mysql表最大记录数
- 利用vbs脚本快速修改hosts记录,提供Google Picasa相册无法显示图片问题的傻瓜级方案(2009.11.11更新) 推荐
- 一个获取指定目录下一定格式的文件名称和文件修改时间并保存为文件的python脚本