Azkaban的Exec Server分析 31:FlowRunner如何推动Graph的前进
2016-04-12 00:00
375 查看
现在,我们知道一个job的执行过程,那么整个flow是一个图,图是如何推进的,如何推进下一个job的执行呢?这就是本节需要解决的问题!
==========================================================================================
jdb azkaban.execapp.AzkabanExecutorServer
stop in azkaban.execapp.FlowRunner.progressGraph
==========================================================================================
这个用图的算法就可以解决,下面研究下flow的结束部分
stop in azkaban.execapp.FlowRunner.finishExecutableNode
==========================================================================================
azkaban.execapp.FlowRunnerManager --- flow的listener
@Override
public void handleEvent(Event event) {
if (event.getType() == Event.Type.FLOW_FINISHED) {
FlowRunner flowRunner = (FlowRunner) event.getRunner();
ExecutableFlow flow = flowRunner.getExecutableFlow();
recentlyFinishedFlows.put(flow.getExecutionId(), flow);
logger.info("Flow " + flow.getExecutionId() + " is finished. Adding it to recently finished flows list.");
runningFlows.remove(flow.getExecutionId());
}
}
这个很简单!
==========================================================================================
那么,问题来了,一个Flow如何结束呢? 收尾工作要做好!
if (outNodeIds.isEmpty()) {
//
// There's no outnodes means it's the end of a flow, so we
// finalize
// and fire an event.
finalizeFlow(parentFlow);
然后这个函数里,
// If the finalized flow is actually the top level flow, than we finish
// the main loop.
if (flow instanceof ExecutableFlow) {
flowFinished = true;
}
这样就设置了flowFinished = true;
=================================================================跳出for循环
private void runFlow() throws Exception {
logger.info("Starting flows");
runReadyJob(this.flow);
updateFlow();
// 跳出这个循环
while (!flowFinished) {
===跳出来之后,执行
logger.info("Finishing up flow. Awaiting Termination");
executorService.shutdown();
updateFlow();
logger.info("Finished Flow");
关闭当前的进程,整个flowRunner这个task就结束了,顺利!
==========================================================================================
jdb azkaban.execapp.AzkabanExecutorServer
-conf /root/azkb/azkaban_3.
0
.0_debug/conf
stop in azkaban.execapp.FlowRunner.progressGraph
==========================================================================================
这个用图的算法就可以解决,下面研究下flow的结束部分
stop in azkaban.execapp.FlowRunner.finishExecutableNode
==========================================================================================
azkaban.execapp.FlowRunnerManager --- flow的listener
@Override
public void handleEvent(Event event) {
if (event.getType() == Event.Type.FLOW_FINISHED) {
FlowRunner flowRunner = (FlowRunner) event.getRunner();
ExecutableFlow flow = flowRunner.getExecutableFlow();
recentlyFinishedFlows.put(flow.getExecutionId(), flow);
logger.info("Flow " + flow.getExecutionId() + " is finished. Adding it to recently finished flows list.");
runningFlows.remove(flow.getExecutionId());
}
}
这个很简单!
==========================================================================================
那么,问题来了,一个Flow如何结束呢? 收尾工作要做好!
if (outNodeIds.isEmpty()) {
//
// There's no outnodes means it's the end of a flow, so we
// finalize
// and fire an event.
finalizeFlow(parentFlow);
然后这个函数里,
// If the finalized flow is actually the top level flow, than we finish
// the main loop.
if (flow instanceof ExecutableFlow) {
flowFinished = true;
}
这样就设置了flowFinished = true;
=================================================================跳出for循环
private void runFlow() throws Exception {
logger.info("Starting flows");
runReadyJob(this.flow);
updateFlow();
// 跳出这个循环
while (!flowFinished) {
===跳出来之后,执行
logger.info("Finishing up flow. Awaiting Termination");
executorService.shutdown();
updateFlow();
logger.info("Finished Flow");
关闭当前的进程,整个flowRunner这个task就结束了,顺利!
相关文章推荐
- azkaban简介
- Azkaban-任务调度管理器
- Azkaban的Web Server源码探究系列6: alerters及插件机制分析
- Azkaban的Web Server源码探究系列7: ExecutorManager的初始化
- Azkaban的Web Server源码探究系列8: 水文一篇
- Azkaban的Web Server源码探究系列9: Servlet引擎初始化
- Azkaban的Web Server源码探究系列10 : /对应的servlet解析
- Azkaban的Web Server源码探究系列11: 登陆对应的servlet解析
- Azkaban的Web Server源码探究系列12: 首页之前的跳转
- Azkaban的Web Server源码探究系列13:首页/index的内容获取
- Azkaban的Web Server源码探究系列14:创建Project
- Azkaban的Web Server源码探究系列15:使用过程中几个需注意的配置&3.0中丢失的文件
- Azkaban的Web Server源码探究系列16:跳转Project
- Azkaban的Web Server源码探究系列17:Creating Flows
- Azkaban的Web Server源码探究系列19:loadProjectFromDir&Chek
- Azkaban的Web Server源码探究系列20:resolve&buildFlow
- Azkaban的Web Server源码探究系列21: Flow执行前的准备工作
- Azkaban的Web Server源码探究系列22: 一次性执行execute的提交准备
- Azkaban的Web Server源码探究系列23: 一次性执行execute的正式提交
- Azkaban的Web Server源码探究系列24: 一次性执行execute任务取出处理