Kettle7 ( Pentaho Data Integration )源码分析 每个step都有一个线程负责运行
2017-03-16 15:31
351 查看
Kettle 7 要求Java环境是Java 8
Kettle中的Transformation中包含多个step组件, 当运行transformation时, 这些组件并不是串行初始化的(数据!是在这些step间流式传递的). Step的运行是为了处理数据. 所以数据的流式传递和step的非串行启动运行是不冲突的.
类Trans.java中的方法prepareExecution代码可论证上面的观点.
类StepInitThread中的run()内容如下: (run方法中调用了step.init, 用来完成step的初始化操作)
Kettle中的Transformation中包含多个step组件, 当运行transformation时, 这些组件并不是串行初始化的(数据!是在这些step间流式传递的). Step的运行是为了处理数据. 所以数据的流式传递和step的非串行启动运行是不冲突的.
类Trans.java中的方法prepareExecution代码可论证上面的观点.
...... ...... ...... StepInitThread[] initThreads = new StepInitThread[steps.size()]; Thread[] threads = new Thread[steps.size()]; // Initialize all the threads... // for ( int i = 0; i < steps.size(); i++ ) { final StepMetaDataCombi sid = steps.get( i ); // Do the init code in the background! // Init all steps at once, but ALL steps need to finish before we can // continue properly! // initThreads[i] = new StepInitThread( sid, log ); // Put it in a separate thread! // threads[i] = new Thread( initThreads[i] ); threads[i].setName( "init of " + sid.stepname + "." + sid.copy + " (" + threads[i].getName() + ")" ); ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.StepBeforeInitialize.id, initThreads[i] ); threads[i].start(); } ...... ...... ......
类StepInitThread中的run()内容如下: (run方法中调用了step.init, 用来完成step的初始化操作)
public void run() { // Set the internal variables also on the initialization thread! // ((BaseStep)combi.step).setInternalVariables(); if ( !doIt ) { // An extension point plugin decided we should not initialize the step. // Logging, error handling, finished flag... should all be handled in the extension point. // return; } try { combi.step.getLogChannel().snap( Metrics.METRIC_STEP_INIT_START ); // combi.step.init方法功能: "Initialize and do work where other steps need to wait for..." if ( combi.step.init( combi.meta, combi.data ) ) { combi.data.setStatus( StepExecutionStatus.STATUS_IDLE ); ok = true; } else { combi.step.setErrors( 1 ); log.logError( BaseMessages.getString( PKG, "Trans.Log.ErrorInitializingStep", combi.step.getStepname() ) ); } } catch ( Throwable e ) { log.logError( BaseMessages.getString( PKG, "Trans.Log.ErrorInitializingStep", combi.step.getStepname() ) ); log.logError( Const.getStackTracker( e ) ); } finally { combi.step.getLogChannel().snap( Metrics.METRIC_STEP_INIT_STOP ); } finished = true; }
相关文章推荐
- 细水长流Hadoop源码分析(4)RPC服务器运行之Listener线程
- Hadoop-2.4.1源码分析--HDFS HeartBeat(心跳检测)之BPServiceActor工作线程运行流程(下)
- 第二人生的源码分析(四十一)使用Apache运行库线程
- HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程
- Mesos源码分析(10): MesosSchedulerDriver的启动及运行一个Task
- Erlang运行时源码分析之——线程进度机制
- Mesos源码分析(10): MesosSchedulerDriver的启动及运行一个Task
- 第二人生的源码分析(四十一)使用Apache运行库线程
- Mesos源码分析(13): MesosContainerier运行一个Task
- Mesos源码分析(13): MesosContainerier运行一个Task
- JVM源码分析之一个Java进程究竟能创建多少线程
- Erlang运行时源码分析之——线程进度机制
- 第二人生的源码分析(四十一)使用Apache运行库线程
- 通过源码分析一个linux进程可以运行多个android应用
- Mesos源码分析(14): DockerContainerier运行一个Task
- 【深入Java开发】JVM源码分析之一个Java进程究竟能创建多少线程
- Hadoop-2.4.1源码分析--HDFS HeartBeat(心跳检测)之BPServiceActor工作线程运行流程(上)
- Giraph源码分析(八)—— 统计每个SuperStep中参与计算的顶点数目
- JVM源码分析之一个Java进程究竟能创建多少线程
- 第二人生的源码分析(四十五)图像解压线程