您的位置:首页 > 其它

Zeppelin 源码分析-调度和资源分析(2)

2017-08-02 20:14 211 查看

Scheduler 类

Scheduler 类是调度类的抽象类,其中定义了很多关键方法,比如 submit 方法等,并且是一个线程类,一直运行在主进程或者独立 JVM 进程。



RemoteScheduler 类

RemoteScheduler 类是运行在主进程的调度类,支持并发操作,实现的方式就是之前说过的将 Job 封装成 RemoteScheduler.JobRunner 类,RemoteScheduler 中维护两个队列,queue 用于存储所有的需要运行的 Job,running 用于存储正在运行的 Job,由于该类在主进程,所有同一时间可能会有多个解释器 JVM 中的 Job 正在运行,所以 running 是一个队列,在 FIFOScheduler 中,该属性其实就是一个对象,因为先进先出调度同一时间只能有一个正在运行的 Job 类的对象:

List<Job> queue = new LinkedList<>();
List<Job> running = new LinkedList<>();


该线程中 run 方法流程如下:

首先当正在运行的 Job 总数大于阈值或者任务 queue 是空的时候,等待 0.5s ,其他情况下从队列中取出一个 Job 并加入到 running 队列中:

synchronized (queue) {
if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
try {
queue.wait(500);
} catch (InterruptedException e) {
logger.error("Exception in RemoteScheduler while run queue.wait", e);
}
continue;
}

job = queue.remove(0);
running.add(job);
}


然后对 job 封装成 JobRunner 的对象 jobRunner,封装原因在 Job 相关类中说过,封装之后直接将 jobRunner 直接扔到 Java 自带的线程池运行:

Scheduler scheduler = this;
JobRunner jobRunner = new JobRunner(scheduler, job);
executor.execute(jobRunner);


一旦该 job 被提交到解释器 JVM,则进行下一次循环,重复以上步骤。

ParallelScheduler 类

这个类是运行在解释器 JVM 中的调度类,支持并发 Job,实现的方式是 Job 封装成 ParallelScheduler.JobRunner 类,代码基本和 RemoteScheduler 相似,这里不再赘述。

FIFOScheduler 类

这个类也是运行在解释器 JVM 中的调度类,只不过不支持并发,由于比较简单,这里不再赘述。

SchedulerListener 类

SchedulerListener 接口中定义了两个方法,jobStarted 和 jobFinished 方法,它唯一的实现类是 SchedulerFactory 类,实现类中这两个方法都是向日志中输出一句话。

SchedulerFactory 类

这个类是创建各种调度的工厂类,调用 reateOrGetXXXScheduler 的时候,返回的已经是在运行的线程了:

public Scheduler createOrGetFIFOScheduler(String name) {
synchronized (schedulers) {
if (schedulers.containsKey(name) == false) {
Scheduler s = new FIFOScheduler(name, executor, this);
schedulers.put(name, s);
executor.execute(s);
}
return schedulers.get(name);
}
}


Java 线程池参考资料

Java ExecutorService四种线程池的例子与说明

【Java并发编程】之十九:并发新特性—Executor框架与线程池(含代码)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  zeppelin