Spark Learning Notes 15: The Spark Streaming Execution Flow (1)
2015-06-05 12:30
This article describes the execution flow of StreamingContext, using the Socket Stream example from the Spark Streaming documentation.
Example code:

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

The example creates a SocketInputDStream with StreamingContext.socketTextStream and then calls print on it; the resulting DStream dependency graph is shown below:

The DStream class holds a hash map keyed by batch time, with RDDs as values:

private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

All DStream transformations and computations ultimately operate on the RDDs stored in this map.
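The role of this map can be sketched in isolation. The following is a simplified, hypothetical getOrCompute modeled on the real DStream logic; SimpleDStream, its Time wrapper, and its Seq-based "RDDs" are illustration-only stand-ins, not Spark types:

```scala
import scala.collection.mutable.HashMap

// Illustration-only stand-in for the real Time type.
case class Time(millis: Long)

class SimpleDStream[T](compute: Time => Option[Seq[T]]) {
  // Mirrors DStream.generatedRDDs: one batch result per batch time.
  private val generatedRDDs = new HashMap[Time, Seq[T]]()

  // Return the cached batch for `time`, computing and caching it on a miss.
  def getOrCompute(time: Time): Option[Seq[T]] =
    generatedRDDs.get(time).orElse {
      val newRDD = compute(time)
      newRDD.foreach(rdd => generatedRDDs.put(time, rdd))
      newRDD
    }
}
```

A second call with the same Time hits the cache rather than recomputing, which is what lets several output operations in one batch share the same parent RDDs.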
1. Starting the Receivers
The flow is as follows. Receiver startup has two parts: first, the Driver initializes the ReceiverTracker and related components; second, all Receivers are wrapped into an RDD and sent to the Executors to run.
1.1. StreamingContext.start
This method is the entry point of the whole flow:

def start(): Unit = synchronized {
  if (state == Started) {
    throw new SparkException("StreamingContext has already been started")
  }
  if (state == Stopped) {
    throw new SparkException("StreamingContext has already been stopped")
  }
  validate()
  sparkContext.setCallSite(DStream.getCreationSite())
  scheduler.start()
  state = Started
}

(1) Check the StreamingContext state;
(2) start the JobScheduler.
1.2. JobScheduler.start
def start(): Unit = synchronized {
  if (eventActor != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
    def receive = {
      case event: JobSchedulerEvent => processEvent(event)
    }
  }), "JobScheduler")

  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

(1) Create an anonymous Actor to handle JobSchedulerEvent messages;
(2) create and start the ReceiverTracker;
(3) start the JobGenerator.
1.3. ReceiverTracker
1.3.1. ReceiverTracker Initialization

private[streaming] class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receiverExecutor = new ReceiverLauncher()
  private val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo]
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    Option(ssc.checkpointDir)
  )
  private val listenerBus = ssc.scheduler.listenerBus

  // actor is created when generator starts.
  // This not being null means the tracker has been started and not stopped
  private var actor: ActorRef = null

(1) Read all ReceiverInputDStreams from the DStreamGraph; the DStreamGraph stores all InputDStreams and output DStreams;
(2) create a ReceiverLauncher (an inner class of ReceiverTracker), which launches the Receivers;
(3) create a ReceivedBlockTracker, which stores the metadata describing the blocks the Receivers produce; the block data itself is kept in the Executors' BlockManagers. Its role is similar to that of the Driver-side BlockManager.
1.3.2. ReceiverTracker.start
def start() = synchronized {
  if (actor != null) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), "ReceiverTracker")
    if (!skipReceiverLaunch) receiverExecutor.start()
    logInfo("ReceiverTracker started")
  }
}

(1) Create a ReceiverTrackerActor (an inner class), which handles Receiver registration and the block metadata the Receivers report;
(2) start the ReceiverLauncher, which spawns a thread that calls its own startReceivers method to launch all the Receivers.
1.3.3. ReceiverLauncher.startReceivers
private def startReceivers() {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  // Right now, we only honor preferences if all receivers have them
  val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

  // Create the parallel collection of receivers to distributed them on the worker nodes
  val tempRDD =
    if (hasLocationPreferences) {
      val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
      ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
    } else {
      ssc.sc.makeRDD(receivers, receivers.size)
    }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiver = (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException("Could not start receiver as object not found.")
    }
    val receiver = iterator.next()
    val supervisor = new ReceiverSupervisorImpl(
      receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
    supervisor.start()
    supervisor.awaitTermination()
  }

  // Run the dummy Spark job to ensure that all slaves have registered.
  // This avoids all the receivers to be scheduled on the same node.
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }

  // Distribute the receivers and start them
  logInfo("Starting " + receivers.length + " receivers")
  running = true
  ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
  running = false
  logInfo("All of the receivers have been terminated")
}

(1) Iterate over the ReceiverInputDStreams in receiverInputStreams and create the corresponding Receiver for each; for SocketInputDStream this is a SocketReceiver;
(2) wrap all Receivers into an RDD via makeRDD, with as many partitions as there are Receivers, so each Receiver occupies one Task, i.e. one core;
(3) build the startReceiver function object, which will run on the Executors;
(4) submit a Job over the Receiver RDD, so each Receiver runs as one Task on an Executor.
The Receiver class hierarchy:
This completes the Driver-side preparation; execution now moves to the Executor side.
1.4. Receiver Task
Each Receiver runs as a Task on an Executor, executing the startReceiver function object built above. The code:

// Function to start the receiver on the worker node
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
  if (!iterator.hasNext) {
    throw new SparkException("Could not start receiver as object not found.")
  }
  val receiver = iterator.next()
  val supervisor = new ReceiverSupervisorImpl(
    receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
  supervisor.start()
  supervisor.awaitTermination()
}

Each Receiver is paired with a ReceiverSupervisorImpl, which supervises it.
1.4.1. ReceiverSupervisor.start
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

(1) onStart (overridden in ReceiverSupervisorImpl) starts the BlockGenerator by calling its start method;
(2) startReceiver starts the Receiver itself.
Although the BlockGenerator is started first, the data originates from startReceiver, so we look at startReceiver first.
1.5. ReceiverSupervisor.startReceiver
def startReceiver(): Unit = synchronized {
  try {
    logInfo("Starting receiver")
    receiver.onStart()
    logInfo("Called receiver onStart")
    onReceiverStart()
    receiverState = Started
  } catch {
    case t: Throwable =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

This calls the Receiver's onStart method, which kicks off data reception.
Below we use SocketReceiver as the example to describe the data-reception path.
1.5.1. SocketReceiver.onStart
def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}

This creates and starts a thread whose run method executes receive.
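The same pattern in isolation, as a minimal sketch (startWorker and its argument are hypothetical names, not Spark API): do the blocking work on a daemon thread so the caller returns immediately.

```scala
// Run blocking work on a daemon thread so the caller returns immediately,
// mirroring SocketReceiver.onStart.
def startWorker(work: () => Unit): Thread = {
  val t = new Thread("Socket Receiver") {
    setDaemon(true)
    override def run(): Unit = work()
  }
  t.start()
  t
}
```

Because the thread is a daemon, a JVM shutdown does not wait for it; the real receiver relies on the supervisor's stop/restart logic rather than thread lifetime.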
1.5.2. SocketReceiver.receive
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    logInfo("Stopped receiving")
    restart("Retrying connecting to " + host + ":" + port)
  } catch {
    ...
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}

(1) Establish the TCP connection;
(2) loop over the incoming data, calling store on each item.
1.5.3. Receiver.store
def store(dataItem: T) {
  executor.pushSingle(dataItem)
}

Here executor is the ReceiverSupervisorImpl, set in the ReceiverSupervisor constructor.
The code of ReceiverSupervisorImpl.pushSingle:

def pushSingle(data: Any) {
  blockGenerator.addData(data)
}

BlockGenerator.addData:

def addData(data: Any): Unit = synchronized {
  waitToPush()
  currentBuffer += data
}

addData appends the data to the currentBuffer array; a timer thread inside BlockGenerator periodically drains this buffer and packages its contents into a Block.
1.6. BlockGenerator
1.6.1. BlockGenerator Initialization and Startup

private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf
  ) extends RateLimiter(conf) with Logging {

  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

  private val clock = new SystemClock()
  private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
  private val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var stopped = false

  /** Start block generating and pushing threads. */
  def start() {
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  }

(1) Create a RecurringTimer, which wraps a thread that, once started, calls updateCurrentBuffer periodically;
(2) create the blockPushingThread thread, which, once started, runs keepPushingBlocks;
(3) the start method starts both threads.
1.6.2. BlockGenerator.updateCurrentBuffer
private def updateCurrentBuffer(time: Long): Unit = synchronized {
  try {
    val newBlockBuffer = currentBuffer
    currentBuffer = new ArrayBuffer[Any]
    if (newBlockBuffer.size > 0) {
      val blockId = StreamBlockId(receiverId, time - blockInterval)
      val newBlock = new Block(blockId, newBlockBuffer)
      listener.onGenerateBlock(blockId)
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
      logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}

(1) This is a synchronized method;
(2) it swaps currentBuffer, which holds the data the Receiver has collected, with a fresh buffer;
(3) it packages everything in the old buffer into a Block; the blockId name has the format input-${streamId}-${uniqueId}, where streamId is the Receiver's id and uniqueId is a timestamp (the start of the block interval, time - blockInterval);
(4) it puts the Block into the blocksForPushing queue.
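The naming scheme can be illustrated directly. Here blockName is a hypothetical helper mirroring the name format of StreamBlockId, and the timestamp is a made-up value:

```scala
// Mirrors the StreamBlockId name format: "input-" + streamId + "-" + uniqueId
def blockName(streamId: Int, uniqueId: Long): String =
  s"input-$streamId-$uniqueId"

// Receiver 0, block interval starting at a hypothetical timestamp:
val id = blockName(0, 1433478600000L)  // "input-0-1433478600000"
```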
1.6.3. BlockGenerator.keepPushingBlocks

private def keepPushingBlocks() {
  logInfo("Started block pushing thread")
  try {
    while(!stopped) {
      Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }
    ...
  } catch {
    ...
  }
}

This polls a Block from blocksForPushing and hands it to BlockGenerator.pushBlock.
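The overall buffer-swap / queue / pusher mechanics can be condensed into a standalone sketch. All names here are illustrative; the real BlockGenerator adds rate limiting, listener callbacks, block IDs, and error handling:

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

class MiniBlockGenerator(push: Seq[Any] => Unit) {
  private val blocksForPushing = new ArrayBlockingQueue[Seq[Any]](10)
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var stopped = false

  // Receiver thread: append one record (cf. BlockGenerator.addData).
  def addData(data: Any): Unit = synchronized { currentBuffer += data }

  // Timer thread: swap the buffer and enqueue its contents as one "block"
  // (cf. BlockGenerator.updateCurrentBuffer).
  def updateCurrentBuffer(): Unit = synchronized {
    val newBlockBuffer = currentBuffer
    currentBuffer = new ArrayBuffer[Any]
    if (newBlockBuffer.nonEmpty) blocksForPushing.put(newBlockBuffer.toSeq)
  }

  // Pusher thread body: drain the queue (cf. BlockGenerator.keepPushingBlocks).
  def keepPushingBlocks(): Unit = {
    while (!stopped || !blocksForPushing.isEmpty) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)).foreach(push)
    }
  }

  def stop(): Unit = { stopped = true }
}
```

The key design point survives the simplification: addData and the buffer swap are the only synchronized sections, so the receiver thread is never blocked by the (potentially slow) push to storage; back-pressure comes only from the bounded queue.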
private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}

The listener is a BlockGeneratorListener, implemented as an anonymous class passed to the BlockGenerator constructor inside ReceiverSupervisorImpl:

private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit = { }

  def onGenerateBlock(blockId: StreamBlockId): Unit = { }

  def onError(message: String, throwable: Throwable) {
    reportError(message, throwable)
  }

  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
    pushArrayBuffer(arrayBuffer, None, Some(blockId))
  }
}, streamId, env.conf)

onPushBlock calls ReceiverSupervisorImpl.pushArrayBuffer, which in turn calls ReceiverSupervisorImpl.pushAndReportBlock.
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val numRecords = receivedBlock match {
    case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
    case _ => -1
  }
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
  val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
  Await.result(future, askTimeout)
  logDebug(s"Reported block $blockId")
}

(1) Store the block in the BlockManager through receivedBlockHandler.storeBlock, which also makes it known to the Driver-side BlockManager. ReceivedBlockHandler has two implementations, WriteAheadLogBasedBlockHandler and BlockManagerBasedBlockHandler; the latter is the default;
(2) send an AddBlock message to the ReceiverTrackerActor.
1.7. ReceiverTrackerActor
private class ReceiverTrackerActor extends Actor {
  def receive = {
    case RegisterReceiver(streamId, typ, host, receiverActor) =>
      registerReceiver(streamId, typ, host, receiverActor, sender)
      sender ! true
    case AddBlock(receivedBlockInfo) =>
      sender ! addBlock(receivedBlockInfo)
    case ReportError(streamId, message, error) =>
      reportError(streamId, message, error)
    case DeregisterReceiver(streamId, message, error) =>
      deregisterReceiver(streamId, message, error)
      sender ! true
  }
}

This Actor runs on the Driver.
On receiving an AddBlock message, it calls ReceiverTracker.addBlock. The code:

private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
1.7.1. ReceivedBlockTracker.addBlock
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
  try {
    writeToLog(BlockAdditionEvent(receivedBlockInfo))
    getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
      s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    true
  } catch {
    case e: Exception =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}

This appends the ReceivedBlockInfo to the per-stream queue in the streamIdToUnallocatedBlockQueues hash map, defined as:

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
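A minimal model of this bookkeeping (toy types: BlockInfo stands in for ReceivedBlockInfo; getReceivedBlockQueue mirrors the real helper of the same name; addBlock is simplified and skips the write-ahead log):

```scala
import scala.collection.mutable

// Toy stand-in for ReceivedBlockInfo: just the block name.
type BlockInfo = String

val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, mutable.Queue[BlockInfo]]

// One FIFO queue per receiver stream, created on demand.
def getReceivedBlockQueue(streamId: Int): mutable.Queue[BlockInfo] =
  streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new mutable.Queue[BlockInfo])

// Simplified addBlock: enqueue the block's metadata for its stream.
def addBlock(streamId: Int, info: BlockInfo): Boolean = {
  getReceivedBlockQueue(streamId) += info
  true
}
```

Blocks accumulate here, unallocated, until the JobGenerator later assigns them to a batch, which is where the next part of this series picks up.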