您的位置:首页 > 其它

EventBus源码解析

2016-12-22 14:22 183 查看


事件注册

根据前一讲EventBus使用详解我们已经知道EventBus使用首先是需要注册的,注册事件的代码如下:

1

EventBus.getDefault().register(this);

EventBus对外提供了一个register方法来进行事件注册,该方法接收一个Object类型的参数,下面看下register方法的源码:
12
3
4
5
6
7
8
9
10

public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
// 判断该类是否是匿名内部类
boolean forceReflection = subscriberClass.isAnonymousClass();
List<SubscriberMethod> subscriberMethods =
subscriberMethodFinder.findSubscriberMethods(subscriberClass, forceReflection);
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}

该方法首先获取获取传进来参数的Class对象,然后判断该类是否是匿名内部类。然后根据这两个参数通过subscriberMethodFinder.findSubscriberMethods方法获取所有的事件处理方法。
12
3
4
5
6
7
8
9
10
1112
13
14
15
16
17
18
19
20
2122
23
24
25
26
27
28
29
30
3132
33

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass, boolean forceReflection) {
String key = subscriberClass.getName();
List<SubscriberMethod> subscriberMethods;
synchronized (METHOD_CACHE) {
subscriberMethods = METHOD_CACHE.get(key);
}
if (subscriberMethods != null) {
//缓存命中,直接返回
return subscriberMethods;
}
if (INDEX != null && !forceReflection) {
// 如果INDEX不为空,并且subscriberClass为非匿名内部类,
// 则通过findSubscriberMethodsWithIndex方法查找事件处理函数
subscriberMethods = findSubscriberMethodsWithIndex(subscriberClass);
if (subscriberMethods.isEmpty()) {
//如果结果为空,则使用findSubscriberMethodsWithReflection方法再查找一次
subscriberMethods = findSubscriberMethodsWithReflection(subscriberClass);
}
} else {
//INDEX为空或者subscriberClass未匿名内部类,使用findSubscriberMethodsWithReflection方法查找
subscriberMethods = findSubscriberMethodsWithReflection(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//存入缓存并返回
synchronized (METHOD_CACHE) {
METHOD_CACHE.put(key, subscriberMethods);
}
return subscriberMethods;
}
}

通过名字我们就知道这个方法是获取subscriberClass类中所有的事件处理方法(即使用了@Subscribe的方法)。该方法首先会从缓存METHOD_CACHE中去获取事件处理方法,如果缓存中不存在,则需要通过findSubscriberMethodsWithIndex或者findSubscriberMethodsWithReflection方法获取所有事件处理方法,获取到之后先存入缓存再返回。

这个方法里面有个INDEX对象,我们看看它是个什么鬼:

12
3
4
5
6
7
8
9
10
1112
13
14
15
16

/** Optional generated index without entries from subscribers super classes */
private static final SubscriberIndex INDEX;

static {
SubscriberIndex newIndex = null;
try {
Class<?> clazz = Class.forName("de.greenrobot.event.GeneratedSubscriberIndex");
newIndex = (SubscriberIndex) clazz.newInstance();
} catch (ClassNotFoundException e) {
Log.d(EventBus.TAG, "No subscriber index available, reverting to dynamic look-up");
// Fine
} catch (Exception e) {
Log.w(EventBus.TAG, "Could not init subscriber index, reverting to dynamic look-up", e);
}
INDEX = newIndex;
}

由上面代码可以看出EventBus会试图加载一个de.greenrobot.event.GeneratedSubscriberIndex类并创建对象赋值给INDEX,但是EventBus3.0 beta并没有为我们提供该类(可能后续版本会提供)。所以INDEX为null。

我们再返回findSubscriberMethods方法,我们知道INDEX已经为null了,所以必然会调用findSubscriberMethodsWithReflection方法查找所有事件处理函数:
12
3
4
5
6
7
8
9
10
1112
13
14
15
16
17
18
19
20
2122
23
24
25
26
27
28
29
30
3132
33
34
35
36
37
38
39
40
4142
43
44
45
46
47
48
49
50
5152
53
54
55
56
57
58
59
60
6162
63

private List<SubscriberMethod> findSubscriberMethodsWithReflection(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = new ArrayList<SubscriberMethod>();
Class<?> clazz = subscriberClass;
HashSet<String> eventTypesFound = new HashSet<String>();
StringBuilder methodKeyBuilder = new StringBuilder();
while (clazz != null) {
String name = clazz.getName();
// 如果查找的类是java、javax或者android包下面的类,则过滤掉
if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) {
// Skip system classes, this just degrades performance
break;
}

// Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again)
// 通过反射查找所有该类中所有方法
Method[] methods = clazz.getDeclaredMethods();
for (Method method : methods) {
int modifiers = method.getModifiers();
// 事件处理方法必须为public,这里过滤掉所有非public方法
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
// 事件处理方法必须只有一个参数
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
String methodName = method.getName();
Class<?> eventType = parameterTypes[0];
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(methodName);
methodKeyBuilder.append('>').append(eventType.getName());

String methodKey = methodKeyBuilder.toString();
if (eventTypesFound.add(methodKey)) {
// Only add if not already found in a sub class
// 只有在子类中没有找到,才会添加到subscriberMethods
ThreadMode threadMode = subscribeAnnotation.threadMode();
subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification) {
// 如果某个方法加了@Subscribe注解,并且不是1个参数,则抛出EventBusException异常
if (method.isAnnotationPresent(Subscribe.class)) {
String methodName = name + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
}
} else if (strictMethodVerification) {
// 如果某个方法加了@Subscribe注解,并且不是public修饰,则抛出EventBusException异常
if (method.isAnnotationPresent(Subscribe.class)) {
String methodName = name + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}

}
}
// 会继续查找父类的方法
clazz = clazz.getSuperclass();
}
return subscriberMethods;
}

该方法主要作用就是找出subscriberClass类以及subscriberClass的父类中所有的事件处理方法(添加了@Subscribe注解,访问修饰符为public并且只有一个参数)。值得注意的是:如果子类与父类中同时存在了相同事件处理函数,则父类中的不会被添加到subscriberMethods。

好了,查找事件处理函数的过程已经完了,我们继续回到register方法中:

12
3

for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}

找到事件处理函数后,会遍历找到的所有事件处理函数并调用subscribe方法将所有事件处理函数注册到EventBus中。

12
3
4
5
6
7
8
9
10
1112
13
14
15
16
17
18
19
20
2122
23
24
25
26
27
28
29
30
3132
33
34
35
36
37
38
39
40
4142
43
44
45
46
47
48
49
50
5152
53
54
55
56
57
58
59
60
6162
63

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
// 获取订阅了某种类型数据的 Subscription 。 使用了 CopyOnWriteArrayList ,这个是线程安全的,
// CopyOnWriteArrayList 会在更新的时候,重新生成一份 copy,其他线程使用的是
// copy,不存在什么线程安全性的问题。
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<Subscription>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
//如果已经被注册过了,则抛出EventBusException异常
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}

// Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again)
// subscriberMethod.method.setAccessible(true);

// Got to synchronize to avoid shifted positions when adding/removing concurrently
// 根据优先级将newSubscription查到合适位置
synchronized (subscriptions) {
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
}

//将处理事件类型添加到typesBySubscriber
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<Class<?>>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);

// 如果该事件处理方法为粘性事件,即设置了“sticky = true”,则需要调用checkPostStickyEventToSubscription
// 判断是否有粘性事件需要处理,如果需要处理则触发一次事件处理函数
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

如果事件处理函数设置了“sticky = true”,则会调用checkPostStickyEventToSubscription处理粘性事件。

12
3
4
5
6
7

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());
}
}

如果存在粘性事件,则立即调用postToSubscription触发该事件的事件处理函数。postToSubscription函数后面讲post时会讲到。

至此,整个register过程就介绍完了。

总结一下,整个过程分为3步:

查找注册的类中所有的事件处理函数(添加了@Subscribe注解且访问修饰符为public的方法)

将所有事件处理函数注册到EventBus

如果有事件处理函数设置了“sticky = true”,则立即处理该事件


post事件

register过程讲完后,我们知道了EventBus如何找到我们定义好的事件处理函数。有了这些事件处理函数,当post相应事件的时候,EventBus就会触发订阅该事件的处理函数。具体post过程是怎样的呢?我们看看代码:

12
3
4
5
6
7
8
9
10
1112
13
14
15
16
17
18
19
20
2122
23
24

public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);

if (!postingState.isPosting) {
// 标识post的线程是否是主线程
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 循环处理eventQueue中的每一个event对象
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
// 处理完之后重置postingState的一些标识信息
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

currentPostingThreadState是一个ThreadLocal类型,里面存储了PostingThreadState;

12
3
4
5
6
7
8
9
10
1112
13
14
15
16

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};

/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<Object>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}

PostingThreadState包含了一个事件队列eventQueue和一些标志信息。eventQueue存放所有待post的事件对象。

我们再回到post方法,首先会将event对象添加到事件队列eventQueue中。然后判断是否有事件正在post,如果没有则会遍历eventQueue中每一个event对象,并且调用postSingleEvent方法post该事件。
12
3
4
5
6
7
8
9
10
1112
13
14
15
16
17
18
19
20
2122
23
24
25

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
// 如果允许事件继承,则会调用lookupAllEventTypes查找所有的父类和接口类
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
// 如果post的事件没有被注册,则post一个NoSubscriberEvent事件
post(new NoSubscriberEvent(this, event));
}
}
}

如果允许事件继承,则会调用lookupAllEventTypes查找所有的父类和接口类。

12
3
4
5
6
7
8
9
10
1112
13
14
15
16

private List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<Class<?>>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}

这个方法很简单,就是查找eventClass类的所有父类和接口,并将其保存到eventTypesCache中,方便下次使用。

我们再回到postSingleEvent方法。不管允不允许事件继承,都会执行postSingleEventForEventType方法post事件。

12
3
4
5
6
7
8
9
10
1112
13
14
15
16
17
18
19
20
2122
23
24
25
26

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

在postSingleEventForEventType方法中,会已eventClass为key从subscriptionsByEventType对象中获取Subscription列表。在上面讲register的时候我们已经看到EventBus在register的时候会将Subscription列表存储在subscriptionsByEventType中。接下来会遍历subscriptions列表然后调用postToSubscription方法进行下一步处理。

12
3
4
5
6
7
8
9
10
1112
13
14
15
16
17
18
19
20
2122
23
24
25
26
27
28
29
30
3132
33
34
35

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case PostThread:
// 如果该事件处理函数没有指定线程模型或者线程模型为PostThread
// 则调用invokeSubscriber在post的线程中执行事件处理函数
invokeSubscriber(subscription, event);
break;
case MainThread:
// 如果该事件处理函数指定的线程模型为MainThread
// 并且当前post的线程为主线程,则调用invokeSubscriber在当前线程(主线程)中执行事件处理函数
// 如果post的线程不是主线程,将使用mainThreadPoster.enqueue该事件处理函数添加到主线程的消息队列中
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BackgroundThread:
// 如果该事件处理函数指定的线程模型为BackgroundThread
// 并且当前post的线程为主线程,则调用backgroundPoster.enqueue
// 如果post的线程不是主线程,则调用invokeSubscriber在当前线程(非主线程)中执行事件处理函数
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case Async:
//添加到异步线程队列中
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

该方法主要是根据register注册的事件处理函数的线程模型在指定的线程中触发事件处理函数。在上一讲EventBus使用详解中已经讲过EventBus的线程模型相关概念了,不明白的可以回去看看。

mainThreadPoster、backgroundPoster和asyncPoster分别是HandlerPoster、BackgroundPoster和AsyncPoster的对象,其中HandlerPoster继承自Handle,BackgroundPoster和AsyncPoster继承自Runnable。

我们主要看看HandlerPoster。

1

mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);

在EventBus的构造函数中,我们看到mainThreadPoster初始化的时候,传入的是Looper.getMainLooper()。所以此Handle是运行在主线程中的。

mainThreadPoster.enqueue方法:

12
3
4
5
6
7
8
9
10
1112

void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}

enqueue方法最终会调用sendMessage方法,所以该Handle的handleMessage方法会被调用。

12
3
4
5
6
7
8
9
10
1112
13
14
15
16
17
18
19
20
2122
23
24
25
26
27
28
29
30
31

@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}

在该方法中,最终还是会调用eventBus.invokeSubscriber调用事件处理函数。

BackgroundPoster和AsyncPoster继承自Runnable,并且会在enqueue方法中调用eventBus.getExecutorService().execute(this);具体run方法大家可以自己去看源码,最终都会调用eventBus.invokeSubscriber方法。我们看看eventBus.invokeSubscriber方法的源码:

12
3
4
5
6
7
8

void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}

该方法会调用invokeSubscriber方法进一步处理:

12
3
4
5
6
7
8
9
10

void invokeSubscriber(Subscription subscription, Object event) {
try {
// 通过反射调用事件处理函数
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

该方法最终会通过反射来调用事件处理函数。至此,整个post过程分析完了。

总结一下整个post过程,大致分为3步:

将事件对象添加到事件队列eventQueue中等待处理

遍历eventQueue队列中的事件对象并调用postSingleEvent处理每个事件

找出订阅过该事件的所有事件处理函数,并在相应的线程中执行该事件处理函数


取消事件注册

上面已经分析了EventBus的register和post过程,这两个过程是EventBus的核心。不需要订阅事件时需要取消事件注册:
12
3
4
5
6
7
8
9
10
1112

/** Unregisters the given subscriber from all event classes. */
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

取消事件注册很简单,只是将register过程注册到EventBus的事件处理函数移除掉。

到这里,EventBus源码我们已经分析完了,如有不对的地方还望指点。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: