基于canal实现mysql、oracle的数据库实时同步
2017-11-13 16:16
826 查看
1. 前言
产品生态链中有一块是数据库实时同步模块,一直以来使用的是数据库特定相关的方法(如触发器)实现数据库同步,随着产品越来越多,该设计方法逐渐显得不合理。于是想到是否有通用的数据库实时同步模块,问了度娘,找到canal。
2. 需求
2.1. oracle发送端支持canal开源代码中发送端仅仅支持mysql,接收端由于采用jdbc,mysql、oracle等可以通吃。
2.2. 传输方式
公司所在行业对网络安全要求较高,不同安全分区使用纵向隔离装置而非防火墙进行安全隔离,纵向隔离装置原理与linux rsync相同,因此发送端与接收端不能采用网络方式传输,发送端将数据库记录写入文件,通过隔离装置主动将文件穿到对方服务器后,接收端加载文件,将记录入库。
而canal基于Google protobuf实现网络通信,因此这一块需要被替换。
2.3. 其他需求
同步时,对某些表可能需要带条件的同步,如某列=1的所有记录,同时需要将该记录对应的文件同步过去,该通用模块如何与文件打配合,需要好好考虑。
某些记录需要人工同步过去,无论是否已经同步过。
3. oracle数据库同步原理
oracle基于logminer实现数据库同步。3.1. 设置LogMiner字典文件路径
sqlplus /nolog SQL>conn / as sysdba SQL>create directory utlfile as ‘/home/logmnr’; SQL>alter system set utl_file_dir='/home/logmnr' scope=spfile; 注意文件夹权限给oracle |
SQL>show parameter utl; |
-- create user username identified by password SQL>create user logminer identified by logminer; SQL> grant dba to logminer; |
添加追加日志:
SQL>alter database add supplemental log data(primary key,unique index) columns; |
SQL>select supplemental_log_data_min,supplemental_log_data_pk,supplemental_log_data_ui from v$database; |
alter database drop supplemental log data (primary key ,unique index) columns |
SQL> shutdown abort SQL> startup |
SQL>select * from v$logfile |
3.6.1. 创建数据库字典
exec dbms_logmnr_d.build(dictionary_filename=>'zyhd.ora',dictionary_location=>'/home/logmnr'); |
可先查看日志清单,然后根据日志清单动态生成添加日志语句
exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo01.log', options=>dbms_logmnr.new); exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo02.log', options=>dbms_logmnr.addfile); exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo03.log', options=>dbms_logmnr.addfile); |
exec dbms_logmnr.start_logmnr(startScn=>’0’, dictfilename=>'/home/logmnr/zyhd.ora’, options=>dbms_logmnr.no_rowid_in_stmt); |
SELECT scn,operation,timestamp,status,sql_redo FROM v$logmnr_contents WHERE seg_owner='ZH9000’ and seg_type_name=’TABLE’; |
exec dbms_logmnr.end_logmnr; |
3.7.1. options定义
COMMITTED_DATA_ONLY | 顾名思义就是只显示已经提交了的,那些正在进行中的及Oracle内部操作都忽略掉了 |
DDL_DICT_TRACKING | 适用于在线日志存放LogMiner字典的情况,当表发生了添加字段等情况,字典自动更新。 |
NO_SQL_DELIMITER | 去掉SQL_REDO及SQL_UNDO中SQL语句最后的分号,以CURSOR方式循环执行解析出的SQL会很方便和快捷。 |
NO_ROWID_IN_STMT | 在SQL_REDO和SQL_UNDO列语句中去掉ROWID |
4. canal重构
com.alibaba.otter.canal.server.netty.handler.SessionHandler负责处理网络版本接收端的订阅(subscription)、记录传输(get)、取消订阅(unsubscribe)、传输确认(ack),经模仿slave向master数据库请求后,回应给接收端。改为文件传输后,com.alibaba.otter.canal.server.netty.handler.SessionHandler不再继承SimpleChannelHandler,抽出subscription、get、unsubscribe、ack方法,在get完成时不再网络传输,翻译成sql语句后写本地文件。mysql版本的SessionHandler修改后的代码如下:
public class MysqlSessionHandler{ private String destination; private short clientId = 1001; private String filter = ""; private String dbName; private String outputPath; private static final Logger logger = LoggerFactory.getLogger(MysqlSessionHandler.class); private CanalServerWithEmbedded embeddedServer; public String getDestination() { return destination; } public void setDestination(String destination) { this.destination = destination; } public String getDbName() { return dbName; } public void setDbName(String dbName) { this.dbName = dbName; } public String getOutputPath() { return outputPath; } public void setOutputPath(String outputPath) { this.outputPath = outputPath; } public SessionHandler(){ } public MysqlSessionHandler(CanalServerWithEmbedded embeddedServer){ this.embeddedServer = embeddedServer; } public void subscription(){ ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter); MDC.put("destination", clientIdentity.getDestination()); // 尝试启动,如果已经启动,忽略 if (!embeddedServer.isStart(clientIdentity.getDestination())) { ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination()); if (!runningMonitor.isStart()) { runningMonitor.start(); } } embeddedServer.subscribe(clientIdentity); } public void unsubscribe(){ ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter); MDC.put("destination", clientIdentity.getDestination()); embeddedServer.unsubscribe(clientIdentity); stopCanalInstanceIfNecessary(clientIdentity);// 尝试关闭 //NettyUtils.ack(ctx.getChannel(), null); } public long get(){ ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter); MDC.put("destination", clientIdentity.getDestination()); int batchSize = 1000; Message message = embeddedServer.getWithoutAck(clientIdentity, batchSize); printEntry(message.getEntries(), message.getId()); return message.getId(); } public void ack(long batchId){ ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter); MDC.put("destination", clientIdentity.getDestination()); embeddedServer.ack(clientIdentity, batchId); } private void stopCanalInstanceIfNecessary(ClientIdentity clientIdentity) { List<ClientIdentity> clientIdentitys = embeddedServer.listAllSubscribe(clientIdentity.getDestination()); if (clientIdentitys != null && clientIdentitys.size() == 1 && clientIdentitys.contains(clientIdentity)) { ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination()); if (runningMonitor.isStart()) { runningMonitor.release(); } } } private void printEntry(List<Entry> entrys, long batchId) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e); } EventType eventType = rowChage.getEventType(); /*System.out.println(String.format("==> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); */ if(dbName.equals(entry.getHeader().getSchemaName())) { String tableName=entry.getHeader().getTableName(); List<String> sqls=new LinkedList<String>(); for (RowData rowData : rowChage.getRowDatasList()) { String sql=buildSql(tableName, rowData, eventType); if(sql!=null){ sqls.add(sql); System.err.println(sql); } } try { toLocal(sqls, batchId); } catch (IOException e) { e.printStackTrace(); } } } } final static String DELETE_SQL="delete from _tn_ where _cn_"; final static String INSERT_SQL="insert into _tn_(_cn_) values(_vn_)"; final static String UPDATE_SQL="update _tn_ set _vn_ where _cn_"; //创建SQL private String buildSql(String tableName,RowData rowData,EventType eventType){ tableName="`"+tableName+"`"; if (eventType == EventType.DELETE) { StringBuffer cn=new StringBuffer(); StringBuffer cn2=new StringBuffer(); for (Column column : rowData.getBeforeColumnsList()) { if(column.getIsKey()){ if(cn2.length()>0){ cn2.append(" and "); } cn2.append("`"+column.getName()+"`"); cn2.append("="); if(!isNumberType(column.getMysqlType())){ cn2.append("'"+column.getValue()+"'"); }else{ cn2.append(column.getValue()); } } if(column.getValue()==null||"".equals(column.getValue())){ continue; } if(cn.length()>0){ cn.append(" and "); } cn.append("`"+column.getName()+"`"); cn.append("="); if(!isNumberType(column.getMysqlType())){ cn.append("'"+column.getValue()+"'"); }else{ cn.append(column.getValue()); } } return DELETE_SQL.replaceAll("_tn_", tableName).replaceAll("_cn_", cn2.length()>0?cn2.toString():cn.toString()); } else if (eventType == EventType.INSERT) { StringBuffer cn=new StringBuffer(); StringBuffer vn=new StringBuffer(); for (Column column : rowData.getAfterColumnsList()) { if(cn.length()>0){ cn.append(","); vn.append(","); } cn.append("`"+column.getName()+"`"); if(!isNumberType(column.getMysqlType())){ vn.append("'"+column.getValue()+"'"); }else{ vn.append(column.getValue()); } } return INSERT_SQL.replaceAll("_tn_", tableName).replaceAll("_cn_", cn.toString()).replaceAll("_vn_", vn.toString()); } else { StringBuffer cn=new StringBuffer(); StringBuffer cn2=new StringBuffer(); for (Column column : rowData.getBeforeColumnsList()) { if(column.getIsKey()){ if(cn2.length()>0){ cn2.append(" and "); } cn2.append("`"+column.getName()+"`"); cn2.append("="); if(!isNumberType(column.getMysqlType())){ cn2.append("'"+column.getValue()+"'"); }else{ cn2.append(column.getValue()); } } if(column.getValue()==null||"".equals(column.getValue())){ continue; } if(cn.length()>0){ cn.append(" and "); } cn.append("`"+column.getName()+"`"); cn.append("="); if(!isNumberType(column.getMysqlType())){ cn.append("'"+column.getValue()+"'"); }else{ cn.append(column.getValue()); } } StringBuffer vn=new StringBuffer(); for (Column column : rowData.getAfterColumnsList()) { if(!column.getUpdated()){ continue; } if(vn.length()>0){ vn.append(","); } vn.append("`"+column.getName()+"`"); vn.append("="); if(!isNumberType(column.getMysqlType())){ vn.append("'"+column.getValue()+"'"); }else{ vn.append(column.getValue()); } } return UPDATE_SQL.replaceAll("_tn_", tableName).replaceAll("_cn_", cn2.length()>0?cn2.toString():cn.toString()).replaceAll("_vn_", vn.toString()); } } //判断数据库列字段类型是否是数字,不是数字的值需要加'' private static boolean isNumberType(String mysqlType){ if(mysqlType.indexOf("int")==-1&& mysqlType.indexOf("float")==-1&& mysqlType.indexOf("double")==-1&& mysqlType.indexOf("decimal")==-1&& mysqlType.indexOf("numeric")==-1){ return false; } return true; } public void setEmbeddedServer(CanalServerWithEmbedded embeddedServer) { this.embeddedServer = embeddedServer; } public boolean toLocal(List<String> sqls, long batchId) throws IOException{ if(sqls.isEmpty()) return false; String fileName = outputPath + "/msyql_data.scn" + batchId; File f = new File(fileName); FileOutputStream fop = new FileOutputStream(f); for(String sql : sqls){ fop.write(sql.getBytes()); fop.write("\n".getBytes()); } if(fop != null) fop.close(); return true; } |
在com.alibaba.otter.canal.server.netty.CanalServerWithNetty中启动网络服务改为直接订阅、传输即可:
sessionHandler.subscription(); while(true){ long batchId = sessionHandler.get(); if(batchId > 0) sessionHandler.ack(batchId); Thread.sleep(1000); } |
public void subscription(){ } public void unsubscribe(){ } public long get(){ Connection sourceConn = null; try { ResultSet resultSet = null; // 获取源数据库连接 Class.forName(dbDriver); sourceConn = java.sql.DriverManager.getConnection(dbUrl, username, password); if(createDictInit){ createDictionary(sourceConn); createDictInit = false; } Statement statement = sourceConn.createStatement(); if(!logFileLoaded) { // 打印获分析日志文件信息 resultSet = statement.executeQuery("select member from v$logfile"); String log = new String(); while (resultSet.next()) { log = resultSet.getObject(1).toString(); System.out.println("已添加日志文件==>" + log); logFiles.add(log); } logFileLoaded = true; } if(logFiles.isEmpty()) return -1; // 添加所有日志文件,本代码仅分析联机日志 StringBuffer sbSQL = new StringBuffer(); for(int i = 0; i < logFiles.size(); ++i){ if(i == 0){ sbSQL.append("BEGIN\n"); } sbSQL.append("dbms_logmnr.add_logfile(logfilename=>'" + logFiles.get(i) + "',options=>dbms_logmnr." +(i == 0?"new":"addfile")+");\n"); } sbSQL.append("END;\n"); CallableStatement callableStatement = sourceConn.prepareCall(sbSQL.toString()); callableStatement.execute(); //System.out.println("开始分析日志文件, 起始scn号:" + Constants.LAST_SCN); callableStatement = sourceConn.prepareCall("BEGIN\n" + "dbms_logmnr.start_logmnr(startScn=>'" + scn + "',dictfilename=>'" + dataDictionary + "/" + oraName+"',options=>dbms_logmnr.no_rowid_in_stmt) ;"// ,options=>dbms_logmnr.committed_data_only + dbms_logmnr.no_rowid_in_stmt + "END;"); callableStatement.execute(); // 查询获取分析结果 //System.out.println("查询分析结果"); String queryStr = "SELECT scn,operation,sql_redo FROM v$logmnr_contents WHERE seg_owner='" + username + "' AND seg_type_name='TABLE'";//AND operation !='SELECT_FOR_UPDATE' System.out.println(queryStr); resultSet = statement.executeQuery(queryStr); boolean isCreateDictionary = toLocal(resultSet); callableStatement = sourceConn.prepareCall("BEGIN\n" + "dbms_logmnr.end_logmnr;\n" + "END;"); callableStatement.execute(); // DDL发生变化,更新数据字典 if (isCreateDictionary) { System.out.println("DDL发生变化,更新数据字典"); createDictionary(sourceConn); System.out.println("完成更新数据字典"); isCreateDictionary = false; } } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if (null != sourceConn) { try { sourceConn.close(); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } } sourceConn = null; } return Long.valueOf(scn); } public void ack(long batchId){ } public void createDictionary(Connection sourceConn) throws Exception { String createDictSql = "BEGIN dbms_logmnr_d.build(dictionary_filename =>'"+oraName+"', dictionary_location =>'" + dataDictionary + "'); END;"; CallableStatement callableStatement = sourceConn.prepareCall(createDictSql); callableStatement.execute(); } public boolean toLocal(ResultSet resultSet) throws SQLException, IOException{ String lastScn = scn; String operation = null; String sql = null; boolean isCreateDictionary = false; String fileName = outputPath + "/oracle_data.scn"; fileName += lastScn.toString(); File f = null; FileOutputStream fop = null; while (resultSet.next()) { lastScn = resultSet.getObject(1) + ""; if (lastScn.equals(scn)) { continue; } operation = resultSet.getObject(2) + ""; if ("DDL".equalsIgnoreCase(operation)) { isCreateDictionary = true; } sql = resultSet.getObject(3) + ""; // 删除用户ZH9000. //sql = sql.replace("\"" + Constants.SOURCE_CLIENT_USERNAME + "\".", ""); //System.out.println("scn=" + lastScn + ", 自动执行sql==" + sql + ""); if(f == null){ f = new File(fileName); if(!f.exists()){ f.createNewFile(); } fop = new FileOutputStream(f); } fop.write(sql.getBytes()); fop.write("\n".getBytes()); } if(fop != null) fop.close(); // 更新scn scn = lastScn; saveScn2Local(); return isCreateDictionary; } public void setEmbeddedServer(CanalServerWithEmbedded embeddedServer) { } public void saveScn2Local() throws IOException{ String fileName = outputPath + "/.scn"; File f = new File(fileName); FileOutputStream fos = new FileOutputStream(f); fos.write(scn.getBytes()); fos.close(); } @SuppressWarnings("resource") public String loadScnFromLocal(){ String fileName = outputPath + "/.scn"; File f = new File(fileName); FileReader reader; try { reader = new FileReader(f); int fileLen = (int) f.length(); char [] buffer = new char[fileLen]; reader.read(buffer); return String.copyValueOf(buffer); } catch (IOException e) { return "0"; } } |
cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID)); destination = getProperty(properties, CanalConstants.CANAL_DESTINATIONS); dbName = getProperty(properties, CanalConstants.CANAL_DBNAME); outputPath = getProperty(properties, CanalConstants.CANAL_OUTPUT); embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator canalServer = CanalServerWithNetty.instance(); canalServer.setDestination(destination);; canalServer.setDbName(dbName); canalServer.setOutputPath(outputPath); |
当然,在修改基于网络传输改为基于文件时,应考虑不破坏代码架构,可通过参数配置是基于网络传输还是基于文件,这就属于之后的代码整理工作了。
5. 参考
5.1. http://www.cnblogs.com/shishanyuan/p/3140440.html
5.1.1. 《1.OracleLogminer配置使用.pdf》
5.1.2. 《2.OracleLogminer性能测试.pdf》
5.1.3. 《3.使用OracleLogminer同步Demo.pdf》
搭建:
canal部署与实例运行
Canal简介
canal安装及使用
数据库迁移之从oracle
到 MySQL
将oracle的数据导入到mysql的四种方法
Mysql全量数据同步Oracle步骤详解
ORACLE实时同步技术之streams
浅谈Oracle
数据库之间数据同步方案
使用canal进行mysql数据同步到Redis
实时抓取MySQL的更新数据到Hadoop
canal
我的热门文章
基于canal实现mysql、oracle的数据库实时同步linux
批量ping检测
vxworks下可递归互斥锁
svn版本发布
wireshark
lua脚本开发
相关文章推荐
- 基于canal实现mysql、oracle的数据库实时同步
- 基于MySQL实现数据库的半同步主从复制
- 使用GoldenGate实现MySQL到Oracle的数据实时同步
- ORACLE与MYSQL的双数据库系统实现应用程序兼容
- Oracle Stream实现数据库同步
- Oracle 11g 通过创建物化视图实现不同数据库间的表数据同步
- 3大数据库(Sql-Server,MySql和Oracle)的分页SQL语句实现
- 基于MySQL的数据库集群系统的实现(摘)
- mysql主主同步两个数据库同时写入,实现原理:自动增长主键不重复
- Oracle如何实现两个数据库的同步(用实体化视图实现)(oracle快照实例)
- [转] 基于MySQL的数据库集群系统的实现
- 快照、刷新-[置顶] Oracle如何实现两个数据库的同步(用实体化视图实现)(oracle快照实例)-by小雨
- 基于MySQL的数据库集群系统的实现(来源:IBM DW)
- 基于semisync实现MySQL的主从半同步复制
- 通过Oracle Stream实现数据库之间的同步
- MySQL 备份和恢复策略四:使用主从复制机制(replication)实现数据库实时备份
- [转] 基于MySQL的数据库集群系统的实现
- 使用 PDI 和 Oracle CDC 来实现Oracle 数据库向其他数据库的数据同步
- 3大数据库(Sql-Server,MySql和Oracle)的分页SQL语句实现
- 说一下mysql,oracle等常见数据库的分页实现方案?