
Spark Streaming Workflow: A Source Code Walkthrough

2016-12-23 17:39
This article walks through the Spark Streaming workflow using the official word count example, based on the latest 2.0.2 code. The example collects words from a socket stream and, with a one-second batch interval, counts how many times each word appears per second. The NetworkWordCount source lives at the following path:

/spark/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala


Let's first look at the example as a whole; there is not much code and it is easy to follow.

object NetworkWordCount {
  // The socket hostname and port must be passed in as arguments
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    // The StreamingContext is used to create DStreams
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    // Create the DStream
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    // Apply transformations to the DStream
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    // Print the results
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}


First, the context needed for streaming computation is created. Unlike the SparkContext used for ordinary Spark transformations, a StreamingContext is used here:

val ssc = new StreamingContext(sparkConf, Seconds(1))


Tracing into StreamingContext, this call goes through an auxiliary constructor:

def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
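For reference, the primary constructor it delegates to is declared roughly as follows in the 2.0.2 sources (class body omitted); the null passed above fills the Checkpoint parameter:

class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {
  // ... class body omitted ...
}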


The auxiliary constructor delegates to the primary constructor; for its first argument it creates a new SparkContext from the given SparkConf:

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}


Creating the DStream is another important step:

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)


Tracing into socketTextStream, it creates a SocketInputDStream, which extends ReceiverInputDStream; ReceiverInputDStream in turn extends InputDStream, whose parent class is DStream. So the lines object we see here is a DStream.
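For reference, socketTextStream in StreamingContext is a thin wrapper (shown here slightly condensed from the 2.0.2 sources); it builds the stream via socketStream, which instantiates a SocketInputDStream. The simplified class headers in the comment make the inheritance chain explicit:

def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

// Simplified class headers, illustrating the inheritance chain:
// class SocketInputDStream[T]            extends ReceiverInputDStream[T]
// abstract class ReceiverInputDStream[T] extends InputDStream[T]
// abstract class InputDStream[T]         extends DStream[T]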

Next, the flatMap, map, and reduceByKey transformations are applied to the DStream in turn. Operations on a DStream are ultimately turned into operations on RDDs (resilient distributed datasets).

Take flatMap as an example:

def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
  new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
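To see where the DStream-to-RDD translation happens, here is FlatMappedDStream, lightly trimmed from the 2.0.2 sources: for each batch time, compute fetches the parent DStream's RDD for that batch and applies the same function at the RDD level:

private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // Per batch: get (or compute) the parent's RDD for this time and flatMap it
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}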


Internally it creates a new DStream object; in other words, transforming a DStream never modifies the original DStream. The last call in the example, print(), is an output operation; its implementation follows:

def print(): Unit = ssc.withScope {
  print(10)
}

/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}


The print method delegates to foreachRDD for the actual output; here is its implementation:

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}


foreachRDD creates a ForEachDStream and calls its register method; the source comment explains it clearly:

/**
 * Register this streaming as an output stream. This would ensure that RDDs of this
 * DStream will be generated.
 */
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}
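addOutputStream on the DStreamGraph simply records this DStream in the graph's list of output streams, which is what guarantees its RDDs will be generated later. A condensed view from the 2.0.2 DStreamGraph:

def addOutputStream(outputStream: DStream[_]) {
  this.synchronized {
    // Link the DStream to this graph and remember it as an output stream
    outputStream.setGraph(this)
    outputStreams += outputStream
  }
}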


ForEachDStream mainly overrides generateJob, which creates the Job for a batch:

override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
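The Job created here is not yet a Spark core job; it is just a batch time paired with the closure to run for that batch. A condensed sketch of the streaming Job class (most fields and helpers omitted, paraphrased from the 2.0.2 sources):

private[streaming]
class Job(val time: Time, func: () => _) {
  private var _result: Try[_] = null

  // Running the job simply invokes the stored closure; any real Spark core job
  // is launched inside that closure (e.g. via sparkContext.runJob).
  def run() {
    _result = Try(func())
  }

  def result: Try[_] = _result
}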


The transformations defined above only take effect after ssc.start() is called. The core of start() is launching the JobScheduler in a new thread:

// Start the scheduler in a new thread
ThreadUtils.runInNewThread("streaming-start") {
  sparkContext.setCallSite(startSite.get)
  sparkContext.clearJobGroup()
  sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
  savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
  scheduler.start()
}
state = StreamingContextState.ACTIVE


scheduler is defined as a JobScheduler, so let's look at JobScheduler's start method:

private[streaming] val scheduler = new JobScheduler(this)


During start, JobScheduler brings up two very important components, the ReceiverTracker and the JobGenerator:

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Start the event loop
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)

  val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
    case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
    case _ => null
  }

  executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
    executorAllocClient,
    receiverTracker,
    ssc.conf,
    ssc.graph.batchDuration.milliseconds,
    clock)
  executorAllocationManager.foreach(ssc.addStreamingListener)
  receiverTracker.start()
  jobGenerator.start()
  executorAllocationManager.foreach(_.start())
  logInfo("Started JobScheduler")
}
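Worth noting: besides the components started above, JobScheduler also owns the thread pool that will eventually execute the generated jobs. Condensed from the 2.0.2 JobScheduler fields (surrounding fields omitted); the pool size defaults to 1 and can be raised via spark.streaming.concurrentJobs:

// One daemon thread per concurrently running batch job
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")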


Let's look at what receiverTracker.start() and jobGenerator.start() each do.

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
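ReceiverTracker.start() registers an RPC endpoint and then calls launchReceivers(), which collects the receiver of each ReceiverInputDStream and sends a StartAllReceivers message to that endpoint, which in turn distributes and starts the receivers on the executors. A condensed view from the 2.0.2 ReceiverTracker:

private def launchReceivers(): Unit = {
  // Pull the receiver out of every receiver-based input DStream
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  // Ask the ReceiverTrackerEndpoint to schedule and start all receivers
  endpoint.send(StartAllReceivers(receivers))
}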


So receiverTracker.start() is responsible for getting the receivers running. jobGenerator.start(), shown next, again opens an event loop: whenever an event is posted to it, the loop calls back into processEvent to handle it.

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  // Start the event loop
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
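Both JobScheduler and JobGenerator rely on Spark's internal EventLoop utility. As a rough mental model only, here is an illustrative sketch of the pattern (a hypothetical SimpleEventLoop, not Spark's actual EventLoop class, which additionally handles stop and error recovery): a daemon thread drains a queue and dispatches each posted event to onReceive.

import java.util.concurrent.LinkedBlockingDeque

// Illustrative sketch of the event-loop pattern (hypothetical class)
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        // Block until an event is posted, then dispatch it
        onReceive(eventQueue.take())
      }
    }
  }

  def start(): Unit = eventThread.start()

  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
}

Spark's real EventLoop also lets subclasses hook onError, which is how the overrides above report failures back to the JobScheduler.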


processEvent handles four kinds of events:

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
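Where does the GenerateJobs event come from? JobGenerator owns a RecurringTimer that posts one GenerateJobs event to the event loop every batch interval; the timer is started inside startFirstTime() (or restart() when recovering from a checkpoint). The field, condensed from the 2.0.2 JobGenerator:

// Posts GenerateJobs(time) to the event loop once per batch interval
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")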


Now let's see how generateJobs is implemented:

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Generate the jobs for this batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}


graph.generateJobs iterates over the registered output streams (in this example, the ForEachDStream registered by print) and asks each one to generate a job for the batch:

def generateJobs(time: Time): Seq[Job] = {
  // ...
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      // Call the DStream's generateJob method
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  // ...
}


The default DStream.generateJob (which ForEachDStream overrides, as shown earlier) is equally straightforward; it calls getOrCompute:

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}


Both generateJob variants end up in getOrCompute:

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
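To close the loop: the jobs returned by graph.generateJobs are handed to jobScheduler.submitJobSet (seen in generateJobs above), which wraps each Job in a JobHandler and runs it on the streaming-job-executor thread pool; job.run() then invokes the closure built by generateJob, which is where sparkContext.runJob finally launches real Spark jobs against the per-batch RDD. A condensed view from the 2.0.2 JobScheduler:

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    // Each JobHandler eventually calls job.run(), i.e. the closure built in generateJob
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}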