Zeppelin 源码分析-Interpreter 相关类(3)
2017-08-02 20:12
274 查看
和 Interpreter 直接相关类有以下几个:
Interpreter, InterpreterFactory, RemoteInterpreter, InterpreterGroup, InterpreterSetting。
由于篇幅有限,这里分开介绍。
该类的所有函数如下:
除了画圈部分,其余函数均需要调用本类的其余函数才能完成指定功能,因此先从三个独立的函数说起。
这个函数只有在 interpreterGroup 中找不到对应的代理解释器的时候才会被调用,因此首先需要做的就是处理正要创建的代理解释器所在的 InterpreterGroup 中的原来的解释器对象正在被移除的情况:
然后需要做的就是将某组中所有的解释器代理对象创建出来,具体的创建过程如下:之前说过 InterpreterSetting 对象其实就是记录了网页中一组解释器的一些配置信息,因此首先将这些配置信息找出,然后使用这些配置信息一次性创建该组的所有解释器的代理,对于 interpreterInfos 中每个对象,首先判断该解释器是运行在主 JVM 还是运行在独立的 JVM 中,当然目前为止 option.isRemote() 返回总是 true ,然后再判断该解释器是在本地的 JVM 还是在 另一台机器的 JVM,如果在本地调用 createRemoteRepl 方法,否则调用 connectToRemoteRepl 方法,创建完代理对象之后,就需要为代理对象设置 InterpreterGroup,并在该代理对象对应的 InterpreterGroup 中加入该代理对象,其中每组解释器的默认解释器放在 list 的第一个位置。
这个方法首先根据 noteID(即你在网页中创建的 NoteBook 的 ID),得到该 NoteBook 绑定的所有解释器的信息,其中,你建立该 NoteBook 时设置的默认解释器放在 settings 的第一个位置:
在 replName 是 null 或者 “” 的时候返回该 NoteBook 的默认解释器,这在程序刚开始初始化的时候调用,就算你没点进你创建的 NoteBook ,他也会调用这块, getDefaultInterpreterSetting 方法其实就是返回 settings[0] 。
如果 replName 非空,即是 spark,spark.sql 等,则分情况分析:
* 首先解析类似于 spark.sql 这种 replName ,根据组名 group 和 settings 获取需要的解释器的设置信息,getInterpreter(String user, String noteId, InterpreterSetting setting, String name) 方法就是根据 setting 中的 interpreterGroupRef 属性获取到对应 InterpreterGroup 中名字为 name 的代理解释器对象,获取到之后即返回解释器代理对象。
* 如果输入的是 spark,mysql 这种 replName ,首先找到默认解释器组,然后如果这里有的话直接返回,没有的话继续寻找,这时只能把 replName 参数当成解释器组名,从 settings 中寻找第一个组名为 replName 的解释器组,如果这里面依然没有的话,那么就从所有的 setting 里面搜索。
这里举个例子说明第二种情况,假设我建了两个解释器,组名都是 spark ,id 分别为 2CPNNU13C 和 2CPNNU13D,然后都是以共享进程启动:
然后我新建一个 NoteBook(ID 为 2CPNNU13E),网页中选的默认解释器是 spark1,绑定的解释器是所有,那么程序刚启动时 id 分别为 2CPNNU13D 的 InterpreterSetting 对象中的 interpreterGroupRef 属性中就会存有这个 id 为 2CPNNU13E 的 NoteBook 对象对应的解释器组对象,这个解释器组对象的 id 为 2CPNNU13E:share_progress ,只有一个元祖,key为 share_progress,value 就是 spark 的五个代理对象。这个函数的第一句话 interpreterSettingManager.getInterpreterSettings(noteId) 返回的是所有解释器设置,其中第一项就是 id 分别为 2CPNNU13D 的 InterpreterSetting 对象。当我在一个 ParaGraph 输入 %spark1 的时候,由于默认解释器就是 spark1,因此会进入第一个分支,而输入 %spark 的时候其实会进入第三个分支因为 settings 中寻找第一个组名为 spark 的解释器组其实返回的是 id 分别为 2CPNNU13D 的 InterpreterSetting 对象。
Interpreter, InterpreterFactory, RemoteInterpreter, InterpreterGroup, InterpreterSetting。
由于篇幅有限,这里分开介绍。
InterpreterFactor 类
InterpreterFactor 类是 Interpreter 接口的工厂类,各种 Interpreter (其实这个类创建出来的全部都是主进程中的代理)都是从这个类中创造出来的,换句话说,这个类专门为独立的解释器进程创建代理对象,这个类比较关键,因此下面一个一个函数介绍。该类的所有函数如下:
除了画圈部分,其余函数均需要调用本类的其余函数才能完成指定功能,因此先从三个独立的函数说起。
creatRepl
creatRepl 是创建一个在主进程中创建解释器的函数,但是鉴于目前所有的解释器都在独立的进程中,因此此函数目前来说其实没有什么意义。connectToRemoteRepl
这个函数是为远程的解释器 JVM 创建本地代理对象,关键代码如下:LazyOpenInterpreter intp = new LazyOpenInterpreter( new RemoteInterpreter(property, interpreterSessionKey, className, host, port, localRepoPath, connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT)));
createRemoteRepl
这个函数除了构建本地代理的时候不需要传送 ip 地址和端口号外,其余基本相似,另外还需要传送一个 interpreterRunnerPath 参数,这个参数指定了启动 Thirf 服务器的脚本的路径,代码比较简单,这里不再赘述。createInterpreterGroup
这个函数就是根据给定的 id 建立一个 InterpreterGroup ,是在 InterpreterSetting 中的 getInterpreterGroup 方法被唯一调用,新建之后立即加入到该类的 interpreterGroupRef 中,该属性是一个 Map,key 为 InterpreterGroup 的 id,value 为 InterpreterGroup 对象,属性的作用就是根据 id 获取到真实的对象。这个函数在 2 中说过,这里不再赘述。createInterpretersForNote
这个函数的调用层次如下,可见也是一个相对基础的函数这个函数只有在 interpreterGroup 中找不到对应的代理解释器的时候才会被调用,因此首先需要做的就是处理正要创建的代理解释器所在的 InterpreterGroup 中的原来的解释器对象正在被移除的情况:
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId); synchronized (interpreterGroup) { long interpreterRemovalWaitStart = System.nanoTime(); long minTimeout = 10L * 1000 * 1000000; // 10 sec long interpreterRemovalWaitTimeout = Math.max(minTimeout, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2); while (interpreterGroup.containsKey(interpreterSessionKey)) { if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) { throw new InterpreterException("Can not create interpreter"); } try { interpreterGroup.wait(1000); } catch (InterruptedException e) { logger.debug(e.getMessage(), e); } } }
然后需要做的就是将某组中所有的解释器代理对象创建出来,具体的创建过程如下:之前说过 InterpreterSetting 对象其实就是记录了网页中一组解释器的一些配置信息,因此首先将这些配置信息找出,然后使用这些配置信息一次性创建该组的所有解释器的代理,对于 interpreterInfos 中每个对象,首先判断该解释器是运行在主 JVM 还是运行在独立的 JVM 中,当然目前为止 option.isRemote() 返回总是 true ,然后再判断该解释器是在本地的 JVM 还是在 另一台机器的 JVM,如果在本地调用 createRemoteRepl 方法,否则调用 connectToRemoteRepl 方法,创建完代理对象之后,就需要为代理对象设置 InterpreterGroup,并在该代理对象对应的 InterpreterGroup 中加入该代理对象,其中每组解释器的默认解释器放在 list 的第一个位置。
List<InterpreterInfo> interpreterInfos = interpreterSetting.getInterpreterInfos(); Interpreter interpreter; for (InterpreterInfo info : interpreterInfos) { if (option.isRemote()) { if (option.isExistingProcess()) { interpreter = connectToRemoteRepl(interpreterSessionKey, info.getClassName(), option.getHost(),option.getPort(), properties, interpreterSetting.getId(), user, option.isUserImpersonate); } else { interpreter = createRemoteRepl(path, interpreterSessionKey, info.getClassName(), properties, interpreterSetting.getId(), user, option.isUserImpersonate(), runner); } } else { interpreter = createRepl(interpreterSetting.getPath(), info.getClassName(), properties); } // 省略 }
createOrGetInterpreterList
createInterpretersForNote 只在 createOrGetInterpreterList 方法中会被调用, createOrGetInterpreterList 方法先根据 setting,user,noteId 创建或者获取需要执行的解释器所在的组,之后再检查该组中是否有指定 interpreterSessionKey 的解释器代理列表,如果没有就创建,否则就直接返回,代码如下:private List<Interpreter> createOrGetInterpreterList(String user, String noteId, InterpreterSetting setting) { InterpreterGroup interpreterGroup = setting.getInterpreterGroup(user, noteId); synchronized (interpreterGroup) { String interpreterSessionKey = interpreterSettingManager.getInterpreterSessionKey(user, noteId, setting); if (!interpreterGroup.containsKey(interpreterSessionKey)) { createInterpretersForNote(setting, user, noteId, interpreterSessionKey); } return interpreterGroup.get(interpreterSessionKey); } }
getInterpreter(String user, String noteId, String replName)
这个方法才是 InterpreterFactor 类供外界使用的得到一个解释器代理对象的方法。这个方法首先根据 noteID(即你在网页中创建的 NoteBook 的 ID),得到该 NoteBook 绑定的所有解释器的信息,其中,你建立该 NoteBook 时设置的默认解释器放在 settings 的第一个位置:
List<Interpre b97e terSetting> settings = interpreterSettingManager.getInterpreterSettings(noteId);
在 replName 是 null 或者 “” 的时候返回该 NoteBook 的默认解释器,这在程序刚开始初始化的时候调用,就算你没点进你创建的 NoteBook ,他也会调用这块, getDefaultInterpreterSetting 方法其实就是返回 settings[0] 。
if (replName == null || replName.trim().length() == 0) { // get default settings (first available) // TODO(jl): Fix it in case of returning null InterpreterSetting defaultSettings = interpreterSettingManager .getDefaultInterpreterSetting(settings); return createOrGetInterpreterList(user, noteId, defaultSettings).get(0); }
如果 replName 非空,即是 spark,spark.sql 等,则分情况分析:
* 首先解析类似于 spark.sql 这种 replName ,根据组名 group 和 settings 获取需要的解释器的设置信息,getInterpreter(String user, String noteId, InterpreterSetting setting, String name) 方法就是根据 setting 中的 interpreterGroupRef 属性获取到对应 InterpreterGroup 中名字为 name 的代理解释器对象,获取到之后即返回解释器代理对象。
* 如果输入的是 spark,mysql 这种 replName ,首先找到默认解释器组,然后如果这里有的话直接返回,没有的话继续寻找,这时只能把 replName 参数当成解释器组名,从 settings 中寻找第一个组名为 replName 的解释器组,如果这里面依然没有的话,那么就从所有的 setting 里面搜索。
if (replNameSplit.length == 2) { String group = null; String name = null; group = replNameSplit[0]; name = replNameSplit[1]; setting = getInterpreterSettingByGroup(settings, group); if (null != setting) { interpreter = getInterpreter(user, noteId, setting, name); if (null != interpreter) { return interpreter; } } throw new InterpreterException(replName + " interpreter not found"); } else { setting = interpreterSettingManager.getDefaultInterpreterSetting(settings); interpreter = getInterpreter(user, noteId, setting, replName); if (null != interpreter) { return interpreter; } setting = getInterpreterSettingByGroup(settings, replName); if (null != setting) { List<Interpreter> interpreters = createOrGetInterpreterList(user, noteId, setting); if (null != interpreters) { return interpreters.get(0); } } for (InterpreterSetting s : settings) { if (s.getGroup().equals(replName)) { List<Interpreter> interpreters = createOrGetInterpreterList(user, noteId, s); if (null != interpreters) { return interpreters.get(0); } } } }
这里举个例子说明第二种情况,假设我建了两个解释器,组名都是 spark ,id 分别为 2CPNNU13C 和 2CPNNU13D,然后都是以共享进程启动:
然后我新建一个 NoteBook(ID 为 2CPNNU13E),网页中选的默认解释器是 spark1,绑定的解释器是所有,那么程序刚启动时 id 分别为 2CPNNU13D 的 InterpreterSetting 对象中的 interpreterGroupRef 属性中就会存有这个 id 为 2CPNNU13E 的 NoteBook 对象对应的解释器组对象,这个解释器组对象的 id 为 2CPNNU13E:share_progress ,只有一个元祖,key为 share_progress,value 就是 spark 的五个代理对象。这个函数的第一句话 interpreterSettingManager.getInterpreterSettings(noteId) 返回的是所有解释器设置,其中第一项就是 id 分别为 2CPNNU13D 的 InterpreterSetting 对象。当我在一个 ParaGraph 输入 %spark1 的时候,由于默认解释器就是 spark1,因此会进入第一个分支,而输入 %spark 的时候其实会进入第三个分支因为 settings 中寻找第一个组名为 spark 的解释器组其实返回的是 id 分别为 2CPNNU13D 的 InterpreterSetting 对象。
相关文章推荐
- Zeppelin 源码分析-Interpreter 相关类(1)
- Zeppelin源码分析-Interpreter 相关类(2)
- Zeppelin 源码分析-独立解释器 JVM 相关分析(1)
- zeppelin源码分析(4)——interpreter的调度和任务封装
- Zeppelin 源码分析-独立解释器 JVM 相关分析(2)
- Zeppelin源码分析-独立解释器 JVM 相关分析(4)
- zeppelin源码分析(3)——主要的class分析(上)
- Thinking in Java之集合相关整理(源码分析)
- Orchard源码分析(5):Host相关(Orchard.Environment.DefaultOrchardHost类)
- zeppelin源码分析(0)——zeppelin要解决什么问题
- ThreadPoolExecutor的应用和实现分析(中)—— 任务处理相关源码分析
- PostgreSQL源码目录结构及其相关模块功能分析
- HBase 1.1.3 balance相关源码分析 一
- Android源码分析相关工具
- 【Android】条形码/二维码扫描——ZXing源码分析及相关jar包导入
- Linux 学习数据专题【管理、编程、源码分析】——Linux相关图书选购指南
- 结合源码分析HBase相关操作流程
- zeppelin源码分析(7)——interpreter调试
- spark core源码分析17 RDD相关API
- weka之Bagging的源码分析及相关知识点