Spark Streaming性能优化系列-如何获得和持续使用足够的集群计算资源?
2016-06-17 15:23
585 查看
一:数据峰值的巨大影响
1. 数据确实不稳定,例如晚上的时候访问流量特别大
2. 在处理的时候例如GC的时候耽误时间会产生delay延迟
二:Backpressure:数据的反压机制
基本思想:根据上一次计算的Job的一些信息评估来决定下一个Job数据接收的速度。
如何限制Spark接收数据的速度?
Spark Streaming在接收数据的时候必须把当前的数据接收完毕才能接收下一条数据。
源码解析
RateController:
1. RateController是监听器,继承自StreamingListener.
问题来了,RateContoller什么时候被调用的呢?
BackPressure是根据上一次计算的Job信息来评估下一个Job数据接收的速度。因此肯定是在JobScheduler中被调用的。
1. 在JobScheduler的start方法中rateController方法是从inputStream中获取的。
}
3. 在StreamingListenerBus源码如下:
5. RateController中onBatchCompleted具体实现如下:
8. 将pulish消息给ReceiverTracker.
总体流程图如下:
总结:
每次上一个Batch Duration的Job执行完成之后,都会返回JobCompleted等信息,基于这些信息产生一个新的Rate,然后将新的Rate通过远程通信交给了Executor中,而Executor也会根据Rate重新设置Rate大小。
1. 数据确实不稳定,例如晚上的时候访问流量特别大
2. 在处理的时候例如GC的时候耽误时间会产生delay延迟
二:Backpressure:数据的反压机制
基本思想:根据上一次计算的Job的一些信息评估来决定下一个Job数据接收的速度。
如何限制Spark接收数据的速度?
Spark Streaming在接收数据的时候必须把当前的数据接收完毕才能接收下一条数据。
源码解析
RateController:
1. RateController是监听器,继承自StreamingListener.
/** * A StreamingListener that receives batch completion updates, and maintains * an estimate of the speed at which this stream should ingest messages, * given an estimate computation from a `RateEstimator` */ private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator) extends StreamingListener with Serializable {
问题来了,RateContoller什么时候被调用的呢?
BackPressure是根据上一次计算的Job信息来评估下一个Job数据接收的速度。因此肯定是在JobScheduler中被调用的。
1. 在JobScheduler的start方法中rateController方法是从inputStream中获取的。
// attach rate controllers of input streams to receive batch completion updates for { inputDStream <- ssc.graph.getInputStreams rateController <- inputDStream.rateController } ssc.addStreamingListener(rateController)
2. 然后将此消息加入到listenerBus中。
/** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for * receiving system events related to streaming. */ def addStreamingListener(streamingListener: StreamingListener) { scheduler.listenerBus.addListener(streamingListener) }
}
3. 在StreamingListenerBus源码如下:
/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */ private[spark] class StreamingListenerBus extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus") with Logging { private val logDroppedEvent = new AtomicBoolean(false) override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = { event match { case receiverStarted: StreamingListenerReceiverStarted => listener.onReceiverStarted(receiverStarted) case receiverError: StreamingListenerReceiverError => listener.onReceiverError(receiverError) case receiverStopped: StreamingListenerReceiverStopped => listener.onReceiverStopped(receiverStopped) case batchSubmitted: StreamingListenerBatchSubmitted => listener.onBatchSubmitted(batchSubmitted) case batchStarted: StreamingListenerBatchStarted => listener.onBatchStarted(batchStarted) case batchCompleted: StreamingListenerBatchCompleted => listener.onBatchCompleted(batchCompleted)
4. 在RateController就实现了onBatchCompleted
5. RateController中onBatchCompleted具体实现如下:
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { val elements = batchCompleted.batchInfo.streamIdToInputInfo for { processingEnd <- batchCompleted.batchInfo.processingEndTime workDelay <- batchCompleted.batchInfo.processingDelay waitDelay <- batchCompleted.batchInfo.schedulingDelay elems <- elements.get(streamUID).map(_.numRecords) } computeAndPublish(processingEnd, elems, workDelay, waitDelay) }
6. RateController中computeAndPulish源码如下:
/** * Compute the new rate limit and publish it asynchronously. */ private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit = Future[Unit] { //评估新的更加合适Rate速度。 val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay) newRate.foreach { s => rateLimit.set(s.toLong) publish(getLatestRate()) } }
7. 其中publish实现是在ReceiverRateController中。
8. 将pulish消息给ReceiverTracker.
/** * A RateController that sends the new rate to receivers, via the receiver tracker. */ private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator) extends RateController(id, estimator) { override def publish(rate: Long): Unit = //因为会有很多RateController所以会有具体Id ssc.scheduler.receiverTracker.sendRateUpdate(id, rate) }
9. 在ReceiverTracker中sendRateUpdate源码如下: 此时的endpoint是ReceiverTrackerEndpoint.
/** Update a receiver's maximum ingestion rate */ def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized { if (isTrackerStarted) { endpoint.send(UpdateReceiverRateLimit(streamUID, newRate)) } }
10. 在ReceiverTrackerEndpoint的receive方法中就接收到了发来的消息。
case UpdateReceiverRateLimit(streamUID, newRate) => //根据receiverTrackingInfos获取info信息,然后根据endpoint获取通信句柄。 //此时endpoint是ReceiverSupervisor的endpoint通信实体。 for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) { eP.send(UpdateRateLimit(newRate)) }
11. 因此在ReceiverSupervisorImpl中接收到ReceiverTracker发来的消息。
/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */ private val endpoint = env.rpcEnv.setupEndpoint( "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint { override val rpcEnv: RpcEnv = env.rpcEnv override def receive: PartialFunction[Any, Unit] = { case StopReceiver => logInfo("Received stop signal") ReceiverSupervisorImpl.this.stop("Stopped by driver", None) case CleanupOldBlocks(threshTime) => logDebug("Received delete old batch signal") cleanupOldBlocks(threshTime) case UpdateRateLimit(eps) => logInfo(s"Received a new rate limit: $eps.") registeredBlockGenerators.foreach { bg => bg.updateRate(eps) } } })
12. RateLimiter中updateRate源码如下:
/** * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by //这里有最大限制,因为你的集群处理规模是有限的。 //Spark Streaming可能运行在YARN之上,因为多个计算框架都在运行的话,资源就//更有限了。 * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that. * * @param newRate A new rate in events per second. It has no effect if it's 0 or negative. */ private[receiver] def updateRate(newRate: Long): Unit = if (newRate > 0) { if (maxRateLimit > 0) { rateLimiter.setRate(newRate.min(maxRateLimit)) } else { rateLimiter.setRate(newRate) } }
总体流程图如下:
总结:
每次上一个Batch Duration的Job执行完成之后,都会返回JobCompleted等信息,基于这些信息产生一个新的Rate,然后将新的Rate通过远程通信交给了Executor中,而Executor也会根据Rate重新设置Rate大小。
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- 深入理解PHP7内核之FAST_ZPP
- 详解HDFS Short Circuit Local Reads
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序