zeppelin源码分析(4)——interpreter的调度和任务封装
2016-06-02 18:48
891 查看
SchedulerFactory工厂类负责创建所有的Scheduler实例。SchedulerFactory本身采用了Singleton设计模式,保证全局唯一实例,与Scheduler和其实现类之前形成了“简单工厂”设计模式:
每个具体的Interpreter实现类在获取Scheduler实例时,都由自己决定初始化哪种类型的Scheduler实例:
(PS:图中的SparkInterpreter是采用FIFOScheduler的代表,HiveInterpreter是采用ParallelScheduler的代表,而RemoteInterpreterInterpreter是唯一采用RemoteScheduler的)
各个Scheduler的共同点是:
1) 都是基于“共享缓冲区的生产者——消费者”模型,该模型中的几个角色对应如下:
a) 产品:org.apache.zeppelin.scheduler.Job(Interpreter的Job调度没有用到Quartz),实际是Job的具体实现类ParagraphJob和InterpretJob
b) 生产者:往共享缓冲区中提交“产品”的类,zeppelin中有2个:Note和RemoteInterpreterServer。
c) 消费者:Scheduler,实际是Scheduler的实现类FIFOScheduler、ParallelScheduler和RemoteScheduler。
d) 共享缓冲区:各个Scheduler中的内部的Job队列,实际是LinkedList。
2) 底层都采用java.util.concurrent.ExecutorService执行器框架来运行Job。
zeppelin对如下2种任务进行了Job封装(注:还有一种SleepingJob,由于仅仅用在了UnitTests中,故忽略之),以实现异步调度。
下面分析一下各个Scheduler的调度策略:
FIFOScheduler采用FIFO调度策略,不支持并发,内部使用LinkedList来存储Job。由于Scheduler同时实现了Runnable接口,Scheduler的实际完成调度的过程是在run()方法中实现的,其核心的逻辑是:
每次从LinkedList对列头部取一个Job,然后交给ExecutorService去执行,并且在执行前后以及出错时,维护Job的状态,并且调用SchedulerListener进行一些事件处理。
而这个executor实际被初始化成Executors.newScheduledThreadPool(100)。
这里需要注意的是:FIFOScheduler维护了一个Job runningJob
字段,在执行过程中,通过判断当前正在执行的runningJob==null来判定是否有Job正在执行,如果有,则不会再从队列中取其他Job,保证了同一个时刻只有一个Job在执行。这点与ParallelScheduler不同。
1.1.1.1.1 哪些Interpreter的实现类是不支持并发的?
ParallelScheduler内部维护了一个maxConcurrency状态,该字段控制了最大并发数量,每个Interpreter默认支持多少个并发,由各个Interpreter的实现类在getScheduler()方法的实现中向SchedulerFactory的工厂方法传入该参数。
1.1.1.2.1 哪些Interpreter的实现类是可以并发的,并发数是多少?
runs in ZeppelinServer and proxies Scheduler runningon RemoteInterpreter)
实际上,RemoteScheduler需要通过获得Thrift-Client来查询远程Job的状态。
每个具体的Interpreter实现类在获取Scheduler实例时,都由自己决定初始化哪种类型的Scheduler实例:
(PS:图中的SparkInterpreter是采用FIFOScheduler的代表,HiveInterpreter是采用ParallelScheduler的代表,而RemoteInterpreterInterpreter是唯一采用RemoteScheduler的)
各个Scheduler的共同点是:
1) 都是基于“共享缓冲区的生产者——消费者”模型,该模型中的几个角色对应如下:
a) 产品:org.apache.zeppelin.scheduler.Job(Interpreter的Job调度没有用到Quartz),实际是Job的具体实现类ParagraphJob和InterpretJob
b) 生产者:往共享缓冲区中提交“产品”的类,zeppelin中有2个:Note和RemoteInterpreterServer。
c) 消费者:Scheduler,实际是Scheduler的实现类FIFOScheduler、ParallelScheduler和RemoteScheduler。
d) 共享缓冲区:各个Scheduler中的内部的Job队列,实际是LinkedList。
2) 底层都采用java.util.concurrent.ExecutorService执行器框架来运行Job。
zeppelin对如下2种任务进行了Job封装(注:还有一种SleepingJob,由于仅仅用在了UnitTests中,故忽略之),以实现异步调度。
下面分析一下各个Scheduler的调度策略:
1.1.1.1 FIFOScheduler
FIFOScheduler采用FIFO调度策略,不支持并发,内部使用LinkedList来存储Job。由于Scheduler同时实现了Runnable接口,Scheduler的实际完成调度的过程是在run()方法中实现的,其核心的逻辑是:
每次从LinkedList对列头部取一个Job,然后交给ExecutorService去执行,并且在执行前后以及出错时,维护Job的状态,并且调用SchedulerListener进行一些事件处理。
而这个executor实际被初始化成Executors.newScheduledThreadPool(100)。
这里需要注意的是:FIFOScheduler维护了一个Job runningJob
字段,在执行过程中,通过判断当前正在执行的runningJob==null来判定是否有Job正在执行,如果有,则不会再从队列中取其他Job,保证了同一个时刻只有一个Job在执行。这点与ParallelScheduler不同。
1.1.1.1.1 哪些Interpreter的实现类是不支持并发的?
AngularInterpreter |
FileInterpreter |
HbaseInterpreter |
IgniteInterpreter和IgniteSqlInterpreter |
JDBCInterpreter |
LivyPySparkInterpreter、LivySparkInterpreter、LivySparkRInterpreter和LivySparkSQLInterpreter |
PhoenixInterpreter |
PostgreSqlInterpreter |
SparkInterpreter(scala)、SparkRInterpreter |
TajoInterpreter |
rinterpreter(原生R) |
1.1.1.2 ParallelScheduler
ParallelScheduler对每个Job的执行过程与FIFOScheduler并没有区别,区别在于并发数量的控制上。ParallelScheduler内部维护了一个maxConcurrency状态,该字段控制了最大并发数量,每个Interpreter默认支持多少个并发,由各个Interpreter的实现类在getScheduler()方法的实现中向SchedulerFactory的工厂方法传入该参数。
1.1.1.2.1 哪些Interpreter的实现类是可以并发的,并发数是多少?
解释器 | 默认并发数量 | 备注 |
CassandraInterpreter | 10 | 每个实例允许的并发数量,总的并发数量=实例个数*默认并发数量 |
HiveInterpreter | 10 | |
LensInterpreter | 10 | |
Markdown | 5 | |
ShellInterpreter | 10 | |
SparkSqlInterpreter | 10 | zeppelin.spark.concurrentSQL 配置参数为true时 |
1.1.1.3 RemoteScheduler
与RemoteInterpreterProcess配合使用的,RemoteInterpreterProcess以独立的进程启动Interpreter,其内部同样运行了调度器,由于zeppelinServer运行在主进程中,与远程Interpreter进程(通过RemoteInterpreterServer启动的jvm,注意:不是运行InterpreterProcess类所在的进程,InterpreterProcess仍然运行在与ZeppelinServer相同的主进程中)不在同一个进程。RemoteScheduler的作用就作为运行在远程Interpreter进程的远程代理,RemoteScheduler与ZeppelinServer运行在同一个JVM进程中,负责向ZeppelinServer提供远程Interpreter进程中调度器的内部运行情况。(以下来自于RemoteScheduler的类注释:RemoteSchedulerruns in ZeppelinServer and proxies Scheduler runningon RemoteInterpreter)
实际上,RemoteScheduler需要通过获得Thrift-Client来查询远程Job的状态。
相关文章推荐
- 从源码安装Mysql/Percona 5.5
- 架构纵横谈之二 ---- 架构的模式与要点
- BS项目中的CSS架构_仅加载自己需要的CSS
- ruby 单态方法 分析
- 典型入侵日志分析
- 浅析Ruby的源代码布局及其编程风格
- 关于三种主流WEB架构的思考
- 一根网线内的8根线哪4根是传输数据的,哪四根是防干扰的
- Android操作系统的架构设计分析
- asp.net 抓取网页源码三种实现方法
- w3c技术架构介绍
- JavaScript分析、压缩工具JavaScript Analyser
- JavaScript 组件之旅(一)分析和设计
- JS小游戏之仙剑翻牌源码详解
- Ajax 的六个误区小结分析
- JS小游戏之宇宙战机源码详解
- jquery的总体架构分析及实现示例详解
- jQuery源码分析之jQuery中的循环技巧详解
- 本人自用的global.js库源码分享
- linux学习笔记 linux目录架构