android应用消息处理机制
2015-10-12 17:50
477 查看
2月份写的文章.发上来.
Android的消息处理机制由三部分组成:消息循环、消息发送、消息处理。
消息循环通过Looper类来实现。
这里分两部分
一.Looper.prepareMainLooper
二.Looper.loop
……
//sThreadLocal.get() will return null unless you've called prepare().
static finalThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
private staticLooper sMainLooper; // guarded byLooper.class
finalMessageQueue mQueue;
……
/** Initialize the current thread as a looper.
* This gives you a chance to create handlers that then reference
* this looper, before actually starting the loop. Be sure to call
* {@link #loop()} after calling this method, and end it by calling
* {@link #quit()}.
*/
public static void prepare() {
prepare(true);
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created perthread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
/**
* Initialize the current thread as a looper, marking it as an
* application's main looper. The main looper for your application
* is created by the Android environment, so you should never need
* to call this function yourself. See also: {@link #prepare()}
*/
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw newIllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
……
/**
* Return the Looper object associated with the current thread. Returns
* null if the calling thread is not associated with a Looper.
*/
public static Looper myLooper() {
return sThreadLocal.get();
}
……
/**
* Return the {@link MessageQueue} object associated with the current
* thread. This must be calledfrom a thread running a Looper, or a
* NullPointerException will be thrown.
*/
public static MessageQueue myQueue() {
return myLooper().mQueue;
}
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
……
}
prepareMainLooper主要做的事情就是:创建了一个Looper对象,保存在sThreadLocal中,并将这个Looper对象赋值给sMainLooper保存起来。
new Looper:在创建的时候,同时实例化一个MessageQueue。并赋值给mQueue保存起来。后续的消息就通过它来管理。
接下来,我们来看MessageQueue.java
------ frameworks/base/core/java/android/os/MessageQueue.java
public final class MessageQueue {
……
private native static int nativeInit();
private native static void nativeDestroy(int ptr);
private native static void nativePollOnce(int ptr, int timeoutMillis);
private native static void nativeWake(int ptr);
private nativestatic boolean nativeIsIdling(int ptr);
……
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
……
}
可以看出。MessageQueue的整体实现都交给了native.
继续跟,grep一把就可以找到MessageQueue的native实现。
frameworks/base/core/java/android/os/MessageQueue.java
从nativeInit开始
static jintandroid_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate nativequeue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jint>(nativeMessageQueue);
}
可以看出,android_os_MessageQueue_nativeInit在native中创建了一个nativeMessageQueue.
在这个文件中可以找到如下
NativeMessageQueue::NativeMessageQueue(): mInCallback(false), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
nativeMessageQueue在native层又实例化了一个Looper。
Looper.cpp实现在system/core/libutils/Looper.cpp
Looper::Looper(boolallowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks),mSendingMessage(false),
mResponseIndex(0),mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);
LOG_ALWAYS_FATAL_IF(result != 0,"Could not create wake pipe. errno=%d", errno);
mWakeReadPipeFd =wakeFds[0];
mWakeWritePipeFd =wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL,O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0,"Could not make wake read pipe non-blocking. errno=%d", errno);
result = fcntl(mWakeWritePipeFd, F_SETFL,O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0,"Could not make wake write pipe non-blocking. errno=%d", errno);
mIdling = false;
// Allocate the epoll instance and registerthe wake pipe.
mEpollFd =epoll_create(EPOLL_SIZE_HINT);//#------
EPOLL_SIZE_HINT是在这个mEpollFd上能监控的最大文件描述符数,这里创建了epoll的filedescription
LOG_ALWAYS_FATAL_IF(mEpollFd < 0,"Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event));// zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;//#------将mWakeReadPipeFd赋值进去,就是说有内容更新,就立马wake线程去读取。
result = epoll_ctl(mEpollFd,EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0,"Could not add wake read pipe to epoll instance. errno=%d", errno);
}
注意看代码中标红部分。
消息的循环使用的是Pipe机制,是linux系统中进程间通信的一种机制。
好像linux进程间的通信总是通过一个文件进行。mWakeReadPipeFd,mWakeWritePipeFd 是pipe的文件描述(FileDescription)。
这是一个多线程机制。
线程A使用FileDescription来读pipe的内容
线程B使用FileDescription来写pipe的内容
Pipe中没内容就线程A就进入wait状态,线程B在进行写操作的时候,就会wake线程A。
Epoll这部分不详。直接应用罗大牛的说明
这个等待和唤醒的操作是如何进行的呢,这就要借助Linux系统中的epoll机制了。 Linux系统中的epoll机制为处理大批量句柄而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。但是这里我们其实只需要监控的IO接口只有mWakeReadPipeFd一个,即前面我们所创建的管道的读端,为什么还需要用到epoll呢?有点用牛刀来杀鸡的味道。其实不然,这个Looper类是非常强大的,它除了监控内部所创建的管道接口之外,还提供了addFd接口供外界面调用,外界可以通过这个接口把自己想要监控的IO事件一并加入到这个Looper对象中去,当所有这些被监控的IO接口上面有事件发生时,就会唤醒相应的线程来处理,不过这里我们只关心刚才所创建的管道的IO事件的发生。
至此,消息循环的创建结束,即Looper.prepare()的创建结束
/**
* Run the message queue in this thread. Besure to call
* {@link #quit()} to end the loop.
*/
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("NoLooper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
// Make sure the identity of thisthread is that of the local process,
// and keep track of what that identitytoken actually is.
Binder.clearCallingIdentity();
final long ident =Binder.clearCallingIdentity();
……
for (;;) {
Message msg = queue.next(); //might block
if (msg == null) {
// No message indicates thatthe message queue is quitting.
return;
}
……
msg.target.dispatchMessage(msg);
……
// Make sure that during the courseof dispatching the
// identity of the thread wasn'tcorrupted.
final long newIdent =Binder.clearCallingIdentity();
……
msg.recycle();
}
}
这个method,不断的读取信息,直接没有信息为止,读到信息就dispatchMessage传递出去,然后回收掉message。没信息之后,就quit。
看下MessageQueue中的next是怎么实现的?
Message next() {
intpendingIdleHandlerCount = -1; // -1 only during first iteration
intnextPollTimeoutMillis = 0;
for (;;) {
if(nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// We can assumemPtr != 0 because the loop is obviously still running.
// The looper willnot call this method after the loop quits.
nativePollOnce(mPtr, nextPollTimeoutMillis);
synchronized(this) {
// Try toretrieve the next message. Return iffound.
final long now= SystemClock.uptimeMillis();
MessageprevMsg = null;
Message msg =mMessages;
if (msg !=null && msg.target == null) {
// Stalledby a barrier. Find the next asynchronousmessage in the queue.
do {
prevMsg = msg;
msg =msg.next;
} while(msg != null && !msg.isAsynchronous());
}
if (msg !=null) {
if (now< msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now,Integer.MAX_VALUE);
} else {
// Gota message.
mBlocked = false;
if(prevMsg != null) {
prevMsg.next = msg.next;
} else{
mMessages = msg.next;
}
msg.next = null;
if(false) Log.v("MessageQueue", "Returning message: " + msg);
msg.markInUse();
returnmsg;
}
} else {
// No moremessages.
nextPollTimeoutMillis = -1;
}
// Process thequit message now that all pending messages have been handled.
if (mQuitting){
dispose();
returnnull;
}
// If firsttime idle, then get the number of idlers to run.
// Idlehandles only run if the queue is empty or if the first message
// in thequeue (possibly a barrier) is due to be handled in the future.
if(pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if(pendingIdleHandlerCount <= 0) {
// No idlehandlers to run. Loop and wait somemore.
mBlocked =true;
continue;
}
if(mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount,4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idlehandlers.
// We only everreach this code block during the first iteration.
for (int i = 0; i< pendingIdleHandlerCount; i++) {
finalIdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; //release the reference to the handler
boolean keep =false;
try {
keep =idler.queueIdle();
} catch(Throwable t) {
Log.wtf("MessageQueue", "IdleHandler threwexception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so wedo not run them again.
pendingIdleHandlerCount = 0;
// While callingan idle handler, a new message could have been delivered
// so go back andlook again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
当loop第一次被调用时,nativePollOnce会立即去获取下一个message;
之后,就立即去获取下一条message。
当消息队列中的消息还没到时间的时候,nextPollTimeoutMillis > 0,那么,nativePollOnce会阻塞着,等待时间到达。
当消息队列中没有消息时,nextPollTimeoutMillis= -1,那么消息队列就会被无限阻塞,直到有新的消息进来去wake它。这里的话,最终会调用到epoll_wait,而-1的参数则会使得它无限阻塞。
值得注意的是这里有个IdleHandler,这是什么意思呢?
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the firstmessage
// in the queue (possibly a barrier) is due to be handled in thefuture.
if (pendingIdleHandlerCount < 0
&& (mMessages== null || now < mMessages.when)) {
pendingIdleHandlerCount =mIdleHandlers.size();
}
分析可以看出,线程只有在空闲的时候,才会去处理IdleHandler的方法。
// Run the idle handlers.
// We only ever reach this codeblock during the first iteration.
for (int i = 0; i <pendingIdleHandlerCount; i++) {
final IdleHandler idler =mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null;// release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf("MessageQueue", "IdleHandler threwexception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
而且,根本queueIdle的返回值,判断是否保留这个handle。
至此,MessageQueue.java中的loop分析完毕。
static voidandroid_os_MessageQueue_nativePollOnce(JNIEnv* env, jclass clazz,
jint ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue =reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, timeoutMillis);
}
这里将之前保存的mPtr转换成了NativeMessageQueue。
在调用NativeMessageQueue的pollOnce方法。
继续看
void NativeMessageQueue::pollOnce(JNIEnv*env, int timeoutMillis) {
mInCallback = true;
mLooper->pollOnce(timeoutMillis);
mInCallback = false;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
可以看到又调用了Looper.cpp的pollOnce方法。
int Looper::pollOnce(int timeoutMillis,int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data =response.request.data;
………………
if (outFd != NULL) *outFd= fd;
if (outEvents != NULL)*outEvents = events;
if (outData != NULL) *outData =data;
return ident;
}
}
if (result != 0) {
………….
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
result = pollInner(timeoutMillis);
}
}
这里比较重要的就是pollInner。而mResponses也会在pollInner中被修改。都被归结到了pollInner中了。
int Looper::pollInner(int timeoutMillis) {
…………..
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now,mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis <0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
…………………..
}--------计算延迟的时间。。。。这里不重要。。。忽略
// Poll.
int result =ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mIdling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd,eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mIdling = false;
// Acquire lock.
mLock.lock();
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d",errno);
result = ALOOPER_POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) {
………..
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}
// Handle all events.
………………
for (int i = 0; i <eventCount; i++) {------------遍历events
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd ==mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpectedepoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN)events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT)events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR)events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP)events |= ALOOPER_EVENT_HANGUP;
pushResponse(events,mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpectedepoll events 0x%x on fd %d that is "
"no longerregistered.", epollEvents, fd);
}
}
}
Done: ;
--------之后就都跟epoll无关了
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope =mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call tohandleMessage
// finishes. Then we drop it sothat the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler>handler = messageEnvelope.handler;
Message message =messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
……………..
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = ALOOPER_POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the nextwakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == ALOOPER_POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
……………
int callbackResult = response.request.callback->handleEvent(fd,events, data);
if (callbackResult == 0) {
removeFd(fd);
}
// Clear the callback reference inthe response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = ALOOPER_POLL_CALLBACK;
}
}
return result;
}
注意红色字体:
epoll_wait(mEpollFd,eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
这里就是epoll_wait阻塞的method。
eventCount 小于0 就说明出错了。
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with anunexpected error, errno=%d", errno);
result = ALOOPER_POLL_ERROR;
goto Done;
}
eventCount 等于0,就说明超时了。
// Check for poll timeout.
if (eventCount == 0) {
………..
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}
剩下的,当eventCount 大于0,就说明我们监控的事件发生了。
for (int i = 0; i < eventCount; i++) {------------遍历events
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents &EPOLLIN) {
awoken();
……..
}
…………
}
………..
}
当有消息进来的时候,所监控的mWakeReadPipeFd,发生了EPOLLIN时就会调用awoken。
说明一下。Epoll有EPOLLOUT和EPOLLIN两个触发事件
void Looper::awoken() {
……….
char buffer[16];
ssize_t nRead;
do {
nRead =read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1&& errno == EINTR) || nRead == sizeof(buffer));
}
分析awoken,Message被读了出来。
然后,就触发了接下去的一系列操作。
public final class ActivityThread {
……………
public static void main(String[] args) {
…………………….
Looper.prepareMainLooper();
ActivityThread thread = new ActivityThread();
thread.attach(false);
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
……………
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedlyexited");
}
…………
}
可以看到,在Looper.prepareMainLooper();之后,ActivityThread调用了attach去初始化其他信息。然后便初始化sMainThreadHandler。
看一下getHandler方法的实现。
finalH mH = new H();
……..
final Handler getHandler() {
return mH;
}
可以找到,这里的H是定义在ActivityThread中的内部类,继承自Handler。有兴趣的朋友,会发现H这个类定义了很多的action,几乎涵盖了大部分程序运行时会发生的动作。
继续说,
H继承自Handler。
public Handler(Callback callback, boolean async) {
…………..
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handlerinside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
public Handler(Looper looper, Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
查看Handler的构造体,mLooper或用外部传入的参数,或用已初始化好的,mQueue则从mLooper中获得。
总之,一定会有这个mLooper,有了这个Looper就可以访问MessageQueue。
在消息发送的时候,我们有两种方式,sendMessage、post(Runnablerunnable)。
private booleanenqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg,uptimeMillis);
}
Handler会根据target来找处理的对象,那么,这里就是this,自己发自己处理。
继续看MessageQueue中的enqueueMessage
boolean enqueueMessage(Message msg, longwhen) {
……………..
Looper myLooper = msg.target.mLooper;
…………..
synchronized (this) {
……………………..
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when< p.when) {
// New head, wake up the eventqueue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middleof the queue. Usually we don't have towake
// up the event queue unlessthere is a barrier at the head of the queue
// and the message is theearliest asynchronous message in the queue.
needWake = mBlocked &&p.target == null && msg.isAsynchronous();
Message prev;
for(;;) {
prev = p;
p =p.next;
if(p == null || when < p.when) {
break;
}
if(needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p== prev.next
prev.next = msg;
}
// We can assume mPtr != 0 becausemQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
这里有两种情况
一、即刻插入到当前,立即执行
1. mMessage=0,说明队列为空,可以立即插入
2.when = 0,说明用户需要立即执行
3.when < p.when,当前消息的执行时间比目前要求发送的message的执行之间晚,自然就被即刻插入到当前去。
二、插入到中间,等些时间再执行
这种情况也好理解,仔细看代码段中标红部分,就是根据执行的时间先后顺序,找一个位置插入。
在消息插入完成后,就调用nativeWake唤醒,这里就直接跳转到native的实现。
static voidandroid_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jint ptr) {
NativeMessageQueue*nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
returnnativeMessageQueue->wake();
}
这里再次将之前保存在java层的mPtr转换成NativeMessageQueue,然后调用wake方法。
void NativeMessageQueue::wake() {
mLooper->wake();
}
转到Looper.cpp中分析。
void Looper::wake() {
……
ssize_t nWrite;
do {
nWrite =write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1&& errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Couldnot write wake signal, errno=%d", errno);
}
}
}
Wake函数比较简单,通过FileDescription mWakeWritePipeFd往pipe中写入了一个W的字符。在写入了这个字符后,就会唤醒应用程序的主线程,再次去读取pipe中的信息。
值得注意的是。写入一个W并没有任何意义。也就是说写入的作用就只剩下唤醒线程。
而之前的分析可以发现,Message的信息,其实是保存在Messages中的。
原来,pipe只承担了唤醒通知的功能。
至此,消息就发送了,调用了wake之后呢,应用主线程中的Looper就会调用nativePollOnce,然后返回到MessageQueue.next中去。消息发送也就成功了。
一个activity对应一个ActivityThread,在ActivityThead.java的main函数中。
public final class ActivityThread {
……………
public static void main(String[] args) {
…………………….
Looper.prepareMainLooper();
ActivityThread thread = new ActivityThread();
thread.attach(false);
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
……………
Looper.loop();
throw new RuntimeException("Main threadloop unexpectedly exited");
}
…………
}
在一起都创建好之后,就进入了无限循环等待。
消息的处理也就在这个Looper.looper中。
来看一下这个loop做了什么。
public static void loop() {
final Looper me = myLooper();
………………
final MessageQueue queue = me.mQueue;
…………
for (;;) {
Message msg = queue.next(); // might block
…………..
msg.target.dispatchMessage(msg);
………………
msg.recycle();
}
}
在queue.next接受到消息之后,就会效用msg.target. dispatchMessage。
这里调用的是msg.target。而这个target在之前的分析中,是将this传入。
换言之,msg的处理是谁发谁处理。
而且dispatchMessage在哪里定义呢。在平常的代码编写中,我会会发现,一般情况下,我们都会在handle中重写dispatchMessage函数。
那么看下Handler.java的实现如何
/**
* Subclasses must implement this toreceive messages.
*/
public void handleMessage(Message msg) {
}
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if(mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
可以发现确实Handler也没有处理,只是传给了handleMessage。
至此,消息处理完毕,谁发谁处理,谁要处理,就自己重写handleMessage。
第一次打这么多的字,学习别人的写作方式,一个个去看源代码揣测。算上其他杂事,花了3天时间。总算完成了。真的比单纯看别人的文章来的印象深刻,理解深刻。代码还是需要结合程序源码来看的。
总结一下,
1. Android的消息处理机制:包含三部分:消息循环、消息发送、消息处理.
2. 消息循环是基于linux的pipe通信机制的,使用了两个FileDescription,感觉有点类似线程安全中多个线程对一个资源的读写
3. 消息循环中pipe机制,有最大监控数量,可以监控的更多,在等待中使用的是Epoll机制中的epoll_wait机制。
4. 在消息循环中有个空闲等待,IdleHandler,在线程不忙的时候,可以一直做处理。使得应用程序在空闲的时候可以做一些事情。根据返回值的不同是保留IdleHandler还是一次性使用。
5. 消息队列有消息进来的时候,只是单纯的利用pipe机制来做唤醒,其具体的消息保存并没有在pipe中。
6. 消息的发送利用的是pipe机制,通过写入字符串,唤醒读写通道达到唤醒消息循环的目的
7. 消息的处理要靠用户重载handleMessage达到
Android的消息处理机制由三部分组成:消息循环、消息发送、消息处理。
消息循环
消息都存在在一个消息队列中,应用程序的主线程会不断读取其中的消息,并分派给相应的Handler进行处理;如果队列中没有消息,应用程序的主线程就进入空闲等待状态。消息循环通过Looper类来实现。
这里分两部分
一.Looper.prepareMainLooper
二.Looper.loop
Looper.prepareMainLooper
public final class Looper {……
//sThreadLocal.get() will return null unless you've called prepare().
static finalThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
private staticLooper sMainLooper; // guarded byLooper.class
finalMessageQueue mQueue;
……
/** Initialize the current thread as a looper.
* This gives you a chance to create handlers that then reference
* this looper, before actually starting the loop. Be sure to call
* {@link #loop()} after calling this method, and end it by calling
* {@link #quit()}.
*/
public static void prepare() {
prepare(true);
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created perthread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
/**
* Initialize the current thread as a looper, marking it as an
* application's main looper. The main looper for your application
* is created by the Android environment, so you should never need
* to call this function yourself. See also: {@link #prepare()}
*/
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw newIllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
……
/**
* Return the Looper object associated with the current thread. Returns
* null if the calling thread is not associated with a Looper.
*/
public static Looper myLooper() {
return sThreadLocal.get();
}
……
/**
* Return the {@link MessageQueue} object associated with the current
* thread. This must be calledfrom a thread running a Looper, or a
* NullPointerException will be thrown.
*/
public static MessageQueue myQueue() {
return myLooper().mQueue;
}
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
……
}
prepareMainLooper主要做的事情就是:创建了一个Looper对象,保存在sThreadLocal中,并将这个Looper对象赋值给sMainLooper保存起来。
new Looper:在创建的时候,同时实例化一个MessageQueue。并赋值给mQueue保存起来。后续的消息就通过它来管理。
接下来,我们来看MessageQueue.java
------ frameworks/base/core/java/android/os/MessageQueue.java
public final class MessageQueue {
……
private native static int nativeInit();
private native static void nativeDestroy(int ptr);
private native static void nativePollOnce(int ptr, int timeoutMillis);
private native static void nativeWake(int ptr);
private nativestatic boolean nativeIsIdling(int ptr);
……
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
……
}
可以看出。MessageQueue的整体实现都交给了native.
继续跟,grep一把就可以找到MessageQueue的native实现。
frameworks/base/core/java/android/os/MessageQueue.java
从nativeInit开始
static jintandroid_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate nativequeue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jint>(nativeMessageQueue);
}
可以看出,android_os_MessageQueue_nativeInit在native中创建了一个nativeMessageQueue.
在这个文件中可以找到如下
NativeMessageQueue::NativeMessageQueue(): mInCallback(false), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
nativeMessageQueue在native层又实例化了一个Looper。
Looper.cpp实现在system/core/libutils/Looper.cpp
Looper::Looper(boolallowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks),mSendingMessage(false),
mResponseIndex(0),mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);
LOG_ALWAYS_FATAL_IF(result != 0,"Could not create wake pipe. errno=%d", errno);
mWakeReadPipeFd =wakeFds[0];
mWakeWritePipeFd =wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL,O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0,"Could not make wake read pipe non-blocking. errno=%d", errno);
result = fcntl(mWakeWritePipeFd, F_SETFL,O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0,"Could not make wake write pipe non-blocking. errno=%d", errno);
mIdling = false;
// Allocate the epoll instance and registerthe wake pipe.
mEpollFd =epoll_create(EPOLL_SIZE_HINT);//#------
EPOLL_SIZE_HINT是在这个mEpollFd上能监控的最大文件描述符数,这里创建了epoll的filedescription
LOG_ALWAYS_FATAL_IF(mEpollFd < 0,"Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event));// zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;//#------将mWakeReadPipeFd赋值进去,就是说有内容更新,就立马wake线程去读取。
result = epoll_ctl(mEpollFd,EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0,"Could not add wake read pipe to epoll instance. errno=%d", errno);
}
注意看代码中标红部分。
消息的循环使用的是Pipe机制,是linux系统中进程间通信的一种机制。
好像linux进程间的通信总是通过一个文件进行。mWakeReadPipeFd,mWakeWritePipeFd 是pipe的文件描述(FileDescription)。
这是一个多线程机制。
线程A使用FileDescription来读pipe的内容
线程B使用FileDescription来写pipe的内容
Pipe中没内容就线程A就进入wait状态,线程B在进行写操作的时候,就会wake线程A。
Epoll这部分不详。直接应用罗大牛的说明
这个等待和唤醒的操作是如何进行的呢,这就要借助Linux系统中的epoll机制了。 Linux系统中的epoll机制为处理大批量句柄而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。但是这里我们其实只需要监控的IO接口只有mWakeReadPipeFd一个,即前面我们所创建的管道的读端,为什么还需要用到epoll呢?有点用牛刀来杀鸡的味道。其实不然,这个Looper类是非常强大的,它除了监控内部所创建的管道接口之外,还提供了addFd接口供外界面调用,外界可以通过这个接口把自己想要监控的IO事件一并加入到这个Looper对象中去,当所有这些被监控的IO接口上面有事件发生时,就会唤醒相应的线程来处理,不过这里我们只关心刚才所创建的管道的IO事件的发生。
至此,消息循环的创建结束,即Looper.prepare()的创建结束
Looper.loop
回到Looper.java,看loop()/**
* Run the message queue in this thread. Besure to call
* {@link #quit()} to end the loop.
*/
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("NoLooper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
// Make sure the identity of thisthread is that of the local process,
// and keep track of what that identitytoken actually is.
Binder.clearCallingIdentity();
final long ident =Binder.clearCallingIdentity();
……
for (;;) {
Message msg = queue.next(); //might block
if (msg == null) {
// No message indicates thatthe message queue is quitting.
return;
}
……
msg.target.dispatchMessage(msg);
……
// Make sure that during the courseof dispatching the
// identity of the thread wasn'tcorrupted.
final long newIdent =Binder.clearCallingIdentity();
……
msg.recycle();
}
}
这个method,不断的读取信息,直接没有信息为止,读到信息就dispatchMessage传递出去,然后回收掉message。没信息之后,就quit。
看下MessageQueue中的next是怎么实现的?
Message next() {
intpendingIdleHandlerCount = -1; // -1 only during first iteration
intnextPollTimeoutMillis = 0;
for (;;) {
if(nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// We can assumemPtr != 0 because the loop is obviously still running.
// The looper willnot call this method after the loop quits.
nativePollOnce(mPtr, nextPollTimeoutMillis);
synchronized(this) {
// Try toretrieve the next message. Return iffound.
final long now= SystemClock.uptimeMillis();
MessageprevMsg = null;
Message msg =mMessages;
if (msg !=null && msg.target == null) {
// Stalledby a barrier. Find the next asynchronousmessage in the queue.
do {
prevMsg = msg;
msg =msg.next;
} while(msg != null && !msg.isAsynchronous());
}
if (msg !=null) {
if (now< msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now,Integer.MAX_VALUE);
} else {
// Gota message.
mBlocked = false;
if(prevMsg != null) {
prevMsg.next = msg.next;
} else{
mMessages = msg.next;
}
msg.next = null;
if(false) Log.v("MessageQueue", "Returning message: " + msg);
msg.markInUse();
returnmsg;
}
} else {
// No moremessages.
nextPollTimeoutMillis = -1;
}
// Process thequit message now that all pending messages have been handled.
if (mQuitting){
dispose();
returnnull;
}
// If firsttime idle, then get the number of idlers to run.
// Idlehandles only run if the queue is empty or if the first message
// in thequeue (possibly a barrier) is due to be handled in the future.
if(pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if(pendingIdleHandlerCount <= 0) {
// No idlehandlers to run. Loop and wait somemore.
mBlocked =true;
continue;
}
if(mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount,4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idlehandlers.
// We only everreach this code block during the first iteration.
for (int i = 0; i< pendingIdleHandlerCount; i++) {
finalIdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; //release the reference to the handler
boolean keep =false;
try {
keep =idler.queueIdle();
} catch(Throwable t) {
Log.wtf("MessageQueue", "IdleHandler threwexception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so wedo not run them again.
pendingIdleHandlerCount = 0;
// While callingan idle handler, a new message could have been delivered
// so go back andlook again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
当loop第一次被调用时,nativePollOnce会立即去获取下一个message;
之后,就立即去获取下一条message。
当消息队列中的消息还没到时间的时候,nextPollTimeoutMillis > 0,那么,nativePollOnce会阻塞着,等待时间到达。
当消息队列中没有消息时,nextPollTimeoutMillis= -1,那么消息队列就会被无限阻塞,直到有新的消息进来去wake它。这里的话,最终会调用到epoll_wait,而-1的参数则会使得它无限阻塞。
值得注意的是这里有个IdleHandler,这是什么意思呢?
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the firstmessage
// in the queue (possibly a barrier) is due to be handled in thefuture.
if (pendingIdleHandlerCount < 0
&& (mMessages== null || now < mMessages.when)) {
pendingIdleHandlerCount =mIdleHandlers.size();
}
分析可以看出,线程只有在空闲的时候,才会去处理IdleHandler的方法。
// Run the idle handlers.
// We only ever reach this codeblock during the first iteration.
for (int i = 0; i <pendingIdleHandlerCount; i++) {
final IdleHandler idler =mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null;// release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf("MessageQueue", "IdleHandler threwexception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
而且,根本queueIdle的返回值,判断是否保留这个handle。
至此,MessageQueue.java中的loop分析完毕。
接下来看一下,nativePollOnce中是如何实现的?
先看下native的实现如何?在android_os_MessageQueue.cpp中。static voidandroid_os_MessageQueue_nativePollOnce(JNIEnv* env, jclass clazz,
jint ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue =reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, timeoutMillis);
}
这里将之前保存的mPtr转换成了NativeMessageQueue。
在调用NativeMessageQueue的pollOnce方法。
继续看
void NativeMessageQueue::pollOnce(JNIEnv*env, int timeoutMillis) {
mInCallback = true;
mLooper->pollOnce(timeoutMillis);
mInCallback = false;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
可以看到又调用了Looper.cpp的pollOnce方法。
int Looper::pollOnce(int timeoutMillis,int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data =response.request.data;
………………
if (outFd != NULL) *outFd= fd;
if (outEvents != NULL)*outEvents = events;
if (outData != NULL) *outData =data;
return ident;
}
}
if (result != 0) {
………….
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
result = pollInner(timeoutMillis);
}
}
这里比较重要的就是pollInner。而mResponses也会在pollInner中被修改。都被归结到了pollInner中了。
int Looper::pollInner(int timeoutMillis) {
…………..
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now,mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis <0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
…………………..
}--------计算延迟的时间。。。。这里不重要。。。忽略
// Poll.
int result =ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mIdling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd,eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mIdling = false;
// Acquire lock.
mLock.lock();
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d",errno);
result = ALOOPER_POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) {
………..
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}
// Handle all events.
………………
for (int i = 0; i <eventCount; i++) {------------遍历events
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd ==mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpectedepoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN)events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT)events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR)events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP)events |= ALOOPER_EVENT_HANGUP;
pushResponse(events,mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpectedepoll events 0x%x on fd %d that is "
"no longerregistered.", epollEvents, fd);
}
}
}
Done: ;
--------之后就都跟epoll无关了
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope =mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call tohandleMessage
// finishes. Then we drop it sothat the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler>handler = messageEnvelope.handler;
Message message =messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
……………..
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = ALOOPER_POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the nextwakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == ALOOPER_POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
……………
int callbackResult = response.request.callback->handleEvent(fd,events, data);
if (callbackResult == 0) {
removeFd(fd);
}
// Clear the callback reference inthe response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = ALOOPER_POLL_CALLBACK;
}
}
return result;
}
注意红色字体:
epoll_wait(mEpollFd,eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
这里就是epoll_wait阻塞的method。
eventCount 小于0 就说明出错了。
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with anunexpected error, errno=%d", errno);
result = ALOOPER_POLL_ERROR;
goto Done;
}
eventCount 等于0,就说明超时了。
// Check for poll timeout.
if (eventCount == 0) {
………..
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}
剩下的,当eventCount 大于0,就说明我们监控的事件发生了。
for (int i = 0; i < eventCount; i++) {------------遍历events
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents &EPOLLIN) {
awoken();
……..
}
…………
}
………..
}
当有消息进来的时候,所监控的mWakeReadPipeFd,发生了EPOLLIN时就会调用awoken。
说明一下。Epoll有EPOLLOUT和EPOLLIN两个触发事件
void Looper::awoken() {
……….
char buffer[16];
ssize_t nRead;
do {
nRead =read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1&& errno == EINTR) || nRead == sizeof(buffer));
}
分析awoken,Message被读了出来。
然后,就触发了接下去的一系列操作。
消息发送
一个activity对应一个ActivityThread,在ActivityThead.java的main函数中。public final class ActivityThread {
……………
public static void main(String[] args) {
…………………….
Looper.prepareMainLooper();
ActivityThread thread = new ActivityThread();
thread.attach(false);
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
……………
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedlyexited");
}
…………
}
可以看到,在Looper.prepareMainLooper();之后,ActivityThread调用了attach去初始化其他信息。然后便初始化sMainThreadHandler。
看一下getHandler方法的实现。
finalH mH = new H();
……..
final Handler getHandler() {
return mH;
}
可以找到,这里的H是定义在ActivityThread中的内部类,继承自Handler。有兴趣的朋友,会发现H这个类定义了很多的action,几乎涵盖了大部分程序运行时会发生的动作。
继续说,
H继承自Handler。
public Handler(Callback callback, boolean async) {
…………..
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handlerinside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
public Handler(Looper looper, Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
查看Handler的构造体,mLooper或用外部传入的参数,或用已初始化好的,mQueue则从mLooper中获得。
总之,一定会有这个mLooper,有了这个Looper就可以访问MessageQueue。
在消息发送的时候,我们有两种方式,sendMessage、post(Runnablerunnable)。
private booleanenqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg,uptimeMillis);
}
Handler会根据target来找处理的对象,那么,这里就是this,自己发自己处理。
继续看MessageQueue中的enqueueMessage
boolean enqueueMessage(Message msg, longwhen) {
……………..
Looper myLooper = msg.target.mLooper;
…………..
synchronized (this) {
……………………..
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when< p.when) {
// New head, wake up the eventqueue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middleof the queue. Usually we don't have towake
// up the event queue unlessthere is a barrier at the head of the queue
// and the message is theearliest asynchronous message in the queue.
needWake = mBlocked &&p.target == null && msg.isAsynchronous();
Message prev;
for(;;) {
prev = p;
p =p.next;
if(p == null || when < p.when) {
break;
}
if(needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p== prev.next
prev.next = msg;
}
// We can assume mPtr != 0 becausemQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
这里有两种情况
一、即刻插入到当前,立即执行
1. mMessage=0,说明队列为空,可以立即插入
2.when = 0,说明用户需要立即执行
3.when < p.when,当前消息的执行时间比目前要求发送的message的执行之间晚,自然就被即刻插入到当前去。
二、插入到中间,等些时间再执行
这种情况也好理解,仔细看代码段中标红部分,就是根据执行的时间先后顺序,找一个位置插入。
在消息插入完成后,就调用nativeWake唤醒,这里就直接跳转到native的实现。
static voidandroid_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jint ptr) {
NativeMessageQueue*nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
returnnativeMessageQueue->wake();
}
这里再次将之前保存在java层的mPtr转换成NativeMessageQueue,然后调用wake方法。
void NativeMessageQueue::wake() {
mLooper->wake();
}
转到Looper.cpp中分析。
void Looper::wake() {
……
ssize_t nWrite;
do {
nWrite =write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1&& errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Couldnot write wake signal, errno=%d", errno);
}
}
}
Wake函数比较简单,通过FileDescription mWakeWritePipeFd往pipe中写入了一个W的字符。在写入了这个字符后,就会唤醒应用程序的主线程,再次去读取pipe中的信息。
值得注意的是。写入一个W并没有任何意义。也就是说写入的作用就只剩下唤醒线程。
而之前的分析可以发现,Message的信息,其实是保存在Messages中的。
原来,pipe只承担了唤醒通知的功能。
至此,消息就发送了,调用了wake之后呢,应用主线程中的Looper就会调用nativePollOnce,然后返回到MessageQueue.next中去。消息发送也就成功了。
消息处理
再次回到ActivityThread中去,在ActivityThead的main函数中。一个activity对应一个ActivityThread,在ActivityThead.java的main函数中。
public final class ActivityThread {
……………
public static void main(String[] args) {
…………………….
Looper.prepareMainLooper();
ActivityThread thread = new ActivityThread();
thread.attach(false);
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
……………
Looper.loop();
throw new RuntimeException("Main threadloop unexpectedly exited");
}
…………
}
在一起都创建好之后,就进入了无限循环等待。
消息的处理也就在这个Looper.looper中。
来看一下这个loop做了什么。
public static void loop() {
final Looper me = myLooper();
………………
final MessageQueue queue = me.mQueue;
…………
for (;;) {
Message msg = queue.next(); // might block
…………..
msg.target.dispatchMessage(msg);
………………
msg.recycle();
}
}
在queue.next接受到消息之后,就会效用msg.target. dispatchMessage。
这里调用的是msg.target。而这个target在之前的分析中,是将this传入。
换言之,msg的处理是谁发谁处理。
而且dispatchMessage在哪里定义呢。在平常的代码编写中,我会会发现,一般情况下,我们都会在handle中重写dispatchMessage函数。
那么看下Handler.java的实现如何
/**
* Subclasses must implement this toreceive messages.
*/
public void handleMessage(Message msg) {
}
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if(mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
可以发现确实Handler也没有处理,只是传给了handleMessage。
至此,消息处理完毕,谁发谁处理,谁要处理,就自己重写handleMessage。
第一次打这么多的字,学习别人的写作方式,一个个去看源代码揣测。算上其他杂事,花了3天时间。总算完成了。真的比单纯看别人的文章来的印象深刻,理解深刻。代码还是需要结合程序源码来看的。
总结一下,
1. Android的消息处理机制:包含三部分:消息循环、消息发送、消息处理.
2. 消息循环是基于linux的pipe通信机制的,使用了两个FileDescription,感觉有点类似线程安全中多个线程对一个资源的读写
3. 消息循环中pipe机制,有最大监控数量,可以监控的更多,在等待中使用的是Epoll机制中的epoll_wait机制。
4. 在消息循环中有个空闲等待,IdleHandler,在线程不忙的时候,可以一直做处理。使得应用程序在空闲的时候可以做一些事情。根据返回值的不同是保留IdleHandler还是一次性使用。
5. 消息队列有消息进来的时候,只是单纯的利用pipe机制来做唤醒,其具体的消息保存并没有在pipe中。
6. 消息的发送利用的是pipe机制,通过写入字符串,唤醒读写通道达到唤醒消息循环的目的
7. 消息的处理要靠用户重载handleMessage达到
相关文章推荐
- android开机启动Service(小发现)
- android LOG机制
- mac安装android开发环境
- Android多页面传递问题(1)
- 【Android】反射
- Android之捕获TextView超链接
- Android之捕获TextView超链接
- Android 仿360恶意广告拦截扫描
- Android四种常见动画
- Android 代码自动提示功能
- Android gradle使用教程<一> 基本使用方法
- Android Binder机制
- android控件的属性
- Android.V4的ViewPager的源码和改造(一)
- Android ----制作自己的Vendor
- android如何使用真机调试应用程序
- activity的生命周期
- Android 关于button渐变背景色的详解,摘抄
- android 判断切换后台 判断按下home键
- [Android实例] android如何让service不被杀死