Spark Streaming job generation and execution
2016-04-13 10:54
Spark Streaming job generation
How many jobs does Spark Streaming submit for each batch?
DStreamGraph

```scala
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
```
A job is generated for each outputStream; in other words, there are as many jobs per batch as there are output streams.
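The one-job-per-output-stream relationship can be sketched with a toy graph. This is a simplified illustration, not Spark's actual classes; `MiniGraph` and `MiniOutputStream` are hypothetical names:

```scala
// Toy sketch (hypothetical names, not Spark's API): each registered
// output stream contributes at most one job per batch time.
case class Job(time: Long, func: () => Unit)

class MiniOutputStream(val name: String) {
  // Mirrors DStream.generateJob: wrap this batch's work in an optional Job
  def generateJob(time: Long): Option[Job] =
    Some(Job(time, () => println(s"$name materialized for batch $time")))
}

class MiniGraph {
  private var outputStreams = Seq.empty[MiniOutputStream]

  def addOutputStream(s: MiniOutputStream): Unit = synchronized {
    outputStreams = outputStreams :+ s
  }

  // Mirrors DStreamGraph.generateJobs: flatMap over registered output streams
  def generateJobs(time: Long): Seq[Job] = synchronized {
    outputStreams.flatMap(_.generateJob(time))
  }
}
```

Registering two output streams yields two jobs for every batch, matching the flatMap over `outputStreams` above.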
How is an outputStream created?
DStream
```scala
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
```
Here the register method registers this DStream as an outputStream.
DStream
```scala
/**
 * Register this streaming as an output stream. This would ensure that RDDs of this
 * DStream will be generated.
 */
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}
```
At the code level, every foreach-style output operation adds one more job at submission time. If the whole application has no such operation, i.e. no outputStream, an exception is thrown during validation.
DStreamGraph
```scala
def validate() {
  this.synchronized {
    require(batchDuration != null, "Batch duration has not been set")
    // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
    //   " is very low")
    require(getOutputStreams().size > 0,
      "No output operations registered, so nothing to execute")
  }
}
```
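The `require` on registered outputs can be exercised in isolation. A minimal sketch of the fail-fast behavior, using a hypothetical `MiniValidatingGraph` rather than Spark's class:

```scala
// Sketch of the validation idea: fail fast when no output operation
// has been registered, mirroring DStreamGraph.validate's require call.
class MiniValidatingGraph {
  private var outputCount = 0

  def register(): Unit = synchronized { outputCount += 1 }

  def validate(): Unit = synchronized {
    // require throws IllegalArgumentException when the predicate is false
    require(outputCount > 0,
      "No output operations registered, so nothing to execute")
  }
}
```

Calling `validate()` before any `register()` throws; after one registration it passes.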
Generating a job
DStream

```scala
/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
```
Here jobFunc directly calls Spark's runJob method; for an analysis of runJob, see my other post.
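The point of `emptyFunc` is only to force the RDD's computation. A plain-Scala analogue of the same trick, assuming a lazy collection view as a stand-in for an uncomputed RDD:

```scala
// Plain-Scala analogue of "run an empty function to materialize":
// mapping over a lazy view does nothing until something forces it.
var computed = 0
val lazyData = (1 to 3).view.map { x => computed += 1; x * 2 }

// Nothing has run yet - the view is lazy, like an uncomputed RDD
assert(computed == 0)

// Applying a do-nothing function to every element forces the pipeline,
// the same way runJob(rdd, emptyFunc) materializes the RDD's partitions
lazyData.foreach { _ => () }
assert(computed == 3)
```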
Job submission
JobGenerator

```scala
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // jobs are generated here
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // jobs are submitted here
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
JobScheduler
```scala
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
```
Internally, a thread pool executes each submitted JobHandler:
```scala
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
```
```scala
private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      ....
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        // this is where the job ultimately runs
        job.run()
      }
      ....
    }
  }
}
```
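The submission path above can be sketched with the standard `java.util.concurrent` pool (hypothetical `MiniJobHandler` name; the pool size stands in for `spark.streaming.concurrentJobs`, which defaults to 1):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Sketch: a fixed-size daemon-style pool, sized like
// spark.streaming.concurrentJobs (default 1), runs each handler
val numConcurrentJobs = 1 // stand-in for ssc.conf.getInt(...)
val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

val completed = new AtomicInteger(0)

class MiniJobHandler(id: Int) extends Runnable {
  // stands in for job.run() inside JobHandler.run
  def run(): Unit = completed.incrementAndGet()
}

// Mirrors submitJobSet: one handler per job, executed on the pool
(1 to 3).foreach(i => jobExecutor.execute(new MiniJobHandler(i)))
jobExecutor.shutdown()
jobExecutor.awaitTermination(5, TimeUnit.SECONDS)
```

With a pool size of 1 the jobs of a batch run serially; raising `spark.streaming.concurrentJobs` lets batches overlap, at the cost of ordering guarantees.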
Job
```scala
def run() {
  _result = Try(func())
}
```
The func here is the jobFunc created when the job was generated, which calls runJob.
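Because run wraps `func()` in `Try`, a failing job is recorded as a `Failure` instead of killing the executor thread. A minimal sketch of that behavior, using a hypothetical `MiniJob` rather than Spark's Job class:

```scala
import scala.util.{Try, Success, Failure}

// Sketch of Job.run's error handling: the job's function executes inside
// Try, so exceptions are captured as a Failure result, not propagated.
class MiniJob(func: () => Int) {
  private var _result: Try[Int] = _

  def run(): Unit = { _result = Try(func()) }

  def result: Try[Int] = _result
}
```

A successful job ends in `Success(value)`; a throwing job ends in `Failure(exception)` that the scheduler can later inspect and report.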