
Kettle plugin: downloading a JOB and its linked scripts from the repository

2014-07-01 00:00
1. Design requirements:

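The project uses Kettle to collect data from the front-end machine at each node into a data center, and the table structure is the same on every node. Kettle is installed on each front-end machine and runs a JOB that collects data into the data center on a schedule. Apart from the front-end database connection, the scripts are identical on every node. Developing a separate JOB and its linked Trans for each node would mean a lot of development and maintenance work. So the plan is a component that downloads the collection JOB together with its linked scripts and keeps the links between the scripts intact.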

2. Implementation:

The Kettle Spoon designer already provides this feature through a menu item:



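The menu item Export all Linked resources to XML exports the script in the currently selected Spoon tab, together with the resources it depends on, as a zip archive in a folder you choose.

So the custom script download is modeled on how the Export all Linked resources to XML menu item is implemented.

(1) First, find the class and method that implement this feature in the Kettle source code.

The exportAllXMLFile() method in the org.pentaho.di.ui.spoon.Spoon class (Spoon.java) implements this menu item.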

/**
 * Export this job or transformation including all depending resources to a single zip file.
 */
public void exportAllXMLFile() {
    ResourceExportInterface resourceExportInterface = getActiveTransformation();
    if (resourceExportInterface==null) resourceExportInterface=getActiveJob();
    if (resourceExportInterface==null) return; // nothing to do here, prevent an NPE

    // ((VariableSpace)resourceExportInterface).getVariable("Internal.Transformation.Filename.Directory");

    // Ask the user for a zip file to export to:
    //
    try {
        String zipFilename = null;
        while (Const.isEmpty(zipFilename)) {
            FileDialog dialog = new FileDialog(shell, SWT.SAVE);
            dialog.setText(Messages.getString("Spoon.ExportResourceSelectZipFile"));
            dialog.setFilterExtensions(new String[] {"*.zip;*.ZIP", "*"});
            dialog.setFilterNames(new String[] { Messages.getString("System.FileType.ZIPFiles"), Messages.getString("System.FileType.AllFiles"), });
            setFilterPath(dialog);
            if (dialog.open()!=null)
            {
                lastDirOpened = dialog.getFilterPath();
                zipFilename = dialog.getFilterPath()+Const.FILE_SEPARATOR+dialog.getFileName();
                FileObject zipFileObject = KettleVFS.getFileObject(zipFilename);
                if (zipFileObject.exists()) {
                    MessageBox box = new MessageBox(shell, SWT.YES | SWT.NO | SWT.CANCEL);
                    box.setMessage(Messages.getString("Spoon.ExportResourceZipFileExists.Message", zipFilename));
                    box.setText(Messages.getString("Spoon.ExportResourceZipFileExists.Title"));
                    int answer = box.open();
                    if (answer==SWT.CANCEL) return;
                    if (answer==SWT.NO) zipFilename = null;
                }
            } else {
                return;
            }
        }

        // Export the resources linked to the currently loaded file...
        //
        TopLevelResource topLevelResource = ResourceUtil.serializeResourceExportInterface(zipFilename, resourceExportInterface, (VariableSpace)resourceExportInterface, rep);
        String message = ResourceUtil.getExplanation(zipFilename, topLevelResource.getResourceName(), resourceExportInterface);

        EnterTextDialog enterTextDialog = new EnterTextDialog(shell, "Resource serialized", "This resource was serialized succesfully!", message);
        enterTextDialog.setReadOnly();
        enterTextDialog.open();
    } catch(Exception e) {
        new ErrorDialog(shell, "Error", "Error exporting current file", e);
    }
}

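This method relies on two important types: ResourceExportInterface and ResourceUtil.

ResourceExportInterface: the resource export interface. JobMeta and TransMeta both implement it, so the next thing to read is how JobMeta and TransMeta implement its methods.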
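
To make the role of this interface more concrete, here is a minimal, self-contained sketch of the pattern. It is not Kettle's code: the names Exportable, SketchTrans and SketchJob are hypothetical stand-ins, and the file names are simply the resource name plus an extension, whereas Kettle delegates naming to a ResourceNamingInterface. The only idea taken from the source is that each resource adds itself and everything it links to into a shared map of definitions and returns the file name it was exported under.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ExportSketch {

    /** Hypothetical stand-in for Kettle's ResourceExportInterface. */
    interface Exportable {
        /** Adds this resource and its links to definitions; returns its exported file name. */
        String exportResources(Map<String, String> definitions);
    }

    /** Hypothetical transformation: a leaf resource with no further links. */
    static final class SketchTrans implements Exportable {
        private final String name;

        SketchTrans(String name) { this.name = name; }

        @Override
        public String exportResources(Map<String, String> definitions) {
            String filename = name + ".ktr";
            definitions.put(filename, "<transformation><name>" + name + "</name></transformation>");
            return filename;
        }
    }

    /** Hypothetical job: exports its linked resources first, then itself. */
    static final class SketchJob implements Exportable {
        private final String name;
        private final List<Exportable> linked = new ArrayList<>();

        SketchJob(String name) { this.name = name; }

        SketchJob link(Exportable child) {
            linked.add(child);
            return this;
        }

        @Override
        public String exportResources(Map<String, String> definitions) {
            StringBuilder xml = new StringBuilder("<job><name>" + name + "</name>");
            for (Exportable child : linked) {
                // The job refers to each child by the file name the child was exported under.
                String childFile = child.exportResources(definitions);
                xml.append("<entry><filename>").append(childFile).append("</filename></entry>");
            }
            xml.append("</job>");
            String filename = name + ".kjb";
            definitions.put(filename, xml.toString());
            return filename;
        }
    }

    public static void main(String[] args) {
        Map<String, String> definitions = new LinkedHashMap<>();
        String topLevel = new SketchJob("collect")
                .link(new SketchTrans("load_orders"))
                .link(new SketchTrans("load_items"))
                .exportResources(definitions);
        System.out.println("top-level resource: " + topLevel);
        System.out.println("exported files: " + definitions.keySet());
    }
}
```

Because the job asks each child for its exported file name, the references inside the job always point at files that are actually in the map, which is what keeps the downloaded scripts linked to each other.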

ResourceUtil: the class that exports the scripts to a zip archive. Its implementation is as follows:

/**
 * Serializes the referenced resource export interface (Job, Transformation, Mapping, Step, Job Entry, etc) to a ZIP file.
 *
 * @param zipFilename The ZIP file to put the content in
 * @param resourceExportInterface the interface to serialize
 * @param space the space to use for variable replacement
 * @param repository the repository to load objects from (or null if not used)
 * @param injectXML The XML to inject into the resulting ZIP archive (optional, can be null)
 * @param injectFilename The name of the file for the XML to inject in the ZIP archive (optional, can be null)
 * @return The full VFS filename reference to the serialized export interface XML file in the ZIP archive.
 * @throws KettleException in case anything goes wrong during serialization
 */
public static final TopLevelResource serializeResourceExportInterface(String zipFilename, ResourceExportInterface resourceExportInterface, VariableSpace space, Repository repository, String injectXML, String injectFilename) throws KettleException {
    ZipOutputStream out = null;

    try {
        Map<String, ResourceDefinition> definitions = new HashMap<String, ResourceDefinition>();

        // In case we want to add an extra pay-load to the exported ZIP file...
        //
        if (injectXML!=null) {
            ResourceDefinition resourceDefinition = new ResourceDefinition(injectFilename, injectXML);
            definitions.put(injectFilename, resourceDefinition);
        }

        ResourceNamingInterface namingInterface = new SequenceResourceNaming();

        String topLevelResource = resourceExportInterface.exportResources(space, definitions, namingInterface, repository);

        if (topLevelResource!=null && !definitions.isEmpty()) {
            // Create the ZIP file...
            //
            FileObject fileObject = KettleVFS.getFileObject(zipFilename);

            // Store the XML in the definitions in a ZIP file...
            //
            out = new ZipOutputStream(KettleVFS.getOutputStream(fileObject, false));
            out.setEncoding("utf-8");
            for(String filename : definitions.keySet()) {

                ResourceDefinition resourceDefinition = definitions.get(filename);

                ZipEntry zipEntry = new ZipEntry(resourceDefinition.getFilename());
                String comment = Messages.getString("ResourceUtil.SerializeResourceExportInterface.ZipEntryComment.OriginatingFile", filename, Const.NVL(resourceDefinition.getOrigin(), "-"));
                zipEntry.setComment(comment);
                out.putNextEntry(zipEntry);
                out.write(resourceDefinition.getContent().getBytes("utf-8"));
                out.closeEntry();
            }
            String zipURL = fileObject.getName().toString();
            return new TopLevelResource(topLevelResource, zipURL, "zip:"+zipURL+"!"+topLevelResource);
        } else {
            throw new KettleException(Messages.getString("ResourceUtil.Exception.NoResourcesFoundToExport"));
        }
    }
    catch(Exception e) {
        throw new KettleException(Messages.getString("ResourceUtil.Exception.ErrorSerializingExportInterface",resourceExportInterface.toString()), e);
    }
    finally {
        if (out!=null) {
            try {
                out.close();
            } catch (IOException e) {
                throw new KettleException(Messages.getString("ResourceUtil.Exception.ErrorClosingZipStream", zipFilename));
            }
        }
    }
}
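
The core of the method above is the loop that turns a map of definitions into zip entries. The following standalone sketch shows just that step so it can be tried without Kettle on the classpath. It makes two substitutions that are assumptions of this sketch: it uses the JDK's java.util.zip classes (the ZipOutputStream in the Kettle code has a setEncoding method, which the JDK class does not), and it uses a plain Map<String, String> of file name to XML in place of ResourceDefinition. The unzip method exists only to check the round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

final class ZipDefinitionsSketch {

    /** Writes every (file name, XML) pair as one UTF-8 encoded entry of a ZIP archive. */
    static byte[] zip(Map<String, String> definitions) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream out = new ZipOutputStream(bytes, StandardCharsets.UTF_8)) {
            for (Map.Entry<String, String> definition : definitions.entrySet()) {
                out.putNextEntry(new ZipEntry(definition.getKey()));
                out.write(definition.getValue().getBytes(StandardCharsets.UTF_8));
                out.closeEntry();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed here, so the archive is complete.
        return bytes.toByteArray();
    }

    /** Reads an archive back into a (file name, XML) map. */
    static Map<String, String> unzip(byte[] archive) {
        Map<String, String> definitions = new LinkedHashMap<>();
        try (ZipInputStream in = new ZipInputStream(new ByteArrayInputStream(archive), StandardCharsets.UTF_8)) {
            for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
                ByteArrayOutputStream content = new ByteArrayOutputStream();
                byte[] buffer = new byte[4096];
                for (int n = in.read(buffer); n != -1; n = in.read(buffer)) {
                    content.write(buffer, 0, n);
                }
                definitions.put(entry.getName(), new String(content.toByteArray(), StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return definitions;
    }

    public static void main(String[] args) {
        Map<String, String> definitions = new LinkedHashMap<>();
        definitions.put("collect.kjb", "<job><name>collect</name></job>");
        definitions.put("load_orders.ktr", "<transformation><name>load_orders</name></transformation>");

        byte[] archive = zip(definitions);
        System.out.println("archive size in bytes: " + archive.length);
        System.out.println("round trip intact: " + unzip(archive).equals(definitions));
    }
}
```

Using try-with-resources also avoids the manual close in a finally block that the Kettle method needs.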

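(2) Reading the source makes the export process clear, and the custom component can be modeled on it. Job plugin development itself is not covered here; only the relevant methods are shown: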

public Result execute(Result previousResult, int nr, Repository rep, Job parentJob)
{
    // Assume failure until the export has completed.
    Result result = previousResult;
    result.setNrErrors(1);
    result.setResult( false );

    // Resolve variables in the job entry settings.
    String realrepName=environmentSubstitute(repositoryname);
    String realusername=environmentSubstitute(username);
    String realpassword=environmentSubstitute(password);
    String realJobRepositoryPath=environmentSubstitute(directoryPath);
    if(realJobRepositoryPath == null) return result;

    // Split the repository path into the directory and the job name.
    int lastSlash = realJobRepositoryPath.lastIndexOf("/");
    if(lastSlash < 0) return result; // no directory part; substring(0, -1) would throw
    String realJobDir = realJobRepositoryPath.substring(0, lastSlash);
    String realJobName = realJobRepositoryPath.substring(lastSlash+1);

    String realoutfilename=environmentSubstitute(targetDirectoryPath);
    String realfindDatabaseName =environmentSubstitute(findDatabaseName);
    String realoutconnectname=environmentSubstitute(connectname);
    String realoutdbtype=environmentSubstitute(dbtype);
    String realoutdbaccess=environmentSubstitute(dbaccess);
    String realoutdbHost=environmentSubstitute(dbHost);
    String realoutdbname=environmentSubstitute(dbname);
    String realoutdbport=environmentSubstitute(dbport);
    String realoutdbuser=environmentSubstitute(dbuser);
    String realoutdbpassword=environmentSubstitute(dbpassword);
    try
    {
        // Connect to the repository.
        connectRep(log,realrepName, realJobDir, realusername, realpassword);
        // Create a new database object that replaces the front-end machine's database object
        // in the Trans scripts, so that the scripts use the new connection.
        DatabaseMeta clientDb = new DatabaseMeta(realoutconnectname,realoutdbtype,realoutdbaccess,realoutdbHost,realoutdbname,realoutdbport,realoutdbuser,realoutdbpassword);
        // Export the Job and its linked scripts to the target directory.
        exportAllXMLFile(realJobName,realoutfilename,realfindDatabaseName,clientDb);

        // Success: clear the error count that was set at the start.
        result.setNrErrors(0);
        result.setResult(true);
    }catch(Exception e)
    {
        log.logError(toString(), Messages.getString("JobExportRepository.UnExpectedError",e.toString()));
        log.logError(toString(), "Stack trace: "+Const.CR+Const.getStackTracker(e));
    }finally
    {
        // Always disconnect and drop the repository references.
        if(this.rep!=null)
        {
            this.rep.disconnect();
            this.rep=null;
        }
        if(this.repinfo!=null) this.repinfo=null;
        if(this.userinfo!=null) this.userinfo=null;
        if(this.repsinfo!=null)
        {
            this.repsinfo.clear();
            this.repsinfo=null;
        }
    }

    return result;
}


public void exportAllXMLFile(String realJobName,String DirectoryPath,String findDatabaseName,DatabaseMeta databaseMeta) throws Exception {
    // Load the job from the repository. A load failure propagates to execute(),
    // which logs it and leaves the result marked as failed.
    ResourceExportInterface resourceExportInterface = new JobMeta(log,rep,realJobName,dir);

    if(!Const.isEmpty(DirectoryPath)) {
        // Note: this call passes a database name and a DatabaseMeta in addition to the
        // arguments of the ResourceUtil method quoted earlier, so it is presumably a
        // customized version of that method, which is not shown in this post.
        ResourceUtil.serializeResourceExportInterface(DirectoryPath,findDatabaseName,databaseMeta,resourceExportInterface,(VariableSpace) resourceExportInterface, rep);
    }
}
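
The execute method derives the repository directory and the job name from a single path setting by cutting at the last "/". A small helper makes the edge cases explicit. This is only a sketch: the class name RepositoryPathSketch and the choice to report "/" for a job stored in the repository root are assumptions of this example (the plugin's code yields an empty directory string in that case, and what connectRep expects is not shown in the post).

```java
final class RepositoryPathSketch {

    /**
     * Splits a repository path such as "/collect/node_job" into
     * { directory, jobName }. Throws IllegalArgumentException when the
     * path has no "/" or no job name after the last "/".
     */
    static String[] split(String repositoryPath) {
        if (repositoryPath == null || repositoryPath.isEmpty()) {
            throw new IllegalArgumentException("repository path is empty");
        }
        int lastSlash = repositoryPath.lastIndexOf('/');
        if (lastSlash < 0) {
            throw new IllegalArgumentException("no directory part in: " + repositoryPath);
        }
        if (lastSlash == repositoryPath.length() - 1) {
            throw new IllegalArgumentException("no job name in: " + repositoryPath);
        }
        // Assumption of this sketch: a job directly under the root lives in "/".
        String directory = lastSlash == 0 ? "/" : repositoryPath.substring(0, lastSlash);
        String jobName = repositoryPath.substring(lastSlash + 1);
        return new String[] { directory, jobName };
    }

    public static void main(String[] args) {
        String[] parts = split("/collect/node_job");
        System.out.println("directory = " + parts[0] + ", job = " + parts[1]);
    }
}
```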


3. Result preview:





4. Plugin notes

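(1) Select the Job and fill in the front-end machine's database information. When the Job runs, the plugin downloads the selected job and its linked scripts from the repository, for example the trans referenced by a transformation entry in the job, and automatically repoints each transformation entry at the corresponding ktr file in the download directory.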

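(2) Do not save a downloaded script while connected to the repository, because that overwrites the script in the repository, and the next download then fails with an error. The reason is that the transformation file name is no longer empty: Kettle gives that setting priority, and the download fails because the ktr file cannot be found.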
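
The connection replacement mentioned in note (1) can be pictured as a rewrite of the exported XML. The sketch below is not the plugin's implementation (the plugin passes a DatabaseMeta into an export method that the post does not show). It uses only the JDK's DOM API, and it assumes a connection is stored as a connection element with child elements such as name, server, database and port; treat that layout, the class name ConnectionSwapSketch and the sample values as assumptions.

```java
import java.io.StringReader;
import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

final class ConnectionSwapSketch {

    /** Overwrites the given child elements of the connection whose name child equals connectionName. */
    static String swapConnection(String xml, String connectionName, Map<String, String> newValues) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            NodeList connections = doc.getElementsByTagName("connection");
            for (int i = 0; i < connections.getLength(); i++) {
                Element connection = (Element) connections.item(i);
                Element name = child(connection, "name");
                // Elements without a matching name child are left untouched.
                if (name == null || !connectionName.equals(name.getTextContent())) continue;
                for (Map.Entry<String, String> value : newValues.entrySet()) {
                    Element target = child(connection, value.getKey());
                    if (target != null) target.setTextContent(value.getValue());
                }
            }
            Transformer serializer = TransformerFactory.newInstance().newTransformer();
            serializer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            serializer.transform(new DOMSource(doc), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new IllegalStateException("could not rewrite the connection", e);
        }
    }

    /** Returns the first direct child element with the given tag, or null. */
    private static Element child(Element parent, String tag) {
        for (Node n = parent.getFirstChild(); n != null; n = n.getNextSibling()) {
            if (n instanceof Element && tag.equals(n.getNodeName())) return (Element) n;
        }
        return null;
    }

    public static void main(String[] args) {
        String xml = "<transformation>"
                + "<connection><name>frontend</name><server>192.168.0.1</server><port>1521</port></connection>"
                + "<connection><name>datacenter</name><server>192.168.0.9</server><port>1521</port></connection>"
                + "</transformation>";
        Map<String, String> newValues = new LinkedHashMap<>();
        newValues.put("server", "10.0.0.5");
        newValues.put("port", "3306");
        System.out.println(swapConnection(xml, "frontend", newValues));
    }
}
```

Matching on the name child means only the front-end connection is changed, while the data center connection stays as it was in the repository.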



5. Plugin download

http://download.csdn.net/detail/jdk2006/4514371