ETL工具kettle与JAVA结合使用程序生成转换
2017-12-20 14:46
531 查看
最近公司领导安排打造ETL支撑平台,使用ETL工具kettle6.0对数据库的数据进行清洗。用spoon工具做一些图形化的操作比较简单,所以抽空研究了一下如何使用kettle的一些jar包,把kettle结合到java程序(web项目也一样)中。在此留作记录,以备日后查看。
先在网上查了很多资料,可以参考 http://infocenter.pentaho.com/help/index.jsp?topic=%2Fcat_dev_guides%2Ftop_dev_guides.html (主要看Developer Guides/Embedding and Extending Pentaho Data Integration/...)
下载kettle的api和源码看看,也可以帮你解决不少问题的。
下面是我自己写的一个生成.ktr文件的代码。
(添加的jar包我没有仔细筛选,是参照例子加入的(有些可能没有必要,可以尝试去掉一些再测试):
avalon-framework-4.1.3.jar
commons-collections-3.2.jar
commons-io-1.4.jar
commons-lang-2.4.jar
commons-logging-1.1.jar
commons-vfs-20091118-pentaho.jar
kettle-core-4.4.0-GA.jar
kettle-db-4.4.0-GA.jar
kettle-engine-4.4.0-GA.jar
log4j-1.2.12.jar
logkit-1.0.1.jar
servlet-api-2.3.jar)
package com.jeefw.test.testKettle;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.w3c.dom.Document;
import com.dareway.framework.util.KettleUtil;
public class TransDemo {
public static TransDemo transDemo;
/**
* 两个库中的表名
*/
public static String bjdt_tablename = "wr";
public static String kettle_tablename = "wr";
/**
* 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml)
*/
public static final String[] databasesXML = {
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<connection>" +
"<name>bjdt</name>" +
"<server>127.0.0.1</server>" +
"<type>Oracle</type>" +
"<access>Native</access>" +
"<database>orcl</database>" +
"<port>1521</port>" +
"<username>scott</username>" +
"<password>tiger</password>" +
"<attributes>"+
"<attribute><code>EXTRA_OPTION_ORACLE.characterEncoding</code><attribute>utf-8</attribute></attribute>"+
"</attributes>"+
"</connection>",
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<connection>" +
"<name>kettle</name>" +
"<server>127.0.0.1</server>" +
"<type>Mysql</type>" +
"<access>Native</access>" +
"<database>jeefw</database>" +
"<port>3306</port>" +
"<username>root</username>" +
"<password>root</password>" +
"<attributes>"+
"<attribute><code>EXTRA_OPTION_MYSQL.characterEncoding</code><attribute>utf-8</attribute></attribute>"+
"</attributes>"+
"</connection>"
};
/**
* @param args
*/
public static void main(String[] args) {
try {
KettleEnvironment.init();
transDemo = new TransDemo();
TransMeta transMeta = transDemo.generateMyOwnTrans();
String transXml = transMeta.getXML();
String transName = "update_insert_Trans.ktr";
File file = new File(transName);
FileUtils.writeStringToFile(file, transXml, "UTF-8");
// System.out.println("transXml:"+transXml);
// System.out.println(databasesXML.length+"\n"+databasesXML[0]+"\n"+databasesXML[1]);
String xmls = "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \n" + transMeta.getXML().toString();
Document doc = XMLHandler.loadXMLString(xmls);
KettleDatabaseRepository repository = KettleUtil.getRepository();
TransMeta tm = new TransMeta();
tm.loadXML(doc.getDocumentElement(), repository, false);
tm.setRepositoryDirectory(repository.findDirectory("/"));
Trans trans=new Trans(tm);
trans.execute(null);
//trans.setParameterValue("characterEncoding", "utf-8");
} catch (Exception e) {
e.printStackTrace();
return;
}
}
/**
* 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是表输入,第二个是表插入与更新操作
* @return
* @throws KettleException
*/
public TransMeta generateMyOwnTrans() throws KettleException{
System.out.println("************start to generate my own transformation***********");
TransMeta transMeta = new TransMeta();
//设置转化的名称
transMeta.setName("insert_update");
//添加转换的数据库连接
for (int i=0;i<databasesXML.length;i++){
DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
transMeta.addDatabase(databaseMeta);
}
//registry是给每个步骤生成一个标识Id用
PluginRegistry registry = PluginRegistry.getInstance();
//******************************************************************
//第一个表输入步骤(TableInputMeta)
TableInputMeta tableInput = new TableInputMeta();
String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput);
//给表输入添加一个DatabaseMeta连接数据库
DatabaseMeta database_bjdt = transMeta.findDatabase("bjdt");
// java.util.Properties p = new java.util.Properties();
// p.setProperty("attribute", value)
//
// database_bjdt.setAttributes(attributes);
tableInput.setDatabaseMeta(database_bjdt);
String select_sql = "SELECT * FROM "+bjdt_tablename;
tableInput.setSQL(select_sql);
//添加TableInputMeta到转换中
StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId,"table input",tableInput);
//给步骤添加在spoon工具中的显示位置
tableInputMetaStep.setDraw(true);
tableInputMetaStep.setLocation(100, 100);
transMeta.addStep(tableInputMetaStep);
//******************************************************************
//******************************************************************
//第二个步骤插入与更新
InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
String insertUpdateMetaPluginId = registry.getPluginId(StepPluginType.class,insertUpdateMeta);
//添加数据库连接
DatabaseMeta database_kettle = transMeta.findDatabase("kettle");
insertUpdateMeta.setDatabaseMeta(database_kettle);
//设置操作的表
insertUpdateMeta.setTableName(kettle_tablename);
//设置用来查询的关键字
insertUpdateMeta.setKeyLookup(new String[]{"ID"});
insertUpdateMeta.setKeyStream(new String[]{"ID"});
insertUpdateMeta.setKeyStream2(new String[]{""});//一定要加上
insertUpdateMeta.setKeyCondition(new String[]{"="});
//设置要更新的字段
String[] updatelookup = {"ID","dept_no","dept_name","dept_sex","dept_addr"} ;
String [] updateStream = {"id","dept_no","dept_name","dept_sex","dept_addr"};
Boolean[] updateOrNot = {false,true,true,true,true,true,true};
insertUpdateMeta.setUpdateLookup(updatelookup);
insertUpdateMeta.setUpdateStream(updateStream);
insertUpdateMeta.setUpdate(updateOrNot);
String[] lookup = insertUpdateMeta.getUpdateLookup();
//System.out.println("******:"+lookup[1]);
//System.out.println("insertUpdateMetaXMl:"+insertUpdateMeta.getXML());
//添加步骤到转换中
StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId,"insert_update",insertUpdateMeta);
insertUpdateStep.setDraw(true);
insertUpdateStep.setLocation(250,100);
transMeta.addStep(insertUpdateStep);
//******************************************************************
a6c1
//******************************************************************
//添加hop把两个步骤关联起来
transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep));
System.out.println("***********the end************");
return transMeta;
}
}
先在网上查了很多资料,可以参考 http://infocenter.pentaho.com/help/index.jsp?topic=%2Fcat_dev_guides%2Ftop_dev_guides.html (主要看Developer Guides/Embedding and Extending Pentaho Data Integration/...)
下载kettle的api和源码看看,也可以帮你解决不少问题的。
下面是我自己写的一个生成.ktr文件的代码。
(添加的jar包我没有仔细筛选,是参照例子加入的(有些可能没有必要,可以尝试去掉一些再测试):
avalon-framework-4.1.3.jar
commons-collections-3.2.jar
commons-io-1.4.jar
commons-lang-2.4.jar
commons-logging-1.1.jar
commons-vfs-20091118-pentaho.jar
kettle-core-4.4.0-GA.jar
kettle-db-4.4.0-GA.jar
kettle-engine-4.4.0-GA.jar
log4j-1.2.12.jar
logkit-1.0.1.jar
servlet-api-2.3.jar)
package com.jeefw.test.testKettle;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.w3c.dom.Document;
import com.dareway.framework.util.KettleUtil;
public class TransDemo {
public static TransDemo transDemo;
/**
* 两个库中的表名
*/
public static String bjdt_tablename = "wr";
public static String kettle_tablename = "wr";
/**
* 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml)
*/
public static final String[] databasesXML = {
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<connection>" +
"<name>bjdt</name>" +
"<server>127.0.0.1</server>" +
"<type>Oracle</type>" +
"<access>Native</access>" +
"<database>orcl</database>" +
"<port>1521</port>" +
"<username>scott</username>" +
"<password>tiger</password>" +
"<attributes>"+
"<attribute><code>EXTRA_OPTION_ORACLE.characterEncoding</code><attribute>utf-8</attribute></attribute>"+
"</attributes>"+
"</connection>",
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<connection>" +
"<name>kettle</name>" +
"<server>127.0.0.1</server>" +
"<type>Mysql</type>" +
"<access>Native</access>" +
"<database>jeefw</database>" +
"<port>3306</port>" +
"<username>root</username>" +
"<password>root</password>" +
"<attributes>"+
"<attribute><code>EXTRA_OPTION_MYSQL.characterEncoding</code><attribute>utf-8</attribute></attribute>"+
"</attributes>"+
"</connection>"
};
/**
* @param args
*/
public static void main(String[] args) {
try {
KettleEnvironment.init();
transDemo = new TransDemo();
TransMeta transMeta = transDemo.generateMyOwnTrans();
String transXml = transMeta.getXML();
String transName = "update_insert_Trans.ktr";
File file = new File(transName);
FileUtils.writeStringToFile(file, transXml, "UTF-8");
// System.out.println("transXml:"+transXml);
// System.out.println(databasesXML.length+"\n"+databasesXML[0]+"\n"+databasesXML[1]);
String xmls = "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \n" + transMeta.getXML().toString();
Document doc = XMLHandler.loadXMLString(xmls);
KettleDatabaseRepository repository = KettleUtil.getRepository();
TransMeta tm = new TransMeta();
tm.loadXML(doc.getDocumentElement(), repository, false);
tm.setRepositoryDirectory(repository.findDirectory("/"));
Trans trans=new Trans(tm);
trans.execute(null);
//trans.setParameterValue("characterEncoding", "utf-8");
} catch (Exception e) {
e.printStackTrace();
return;
}
}
/**
* 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是表输入,第二个是表插入与更新操作
* @return
* @throws KettleException
*/
public TransMeta generateMyOwnTrans() throws KettleException{
System.out.println("************start to generate my own transformation***********");
TransMeta transMeta = new TransMeta();
//设置转化的名称
transMeta.setName("insert_update");
//添加转换的数据库连接
for (int i=0;i<databasesXML.length;i++){
DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
transMeta.addDatabase(databaseMeta);
}
//registry是给每个步骤生成一个标识Id用
PluginRegistry registry = PluginRegistry.getInstance();
//******************************************************************
//第一个表输入步骤(TableInputMeta)
TableInputMeta tableInput = new TableInputMeta();
String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput);
//给表输入添加一个DatabaseMeta连接数据库
DatabaseMeta database_bjdt = transMeta.findDatabase("bjdt");
// java.util.Properties p = new java.util.Properties();
// p.setProperty("attribute", value)
//
// database_bjdt.setAttributes(attributes);
tableInput.setDatabaseMeta(database_bjdt);
String select_sql = "SELECT * FROM "+bjdt_tablename;
tableInput.setSQL(select_sql);
//添加TableInputMeta到转换中
StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId,"table input",tableInput);
//给步骤添加在spoon工具中的显示位置
tableInputMetaStep.setDraw(true);
tableInputMetaStep.setLocation(100, 100);
transMeta.addStep(tableInputMetaStep);
//******************************************************************
//******************************************************************
//第二个步骤插入与更新
InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
String insertUpdateMetaPluginId = registry.getPluginId(StepPluginType.class,insertUpdateMeta);
//添加数据库连接
DatabaseMeta database_kettle = transMeta.findDatabase("kettle");
insertUpdateMeta.setDatabaseMeta(database_kettle);
//设置操作的表
insertUpdateMeta.setTableName(kettle_tablename);
//设置用来查询的关键字
insertUpdateMeta.setKeyLookup(new String[]{"ID"});
insertUpdateMeta.setKeyStream(new String[]{"ID"});
insertUpdateMeta.setKeyStream2(new String[]{""});//一定要加上
insertUpdateMeta.setKeyCondition(new String[]{"="});
//设置要更新的字段
String[] updatelookup = {"ID","dept_no","dept_name","dept_sex","dept_addr"} ;
String [] updateStream = {"id","dept_no","dept_name","dept_sex","dept_addr"};
Boolean[] updateOrNot = {false,true,true,true,true,true,true};
insertUpdateMeta.setUpdateLookup(updatelookup);
insertUpdateMeta.setUpdateStream(updateStream);
insertUpdateMeta.setUpdate(updateOrNot);
String[] lookup = insertUpdateMeta.getUpdateLookup();
//System.out.println("******:"+lookup[1]);
//System.out.println("insertUpdateMetaXMl:"+insertUpdateMeta.getXML());
//添加步骤到转换中
StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId,"insert_update",insertUpdateMeta);
insertUpdateStep.setDraw(true);
insertUpdateStep.setLocation(250,100);
transMeta.addStep(insertUpdateStep);
//******************************************************************
a6c1
//******************************************************************
//添加hop把两个步骤关联起来
transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep));
System.out.println("***********the end************");
return transMeta;
}
}
相关文章推荐
- ETL工具kettle与java结合使用程序生成一个简单的转化文件
- WebService -- Java 实现之 CXF ( 使用CXF工具生成client 程序)
- 使用linux perf工具生成java程序火焰图
- 使用Java程序调用本地转换盒作业,资源库中的转换和作业(kettle4.2)
- 要求根据RandomStr.java:使用类型转换生成六位验证字符串,示例程序每次运 行时,都会生成不同的字符串。
- Eclipse java项目打包工具(fatjar)、Java EXE 启动文件生成程序
- C Java PHP Perl Python的程序代码美化工具使用
- 使用java的native2ascii工具命令来转换编码gbk,gb2312,utf-8【转】
- C Java PHP Perl Python的程序代码美化工具使用
- 【转贴】C Java PHP Perl Python的程序代码美化工具使用
- 使用java编写程序生成loadrunner参数化文件
- Java Service Wrapper工具把Java程序转换为Windows服务
- java生成exe工具之exe4j.exe(Java Exe Maker)的使用和注意事项
- 关于使用ETL工具Kettle的简单介绍(二)
- Java入门小程序,使用Java转换用户输入字母为大写如何避开中文?
- win32平台中的程序转换为wince中的一些错误 . 未能为“VCCLCompilerTool”工具生成命令行
- Eclipse java项目打包工具(fatjar)、Java EXE 启动文件生成程序
- 关于使用SQL自动生成,程序转化工具,提高开发数据库的效率
- 关于使用ETL工具Kettle的简单介绍(一)
- openssh生成密钥及使用方法,以及puttygen工具自由转换私钥