最新版azkaban-3.40.0源码解析
2018-01-18 17:55
459 查看
web上传zip以及解析入库
手工执行flow
定时执行flow
execserver执行流程
提交flow
具体执行逻辑
具体job的执行
入口:
azkaban.webapp.servlet.LoginAbstractAzkabanServlet.doPost(HttpServletRequest, HttpServletResponse)
跟踪代码,进入下面的方法 上传zip包
azkaban.webapp.servlet.ProjectManagerServlet.ajaxHandleUpload(HttpServletRequest, HttpServletResponse, Map
在这个方法中,
azkaban.project.AzkabanProjectLoader.uploadProject(Project, File, String, User, Props),通过上传文件的类型加载loader
loader = this.flowLoaderFactory.createFlowLoader(file);
azkaban.webapp.servlet.LoginAbstractAzkabanServlet.doGet(HttpServletRequest, HttpServletResponse)
进入下面的方法通过ajaxName来判断是什么类型的操作,然后调用不用的方法。
azkaban.webapp.servlet.ExecutorServlet.handleAJAXAction(HttpServletRequest, HttpServletResponse, Session)
比如执行flow的ajaxName是executeFlow,最后通过ExecutorApiGateway.callForJsonString方法生成一个类似下面的uri,发送get请求去执行flow。
http://localhost:12321/executor?action=execute&execid=539&user
在AzkabanWebServer的main方法中,调用launch方法加载服务,具体是webServer.prepareAndStartServer();
之后调用configureRoutes方法,在这个方法中,通过 getTriggerManager().start();来启动TriggerManager。在start方法中,启动TriggerScannerThread线程来扫描所有的定时任务,
最终在azkaban.trigger.TriggerManager.TriggerScannerThread.checkAllTriggers()方法中做检查,如果满足了执行的条件,则通过onTriggerTrigger(t);触发调度。
azkaban.execapp.ExecutorServlet doGet方法
action为execute,处理的方法是handleAjaxExecute,在这个方法中,提交了一个flow,具体的处理方法在azkaban.execapp.FlowRunnerManager.submitFlow(int)
1.根据flowid从数据库查询出对应的flow。
2.设置执行的目录等
3.获取flow的参数。
4.提交flow
runReadyJob逻辑
执行完了之后,会在progressGraph方法进行查询下一个节点,推进DAG的执行,这个里面主要是收集已完成节点的下一个节点,放入nodesToCheck,然后循环调用runReadyJob执行。
手工执行flow
定时执行flow
execserver执行流程
提交flow
具体执行逻辑
具体job的执行
web上传zip以及解析入库
web服务上传zip包入口:
azkaban.webapp.servlet.LoginAbstractAzkabanServlet.doPost(HttpServletRequest, HttpServletResponse)
跟踪代码,进入下面的方法 上传zip包
azkaban.webapp.servlet.ProjectManagerServlet.ajaxHandleUpload(HttpServletRequest, HttpServletResponse, Map
logger.info("Uploading file " + name); final File archiveFile = new File(tempDir, name); out = new BufferedOutputStream(new FileOutputStream(archiveFile)); IOUtils.copy(item.getInputStream(), out); out.close();
在这个方法中,
azkaban.project.AzkabanProjectLoader.uploadProject(Project, File, String, User, Props),通过上传文件的类型加载loader
loader = this.flowLoaderFactory.createFlowLoader(file);
手工执行flow
入口azkaban.webapp.servlet.LoginAbstractAzkabanServlet.doGet(HttpServletRequest, HttpServletResponse)
进入下面的方法通过ajaxName来判断是什么类型的操作,然后调用不用的方法。
azkaban.webapp.servlet.ExecutorServlet.handleAJAXAction(HttpServletRequest, HttpServletResponse, Session)
比如执行flow的ajaxName是executeFlow,最后通过ExecutorApiGateway.callForJsonString方法生成一个类似下面的uri,发送get请求去执行flow。
http://localhost:12321/executor?action=execute&execid=539&user
定时执行flow
定时任务的触发是在execserver服务启动的时候初始化的。在AzkabanWebServer的main方法中,调用launch方法加载服务,具体是webServer.prepareAndStartServer();
之后调用configureRoutes方法,在这个方法中,通过 getTriggerManager().start();来启动TriggerManager。在start方法中,启动TriggerScannerThread线程来扫描所有的定时任务,
最终在azkaban.trigger.TriggerManager.TriggerScannerThread.checkAllTriggers()方法中做检查,如果满足了执行的条件,则通过onTriggerTrigger(t);触发调度。
execserver执行流程
提交flow
web发送了一个get请求,exec服务器接受请求的入库方法是azkaban.execapp.ExecutorServlet doGet方法
action为execute,处理的方法是handleAjaxExecute,在这个方法中,提交了一个flow,具体的处理方法在azkaban.execapp.FlowRunnerManager.submitFlow(int)
1.根据flowid从数据库查询出对应的flow。
2.设置执行的目录等
3.获取flow的参数。
4.提交flow
//根据flowid从数据库查询出对应的flow。 ExecutableFlow flow = null; flow = this.executorLoader.fetchExecutableFlow(execId); if (flow == null) { throw new ExecutorManagerException("Error loading flow with exec " + execId); } //设置执行的目录等 // Sets up the project files and execution directory. this.flowPreparer.setup(flow); //获取flow的参数 // Setup flow runner FlowWatcher watcher = null; final ExecutionOptions options = flow.getExecutionOptions(); ...... //提交flow final Future<?> future = this.executorService.submit(runner);
具体执行逻辑
具体执行在FlowRunner的run方法,最后调用了azkaban.execapp.FlowRunner.runFlow()方法。private void runFlow() throws Exception { this.logger.info("Starting flows"); runReadyJob(this.flow);//尝试开始执行 updateFlow(); while (!this.flowFinished) { synchronized (this.mainSyncObj) { if (this.flowPaused) { try { this.mainSyncObj.wait(CHECK_WAIT_MS); } catch (final InterruptedException e) { } continue; } else { if (this.retryFailedJobs) { retryAllFailures(); } else if (!progressGraph()) {//推进DAG的执行 try { this.mainSyncObj.wait(CHECK_WAIT_MS); } catch (final InterruptedException e) { } } } } } this.logger.info("Finishing up flow. Awaiting Termination"); this.executorService.shutdown(); updateFlow(); this.logger.info("Finished Flow"); }
runReadyJob逻辑
private boolean runReadyJob(final ExecutableNode node) throws IOException { .............. } else if (nextNodeStatus == Status.READY) { //如果node类型是一个子flow,也就是ExecutableFlowBase if (node instanceof ExecutableFlowBase) { final ExecutableFlowBase flow = ((ExecutableFlowBase) node); this.logger.info("Running flow '" + flow.getNestedId() + "'."); flow.setStatus(Status.RUNNING); flow.setStartTime(System.currentTimeMillis()); prepareJobProperties(flow); //循环递归执行子flow for (final String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) { final ExecutableNode startNode = flow.getExecutableNode(startNodeId); runReadyJob(startNode); } } else { //普通的job ,直接运行job runExecutableNode(node); } } return true; }
执行完了之后,会在progressGraph方法进行查询下一个节点,推进DAG的执行,这个里面主要是收集已完成节点的下一个节点,放入nodesToCheck,然后循环调用runReadyJob执行。
具体job的执行
相关文章推荐
- Azkaban的Web Server源码探究系列10 : /对应的servlet解析
- Hadoop最新版 RPC远程过程调用源码解析及实例
- azkaban web-server源码解析
- IIS7源码泄露及文件类型解析错误——最新解决方案
- Azkaban的Web Server源码探究系列11: 登陆对应的servlet解析
- [Python]网络爬虫(九):百度贴吧的网络爬虫(v0.4)源码及解析
- SpringMVC源码之参数解析绑定原理
- 源码解析之IModel的构造方法
- Glide源码解析05-onLoadComplete
- Java 集合系列05之 LinkedList详细介绍(源码解析)和使用示例
- 十一.jQuery源码解析之.pushStack()
- gunicorn syncworker 源码解析
- Spring源码解读2——Bean资源的载入/解析
- solr&lucene3.6.0源码解析(二)
- Caffe源码解析5:Conv_Layer
- Android源码解析之ComponentCallbacks
- ArrayList源码解析 给jdk写注释系列之jdk1.6容器(1)
- Servlet基本结构的源码解析
- Hadoop源码解析之YARN客户端作业提交流程
- Android设计模式源码解析之外观模式(Facade)