您的位置:首页 > 编程语言 > Java开发

Kettle7 ( Pentaho Data Integration )源码分析 每个step都有一个线程负责运行

2017-03-16 15:31 351 查看
Kettle 7 要求Java环境是Java 8

Kettle中的Transformation中包含多个step组件, 当运行transformation时, 这些组件并不是串行初始化的(数据!是在这些step间流式传递的). Step的运行是为了处理数据. 所以数据的流式传递和step的非串行启动运行是不冲突的.

类Trans.java中的方法prepareExecution代码可论证上面的观点.

......
......
......
StepInitThread[] initThreads = new StepInitThread[steps.size()];
Thread[] threads = new Thread[steps.size()];

// Initialize all the threads...
//
for ( int i = 0; i < steps.size(); i++ ) {
final StepMetaDataCombi sid = steps.get( i );

// Do the init code in the background!
// Init all steps at once, but ALL steps need to finish before we can
// continue properly!
//
initThreads[i] = new StepInitThread( sid, log );

// Put it in a separate thread!
//
threads[i] = new Thread( initThreads[i] );
threads[i].setName( "init of " + sid.stepname + "." + sid.copy + " (" + threads[i].getName() + ")" );

ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.StepBeforeInitialize.id, initThreads[i] );

threads[i].start();
}
......
......
......


类StepInitThread中的run()内容如下: (run方法中调用了step.init, 用来完成step的初始化操作)

public void run() {
// Set the internal variables also on the initialization thread!
// ((BaseStep)combi.step).setInternalVariables();

if ( !doIt ) {
// An extension point plugin decided we should not initialize the step.
// Logging, error handling, finished flag... should all be handled in the extension point.
//
return;
}

try {
combi.step.getLogChannel().snap( Metrics.METRIC_STEP_INIT_START );
// combi.step.init方法功能: "Initialize and do work where other steps need to wait for..."
if ( combi.step.init( combi.meta, combi.data ) ) {
combi.data.setStatus( StepExecutionStatus.STATUS_IDLE );
ok = true;
} else {
combi.step.setErrors( 1 );
log.logError( BaseMessages.getString( PKG, "Trans.Log.ErrorInitializingStep", combi.step.getStepname() ) );
}
} catch ( Throwable e ) {
log.logError( BaseMessages.getString( PKG, "Trans.Log.ErrorInitializingStep", combi.step.getStepname() ) );
log.logError( Const.getStackTracker( e ) );
} finally {
combi.step.getLogChannel().snap( Metrics.METRIC_STEP_INIT_STOP );
}

finished = true;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息