spark学习-48-Spark的event事件监听器LiveListenerBus和特质SparkListenerBus以及特质ListenerBus
2017-11-29 15:43
591 查看
1。Spark 事件体系的中枢是ListenerBus,由该类接受Event并且分发给各个Listener。MetricsSystem 则是一个为了衡量系统的各种指标的度量系统。Listener可以是MetricsSystem的信息来源之一。他们之间总体是一个互相补充的关系。
2。事件监听器LiveListenerBus是在SparkContext中创建的
3。LiveListenerBus和SparkListenerBus以及ListenerBus的关系如图
4。ListenerBus
ListenerBus是一个特质,因为有多中监听器,所以这里当一个总的,因此不需要设计太多功能
首先有一个属性
然后是增加各种监听器到监听总线的方法。
然后是从监听总线删除某个监听器的方法。
再然后是把各种事件发送给监听器的方法,就如同公公共汽车一样,车站上车,到站下车
最后是一个接口,便于实现监听总线的,接受发送过来的事件,自己在子类中处理
最后是一个辅助方法,根据类找到对应的监听器
5。因为SparkContext中,listenerBus 默认的实现类是LiveListenerBus
所以我们看看这个类,这个类LiveListenerBus里面主要是维护了一个队列,采用先进先出的方式,整体思想是生产者—消费者模式
主要的队列是eventQueue ,队列的大小默认为10000
然后是生产者,队列总要有人放入数据啊,放入的方法如下
然后还要有消费者啊,不然队列只放不取,会爆满的
这里是一个守护线程,需要手动启动的,不然这个方法线程不会运行的,启动方法如下
消费之启动了,然后要把事件发给能处理的侦听器
这个方法调用ListenerBus的postToAll(event: E)方法
最后调用doPostEvent(listener, event),由其子类来处理
生产者中,如果队列满了,会丢弃这个事件,但是注释是还会通知子类,但是我没看到,哪里通知的
最后是监听总线的停止,Spark应用结束或者异常退出,总要有处理后事的(就像人死了一样,要有收尸的)stop方法
6。比如SparkListenerBus的 doPostEvent()方法来处理
2。事件监听器LiveListenerBus是在SparkContext中创建的
// An asynchronous listener bus for Spark events // 一个Spark事件的异步监听总线 private[spark] val listenerBus = new LiveListenerBus(this)
3。LiveListenerBus和SparkListenerBus以及ListenerBus的关系如图
4。ListenerBus
ListenerBus是一个特质,因为有多中监听器,所以这里当一个总的,因此不需要设计太多功能
首先有一个属性
// Marked `private[spark]` for access in tests. /** * listener链表保存在ListenerBus类中,为了保证并发访问的安全性,此处采用Java的CopyOnWriteArrayList * 类来存储listener. 当需要对listener链表进行更改时,CopyOnWriteArrayList的特性使得会先复制整个链表,\ * 然后在复制的链表上面进行修改.当一旦获得链表的迭代器,在迭代器的生命周期中,可以保证数据的一致性. * */ private[spark] val listeners = new CopyOnWriteArrayList[L]
然后是增加各种监听器到监听总线的方法。
/** * Add a listener to listen events. This method is thread-safe and can be called in any thread. * 添加一个监听器来监听事件。这种方法是线程安全的,可以在任何线程中调用。 */ final def addListener(listener: L): Unit = { listeners.add(listener) }
然后是从监听总线删除某个监听器的方法。
/** * Remove a listener and it won't receive any events. This method is thread-safe and can be called * in any thread. * 删除一个监听器,它不会收到任何事件。这种方法是线程安全的,可以在任何线程中调用。 */ final def removeListener(listener: L): Unit = { listeners.remove(listener) }
再然后是把各种事件发送给监听器的方法,就如同公公共汽车一样,车站上车,到站下车
/** * Post the event to all registered listeners. The `postToAll` caller should guarantee calling * `postToAll` in the same thread for all events. * * 将事件发布到所有已注册的侦听器。“postToAll”调用者应该保证在所有事件的同一线程中调用“postToAll”。 */ def postToAll(event: E): Unit = { // JavaConverters can create a JIterableWrapper if we use asScala. // However, this method will be called frequently. To avoid the wrapper cost, here we use // Java Iterator directly. 140d8 val iter = listeners.iterator //得到遍历所有添加到ListenerBus中的侦听器,比如SparkListener等等 while (iter.hasNext) { val listener = iter.next() try { // 每次遍历一个监听器,就把监听器的类型和事件发过去,这一点是无差别攻击,只要是监听器,我就发给你,管你是不是你的,都发给你 // 然后由每个监听器去鉴定是不是我的啊,是的话就处理,不是的话就扔到, // (就像老师给张三布置作业,老师站在讲台上说,张三你写篇作文,然后全班都听到了,但是只有张三去写作文) doPostEvent(listener, event) } catch { case NonFatal(e) => logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) } } }
最后是一个接口,便于实现监听总线的,接受发送过来的事件,自己在子类中处理
/** * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same * thread for all listeners. * * 将事件发送到指定的侦听器。“onPostEvent”保证在同一个线程中为所有侦听器调用。 * * 只要继承这个ListenerBus这个特质的,都要用这个方法去处理 */ protected def doPostEvent(listener: L, event: E): Unit
最后是一个辅助方法,根据类找到对应的监听器
private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = { val c = implicitly[ClassTag[T]].runtimeClass listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq }
5。因为SparkContext中,listenerBus 默认的实现类是LiveListenerBus
// An asynchronous listener bus for Spark events // 一个Spark事件的异步监听总线,先创建listenerBus,因为它作为参数创递给sparkEnv private[spark] val listenerBus = new LiveListenerBus(this)
所以我们看看这个类,这个类LiveListenerBus里面主要是维护了一个队列,采用先进先出的方式,整体思想是生产者—消费者模式
主要的队列是eventQueue ,队列的大小默认为10000
/ Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. // 限制事件队列的容量,这样我们就得到了一个显式的错误(而不是OOM异常), // 如果它总是被不断地添加到比它正在被消耗的更快的速度。 // 1.5的版本是private vall EVENT_QUEUE_CAPACITY=10000 现在一直追加到最后,任然是默认值为1000 private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize() // 事件阻塞队列 一个队列还是懒加载的 消息队列实际上是保存在类liveListenerBus中的: private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) // EVENT_QUEUE_CAPACITY=10000 现在一直追加到最后,任然是默认值为1000 private def validateAndGetQueueSize(): Int = { val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) if (queueSize <= 0) { throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!") } queueSize }
然后是生产者,队列总要有人放入数据啊,放入的方法如下
// TODO:生产者: /** * 产生很多SparkListenerEvent的事件,比如SparkListenerStageSubmitted,SparkListenerStageCompleted,SparkListenerTaskStart等等 * @param event */ def post(event: SparkListenerEvent): Unit = { if (stopped.get) { // Drop further events to make `listenerThread` exit ASAP logError(s"$name has already stopped! Dropping event $event") return } // 在事件队列队尾添加事件 // add()和offer()区别:两者都是往队列尾部插入元素,不同的时候,当超出队列界限的时候,add()方法是抛出异常让你处理,而offer()方法是直接返回false val eventAdded = eventQueue.offer(event) if (eventAdded) { //如果成功加入队列,则在信号量中加一,与之对应,消费者线程就可以消费这个元素 /** 当成功放入一个事件到事件队列时,就调用eventLock.release(),信号量值加1,listener线程取事件 * 之前通过eventLock.acquire(),阻塞直到有一个信号(信号量大于0),然后信号量减1,取出一个事件 * 处理。另外,LiveListenerBus的stop函数里面也调用了eventLock.release(),意思就是通过让 * listener线程最后可以多取一个空事件,通过对空事件的处理 * */ eventLock.release() } else { // 如果事件队列超过其容量,则将删除新的事件,这些子类将被通知到删除事件。( // (直接丢弃肯定不行啊,不然程序都连不在一起了,所以还要通知子类),但是貌似没看到通知子类的代码 onDropEvent(event) droppedEventsCounter.incrementAndGet() } val droppedEvents = droppedEventsCounter.get if (droppedEvents > 0) { // Don't log too frequently 日志不要太频繁 // 如果上一次,队列满了EVENT_QUEUE_CAPACITY=1000设置的值,就丢掉,然后记录一个时间,如果一直持续丢掉,那么每过60秒记录一次日志,不然日志会爆满的 if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { // There may be multiple threads trying to decrease droppedEventsCounter. // Use "compareAndSet" to make sure only one thread can win. // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and // then that thread will update it. // 可能有多个线程试图减少droppedEventsCounter。 // 使用“compareAndSet”,确保只有一个线程能够获 // 如果另一个线程正在增加droppedEventsCounter,“compareAndSet”将会失败 // 然后线程会更新它。 if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { val prevLastReportTimestamp = lastReportTimestamp lastReportTimestamp = System.currentTimeMillis() // 记录一个warn日志,表示这个事件,被丢弃了 logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + new java.util.Date(prevLastReportTimestamp)) } } } }
然后还要有消费者啊,不然队列只放不取,会爆满的
// TODO:消费者: /** * 整个思想就是典型的生产者消费者思想.为了保证生产者和消费者对消息队列的并发访问,在每次需要获取消息的时候, * 调用eventLock.acquire()来获取信号量, 信号量的值就是当前队列中所含有的事件数量.如果正常获取到事件, * 就调用postToAll将事件分发给所有listener, 继续下一次循环. 如果获取到null值, 则有下面两种情况: * 整个application正常结束, 此时stopped值已经被设置为true * 系统发生了错误, 立即终止运行 * * * 这个方法就是消费者,每次从队列首拿出来一个元素,然后,事件发布到所有已注册的侦听器。 * 你是什么类型的事件,就由什么侦听器去处理 * */ private val listenerThread = new Thread(name) { setDaemon(true) //线程本身设为守护线程 (thread.setDaemon(true)必须在thread.start()之前设置,否则会跑出一个IllegalThreadStateException异常。) // tryOrStopSparkContext把一个代码块当做执行的单元,如果有任何未捕获的异常,那么就停止SparkContext override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { LiveListenerBus.withinListenerThread.withValue(true) { while (true) { // 只要为真,那么就一直接受新的event eventLock.acquire() self.synchronized { processingEvent = true } try { val event = eventQueue.poll // remove() 和 poll() 方法都是从队列中删除第一个元素(head)。 if (event == null) { // Get out of the while loop and shutdown the daemon thread // 跳出while循环,关闭守护进程线程 if (!stopped.get) { throw new IllegalStateException("Polling `null` from eventQueue means" + " the listener bus has been stopped. So `stopped` must be true") } return } // 调用ListenerBus的postToAll(event: E)方法 postToAll(event) } finally { self.synchronized { processingEvent = false } } } } } }
这里是一个守护线程,需要手动启动的,不然这个方法线程不会运行的,启动方法如下
/** * Start sending events to attached listeners. * 开始发送事件给附加的监听器。 * * This first sends out all buffered events posted before this listener bus has started, then * listens for any additional events asynchronously while the listener bus is still running. * This should only be called once. * * 在此侦听器总线启动之前,它首先发出所有的缓冲事件,然后在侦听器总线仍在运行的情况下,异步侦听任何其他事件。这只需要调用一次。 * */ def start(): Unit = { // CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做 if (started.compareAndSet(false, true)) { // 这一点涉及并发编程,反正这一点是循环设置为true listenerThread.start() //启动消费者线程 } else { throw new IllegalStateException(s"$name already started!") } }
消费之启动了,然后要把事件发给能处理的侦听器
// 调用ListenerBus的postToAll(event: E)方法 postToAll(event)
这个方法调用ListenerBus的postToAll(event: E)方法
/** * Post the event to all registered listeners. The `postToAll` caller should guarantee calling * `postToAll` in the same thread for all events. * * 将事件发布到所有已注册的侦听器。“postToAll”调用者应该保证在所有事件的同一线程中调用“postToAll”。 */ def postToAll(event: E): Unit = { // JavaConverters can create a JIterableWrapper if we use asScala. // However, this method will be called frequently. To avoid the wrapper cost, here we use // Java Iterator directly. val iter = listeners.iterator //得到遍历所有添加到ListenerBus中的侦听器,比如SparkListener等等 while (iter.hasNext) { val listener = iter.next() try { // 每次遍历一个监听器,就把监听器的类型和事件发过去,这一点是无差别攻击,只要是监听器,我就发给你,管你是不是你的,都发给你 // 然后由每个监听器去鉴定是不是我的啊,是的话就处理,不是的话就扔到, // (就像老师给张三布置作业,老师站在讲台上说,张三你写篇作文,然后全班都听到了,但是只有张三去写作文) doPostEvent(listener, event) } catch { case NonFatal(e) => logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) } } }
最后调用doPostEvent(listener, event),由其子类来处理
生产者中,如果队列满了,会丢弃这个事件,但是注释是还会通知子类,但是我没看到,哪里通知的
/** * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be * notified with the dropped events. * * 如果事件队列超过其容量,则将删除新的事件。这些子类将被通知到删除事件。但是没看到哪里通知子类的代码 * * Note: `onDropEvent` can be called in any thread. 这个方法可以再任何线程中调用 */ def onDropEvent(event: SparkListenerEvent): Unit = { if (logDroppedEvent.compareAndSet(false, true)) { // Only log the following message once to avoid duplicated annoying logs. logError("Dropping SparkListenerEvent because no remaining room in event queue. " + "This likely means one of the SparkListeners is too slow and cannot keep up with " + "the rate at which tasks are being started by the scheduler.") } }
最后是监听总线的停止,Spark应用结束或者异常退出,总要有处理后事的(就像人死了一样,要有收尸的)stop方法
/** * Stop the listener bus. It will wait until the queued events have been processed, but drop the * new events after stopping. * * 停止监听总线。它将等待队列事件被处理,但在停止后删除新的事件。 */ def stop(): Unit = { if (!started.get()) { throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") } if (stopped.compareAndSet(false, true)) { // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know // `stop` is called. eventLock.release() //然后等待消费者线程自动关闭 listenerThread.join() } else { // Keep quiet } }
6。比如SparkListenerBus的 doPostEvent()方法来处理
package org.apache.spark.scheduler import org.apache.spark.util.ListenerBus /** * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners */ private[spark] trait SparkListenerBus extends ListenerBus[SparkListenerInterface, SparkListenerEvent] { // 真正的消息路由是由SparkListenerBus的onPostEvent函数完成的 protected override def doPostEvent( listener: SparkListenerInterface, event: SparkListenerEvent): Unit = { event match { case stageSubmitted: SparkListenerStageSubmitted => listener.onStageSubmitted(stageSubmitted) case stageCompleted: SparkListenerStageCompleted => listener.onStageCompleted(stageCompleted) case jobStart: SparkListenerJobStart => listener.onJobStart(jobStart) case jobEnd: SparkListenerJobEnd => listener.onJobEnd(jobEnd) case taskStart: SparkListenerTaskStart => listener.onTaskStart(taskStart) case taskGettingResult: SparkListenerTaskGettingResult => listener.onTaskGettingResult(taskGettingResult) case taskEnd: SparkListenerTaskEnd => listener.onTaskEnd(taskEnd) case environmentUpdate: SparkListenerEnvironmentUpdate => listener.onEnvironmentUpdate(environmentUpdate) case blockManagerAdded: SparkListenerBlockManagerAdded => listener.onBlockManagerAdded(blockManagerAdded) case blockManagerRemoved: SparkListenerBlockManagerRemoved => listener.onBlockManagerRemoved(blockManagerRemoved) case unpersistRDD: SparkListenerUnpersistRDD => listener.onUnpersistRDD(unpersistRDD) case applicationStart: SparkListenerApplicationStart => listener.onApplicationStart(applicationStart) case applicationEnd: SparkListenerApplicationEnd => listener.onApplicationEnd(applicationEnd) case metricsUpdate: SparkListenerExecutorMetricsUpdate => listener.onExecutorMetricsUpdate(metricsUpdate) case executorAdded: SparkListenerExecutorAdded => listener.onExecutorAdded(executorAdded) case executorRemoved: SparkListenerExecutorRemoved => listener.onExecutorRemoved(executorRemoved) case executorBlacklisted: SparkListenerExecutorBlacklisted => listener.onExecutorBlacklisted(executorBlacklisted) case executorUnblacklisted: SparkListenerExecutorUnblacklisted => listener.onExecutorUnblacklisted(executorUnblacklisted) case nodeBlacklisted: SparkListenerNodeBlacklisted => listener.onNodeBlacklisted(nodeBlacklisted) case nodeUnblacklisted: SparkListenerNodeUnblacklisted => listener.onNodeUnblacklisted(nodeUnblacklisted) case blockUpdated: SparkListenerBlockUpdated => listener.onBlockUpdated(blockUpdated) case logStart: SparkListenerLogStart => // ignore event log metadata case _ => listener.onOtherEvent(event) } } }
相关文章推荐
- Spark源码学习笔记3-LiveListenerBus
- spark 处理大量数据时报错: ERROR scheduler.LiveListenerBus: Listener EventLoggingListener threw an exception j
- spark ListenerBus 监听器
- RemoveEventListener无法移除事件监听器的情况
- ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
- [原创]java WEB学习笔记48:其他的Servlet 监听器:域对象中属性的变更的事件监听器 (3 个),感知 Session 绑定的事件监听器(2个)
- spark学习-58-Spark的EventLoggingListener
- Spark问题5之ERROR LiveListenerBus SparkListenerBus has already stopped
- Android学习记录:MotionEvent,onTouch,OnTouchListener 事件机制等学习
- spark core 2.0 LiveListenerBus
- js玩具——UI组件:PropertyChangeEventListener 属性改变事件监听器及默认实现
- Servlet3.0学习总结(四)——使用注解标注监听器(Listener)
- android事件传递机制以及onInterceptTouchEvent()和onTouchEvent()总结
- spring boot学习笔记5-Event事件传递
- Dom对象事件注册和取消(addEventListener/attachEvent)
- javaweb学习总结(四十四)——监听器(Listener)学习
- jQuery 事件绑定(event)学习笔记
- Scala中隐式参数实战详解以及隐式参数在Spark中的应用源码解析之Scala学习笔记-50
- Dom对象事件注册和取消(addEventListener/attachEvent)
- Android中Preference的使用以及监听事件分析(自己学习)