
Spark Operator Execution Flow Explained, Part 2


4.count

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

count computes the total number of records: each partition counts its own records on the executor side, the per-partition counts are sent back to the driver, and the driver adds them up to obtain the RDD's total record count. The flow is as follows:
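As a minimal sketch of this flow (the RDD below and the SparkContext named sc are assumptions, not from the source), the per-partition counting and the driver-side summation can be reproduced directly with runJob:

// Minimal sketch of count()'s flow; `sc` is an assumed, already-created SparkContext.
val rdd = sc.parallelize(1 to 100, 4)
// Each partition counts its own records on the executor side...
val perPartition: Array[Long] = sc.runJob(rdd, (iter: Iterator[Int]) => {
  var n = 0L
  while (iter.hasNext) { n += 1L; iter.next() }
  n
})
// ...and the driver sums the per-partition counts, which is exactly what count() returns.
val total = perPartition.sum   // 100, the same value as rdd.count()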

 


5.countApprox

countApprox returns the number of elements in the RDD within a given timeout. The estimate of the total count is modeled with a normal distribution whose confidence level is given by the confidence parameter; if the timeout expires before all tasks have finished, a potentially incomplete result is returned.

/**
 * :: Experimental ::
 * Approximate version of count() that returns a potentially incomplete result
 * within a timeout, even if not all tasks have finished.
 */
@Experimental
def countApprox(
    timeout: Long,
    confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
  // Function that counts the elements of a single partition on the executor side
  val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
    var result = 0L
    while (iter.hasNext) {
      result += 1L
      iter.next()
    }
    result
  }
  // Driver-side evaluator used by the job listener: every completed task triggers its
  // merge(), and when the timeout expires or the job finishes early, its current state
  // (currentResult) is read back
  val evaluator = new CountEvaluator(partitions.length, confidence)
  // Submit the job
  sc.runApproximateJob(this, countElements, evaluator, timeout)
}

Going further, let's see how the evaluator is executed:

def runApproximateJob[T, U, R](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    evaluator: ApproximateEvaluator[U, R],
    timeout: Long): PartialResult[R] = {
  assertNotStopped()
  val callSite = getCallSite
  logInfo("Starting job: " + callSite.shortForm)
  val start = System.nanoTime
  val cleanedFunc = clean(func)
  // cleanedFunc is countElements, evaluator is the CountEvaluator, and timeout is the time limit
  val result = dagScheduler.runApproximateJob(rdd, cleanedFunc, evaluator, callSite, timeout,
    localProperties.get)
  logInfo(
    "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
  result
}

Next, look at the implementation of DAGScheduler.runApproximateJob:

def runApproximateJob[T, U, R](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    evaluator: ApproximateEvaluator[U, R],
    callSite: CallSite,
    timeout: Long,
    properties: Properties): PartialResult[R] = {
  // Listener whose taskSucceeded is invoked as tasks finish and which returns the
  // CountEvaluator's current value once the timeout expires
  val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val partitions = (0 until rdd.partitions.size).toArray
  val jobId = nextJobId.getAndIncrement()
  // Submit the job
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions, allowLocal = false, callSite, listener,
    SerializationUtils.clone(properties)))
  // Wait for the result
  listener.awaitResult()    // Will throw an exception if the job fails
}

The timeout-bounded counting logic therefore lives mainly in ApproximateActionListener:

private[spark] class ApproximateActionListener[T, U, R](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    evaluator: ApproximateEvaluator[U, R],
    timeout: Long)
  extends JobListener {

  val startTime = System.currentTimeMillis()
  val totalTasks = rdd.partitions.size
  var finishedTasks = 0
  var failure: Option[Exception] = None              // Set if the job has failed (permanently)
  var resultObject: Option[PartialResult[R]] = None  // Set if we've already returned a PartialResult

  // Called back whenever a partition's task finishes
  override def taskSucceeded(index: Int, result: Any) {
    synchronized {
      // Fold this task's result into the CountEvaluator's current value
      evaluator.merge(index, result.asInstanceOf[U])
      finishedTasks += 1
      if (finishedTasks == totalTasks) { // All partitions done: stop waiting and return the result
        // If we had already returned a PartialResult, set its final value
        resultObject.foreach(r => r.setFinalValue(evaluator.currentResult()))
        // Notify any waiting thread that may have called awaitResult
        this.notifyAll()
      }
    }
  }

  ……

  /**
   * Waits for up to timeout milliseconds since the listener was created and then returns a
   * PartialResult with the result so far. This may be complete if the whole job is done.
   */
  // Wait for the result
  def awaitResult(): PartialResult[R] = synchronized {
    val finishTime = startTime + timeout
    while (true) {
      val time = System.currentTimeMillis()
      if (failure.isDefined) {
        throw failure.get
      } else if (finishedTasks == totalTasks) { // Finished within the timeout: return the complete result
        return new PartialResult(evaluator.currentResult(), true)
      } else if (time >= finishTime) { // Timed out: return the partial result collected so far
        resultObject = Some(new PartialResult(evaluator.currentResult(), false))
        return resultObject.get
      } else { // Timeout not reached yet: keep waiting
        this.wait(finishTime - time)
      }
    }
    // Should never be reached, but required to keep the compiler happy
    return null
  }
}

If the job has not finished within the timeout, evaluator.currentResult() returns an approximate total based on a normal-distribution estimate; interested readers can dig into CountEvaluator:

private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
  extends ApproximateEvaluator[Long, BoundedDouble] {

  var outputsMerged = 0
  var sum: Long = 0

  override def merge(outputId: Int, taskResult: Long) {
    outputsMerged += 1
    sum += taskResult
  }

  override def currentResult(): BoundedDouble = {
    if (outputsMerged == totalOutputs) { // All tasks finished
      new BoundedDouble(sum, 1.0, sum, sum)
    } else if (outputsMerged == 0) { // No task has finished yet
      new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
    } else { // Partially finished: estimate the total under a normal-distribution model
      val p = outputsMerged.toDouble / totalOutputs
      val mean = (sum + 1 - p) / p
      val variance = (sum + 1) * (1 - p) / (p * p)
      val stdev = math.sqrt(variance)
      val confFactor = new NormalDistribution().
        inverseCumulativeProbability(1 - (1 - confidence) / 2)
      val low = mean - confFactor * stdev
      val high = mean + confFactor * stdev
      new BoundedDouble(mean, confidence, low, high)
    }
  }
}

To summarize, countApprox works roughly as follows: 1) each executor counts the records of its partitions and reports the counts to the driver; 2) the driver accumulates the reported counts; if not all partitions report back before the timeout, it stops waiting and returns an estimate based on the normal-distribution model above, and if the job finishes within the timeout it returns the exact count.
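A minimal usage sketch (the RDD and the SparkContext named sc below are assumptions, not from the source): countApprox returns a PartialResult whose initial value is the estimate available at the timeout, while getFinalValue() blocks until the job really finishes:

// Sketch only: `sc` is an assumed, already-created SparkContext.
val rdd = sc.parallelize(1 to 1000000, 100)
// Wait at most 100 ms; confidence defaults to 0.95
val partial = rdd.countApprox(timeout = 100L)
// The initial value is a BoundedDouble: an estimated mean with a [low, high] interval
println(partial.initialValue)
// getFinalValue() blocks until every task has finished and then returns the exact count
println(partial.getFinalValue())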

6.countApproxDistinct

countApproxDistinct counts the distinct elements of the RDD. The count is approximate, and the relativeSD parameter controls its accuracy: the smaller relativeSD is, the more accurate the result.

/**
 * Return approximate number of distinct elements in the RDD.
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
 * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
 *
 * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
 *                   It must be greater than 0.000017.
 */
def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope {
  require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
  val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
  countApproxDistinct(if (p < 4) 4 else p, 0)
}

It uses the HyperLogLog in Practice algorithm; the theory behind it is fairly involved, and interested readers can study it further.
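As a rough illustration of the precision formula above (a standalone sketch, not Spark API), the precision p grows as relativeSD shrinks; in the streamlib HyperLogLog implementation p corresponds to roughly 2^p registers:

// Standalone sketch reproducing the precision formula used by countApproxDistinct.
def precision(relativeSD: Double): Int =
  math.max(4, math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt)

precision(0.05)   // 9,  i.e. roughly 2^9  = 512 registers
precision(0.01)   // 14, i.e. roughly 2^14 = 16384 registers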

An example:

import org.apache.spark.{SparkConf, SparkContext}

object CountApproxDistinct {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    /**
     * Build a collection split into 20 partitions
     */
    val a = sc.parallelize(1 to 10000, 20)
    // b repeats the contents of a five times, giving 50000 elements in total
    val b = a ++ a ++ a ++ a ++ a
    // Prints 9760; with no argument the default relativeSD of 0.05 is used
    println(b.countApproxDistinct())
    // Prints 9760
    println(b.countApproxDistinct(0.05))
    // Prints 8224
    println(b.countApproxDistinct(0.1))
    // Prints 10000
    println(b.countApproxDistinct(0.001))
  }
}

 

7.collect

 

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

collect fetches all of the RDD's data and materializes it on the driver side. The flow is as follows:



If the RDD is large, use collect with caution, because the entire dataset will be held in the driver's memory.
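A minimal usage sketch (the small RDD below and the SparkContext named sc are assumptions, not from the source):

// Sketch only: `sc` is an assumed, already-created SparkContext.
val small = sc.parallelize(Seq("a", "b", "c"), 2)
// All elements of the RDD are now materialized as an array in the driver's memory
val local: Array[String] = small.collect()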

8.toLocalIterator

Returns an iterator containing all of the RDD's records.

/**
 * Return an iterator that contains all of the elements in this RDD.
 *
 * The iterator will consume as much memory as the largest partition in this RDD.
 *
 * Note: this results in multiple Spark jobs, and if the input RDD is the result
 * of a wide transformation (e.g. join with different partitioners), to avoid
 * recomputing the input RDD should be cached first.
 */
def toLocalIterator: Iterator[T] = withScope {
  // Trigger one action (one job) per partition
  def collectPartition(p: Int): Array[T] = {
    sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
  }
  // flatMap stitches the partitions' records together into a single iterator
  (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
}

For example:

scala> val rdd = sc.parallelize(1 to 10,2)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val it = rdd.toLocalIterator

it: Iterator[Int] = non-empty iterator

scala> while(it.hasNext){

     | println(it.next)

     | }

1

2

3

4

5

6

7

8

9

10

9.takeOrdered

takeOrdered returns the first num elements of the RDD, sorted either by the default (ascending) ordering or by a user-supplied Ordering.

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // Sort on the executor side first: keep each partition's top num elements,
      // ordered by ord, in a bounded priority queue.
      // Priority keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      // Ship each partition's queue to the driver, merge them into one queue,
      // then convert to an array and sort to take the final num records
      mapRDDs.reduce { (queue1, queue2) =>
        queue1 ++= queue2
        queue1
      }.toArray.sorted(ord)
    }
  }
}

For example:

List<Integer> data = Arrays.asList(1, 4, 3, 2, 5, 6);
JavaRDD<Integer> JavaRDD = jsc.parallelize(data, 2);
for (Integer integer : JavaRDD.takeOrdered(2)) {
    System.out.println(integer);
}

This prints:

1

2

Its execution flow is as follows:
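The two-stage flow (per-partition top-num on the executors, then a single merge and sort on the driver) can be illustrated with plain Scala collections; this is a simplified sketch, not Spark source:

// Simplified illustration of takeOrdered's two-stage flow using local collections.
val num = 2
val partitions = Seq(Seq(5, 1, 4), Seq(3, 6, 2))
// "Executor side": each partition keeps only its smallest `num` elements
val perPartitionTopN = partitions.map(_.sorted.take(num))
// "Driver side": merge the partial results and sort once more
val result = perPartitionTopN.flatten.sorted.take(num)   // Seq(1, 2), matching takeOrdered(2)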

 


 