您的位置:首页 > 数据库 > Oracle

基于canal实现mysql、oracle的数据库实时同步

2017-02-14 22:30 1136 查看

1.   前言

产品生态链中有一块是数据库实时同步模块,一直以来使用的是数据库特定相关的方法(如触发器)实现数据库同步,随着产品越来越多,该设计方法逐渐显得不合理。于是想到是否有通用的数据库实时同步模块,问了度娘,找到canal。

2.   需求

2.1.      oracle发送端支持

canal开源代码中发送端仅仅支持mysql,接收端由于采用jdbc,mysql、oracle等可以通吃。

 

2.2.      传输方式

公司所在行业对网络安全要求较高,不同安全分区使用纵向隔离装置而非防火墙进行安全隔离,纵向隔离装置原理与linux rsync相同,因此发送端与接收端不能采用网络方式传输,发送端将数据库记录写入文件,通过隔离装置主动将文件穿到对方服务器后,接收端加载文件,将记录入库。

而canal基于Google protobuf实现网络通信,因此这一块需要被替换。

2.3.      其他需求

同步时,对某些表可能需要带条件的同步,如某列=1的所有记录,同时需要将该记录对应的文件同步过去,该通用模块如何与文件打配合,需要好好考虑。

某些记录需要人工同步过去,无论是否已经同步过。

3.   oracle数据库同步原理

oracle基于logminer实现数据库同步。

3.1.       设置LogMiner字典文件路径

sqlplus /nolog

SQL>conn / as sysdba

SQL>create directory utlfile as ‘/home/logmnr’;

SQL>alter system set utl_file_dir='/home/logmnr' scope=spfile;

注意文件夹权限给oracle

查看LogMiner文件夹是否设置:

SQL>show parameter utl;

 

3.2.       创建数据库同步用户

-- create user username identified by password

SQL>create user logminer identified by
logminer;

SQL> grant dba to logminer;

 

3.3.       设置追加日志

添加追加日志:

SQL>alter database add supplemental log data(primary key,unique index) columns;

检查追加日志:

SQL>select supplemental_log_data_min,supplemental_log_data_pk,supplemental_log_data_ui from v$database;

 

删除追加日志:

alter database drop supplemental log data (primary key ,unique index) columns

3.4.       重启数据库

SQL> shutdown abort

SQL> startup

3.5.       查看日志清单

SQL>select * from v$logfile

3.6.       程序实现

3.6.1.     创建数据库字典

exec dbms_logmnr_d.build(dictionary_filename=>'zyhd.ora',dictionary_location=>'/home/logmnr');

3.6.2.     添加日志

可先查看日志清单,然后根据日志清单动态生成添加日志语句

exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo01.log', options=>dbms_logmnr.new);

exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo02.log', options=>dbms_logmnr.addfile);

exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo03.log', options=>dbms_logmnr.addfile);

3.6.3.     从某个scn序列号开始分析日志

exec dbms_logmnr.start_logmnr(startScn=>’0’, dictfilename=>'/home/logmnr/zyhd.ora’, options=>dbms_logmnr.no_rowid_in_stmt);

3.6.4.     查询所有结果

SELECT scn,operation,timestamp,status,sql_redo FROM v$logmnr_contents WHERE seg_owner='ZH9000’ and seg_type_name=’TABLE’;

 

3.6.5.     释放分析内存

exec dbms_logmnr.end_logmnr;

 

3.7.       附件

3.7.1.     options定义

COMMITTED_DATA_ONLY

顾名思义就是只显示已经提交了的,那些正在进行中的及Oracle内部操作都忽略掉了

DDL_DICT_TRACKING

适用于在线日志存放LogMiner字典的情况,当表发生了添加字段等情况,字典自动更新。

NO_SQL_DELIMITER

去掉SQL_REDO及SQL_UNDO中SQL语句最后的分号,以CURSOR方式循环执行解析出的SQL会很方便和快捷。

NO_ROWID_IN_STMT

在SQL_REDO和SQL_UNDO列语句中去掉ROWID

4.   canal重构

com.alibaba.otter.canal.server.netty.handler.SessionHandler负责处理网络版本接收端的订阅(subscription)、记录传输(get)、取消订阅(unsubscribe)、传输确认(ack),经模仿slave向master数据库请求后,回应给接收端。

改为文件传输后,com.alibaba.otter.canal.server.netty.handler.SessionHandler不再继承SimpleChannelHandler,抽出subscription、get、unsubscribe、ack方法,在get完成时不再网络传输,翻译成sql语句后写本地文件。mysql版本的SessionHandler修改后的代码如下:

public
class
MysqlSessionHandler{
    private String
destination;
    private
short
clientId = 1001;
    private  String
filter = "";
    private String
dbName;
    private String
outputPath;
    private
static final
Logger    
logger
= LoggerFactory.getLogger(MysqlSessionHandler.class);
    private CanalServerWithEmbedded
embeddedServer;
   
    public String getDestination() {
        return
destination;
    }
 
    public
void
setDestination(String destination) {
        this.destination =
destination;
    }
 
    public String getDbName() {
        return
dbName;
    }
 
    public
void
setDbName(String dbName) {
        this.dbName =
dbName;
    }
 
    public String getOutputPath() {
        return
outputPath;
    }
 
    public
void
setOutputPath(String outputPath) {
        this.outputPath =
outputPath;
    }
 
    public SessionHandler(){
    }
 
    public MysqlSessionHandler(CanalServerWithEmbedded
embeddedServer){
        this.embeddedServer =
embeddedServer;
    }
 
    public
void
subscription(){
    ClientIdentity clientIdentity =
new ClientIdentity(destination,
clientId, filter);
    MDC.put("destination",
clientIdentity.getDestination());
    // 尝试启动,如果已经启动,忽略
        if (!embeddedServer.isStart(clientIdentity.getDestination())) {
            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
            if (!runningMonitor.isStart()) {
                runningMonitor.start();
            }
        }
 
        embeddedServer.subscribe(clientIdentity);
    }
   
    public
void
unsubscribe(){
    ClientIdentity clientIdentity =
new ClientIdentity(destination,
clientId, filter);
        MDC.put("destination",
clientIdentity.getDestination());
        embeddedServer.unsubscribe(clientIdentity);
        stopCanalInstanceIfNecessary(clientIdentity);//
尝试关闭
        //NettyUtils.ack(ctx.getChannel(), null);
    }
   
    public
long
get(){
    ClientIdentity clientIdentity =
new ClientIdentity(destination,
clientId, filter);
        MDC.put("destination",
clientIdentity.getDestination());
        int
batchSize = 1000;
        Message message =
embeddedServer.getWithoutAck(clientIdentity,
batchSize);
        printEntry(message.getEntries(),
message.getId());
        return
message.getId();
    }
   
    public
void
ack(long
batchId){
    ClientIdentity clientIdentity =
new ClientIdentity(destination,
clientId, filter);
    MDC.put("destination",
clientIdentity.getDestination());
    embeddedServer.ack(clientIdentity,
batchId);
    }
    private
void
stopCanalInstanceIfNecessary(ClientIdentity
clientIdentity) {
        List<ClientIdentity> clientIdentitys =
embeddedServer.listAllSubscribe(clientIdentity.getDestination());
        if (clientIdentitys !=
null &&
clientIdentitys.size() == 1 && clientIdentitys.contains(clientIdentity)) {
            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
            if (runningMonitor.isStart()) {
                runningMonitor.release();
            }
        }
    }
   
    private
void
printEntry(List<Entry> entrys,
long
batchId) {
        for (Entry
entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
 
            RowChange rowChage =
null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception
e) {
                throw
new
RuntimeException("ERROR ## parser of eromanga-event has an error , data:" +
entry.toString(),e);
            }
 
            EventType eventType =
rowChage.getEventType();
            /*System.out.println(String.format("==>
binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            */
            if(dbName.equals(entry.getHeader().getSchemaName()))
            {
                String tableName=entry.getHeader().getTableName();
              List<String> sqls=new LinkedList<String>();
                for (RowData
rowData : rowChage.getRowDatasList()) {
                    String sql=buildSql(tableName,
rowData, eventType);
                    if(sql!=null){
                       sqls.add(sql);
                       System.err.println(sql);
                    }
                }
               
                try {
                    toLocal(sqls,
batchId);
                } catch (IOException
e) {
e.printStackTrace();
                }
            }
        }
    }
    final
static
String DELETE_SQL="delete from _tn_ where _cn_";
    final
static
String INSERT_SQL="insert into _tn_(_cn_) values(_vn_)";
    final
static
String UPDATE_SQL="update _tn_ set _vn_ where _cn_";
    //创建SQL
    private String buildSql(String
tableName,RowData
rowData,EventType eventType){
    tableName="`"+tableName+"`";
    if (eventType == EventType.DELETE) {
         StringBuffer cn=new StringBuffer();
         StringBuffer cn2=new StringBuffer();
         for (Column
column : rowData.getBeforeColumnsList()) {
             if(column.getIsKey()){
                 if(cn2.length()>0){
                       cn2.append(" and ");
                    }
                 cn2.append("`"+column.getName()+"`");
                 cn2.append("=");
                 if(!isNumberType(column.getMysqlType())){
                     cn2.append("'"+column.getValue()+"'");
                 }else{
                     cn2.append(column.getValue());
                 }
             }
             if(column.getValue()==null||"".equals(column.getValue())){
                 continue;
             }
             if(cn.length()>0){
                  cn.append(" and ");
                }
             cn.append("`"+column.getName()+"`");
             cn.append("=");
             if(!isNumberType(column.getMysqlType())){
                 cn.append("'"+column.getValue()+"'");
             }else{
                 cn.append(column.getValue());
             }
            }
         return DELETE_SQL.replaceAll("_tn_",
tableName).replaceAll("_cn_",
cn2.length()>0?cn2.toString():cn.toString());
        } else
if
(eventType == EventType.INSERT) {
        StringBuffer cn=new StringBuffer();
        StringBuffer vn=new StringBuffer();
         for (Column
column : rowData.getAfterColumnsList()) {
             if(cn.length()>0){
                  cn.append(",");
                  vn.append(",");
                }
             cn.append("`"+column.getName()+"`");
             if(!isNumberType(column.getMysqlType())){
                 vn.append("'"+column.getValue()+"'");
             }else{
                 vn.append(column.getValue());
             }
            }
         return INSERT_SQL.replaceAll("_tn_",
tableName).replaceAll("_cn_",
cn.toString()).replaceAll("_vn_",
vn.toString());
        } else {
        StringBuffer cn=new StringBuffer();
         StringBuffer cn2=new StringBuffer();
         for (Column
column : rowData.getBeforeColumnsList()) {
             if(column.getIsKey()){
                 if(cn2.length()>0){
                       cn2.append(" and ");
                    }
                 cn2.append("`"+column.getName()+"`");
                 cn2.append("=");
                 if(!isNumberType(column.getMysqlType())){
                     cn2.append("'"+column.getValue()+"'");
                 }else{
                     cn2.append(column.getValue());
                 }
             }
             if(column.getValue()==null||"".equals(column.getValue())){
                 continue;
             }
             if(cn.length()>0){
                  cn.append(" and ");
                }
             cn.append("`"+column.getName()+"`");
             cn.append("=");
             if(!isNumberType(column.getMysqlType())){
                 cn.append("'"+column.getValue()+"'");
             }else{
                 cn.append(column.getValue());
             }
            }
        StringBuffer vn=new StringBuffer();
         for (Column
column : rowData.getAfterColumnsList()) {
             if(!column.getUpdated()){
                 continue;
             }
             if(vn.length()>0){
                  vn.append(",");
                }
             vn.append("`"+column.getName()+"`");
             vn.append("=");
             if(!isNumberType(column.getMysqlType())){
                 vn.append("'"+column.getValue()+"'");
             }else{
                 vn.append(column.getValue());
             }
            }
         return UPDATE_SQL.replaceAll("_tn_",
tableName).replaceAll("_cn_",
cn2.length()>0?cn2.toString():cn.toString()).replaceAll("_vn_",
vn.toString());
        }
    }
   
  //判断数据库列字段类型是否是数字,不是数字的值需要加''
    private
static boolean
isNumberType(String
mysqlType){
    if(mysqlType.indexOf("int")==-1&&
             mysqlType.indexOf("float")==-1&&
             mysqlType.indexOf("double")==-1&&
             mysqlType.indexOf("decimal")==-1&&
             mysqlType.indexOf("numeric")==-1){
         return
false
;
    }
    return
true
;
    }
 
    public
void
setEmbeddedServer(CanalServerWithEmbedded
embeddedServer) {
        this.embeddedServer =
embeddedServer;
    }
   
    public
boolean
toLocal(List<String> sqls,
long
batchId) throws IOException{
    if(sqls.isEmpty())
         return
false
;
   
    String fileName =
outputPath + "/msyql_data.scn" +
batchId;
    File f = new File(fileName);
    FileOutputStream fop =
new
FileOutputStream(f);
   
    for(String
sql : sqls){
         fop.write(sql.getBytes());
         fop.write("\n".getBytes());
        }
       
        if(fop !=
null)
            fop.close();
       
        return
true
;
    }

所有增量记录改变都会保存在scn[记录编号]为后缀名的文件中。接收端只需要通过扫描文件夹中所有文件、通过inotify感知所有文件,并直接执行语句即可。

在com.alibaba.otter.canal.server.netty.CanalServerWithNetty中启动网络服务改为直接订阅、传输即可:

sessionHandler.subscription();
        while(true){
            long
batchId = sessionHandler.get();
            if(batchId > 0)
                 sessionHandler.ack(batchId);
            Thread.sleep(1000);
}
oracle版本的subscription、get、unsubscribe、ack实现如下:

public
void
subscription(){
    }
   
    public
void
unsubscribe(){
   
    }
   
    public
long
get(){
    Connection sourceConn =
null
;
        try {
            ResultSet resultSet =
null;
            // 获取源数据库连接
            Class.forName(dbDriver);
            sourceConn = java.sql.DriverManager.getConnection(dbUrl,
username, password);
           
            if(createDictInit){
                createDictionary(sourceConn);
                createDictInit =
false;
            }
           
            Statement statement =
sourceConn.createStatement();
            if(!logFileLoaded)
            {
                // 打印获分析日志文件信息
                resultSet =
statement.executeQuery("select member from v$logfile");
                String log =
new
String();
                while (resultSet.next()) {
                    log =
resultSet.getObject(1).toString();
                    System.out.println("已添加日志文件==>" +
log);
                    logFiles.add(log);
                }
               
                logFileLoaded =
true;
            }
           
            if(logFiles.isEmpty())
                return -1;
               
            // 添加所有日志文件,本代码仅分析联机日志
            StringBuffer sbSQL =
new StringBuffer();
            for(int
i = 0; i <
logFiles.size(); ++i){
                if(i == 0){
                    sbSQL.append("BEGIN\n");
                }
               
                sbSQL.append("dbms_logmnr.add_logfile(logfilename=>'"

                    + logFiles.get(i) +
"',options=>dbms_logmnr."
                    +(i == 0?"new":"addfile")+");\n");
            }
            sbSQL.append("END;\n");
           
            CallableStatement callableStatement =
sourceConn.prepareCall(sbSQL.toString());
            callableStatement.execute();
           
            //System.out.println("开始分析日志文件,
起始scn号:" + Constants.LAST_SCN);
            callableStatement =
sourceConn.prepareCall("BEGIN\n"
                    + "dbms_logmnr.start_logmnr(startScn=>'" +
scn
                    + "',dictfilename=>'" +
dataDictionary
                    + "/" +
oraName+"',options=>dbms_logmnr.no_rowid_in_stmt) ;"// ,options=>dbms_logmnr.committed_data_only + dbms_logmnr.no_rowid_in_stmt
                    + "END;");
            callableStatement.execute();
           
            // 查询获取分析结果
            //System.out.println("查询分析结果");
            String queryStr =
"SELECT scn,operation,sql_redo FROM v$logmnr_contents WHERE seg_owner='"
                    + username
                    + "' AND seg_type_name='TABLE'";//AND operation !='SELECT_FOR_UPDATE'
            System.out.println(queryStr);
            resultSet =
statement.executeQuery(queryStr);
            boolean
isCreateDictionary = toLocal(resultSet);
           
            callableStatement =
sourceConn.prepareCall("BEGIN\n"
                    + "dbms_logmnr.end_logmnr;\n"
                    + "END;");
            callableStatement.execute();
           
           
            // DDL发生变化,更新数据字典
            if (isCreateDictionary) {
                System.out.println("DDL发生变化,更新数据字典");
                createDictionary(sourceConn);
                System.out.println("完成更新数据字典");
                isCreateDictionary =
false;
            }
        } catch (ClassNotFoundException
e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SQLException
e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception
e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally {
            if (null !=
sourceConn) {
                try {
                    sourceConn.close();
                } catch (SQLException
e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            sourceConn =
null
;
        }
       
        return Long.valueOf(scn);
    }
   
    public
void
ack(long
batchId){
   
    }
   
    public
void
createDictionary(Connection sourceConn)
throws Exception {
        String createDictSql =
"BEGIN dbms_logmnr_d.build(dictionary_filename =>'"+oraName+"', dictionary_location =>'"
                + dataDictionary +
"'); END;";
        CallableStatement callableStatement =
sourceConn.prepareCall(createDictSql);
        callableStatement.execute();
    }
   
    public
boolean
toLocal(ResultSet resultSet)
throws SQLException, IOException{
        String lastScn =
scn;
        String operation =
null
;
        String sql = null;
        boolean
isCreateDictionary = false;
       
        String fileName =
outputPath + "/oracle_data.scn";
        fileName +=
lastScn.toString();
       
        File f = null;
        FileOutputStream fop =
null;
       
        while (resultSet.next()) {
            lastScn =
resultSet.getObject(1) + "";
            if (lastScn.equals(scn)) {
                continue;
            }
            operation =
resultSet.getObject(2) + "";
            if ("DDL".equalsIgnoreCase(operation)) {
                isCreateDictionary =
true;
            }
            sql =
resultSet.getObject(3) + "";
            // 删除用户ZH9000.
            //sql = sql.replace("\"" + Constants.SOURCE_CLIENT_USERNAME + "\".", "");
            //System.out.println("scn=" + lastScn + ",
自动执行sql==" +
sql + "");
            if(f ==
null){
                f = new File(fileName);
                if(!f.exists()){
                    f.createNewFile();
                }
               
                fop =
new
FileOutputStream(f);
            }
           
            fop.write(sql.getBytes());
            fop.write("\n".getBytes());
        }
       
        if(fop !=
null)
            fop.close();
       
        // 更新scn
        scn =
lastScn;
        saveScn2Local();
        return
isCreateDictionary;
    }
 
    public
void
setEmbeddedServer(CanalServerWithEmbedded
embeddedServer) {
      
    }
   
    public
void
saveScn2Local() throws IOException{
        String fileName =
outputPath + "/.scn";
        File f = new File(fileName);
        FileOutputStream fos =
new FileOutputStream(f);
        fos.write(scn.getBytes());
        fos.close();
    }
   
    @SuppressWarnings("resource")
    public String loadScnFromLocal(){
        String fileName =
outputPath + "/.scn";
        File f = new File(fileName);
        FileReader reader;
       
        try {
            reader = new FileReader(f);
            int
fileLen = (int)
f.length();
            char []
buffer = new
char
[fileLen];
            reader.read(buffer);
            return String.copyValueOf(buffer);
        } catch (IOException
e) {
            return
"0";
        }
    }

最后,在com.alibaba.otter.canal.deployer. CanalController中,可根据需要修改配置文件定义,并加载配置:

cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
        destination = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
        dbName = getProperty(properties, CanalConstants.CANAL_DBNAME);
        outputPath = getProperty(properties, CanalConstants.CANAL_OUTPUT);
        embededCanalServer = CanalServerWithEmbedded.instance();
        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);//
设置自定义的instanceGenerator
        canalServer = CanalServerWithNetty.instance();
        canalServer.setDestination(destination);;
        canalServer.setDbName(dbName);
        canalServer.setOutputPath(outputPath);

至此完成mysql、oracle的发送端实现。

当然,在修改基于网络传输改为基于文件时,应考虑不破坏代码架构,可通过参数配置是基于网络传输还是基于文件,这就属于之后的代码整理工作了。

5.       参考

5.1.       http://www.cnblogs.com/shishanyuan/p/3140440.html
5.1.1.     《1.OracleLogminer配置使用.pdf》

5.1.2.     《2.OracleLogminer性能测试.pdf》

5.1.3.     《3.使用OracleLogminer同步Demo.pdf》

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  canal oracle mysql logminer