您的位置:首页 > 数据库 > Oracle

基于canal实现mysql、oracle的数据库实时同步

2017-11-13 16:16 826 查看


1. 前言

产品生态链中有一块是数据库实时同步模块,一直以来使用的是数据库特定相关的方法(如触发器)实现数据库同步,随着产品越来越多,该设计方法逐渐显得不合理。于是想到是否有通用的数据库实时同步模块,问了度娘,找到canal。


2. 需求

2.1. oracle发送端支持

canal开源代码中发送端仅仅支持mysql,接收端由于采用jdbc,mysql、oracle等可以通吃。

2.2. 传输方式

公司所在行业对网络安全要求较高,不同安全分区使用纵向隔离装置而非防火墙进行安全隔离,纵向隔离装置原理与linux rsync相同,因此发送端与接收端不能采用网络方式传输,发送端将数据库记录写入文件,通过隔离装置主动将文件穿到对方服务器后,接收端加载文件,将记录入库。

而canal基于Google protobuf实现网络通信,因此这一块需要被替换。

2.3. 其他需求

同步时,对某些表可能需要带条件的同步,如某列=1的所有记录,同时需要将该记录对应的文件同步过去,该通用模块如何与文件打配合,需要好好考虑。

某些记录需要人工同步过去,无论是否已经同步过。


3. oracle数据库同步原理

oracle基于logminer实现数据库同步。

3.1. 设置LogMiner字典文件路径
sqlplus /nolog

SQL>conn / as sysdba

SQL>create directory utlfile as ‘/home/logmnr’;

SQL>alter system set utl_file_dir='/home/logmnr' scope=spfile;

注意文件夹权限给oracle
查看LogMiner文件夹是否设置:
SQL>show parameter utl;
3.2. 创建数据库同步用户
-- create user username identified by password

SQL>create user logminer identified by logminer;

SQL> grant dba to logminer;
3.3. 设置追加日志

添加追加日志:
SQL>alter database add supplemental log data(primary key,unique index) columns;
检查追加日志:
SQL>select supplemental_log_data_min,supplemental_log_data_pk,supplemental_log_data_ui from v$database;
删除追加日志:
alter database drop supplemental log data (primary key ,unique index) columns
3.4. 重启数据库
SQL> shutdown abort

SQL> startup
3.5. 查看日志清单
SQL>select * from v$logfile
3.6. 程序实现

3.6.1. 创建数据库字典
exec dbms_logmnr_d.build(dictionary_filename=>'zyhd.ora',dictionary_location=>'/home/logmnr');
3.6.2. 添加日志

可先查看日志清单,然后根据日志清单动态生成添加日志语句
exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo01.log', options=>dbms_logmnr.new);

exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo02.log', options=>dbms_logmnr.addfile);

exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo03.log', options=>dbms_logmnr.addfile);
3.6.3. 从某个scn序列号开始分析日志
exec dbms_logmnr.start_logmnr(startScn=>’0’, dictfilename=>'/home/logmnr/zyhd.ora’, options=>dbms_logmnr.no_rowid_in_stmt);
3.6.4. 查询所有结果
SELECT scn,operation,timestamp,status,sql_redo FROM v$logmnr_contents WHERE seg_owner='ZH9000’ and seg_type_name=’TABLE’;
3.6.5. 释放分析内存
exec dbms_logmnr.end_logmnr;
3.7. 附件

3.7.1. options定义
COMMITTED_DATA_ONLY
顾名思义就是只显示已经提交了的,那些正在进行中的及Oracle内部操作都忽略掉了
DDL_DICT_TRACKING
适用于在线日志存放LogMiner字典的情况,当表发生了添加字段等情况,字典自动更新。
NO_SQL_DELIMITER
去掉SQL_REDO及SQL_UNDO中SQL语句最后的分号,以CURSOR方式循环执行解析出的SQL会很方便和快捷。
NO_ROWID_IN_STMT
在SQL_REDO和SQL_UNDO列语句中去掉ROWID


4. canal重构

com.alibaba.otter.canal.server.netty.handler.SessionHandler负责处理网络版本接收端的订阅(subscription)、记录传输(get)、取消订阅(unsubscribe)、传输确认(ack),经模仿slave向master数据库请求后,回应给接收端。

改为文件传输后,com.alibaba.otter.canal.server.netty.handler.SessionHandler不再继承SimpleChannelHandler,抽出subscription、get、unsubscribe、ack方法,在get完成时不再网络传输,翻译成sql语句后写本地文件。mysql版本的SessionHandler修改后的代码如下:
public class MysqlSessionHandler{

private String destination;

private short clientId = 1001;

private String filter = "";

private String dbName;

private String outputPath;

private static final Logger logger =
LoggerFactory.getLogger(MysqlSessionHandler.class);

private CanalServerWithEmbedded embeddedServer;

public String getDestination() {

return destination;

}

public void setDestination(String destination)
{

this.destination = destination;

}

public String getDbName() {

return dbName;

}

public void setDbName(String dbName)
{

this.dbName = dbName;

}

public String getOutputPath() {

return outputPath;

}

public void setOutputPath(String outputPath)
{

this.outputPath = outputPath;

}

public SessionHandler(){

}

public MysqlSessionHandler(CanalServerWithEmbedded embeddedServer){

this.embeddedServer = embeddedServer;

}

public void subscription(){

ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter);

MDC.put("destination", clientIdentity.getDestination());

// 尝试启动,如果已经启动,忽略

if (!embeddedServer.isStart(clientIdentity.getDestination()))
{

ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());

if (!runningMonitor.isStart()) {

runningMonitor.start();

}

}

embeddedServer.subscribe(clientIdentity);

}

public void unsubscribe(){

ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter);

MDC.put("destination", clientIdentity.getDestination());

embeddedServer.unsubscribe(clientIdentity);

stopCanalInstanceIfNecessary(clientIdentity);// 尝试关闭

//NettyUtils.ack(ctx.getChannel(), null);

}

public long get(){

ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter);

MDC.put("destination", clientIdentity.getDestination());

int batchSize = 1000;

Message message = embeddedServer.getWithoutAck(clientIdentity, batchSize);

printEntry(message.getEntries(), message.getId());

return message.getId();

}

public void ack(long batchId){

ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter);

MDC.put("destination", clientIdentity.getDestination());

embeddedServer.ack(clientIdentity, batchId);

}

private void stopCanalInstanceIfNecessary(ClientIdentity clientIdentity)
{

List<ClientIdentity> clientIdentitys = embeddedServer.listAllSubscribe(clientIdentity.getDestination());

if (clientIdentitys != null && clientIdentitys.size()
== 1 && clientIdentitys.contains(clientIdentity)) {

ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());

if (runningMonitor.isStart()) {

runningMonitor.release();

}

}

}

private void printEntry(List<Entry> entrys, long batchId)
{

for (Entry entry : entrys)
{

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType()
== EntryType.TRANSACTIONEND) {

continue;

}

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

throw new RuntimeException("ERROR
## parser of eromanga-event has an error , data:" + entry.toString(),e);

}

EventType eventType = rowChage.getEventType();

/*System.out.println(String.format("==> binlog[%s:%s] , name[%s,%s] , eventType : %s",

entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

eventType));

*/

if(dbName.equals(entry.getHeader().getSchemaName()))

{

String tableName=entry.getHeader().getTableName();

List<String> sqls=new LinkedList<String>();

for (RowData rowData : rowChage.getRowDatasList())
{

String sql=buildSql(tableName, rowData, eventType);

if(sql!=null){

sqls.add(sql);

System.err.println(sql);

}

}

try {

toLocal(sqls, batchId);

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

final static String DELETE_SQL="delete
from _tn_ where _cn_";

final static String INSERT_SQL="insert
into _tn_(_cn_) values(_vn_)";

final static String UPDATE_SQL="update
_tn_ set _vn_ where _cn_";

//创建SQL

private String buildSql(String tableName,RowData rowData,EventType eventType){

tableName="`"+tableName+"`";

if (eventType == EventType.DELETE)
{

StringBuffer cn=new StringBuffer();

StringBuffer cn2=new StringBuffer();

for (Column column : rowData.getBeforeColumnsList())
{

if(column.getIsKey()){

if(cn2.length()>0){

cn2.append(" and ");

}

cn2.append("`"+column.getName()+"`");

cn2.append("=");

if(!isNumberType(column.getMysqlType())){

cn2.append("'"+column.getValue()+"'");

}else{

cn2.append(column.getValue());

}

}

if(column.getValue()==null||"".equals(column.getValue())){

continue;

}

if(cn.length()>0){

cn.append(" and ");

}

cn.append("`"+column.getName()+"`");

cn.append("=");

if(!isNumberType(column.getMysqlType())){

cn.append("'"+column.getValue()+"'");

}else{

cn.append(column.getValue());

}

}

return DELETE_SQL.replaceAll("_tn_", tableName).replaceAll("_cn_", cn2.length()>0?cn2.toString():cn.toString());

} else if (eventType == EventType.INSERT)
{

StringBuffer cn=new StringBuffer();

StringBuffer vn=new StringBuffer();

for (Column column : rowData.getAfterColumnsList())
{

if(cn.length()>0){

cn.append(",");

vn.append(",");

}

cn.append("`"+column.getName()+"`");

if(!isNumberType(column.getMysqlType())){

vn.append("'"+column.getValue()+"'");

}else{

vn.append(column.getValue());

}

}

return INSERT_SQL.replaceAll("_tn_", tableName).replaceAll("_cn_", cn.toString()).replaceAll("_vn_", vn.toString());

} else {

StringBuffer cn=new StringBuffer();

StringBuffer cn2=new StringBuffer();

for (Column column : rowData.getBeforeColumnsList())
{

if(column.getIsKey()){

if(cn2.length()>0){

cn2.append(" and ");

}

cn2.append("`"+column.getName()+"`");

cn2.append("=");

if(!isNumberType(column.getMysqlType())){

cn2.append("'"+column.getValue()+"'");

}else{

cn2.append(column.getValue());

}

}

if(column.getValue()==null||"".equals(column.getValue())){

continue;

}

if(cn.length()>0){

cn.append(" and ");

}

cn.append("`"+column.getName()+"`");

cn.append("=");

if(!isNumberType(column.getMysqlType())){

cn.append("'"+column.getValue()+"'");

}else{

cn.append(column.getValue());

}

}

StringBuffer vn=new StringBuffer();

for (Column column : rowData.getAfterColumnsList())
{

if(!column.getUpdated()){

continue;

}

if(vn.length()>0){

vn.append(",");

}

vn.append("`"+column.getName()+"`");

vn.append("=");

if(!isNumberType(column.getMysqlType())){

vn.append("'"+column.getValue()+"'");

}else{

vn.append(column.getValue());

}

}

return UPDATE_SQL.replaceAll("_tn_", tableName).replaceAll("_cn_", cn2.length()>0?cn2.toString():cn.toString()).replaceAll("_vn_", vn.toString());

}

}

//判断数据库列字段类型是否是数字,不是数字的值需要加''

private static boolean isNumberType(String mysqlType){

if(mysqlType.indexOf("int")==-1&&

mysqlType.indexOf("float")==-1&&

mysqlType.indexOf("double")==-1&&

mysqlType.indexOf("decimal")==-1&&

mysqlType.indexOf("numeric")==-1){

return false;

}

return true;

}

public void setEmbeddedServer(CanalServerWithEmbedded embeddedServer)
{

this.embeddedServer = embeddedServer;

}

public boolean toLocal(List<String> sqls, long batchId) throws IOException{

if(sqls.isEmpty())

return false;

String fileName = outputPath + "/msyql_data.scn" + batchId;

File f = new File(fileName);

FileOutputStream fop = new FileOutputStream(f);

for(String sql : sqls){

fop.write(sql.getBytes());

fop.write("\n".getBytes());

}

if(fop != null)

fop.close();

return true;

}
所有增量记录改变都会保存在scn[记录编号]为后缀名的文件中。接收端只需要通过扫描文件夹中所有文件、通过inotify感知所有文件,并直接执行语句即可。

在com.alibaba.otter.canal.server.netty.CanalServerWithNetty中启动网络服务改为直接订阅、传输即可:
sessionHandler.subscription();

while(true){

long batchId = sessionHandler.get();

if(batchId > 0)

sessionHandler.ack(batchId);

Thread.sleep(1000);

}
oracle版本的subscription、get、unsubscribe、ack实现如下:
public void subscription(){

}

public void unsubscribe(){

}

public long get(){

Connection sourceConn = null;

try {

ResultSet resultSet = null;

// 获取源数据库连接

Class.forName(dbDriver);

sourceConn = java.sql.DriverManager.getConnection(dbUrl, username, password);

if(createDictInit){

createDictionary(sourceConn);

createDictInit = false;

}

Statement statement = sourceConn.createStatement();

if(!logFileLoaded)

{

// 打印获分析日志文件信息

resultSet = statement.executeQuery("select member
from v$logfile");

String log = new String();

while (resultSet.next()) {

log = resultSet.getObject(1).toString();

System.out.println("已添加日志文件==>" + log);

logFiles.add(log);

}

logFileLoaded = true;

}

if(logFiles.isEmpty())

return -1;

// 添加所有日志文件,本代码仅分析联机日志

StringBuffer sbSQL = new StringBuffer();

for(int i =
0; i < logFiles.size(); ++i){

if(i == 0){

sbSQL.append("BEGIN\n");

}

sbSQL.append("dbms_logmnr.add_logfile(logfilename=>'"

+ logFiles.get(i) + "',options=>dbms_logmnr."

+(i == 0?"new":"addfile")+");\n");

}

sbSQL.append("END;\n");

CallableStatement callableStatement = sourceConn.prepareCall(sbSQL.toString());

callableStatement.execute();

//System.out.println("开始分析日志文件, 起始scn号:"
+ Constants.LAST_SCN);

callableStatement = sourceConn.prepareCall("BEGIN\n"

+ "dbms_logmnr.start_logmnr(startScn=>'" + scn

+ "',dictfilename=>'" + dataDictionary

+ "/" + oraName+"',options=>dbms_logmnr.no_rowid_in_stmt)
;"// ,options=>dbms_logmnr.committed_data_only + dbms_logmnr.no_rowid_in_stmt

+ "END;");

callableStatement.execute();

// 查询获取分析结果

//System.out.println("查询分析结果");

String queryStr = "SELECT scn,operation,sql_redo FROM v$logmnr_contents WHERE seg_owner='"

+ username

+ "' AND seg_type_name='TABLE'";//AND operation !='SELECT_FOR_UPDATE'

System.out.println(queryStr);

resultSet = statement.executeQuery(queryStr);

boolean isCreateDictionary = toLocal(resultSet);

callableStatement = sourceConn.prepareCall("BEGIN\n"

+ "dbms_logmnr.end_logmnr;\n"

+ "END;");

callableStatement.execute();

// DDL发生变化,更新数据字典

if (isCreateDictionary) {

System.out.println("DDL发生变化,更新数据字典");

createDictionary(sourceConn);

System.out.println("完成更新数据字典");

isCreateDictionary = false;

}

} catch (ClassNotFoundException e) {

// TODO Auto-generated catch
block

e.printStackTrace();

} catch (SQLException e) {

// TODO Auto-generated catch
block

e.printStackTrace();

} catch (Exception e) {

// TODO Auto-generated catch
block

e.printStackTrace();

}

finally {

if (null != sourceConn)
{

try {

sourceConn.close();

} catch (SQLException e) {

// TODO Auto-generated
catch block

e.printStackTrace();

}

}

sourceConn = null;

}

return Long.valueOf(scn);

}

public void ack(long batchId){

}

public void createDictionary(Connection sourceConn) throws Exception
{

String createDictSql = "BEGIN dbms_logmnr_d.build(dictionary_filename =>'"+oraName+"',
dictionary_location =>'"

+ dataDictionary + "'); END;";

CallableStatement callableStatement = sourceConn.prepareCall(createDictSql);

callableStatement.execute();

}

public boolean toLocal(ResultSet resultSet) throws SQLException,
IOException{

String lastScn = scn;

String operation = null;

String sql = null;

boolean isCreateDictionary = false;

String fileName = outputPath + "/oracle_data.scn";

fileName += lastScn.toString();

File f = null;

FileOutputStream fop = null;

while (resultSet.next()) {

lastScn = resultSet.getObject(1) + "";

if (lastScn.equals(scn))
{

continue;

}

operation = resultSet.getObject(2) + "";

if ("DDL".equalsIgnoreCase(operation))
{

isCreateDictionary = true;

}

sql = resultSet.getObject(3) + "";

// 删除用户ZH9000.

//sql = sql.replace("\"" + Constants.SOURCE_CLIENT_USERNAME + "\".", "");

//System.out.println("scn=" + lastScn + ", 自动执行sql=="
+ sql + "");

if(f == null){

f = new File(fileName);

if(!f.exists()){

f.createNewFile();

}

fop = new FileOutputStream(f);

}

fop.write(sql.getBytes());

fop.write("\n".getBytes());

}

if(fop != null)

fop.close();

// 更新scn

scn = lastScn;

saveScn2Local();

return isCreateDictionary;

}

public void setEmbeddedServer(CanalServerWithEmbedded embeddedServer)
{

}

public void saveScn2Local() throws IOException{

String fileName = outputPath + "/.scn";

File f = new File(fileName);

FileOutputStream fos = new FileOutputStream(f);

fos.write(scn.getBytes());

fos.close();

}

@SuppressWarnings("resource")

public String loadScnFromLocal(){

String fileName = outputPath + "/.scn";

File f = new File(fileName);

FileReader reader;

try {

reader = new FileReader(f);

int fileLen = (int) f.length();

char [] buffer = new char[fileLen];

reader.read(buffer);

return String.copyValueOf(buffer);

} catch (IOException e) {

return "0";

}

}
最后,在com.alibaba.otter.canal.deployer. CanalController中,可根据需要修改配置文件定义,并加载配置:
cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));

destination = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);

dbName = getProperty(properties, CanalConstants.CANAL_DBNAME);

outputPath = getProperty(properties, CanalConstants.CANAL_OUTPUT);

embededCanalServer = CanalServerWithEmbedded.instance();

embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator

canalServer = CanalServerWithNetty.instance();

canalServer.setDestination(destination);;

canalServer.setDbName(dbName);

canalServer.setOutputPath(outputPath);
至此完成mysql、oracle的发送端实现。

当然,在修改基于网络传输改为基于文件时,应考虑不破坏代码架构,可通过参数配置是基于网络传输还是基于文件,这就属于之后的代码整理工作了。

5. 参考

5.1. http://www.cnblogs.com/shishanyuan/p/3140440.html
5.1.1. 《1.OracleLogminer配置使用.pdf》

5.1.2. 《2.OracleLogminer性能测试.pdf》

5.1.3. 《3.使用OracleLogminer同步Demo.pdf》

搭建:
canal部署与实例运行

Canal简介

canal安装及使用

数据库迁移之从oracle
到 MySQL

将oracle的数据导入到mysql的四种方法

Mysql全量数据同步Oracle步骤详解

ORACLE实时同步技术之streams

浅谈Oracle
数据库之间数据同步方案

使用canal进行mysql数据同步到Redis

实时抓取MySQL的更新数据到Hadoop
canal


我的热门文章

基于canal实现mysql、oracle的数据库实时同步

linux
批量ping检测

vxworks下可递归互斥锁

svn版本发布

wireshark
lua脚本开发
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: