
MySQL binlog series (2): parsing the binlog with Java

2015-12-22 23:34
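Before getting into the main topic, we need to know the types of binlog events. Let's first look at which events our own binlog file contains.

In the original screenshot of that listing, the part highlighted in red is the event_type.

There are many binlog event types; for details, see the official MySQL documentation: http://dev.mysql.com/doc/internals/en/event-meanings.html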

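(1) Event classes and interfaces in Open Replicator

Open Replicator is a MySQL binlog parser written in Java. It first connects to MySQL (just like an ordinary MySQL slave), then receives and parses the binlog, and finally delivers the parsed binlog events to the application through callbacks.

All events implement the BinlogEventV4 interface.

The BinlogEventV4 interface is as follows: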

/**
 * +=====================================+
 * | event  | timestamp         0 : 4    |
 * | header +----------------------------+
 * |        | type_code         4 : 1    |
 * |        +----------------------------+
 * |        | server_id         5 : 4    |
 * |        +----------------------------+
 * |        | event_length      9 : 4    |
 * |        +----------------------------+
 * |        | next_position    13 : 4    |
 * |        +----------------------------+
 * |        | flags            17 : 2    |
 * +=====================================+
 * | event  | fixed part       19 : y    |
 * | data   +----------------------------+
 * |        | variable part              |
 * +=====================================+
 * @author Jingqi Xu
 */
public interface BinlogEventV4 {

    BinlogEventV4Header getHeader();
}
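
To make the layout in that comment concrete, here is a minimal sketch that decodes the 19-byte v4 event header from a raw byte array. It is not part of Open Replicator: the class name V4HeaderSketch and its members are hypothetical. It relies only on the offsets and lengths listed in the comment (offset : length) and on the fact that MySQL writes these integers in little-endian order.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper, not an Open Replicator class: decodes the fixed
// 19-byte v4 event header using the offsets from the comment above.
final class V4HeaderSketch {
    static final int HEADER_LENGTH = 19;

    final long timestamp;    // offset 0, 4 bytes, seconds since the epoch
    final int typeCode;      // offset 4, 1 byte
    final long serverId;     // offset 5, 4 bytes
    final long eventLength;  // offset 9, 4 bytes
    final long nextPosition; // offset 13, 4 bytes
    final int flags;         // offset 17, 2 bytes

    private V4HeaderSketch(ByteBuffer buf) {
        // The fields are unsigned on the wire, so widen and mask them.
        timestamp = buf.getInt() & 0xFFFFFFFFL;
        typeCode = buf.get() & 0xFF;
        serverId = buf.getInt() & 0xFFFFFFFFL;
        eventLength = buf.getInt() & 0xFFFFFFFFL;
        nextPosition = buf.getInt() & 0xFFFFFFFFL;
        flags = buf.getShort() & 0xFFFF;
    }

    static V4HeaderSketch parse(byte[] raw) {
        if (raw.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("need at least 19 bytes, got " + raw.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(raw, 0, HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        return new V4HeaderSketch(buf);
    }

    public static void main(String[] args) {
        // Build a synthetic header: type code 24 (UPDATE_ROWS_EVENT), server id 23.
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(1450753740).put((byte) 24).putInt(23).putInt(110).putInt(358).putShort((short) 0);
        V4HeaderSketch h = parse(buf.array());
        System.out.println(h.typeCode + " " + h.serverId + " " + h.nextPosition);
    }
}
```

In practice Open Replicator does this decoding for you and exposes the result through BinlogEventV4Header; the sketch only illustrates what the header bytes mean.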

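(2) Parsing the binlog with Open Replicator

First, my test environment: MySQL 5.1.61 with the binlog format set to ROW. This walkthrough only considers three event types: insert, update, and delete. We first wrap these three event types in a new event class, as shown below: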

import java.io.Serializable;
import java.util.Map;
import java.util.UUID;

import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.impl.event.AbstractRowEvent;
import com.google.code.or.binlog.impl.event.QueryEvent;
import com.google.code.or.binlog.impl.event.TableMapEvent;

/**
 * Wraps delete, insert and update events only.
 */
public class LogEvent implements Serializable {

    private static final long serialVersionUID = 5503152746318421290L;

    private String eventId = null;
    private String databaseName = null;
    private String tableName = null;
    private String eventType = null;
    private Long timestamp = null;
    private Long timestampRecepite = null;
    private String binlogName = null;
    private Long position = null;
    private Long nextPosition = null;
    private Long serverId = null;
    // Full row content before and after the change (columnName -> columnValue).
    private Map<String, String> before = null;
    private Map<String, String> after = null;

    public LogEvent() {
    }

    public LogEvent(final QueryEvent qe, String databaseName, String tableName) {
        this.init(qe);
        this.databaseName = databaseName;
        this.tableName = tableName;
    }

    public LogEvent(final AbstractRowEvent re) {
        this.init(re);
        // The table map event carries the database and table the rows belong to.
        TableMapEvent tableMapEvent = re.getTme();
        this.databaseName = tableMapEvent.getDatabaseName().toString();
        this.tableName = tableMapEvent.getTableName().toString();
    }

    // Copies the fields shared by all events from the common v4 header.
    private void init(final BinlogEventV4 be) {
        this.eventId = UUID.randomUUID().toString();
        BinlogEventV4Header header = be.getHeader();
        this.binlogName = header.getBinlogName();
        this.position = header.getPosition();
        this.nextPosition = header.getNextPosition();
        this.timestamp = header.getTimestamp();
        this.timestampRecepite = header.getTimestampOfReceipt();
        this.serverId = header.getServerId();
        this.eventType = MySqlEventTypeIdToString.getInstance().get(header.getEventType());
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("{ eventId:").append(eventId);
        builder.append(",databaseName:").append(databaseName);
        builder.append(",tableName:").append(tableName);
        builder.append(",eventType:").append(eventType);
        builder.append(",timestamp:").append(timestamp);
        builder.append(",timestampRecepite:").append(timestampRecepite);
        builder.append(",binlogName:").append(binlogName);
        builder.append(",position:").append(position);
        builder.append(",nextPosition:").append(nextPosition);
        builder.append(",serverId:").append(serverId);
        builder.append(",before:").append(before);
        builder.append(",after:").append(after).append(" }");
        return builder.toString();
    }

    public String getEventId() {
        return eventId;
    }

    public void setEventId(String eventId) {
        this.eventId = eventId;
    }

    public String getDatabaseName() {
        return databaseName;
    }

    public void setDatabaseName(String databaseName) {
        this.databaseName = databaseName;
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Long getTimestampRecepite() {
        return timestampRecepite;
    }

    public void setTimestampRecepite(Long timestampRecepite) {
        this.timestampRecepite = timestampRecepite;
    }

    public String getBinlogName() {
        return binlogName;
    }

    public void setBinlogName(String binlogName) {
        this.binlogName = binlogName;
    }

    public Long getPosition() {
        return position;
    }

    public void setPosition(Long position) {
        this.position = position;
    }

    public Long getNextPosition() {
        return nextPosition;
    }

    public void setNextPosition(Long nextPosition) {
        this.nextPosition = nextPosition;
    }

    public Long getServerId() {
        return serverId;
    }

    public void setServerId(Long serverId) {
        this.serverId = serverId;
    }

    public Map<String, String> getBefore() {
        return before;
    }

    public void setBefore(Map<String, String> before) {
        this.before = before;
    }

    public Map<String, String> getAfter() {
        return after;
    }

    public void setAfter(Map<String, String> after) {
        this.after = after;
    }
}

Here before and after are maps that hold all the data of the affected row before and after the change (columnName: columnValue).
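
Because before and after hold the full row, a consumer often wants only the columns that actually changed. The following is a minimal sketch of that comparison on plain maps; RowDiffSketch and changedColumns are hypothetical names that do not appear in the code of this post, and a missing map (for example before on an insert) is treated as empty.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

// Hypothetical helper: compares the before/after maps of one row.
final class RowDiffSketch {

    // Returns columnName -> "old -> new" for every column whose value differs.
    static Map<String, String> changedColumns(Map<String, String> before, Map<String, String> after) {
        Map<String, String> b = before == null ? Collections.<String, String>emptyMap() : before;
        Map<String, String> a = after == null ? Collections.<String, String>emptyMap() : after;

        // Union of the column names from both sides, in sorted order.
        TreeSet<String> names = new TreeSet<String>(b.keySet());
        names.addAll(a.keySet());

        Map<String, String> changed = new LinkedHashMap<String, String>();
        for (String name : names) {
            String oldValue = b.get(name);
            String newValue = a.get(name);
            if (!Objects.equals(oldValue, newValue)) {
                changed.put(name, oldValue + " -> " + newValue);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> before = new LinkedHashMap<String, String>();
        before.put("id", "791");
        before.put("status", "1");
        before.put("name", "user01");

        Map<String, String> after = new LinkedHashMap<String, String>();
        after.put("id", "791");
        after.put("status", "2");
        after.put("name", "user02");

        System.out.println(changedColumns(before, after));
    }
}
```

With a LogEvent at hand, the call would be changedColumns(logEvent.getBefore(), logEvent.getAfter()).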

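Now for the main program:

import com.google.code.or.OpenReplicator;

public class OpenReplicatorTest {
    public static void main(String[] args) throws Exception {
        final OpenReplicator or = new OpenReplicator();
        or.setUser("root");
        or.setPassword("root");
        or.setHost("xx.xxx.xx.xx");
        or.setPort(3306);
        or.setServerId(23);                       // server id this client presents to MySQL
        or.setBinlogPosition(106);                // position in the binlog file to start from
        or.setBinlogFileName("mysql-bin.000001"); // binlog file to start from

        // Every parsed event is handed to this listener.
        or.setBinlogEventListener(new NotificationListener());
        or.start();
    }
}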

Next, set up the listener. NotificationListener must implement the BinlogEventListener interface:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.BinlogEventListener;
import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
import com.google.code.or.binlog.impl.event.WriteRowsEvent;
import com.google.code.or.common.glossary.Column;
import com.google.code.or.common.glossary.Pair;
import com.google.code.or.common.glossary.Row;

public class NotificationListener implements BinlogEventListener {

    private static Logger logger = LoggerFactory.getLogger(NotificationListener.class);

    // Connection settings used to look up column names.
    private String host = "xx.xx.xx.xx";
    private Integer port = 3306;
    private String username = "root";
    private String password = "root";

    // Note: each branch below only looks at the first row of the event.
    public void onEvents(BinlogEventV4 event) {
        if (event == null) {
            logger.error("binlog event is null");
            return;
        }

        if (event instanceof UpdateRowsEvent) {
            UpdateRowsEvent updateRowsEvent = (UpdateRowsEvent) event;
            LogEvent logEvent = new LogEvent(updateRowsEvent);
            String databaseName = updateRowsEvent.getTme().getDatabaseName().toString();
            String tableName = updateRowsEvent.getTme().getTableName().toString();

            List<Pair<Row>> rows = updateRowsEvent.getRows();
            List<Column> colsAfter = null;
            List<Column> colsBefore = null;
            if (!rows.isEmpty()) {
                Pair<Row> first = rows.get(0);
                colsAfter = first.getAfter().getColumns();
                colsBefore = first.getBefore().getColumns();
            }
            logEvent.setBefore(getMap(colsBefore, databaseName, tableName));
            logEvent.setAfter(getMap(colsAfter, databaseName, tableName));
            logger.info("update event is:" + logEvent);
        } else if (event instanceof DeleteRowsEvent) {
            DeleteRowsEvent deleteRowsEvent = (DeleteRowsEvent) event;
            LogEvent logEvent = new LogEvent(deleteRowsEvent);
            String databaseName = deleteRowsEvent.getTme().getDatabaseName().toString();
            String tableName = deleteRowsEvent.getTme().getTableName().toString();

            // A delete only has a "before" image.
            List<Row> rows = deleteRowsEvent.getRows();
            List<Column> before = rows.isEmpty() ? null : rows.get(0).getColumns();
            logEvent.setBefore(getMap(before, databaseName, tableName));
            logger.info("delete event is:" + logEvent);
        } else if (event instanceof WriteRowsEvent) {
            WriteRowsEvent writeRowsEvent = (WriteRowsEvent) event;
            LogEvent logEvent = new LogEvent(writeRowsEvent);
            String databaseName = writeRowsEvent.getTme().getDatabaseName().toString();
            String tableName = writeRowsEvent.getTme().getTableName().toString();

            // An insert only has an "after" image.
            List<Row> rows = writeRowsEvent.getRows();
            List<Column> after = rows.isEmpty() ? null : rows.get(0).getColumns();
            logEvent.setAfter(getMap(after, databaseName, tableName));
            logger.info("write event is:" + logEvent);
        }
    }

    // Pairs the column values of one row with the table's column names, by position.
    private Map<String, String> getMap(List<Column> cols, String databaseName, String tableName) {
        if (cols == null || cols.size() == 0) {
            return null;
        }
        List<String> columnNames = new TableInfo(host, username, password, port).getColumns(databaseName, tableName);
        if (columnNames == null) {
            return null;
        }
        if (columnNames.size() != cols.size()) {
            logger.error("the size does not match...");
            return null;
        }
        Map<String, String> map = new HashMap<String, String>();
        for (int i = 0; i < columnNames.size(); i++) {
            // NULL column values are stored as empty strings.
            if (cols.get(i).getValue() == null) {
                map.put(columnNames.get(i), "");
            } else {
                map.put(columnNames.get(i), cols.get(i).toString());
            }
        }
        return map;
    }
}


The events provided by Open Replicator do not carry the column names of the table; DeleteRowsEvent, UpdateRowsEvent, and WriteRowsEvent contain only the column values before and after the change. Since we need to combine names and values into before and after, we have to obtain the column names some other way:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mysql.jdbc.StringUtils; // assumed: the StringUtils shipped with MySQL Connector/J 5.x

public class TableInfo {
    private static Logger logger = LoggerFactory.getLogger(TableInfo.class);

    /**
     * key: databaseName + "." + tableName
     * value: column names
     */
    private static Map<String, List<String>> columnsMap = new HashMap<String, List<String>>();
    private String host;
    private Integer port;
    private String username;
    private String password;

    public TableInfo(String host, String username, String password, Integer port) {
        this.host = host;
        this.username = username;
        this.password = password;
        this.port = port;
        if (columnsMap.size() == 0) {
            reload();
        }
    }

    public Map<String, List<String>> getMap() {
        return columnsMap;
    }

    public List<String> getColumns(String databaseName, String tableName) {
        if (StringUtils.isNullOrEmpty(databaseName) || StringUtils.isNullOrEmpty(tableName)) {
            return null;
        }
        String key = databaseName + "." + tableName;
        List<String> list = columnsMap.get(key);
        if (list == null || list.size() == 0) {
            // Unknown table or empty cache: reload the metadata once and retry.
            reload();
            list = columnsMap.get(key);
        }
        return list;
    }

    // Reloads the column names of all tables; keeps the old cache if loading fails,
    // so columnsMap is never null.
    private void reload() {
        MysqlConnection.setConnection(this.host, this.port, this.username, this.password);
        Map<String, List<String>> loaded = MysqlConnection.getColumns();
        if (loaded == null) {
            logger.error("failed to load column names from mysql");
            return;
        }
        columnsMap = loaded;
    }
}
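
TableInfo reloads the metadata of every table whenever a key is missing and shares a plain static HashMap. As a possible alternative, here is a minimal sketch of a per-table lazy cache built on ConcurrentHashMap.computeIfAbsent. It assumes Java 8 or later; ColumnNameCache and its loader function are hypothetical names, and the loader stands in for whatever actually queries MySQL for one table's columns.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical cache: loads the column names of one table on first use.
final class ColumnNameCache {
    private final ConcurrentMap<String, List<String>> cache =
            new ConcurrentHashMap<String, List<String>>();
    // Maps "databaseName.tableName" to that table's column names (assumed to be supplied by the caller).
    private final Function<String, List<String>> loader;

    ColumnNameCache(Function<String, List<String>> loader) {
        this.loader = loader;
    }

    List<String> getColumns(String databaseName, String tableName) {
        if (databaseName == null || databaseName.isEmpty()
                || tableName == null || tableName.isEmpty()) {
            return null;
        }
        // If the loader returns null, nothing is cached and null is returned.
        return cache.computeIfAbsent(databaseName + "." + tableName, loader);
    }

    // Drops one entry, for example after the table definition has changed.
    void invalidate(String databaseName, String tableName) {
        cache.remove(databaseName + "." + tableName);
    }

    public static void main(String[] args) {
        // Stub loader for illustration; a real one would read the database metadata.
        ColumnNameCache cache = new ColumnNameCache(key -> {
            System.out.println("loading " + key);
            return Arrays.asList("id", "name", "status");
        });
        System.out.println(cache.getColumns("test", "task")); // triggers the loader
        System.out.println(cache.getColumns("test", "task")); // served from the cache
    }
}
```

The trade-off is that a cached entry goes stale when the table is altered, so the caller has to decide when to call invalidate.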

The MysqlConnection class is implemented as follows:

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MysqlConnection {

    private static Connection conn;

    private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);
    private static String host;
    private static Integer port;
    private static String user;
    private static String password;

    // Opens the shared connection if it is not open yet and remembers the settings.
    public static void setConnection(String mySQLHost, Integer mySQLPort, String mySQLUser,
            String mySQLPassword) {
        try {
            if (conn == null || conn.isClosed()) {
                Class.forName("com.mysql.jdbc.Driver");

                conn = DriverManager.getConnection("jdbc:mysql://" + mySQLHost + ":" + mySQLPort
                        + "/", mySQLUser, mySQLPassword);
                logger.info("connected to mysql:{} : {}", mySQLHost, mySQLPort);
                host = mySQLHost;
                port = mySQLPort;
                user = mySQLUser;
                password = mySQLPassword;
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    // Returns the shared connection, reconnecting with the remembered settings if needed.
    public static Connection getConnection() {
        try {
            if (conn == null || conn.isClosed()) {
                setConnection(host, port, user, password);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return conn;
    }

    // Returns "databaseName.tableName" -> column names for every table, or null on failure.
    public static Map<String, List<String>> getColumns() {
        Map<String, List<String>> cols = new HashMap<String, List<String>>();
        Connection conn = getConnection();
        if (conn == null) {
            logger.error("no mysql connection available");
            return null;
        }
        String[] tableType = { "TABLE" };
        try {
            DatabaseMetaData metaData = conn.getMetaData();
            // Each catalog reported by the driver is treated as one database.
            try (ResultSet catalogs = metaData.getCatalogs()) {
                while (catalogs.next()) {
                    String databaseName = catalogs.getString("TABLE_CAT");
                    try (ResultSet tables = metaData.getTables(databaseName, null, null, tableType)) {
                        while (tables.next()) {
                            String tableName = tables.getString("TABLE_NAME");
                            List<String> names = new ArrayList<String>();
                            // Within one table, JDBC returns the columns ordered by ORDINAL_POSITION.
                            try (ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null)) {
                                while (colSet.next()) {
                                    names.add(colSet.getString("COLUMN_NAME"));
                                }
                            }
                            cols.put(databaseName + "." + tableName, names);
                        }
                    }
                }
            }
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
            return null;
        }
        return cols;
    }
}


A helper class that maps an event type id to its event type name:

import java.util.HashMap;
import java.util.Map;

public class MySqlEventTypeIdToString {
    private static Map<Integer, String> idToString = new HashMap<Integer, String>();
    // Singleton instance; declared after idToString so the map exists when init() runs.
    private static final MySqlEventTypeIdToString m = new MySqlEventTypeIdToString();

    private MySqlEventTypeIdToString() {
        init();
    }

    public static MySqlEventTypeIdToString getInstance() {
        return m;
    }

    private void init() {
        idToString.put(0, "UNKNOWN_EVENT");
        idToString.put(1, "START_EVENT_V3");
        idToString.put(2, "QUERY_EVENT");
        idToString.put(3, "STOP_EVENT");
        idToString.put(4, "ROTATE_EVENT");
        idToString.put(5, "INTVAR_EVENT");
        idToString.put(6, "LOAD_EVENT");
        idToString.put(7, "SLAVE_EVENT");
        idToString.put(8, "CREATE_FILE_EVENT");
        idToString.put(9, "APPEND_BLOCK_EVENT");
        idToString.put(10, "EXEC_LOAD_EVENT");
        idToString.put(11, "DELETE_FILE_EVENT");
        idToString.put(12, "NEW_LOAD_EVENT");
        idToString.put(13, "RAND_EVENT");
        idToString.put(14, "USER_VAR_EVENT");
        idToString.put(15, "FORMAT_DESCRIPTION_EVENT");
        idToString.put(16, "XID_EVENT");
        idToString.put(17, "BEGIN_LOAD_QUERY_EVENT");
        idToString.put(18, "EXECUTE_LOAD_QUERY_EVENT");
        idToString.put(19, "TABLE_MAP_EVENT");
        idToString.put(20, "PRE_GA_WRITE_ROWS_EVENT");
        idToString.put(21, "PRE_GA_UPDATE_ROWS_EVENT");
        idToString.put(22, "PRE_GA_DELETE_ROWS_EVENT");
        idToString.put(23, "WRITE_ROWS_EVENT");
        idToString.put(24, "UPDATE_ROWS_EVENT");
        idToString.put(25, "DELETE_ROWS_EVENT");
        idToString.put(26, "INCIDENT_EVENT");
        idToString.put(27, "HEARTBEAT_LOG_EVENT");
        idToString.put(28, "IGNORABLE_LOG_EVENT");
        idToString.put(29, "ROWS_QUERY_LOG_EVENT");
        idToString.put(30, "WRITE_ROWS_EVENT_V2");
        idToString.put(31, "UPDATE_ROWS_EVENT_V2");
        idToString.put(32, "DELETE_ROWS_EVENT_V2");
        idToString.put(33, "GTID_LOG_EVENT");
        idToString.put(34, "ANONYMOUS_GTID_LOG_EVENT");
        idToString.put(35, "PREVIOUS_GTIDS_LOG_EVENT");
    }

    // Returns the event type name, or null if the id is not in the table.
    public String get(Integer eventId) {
        return idToString.get(eventId);
    }
}
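
The same id-to-name mapping can also be expressed as an enum, which avoids the hand-written singleton and gives a readable fallback for ids outside the table. This is only a sketch of that alternative: EventTypeNames and its nested Type enum are hypothetical names, and only the five event types used in this post are listed, with the same ids as in the table above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical alternative to a map-based lookup class.
final class EventTypeNames {

    enum Type {
        QUERY_EVENT(2),
        TABLE_MAP_EVENT(19),
        WRITE_ROWS_EVENT(23),
        UPDATE_ROWS_EVENT(24),
        DELETE_ROWS_EVENT(25);

        final int code;

        Type(int code) {
            this.code = code;
        }
    }

    // Index built once when the class is initialized: type code -> enum constant.
    private static final Map<Integer, Type> BY_CODE = new HashMap<Integer, Type>();

    static {
        for (Type t : Type.values()) {
            BY_CODE.put(t.code, t);
        }
    }

    // Returns the event type name, or "UNKNOWN(<code>)" for codes not listed above.
    static String nameOf(int code) {
        Type t = BY_CODE.get(code);
        return t != null ? t.name() : "UNKNOWN(" + code + ")";
    }

    public static void main(String[] args) {
        System.out.println(nameOf(24));
        System.out.println(nameOf(99));
    }
}
```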

Running the program produces output like the following (formatted here for readability):
update event is: {
    eventId: a7acc3d0-7721-4ffe-84d4-4c2b7db5423a,
    databaseName: test,
    tableName: task,
    eventType: UPDATE_ROWS_EVENT,
    timestamp: 1450753740000,
    timestampRecepite: 1450887259271,
    binlogName: mysql-bin.000001,
    position: 248,
    nextPosition: 358,
    serverId: 23,
    before: {
        id=791,
        user_name=123,
        topology_path=,
        update_time=2015-08-05 10:53:57.0,
        status=1,
        department=,
        name=user01,
        create_time=2015-12-21 19:30:36.0,
        user_id=-1
    },
    after: {
        id=791,
        user_name=123,
        topology_path=,
        update_time=2015-08-05 10:53:57.0,
        status=2,
        department=,
        name=user02,
        create_time=2015-12-22 11:09:00.0,
        user_id=-1
    }
}