您的位置:首页 > 其它

Zeppelin 源码分析-Interpreter 相关类(3)

2017-08-02 20:12 274 查看
和 Interpreter 直接相关类有以下几个:

Interpreter, InterpreterFactory, RemoteInterpreter, InterpreterGroup, InterpreterSetting。

由于篇幅有限,这里分开介绍。

InterpreterFactor 类

InterpreterFactor 类是 Interpreter 接口的工厂类,各种 Interpreter (其实这个类创建出来的全部都是主进程中的代理)都是从这个类中创造出来的,换句话说,这个类专门为独立的解释器进程创建代理对象,这个类比较关键,因此下面一个一个函数介绍。

该类的所有函数如下:



除了画圈部分,其余函数均需要调用本类的其余函数才能完成指定功能,因此先从三个独立的函数说起。

creatRepl

creatRepl 是创建一个在主进程中创建解释器的函数,但是鉴于目前所有的解释器都在独立的进程中,因此此函数目前来说其实没有什么意义。

connectToRemoteRepl

这个函数是为远程的解释器 JVM 创建本地代理对象,关键代码如下:

LazyOpenInterpreter intp = new LazyOpenInterpreter(
new RemoteInterpreter(property, interpreterSessionKey, className, host, port, localRepoPath,
connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
userName, isUserImpersonate, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT)));


createRemoteRepl

这个函数除了构建本地代理的时候不需要传送 ip 地址和端口号外,其余基本相似,另外还需要传送一个 interpreterRunnerPath 参数,这个参数指定了启动 Thirf 服务器的脚本的路径,代码比较简单,这里不再赘述。

createInterpreterGroup

这个函数就是根据给定的 id 建立一个 InterpreterGroup ,是在 InterpreterSetting 中的 getInterpreterGroup 方法被唯一调用,新建之后立即加入到该类的 interpreterGroupRef 中,该属性是一个 Map,key 为 InterpreterGroup 的 id,value 为 InterpreterGroup 对象,属性的作用就是根据 id 获取到真实的对象。这个函数在 2 中说过,这里不再赘述。

createInterpretersForNote

这个函数的调用层次如下,可见也是一个相对基础的函数



这个函数只有在 interpreterGroup 中找不到对应的代理解释器的时候才会被调用,因此首先需要做的就是处理正要创建的代理解释器所在的 InterpreterGroup 中的原来的解释器对象正在被移除的情况:

InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
synchronized (interpreterGroup) {
long interpreterRemovalWaitStart = System.nanoTime();
long minTimeout = 10L * 1000 * 1000000; // 10 sec
long interpreterRemovalWaitTimeout = Math.max(minTimeout,
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2);
while (interpreterGroup.containsKey(interpreterSessionKey)) {
if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) {
throw new InterpreterException("Can not create interpreter");
}
try {
interpreterGroup.wait(1000);
} catch (InterruptedException e) {
logger.debug(e.getMessage(), e);
}
}
}


然后需要做的就是将某组中所有的解释器代理对象创建出来,具体的创建过程如下:之前说过 InterpreterSetting 对象其实就是记录了网页中一组解释器的一些配置信息,因此首先将这些配置信息找出,然后使用这些配置信息一次性创建该组的所有解释器的代理,对于 interpreterInfos 中每个对象,首先判断该解释器是运行在主 JVM 还是运行在独立的 JVM 中,当然目前为止 option.isRemote() 返回总是 true ,然后再判断该解释器是在本地的 JVM 还是在 另一台机器的 JVM,如果在本地调用 createRemoteRepl 方法,否则调用 connectToRemoteRepl 方法,创建完代理对象之后,就需要为代理对象设置 InterpreterGroup,并在该代理对象对应的 InterpreterGroup 中加入该代理对象,其中每组解释器的默认解释器放在 list 的第一个位置。

List<InterpreterInfo> interpreterInfos = interpreterSetting.getInterpreterInfos();
Interpreter interpreter;
for (InterpreterInfo info : interpreterInfos) {
if (option.isRemote()) {
if (option.isExistingProcess()) {
interpreter = connectToRemoteRepl(interpreterSessionKey, info.getClassName(), option.getHost(),option.getPort(), properties, interpreterSetting.getId(), user, option.isUserImpersonate);
} else {
interpreter = createRemoteRepl(path, interpreterSessionKey, info.getClassName(),
properties, interpreterSetting.getId(), user, option.isUserImpersonate(), runner);
}
} else {
interpreter = createRepl(interpreterSetting.getPath(), info.getClassName(), properties);
}
// 省略
}


createOrGetInterpreterList

createInterpretersForNote 只在 createOrGetInterpreterList 方法中会被调用, createOrGetInterpreterList 方法先根据 setting,user,noteId 创建或者获取需要执行的解释器所在的组,之后再检查该组中是否有指定 interpreterSessionKey 的解释器代理列表,如果没有就创建,否则就直接返回,代码如下:

private List<Interpreter> createOrGetInterpreterList(String user, String noteId, InterpreterSetting setting) {
InterpreterGroup interpreterGroup = setting.getInterpreterGroup(user, noteId);
synchronized (interpreterGroup) {
String interpreterSessionKey =
interpreterSettingManager.getInterpreterSessionKey(user, noteId, setting);
if (!interpreterGroup.containsKey(interpreterSessionKey)) {
createInterpretersForNote(setting, user, noteId, interpreterSessionKey);
}
return interpreterGroup.get(interpreterSessionKey);
}
}


getInterpreter(String user, String noteId, String replName)

这个方法才是 InterpreterFactor 类供外界使用的得到一个解释器代理对象的方法。

这个方法首先根据 noteID(即你在网页中创建的 NoteBook 的 ID),得到该 NoteBook 绑定的所有解释器的信息,其中,你建立该 NoteBook 时设置的默认解释器放在 settings 的第一个位置:

List<Interpre
b97e
terSetting> settings = interpreterSettingManager.getInterpreterSettings(noteId);


在 replName 是 null 或者 “” 的时候返回该 NoteBook 的默认解释器,这在程序刚开始初始化的时候调用,就算你没点进你创建的 NoteBook ,他也会调用这块, getDefaultInterpreterSetting 方法其实就是返回 settings[0] 。

if (replName == null || replName.trim().length() == 0) {
// get default settings (first available)
// TODO(jl): Fix it in case of returning null
InterpreterSetting defaultSettings = interpreterSettingManager
.getDefaultInterpreterSetting(settings);
return createOrGetInterpreterList(user, noteId, defaultSettings).get(0);
}


如果 replName 非空,即是 spark,spark.sql 等,则分情况分析:

* 首先解析类似于 spark.sql 这种 replName ,根据组名 group 和 settings 获取需要的解释器的设置信息,getInterpreter(String user, String noteId, InterpreterSetting setting, String name) 方法就是根据 setting 中的 interpreterGroupRef 属性获取到对应 InterpreterGroup 中名字为 name 的代理解释器对象,获取到之后即返回解释器代理对象。

* 如果输入的是 spark,mysql 这种 replName ,首先找到默认解释器组,然后如果这里有的话直接返回,没有的话继续寻找,这时只能把 replName 参数当成解释器组名,从 settings 中寻找第一个组名为 replName 的解释器组,如果这里面依然没有的话,那么就从所有的 setting 里面搜索。

if (replNameSplit.length == 2) {
String group = null;
String name = null;
group = replNameSplit[0];
name = replNameSplit[1];
setting = getInterpreterSettingByGroup(settings, group);
if (null != setting) {
interpreter = getInterpreter(user, noteId, setting, name);
if (null != interpreter) {
return interpreter;
}
}
throw new InterpreterException(replName + " interpreter not found");
} else {
setting = interpreterSettingManager.getDefaultInterpreterSetting(settings);
interpreter = getInterpreter(user, noteId, setting, replName);
if (null != interpreter) {
return interpreter;
}
setting = getInterpreterSettingByGroup(settings, replName);
if (null != setting) {
List<Interpreter> interpreters = createOrGetInterpreterList(user, noteId, setting);
if (null != interpreters) {
return interpreters.get(0);
}
}
for (InterpreterSetting s : settings) {
if (s.getGroup().equals(replName)) {
List<Interpreter> interpreters = createOrGetInterpreterList(user, noteId, s);
if (null != interpreters) {
return interpreters.get(0);
}
}
}
}


这里举个例子说明第二种情况,假设我建了两个解释器,组名都是 spark ,id 分别为 2CPNNU13C 和 2CPNNU13D,然后都是以共享进程启动:





然后我新建一个 NoteBook(ID 为 2CPNNU13E),网页中选的默认解释器是 spark1,绑定的解释器是所有,那么程序刚启动时 id 分别为 2CPNNU13D 的 InterpreterSetting 对象中的 interpreterGroupRef 属性中就会存有这个 id 为 2CPNNU13E 的 NoteBook 对象对应的解释器组对象,这个解释器组对象的 id 为 2CPNNU13E:share_progress ,只有一个元祖,key为 share_progress,value 就是 spark 的五个代理对象。这个函数的第一句话 interpreterSettingManager.getInterpreterSettings(noteId) 返回的是所有解释器设置,其中第一项就是 id 分别为 2CPNNU13D 的 InterpreterSetting 对象。当我在一个 ParaGraph 输入 %spark1 的时候,由于默认解释器就是 spark1,因此会进入第一个分支,而输入 %spark 的时候其实会进入第三个分支因为 settings 中寻找第一个组名为 spark 的解释器组其实返回的是 id 分别为 2CPNNU13D 的 InterpreterSetting 对象。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  zeppelin 解释器