
StreamingContext Instantiation Process

2016-05-18 10:36
I. The StreamingContext instantiation process
There are many ways to create a StreamingContext. This example creates one from a SparkConf and the batch interval:
val ssc = new StreamingContext(conf,Seconds(3))
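1. Check the SparkContext and the checkpoint: they cannot both be null, otherwise an exception is thrown.
2. Check whether checkpoint recovery is in effect, i.e. whether a checkpoint was supplied (isCheckpointPresent).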
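Steps 1 and 2 amount to a precondition plus a flag. A minimal Scala sketch, using hypothetical FakeSparkContext/FakeCheckpoint types and Option in place of the nullable constructor parameters that Spark itself uses:

```scala
// Hypothetical stand-ins; these are not Spark classes.
final case class FakeSparkContext(master: String)
final case class FakeCheckpoint(checkpointDir: String)

final class StreamingContextModel(sc: Option[FakeSparkContext], cp: Option[FakeCheckpoint]) {
  // Step 1: at least one of the two must be supplied, otherwise require throws
  require(sc.isDefined || cp.isDefined, "SparkContext and checkpoint cannot both be null")

  // Step 2: recovery mode is on only when a checkpoint was passed in
  val isCheckpointPresent: Boolean = cp.isDefined
}
```

Using Option makes the "neither supplied" case explicit in the types; the real constructor checks for null instead.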
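3. Resolve the SparkContext: if one was passed in, use it; otherwise rebuild a SparkConf from the checkpoint and create the SparkContext from that conf.
When the SparkConf is rebuilt from a checkpoint, the following properties are reloaded from the current environment, overriding the checkpointed values when they are set: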
val propertiesToReload = List(
  "spark.yarn.app.id",
  "spark.yarn.app.attemptId",
  "spark.driver.host",
  "spark.driver.port",
  "spark.master",
  "spark.yarn.keytab",
  "spark.yarn.principal",
  "spark.ui.filters")
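A rough model of step 3, written against plain Map[String, String] values rather than SparkConf. It assumes the behavior of Spark's Checkpoint.createSparkConf as we read it: the checkpointed pairs form the base, and any property from the list above that is set in the current environment overrides the checkpointed value. SparkConfRebuildModel, rebuild and resolveConf are hypothetical names.

```scala
// Plain-Map model of step 3; SparkConfRebuildModel is hypothetical, not a Spark class.
object SparkConfRebuildModel {
  // The property names listed above
  val propertiesToReload: List[String] = List(
    "spark.yarn.app.id", "spark.yarn.app.attemptId",
    "spark.driver.host", "spark.driver.port",
    "spark.master", "spark.yarn.keytab",
    "spark.yarn.principal", "spark.ui.filters")

  /** Start from the checkpointed pairs; reloadable keys set in `current` override them. */
  def rebuild(checkpointed: Map[String, String], current: Map[String, String]): Map[String, String] =
    propertiesToReload.foldLeft(checkpointed) { (conf, key) =>
      current.get(key).fold(conf)(value => conf.updated(key, value))
    }

  /** Use the existing context's conf if there is one, otherwise rebuild it from the checkpoint. */
  def resolveConf(
      existing: Option[Map[String, String]],
      checkpointed: Option[Map[String, String]],
      current: Map[String, String]): Map[String, String] =
    existing.getOrElse {
      checkpointed
        .map(rebuild(_, current))
        .getOrElse(throw new IllegalStateException("no SparkContext and no checkpoint"))
    }
}
```

Properties outside the reload list (an application name, say) keep their checkpointed values, which is what lets a restarted driver resume the same application while picking up environment-specific settings such as the master URL.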
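4. Master check: spark.master should not be single-threaded local mode ("local" or "local[1]"). In the Spark source this only logs a warning rather than throwing: with a receiver, the single thread is occupied by the receiver, so no resources are left to process the received data.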
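A minimal sketch of the master check, assuming (as in the Spark 1.x source) that only the literal strings local and local[1] are flagged and that the outcome is a warning rather than an exception. LocalModeCheck is a hypothetical name.

```scala
// Hypothetical helper; returns a warning message instead of failing.
object LocalModeCheck {
  /** True when the master URL gives the application exactly one local thread. */
  def isSingleThreadedLocal(master: String): Boolean =
    master == "local" || master == "local[1]"

  /** Some(warning) for single-threaded local masters, None otherwise. */
  def check(master: String): Option[String] =
    if (isSingleThreadedLocal(master))
      Some("spark.master should be local[n] with n > 1 when receivers are used")
    else None
}
```

The comparison is purely textual, so a master such as local[*] on a single-core machine is not flagged.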
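5. Set the StreamingContext's SparkConf and SparkEnv from the SparkContext (sc.conf and sc.env).
6. Instantiate the DStreamGraph: when recovering from a checkpoint, reuse the checkpointed graph and restore its checkpoint data; otherwise create a new graph with the batch duration.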
private[streaming] val graph: DStreamGraph = {
  if (isCheckpointPresent) {
    cp_.graph.setContext(this)
    cp_.graph.restoreCheckpointData()
    cp_.graph
  } else {
    require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
    val newGraph = new DStreamGraph()
    newGraph.setBatchDuration(batchDur_)
    newGraph
  }
}
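The same two-branch initialization, written with Option instead of null. GraphModel, CheckpointModel and GraphInit are hypothetical stand-ins; setContext is omitted because the model has no context object.

```scala
// Hypothetical stand-ins for DStreamGraph and Checkpoint.
final class GraphModel {
  var batchDurationMs: Option[Long] = None
  var restored: Boolean = false
  def setBatchDuration(ms: Long): Unit = { batchDurationMs = Some(ms) }
  def restoreCheckpointData(): Unit = { restored = true }
}

final case class CheckpointModel(graph: GraphModel)

object GraphInit {
  def initGraph(cp: Option[CheckpointModel], batchDurMs: Option[Long]): GraphModel = cp match {
    case Some(checkpoint) =>
      // Recovery: reuse the deserialized graph and restore its checkpoint data
      checkpoint.graph.restoreCheckpointData()
      checkpoint.graph
    case None =>
      // Fresh start: the batch duration is mandatory
      require(batchDurMs.isDefined, "Batch duration for StreamingContext cannot be null")
      val newGraph = new GraphModel
      batchDurMs.foreach(newGraph.setBatchDuration)
      newGraph
  }
}
```

In the recovery branch the batch duration comes from the checkpointed graph, which is why the require only guards the fresh-start branch.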
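7. Set the checkpoint directory: when recovering, take it from the checkpoint and register it with the SparkContext; otherwise leave it null until ssc.checkpoint(dir) is called.
private[streaming] var checkpointDir: String = {
  if (isCheckpointPresent) {
    sc.setCheckpointDir(cp_.checkpointDir)
    cp_.checkpointDir
  } else {
    null
  }
}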
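8. Create the streaming job scheduler, JobScheduler, which uses the DStreamGraph to generate jobs:
val scheduler = new JobScheduler(this)
9. Create the ContextWaiter, which awaitTermination() blocks on until the context is stopped or an error is reported:
val waiter = new ContextWaiter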
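A sketch of a stop/error waiter modeled on the lock-and-condition design we believe Spark 1.x's ContextWaiter uses (a ReentrantLock, one Condition, and stopped/error fields guarded by the lock). WaiterSketch is a hypothetical name; this is not the Spark class itself.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

final class WaiterSketch {
  private val lock = new ReentrantLock()
  private val condition = lock.newCondition()
  private var error: Throwable = null // guarded by lock
  private var stopped = false         // guarded by lock

  /** Record an error and wake every waiting thread. */
  def notifyError(e: Throwable): Unit = {
    lock.lock()
    try { error = e; condition.signalAll() } finally lock.unlock()
  }

  /** Mark the context stopped and wake every waiting thread. */
  def notifyStop(): Unit = {
    lock.lock()
    try { stopped = true; condition.signalAll() } finally lock.unlock()
  }

  /** true if stopped, false on timeout; rethrows a reported error. timeoutMs < 0 waits forever. */
  def waitForStopOrError(timeoutMs: Long = -1): Boolean = {
    lock.lock()
    try {
      if (timeoutMs < 0) {
        while (!stopped && error == null) condition.await()
      } else {
        var nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
        while (!stopped && error == null && nanos > 0) nanos = condition.awaitNanos(nanos)
      }
      if (error != null) throw error
      stopped
    } finally lock.unlock()
  }
}
```

The while loops guard against spurious wakeups, and rethrowing the stored error is how a failure reported from a streaming job can surface in the thread that called awaitTermination().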
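For reference, the job generation mentioned in step 8 happens for each batch time in JobGenerator.generateJobs(time) (inside JobScheduler), which first allocates received blocks and then calls DStreamGraph.generateJobs(time); the graph in turn asks each output stream for its job:
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
val jobOption = outputStream.generateJob(time) // inside DStreamGraph.generateJobs, per output stream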
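The three lines above can be modeled in plain Scala. In this sketch (all class names hypothetical), a tracker first assigns every block received so far to the batch, then the graph asks each output stream for a job and flatMap drops the streams that return None. Here a stream returns None when the batch time is not a multiple of its slide interval, a simplification of Spark's own time-validity check.

```scala
// Hypothetical, simplified stand-ins for ReceiverTracker, DStreamGraph, output DStreams and Job.
final case class Job(time: Long, streamName: String, blocks: Vector[String])

final class BlockTrackerModel {
  private var unallocated = Vector.empty[String]
  private var allocated = Map.empty[Long, Vector[String]]
  def addBlock(id: String): Unit = unallocated :+= id
  /** Move every block received so far into the given batch. */
  def allocateBlocksToBatch(time: Long): Unit = {
    allocated += time -> unallocated
    unallocated = Vector.empty
  }
  def blocksOfBatch(time: Long): Vector[String] = allocated.getOrElse(time, Vector.empty)
}

/** An output stream that only produces a job when `time` falls on its slide interval. */
final class OutputStreamModel(val name: String, slideMs: Long, tracker: BlockTrackerModel) {
  def generateJob(time: Long): Option[Job] =
    if (time % slideMs == 0) Some(Job(time, name, tracker.blocksOfBatch(time))) else None
}

final class JobGraphModel(outputStreams: Seq[OutputStreamModel]) {
  // flatMap keeps the Some results and drops the Nones
  def generateJobs(time: Long): Seq[Job] = outputStreams.flatMap(_.generateJob(time))
}

object JobGeneratorModel {
  def generateJobs(time: Long, tracker: BlockTrackerModel, graph: JobGraphModel): Seq[Job] = {
    tracker.allocateBlocksToBatch(time) // allocate received blocks to the batch first
    graph.generateJobs(time)            // then generate jobs that read those blocks
  }
}
```

Allocating before generating matters: it fixes which blocks belong to the batch, so every job generated for that time sees the same input.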