
Synchronizing heterogeneous data: Kettle

2014-05-12 16:02
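A note up front: I found someone online who had implemented this feature this way.
I am writing it up here for the record; it is, after all, the result of my own experiments.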

Sample repository: https://github.com/cwarden/kettle/tree/master/samples/transformations

Question: without a repository, how do you get a database connection? The repository-based call is:

java.util.List list = getTrans().getRepository().readDatabases();

For reference, someone else's write-up: http://ainidehsj.iteye.com/blog/1735434

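Problem solved. The solution I found looks the connection up by name in the transformation metadata instead of the repository: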
import java.sql.*;
import org.pentaho.di.core.database.*;

// Connection metadata and live connection: set up in init(), released in dispose()
private DatabaseMeta databaseMeta;
private Database database;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();
    if (r == null) {
        setOutputDone();
        return false;
    }

    if (first)
    {
        first = false;
    }

    // The TABLEFIELD parameter names the input field that holds the table name
    String tableName = getInputRowMeta().getString(r, getParameter("TABLEFIELD"), null);
    if (tableName==null) {
        throw new KettleException("Unable to find field with name "+getParameter("TABLEFIELD")+" in the input row.");
    }

    ResultSet resultSet = null;

    try {
        // unique=true: only unique indexes are returned; approximate=false: results must be accurate
        resultSet = database.getDatabaseMetaData().getIndexInfo(null, null, tableName, true, false);
        Object[] idxRow = database.getRow(resultSet);
        while (idxRow!=null) {
            RowMetaInterface idxRowMeta = database.getReturnRowMeta();

            // Copy the input row into a row wide enough for the output fields
            r = createOutputRow(r, data.outputRowMeta.size());
            int index = getInputRowMeta().size();

            // Add the index name
            //
            r[index++] = idxRowMeta.getString(idxRow, "INDEX_NAME", null);

            // Add the column name
            //
            r[index++] = idxRowMeta.getString(idxRow, "COLUMN_NAME", null);

            // Add the ordinal position
            //
            r[index++] = idxRowMeta.getInteger(idxRow, "ORDINAL_POSITION", -1L);

            // Add the NON_UNIQUE flag (true means the index allows duplicate values)
            //
            r[index++] = idxRowMeta.getBoolean(idxRow, idxRowMeta.indexOfValue("NON_UNIQUE"));

            putRow(data.outputRowMeta, r);

            idxRow = database.getRow(resultSet);
        }
    } catch(Exception e) {
        throw new KettleException(e);
    } finally {
        // Close the query exactly once, whether or not an exception was thrown
        if (resultSet!=null) database.closeQuery(resultSet);
    }

    return true;
}

public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface)
{
    if (!parent.initImpl(stepMetaInterface, stepDataInterface)) {
        return false;
    }
    // The "oraa" parameter of this step holds the name of the connection to use
    String dbName = getParameter("oraa");
    // Look the connection up in the transformation metadata; no repository is needed
    databaseMeta = getTransMeta().findDatabase(dbName);
    if (databaseMeta==null) {
        logError("A connection with name "+dbName+" could not be found!");
        setErrors(1);
        return false;
    }

    database = new Database(getTrans(), databaseMeta);

    try {
        database.connect();
    } catch(Exception e) {
        logError("Connecting to database "+dbName+" failed.", e);
        setErrors(1);
        return false;
    }

    return true;
}

public void dispose(StepMetaInterface smi, StepDataInterface sdi)
{
    // Release the connection opened in init()
    if (database!=null) {
        database.disconnect();
    }

    parent.disposeImpl(smi, sdi);
}
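
The row handling inside the loop (copy the input row into a wider array, then append the four index fields starting at the end of the input fields) can be tried out in plain Java without Kettle on the classpath. This is only a rough sketch: the class `RowAppendSketch` and its methods `widen` and `appendIndexInfo` are hypothetical names made up for illustration, and `widen` merely stands in for what `createOutputRow` is assumed to do in the step.

```java
import java.util.Arrays;

// Hypothetical stand-alone illustration; none of this is Kettle API.
class RowAppendSketch {

    // Assumed stand-in for createOutputRow: copy the input row into a
    // new array of the requested size, leaving the extra slots null.
    static Object[] widen(Object[] inputRow, int outputSize) {
        return Arrays.copyOf(inputRow, outputSize);
    }

    // Append the four index fields right after the original input fields.
    static Object[] appendIndexInfo(Object[] inputRow, String indexName,
                                    String columnName, long ordinalPosition,
                                    boolean nonUnique) {
        int index = inputRow.length;
        Object[] out = widen(inputRow, inputRow.length + 4);
        out[index++] = indexName;
        out[index++] = columnName;
        out[index++] = Long.valueOf(ordinalPosition);
        out[index++] = Boolean.valueOf(nonUnique);
        return out;
    }

    public static void main(String[] args) {
        // One input field (the table name) plus four appended index fields
        Object[] in = { "CUSTOMERS" };
        Object[] out = appendIndexInfo(in, "PK_CUSTOMERS", "ID", 1L, false);
        System.out.println(Arrays.toString(out));
        // prints [CUSTOMERS, PK_CUSTOMERS, ID, 1, false]
    }
}
```

In the actual step the starting offset comes from `getInputRowMeta().size()`, as in the listing above, rather than from the array length.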