您的位置:首页 > 编程语言

EventBus框架原理解析(结合源码)(下)

2015-08-12 14:14 676 查看
上一篇文章EventBus框架原理解析(结合源码)(上),给大家讲述了EventBus中实体类的封装和register()的具体代码。

接下来我们看另外一个重要方法post(),这个方法显然是要根据传入的参数类型,从subscriptionsByEventType取出对应的subscription

/** Posts the given event to the event bus. */
public void post(Object event) {//event就是参数
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);//将参数加入队列

if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

整个post方法的执行主体是postSingleEvent(),我们看到,这里传入了event
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();//参数类型
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);//查找父类
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {//遍历
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}首先调用了lookupAllEventTypes()方法
/**
* Looks up all Class objects including super classes and interfaces. Should also work for interfaces.<br>
* 根据参数类型,查找其父类类型和接口类型
*/
private List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<Class<?>>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}

这个方法可以根据参数类型,找到其父类类型和接口类型
为什么要找到父类类型呢?这也是为了继承需要,也就是说如果一个类本身没有订阅方法,而它的父类有,那么它也应该有这个订阅方法了,我们要把它找出来。

可以看到,这里做了一个缓存机制,就是eventTypesCache

/**
* key为参数类型,value为参数类型及其父类型,接口类型列表
*/
private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<Class<?>, List<Class<?>>>();

找出来以后,遍历所有类型,调用了postSingleEventForEventType()方法

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);//找到该类型下所有订阅
}
if (subscriptions != null && !subscriptions.isEmpty()) {/遍历订阅
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}如上,使用subscriptionsByEventType找到参数类型(及其父类)的所有subscription,调用postSubscription()方法,如下:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case PostThread:
invokeSubscriber(subscription, event);
break;
case MainThread:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BackgroundThread:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case Async:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}在这个方法里面,我们可以看到,我们根据threadMode,将subscription和event交给不同Poster去处理
这时我们知道,有了subscription和event,我们就有了触发反射的所有条件,不同的Poster只是为了在不同线程中调用反射而已。

这里我只举其中一个poster来做说明,看HandlerPoster,这个是为了在主线程中调用反射final class HandlerPoster extends Handler {

private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;

HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}

void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}

@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
看到enqueue()方法,其实就是将subscription和event包装成PendingPostfinal class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

Object event;
Subscription subscription;
PendingPost next;

private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}

static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}

static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}

}其中pendingPostQueue是起到缓存池的作用。
重要的是handlerMessage(),这个方法从队列中取出一个pendingPost,然后调用EventBus的invokeSubscriber()方法来执行反射

void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);//释放缓存
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}

void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);//触发反射!
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}这个方法很简单,就是触发了反射,在注释中我们可以看到触发反射的位置

有人会问为什么需要队列,因为同一个post()方法,可能有很多订阅者订阅了这个信息,而这些订阅是有优先级的,handler对于消息的执行,也是在一个队列里面,逐个取出的,所以不可能一次性执行所有的操作。

文章的最后,我说明一下sticky这个属性的作用,EventBus里面,还有一个stickyPost()方法

public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}可以看到,里面还是调用了post()方法,但是在这之前,将参数event的类型,和其本身都放入stickyEvents这个map里面保存了起来
/**
* 可以看做缓存,key参数类型,value是参数
*/
private final Map<Class<?>, Object> stickyEvents;其实这就是一个缓存的作用,为什么要保存呢?

还记得EventBus有一个subscribe()方法里面,有关于sticky的判断吗?

if (sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}如果是stickyregister(),那么就会进入这个判断里面
里面stickyEvents会根据参数类型,获得了之前postSticky()传递的参数,然后调用checkPostStickyEventToSubscription()方法

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());
}
}

可以看到,调用了postToSubscription(),也就是说,马上post了。
在register()以后,如果是sticky,马上就转到post(),而这次的参数,是上次postSticky()保留下来的。

通过这种方式,我们可以获得最近一次post()传递的消息。

sticky的作用就是这样。

OK,整个框架解析得差不多了,有问题可以留言。

转载请注明出处。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息