Spark Operator Execution Flow in Detail, Part 2
2017-03-02 10:09
4.count
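count runs one task per partition. Each task counts its partition's elements with Utils.getIteratorSize, and the driver sums the per-partition counts.

```scala
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
```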
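As a rough illustration of what count does, the following plain-Scala sketch has no Spark dependency. It models an RDD as a hypothetical sequence of per-partition iterator factories. Each partition is walked once to get its size, as Utils.getIteratorSize does, and the sizes are then summed, as the driver does with the array returned by runJob. CountSketch and its methods are illustrative names, not Spark APIs.

```scala
// Plain-Scala sketch of RDD.count; "partitions" is a hypothetical stand-in for an RDD.
object CountSketch {
  // Like Utils.getIteratorSize: consume the iterator once, counting elements.
  def iteratorSize[T](it: Iterator[T]): Long = {
    var n = 0L
    while (it.hasNext) {
      it.next()
      n += 1
    }
    n
  }

  // Like sc.runJob(rdd, getIteratorSize).sum: one count per partition, summed on the "driver".
  def count[T](partitions: Seq[() => Iterator[T]]): Long =
    partitions.map(p => iteratorSize(p())).sum
}
```

For example, three partitions holding 3, 0 and 1 elements give a count of 4.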
5.countApprox
countApprox returns the number of elements in the RDD within a given timeout. The true total is estimated as a normally distributed quantity, and confidence sets the confidence level of the returned bounds. If timeout expires before every task has finished, an incomplete result is returned.

```scala
/**
 * :: Experimental ::
 * Approximate version of count() that returns a potentially incomplete result
 * within a timeout, even if not all tasks have finished.
 */
@Experimental
def countApprox(
    timeout: Long,
    confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
  // Function run on the executors to count the elements of one partition
  val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
    var result = 0L
    while (iter.hasNext) {
      result += 1L
      iter.next()
    }
    result
  }
  // Driver-side evaluator: merge() is called each time a task finishes; when the timeout
  // expires or all tasks complete early, its current state (currentResult) is read
  val evaluator = new CountEvaluator(partitions.length, confidence)
  // Submit the job
  sc.runApproximateJob(this, countElements, evaluator, timeout)
}
```
SparkContext.runApproximateJob cleans the closure and delegates to the DAGScheduler:

```scala
def runApproximateJob[T, U, R](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    evaluator: ApproximateEvaluator[U, R],
    timeout: Long): PartialResult[R] = {
  assertNotStopped()
  val callSite = getCallSite
  logInfo("Starting job: " + callSite.shortForm)
  val start = System.nanoTime
  val cleanedFunc = clean(func)
  // cleanedFunc is countElements, evaluator is the CountEvaluator, and the timeout is timeout
  val result = dagScheduler.runApproximateJob(rdd, cleanedFunc, evaluator, callSite, timeout,
    localProperties.get)
  logInfo(
    "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
  result
}
```
DAGScheduler.runApproximateJob creates a listener, posts the job, and then blocks on the listener:

```scala
def runApproximateJob[T, U, R](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    evaluator: ApproximateEvaluator[U, R],
    callSite: CallSite,
    timeout: Long,
    properties: Properties): PartialResult[R] = {
  // Listener: taskSucceeded fires whenever a task finishes; when the timeout
  // expires, the CountEvaluator's current value is returned
  val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val partitions = (0 until rdd.partitions.size).toArray
  val jobId = nextJobId.getAndIncrement()
  // Submit the job
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions, allowLocal = false, callSite, listener,
    SerializationUtils.clone(properties)))
  // Wait for the result
  listener.awaitResult()    // Will throw an exception if the job fails
}
```
```scala
private[spark] class ApproximateActionListener[T, U, R](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    evaluator: ApproximateEvaluator[U, R],
    timeout: Long)
  extends JobListener {

  val startTime = System.currentTimeMillis()
  val totalTasks = rdd.partitions.size
  var finishedTasks = 0
  var failure: Option[Exception] = None             // Set if the job has failed (permanently)
  var resultObject: Option[PartialResult[R]] = None // Set if we've already returned a PartialResult

  // Callback fired when the task for one partition completes
  override def taskSucceeded(index: Int, result: Any) {
    synchronized {
      // Update the CountEvaluator's current value
      evaluator.merge(index, result.asInstanceOf[U])
      finishedTasks += 1
      if (finishedTasks == totalTasks) {
        // All partitions are done: stop waiting and return the result
        // If we had already returned a PartialResult, set its final value
        resultObject.foreach(r => r.setFinalValue(evaluator.currentResult()))
        // Notify any waiting thread that may have called awaitResult
        this.notifyAll()
      }
    }
  }

  ……

  /**
   * Waits for up to timeout milliseconds since the listener was created and then returns a
   * PartialResult with the result so far. This may be complete if the whole job is done.
   */
  def awaitResult(): PartialResult[R] = synchronized {
    val finishTime = startTime + timeout
    while (true) {
      val time = System.currentTimeMillis()
      if (failure.isDefined) {
        throw failure.get
      } else if (finishedTasks == totalTasks) {
        // Finished within the timeout: return the complete result
        return new PartialResult(evaluator.currentResult(), true)
      } else if (time >= finishTime) {
        // Timed out: return a partial result
        resultObject = Some(new PartialResult(evaluator.currentResult(), false))
        return resultObject.get
      } else {
        // Deadline not reached yet: keep waiting
        this.wait(finishTime - time)
      }
    }
    // Should never be reached, but required to keep the compiler happy
    return null
  }
}
```
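The timeout handling in ApproximateActionListener follows the standard monitor pattern. taskSucceeded updates state under the lock and calls notifyAll once every task is done. awaitResult loops on wait(remaining) until either all tasks have finished or the deadline passes, and the loop re-checks state after every wakeup. The sketch below reproduces only that pattern in plain Scala. ApproxListenerSketch is hypothetical: it sums Long results directly instead of delegating to an evaluator, and it returns a (value, isFinal) pair in place of Spark's PartialResult.

```scala
// Hypothetical sketch of the wait/notify logic in ApproximateActionListener.
// Assumptions: task results are Longs summed directly; (sum, isFinal) replaces PartialResult.
final class ApproxListenerSketch(totalTasks: Int, timeoutMs: Long) {
  private val startTime = System.currentTimeMillis()
  private var finishedTasks = 0
  private var sum = 0L

  // Called once per finished partition, like taskSucceeded in Spark.
  def taskSucceeded(result: Long): Unit = synchronized {
    sum += result
    finishedTasks += 1
    if (finishedTasks == totalTasks) notifyAll() // wake any thread blocked in awaitResult
  }

  // Blocks until all tasks finish or the deadline passes, whichever comes first.
  def awaitResult(): (Long, Boolean) = synchronized {
    val finishTime = startTime + timeoutMs
    var done = false
    var out: (Long, Boolean) = (0L, false)
    while (!done) {
      val now = System.currentTimeMillis()
      if (finishedTasks == totalTasks) { out = (sum, true); done = true }   // complete result
      else if (now >= finishTime) { out = (sum, false); done = true }       // partial result
      else wait(finishTime - now) // may wake early; the loop re-checks the state
    }
    out
  }
}
```

Because the deadline is measured from construction time, a caller that arrives late still waits only for whatever time remains.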
```scala
private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
  extends ApproximateEvaluator[Long, BoundedDouble] {

  var outputsMerged = 0
  var sum: Long = 0

  override def merge(outputId: Int, taskResult: Long) {
    outputsMerged += 1
    sum += taskResult
  }

  override def currentResult(): BoundedDouble = {
    if (outputsMerged == totalOutputs) {
      // All tasks finished
      new BoundedDouble(sum, 1.0, sum, sum)
    } else if (outputsMerged == 0) {
      // No task has finished yet
      new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
    } else {
      // Partially finished: estimate the normal-distribution parameters of the true total
      val p = outputsMerged.toDouble / totalOutputs
      val mean = (sum + 1 - p) / p
      val variance = (sum + 1) * (1 - p) / (p * p)
      val stdev = math.sqrt(variance)
      val confFactor = new NormalDistribution().
        inverseCumulativeProbability(1 - (1 - confidence) / 2)
      val low = mean - confFactor * stdev
      val high = mean + confFactor * stdev
      new BoundedDouble(mean, confidence, low, high)
    }
  }
}
```
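Suppose k of n partitions have been merged, so p = k / n, and their counts add up to S. In that case CountEvaluator reports mean = (S + 1 - p) / p and variance = (S + 1) * (1 - p) / p^2. It then widens the interval by z standard deviations on each side, where z is the standard normal quantile at 1 - (1 - confidence) / 2. The pure-Scala sketch below reproduces that arithmetic. To avoid a commons-math3 dependency it assumes the caller passes z directly (about 1.96 for 95% confidence). CountEstimateSketch is an illustrative name.

```scala
// Sketch of CountEvaluator.currentResult's arithmetic (not Spark's class).
// Assumption: z is supplied by the caller instead of computed via NormalDistribution.
object CountEstimateSketch {
  final case class Estimate(mean: Double, low: Double, high: Double)

  def estimate(sum: Long, merged: Int, total: Int, z: Double): Estimate = {
    require(total > 0 && merged >= 0 && merged <= total)
    if (merged == total) {
      Estimate(sum.toDouble, sum.toDouble, sum.toDouble)               // exact count
    } else if (merged == 0) {
      Estimate(0.0, Double.NegativeInfinity, Double.PositiveInfinity)  // nothing known yet
    } else {
      val p = merged.toDouble / total               // fraction of partitions seen
      val mean = (sum + 1 - p) / p                  // scaled-up estimate of the total
      val variance = (sum + 1) * (1 - p) / (p * p)
      val stdev = math.sqrt(variance)
      Estimate(mean, mean - z * stdev, mean + z * stdev)
    }
  }
}
```

For example, if 5 of 10 partitions have reported a combined count of 100, then p = 0.5. The estimated mean is (100 + 1 - 0.5) / 0.5 = 201 and the variance is 101 * 0.5 / 0.25 = 202, so the 95% interval is roughly 201 ± 1.96 * sqrt(202).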
6.countApproxDistinct
countApproxDistinct counts the distinct elements of an RDD approximately. The parameter relativeSD controls the accuracy: the smaller relativeSD is, the more accurate the result, at the cost of more memory for the counter.

```scala
/**
 * Return approximate number of distinct elements in the RDD.
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
 * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
 *
 * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
 *                   It must be greater than 0.000017.
 */
def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope {
  require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
  val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
  countApproxDistinct(if (p < 4) 4 else p, 0)
}
```
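The only arithmetic in this wrapper is the choice of the HyperLogLog precision p, which means 2^p registers. It picks the smallest integer p with 1.054 / sqrt(2^p) <= relativeSD, clamped to at least 4. The sketch below reproduces that calculation outside Spark. HllPrecisionSketch is an illustrative name.

```scala
// Reproduces the precision choice in RDD.countApproxDistinct(relativeSD) (not a Spark API).
object HllPrecisionSketch {
  def precision(relativeSD: Double): Int = {
    require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
    // Smallest p with 2^(p/2) >= 1.054 / relativeSD
    val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
    if (p < 4) 4 else p
  }

  // Error bound implied by the 1.054 constant for 2^p registers.
  def expectedError(p: Int): Double = 1.054 / math.sqrt((1 << p).toDouble)
}
```

For the default relativeSD of 0.05 this gives p = 9, or 512 registers. A value of 0.1 gives 7 and 0.001 gives 21, which is why the smaller values in the example cost more space.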
It uses the algorithm from "HyperLogLog in Practice". The theory is fairly involved, and interested readers can study it further.
Example:
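For intuition before reading the paper, here is a deliberately simplified HyperLogLog in plain Scala. It is the original Flajolet et al. estimator with linear-counting correction for small cardinalities, hashing with scala.util.hashing.MurmurHash3. It is not the HyperLogLog++ implementation from streamlib that Spark actually calls, and ToyHyperLogLog is a hypothetical name. Each value is hashed, and the top p bits of the hash pick one of 2^p registers. Each register remembers the largest "position of the first 1-bit" it has seen in the remaining bits. Duplicates hash identically, so they never change the registers, which is why copying an RDD several times does not inflate the estimate.

```scala
import scala.util.hashing.MurmurHash3

// Toy HyperLogLog: original estimator plus linear counting for small ranges.
// Not the HyperLogLog++ implementation (streamlib) that Spark uses.
final class ToyHyperLogLog(p: Int) {
  require(p >= 4 && p <= 16, "p must be in [4, 16]")
  private val m = 1 << p                    // number of registers
  private val registers = new Array[Int](m) // max rank seen per register

  def add(value: Any): Unit = {
    val h = MurmurHash3.stringHash(value.toString) // 32-bit hash
    val idx = h >>> (32 - p)                       // top p bits choose the register
    val rest = h << p                              // remaining 32 - p bits, left-aligned
    // rank = 1-based position of the first 1-bit among the remaining bits
    val rank = math.min(Integer.numberOfLeadingZeros(rest), 32 - p) + 1
    if (rank > registers(idx)) registers(idx) = rank
  }

  def estimate(): Double = {
    val alpha = 0.7213 / (1 + 1.079 / m)           // bias correction (accurate for m >= 128)
    val z = registers.map(r => math.pow(2.0, -r)).sum
    val raw = alpha * m * m / z                    // harmonic-mean estimate
    val zeros = registers.count(_ == 0)
    if (raw <= 2.5 * m && zeros > 0) m * math.log(m.toDouble / zeros) // linear counting
    else raw
  }
}
```

With p = 12 (4096 registers), the typical relative error is roughly 1.04 / sqrt(4096), or about 1.6%.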
```scala
import org.apache.spark.{SparkConf, SparkContext}

object CountApproxDistinct {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    /**
     * Build a collection split into 20 partitions
     */
    val a = sc.parallelize(1 to 10000, 20)
    // Copy RDD a five times: 50000 elements in total
    val b = a ++ a ++ a ++ a ++ a
    // Result is 9760; with no argument, the default is 0.05
    println(b.countApproxDistinct())
    // Result is 9760
    println(b.countApproxDistinct(0.05))
    // 8224
    println(b.countApproxDistinct(0.1))
    // 10000
    println(b.countApproxDistinct(0.001))
  }
}
```
7.collect
```scala
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
```
If the RDD is large, use collect with caution: it pulls all of the RDD's data into the driver's memory.
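To make the memory cost concrete, this plain-Scala sketch mimics collect. Every partition is materialized as an array, as the iter.toArray task does, and the arrays are then concatenated into one array on the caller's side. The result therefore holds every element at once. CollectSketch and the () => Iterator[T] partition stand-ins are hypothetical.

```scala
import scala.reflect.ClassTag

// Plain-Scala sketch of RDD.collect; each element of `partitions` stands in for one partition.
object CollectSketch {
  def collect[T: ClassTag](partitions: Seq[() => Iterator[T]]): Array[T] = {
    // Like sc.runJob(this, (iter: Iterator[T]) => iter.toArray): one array per partition
    val results: Seq[Array[T]] = partitions.map(p => p().toArray)
    // Like Array.concat(results: _*): everything lands in a single driver-side array
    Array.concat(results: _*)
  }
}
```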
8.toLocalIterator
toLocalIterator returns an iterator that contains all of the records.

```scala
/**
 * Return an iterator that contains all of the elements in this RDD.
 *
 * The iterator will consume as much memory as the largest partition in this RDD.
 *
 * Note: this results in multiple Spark jobs, and if the input RDD is the result
 * of a wide transformation (e.g. join with different partitioners), to avoid
 * recomputing the input RDD should be cached first.
 */
def toLocalIterator: Iterator[T] = withScope {
  // Trigger one action per partition
  def collectPartition(p: Int): Array[T] = {
    sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
  }
  // Use flatMap to stitch all records together into a single iterator
  (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
}
```
```
scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val it = rdd.toLocalIterator
it: Iterator[Int] = non-empty iterator

scala> while(it.hasNext){
     |   println(it.next)
     | }
1
2
3
4
5
6
7
8
9
10
```
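The difference from collect is laziness. Iterator.flatMap only calls collectPartition(i) once the consumer has exhausted partition i - 1, so at most one partition is held at a time, but each partition costs a separate job. The sketch below shows this with plain Scala collections. LocalIteratorSketch and the fetched log are hypothetical stand-ins for the per-partition runJob calls.

```scala
import scala.collection.mutable.ArrayBuffer

// Plain-Scala sketch of RDD.toLocalIterator's laziness (not a Spark API).
object LocalIteratorSketch {
  def toLocalIterator[T](partitions: IndexedSeq[Seq[T]], fetched: ArrayBuffer[Int]): Iterator[T] = {
    // Stand-in for one sc.runJob call per partition; records which partition was fetched.
    def collectPartition(p: Int): Seq[T] = {
      fetched += p
      partitions(p).toVector
    }
    // Same shape as Spark's code: partitions are fetched only as iteration reaches them.
    (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
  }
}
```

Reading the first element fetches only partition 0. Partition 1 is fetched only after partition 0 has been consumed.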
9.takeOrdered
takeOrdered returns the first num elements of the RDD, ordered either by the default (ascending) ordering or by a specified one.

```scala
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // On the executors: take each partition's first num elements under ord
      // and keep them in a bounded priority queue
      // Priority keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      // Send each partition's queue to the driver and merge the queues,
      // then convert to an array and sort to get the first num records
      mapRDDs.reduce { (queue1, queue2) =>
        queue1 ++= queue2
        queue1
      }.toArray.sorted(ord)
    }
  }
}
```
```java
List<Integer> data = Arrays.asList(1, 4, 3, 2, 5, 6);
JavaRDD<Integer> javaRDD = jsc.parallelize(data, 2);
for (Integer integer : javaRDD.takeOrdered(2)) {
    System.out.println(integer);
}
```

Output:

```
1
2
```
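The core of takeOrdered is a bounded "keep the num smallest" structure. It is a max-heap under ord that holds at most num elements, and a new element replaces the heap's head only if it sorts before it. Spark's BoundedPriorityQueue gets the same effect by using ord.reverse. The sketch below reproduces the two phases, one bounded heap per partition followed by a merge and a final sort, using scala.collection.mutable.PriorityQueue. It is an illustration, not Spark's BoundedPriorityQueue, and TakeOrderedSketch is a hypothetical name.

```scala
import scala.collection.mutable

// Plain-Scala sketch of the takeOrdered strategy (not Spark's implementation).
object TakeOrderedSketch {
  // Keep the num smallest elements under ord; the heap's head is the largest kept element.
  private def topK[T](items: Iterator[T], num: Int)(implicit ord: Ordering[T]): mutable.PriorityQueue[T] = {
    val heap = mutable.PriorityQueue.empty[T](ord)
    items.foreach { x =>
      if (heap.size < num) heap.enqueue(x)
      else if (ord.lt(x, heap.head)) { heap.dequeue(); heap.enqueue(x) } // evict current max
    }
    heap
  }

  def takeOrdered[T](partitions: Seq[Seq[T]], num: Int)(implicit ord: Ordering[T]): List[T] =
    if (num == 0 || partitions.isEmpty) Nil
    else {
      // "Executor" phase: one bounded heap per partition
      val perPartition = partitions.map(p => topK(p.iterator, num))
      // "Driver" phase: merge the heaps, keep num elements, then sort them
      val merged = topK(perPartition.iterator.flatMap(_.iterator), num)
      merged.toList.sorted(ord)
    }
}
```

Take the data from the Java example, split into the same two partitions as Seq(Seq(1, 4, 3), Seq(2, 5, 6)). takeOrdered(..., 2) returns List(1, 2), and passing Ordering[Int].reverse returns List(6, 5).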