Spark源码解析(二)
2016-07-22 10:05
323 查看
这次继续上次的地方,我这里只挑我个人感觉有意义的方法来讲,有些是内部的private方法,有些是展现给我们的外部可以调用的方法。这里主要讲的方法是他的内部方法withScope,方法如下:
我们可以个根据方法上上面的注释了解到:在一个范围内执行代码块,从而使得在这个body内创建的新的RDDS都是同一范围里面的不同部分。那么我们来看方法,首先声明是private方法,同时只在spark包中是有效的。然后定义了一个方法,名字叫做withScope。后面一开始我也有些懵逼,我是这么理解的(如果错了欢迎各位大神不吝赐教,小弟一定认真听教),这里定义了一个泛型,这个泛型是我们自定义的类型,继续往下看,这个函数需要一个参数,这个参数是一个函数。这样我们也就不难理解下面这段代码:
这个是RDD中最重要的一个方法了,我们这里先不对此方法进行讲解,我们看后面,可能有些人会有些茫然,啥东西,咋一个方法后面跟了个大括号。其实是这样的,因为方法需要一个函数作为参数,而我们这里直接一个匿名函数的实现来作为参数,跟java中的一样。那么这就不难理解了。再回到上面的withScope代码中,我们继续往下看,类型U是RDDOperationScope中的一个方法,后面有两个括号,不要紧张,这只是scala中的curring。说白了这一步相当于传统代码中的两步,先传sc调用一个函数,再传body调用一个函数(嗯,我感觉下一篇文章有必要介绍一下这个curring,挺有意思的)。那我们点进去查看一下,代码如下:
进入方法我们查看,这里方法又自定义了一个类型T,然后方法的第一个括号里面包含两个参数,有人问了,不对呀,刚才第一个括号里面不是只传了一个参数吗,不要着急,我们看第二个参数allowNesting/是否允许嵌套,他是一个Boolean类型的,而且有了默认值false,那么一切都解释的通了,调用方法时,这个参数可以不携带。然后后面第二个括号与之前传入的一致,然后定义了T类型,中间的代码大家可以略过,是用来获取一些变量。最后又调用了withScope,我们点进去查看,代码如下:
参数与之前传入的相一致。不要查其余无用代码,注意只看类型T返回的是什么?没错可能大家都已经看到了,返回的是传入的body。不要发飙,没人耍咱们,我个人理解,之前我们所做的所有准备都是为了控制body的上下文,也就是他这里面所谓的bodyScope,把他们控制在一个相同的范围内,说白了就是让所有在调用withScope方法内部创建的RDD都拥有相同的上下文环境,啥是上下文环境,sc呗,咋设置相同,setxxx都一样呗。这就是该代码的意义所在。那么到这里withScope代码介绍结束,希望各位路过的大牛指定不足之处。
感谢开源,让技术走近你我。
/** * Execute a block of code in a scope such that all new RDDs created in this body will * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}. * * Note: Return statements are NOT allowed in the given body. */ private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)
我们可以个根据方法上上面的注释了解到:在一个范围内执行代码块,从而使得在这个body内创建的新的RDDS都是同一范围里面的不同部分。那么我们来看方法,首先声明是private方法,同时只在spark包中是有效的。然后定义了一个方法,名字叫做withScope。后面一开始我也有些懵逼,我是这么理解的(如果错了欢迎各位大神不吝赐教,小弟一定认真听教),这里定义了一个泛型,这个泛型是我们自定义的类型,继续往下看,这个函数需要一个参数,这个参数是一个函数。这样我们也就不难理解下面这段代码:
/** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) }
这个是RDD中最重要的一个方法了,我们这里先不对此方法进行讲解,我们看后面,可能有些人会有些茫然,啥东西,咋一个方法后面跟了个大括号。其实是这样的,因为方法需要一个函数作为参数,而我们这里直接一个匿名函数的实现来作为参数,跟java中的一样。那么这就不难理解了。再回到上面的withScope代码中,我们继续往下看,类型U是RDDOperationScope中的一个方法,后面有两个括号,不要紧张,这只是scala中的curring。说白了这一步相当于传统代码中的两步,先传sc调用一个函数,再传body调用一个函数(嗯,我感觉下一篇文章有必要介绍一下这个curring,挺有意思的)。那我们点进去查看一下,代码如下:
/** * Execute the given body such that all RDDs created in this body will have the same scope. * The name of the scope will be the first method name in the stack trace that is not the * same as this method's. * * Note: Return statements are NOT allowed in body. */ private[spark] def withScope[T]( sc: SparkContext, allowNesting: Boolean = false)(body: => T): T = { val ourMethodName = "withScope" val callerMethodName = Thread.currentThread.getStackTrace() .dropWhile(_.getMethodName != ourMethodName) .find(_.getMethodName != ourMethodName) .map(_.getMethodName) .getOrElse { // Log a warning just in case, but this should almost certainly never happen logWarning("No valid method name for this RDD operation scope!") "N/A" } withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body) }
进入方法我们查看,这里方法又自定义了一个类型T,然后方法的第一个括号里面包含两个参数,有人问了,不对呀,刚才第一个括号里面不是只传了一个参数吗,不要着急,我们看第二个参数allowNesting/是否允许嵌套,他是一个Boolean类型的,而且有了默认值false,那么一切都解释的通了,调用方法时,这个参数可以不携带。然后后面第二个括号与之前传入的一致,然后定义了T类型,中间的代码大家可以略过,是用来获取一些变量。最后又调用了withScope,我们点进去查看,代码如下:
/** * Execute the given body such that all RDDs created in this body will have the same scope. * * If nesting is allowed, any subsequent calls to this method in the given body will instantiate * child scopes that are nested within our scope. Otherwise, these calls will take no effect. * * Additionally, the caller of this method may optionally ignore the configurations and scopes * set by the higher level caller. In this case, this method will ignore the parent caller's * intention to disallow nesting, and the new scope instantiated will not have a parent. This * is useful for scoping physical operations in Spark SQL, for instance. * * Note: Return statements are NOT allowed in body. */ private[spark] def withScope[T]( sc: SparkContext, name: String, allowNesting: Boolean, ignoreParent: Boolean)(body: => T): T = { // Save the old scope to restore it later val scopeKey = SparkContext.RDD_SCOPE_KEY val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY val oldScopeJson = sc.getLocalProperty(scopeKey) val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson) val oldNoOverride = sc.getLocalProperty(noOverrideKey) try { if (ignoreParent) { // Ignore all parent settings and scopes and start afresh with our own root scope sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson) } else if (sc.getLocalProperty(noOverrideKey) == null) { // Otherwise, set the scope only if the higher level caller allows us to do so sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson) } // Optionally disallow the child body to override our scope if (!allowNesting) { sc.setLocalProperty(noOverrideKey, "true") } body } finally { // Remember to restore any state that was modified before exiting sc.setLocalProperty(scopeKey, oldScopeJson) sc.setLocalProperty(noOverrideKey, oldNoOverride) } } }
参数与之前传入的相一致。不要查其余无用代码,注意只看类型T返回的是什么?没错可能大家都已经看到了,返回的是传入的body。不要发飙,没人耍咱们,我个人理解,之前我们所做的所有准备都是为了控制body的上下文,也就是他这里面所谓的bodyScope,把他们控制在一个相同的范围内,说白了就是让所有在调用withScope方法内部创建的RDD都拥有相同的上下文环境,啥是上下文环境,sc呗,咋设置相同,setxxx都一样呗。这就是该代码的意义所在。那么到这里withScope代码介绍结束,希望各位路过的大牛指定不足之处。
感谢开源,让技术走近你我。
相关文章推荐
- 从源码安装Mysql/Percona 5.5
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Windows下Scala环境搭建
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- 浅析Ruby的源代码布局及其编程风格
- asp.net 抓取网页源码三种实现方法
- JS小游戏之仙剑翻牌源码详解
- JS小游戏之宇宙战机源码详解
- 深入浅析knockout源码分析之订阅
- jQuery源码分析之jQuery中的循环技巧详解
- 本人自用的global.js库源码分享
- java中原码、反码与补码的问题分析
- Windows7下安装Scala 2.9.2教程
- ASP.NET使用HttpWebRequest读取远程网页源代码
- PHP网页游戏学习之Xnova(ogame)源码解读(六)
- C#获取网页HTML源码实例
- PHP网页游戏学习之Xnova(ogame)源码解读(八)
- PHP网页游戏学习之Xnova(ogame)源码解读(四)