Spark Streaming Workflow: A Source Code Walkthrough
2016-12-23 17:39
This article walks through the Spark Streaming workflow using the official Word Count example, based on the 2.0.2 source code (the latest release at the time of writing). The example collects words from a socket stream and, with a one-second batch interval, counts which words appear in each second and how many times each one occurs. The Word Count example lives at the following path in the Spark source tree:
/spark/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
Let's first look at the example as a whole; the code is short and easy to follow.
```scala
object NetworkWordCount {
  // The socket host and port must be passed in as arguments
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    // The StreamingContext is used to create DStreams
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    // Create the DStream
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    // Apply transformations to the DStream
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    // Print the results
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```
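As a side note, the header comment of the official example explains how to run it locally: start a Netcat server first (`$ nc -lk 9999`) and then submit the example against the same host and port, e.g. `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999` (the exact invocation may differ depending on your build).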
The first step is to create the context required for streaming computation. Unlike the SparkContext used for ordinary Spark data transformations, streaming uses a StreamingContext:
```scala
val ssc = new StreamingContext(sparkConf, Seconds(1))
```
Tracing into the StreamingContext implementation, this instantiation goes through an auxiliary constructor:
```scala
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
```
The auxiliary constructor delegates to the primary constructor; its first argument is a brand-new SparkContext created from the SparkConf, as shown below:
```scala
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}
```
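For context, the primary constructor takes a SparkContext, a Checkpoint, and the batch Duration; the `null` passed above is the Checkpoint argument, i.e. there is no checkpoint to recover from. A condensed sketch (paraphrased from StreamingContext.scala in the 2.x branch, so treat the exact parameter names and modifiers as approximate):

```scala
// Approximate sketch of the primary constructor: the second parameter is the
// checkpoint to restore from, which is null when a fresh context is created.
class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {
  // ...
}
```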
Creating the DStream is another important step:
```scala
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
```
Following socketTextStream further, it returns a SocketInputDStream, which extends ReceiverInputDStream; ReceiverInputDStream in turn extends InputDStream, whose parent class is DStream. The `lines` object we get back is therefore a DStream.
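Roughly, the call chain inside StreamingContext looks like the following (condensed from the 2.x source, so take the exact signatures as approximate):

```scala
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  // Convert the raw byte stream into lines and delegate to socketStream
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  // SocketInputDStream extends ReceiverInputDStream -> InputDStream -> DStream
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
```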
Next, three transformations are applied to the DStream in sequence: flatMap, map, and reduceByKey. Operations on a DStream are ultimately translated into operations on the underlying RDDs (resilient distributed datasets).
Take flatMap as an example:
```scala
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
  new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
```
Internally it creates a new DStream object; in other words, a transformation never modifies the original DStream, it produces a new one that references its parent.
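FlatMappedDStream itself is tiny. A condensed sketch (paraphrased from FlatMappedDStream.scala in the 2.x branch, so details are approximate) shows how the DStream-level transformation is deferred: for each batch it fetches the parent's RDD and applies the corresponding RDD transformation to it:

```scala
private[streaming] class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  // The new DStream only remembers its parent; the parent is left untouched
  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // For each batch time, get the parent's RDD and apply flatMap to it
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}
```

The last call in the example, wordCounts.print(), is an output operation; its implementation is shown below.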
```scala
def print(): Unit = ssc.withScope {
  print(10)
}

/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
```
The print output method delegates to foreachRDD; here is its implementation:
```scala
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
```
foreachRDD creates a ForEachDStream object and calls its register method; the source comment explains exactly what this does:
```scala
/**
 * Register this streaming as an output stream. This would ensure that RDDs of this
 * DStream will be generated.
 */
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}
```
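addOutputStream simply records the stream on the DStreamGraph. A rough sketch (paraphrased from DStreamGraph.scala, so the details are approximate); the output streams registered here are exactly the ones graph.generateJobs iterates over later:

```scala
// Approximate sketch: every output DStream registered here will later be asked
// to generate a Job for each batch.
def addOutputStream(outputStream: DStream[_]): Unit = {
  this.synchronized {
    outputStream.setGraph(this)
    outputStreams += outputStream
  }
}
```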
ForEachDStream, for its part, mainly overrides the generateJob method to create the Job for each batch:
```scala
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```
The transformations defined above are only executed after ssc.start() is called. The core of start() is launching the JobScheduler on a new thread:
```scala
// Start the scheduler in a new thread
ThreadUtils.runInNewThread("streaming-start") {
  sparkContext.setCallSite(startSite.get)
  sparkContext.clearJobGroup()
  sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
  savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
  scheduler.start()
}
state = StreamingContextState.ACTIVE
```
The scheduler is defined as follows; let's look at how JobScheduler.start() is implemented.
```scala
private[streaming] val scheduler = new JobScheduler(this)
```
In its start phase, the JobScheduler starts two very important components, the JobGenerator and the ReceiverTracker:
```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Start the event loop
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)

  val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
    case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
    case _ => null
  }

  executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
    executorAllocClient,
    receiverTracker,
    ssc.conf,
    ssc.graph.batchDuration.milliseconds,
    clock)
  executorAllocationManager.foreach(ssc.addStreamingListener)
  receiverTracker.start()
  jobGenerator.start()
  executorAllocationManager.foreach(_.start())
  logInfo("Started JobScheduler")
}
```
Let's look at what receiverTracker.start() and jobGenerator.start() each do.
```scala
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
```
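ReceiverTracker.start() registers the ReceiverTrackerEndpoint with the RPC environment and then calls launchReceivers(), which gathers the receivers of all ReceiverInputDStreams and asks the endpoint to start them on the executors. A condensed sketch (paraphrased from ReceiverTracker.scala, so treat it as approximate):

```scala
/** Collect all receivers and send StartAllReceivers to the tracker endpoint (sketch). */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }

  // Run a dummy job first so that executors have registered before receivers are scheduled
  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}
```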
jobGenerator.start(), in turn, opens an event loop (message queue); whenever a posted message arrives, processEvent is called back to handle it:
```scala
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  // Start the event loop
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
```
processEvent handles four kinds of events:
```scala
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
```
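Who posts the GenerateJobs events? The JobGenerator owns a recurring timer that fires once per batch interval and posts GenerateJobs(time) to the event loop; startFirstTime() and restart() are what start this timer. A rough sketch of the timer definition (paraphrased from JobGenerator.scala, so details are approximate):

```scala
// Fires every batchDuration and enqueues a GenerateJobs event for that batch time (sketch)
private val timer = new RecurringTimer(
  clock,
  ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))),
  "JobGenerator")
```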
Let's see how generateJobs is implemented:
```scala
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Generate the jobs for this batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
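graph.generateJobs then asks every output stream previously registered on the DStreamGraph (via register(), as we saw) to produce a job for this batch time: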
```scala
def generateJobs(time: Time): Seq[Job] = {
  // ...
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      // Ask each output DStream to generate its job via DStream.generateJob
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  // ...
}
```
The default implementation of DStream.generateJob is also straightforward; it calls getOrCompute:
```scala
/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
```
```scala
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
```
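To close the loop: the jobs produced this way go back to the JobScheduler through submitJobSet (seen in generateJobs above), which wraps each Job in a handler and executes it on the job executor thread pool; running a job invokes the jobFunc built in generateJob, which in turn runs foreachFunc (the print logic) against that batch's RDD. A condensed sketch of submitJobSet (paraphrased from JobScheduler.scala, so details are approximate):

```scala
// Approximate sketch: each generated Job is run asynchronously by the job executor pool
def submitJobSet(jobSet: JobSet): Unit = {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    // JobHandler.run() eventually calls job.run(), which invokes the jobFunc
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
```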