Android消息机制源码解析(四)——消息队列MessageQueue
2015-12-15 11:10
597 查看
MessageQueue是Android消息机制系列文章的最后一个小节,MessageQueue的主要作用是按一定规则维护一个消息队列,并能够添加消息、取出消息、移除消息等。
一、来看下MessageQueue是怎样定义的:
public final class MessageQueue
接下来是一些重要的属性字段:
// True if the message queue can be quit.
private final boolean mQuitAllowed;
private long mPtr; // used by native code
Message mMessages;
private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
private IdleHandler[] mPendingIdleHandlers;
private boolean mQuitting;
// Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
private boolean mBlocked;
mQuitAllowed注释已经很清楚了,这个值实际上是在Looper构造函数中传过来的;mPtr注释也说明是在native代码里使用的,后面会详细分析。mMessage可以理解为消息队列的头部;mIdelHandlers存储了一些闲置的Handler;mPendingIdelHandlers是一个数组对象;mQuitting用来表示该队列是否退出;mBlocked布尔值表示next()方法是否阻塞了。
二、MessageQueue还有几个native方法:
private native static long nativeInit();
private native static void nativeDestroy(long ptr);
private native static void nativePollOnce(long ptr, int timeoutMillis);
private native static void nativeWake(long ptr);
private native static boolean nativeIsIdling(long ptr);
从方法名称上可以理解该方法的作用,以nativeInit()为例,它被调用是在MessageQueue构造函数中:
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
是否有点眼熟,MessageQueue的初始化就是在Looper的构造函数中调用的。nativeInit()是在C++层实现的,具体代码如下:
从native代码可以看到MessageQueue名义上为消息队列,但真正的队列是在native层创建的,最后会将nativeMessageQueue转成mPtr,在需要获取nativeMessageQueue的时候再将mPtr转回去。其他几个本地方法知道其作用即可,这里不再解释源码。
三、在分析Handler的时候,知道发送一条消息后,实际上是把消息添加到消息队列中,它的具体实现逻辑是在MessageQueue中,下面就来看一下是怎么做的:
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w("MessageQueue", e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
代码比较容易理解,首先判断该消息有无Handler以及是否正在使用,之后如果消息队列未退出的话,则进行入队操作。首先会将消息标记为正在使用,然后给when字段赋值,如果原队列为空,或者要插入消息的when字段为0或小于队列头部消息的when,则将消息插入到队列头部,否则按照when的大小插入到消息队列的合适位置。消息入队之后,如果有需要,会唤醒队列,调用的是本地方法nativeWake(mPtr)。
四、再来看Looper是怎样从消息队列取出消息的,即消息的出队逻辑next()方法:
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (false) Log.v("MessageQueue", "Returning message: " + msg);
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf("MessageQueue", "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
代码虽然很长,但我们把握主要逻辑即可,主要是for循环里的逻辑,其他为辅助代码。这个for循环也是一个无限循环,之前讲Looper的loop()时说到过next()可能会阻塞,就是这里了。消息出队的实现在for循环中的同步代码块中,如果取到消息则返回给Looper处理,如果没有消息则阻塞在这里,还有一种情况会造成等待,即消息的执行时间还未到,此时会重新设置下一次轮训的时间,这是根据when字段判断的:
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
}
五、MessageQueue的remove系列方法:
void removeMessages(Handler h, int what, Object object)
void removeMessages(Handler h, Runnable r, Object object)
void removeCallbacksAndMessages(Handler h, Object object)
private void removeAllMessagesLocked()
private void removeAllFutureMessagesLocked()
前三个是根据参数来从消息队列移除消息,removeAllMessagesLocked()为移除所有消息,remvoeAllFutureMessagesLocked()则是仅移除那些还未执行的消息,此刻是否想起了上一节Looper的quit()和quitSafely()呢?
六、上一节说到Looper的退出实际上是调用了MessageQueue的quit方法:
void quit(boolean safe) {
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}
synchronized (this) {
if (mQuitting) {
return;
}
mQuitting = true;
if (safe) {
removeAllFutureMessagesLocked();
} else {
removeAllMessagesLocked();
}
// We can assume mPtr != 0 because mQuitting was previously false.
nativeWake(mPtr);
}
}
代码很简单,注意会将mQuitting变量置为true。
MessageQueue主要的内容就是以上这些,分析源码应当宏观把握,了解主要逻辑框架而不必拘泥于代码的细节实现,否则可能会陷入理解代码的汪洋大海,苦不堪言且效果不佳。
到此Android消息机制相关的四节内容都讲完了,相信大家已经从原理上对Android消息机制有了比较深刻的了解,这几节内容有很多交叉和依赖,希望能够融会贯通,灵活运用,写出更好的应用。
一、来看下MessageQueue是怎样定义的:
public final class MessageQueue
接下来是一些重要的属性字段:
// True if the message queue can be quit.
private final boolean mQuitAllowed;
private long mPtr; // used by native code
Message mMessages;
private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
private IdleHandler[] mPendingIdleHandlers;
private boolean mQuitting;
// Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
private boolean mBlocked;
mQuitAllowed注释已经很清楚了,这个值实际上是在Looper构造函数中传过来的;mPtr注释也说明是在native代码里使用的,后面会详细分析。mMessage可以理解为消息队列的头部;mIdelHandlers存储了一些闲置的Handler;mPendingIdelHandlers是一个数组对象;mQuitting用来表示该队列是否退出;mBlocked布尔值表示next()方法是否阻塞了。
二、MessageQueue还有几个native方法:
private native static long nativeInit();
private native static void nativeDestroy(long ptr);
private native static void nativePollOnce(long ptr, int timeoutMillis);
private native static void nativeWake(long ptr);
private native static boolean nativeIsIdling(long ptr);
从方法名称上可以理解该方法的作用,以nativeInit()为例,它被调用是在MessageQueue构造函数中:
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
是否有点眼熟,MessageQueue的初始化就是在Looper的构造函数中调用的。nativeInit()是在C++层实现的,具体代码如下:
从native代码可以看到MessageQueue名义上为消息队列,但真正的队列是在native层创建的,最后会将nativeMessageQueue转成mPtr,在需要获取nativeMessageQueue的时候再将mPtr转回去。其他几个本地方法知道其作用即可,这里不再解释源码。
三、在分析Handler的时候,知道发送一条消息后,实际上是把消息添加到消息队列中,它的具体实现逻辑是在MessageQueue中,下面就来看一下是怎么做的:
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w("MessageQueue", e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
代码比较容易理解,首先判断该消息有无Handler以及是否正在使用,之后如果消息队列未退出的话,则进行入队操作。首先会将消息标记为正在使用,然后给when字段赋值,如果原队列为空,或者要插入消息的when字段为0或小于队列头部消息的when,则将消息插入到队列头部,否则按照when的大小插入到消息队列的合适位置。消息入队之后,如果有需要,会唤醒队列,调用的是本地方法nativeWake(mPtr)。
四、再来看Looper是怎样从消息队列取出消息的,即消息的出队逻辑next()方法:
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (false) Log.v("MessageQueue", "Returning message: " + msg);
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf("MessageQueue", "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
代码虽然很长,但我们把握主要逻辑即可,主要是for循环里的逻辑,其他为辅助代码。这个for循环也是一个无限循环,之前讲Looper的loop()时说到过next()可能会阻塞,就是这里了。消息出队的实现在for循环中的同步代码块中,如果取到消息则返回给Looper处理,如果没有消息则阻塞在这里,还有一种情况会造成等待,即消息的执行时间还未到,此时会重新设置下一次轮训的时间,这是根据when字段判断的:
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
}
五、MessageQueue的remove系列方法:
void removeMessages(Handler h, int what, Object object)
void removeMessages(Handler h, Runnable r, Object object)
void removeCallbacksAndMessages(Handler h, Object object)
private void removeAllMessagesLocked()
private void removeAllFutureMessagesLocked()
前三个是根据参数来从消息队列移除消息,removeAllMessagesLocked()为移除所有消息,remvoeAllFutureMessagesLocked()则是仅移除那些还未执行的消息,此刻是否想起了上一节Looper的quit()和quitSafely()呢?
六、上一节说到Looper的退出实际上是调用了MessageQueue的quit方法:
void quit(boolean safe) {
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}
synchronized (this) {
if (mQuitting) {
return;
}
mQuitting = true;
if (safe) {
removeAllFutureMessagesLocked();
} else {
removeAllMessagesLocked();
}
// We can assume mPtr != 0 because mQuitting was previously false.
nativeWake(mPtr);
}
}
代码很简单,注意会将mQuitting变量置为true。
MessageQueue主要的内容就是以上这些,分析源码应当宏观把握,了解主要逻辑框架而不必拘泥于代码的细节实现,否则可能会陷入理解代码的汪洋大海,苦不堪言且效果不佳。
到此Android消息机制相关的四节内容都讲完了,相信大家已经从原理上对Android消息机制有了比较深刻的了解,这几节内容有很多交叉和依赖,希望能够融会贯通,灵活运用,写出更好的应用。
相关文章推荐
- js继承 Base类的源码解析
- android的消息处理机制(图文+源码分析)―Looper/Handler/Message
- AsyncTask陷阱之:Handler,Looper与MessageQueue的详解
- Handler Looper MessageQueue总结
- android:fitSystemWindows详解
- android:fitSystemWindows详解
- 如此抄袭Apps之OscHub(三)
- Handler异步消息处理机制浅析
- Android基于源码分析Handler的消息机制
- RabbitMQ(一)概念
- nsq源码阅读笔记之nsqd(三)——diskQueue
- Android中的Handler消息处理机制
- android中Message,MessageQueue,Looper,Handler详解+实例
- android:fitSystemWindows详解
- 理解Handler、Looper、MessageQueue、Thread关系
- Android~Handler机制DEMO示例
- Android~Handler机制自我理解
- Handler机制
- Spark修炼之道(高级篇)——Spark源码阅读:第四节 Stage划分
- Spark修炼之道(高级篇)——Spark源码阅读:第六节 Task提交