您的位置:首页 > 其它

spark学习-48-Spark的event事件监听器LiveListenerBus和特质SparkListenerBus以及特质ListenerBus

2017-11-29 15:43 591 查看
1。Spark 事件体系的中枢是ListenerBus,由该类接受Event并且分发给各个Listener。MetricsSystem 则是一个为了衡量系统的各种指标的度量系统。Listener可以是MetricsSystem的信息来源之一。他们之间总体是一个互相补充的关系。

2。事件监听器LiveListenerBus是在SparkContext中创建的

// An asynchronous listener bus for Spark events
// 一个Spark事件的异步监听总线
private[spark] val listenerBus = new LiveListenerBus(this)


3。LiveListenerBus和SparkListenerBus以及ListenerBus的关系如图



4。ListenerBus

ListenerBus是一个特质,因为有多中监听器,所以这里当一个总的,因此不需要设计太多功能

首先有一个属性

// Marked `private[spark]` for access in tests.
/**
* listener链表保存在ListenerBus类中,为了保证并发访问的安全性,此处采用Java的CopyOnWriteArrayList
* 类来存储listener. 当需要对listener链表进行更改时,CopyOnWriteArrayList的特性使得会先复制整个链表,\
* 然后在复制的链表上面进行修改.当一旦获得链表的迭代器,在迭代器的生命周期中,可以保证数据的一致性.
* */
private[spark] val listeners = new CopyOnWriteArrayList[L]


然后是增加各种监听器监听总线的方法。

/**
* Add a listener to listen events. This method is thread-safe and can be called in any thread.
* 添加一个监听器来监听事件。这种方法是线程安全的,可以在任何线程中调用。
*/
final def addListener(listener: L): Unit = {
listeners.add(listener)
}


然后是从监听总线删除某个监听器的方法。

/**
* Remove a listener and it won't receive any events. This method is thread-safe and can be called
* in any thread.
* 删除一个监听器,它不会收到任何事件。这种方法是线程安全的,可以在任何线程中调用。
*/
final def removeListener(listener: L): Unit = {
listeners.remove(listener)
}


再然后是把各种事件发送给监听器的方法,就如同公公共汽车一样,车站上车,到站下车

/**
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
* `postToAll` in the same thread for all events.
*
* 将事件发布到所有已注册的侦听器。“postToAll”调用者应该保证在所有事件的同一线程中调用“postToAll”。
*/
def postToAll(event: E): Unit = {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.

140d8
val iter = listeners.iterator   //得到遍历所有添加到ListenerBus中的侦听器,比如SparkListener等等
while (iter.hasNext) {
val listener = iter.next()
try {
// 每次遍历一个监听器,就把监听器的类型和事件发过去,这一点是无差别攻击,只要是监听器,我就发给你,管你是不是你的,都发给你
// 然后由每个监听器去鉴定是不是我的啊,是的话就处理,不是的话就扔到,
// (就像老师给张三布置作业,老师站在讲台上说,张三你写篇作文,然后全班都听到了,但是只有张三去写作文)
doPostEvent(listener, event)
} catch {
case NonFatal(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
}
}
}


最后是一个接口,便于实现监听总线的,接受发送过来的事件,自己在子类中处理

/**
* Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
* thread for all listeners.
*
* 将事件发送到指定的侦听器。“onPostEvent”保证在同一个线程中为所有侦听器调用。
*
* 只要继承这个ListenerBus这个特质的,都要用这个方法去处理
*/
protected def doPostEvent(listener: L, event: E): Unit


最后是一个辅助方法,根据类找到对应的监听器

private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
val c = implicitly[ClassTag[T]].runtimeClass
listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
}


5。因为SparkContext中,listenerBus 默认的实现类是LiveListenerBus

// An asynchronous listener bus for Spark events
// 一个Spark事件的异步监听总线,先创建listenerBus,因为它作为参数创递给sparkEnv
private[spark] val listenerBus = new LiveListenerBus(this)


所以我们看看这个类,这个类LiveListenerBus里面主要是维护了一个队列,采用先进先出的方式整体思想是生产者—消费者模式

主要的队列是eventQueue ,队列的大小默认为10000

/ Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
// 限制事件队列的容量,这样我们就得到了一个显式的错误(而不是OOM异常),
// 如果它总是被不断地添加到比它正在被消耗的更快的速度。
// 1.5的版本是private vall EVENT_QUEUE_CAPACITY=10000 现在一直追加到最后,任然是默认值为1000
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
// 事件阻塞队列 一个队列还是懒加载的 消息队列实际上是保存在类liveListenerBus中的:
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

// EVENT_QUEUE_CAPACITY=10000 现在一直追加到最后,任然是默认值为1000
private def validateAndGetQueueSize(): Int = {
val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
if (queueSize <= 0) {
throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
}
queueSize
}


然后是生产者,队列总要有人放入数据啊,放入的方法如下

// TODO:生产者:
/**
* 产生很多SparkListenerEvent的事件,比如SparkListenerStageSubmitted,SparkListenerStageCompleted,SparkListenerTaskStart等等
* @param event
*/
def post(event: SparkListenerEvent): Unit = {
if (stopped.get) {
// Drop further events to make `listenerThread` exit ASAP
logError(s"$name has already stopped! Dropping event $event")
return
}
// 在事件队列队尾添加事件
// add()和offer()区别:两者都是往队列尾部插入元素,不同的时候,当超出队列界限的时候,add()方法是抛出异常让你处理,而offer()方法是直接返回false
val eventAdded = eventQueue.offer(event)
if (eventAdded) {
//如果成功加入队列,则在信号量中加一,与之对应,消费者线程就可以消费这个元素
/** 当成功放入一个事件到事件队列时,就调用eventLock.release(),信号量值加1,listener线程取事件
* 之前通过eventLock.acquire(),阻塞直到有一个信号(信号量大于0),然后信号量减1,取出一个事件
* 处理。另外,LiveListenerBus的stop函数里面也调用了eventLock.release(),意思就是通过让
* listener线程最后可以多取一个空事件,通过对空事件的处理
* */
eventLock.release()
} else {
// 如果事件队列超过其容量,则将删除新的事件,这些子类将被通知到删除事件。(
// (直接丢弃肯定不行啊,不然程序都连不在一起了,所以还要通知子类),但是貌似没看到通知子类的代码
onDropEvent(event)
droppedEventsCounter.incrementAndGet()
}

val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently   日志不要太频繁
// 如果上一次,队列满了EVENT_QUEUE_CAPACITY=1000设置的值,就丢掉,然后记录一个时间,如果一直持续丢掉,那么每过60秒记录一次日志,不然日志会爆满的
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
// 可能有多个线程试图减少droppedEventsCounter。
// 使用“compareAndSet”,确保只有一个线程能够获
// 如果另一个线程正在增加droppedEventsCounter,“compareAndSet”将会失败
// 然后线程会更新它。
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
// 记录一个warn日志,表示这个事件,被丢弃了
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
}
}
}
}


然后还要有消费者啊,不然队列只放不取,会爆满的

// TODO:消费者:
/**
* 整个思想就是典型的生产者消费者思想.为了保证生产者和消费者对消息队列的并发访问,在每次需要获取消息的时候,
* 调用eventLock.acquire()来获取信号量, 信号量的值就是当前队列中所含有的事件数量.如果正常获取到事件,
* 就调用postToAll将事件分发给所有listener, 继续下一次循环. 如果获取到null值, 则有下面两种情况:
*     整个application正常结束, 此时stopped值已经被设置为true
*     系统发生了错误, 立即终止运行
*
*
*  这个方法就是消费者,每次从队列首拿出来一个元素,然后,事件发布到所有已注册的侦听器。
*  你是什么类型的事件,就由什么侦听器去处理
* */
private val listenerThread = new Thread(name) {
setDaemon(true) //线程本身设为守护线程 (thread.setDaemon(true)必须在thread.start()之前设置,否则会跑出一个IllegalThreadStateException异常。)
// tryOrStopSparkContext把一个代码块当做执行的单元,如果有任何未捕获的异常,那么就停止SparkContext
override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
LiveListenerBus.withinListenerThread.withValue(true) {
while (true) {
// 只要为真,那么就一直接受新的event
eventLock.acquire()
self.synchronized {
processingEvent = true
}
try {
val event = eventQueue.poll  // remove() 和 poll() 方法都是从队列中删除第一个元素(head)。
if (event == null) {
// Get out of the while loop and shutdown the daemon thread
// 跳出while循环,关闭守护进程线程
if (!stopped.get) {
throw new IllegalStateException("Polling `null` from eventQueue means" +
" the listener bus has been stopped. So `stopped` must be true")
}
return
}
// 调用ListenerBus的postToAll(event: E)方法
postToAll(event)
} finally {
self.synchronized {
processingEvent = false
}
}
}
}
}
}


这里是一个守护线程,需要手动启动的,不然这个方法线程不会运行的,启动方法如下

/**
* Start sending events to attached listeners.
* 开始发送事件给附加的监听器。
*
* This first sends out all buffered events posted before this listener bus has started, then
* listens for any additional events asynchronously while the listener bus is still running.
* This should only be called once.
*
* 在此侦听器总线启动之前,它首先发出所有的缓冲事件,然后在侦听器总线仍在运行的情况下,异步侦听任何其他事件。这只需要调用一次。
*
*/
def start(): Unit = {
// CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做
if (started.compareAndSet(false, true)) { // 这一点涉及并发编程,反正这一点是循环设置为true
listenerThread.start() //启动消费者线程
} else {
throw new IllegalStateException(s"$name already started!")
}
}


消费之启动了,然后要把事件发给能处理的侦听器

// 调用ListenerBus的postToAll(event: E)方法
postToAll(event)


这个方法调用ListenerBus的postToAll(event: E)方法

/**
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
* `postToAll` in the same thread for all events.
*
* 将事件发布到所有已注册的侦听器。“postToAll”调用者应该保证在所有事件的同一线程中调用“postToAll”。
*/
def postToAll(event: E): Unit = {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.
val iter = listeners.iterator   //得到遍历所有添加到ListenerBus中的侦听器,比如SparkListener等等
while (iter.hasNext) {
val listener = iter.next()
try {
// 每次遍历一个监听器,就把监听器的类型和事件发过去,这一点是无差别攻击,只要是监听器,我就发给你,管你是不是你的,都发给你
// 然后由每个监听器去鉴定是不是我的啊,是的话就处理,不是的话就扔到,
// (就像老师给张三布置作业,老师站在讲台上说,张三你写篇作文,然后全班都听到了,但是只有张三去写作文)
doPostEvent(listener, event)
} catch {
case NonFatal(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
}
}
}


最后调用doPostEvent(listener, event),由其子类来处理

生产者中,如果队列满了,会丢弃这个事件,但是注释是还会通知子类,但是我没看到,哪里通知的

/**
* If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be
* notified with the dropped events.
*
* 如果事件队列超过其容量,则将删除新的事件。这些子类将被通知到删除事件。但是没看到哪里通知子类的代码
*
* Note: `onDropEvent` can be called in any thread.  这个方法可以再任何线程中调用
*/
def onDropEvent(event: SparkListenerEvent): Unit = {
if (logDroppedEvent.compareAndSet(false, true)) {
// Only log the following message once to avoid duplicated annoying logs.
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with " +
"the rate at which tasks are being started by the scheduler.")
}
}


最后是监听总线的停止,Spark应用结束或者异常退出,总要有处理后事的(就像人死了一样,要有收尸的)stop方法

/**
* Stop the listener bus. It will wait until the queued events have been processed, but drop the
* new events after stopping.
*
* 停止监听总线。它将等待队列事件被处理,但在停止后删除新的事件。
*/
def stop(): Unit = {
if (!started.get()) {
throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
}
if (stopped.compareAndSet(false, true)) {
// Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
// `stop` is called.
eventLock.release()
//然后等待消费者线程自动关闭
listenerThread.join()
} else {
// Keep quiet
}
}


6。比如SparkListenerBus的 doPostEvent()方法来处理

package org.apache.spark.scheduler

import org.apache.spark.util.ListenerBus

/**
* A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
*/
private[spark] trait SparkListenerBus
extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

// 真正的消息路由是由SparkListenerBus的onPostEvent函数完成的
protected override def doPostEvent(
listener: SparkListenerInterface,
event: SparkListenerEvent): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
listener.onStageSubmitted(stageSubmitted)
case stageCompleted: SparkListenerStageCompleted =>
listener.onStageCompleted(stageCompleted)
case jobStart: SparkListenerJobStart =>
listener.onJobStart(jobStart)
case jobEnd: SparkListenerJobEnd =>
listener.onJobEnd(jobEnd)
case taskStart: SparkListenerTaskStart =>
listener.onTaskStart(taskStart)
case taskGettingResult: SparkListenerTaskGettingResult =>
listener.onTaskGettingResult(taskGettingResult)
case taskEnd: SparkListenerTaskEnd =>
listener.onTaskEnd(taskEnd)
case environmentUpdate: SparkListenerEnvironmentUpdate =>
listener.onEnvironmentUpdate(environmentUpdate)
case blockManagerAdded: SparkListenerBlockManagerAdded =>
listener.onBlockManagerAdded(blockManagerAdded)
case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
listener.onBlockManagerRemoved(blockManagerRemoved)
case unpersistRDD: SparkListenerUnpersistRDD =>
listener.onUnpersistRDD(unpersistRDD)
case applicationStart: SparkListenerApplicationStart =>
listener.onApplicationStart(applicationStart)
case applicationEnd: SparkListenerApplicationEnd =>
listener.onApplicationEnd(applicationEnd)
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
listener.onExecutorMetricsUpdate(metricsUpdate)
case executorAdded: SparkListenerExecutorAdded =>
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
case executorBlacklisted: SparkListenerExecutorBlacklisted =>
listener.onExecutorBlacklisted(executorBlacklisted)
case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
listener.onExecutorUnblacklisted(executorUnblacklisted)
case nodeBlacklisted: SparkListenerNodeBlacklisted =>
listener.onNodeBlacklisted(nodeBlacklisted)
case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
case _ => listener.onOtherEvent(event)
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: