使用canal同步Mysql操作到Oracle(windows)
版本信息
java version “1.8.0_141”
Mysql version mysql-8.0.19-winx64
Canal version canal.deployer.1.1.3
rt = 11112
配置mysql的my.ini配置文件
[mysqld]
mysql_native_password default_authentication_plugin=mysql_native_password
server-id=1
bind-address=0.0.0.0
#开启binlog日志
log-bin=mysql-bin
binlog_format = ROW
[mysql]
#设置mysql客户端默认字符集
default-character-set=utf8mb4
[client]
#设置mysql客户端连接服务端时默认使用的端口
port=3306
default-character-set=utf8mb4
配置canal.deployer.1.1.3\conf\example\instance.properties
#position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
#rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
#table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
配置canal.deployer.1.1.3\conf\canal.properties
#本地IP192.168.31.1:3306
canal.manager.jdbc.url=jdbc:mysql://192.168.31.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
#canal.manager.jdbc.username=root
#canal.manager.jdbc.password=121212
canal.destinations=example
#与my.ini内的server id= 不同即可
canal.id = 111111
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
#table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = canal.file.data.dir:../conf/{canal.file.data.dir:../conf}/canal.file.data.dir:../conf/{canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
#dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
#purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
#################################################
######### destinations #############
#################################################
canal.destinations = example
#conf root dir
canal.conf.dir = …/conf
#auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
重启mysql,登录mysql内
创建canal用户和授权;
// 新增用户
CREATE USER ‘canal’@’%’ IDENTIFIED BY ‘canal’;
// 授权
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;
// 刷新
FLUSH PRIVILEGES;
开启canal.deployer.1.1.3\bin\start.bat
cmd界面:
查看:canal.deployer.1.1.3\logs\canal\canal.log
canal配置的其他参数以及解释:
参考:写的很详细,在此感谢大神的辛苦创作
https://blog.csdn.net/u012758088/article/details/78789616?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task
javacanal连接。代码
注意事项一:下面的代码需要保证mysql与oracle导入导出的数据库名称和表名称完全一致,而且在mysql端创建删除表,oracle也会有对应的操作,创建表之后就可以插入数据,
因为Oracle与mysql的sql语句不通,而且拿到sql语句不支持oracle直接操作,因此该代码不支持delete,insert,update,create,drop之外的操作,需要另外开发(没必要)
注意事项二:因为拿到的数据均为string,所以在对数据进行操作时,除了基本数据类型和String类型,其他类型均要转换成oracle支持的数据类型,date的已经做了转换,基本数据类型和string不需要装换,如果有其他数据类型时,请先校验转换,参考date类型的处理方式,除date外,sql语句拼接的都是string(oracle可以用string数据导入基本数据类型的字段,例如 “1”等同于int 1)
注意事项3:如果有其他数据类型而且不能做转换,请联系QQ:1078442730,有另外的处理方式,但是要牺牲灵活性,所有的表都要事先创建对象,如果表有100个字段,就要创建100个属性,重复代码太多,灵活性太低,但是可以保证所有数据类型在mysql端和oracle端是一致的。
canla连接:通过配置文件连接 package com.zzw.Conn; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import java.io.FileInputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Properties; public class CanalConn { static String hostname = null; static int port = 0; static String destination = null; static String username = null; static String password = null; static { Properties properties = new Properties(); try { properties.load(newFileInputStream("D:\\idea_code\\CanalMysqlToOrcal\\lib\\canal")); hostname = properties.getProperty("hostname"); port = Integer.parseInt(properties.getProperty("port")); destination = properties.getProperty("destination"); username = properties.getProperty("username"); password = properties.getProperty("Cpassword"); } catch (IOException e) { e.printStackTrace(); } } public static CanalConnector getconn(){ InetSocketAddress isa= new InetSocketAddress(hostname,port); CanalConnector connector = CanalConnectors.newSingleConnector(isa,destination,username,password); System.out.println("connection Successfully"); return connector; } }
Canal的java配置文件:
hostname=192.168.31.1 canal运行的客户端IP
port=11111 canal的conf下的配置文件中配置的port
destination=example canal的conf下的配置文件中配置的destination
username=canal 第四步创建授权的canal用户与密码,与canal的conf下的
配置文件中配置的保持一致
Password=canla
CanalMysqlToOrcal下的代码部分详解
package com.zzw.RunSoft; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.Message; import com.beimingsoft.Conn.CanalConn; import com.beimingsoft.Conn.OracleConn; import com.beimingsoft.actiontooracle.ActionToOracle; import java.sql.Connection; import java.sql.PreparedStatement; /** * long batchId = message.getId();此参数类似于KAFKA的偏移量,当操作成功时,偏移量增加,否则回滚 * connector.ack(batchId):提交偏移量 * connector.rollback(batchId):回滚偏移量 * tips: batchid=message.getId(),指的是同一个库同一个表的偏移量,例如如果操作的表未在目标库创建,则在创建后消费对应的message.getId() * 也就是:每个表都有对应的message.getId(),并且相互之间互不影响 */ public class MysqlToOracle { public static void main(String[] args) { //获取oracle连接 Connection conn = OracleConn.getConnection(); //创建PreparedStatement PreparedStatement ps = null; //创建GetTabFileds 对象 //获取canal的连接 CanalConnector connector = CanalConn.getconn(); connector.connect(); /*bin-log的分隔符,与cmd界面的: [New I/O server worker #1-2] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$保持一致,默认就是.*\..* 不建议修改canal的该项配置 */ connector.subscribe(".*\\..*"); //持续拉取数据,有bin-log产生就会消费 while (true) { //i,用于判断是否执行成功,i==1,成功,提交message.get.Id,否则回滚message.get.Id int i = 0; // 获取指定数量的数据 Message message = connector.getWithoutAck(100); //拿到偏移量 long batchId = message.getId(); //如果没有数据可以拉取,则休眠1s if (batchId == -1 || message.getEntries().isEmpty()) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } //得到i值,并执行操作 i = ActionToOracle.action(conn, ps, message.getEntries()); if (i == 1) { // 提交确认,消费成功,通知server删除数据 connector.ack(batchId); System.out.println("偏移量更新成功"); //***************************************************************** //因为在代码测试过程中 mysql与oracle的sql语句不通,所以会不断报错,所以设置了无论成功还是失败都提交偏移量 // 不然会不停的回滚不停地拉取不停的报错,因此按照需要这部分代码需要按照实际更改 } else if (i==0) { Connector.ack(batchId); System.out.println("再见蠢货"); ***//为防止不停报错,以下语句实际不会出现,按照需求修改*** }else { // 处理失败, 回滚数据,后续重新获取数据 connector.rollback(batchId); System.out.println("偏移量回滚成功"); } } } }
对oracle操作部分的代码
package com.zzw.actiontooracle; import com.alibaba.otter.canal.protocol.CanalEntry; import com.google.protobuf.InvalidProtocolBufferException; import java.sql.Connection; import java.sql.PreparedStatement; import java.util.List; public class ActionToOracle { //每个方法的返回值 1,代表成功,0代表失败 ,默认0 private static int status = 0; //从sql语句中抽取元数据表的库名和表名 public static String from_tab = null; public static String from_db = null; //ddl操作的sql语句 private static String ddl_sql = null; public static int action(Connection conn, PreparedStatement ps, List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) { continue; } CanalEntry.RowChange rowChange = null; try { //拿到binlog rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } CanalEntry.EventType eventType = rowChange.getEventType(); //从sql语句中抽取元数据表的库名和表名 from_tab = entry.getHeader().getTableName(); from_db = entry.getHeader().getSchemaName(); //拿到sql,该SQL智能拿到DDLSQL语句,DMLSQL语句拿不到,dml操作的话,rowChange.getSql()不执行(测试结果如此,没有查到资料讲为什么这样) ddl_sql = rowChange.getSql(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); /** 如果是ddl语句,匹配insert,delete,update,执行相关操作 */ if (!rowChange.getIsDdl()) { for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { switch (eventType) { case INSERT: System.out.println(); System.out.println("INSERT "); status = ActionToOracleFuction.insertToOracle(conn, ps, rowData.getAfterColumnsList()); break; case UPDATE: System.out.println(); System.out.println("UPDATE "); status = ActionToOracleFuction.updateToOracle(conn, ps, rowData.getAfterColumnsList(),rowData.getBeforeColumnsList()); break; case DELETE: System.out.println(); System.out.println("DELETE "); status = ActionToOracleFuction.deleteFromOracle(conn, ps, rowData.getBeforeColumnsList()); break; default: System.out.println(eventType); break; } } } else { //如果是DDL操作 执行下面的语句 System.out.println(); System.out.println("ddl操作"); status = ActionToOracleFuction.ddlAction(conn, ps, ddl_sql, eventType); } } return status; } }
执行oracle实际操作的函数类
package com.zzw.actiontooracle; import com.alibaba.otter.canal.protocol.CanalEntry; import com.beimingsoft.Conn.OracleConn; import java.sql.Connection; import java.sql.Date; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import static com.beimingsoft.actiontooracle.ActionToOracle.from_db; import static com.beimingsoft.actiontooracle.ActionToOracle.from_tab; ; public class ActionToOracleFuction { private static int deletestatus = 0; private static int updatestatus = 0; private static int insertstatus = 0; private static int ddlactionstatus = 0; private static String sql = null; private static Date date = null; private static List<Date> dates = new ArrayList<>(); //beforecolumns:执行DML操作之前的数据集 aftercolimns :之心DML操作之后的数据集 //具体思路查看insertFromOracle函数,总之一句话,拿到字段名,对应的值,重新组装一个适合oracle的SQL语句 //但是要注意date类型的数据,从beforecolumns或者aftercolumns中拿到的都是String,对于oracle插入的整数,//小数,字符串没有影响,对date类型不可操作,具体解决办法查看insertFromOracle,是否还有其他数据类型需要做校对//还不清楚 public static int deleteFromOracle(Connection conn, PreparedStatement ps, List<CanalEntry.Column> beforecolumns) { int i = 1; sql = "delete from " + from_db + "." + from_tab + " where "; for (CanalEntry.Column column : beforecolumns) { try { date = Date.valueOf(column.getValue()); } catch (IllegalArgumentException e) { sql = sql + column.getName() + "='" + column.getValue() + "' and "; } if (date != null) { sql = sql + column.getName() + "=? and "; dates.add(date); date = null; } } sql = sql.substring(0, sql.lastIndexOf("and")); System.err.println(sql); conn = OracleConn.getConnection(); try { ps = conn.prepareStatement(sql); for (Date date : dates) { ps.setDate(i, date); i++; } ps.execute(); System.out.println("sql提交成功"); conn.commit(); System.out.println("commit成功,执行状态:success"); deletestatus = 1; } catch (SQLException e) { e.printStackTrace(); System.err.println("表或视图不存在,请检查设置"); } finally { OracleConn.close(ps, conn); dates.clear(); } return deletestatus; } public static int updateToOracle(Connection conn, PreparedStatement ps, List<CanalEntry.Column> aftercolumns, List<CanalEntry.Column> beforecolumns) { String sql = "update " + from_db + "." + from_tab + " set "; int i = 1; for (CanalEntry.Column column : aftercolumns) { try { date = Date.valueOf(column.getValue()); } catch (IllegalArgumentException e) { sql = sql + column.getName() + "='" + column.getValue() + "',"; } if (date != null) { sql = sql + column.getName() + "=?,"; dates.add(date); date = null; } } sql = sql.substring(0, sql.lastIndexOf(",")) + " where "; for (CanalEntry.Column column : beforecolumns) { try { date = Date.valueOf(column.getValue()); } catch (IllegalArgumentException e) { sql = sql + column.getName() + "='" + column.getValue() + "' and "; } if (date != null) { sql = sql + column.getName() + "=? and "; dates.add(date); date = null; } } sql = sql.substring(0, sql.lastIndexOf("and")); System.err.println(sql); conn = OracleConn.getConnection(); try { conn = OracleConn.getConnection(); ps = conn.prepareStatement(sql); for (Date date : dates) { //把dates中的date取出,i=1,所以如果有date类型的数据就传到sql,同时i自增,i的值与参数的顺序就有序了 ps.setDate(i, date); i++; } ps.execute(); System.out.println("sql提交成功"); conn.commit(); System.out.println("commit成功,执行状态:success"); updatestatus = 1; } catch (SQLException e) { e.printStackTrace(); System.err.println("表或视图不存在,请检查设置"); } finally { OracleConn.close(ps, conn); //等把所有的dates中的date取出后清空dates,不然影响dates的数据会一直增加,传参到SQL有误 dates.clear(); } return updatestatus; } public static int insertToOracle(Connection conn, PreparedStatement ps, List<CanalEntry.Column> aftercolumns) { //ps的第一个参数,SQL语句的第一个?的索引值 int i = 1; //拼接sql : 例如:insert into scott.userinfo ( String sql = "insert into " + from_db + "." + from_tab + " ("; //拼接sql:例如:insert into scott.userinfo (u_id,u_name,u_age, for (CanalEntry.Column column : aftercolumns) { sql = sql + column.getName() + ","; } //截取sql并拼接:例如 insert into scott.userinfo (u_id,u_name,u_age) values ( sql = sql.substring(0, sql.lastIndexOf(",")) + ") values("; //拼接sql:例如insert into scott.userinfo (u_id,u_name,u_age) values (1,,张三',1, for (CanalEntry.Column column : aftercolumns) { try { //把String转成sqlDate,如果不Date类型则下面语句不执行,执行catch内容 date = Date.valueOf(column.getValue()); } catch (IllegalArgumentException e) { sql = sql + "'" + column.getValue() + "'" + ","; } //如果没有catch到异常,则拼接sql,把?拼接到sql中,同时把date放到list中 if (date != null) { sql = sql + "?,"; dates.add(date); date = null; } } //截取拼接sql:例如insert into scott.userinfo (u_id,u_name,u_age) values (1,,张三',1) sql = sql.substring(0, sql.lastIndexOf(",")) + ")"; System.err.println(sql); try { conn = OracleConn.getConnection(); ps = conn.prepareStatement(sql); for (Date date : dates) { //把dates中的date取出,i=1,所以如果有date类型的数据就传到sql,同时i自增,i的值与参数的顺序就有序了 ps.setDate(i, date); i++; } ps.execute(); System.out.println("sql提交成功"); conn.commit(); System.out.println("commit成功,执行状态:success"); insertstatus = 1; } catch (SQLException e) { e.printStackTrace(); System.err.println("表或视图不存在,请检查设置"); } finally { OracleConn.close(ps, conn); //等把所有的dates中的date取出后清空dates,不然影响dates的数据会一直增加,传参到SQL有误 dates.clear(); } return insertstatus; } public static int ddlAction(Connection conn, PreparedStatement ps, String ddl_sql, CanalEntry.EventType eventType) { switch (eventType) { case CREATE: sql = ddl_sql; break; case ERASE: //解析出来的binlog日志内的DROPsql:DROP TABLE `u` /* generated by server */ //oracl无法直接使用该sql,所以通过元信息中的数据库名,和表名重组sql //因此需要保证mysql和oracle中同步的数据库表名和数据名一致。不一致也可以,但是需要添加配置,在配置中设定 //而且会有局限性。 sql = "drop table " + from_db + "." + from_tab; } System.err.println(sql); try { conn = OracleConn.getConnection(); ps = conn.prepareStatement(sql); ps.execute(); System.out.println("sql提交成功"); conn.commit(); System.out.println("commit成功,执行状态:success"); ddlactionstatus = 1; } catch (SQLException e) { System.err.println("操作失败"); } finally { OracleConn.close(ps, conn); } return ddlactionstatus; } }
- 使用oracle goldengate 实现windows下mysql到oracle的数据同步
- 使用GoldenGate实现MySQL到Oracle的数据实时同步
- 使用canal同步mysql数据
- 使用canal进行mysql数据同步到Redis
- 使用JDBC操作数据库(Oracle,Mysql,SQLSERVER)
- 使用canal进行mysql数据同步到Redis
- 在Windows中 基于Oracle GoldenGate (OGG)进行MySQL->MySQL数据库同步配置(超详细)
- Oracle技术之使用goldengate同步mysql
- MySQL操作数据库命令汇总之windows 命令行使用
- Windows下使用MySQL客户端连接MySQL服务器的操作
- 使用canal进行mysql数据同步到Redis
- 使用KETTLE从mysql同步增量数据到oracle
- 使用canal同步mysql到mysql
- MFC中使用ADO操作各类数据库的封装类,包括MySql、Access、Oracle、MSSql
- GitHub for windows 使用【创建、提交、同步、分支等操作】总结
- 使用canal进行mysql数据同步到Redis
- 在windows环境下使用命令行控制Mysql(一、基础操作)
- 【原】无脑操作:Windows 10 + MySQL 5.5 安装使用及免安装使用
- nodejs使用async/await同步操作mysql
- 基于canal实现mysql、oracle的数据库实时同步