您的位置:首页 > 其它

深入理解EventBus的设计思想

2013-12-08 00:39 337 查看
凌弃同学已经介绍了EventBus的使用方式

​如何使用——三步走:

​1、定义一个observer,并加入@Subscribe作为消息回调函数;

2、将observer注册到EventBus;EventBus.register(this);

​3、消息投递:eventBus.post(logTo);

本文将深入EventBus的源代码,和大家一起深入研究EventBus的让人惊叹的设计思路。由于作者水平有限,无法面面俱到,希望大家先读读EventBusExplained。

注:Guava的版本

Java代码


<dependency>

<groupId>com.google.guava</groupId>

<artifactId>guava</artifactId>

<version>15.0</version>

</dependency>


准备工作

为了方便大家理解Coder的思路,有一些名词或约定先解释一下:

在observer类(比如:例子中的
EventBusChangeRecorder
)里面,
@Subscribe
所annotate的
method
,有且只有一个参数。因为
EventBus#post(Object)
方法只有一个参数咯,比如

Java代码


//Classistypicallyregisteredbythecontainer.

classEventBusChangeRecorder{

//Subscribeannotation,并且只有一个ChangeEvent方法参数

@SubscribepublicvoidrecordCustomerChange(ChangeEvente){

recordChange(e.getChange());

}

}

通过
method
和observer的
instance
来定义一个
EventSubscriber
,请看源码

Java代码


SubscriberFindingStrategy#findAllSubscribers(Object)

在一个observer类里面,可以定义多个
@Subscribe
,根据
method.getParameterTypes()[0]
来缓存参数的类型——
EventType
Set<EventSubscriber>


Java代码


<code>//所谓SetMultimap,就是Map<Class<?>,Set<EventSubscriber>>

Set<EventSubscriber>>privatefinalSetMultimap<Class<?>,EventSubscriber>subscribersByType=HashMultimap.create();</code>



@Subscribe
所annotate的
method
的参数,不能支持泛型。因为在运行的时候,因为
Type
Erasure
导致拿不到"真正"的
parameterType
,举个例子:

Java代码


publicclassGenericClass<T>{//1

privateList<T>list;//2

privateMap<String,T>map;//3

public<U>UgenericMethod(Map<T,U>m){//4

returnnull;

}

}

上面代码里,带有注释的行里的泛型信息在运行时都还能获取到,原则是源码里写了什么运行时就能得到什么。针对1的GenericClass,运行时通过Class.getTypeParameters()方法得到的数组可以获取那个“T”;同理,2的T、3的java.lang.String与T、4的T与U都可以获得。源码文本里写的是什么运行时就能得到什么;像是T、U等在运行时的实际类型是获取不到的。


设计思路


Register/Unregister

在99.99%的使用场景中,是不会在runtime的时候去register/unregister某个observer的,在spring的环境,也是在init的时候做register/unregister。不过做framework就必须要考虑这0.01%的使用场景。在runtime的时候去register/unregister,最重要的就是线程安全问题:如果我在unregister某个observer的时候,正好调用
EventSubscriber
,会因为异常,导致Event不能送达到其它的observer上。所以在register/unregister的方法实现里面,都加入了
ReadWriteLock
,register/unregister的时候用
writeLock
,post的时候用
readLock


Java代码


<code>publicvoidregister(Objectobject){

//Map<Class<?>,Collection<EventSubscriber>>结构

Multimap<Class<?>,EventSubscriber>methodsInListener=

finder.findAllSubscribers(object);

subscribersByTypeLock.writeLock().lock();

try{

//subscribersByType是一个Map<Class<?>,Set<EventSubscriber>>结构

subscribersByType.putAll(methodsInListener);

}finally{

subscribersByTypeLock.writeLock().unlock();

}

}</code>



其次,在
SubscriberFindingStrategy#findAllSubscribers
的时候有也用到了Cache,原理与下面要研究的Post的Cache一模一样


Post

EventBus#post
的实现真的非常amazing,我们先从最初的设计思路开始,一步一步来。

最简单的想法就是,通过
post
传入一个
event
对象,这个
event
getClass
作为
key
,通过
subscribersByType
来获取
EventSubscriber
Set
,再调用
EventSubscriber#handleEvent
完成
method#invoke


这样的思路没有什么问题,不过EventBus的作者想得更多更远:

PostEverything

可以是任意的object,只要
subscribersByType
有这个Key

Java代码


Set<Class<?>>dispatchTypes=flattenHierarchy(event.getClass())

Cache

毕竟post的
Event
的class是有限的,所以我们可以在
classLoader
下缓存
flattenHierarchy
的输入和输出,正如:

Java代码


privatestaticfinalLoadingCache<Class<?>,Set<Class<?>>>flattenHierarchyCache=CacheBuilder.newBuilder()

.weakKeys()

.build(newCacheLoader<Class<?>,Set<Class<?>>>(){

@SuppressWarnings({"unchecked","rawtypes"})//safecast

@Override

publicSet<Class<?>>load(Class<?>concreteClass){

return(Set)TypeToken.of(concreteClass).getTypes().rawTypes();

}

});

注:static不是JVM下的全局共享,只是在
classloader
下面共享

WeakReference

也许你也注意到了,
flattenHierarchyCache
的Key(
EventType
)是一个
WeakReference
,这样做的目的就是GC友好。比方说你在runtime的时候,unregister了一个observer,这时候
subscribersByType
就不再Strong
Reference
这个
EventType
flattenHierarchyCache
也会在minor
gc
的时候回收内存。

ThreadLocal

EventBus里面最Amazing的实现,在EventBus里面使用了
ThreadLocal
的地方有两处

Java代码


/**queuesofeventsforthecurrentthreadtodispatch*/

privatefinalThreadLocal<Queue<EventWithSubscriber>>eventsToDispatch=newThreadLocal<Queue<EventWithSubscriber>>(){

@OverrideprotectedQueue<EventWithSubscriber>initialValue(){

returnnewLinkedList<EventWithSubscriber>();

}

};

/**trueifthecurrentthreadiscurrentlydispatchinganevent*/

privatefinalThreadLocal<Boolean>isDispatching=newThreadLocal<Boolean>(){

@OverrideprotectedBooleaninitialValue(){

returnfalse;

}

};

这样巧妙的设计,有三个目的:

解决嵌套问题。比方说一个observer有两个方法
@Subscribe
,其中一个方法的实现里面
bus.post(SECOND);
,为了避免已经处理过的
Event
再次被处理,所以需要
isDispatching
,下面是一个嵌套的例子。

Java代码


publicclassReentrantEventsHater{

booleanready=true;

List<Object>eventsReceived=Lists.newArrayList();

@Subscribe

publicvoidlistenForStrings(Stringevent){

eventsReceived.add(event);

ready=false;

try{

bus.post(SECOND);

}finally{

ready=true;

}

}

@Subscribe

publicvoidlistenForDoubles(Doubleevent){

assertTrue("IreceivedaneventwhenIwasn'tready!",ready);

eventsReceived.add(event);

}

}

eventsToDispatch
是一个
queue
,在
enqueueEvent
(请结合源码
EventBus#enqueueEvent
)的时候调用,
queue
的使用够减少读锁的占用时间

eventsToDispatch
dispatchQueuedEvents
通过ThreadLocal能够独立成为方法,方便了
AsyncEventBus
Override


ConcurrentLinkedQueuevsLinkedBlockingQueue

得益于
EventBus
的巧妙设计,
AsyncEventBus
的实现就容易很多,不过笔者也发现了一个很有意思的地方。JavaDoc里面都标识了

BlockingQueueimplementationsaredesignedtobeusedprimarilyforproducer-consumerqueues

那么为什么要选用ConcurrentLinkedQueue而不是LinkedBlockingQueue呢?

Java代码


/**thequeueofeventsissharedacrossallthreads*/

privatefinalConcurrentLinkedQueue<EventWithSubscriber>eventsToDispatch=newConcurrentLinkedQueue<EventWithSubscriber>();

protectedvoiddispatchQueuedEvents(){

while(true){

EventWithSubscribereventWithSubscriber=eventsToDispatch.poll();

if(eventWithSubscriber==null){

break;

}

dispatch(eventWithSubscriber.event,eventWithSubscriber.subscriber);

}

}

简单来说,ConcurrentLinkedQueue是无锁的,没有synchronized,也没有Lock.lock,依靠CAS保证并发,同时,也不提供阻塞方法
put()
take()
,速度上面肯定无锁的会更快一些,吞吐量更高一些(都是纳秒的差距)。再加上这里只有一个
Publisher
,多个
Consumer
Cousumer
的消费速度又几乎是0,所以我个人觉得用啥都没啥区别。。。


总结

Guava真的是神器,希望读者看完本文后,能够对Guava产生兴趣。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: