
Lesson 8: Spark Streaming Source Code Walkthrough: A Thorough Study of the Full Lifecycle of RDD Generation

Topics in this session:

1. A thorough study of the relationship between DStream and RDD

2. A thorough study of how RDDs are generated in Streaming

Sample code: spark-1.6.1/example/src/main/scala/org/apach

Three questions about RDDs:

1. How exactly is an RDD generated?

2. At execution time, is it any different from an RDD in Spark Core, at the runtime level?

3. How do we handle RDDs after they have run? New RDDs keep being produced with every batch duration, and memory cannot hold all of these objects indefinitely.

How to manage the already-generated RDDs once the job for each batch duration has finished executing is a real question.

The full lifecycle of RDD generation:

A ForEachDStream does not itself trigger the execution of a job; it causes a job to be created, but the actual production of jobs is driven by the timer.

Operating on a DStream is really operating on RDDs, because a DStream is a template for a sequence of RDDs, and each DStream depends on the DStream(s) before it. Since the dependencies point from back to front, the earlier RDDs can be derived by backtracking from the last one.
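To make this chain concrete, here is a minimal runnable sketch (host, port, and names are illustrative, not from the original post) of the kind of program analyzed below: socketTextStream produces a SocketInputDStream, map produces a MappedDStream, and print registers a ForEachDStream as an output stream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamChainSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamChainSketch")
    val ssc = new StreamingContext(conf, Seconds(10)) // batch duration = 10 seconds

    // SocketInputDStream <- MappedDStream <- ForEachDStream (registered by print)
    val lines = ssc.socketTextStream("localhost", 9999) // SocketInputDStream[String]
    val upper = lines.map(_.toUpperCase)                // MappedDStream[String, String]
    upper.print()                                       // registers a ForEachDStream

    ssc.start()
    ssc.awaitTermination()
  }
}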

* DStreams internally is characterized by a few basic properties:
*  - A list of other DStreams that the DStream depends on
*  - A time interval at which the DStream generates an RDD
*  - A function that is used to generate an RDD after each time interval

abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {
Source code

DStream

/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
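Note that print itself does not run anything: it only wraps the printing logic in a ForEachDStream and registers it. register() lives in the same DStream.scala; quoted here from memory, so treat it as a sketch. It just adds the stream to the DStreamGraph's output streams:

/**
 * Register this streaming as an output stream. This would ensure that RDDs of this
 * DStream will be generated.
 */
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}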
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
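The "HashMap" mentioned in the comment is the generatedRDDs field of DStream, declared in the 1.6 source roughly as below. This per-DStream map is also the answer to question 3 above: after a batch's jobs complete, the JobGenerator triggers clearMetadata, which drops entries older than the stream's rememberDuration, so old RDDs can be garbage-collected instead of accumulating in memory.

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()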
 

 

/** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
private[streaming] def isTimeValid(time: Time): Boolean = {
  if (!isInitialized) {
    throw new SparkException (this + " has not been initialized")
  } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
    logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
      " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
    false
  } else {
    logDebug("Time " + time + " is valid")
    true
  }
}
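As a concrete illustration of the isMultipleOf check (the times below are made up): with zeroTime at 0 and a slideDuration of 10 seconds, a batch time of 30 s is valid and generates an RDD, while 35 s is not.

import org.apache.spark.streaming.{Seconds, Time}

val zeroTime = Time(0L)          // Time wraps milliseconds
val slideDuration = Seconds(10)

(Time(30000L) - zeroTime).isMultipleOf(slideDuration) // true  -> valid batch time
(Time(35000L) - zeroTime).isMultipleOf(slideDuration) // false -> no RDD generated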

SocketInputDStream extends ReceiverInputDStream:

private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {
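The class body, elided above, essentially just supplies the receiver; from the 1.6 source:

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}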

ReceiverInputDStream

/**
 * Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
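Which branch runs is controlled by the receiver write-ahead log setting that WriteAheadLogUtils.enableReceiverLog reads. A minimal sketch of enabling it (the app name is illustrative; WAL recovery also requires a checkpoint directory via ssc.checkpoint):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("WALEnabledApp") // illustrative name
  // Persist received blocks to a write ahead log so they can be
  // re-read after a driver failure (needs ssc.checkpoint(dir) as well).
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")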
 

MappedDStream

private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}
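Note how compute does no work of its own: it asks parent.getOrCompute for the batch's RDD and applies mapFunc to it lazily, which is exactly the back-to-front derivation described at the start. For completeness, DStream.map is what wires this class in; from DStream.scala in 1.6:

/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}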

ForEachDStream

private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
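Putting the pieces together: on each batch interval the JobGenerator's timer fires and calls DStreamGraph.generateJobs, which asks every registered output stream (such as the ForEachDStream above) for a job; generateJob in turn calls parent.getOrCompute(time), recursively walking the dependency chain back to the input DStream. Quoted from DStreamGraph in the 1.6 source from memory, so treat it as a sketch:

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}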