您的位置:首页 > 编程语言 > Java开发

GoldenGate for Java adapter介绍二(代码篇)

2018-04-01 22:04 495 查看

本示例主要介绍如何通过实现OGG的接口函数来自定义处理增量数据,并将数据实时写入MariaDB(OGG官方不支持此数据库,所以只能采用自定义方式实现)。以下是本次示例的4个类:

ConnectionFactory

package sample.handler.jdbc;

import java.sql.Connection;

/**
 * Abstraction over the way a JDBC {@link Connection} is obtained.
 *
 * <p>Implementations decide how the connection is produced; this interface
 * only fixes the single entry point used to ask for one.
 */
public interface ConnectionFactory {

    /**
     * Provides a JDBC connection.
     *
     * @return the connection supplied by the implementation
     * @throws Exception if the implementation cannot provide a connection
     */
    Connection getConnection() throws Exception;
}


DriverClassConnectionFactory

package sample.handler.jdbc;import java.sql.Connection;import java.sql.Driver;import java.sql.SQLException;import java.util.Properties;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class DriverClassConnectionFactory implements ConnectionFactory {private static final Logger logger = LoggerFactory.getLogger(DriverClassConnectionFactory.class);protected String driver;protected String url;protected Properties arguments;@Overridepublic Connection getConnection() throws InstantiationException, IllegalAccessException, ClassNotFoundException, SQLException {logger.info(new StringBuilder("Connect to [").append(url).append("] via [").append(driver).append("] with arguments: ").append(arguments).toString());Driver driv = (Driver)Class.forName(driver).newInstance();return driv.connect(url, arguments);}/*** @return the driver*/public String getDriver() {return driver;}/*** @param driver the driver to set*/public void setDriver(String driver) {this.driver = driver;}/*** @return the url*/public String getUrl() {return url;}/*** @param url the url to set*/public void setUrl(String url) {this.url = url;}/*** @return the arguments*/public Properties getArguments() {return arguments;}/*** @param arguments the arguments to set*/public void setArguments(Properties arguments) {this.arguments = arguments;}/*** @param user the user to set*/public void setUser(String user) {if (arguments==null) {arguments = new Properties();}arguments.setProperty("user", user);}/*** @param password the password to set*/public void setPassword(String password) {if (arguments==null) {arguments = new Properties();}arguments.setProperty("password", password);}}

OGG接口实现类 SimpleJDBCHandler

[table] [tr] [td] package sample.handler.jdbc;import java.math.BigDecimal;import java.sql.Connection;import java.sql.Driver;import java.sql.PreparedStatement;import java.sql.SQLException;import java.sql.Timestamp;import java.sql.Types;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Arrays;import java.util.Properties;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import oracle.goldengate.datasource.*;import oracle.goldengate.datasource.GGDataSource.Status;import oracle.goldengate.datasource.meta.*;public class SimpleJDBCHandler extends AbstractHandler {private static final Logger logger = LoggerFactory.getLogger(SimpleJDBCHandler.class);protected ConnectionFactory connFactory;protected Connection conn;protected String driver;protected String url;protected Properties arguments;protected PreparedStatement pstmt;protected String lastOp;protected int keyIndex;protected int batchSize = 0;protected int maxBatchSize = 10000;protected long reportCount = 0;protected long lastReportTime;protected long opCount;protected SimpleDateFormat dateFormat;/*** @param connectionFactory the Class name of connection factory to set* @throws ClassNotFoundException * @throws IllegalAccessException * @throws InstantiationException */public void setConnectionFactory(String connectionFactory) throws InstantiationException, IllegalAccessException, ClassNotFoundException {connFactory = (ConnectionFactory)Class.forName(connectionFactory).newInstance();}// 获取目标端的DB连接public Connection getConnection() throws Exception {if (conn==null) {if (connFactory==null) {logger.info(new StringBuilder("Connect to [").append(url).append("] via [").append(driver).append("] with arguments: ").append(arguments).toString());Driver driv = (Driver)Class.forName(driver).newInstance();conn = driv.connect(url, arguments);} else {conn = connFactory.getConnection();}conn.setAutoCommit(false);}return conn;}/* (non-Javadoc)* 一个事务的开始*/@Overridepublic Status transactionBegin(DsEvent e, 
DsTransaction tx) {if (logger.isDebugEnabled())logger.debug("Method transactionBegin invoked. TxID="+tx.getTranID());//batchSize = 0;return super.transactionBegin(e, tx);}/* (non-Javadoc)*一个新的记录操作*/@Overridepublic Status operationAdded(DsEvent e, DsTransaction tx, DsOperation op) {if (logger.isDebugEnabled())logger.debug(new StringBuilder("Method operationAdded invoked. TxID=").append(tx.getTranID()).append(" OpNum=").append(tx.getTotalOps()).toString());Status status = GGDataSource.Status.OK;if (op.getOperationType().isInsert()) {status = insertAdded(e, tx, op);} else if (op.getOperationType().isUpdate()) {status = updateAdded(e, tx, op);} else if (op.getOperationType().isDelete()) {status = deleteAdded(e, tx, op);}if (reportCount>=100) {if (++opCount == reportCount) {long duration = System.currentTimeMillis() - lastReportTime;logger.info(new StringBuffer().append(opCount).append(" operations have been processed in last ").append(duration/1000).append(" seconds. Rate=").append(opCount*1000/duration).toString());opCount = 0;lastReportTime = System.currentTimeMillis();}}return status;}// insert操作protected Status insertAdded(DsEvent e, DsTransaction tx, DsOperation op) {try {String currentOp = op.getTableName()+".INSERT";if (!currentOp.equals(lastOp)) {if (batchSize>0 && pstmt!=null) {executeBatch();}String sql = prepareInsertSql(e, tx, op);System.out.println("insert-sql:"+ sql);if (logger.isDebugEnabled())logger.debug("Prepare insert. SQL=["+sql+"]");pstmt = getConnection().prepareStatement(sql);lastOp = currentOp;}TableMetaData tmeta = e.getMetaData().getTableMetaData(op.getTableName());for (int i=0; i0 && pstmt!=null) {executeBatch();}if (logger.isDebugEnabled())logger.debug("Prepare update. 
SQL=["+sql+"]");pstmt = getConnection().prepareStatement(sql);lastOp = sql;}System.out.println("update-sql:"+ sql);TableMetaData tmeta = e.getMetaData().getTableMetaData(op.getTableName());for (int i=0, j=0, k=keyIndex; i0 && pstmt!=null) {executeBatch();}String sql = prepareDeleteSql(e, tx, op);System.out.println("delete-sql:"+ sql);if (logger.isDebugEnabled())logger.debug("Prepare delete. SQL=["+sql+"]");pstmt = getConnection().prepareStatement(sql);lastOp = currentOp;}TableMetaData tmeta = e.getMetaData().getTableMetaData(op.getTableName());for (int i=0, j=0; i
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: