
Spark Source Code Analysis (Part 2)

2016-07-22 10:05
     Picking up where the last post left off, I'll only cover the methods I personally find meaningful: some are internal private methods, others are public methods exposed to callers. The main subject here is the internal method withScope, shown below:

/**
 * Execute a block of code in a scope such that all new RDDs created in this body will
 * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
 *
 * Note: Return statements are NOT allowed in the given body.
 */
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)


From the comment above the method we learn that it executes a block of code in a scope, so that all new RDDs created inside the body become parts of the same scope. Now look at the method itself. First, it is declared private[spark], so it is only visible inside the spark package. It defines a method named withScope with a type parameter U, which is whatever type the caller supplies. Reading on, the method takes a single parameter, body: => U, a by-name parameter; in effect, the argument is a function that produces a U. (This confused me at first too, and this is my own understanding; if I have it wrong, corrections from the experts are very welcome.) With that, the following code is not hard to understand:

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

      This is one of the most important methods on RDD, but we won't dissect map itself here; look at what follows the signature instead. Some readers may be baffled: what is this, a method followed by a curly-brace block? The reason is that withScope takes a function as its parameter, and here an anonymous block is passed directly as that argument, much as you would pass an anonymous class in Java. With that, the syntax is not hard to understand. Now return to the withScope code above and keep reading: RDDOperationScope.withScope[U] is a method of RDDOperationScope, and it is followed by two pairs of parentheses. Don't be alarmed; this is just currying in Scala. Plainly put, this one step is equivalent to two steps in conventional code: first call with sc, then call with body. (I think the next post should properly introduce currying; it is quite interesting.)
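      Before stepping into that method, here is a minimal standalone sketch of the two ideas just mentioned: a by-name parameter that accepts a { ... } block, and a curried method with two parameter lists. This is my own toy code for illustration, not anything from Spark:

object CurryingDemo {
  // A by-name parameter: `body` is not evaluated until it is used inside `timed`,
  // so callers can pass a whole { ... } block, just as RDD.map does with withScope.
  def timed[U](body: => U): U = {
    val start = System.nanoTime()
    val result = body
    println(s"took ${(System.nanoTime() - start) / 1000} us")
    result
  }

  // A curried method: two parameter lists, like RDDOperationScope.withScope(sc)(body).
  def withPrefix[T](prefix: String)(body: => T): T = {
    println(s"$prefix: entering")
    body
  }

  def main(args: Array[String]): Unit = {
    val n = timed { (1 to 100).sum }     // the braces are just the by-name argument
    val m = withPrefix("demo") { n * 2 } // first list takes prefix, second takes the block
    println(m)                           // 10100
  }
}

With those two ideas in hand, let's step into RDDOperationScope.withScope; the code is as follows: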

/**
 * Execute the given body such that all RDDs created in this body will have the same scope.
 * The name of the scope will be the first method name in the stack trace that is not the
 * same as this method's.
 *
 * Note: Return statements are NOT allowed in body.
 */
private[spark] def withScope[T](
    sc: SparkContext,
    allowNesting: Boolean = false)(body: => T): T = {
  val ourMethodName = "withScope"
  val callerMethodName = Thread.currentThread.getStackTrace()
    .dropWhile(_.getMethodName != ourMethodName)
    .find(_.getMethodName != ourMethodName)
    .map(_.getMethodName)
    .getOrElse {
      // Log a warning just in case, but this should almost certainly never happen
      logWarning("No valid method name for this RDD operation scope!")
      "N/A"
    }
  withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
}

      Stepping into this method, it again declares its own type parameter, T, and this time the first parameter list holds two parameters. Wait, you might object, didn't the call above pass only one argument in the first list? No rush: look at the second parameter, allowNesting (whether nested scopes are allowed). It is a Boolean with a default value of false, so it can simply be omitted at the call site, and everything checks out. The second parameter list is the same body as before. The code in the middle computes the name of the scope: it walks the current stack trace, drops frames until it reaches withScope itself, then takes the first method name that is not withScope; for a call coming from map, that name would be "map".
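      To see that stack-walking trick in isolation, here is a small self-contained sketch (again my own toy code, not Spark's) that finds the first caller whose method name differs from the current method's:

object CallerNameDemo {
  // Mirrors the logic above: skip frames until we hit "callerName" itself,
  // then take the first frame with a different method name.
  def callerName(): String = {
    val ourMethodName = "callerName"
    Thread.currentThread.getStackTrace
      .dropWhile(_.getMethodName != ourMethodName) // skip the JVM frames above us
      .find(_.getMethodName != ourMethodName)      // first frame that is not ourselves
      .map(_.getMethodName)
      .getOrElse("N/A")
  }

  def map(): Unit = println(callerName()) // prints "map"

  def main(args: Array[String]): Unit = map()
}

Back in the two-argument withScope, the last line calls a four-argument overload with the computed name. Stepping into it, the code is as follows: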

/**
 * Execute the given body such that all RDDs created in this body will have the same scope.
 *
 * If nesting is allowed, any subsequent calls to this method in the given body will instantiate
 * child scopes that are nested within our scope. Otherwise, these calls will take no effect.
 *
 * Additionally, the caller of this method may optionally ignore the configurations and scopes
 * set by the higher level caller. In this case, this method will ignore the parent caller's
 * intention to disallow nesting, and the new scope instantiated will not have a parent. This
 * is useful for scoping physical operations in Spark SQL, for instance.
 *
 * Note: Return statements are NOT allowed in body.
 */
private[spark] def withScope[T](
    sc: SparkContext,
    name: String,
    allowNesting: Boolean,
    ignoreParent: Boolean)(body: => T): T = {
  // Save the old scope to restore it later
  val scopeKey = SparkContext.RDD_SCOPE_KEY
  val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
  val oldScopeJson = sc.getLocalProperty(scopeKey)
  val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
  val oldNoOverride = sc.getLocalProperty(noOverrideKey)
  try {
    if (ignoreParent) {
      // Ignore all parent settings and scopes and start afresh with our own root scope
      sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
    } else if (sc.getLocalProperty(noOverrideKey) == null) {
      // Otherwise, set the scope only if the higher level caller allows us to do so
      sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
    }
    // Optionally disallow the child body to override our scope
    if (!allowNesting) {
      sc.setLocalProperty(noOverrideKey, "true")
    }
    body
  } finally {
    // Remember to restore any state that was modified before exiting
    sc.setLocalProperty(scopeKey, oldScopeJson)
    sc.setLocalProperty(noOverrideKey, oldNoOverride)
  }
}

    The parameters line up with what was passed in. Set the surrounding bookkeeping aside and ask: what does the method return as its T? Exactly what you may have noticed already: it returns the body that was passed in. Nobody is playing tricks on us. As I understand it, all the preparation beforehand exists to control the context in which body executes, the scope: every RDD created inside a withScope call ends up in the same scope. And what carries that context? The SparkContext, sc. How is "the same" arranged? The same local properties are set on it via setLocalProperty, saved before the body runs and restored in the finally block afterwards. That is the point of this code. This concludes the walkthrough of withScope; I hope passing experts will point out anything I have gotten wrong.
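    Stripped of the Spark specifics, the pattern is simply: save a piece of per-thread state, install new state, run the body, and always restore. Here is a minimal sketch of that shape (my own simplification, using a bare ThreadLocal where Spark uses SparkContext local properties):

object ScopeDemo {
  private val currentScope = new ThreadLocal[String] // stands in for sc.setLocalProperty

  // Save the old scope, install the new one, run body, and always restore:
  // the same try/finally shape as RDDOperationScope.withScope.
  def withScope[T](name: String)(body: => T): T = {
    val oldScope = currentScope.get()
    currentScope.set(name)
    try body
    finally currentScope.set(oldScope)
  }

  def main(args: Array[String]): Unit = {
    withScope("map") {
      println(s"inside scope: ${currentScope.get()}") // inside scope: map
    }
    println(s"after scope: ${currentScope.get()}")    // after scope: null
  }
}

As far as I understand, Spark stores the scope in SparkContext local properties rather than a plain ThreadLocal so that every RDD created while the scope is set can record it, which is what lets the web UI's DAG visualization group those RDDs under one operation.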

     Thanks to open source for bringing technology closer to all of us.
Tags: spark, scala, source code