StreamingContext实例化过程
2016-05-18 10:36
69 查看
一.SparkStreaming实例化过程创建StreamingContext的方法有很多种,示例以SparkCon和任务执行间隔作为参数创建StreamingContext
val ssc = new StreamingContext(conf,Seconds(3))
1.检查SparkContext和CheckPoint,不能同时为空,否则抛出异常
2.检查checkPoint是否启用
3.更新SparkContext,如果SparkContext不存在,则将checkPoint需要的信息写入SparkConf,并以此构建SparkContext;如果SparkContext存在,则使用它
checkpoint需要的信息:
val propertiesToReload = List( "spark.yarn.app.id", "spark.yarn.app.attemptId", "spark.driver.host", "spark.driver.port", "spark.master", "spark.yarn.keytab", "spark.yarn.principal", "spark.ui.filters")
4.模式检查,spark.master不能为本地单线程模式
5.通过SparkContext来更新SparkConf和SparkEnv
6.实例化DStreamGraph
private[streaming] val graph: DStreamGraph = { if (isCheckpointPresent) { cp_.graph.setContext(this) cp_.graph.restoreCheckpointData() cp_.graph } else { require(batchDur_ != null, "Batch duration for StreamingContext cannot be null") val newGraph = new DStreamGraph() newGraph.setBatchDuration(batchDur_) newGraph } }
7.设置checkPoint目录
private[streaming] var checkpointDir: String = { if (isCheckpointPresent) { sc.setCheckpointDir(cp_.checkpointDir) cp_.checkpointDir } else { null } }
8.创建Streaming作业调度器,JobScheduler,使用DStreamGraph产生Jobvalscheduler = new JobScheduler(this)9.创建ContextWaiter
valwaiter = new ContextWaiter
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
val jobOption = outputStream.generateJob(time)
相关文章推荐
- memset(void *s,char ch unsigned n)函数解释
- 农村调查笔记:勤劳未必致富 癌症病人越来越多
- systemd.service 中文手册
- C++在线学习文档(强烈推荐)
- 应对电商大促峰值的九个方法
- Android studio 导入项目,界面停在"Building 'project name' gradle project info"卡死
- linux环境变量
- GridView 后台分页
- 160518、java中使用百度地图(超级简单)
- systemd.service 中文手册
- Laravel Service Provider 中 boot 方法和 register 方法的区别
- 移动网页设计9大原则——第1部分
- tcpdump常用参数说明
- 关于UIScrollView上面布局遇到的问题
- 一点一点学ASP.NET之基础概念——HttpHandler
- static函数与普通函数
- windows下关于QT5连接mysql的几点问题
- spring整合Redis 入门例子
- 趋势科技搭建工控蜜罐系统
- iOS多线程同步锁