您的位置:首页 > 运维架构 > 网站架构

zeppelin源码分析(4)——interpreter的调度和任务封装

2016-06-02 18:48 891 查看
SchedulerFactory工厂类负责创建所有的Scheduler实例。SchedulerFactory本身采用了Singleton设计模式,保证全局唯一实例,与Scheduler和其实现类之前形成了“简单工厂”设计模式:



每个具体的Interpreter实现类在获取Scheduler实例时,都由自己决定初始化哪种类型的Scheduler实例:



(PS:图中的SparkInterpreter是采用FIFOScheduler的代表,HiveInterpreter是采用ParallelScheduler的代表,而RemoteInterpreterInterpreter是唯一采用RemoteScheduler的)

各个Scheduler的共同点是:

1) 都是基于“共享缓冲区的生产者——消费者”模型,该模型中的几个角色对应如下:

a) 产品:org.apache.zeppelin.scheduler.Job(Interpreter的Job调度没有用到Quartz),实际是Job的具体实现类ParagraphJob和InterpretJob

b) 生产者:往共享缓冲区中提交“产品”的类,zeppelin中有2个:Note和RemoteInterpreterServer。

c) 消费者:Scheduler,实际是Scheduler的实现类FIFOScheduler、ParallelScheduler和RemoteScheduler。

d) 共享缓冲区:各个Scheduler中的内部的Job队列,实际是LinkedList。

2) 底层都采用java.util.concurrent.ExecutorService执行器框架来运行Job。

zeppelin对如下2种任务进行了Job封装(注:还有一种SleepingJob,由于仅仅用在了UnitTests中,故忽略之),以实现异步调度。



下面分析一下各个Scheduler的调度策略:

1.1.1.1 FIFOScheduler



FIFOScheduler采用FIFO调度策略,不支持并发,内部使用LinkedList来存储Job。由于Scheduler同时实现了Runnable接口,Scheduler的实际完成调度的过程是在run()方法中实现的,其核心的逻辑是:

每次从LinkedList对列头部取一个Job,然后交给ExecutorService去执行,并且在执行前后以及出错时,维护Job的状态,并且调用SchedulerListener进行一些事件处理。



而这个executor实际被初始化成Executors.newScheduledThreadPool(100)。

这里需要注意的是:FIFOScheduler维护了一个Job runningJob

字段,在执行过程中,通过判断当前正在执行的runningJob==null来判定是否有Job正在执行,如果有,则不会再从队列中取其他Job,保证了同一个时刻只有一个Job在执行。这点与ParallelScheduler不同。

1.1.1.1.1 哪些Interpreter的实现类是不支持并发的?
AngularInterpreter

FileInterpreter

HbaseInterpreter

IgniteInterpreter和IgniteSqlInterpreter

JDBCInterpreter

LivyPySparkInterpreter、LivySparkInterpreter、LivySparkRInterpreter和LivySparkSQLInterpreter

PhoenixInterpreter

PostgreSqlInterpreter

SparkInterpreter(scala)、SparkRInterpreter

TajoInterpreter

rinterpreter(原生R)

1.1.1.2 ParallelScheduler

ParallelScheduler对每个Job的执行过程与FIFOScheduler并没有区别,区别在于并发数量的控制上。


ParallelScheduler内部维护了一个maxConcurrency状态,该字段控制了最大并发数量,每个Interpreter默认支持多少个并发,由各个Interpreter的实现类在getScheduler()方法的实现中向SchedulerFactory的工厂方法传入该参数。

1.1.1.2.1 哪些Interpreter的实现类是可以并发的,并发数是多少?
解释器

默认并发数量

备注

CassandraInterpreter

10

每个实例允许的并发数量,总的并发数量=实例个数*默认并发数量

HiveInterpreter

10

LensInterpreter

10

Markdown

5

ShellInterpreter

10

SparkSqlInterpreter

10

zeppelin.spark.concurrentSQL

配置参数为true时

1.1.1.3 RemoteScheduler

与RemoteInterpreterProcess配合使用的,RemoteInterpreterProcess以独立的进程启动Interpreter,其内部同样运行了调度器,由于zeppelinServer运行在主进程中,与远程Interpreter进程(通过RemoteInterpreterServer启动的jvm,注意:不是运行InterpreterProcess类所在的进程,InterpreterProcess仍然运行在与ZeppelinServer相同的主进程中)不在同一个进程。RemoteScheduler的作用就作为运行在远程Interpreter进程的远程代理,RemoteScheduler与ZeppelinServer运行在同一个JVM进程中,负责向ZeppelinServer提供远程Interpreter进程中调度器的内部运行情况。(以下来自于RemoteScheduler的类注释:RemoteScheduler
runs in ZeppelinServer and proxies Scheduler runningon RemoteInterpreter)

实际上,RemoteScheduler需要通过获得Thrift-Client来查询远程Job的状态。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息