您的位置:首页 > 移动开发 > Android开发

android应用消息处理机制

2015-10-12 17:50 477 查看
2月份写的文章.发上来.

Android的消息处理机制由三部分组成:消息循环、消息发送、消息处理。

消息循环

消息都存在在一个消息队列中,应用程序的主线程会不断读取其中的消息,并分派给相应的Handler进行处理;如果队列中没有消息,应用程序的主线程就进入空闲等待状态。

消息循环通过Looper类来实现。

这里分两部分

一.Looper.prepareMainLooper

二.Looper.loop

Looper.prepareMainLooper

public final class Looper {

……

//sThreadLocal.get() will return null unless you've called prepare().

static finalThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

private staticLooper sMainLooper;  // guarded byLooper.class

 

finalMessageQueue mQueue;

……

/** Initialize the current thread as a looper.

     * This gives you a chance to create handlers that then reference

     * this looper, before actually starting the loop. Be sure to call

     * {@link #loop()} after calling this method, and end it by calling

     * {@link #quit()}.

     */

   public static void prepare() {

       prepare(true);

    }

 

   private static void prepare(boolean quitAllowed) {

       if (sThreadLocal.get() != null) {

           throw new RuntimeException("Only one Looper may be created perthread");

       }

       sThreadLocal.set(new Looper(quitAllowed));

    }

 

   /**

    * Initialize the current thread as a looper, marking it as an

    * application's main looper. The main looper for your application

    * is created by the Android environment, so you should never need

    * to call this function yourself. See also: {@link #prepare()}

    */

   public static void prepareMainLooper() {

       prepare(false);

       synchronized (Looper.class) {

           if (sMainLooper != null) {

                throw newIllegalStateException("The main Looper has already been prepared.");

           }

           sMainLooper = myLooper();

       }

}

……

 

   /**

    * Return the Looper object associated with the current thread.  Returns

    * null if the calling thread is not associated with a Looper.

    */

   public static Looper myLooper() {

       return sThreadLocal.get();

}

……

   /**

    * Return the {@link MessageQueue} object associated with the current

    * thread.  This must be calledfrom a thread running a Looper, or a

    * NullPointerException will be thrown.

    */

   public static MessageQueue myQueue() {

       return myLooper().mQueue;

    }

 

   private Looper(boolean quitAllowed) {

       mQueue = new MessageQueue(quitAllowed);

       mThread = Thread.currentThread();

}

 

……

}

 

prepareMainLooper主要做的事情就是:创建了一个Looper对象,保存在sThreadLocal中,并将这个Looper对象赋值给sMainLooper保存起来。

new Looper:在创建的时候,同时实例化一个MessageQueue。并赋值给mQueue保存起来。后续的消息就通过它来管理。

接下来,我们来看MessageQueue.java

------  frameworks/base/core/java/android/os/MessageQueue.java

 

public final class MessageQueue {

……

 

   private native static int nativeInit();

   private native static void nativeDestroy(int ptr);

   private native static void nativePollOnce(int ptr, int timeoutMillis);

   private native static void nativeWake(int ptr);

private nativestatic boolean nativeIsIdling(int ptr);

……

MessageQueue(boolean quitAllowed) {

       mQuitAllowed = quitAllowed;

       mPtr = nativeInit();

}

……

}

可以看出。MessageQueue的整体实现都交给了native.

继续跟,grep一把就可以找到MessageQueue的native实现。

frameworks/base/core/java/android/os/MessageQueue.java

从nativeInit开始

static jintandroid_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {

   NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();

   if (!nativeMessageQueue) {

       jniThrowRuntimeException(env, "Unable to allocate nativequeue");

       return 0;

    }

 

   nativeMessageQueue->incStrong(env);

   return reinterpret_cast<jint>(nativeMessageQueue);

}

可以看出,android_os_MessageQueue_nativeInit在native中创建了一个nativeMessageQueue.

在这个文件中可以找到如下
NativeMessageQueue::NativeMessageQueue(): mInCallback(false), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
nativeMessageQueue在native层又实例化了一个Looper。
 
Looper.cpp实现在system/core/libutils/Looper.cpp
 
Looper::Looper(boolallowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks),mSendingMessage(false),
        mResponseIndex(0),mNextMessageUptime(LLONG_MAX) {
    int wakeFds[2];
    int result = pipe(wakeFds);
    LOG_ALWAYS_FATAL_IF(result != 0,"Could not create wake pipe. errno=%d", errno);
 
    mWakeReadPipeFd =wakeFds[0];
    mWakeWritePipeFd =wakeFds[1];
 
    result = fcntl(mWakeReadPipeFd, F_SETFL,O_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(result != 0,"Could not make wake read pipe non-blocking.  errno=%d", errno);
 
    result = fcntl(mWakeWritePipeFd, F_SETFL,O_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(result != 0,"Could not make wake write pipe non-blocking.  errno=%d", errno);
 
    mIdling = false;
 
    // Allocate the epoll instance and registerthe wake pipe.
    mEpollFd =epoll_create(EPOLL_SIZE_HINT);//#------
EPOLL_SIZE_HINT是在这个mEpollFd上能监控的最大文件描述符数,这里创建了epoll的filedescription
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0,"Could not create epoll instance. errno=%d", errno);
 
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event));// zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeReadPipeFd;//#------将mWakeReadPipeFd赋值进去,就是说有内容更新,就立马wake线程去读取。
    result = epoll_ctl(mEpollFd,EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0,"Could not add wake read pipe to epoll instance.  errno=%d", errno);
}
 
注意看代码中标红部分。
消息的循环使用的是Pipe机制,是linux系统中进程间通信的一种机制。
好像linux进程间的通信总是通过一个文件进行。mWakeReadPipeFd,mWakeWritePipeFd 是pipe的文件描述(FileDescription)。
这是一个多线程机制。
线程A使用FileDescription来读pipe的内容
线程B使用FileDescription来写pipe的内容
Pipe中没内容就线程A就进入wait状态,线程B在进行写操作的时候,就会wake线程A。
 
Epoll这部分不详。直接应用罗大牛的说明

这个等待和唤醒的操作是如何进行的呢,这就要借助Linux系统中的epoll机制了。 Linux系统中的epoll机制为处理大批量句柄而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。但是这里我们其实只需要监控的IO接口只有mWakeReadPipeFd一个,即前面我们所创建的管道的读端,为什么还需要用到epoll呢?有点用牛刀来杀鸡的味道。其实不然,这个Looper类是非常强大的,它除了监控内部所创建的管道接口之外,还提供了addFd接口供外界面调用,外界可以通过这个接口把自己想要监控的IO事件一并加入到这个Looper对象中去,当所有这些被监控的IO接口上面有事件发生时,就会唤醒相应的线程来处理,不过这里我们只关心刚才所创建的管道的IO事件的发生。
 

 
至此,消息循环的创建结束,即Looper.prepare()的创建结束
 
 

Looper.loop

回到Looper.java,看loop()

/**
     * Run the message queue in this thread. Besure to call
     * {@link #quit()} to end the loop.
     */
    public static void loop() {
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("NoLooper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;
 
        // Make sure the identity of thisthread is that of the local process,
        // and keep track of what that identitytoken actually is.
        Binder.clearCallingIdentity();
        final long ident =Binder.clearCallingIdentity();
        ……
 
        for (;;) {
            Message msg = queue.next(); //might block
            if (msg == null) {
                // No message indicates thatthe message queue is quitting.
                return;
            }
            ……
 
            msg.target.dispatchMessage(msg);
 
            ……
 
            // Make sure that during the courseof dispatching the
            // identity of the thread wasn'tcorrupted.
            final long newIdent =Binder.clearCallingIdentity();
            ……
            msg.recycle();
        }
}
 
这个method,不断的读取信息,直接没有信息为止,读到信息就dispatchMessage传递出去,然后回收掉message。没信息之后,就quit。
 
看下MessageQueue中的next是怎么实现的?
 
Message next() {
        intpendingIdleHandlerCount = -1; // -1 only during first iteration
        intnextPollTimeoutMillis = 0;
        for (;;) {
            if(nextPollTimeoutMillis != 0) {
               Binder.flushPendingCommands();
            }
 
            // We can assumemPtr != 0 because the loop is obviously still running.
            // The looper willnot call this method after the loop quits.
            nativePollOnce(mPtr, nextPollTimeoutMillis);
 
            synchronized(this) {
                // Try toretrieve the next message.  Return iffound.
                final long now= SystemClock.uptimeMillis();
                MessageprevMsg = null;
                Message msg =mMessages;
                if (msg !=null && msg.target == null) {
                    // Stalledby a barrier.  Find the next asynchronousmessage in the queue.
                    do {
                       prevMsg = msg;
                        msg =msg.next;
                    } while(msg != null && !msg.isAsynchronous());
                }
                if (msg !=null) {
                    if (now< msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                       nextPollTimeoutMillis = (int) Math.min(msg.when - now,Integer.MAX_VALUE);
                    } else {
                        // Gota message.
                       mBlocked = false;
                        if(prevMsg != null) {
                           prevMsg.next = msg.next;
                        } else{
                           mMessages = msg.next;
                        }
                        msg.next = null;
                        if(false) Log.v("MessageQueue", "Returning message: " + msg);
                       msg.markInUse();
                        returnmsg;
                    }
                } else {
                    // No moremessages.
                   nextPollTimeoutMillis = -1;
                }
 
                // Process thequit message now that all pending messages have been handled.
                if (mQuitting){
                    dispose();
                    returnnull;
                }
 
                // If firsttime idle, then get the number of idlers to run.
                // Idlehandles only run if the queue is empty or if the first message
                // in thequeue (possibly a barrier) is due to be handled in the future.
                if(pendingIdleHandlerCount < 0
                       && (mMessages == null || now < mMessages.when)) {
                   pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if(pendingIdleHandlerCount <= 0) {
                    // No idlehandlers to run.  Loop and wait somemore.
                    mBlocked =true;
                    continue;
                }
 
                if(mPendingIdleHandlers == null) {
                   mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount,4)];
                }
               mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }
 
            // Run the idlehandlers.
            // We only everreach this code block during the first iteration.
            for (int i = 0; i< pendingIdleHandlerCount; i++) {
                finalIdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; //release the reference to the handler
 
                boolean keep =false;
                try {
                    keep =idler.queueIdle();
                } catch(Throwable t) {
                   Log.wtf("MessageQueue", "IdleHandler threwexception", t);
                }
 
                if (!keep) {
                   synchronized (this) {
                       mIdleHandlers.remove(idler);
                    }
                }
            }
 
            // Reset the idle handler count to 0 so wedo not run them again.
           pendingIdleHandlerCount = 0;
 
            // While callingan idle handler, a new message could have been delivered
            // so go back andlook again for a pending message without waiting.
           nextPollTimeoutMillis = 0;
        }
    }
 
当loop第一次被调用时,nativePollOnce会立即去获取下一个message;
之后,就立即去获取下一条message。
当消息队列中的消息还没到时间的时候,nextPollTimeoutMillis > 0,那么,nativePollOnce会阻塞着,等待时间到达。
当消息队列中没有消息时,nextPollTimeoutMillis= -1,那么消息队列就会被无限阻塞,直到有新的消息进来去wake它。这里的话,最终会调用到epoll_wait,而-1的参数则会使得它无限阻塞。
 
值得注意的是这里有个IdleHandler,这是什么意思呢?
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the firstmessage
// in the queue (possibly a barrier) is due to be handled in thefuture.
if (pendingIdleHandlerCount < 0
        && (mMessages== null || now < mMessages.when)) {
     pendingIdleHandlerCount =mIdleHandlers.size();
}
 
分析可以看出,线程只有在空闲的时候,才会去处理IdleHandler的方法。
// Run the idle handlers.
            // We only ever reach this codeblock during the first iteration.
            for (int i = 0; i <pendingIdleHandlerCount; i++) {
                final IdleHandler idler =mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null;// release the reference to the handler
 
                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                   Log.wtf("MessageQueue", "IdleHandler threwexception", t);
                }
 
                if (!keep) {
                    synchronized (this) {
                       mIdleHandlers.remove(idler);
                    }
                }
            }
而且,根本queueIdle的返回值,判断是否保留这个handle。

至此,MessageQueue.java中的loop分析完毕。

 

接下来看一下,nativePollOnce中是如何实现的?

先看下native的实现如何?在android_os_MessageQueue.cpp中。

static voidandroid_os_MessageQueue_nativePollOnce(JNIEnv* env, jclass clazz,

       jint ptr, jint timeoutMillis) {

    NativeMessageQueue* nativeMessageQueue =reinterpret_cast<NativeMessageQueue*>(ptr);
   nativeMessageQueue->pollOnce(env, timeoutMillis);

}

这里将之前保存的mPtr转换成了NativeMessageQueue。

在调用NativeMessageQueue的pollOnce方法。

 

继续看

 

void NativeMessageQueue::pollOnce(JNIEnv*env, int timeoutMillis) {

   mInCallback = true;

   mLooper->pollOnce(timeoutMillis);

   mInCallback = false;

   if (mExceptionObj) {

       env->Throw(mExceptionObj);

       env->DeleteLocalRef(mExceptionObj);

       mExceptionObj = NULL;

    }

}

可以看到又调用了Looper.cpp的pollOnce方法。

 

 

int Looper::pollOnce(int timeoutMillis,int* outFd, int* outEvents, void** outData) {

   int result = 0;

   for (;;) {

       while (mResponseIndex < mResponses.size()) {

           const Response& response = mResponses.itemAt(mResponseIndex++);

           int ident = response.request.ident;

           if (ident >= 0) {

                int fd = response.request.fd;

                int events = response.events;

                void* data =response.request.data;

………………

               if (outFd != NULL) *outFd= fd;

                if (outEvents != NULL)*outEvents = events;

                if (outData != NULL) *outData =data;

                return ident;

           }

       }

 

       if (result != 0) {

………….

           if (outFd != NULL) *outFd = 0;

           if (outEvents != NULL) *outEvents = 0;

           if (outData != NULL) *outData = NULL;

           return result;

       }

 

       result = pollInner(timeoutMillis);

    }

}

这里比较重要的就是pollInner。而mResponses也会在pollInner中被修改。都被归结到了pollInner中了。

 

 

int Looper::pollInner(int timeoutMillis) {

…………..

   // Adjust the timeout based on when the next message is due.

   if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {

       nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);

       int messageTimeoutMillis = toMillisecondTimeoutDelay(now,mNextMessageUptime);

       if (messageTimeoutMillis >= 0

                && (timeoutMillis <0 || messageTimeoutMillis < timeoutMillis)) {

           timeoutMillis = messageTimeoutMillis;

       }

…………………..

    }--------计算延迟的时间。。。。这里不重要。。。忽略

 

   // Poll.

    int result =ALOOPER_POLL_WAKE;

   mResponses.clear();

   mResponseIndex = 0;

 

   // We are about to idle.

   mIdling = true;

 

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];

    int eventCount = epoll_wait(mEpollFd,eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

 

   // No longer idling.

   mIdling = false;

 

   // Acquire lock.

   mLock.lock();

 

   // Check for poll error.

    if (eventCount < 0) {

       if (errno == EINTR) {

           goto Done;

       }

       ALOGW("Poll failed with an unexpected error, errno=%d",errno);

       result = ALOOPER_POLL_ERROR;

       goto Done;

    }

 

   // Check for poll timeout.

    if (eventCount == 0) {

………..

       result = ALOOPER_POLL_TIMEOUT;

       goto Done;

    }

 

   // Handle all events.

………………

 

    for (int i = 0; i <eventCount; i++) {------------遍历events

        int fd = eventItems[i].data.fd;

       uint32_t epollEvents = eventItems[i].events;

       if (fd ==mWakeReadPipeFd) {

           if (epollEvents & EPOLLIN) {

                awoken();

           } else {

                ALOGW("Ignoring unexpectedepoll events 0x%x on wake read pipe.", epollEvents);

           }

       } else {

           ssize_t requestIndex = mRequests.indexOfKey(fd);

           if (requestIndex >= 0) {

                int events = 0;

                if (epollEvents & EPOLLIN)events |= ALOOPER_EVENT_INPUT;

                if (epollEvents & EPOLLOUT)events |= ALOOPER_EVENT_OUTPUT;

                if (epollEvents & EPOLLERR)events |= ALOOPER_EVENT_ERROR;

                if (epollEvents & EPOLLHUP)events |= ALOOPER_EVENT_HANGUP;

               pushResponse(events,mRequests.valueAt(requestIndex));

           } else {

                ALOGW("Ignoring unexpectedepoll events 0x%x on fd %d that is "

                        "no longerregistered.", epollEvents, fd);

           }

       }

    }

Done: ;

--------之后就都跟epoll无关了

   // Invoke pending message callbacks.

   mNextMessageUptime = LLONG_MAX;

   while (mMessageEnvelopes.size() != 0) {

       nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);

       const MessageEnvelope& messageEnvelope =mMessageEnvelopes.itemAt(0);

       if (messageEnvelope.uptime <= now) {

           // Remove the envelope from the list.

           // We keep a strong reference to the handler until the call tohandleMessage

           // finishes.  Then we drop it sothat the handler can be deleted *before*

           // we reacquire our lock.

           { // obtain handler

                sp<MessageHandler>handler = messageEnvelope.handler;

                Message message =messageEnvelope.message;

                mMessageEnvelopes.removeAt(0);

                mSendingMessage = true;

                mLock.unlock();

 

……………..

               handler->handleMessage(message);

           } // release handler

 

           mLock.lock();

           mSendingMessage = false;

           result = ALOOPER_POLL_CALLBACK;

        } else {

           // The last message left at the head of the queue determines the nextwakeup time.

           mNextMessageUptime = messageEnvelope.uptime;

           break;

       }

    }

 

   // Release lock.

   mLock.unlock();

 

   // Invoke all response callbacks.

   for (size_t i = 0; i < mResponses.size(); i++) {

       Response& response = mResponses.editItemAt(i);

       if (response.request.ident == ALOOPER_POLL_CALLBACK) {

           int fd = response.request.fd;

           int events = response.events;

           void* data = response.request.data;

……………

           int callbackResult = response.request.callback->handleEvent(fd,events, data);

           if (callbackResult == 0) {

                removeFd(fd);

           }

            // Clear the callback reference inthe response structure promptly because we

           // will not clear the response vector itself until the next poll.

           response.request.callback.clear();

           result = ALOOPER_POLL_CALLBACK;

       }

    }

   return result;

}

 

注意红色字体:

epoll_wait(mEpollFd,eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

这里就是epoll_wait阻塞的method。

 

eventCount 小于0 就说明出错了。

if (eventCount < 0) {

       if (errno == EINTR) {

           goto Done;

       }

       ALOGW("Poll failed with anunexpected error, errno=%d", errno);

       result = ALOOPER_POLL_ERROR;

       goto Done;

    }

eventCount 等于0,就说明超时了。

   // Check for poll timeout.

    if (eventCount == 0) {

………..

       result = ALOOPER_POLL_TIMEOUT;

       goto Done;

    }

 

剩下的,当eventCount 大于0,就说明我们监控的事件发生了。

for (int i = 0; i < eventCount; i++) {------------遍历events

       int fd = eventItems[i].data.fd;

       uint32_t epollEvents = eventItems[i].events;

       if (fd == mWakeReadPipeFd) {

           if (epollEvents &EPOLLIN) {

                awoken();

                                     ……..

           }

           …………

       }

………..

}

当有消息进来的时候,所监控的mWakeReadPipeFd,发生了EPOLLIN时就会调用awoken。

说明一下。Epoll有EPOLLOUT和EPOLLIN两个触发事件

 

void Looper::awoken() {

……….

    char buffer[16];

    ssize_t nRead;

    do {

        nRead =read(mWakeReadPipeFd, buffer, sizeof(buffer));

    } while ((nRead == -1&& errno == EINTR) || nRead == sizeof(buffer));

}

分析awoken,Message被读了出来。

然后,就触发了接下去的一系列操作。

 

消息发送

一个activity对应一个ActivityThread,在ActivityThead.java的main函数中。

public final class ActivityThread {

    ……………

   public static void main(String[] args) {

       …………………….

 

       Looper.prepareMainLooper();

 

       ActivityThread thread = new ActivityThread();

       thread.attach(false);

 

       if (sMainThreadHandler == null) {

           sMainThreadHandler = thread.getHandler();

       }

……………

 

       Looper.loop();

 

       throw new RuntimeException("Main thread loop unexpectedlyexited");

}

    …………

}

可以看到,在Looper.prepareMainLooper();之后,ActivityThread调用了attach去初始化其他信息。然后便初始化sMainThreadHandler。

看一下getHandler方法的实现。

         finalH mH = new H();

……..

   final Handler getHandler() {

       return mH;

}

可以找到,这里的H是定义在ActivityThread中的内部类,继承自Handler。有兴趣的朋友,会发现H这个类定义了很多的action,几乎涵盖了大部分程序运行时会发生的动作。

 

继续说,

H继承自Handler。

 

   public Handler(Callback callback, boolean async) {

       …………..

       mLooper = Looper.myLooper();

       if (mLooper == null) {

           throw new RuntimeException(

                "Can't create handlerinside thread that has not called Looper.prepare()");

       }

       mQueue = mLooper.mQueue;

       mCallback = callback;

       mAsynchronous = async;

}

   public Handler(Looper looper, Callback callback, boolean async) {

       mLooper = looper;

       mQueue = looper.mQueue;

       mCallback = callback;

       mAsynchronous = async;

}

查看Handler的构造体,mLooper或用外部传入的参数,或用已初始化好的,mQueue则从mLooper中获得。

总之,一定会有这个mLooper,有了这个Looper就可以访问MessageQueue。

 

在消息发送的时候,我们有两种方式,sendMessage、post(Runnablerunnable)。

 

private booleanenqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {

    msg.target = this;

    if (mAsynchronous) {

        msg.setAsynchronous(true);

    }

    return queue.enqueueMessage(msg,uptimeMillis);

}

 

Handler会根据target来找处理的对象,那么,这里就是this,自己发自己处理。

 

继续看MessageQueue中的enqueueMessage

 

    boolean enqueueMessage(Message msg, longwhen) {

        ……………..

 

        Looper myLooper = msg.target.mLooper;

        …………..

 

       synchronized (this) {

            ……………………..

            msg.when = when;

            Message p = mMessages;

            boolean needWake;

            if (p == null || when == 0 || when< p.when) {

                // New head, wake up the eventqueue if blocked.

                msg.next = p;

                mMessages = msg;

                needWake = mBlocked;

            } else {

                // Inserted within the middleof the queue.  Usually we don't have towake

                // up the event queue unlessthere is a barrier at the head of the queue

                // and the message is theearliest asynchronous message in the queue.

                needWake = mBlocked &&p.target == null && msg.isAsynchronous();

                Message prev;

                for(;;) {

                   prev = p;

                    p =p.next;

                    if(p == null || when < p.when) {

                       break;

                    }

                    if(needWake && p.isAsynchronous()) {

                        needWake = false;

                    }

                }

                msg.next = p; // invariant: p== prev.next

                prev.next = msg;

            }

 

            // We can assume mPtr != 0 becausemQuitting is false.

            if (needWake) {

                nativeWake(mPtr);

            }

        }

        return true;

    }

这里有两种情况

一、即刻插入到当前,立即执行

1. mMessage=0,说明队列为空,可以立即插入

2.when = 0,说明用户需要立即执行

3.when < p.when,当前消息的执行时间比目前要求发送的message的执行之间晚,自然就被即刻插入到当前去。

 

二、插入到中间,等些时间再执行

这种情况也好理解,仔细看代码段中标红部分,就是根据执行的时间先后顺序,找一个位置插入。

 

 

在消息插入完成后,就调用nativeWake唤醒,这里就直接跳转到native的实现。

 
static voidandroid_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jint ptr) {
    NativeMessageQueue*nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    returnnativeMessageQueue->wake();
}
 
这里再次将之前保存在java层的mPtr转换成NativeMessageQueue,然后调用wake方法。
void NativeMessageQueue::wake() {
   mLooper->wake();
}
 
转到Looper.cpp中分析。
 
void Looper::wake() {
……
    ssize_t nWrite;
    do {
        nWrite =write(mWakeWritePipeFd, "W", 1);
    } while (nWrite == -1&& errno == EINTR);
 
    if (nWrite != 1) {
        if (errno != EAGAIN) {
            ALOGW("Couldnot write wake signal, errno=%d", errno);
        }
    }
}
 
Wake函数比较简单,通过FileDescription mWakeWritePipeFd往pipe中写入了一个W的字符。在写入了这个字符后,就会唤醒应用程序的主线程,再次去读取pipe中的信息。
值得注意的是。写入一个W并没有任何意义。也就是说写入的作用就只剩下唤醒线程。
而之前的分析可以发现,Message的信息,其实是保存在Messages中的。
原来,pipe只承担了唤醒通知的功能。
至此,消息就发送了,调用了wake之后呢,应用主线程中的Looper就会调用nativePollOnce,然后返回到MessageQueue.next中去。消息发送也就成功了。
 

消息处理

再次回到ActivityThread中去,在ActivityThead的main函数中。

一个activity对应一个ActivityThread,在ActivityThead.java的main函数中。

public final class ActivityThread {

    ……………

   public static void main(String[] args) {

       …………………….

 

       Looper.prepareMainLooper();

 

       ActivityThread thread = new ActivityThread();

       thread.attach(false);

 

       if (sMainThreadHandler == null) {

           sMainThreadHandler = thread.getHandler();

       }

……………

 

       Looper.loop();

 

        throw new RuntimeException("Main threadloop unexpectedly exited");

}

    …………

}

 

在一起都创建好之后,就进入了无限循环等待。

消息的处理也就在这个Looper.looper中。

来看一下这个loop做了什么。

 

 

   public static void loop() {

       final Looper me = myLooper();

       ………………

       final MessageQueue queue = me.mQueue;

 

       …………

 

       for (;;) {

           Message msg = queue.next(); // might block

           …………..

 

           msg.target.dispatchMessage(msg);

 

           ………………

           msg.recycle();

       }

}

 

在queue.next接受到消息之后,就会效用msg.target. dispatchMessage。
         这里调用的是msg.target。而这个target在之前的分析中,是将this传入。

换言之,msg的处理是谁发谁处理。

 

而且dispatchMessage在哪里定义呢。在平常的代码编写中,我会会发现,一般情况下,我们都会在handle中重写dispatchMessage函数。

那么看下Handler.java的实现如何

/**

     * Subclasses must implement this toreceive messages.

     */

    public void handleMessage(Message msg) {

    }

 

    public void dispatchMessage(Message msg) {

        if (msg.callback != null) {

            handleCallback(msg);

        } else {

            if (mCallback != null) {

                if(mCallback.handleMessage(msg)) {

                    return;

                }

            }

            handleMessage(msg);

        }

    }

可以发现确实Handler也没有处理,只是传给了handleMessage。

 

至此,消息处理完毕,谁发谁处理,谁要处理,就自己重写handleMessage。

 

第一次打这么多的字,学习别人的写作方式,一个个去看源代码揣测。算上其他杂事,花了3天时间。总算完成了。真的比单纯看别人的文章来的印象深刻,理解深刻。代码还是需要结合程序源码来看的。

总结一下,

1.      Android的消息处理机制:包含三部分:消息循环、消息发送、消息处理.

2.      消息循环是基于linux的pipe通信机制的,使用了两个FileDescription,感觉有点类似线程安全中多个线程对一个资源的读写

3.      消息循环中pipe机制,有最大监控数量,可以监控的更多,在等待中使用的是Epoll机制中的epoll_wait机制。

4.      在消息循环中有个空闲等待,IdleHandler,在线程不忙的时候,可以一直做处理。使得应用程序在空闲的时候可以做一些事情。根据返回值的不同是保留IdleHandler还是一次性使用。

5.      消息队列有消息进来的时候,只是单纯的利用pipe机制来做唤醒,其具体的消息保存并没有在pipe中。

6.      消息的发送利用的是pipe机制,通过写入字符串,唤醒读写通道达到唤醒消息循环的目的

7.      消息的处理要靠用户重载handleMessage达到
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息