GoldenGate for Java adapter介绍二(代码篇)
2018-04-01 22:04
495 查看
本示例介绍如何通过实现 OGG 提供的接口函数来自定义处理增量数据,并将数据实时写入 MariaDB(OGG 官方不支持该数据库,因此只能采用自定义方式实现)。以下是本次示例的 4 个类:
ConnectionFactory
package sample.handler.jdbc;

import java.sql.Connection;

/**
 * Abstraction over how a JDBC {@link Connection} is obtained.
 *
 * <p>Implementations decide where the connection comes from; callers only
 * depend on this single method.
 */
public interface ConnectionFactory {

    /**
     * Obtains a JDBC connection.
     *
     * @return the connection produced by the implementation
     * @throws Exception if the implementation fails to produce a connection
     */
    public Connection getConnection() throws Exception;
}
DriverClassConnectionFactory
package sample.handler.jdbc;

import java.lang.reflect.InvocationTargetException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ConnectionFactory} that instantiates a JDBC {@link Driver} from its
 * class name and asks it directly for a connection to the configured URL.
 *
 * <p>The driver class name, the URL and the connection arguments are plain
 * mutable properties set through the setters below. {@link #setUser(String)}
 * and {@link #setPassword(String)} are shortcuts that store their value under
 * the {@code "user"} / {@code "password"} keys of the arguments.
 */
public class DriverClassConnectionFactory implements ConnectionFactory {

    private static final Logger logger = LoggerFactory.getLogger(DriverClassConnectionFactory.class);

    /** Key under which {@link #setPassword(String)} stores the password. */
    private static final String PASSWORD_KEY = "password";

    /** Placeholder written to the log instead of the real password. */
    private static final String PASSWORD_MASK = "******";

    protected String driver;
    protected String url;
    protected Properties arguments;

    /**
     * Instantiates the configured driver class and opens a connection to the
     * configured URL with the configured arguments.
     *
     * @return a non-null connection
     * @throws ClassNotFoundException if the driver class cannot be found
     * @throws InstantiationException if the driver class cannot be instantiated
     *         (no no-arg constructor, abstract class, or its constructor threw;
     *         the underlying problem is attached as the cause)
     * @throws IllegalAccessException if the driver's constructor is not accessible
     * @throws SQLException if the driver reports an error, or returns no
     *         connection because it does not handle the configured URL
     */
    @Override
    public Connection getConnection()
            throws InstantiationException, IllegalAccessException, ClassNotFoundException, SQLException {
        if (logger.isInfoEnabled()) {
            // Log a masked copy of the arguments so the password never reaches the log.
            logger.info(new StringBuilder("Connect to [").append(url)
                    .append("] via [").append(driver)
                    .append("] with arguments: ").append(maskedArguments())
                    .toString());
        }
        Driver driv = newDriverInstance();
        Connection connection = driv.connect(url, arguments);
        if (connection == null) {
            // Driver.connect returns null when the driver does not recognise the URL;
            // fail here instead of handing a null connection to the caller.
            throw new SQLException("Driver [" + driver + "] does not accept URL [" + url + "]");
        }
        return connection;
    }

    /** Creates the driver through its no-arg constructor, keeping the original exception contract. */
    private Driver newDriverInstance()
            throws InstantiationException, IllegalAccessException, ClassNotFoundException {
        Class<?> driverClass = Class.forName(driver);
        try {
            return (Driver) driverClass.getDeclaredConstructor().newInstance();
        } catch (NoSuchMethodException e) {
            throw instantiationFailure(e);
        } catch (InvocationTargetException e) {
            throw instantiationFailure(e.getCause() != null ? e.getCause() : e);
        }
    }

    /** Builds an InstantiationException that carries the real failure as its cause. */
    private InstantiationException instantiationFailure(Throwable cause) {
        InstantiationException failure =
                new InstantiationException("Cannot instantiate JDBC driver class [" + driver + "]");
        failure.initCause(cause);
        return failure;
    }

    /** Returns a copy of the arguments with the password value replaced, for logging only. */
    private Properties maskedArguments() {
        if (arguments == null) {
            return null;
        }
        Properties masked = new Properties();
        masked.putAll(arguments);
        if (masked.containsKey(PASSWORD_KEY)) {
            masked.setProperty(PASSWORD_KEY, PASSWORD_MASK);
        }
        return masked;
    }

    /**
     * @return the driver
     */
    public String getDriver() {
        return driver;
    }

    /**
     * @param driver the driver to set
     */
    public void setDriver(String driver) {
        this.driver = driver;
    }

    /**
     * @return the url
     */
    public String getUrl() {
        return url;
    }

    /**
     * @param url the url to set
     */
    public void setUrl(String url) {
        this.url = url;
    }

    /**
     * @return the arguments
     */
    public Properties getArguments() {
        return arguments;
    }

    /**
     * @param arguments the arguments to set
     */
    public void setArguments(Properties arguments) {
        this.arguments = arguments;
    }

    /**
     * @param user the user to set
     */
    public void setUser(String user) {
        if (arguments == null) {
            arguments = new Properties();
        }
        arguments.setProperty("user", user);
    }

    /**
     * @param password the password to set
     */
    public void setPassword(String password) {
        if (arguments == null) {
            arguments = new Properties();
        }
        arguments.setProperty(PASSWORD_KEY, password);
    }
}
OGG接口实现类SimpleJDBCHandler
[table] [tr] [td] package sample.handler.jdbc;import java.math.BigDecimal;import java.sql.Connection;import java.sql.Driver;import java.sql.PreparedStatement;import java.sql.SQLException;import java.sql.Timestamp;import java.sql.Types;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Arrays;import java.util.Properties;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import oracle.goldengate.datasource.*;import oracle.goldengate.datasource.GGDataSource.Status;import oracle.goldengate.datasource.meta.*;public class SimpleJDBCHandler extends AbstractHandler {private static final Logger logger = LoggerFactory.getLogger(SimpleJDBCHandler.class);protected ConnectionFactory connFactory;protected Connection conn;protected String driver;protected String url;protected Properties arguments;protected PreparedStatement pstmt;protected String lastOp;protected int keyIndex;protected int batchSize = 0;protected int maxBatchSize = 10000;protected long reportCount = 0;protected long lastReportTime;protected long opCount;protected SimpleDateFormat dateFormat;/*** @param connectionFactory the Class name of connection factory to set* @throws ClassNotFoundException * @throws IllegalAccessException * @throws InstantiationException */public void setConnectionFactory(String connectionFactory) throws InstantiationException, IllegalAccessException, ClassNotFoundException {connFactory = (ConnectionFactory)Class.forName(connectionFactory).newInstance();}// 获取目标端的DB连接public Connection getConnection() throws Exception {if (conn==null) {if (connFactory==null) {logger.info(new StringBuilder("Connect to [").append(url).append("] via [").append(driver).append("] with arguments: ").append(arguments).toString());Driver driv = (Driver)Class.forName(driver).newInstance();conn = driv.connect(url, arguments);} else {conn = connFactory.getConnection();}conn.setAutoCommit(false);}return conn;}/* (non-Javadoc)* 一个事务的开始*/@Overridepublic Status transactionBegin(DsEvent e, 
DsTransaction tx) {if (logger.isDebugEnabled())logger.debug("Method transactionBegin invoked. TxID="+tx.getTranID());//batchSize = 0;return super.transactionBegin(e, tx);}/* (non-Javadoc)*一个新的记录操作*/@Overridepublic Status operationAdded(DsEvent e, DsTransaction tx, DsOperation op) {if (logger.isDebugEnabled())logger.debug(new StringBuilder("Method operationAdded invoked. TxID=").append(tx.getTranID()).append(" OpNum=").append(tx.getTotalOps()).toString());Status status = GGDataSource.Status.OK;if (op.getOperationType().isInsert()) {status = insertAdded(e, tx, op);} else if (op.getOperationType().isUpdate()) {status = updateAdded(e, tx, op);} else if (op.getOperationType().isDelete()) {status = deleteAdded(e, tx, op);}if (reportCount>=100) {if (++opCount == reportCount) {long duration = System.currentTimeMillis() - lastReportTime;logger.info(new StringBuffer().append(opCount).append(" operations have been processed in last ").append(duration/1000).append(" seconds. Rate=").append(opCount*1000/duration).toString());opCount = 0;lastReportTime = System.currentTimeMillis();}}return status;}// insert操作protected Status insertAdded(DsEvent e, DsTransaction tx, DsOperation op) {try {String currentOp = op.getTableName()+".INSERT";if (!currentOp.equals(lastOp)) {if (batchSize>0 && pstmt!=null) {executeBatch();}String sql = prepareInsertSql(e, tx, op);System.out.println("insert-sql:"+ sql);if (logger.isDebugEnabled())logger.debug("Prepare insert. SQL=["+sql+"]");pstmt = getConnection().prepareStatement(sql);lastOp = currentOp;}TableMetaData tmeta = e.getMetaData().getTableMetaData(op.getTableName());for (int i=0; i0 && pstmt!=null) {executeBatch();}if (logger.isDebugEnabled())logger.debug("Prepare update. 
SQL=["+sql+"]");pstmt = getConnection().prepareStatement(sql);lastOp = sql;}System.out.println("update-sql:"+ sql);TableMetaData tmeta = e.getMetaData().getTableMetaData(op.getTableName());for (int i=0, j=0, k=keyIndex; i0 && pstmt!=null) {executeBatch();}String sql = prepareDeleteSql(e, tx, op);System.out.println("delete-sql:"+ sql);if (logger.isDebugEnabled())logger.debug("Prepare delete. SQL=["+sql+"]");pstmt = getConnection().prepareStatement(sql);lastOp = currentOp;}TableMetaData tmeta = e.getMetaData().getTableMetaData(op.getTableName());for (int i=0, j=0; i相关文章推荐
- 无需编写Java代码就能生成增删改查功能的CZTZ-JavaEE平台介绍
- 介绍一个java代码自动生产工具
- 一段代码, 搞明白Java中的for语句
- Auto activation triggers for Java(代码提示)功能扩展
- 介绍一个 C/C++ 、C#、JAVA 代码美化工具
- 介绍一个 C/C++ 、C#、JAVA 代码美化工具
- Java代码缺陷自动分析工具介绍
- 静态代码分析工具-jenkins应用(checkstyle and findbugs for java,cccc and cppcheck for c/c++)
- goldengate for sqlserver 2008 R2
- Code Conventions for Java java代码写法规范小结
- java代码缺陷自动分析工具之FindBugs介绍
- 几种java for循环写法介绍
- GoldenGate for win安装配置
- goldengate for oracle 10g学习
- Java 代码缺陷自动分析工具介绍
- goldengate for sqlserver 2008 R2
- 无需编写Java代码就能生成增删改查功能的CZTZ-JavaEE平台介绍
- 静态代码分析工具-jenkins应用(checkstyle and findbugs for java,cccc and cppcheck for c/c++)
- 关于eclipse编写java代码时不能运行for-each语句的问题
- GoldenGate Build for Oracle 8i