从源码角度分析native层消息机制与java层消息机制的关联
2015-09-02 15:36
477 查看
上文从源码分析Handler机制中从java层分析了消息机制,接下来本文从native层去分析Android中的消息机制。
在一个消息驱动的系统中,最重要的就是消息队列和消息获取和处理,从上一篇文章可以看出handler的消息机制主要是靠MessageQueue进行消息列队,靠Looper进行消息循环,Looper的loop方法中进行轮询消息的实际操作还是依靠MessageQueue的next方法来获取消息,也就是说在这个消息驱动机制中最重要的就是MessageQueue这个类了。在Android 2.3之前,只有java层中可以往MessageQueue中添加消息使得消息驱动正常的运作,在2.3之后,MessageQueue的核心部分移到了native层,MessageQueue兼顾了两个世界的来保证消息的运作。
在MessageQueue的构造方法中:
构造函数调用nativeInit,该函数由Native层实现,native层的真正实现为android_os_MessageQueue.cpp中的android_os_MessageQueue_nativeInit方法。
android_os_MessageQueue_nativeInit函数中创建以一个与java层MessageQueue对应点nativeMessageQueue消息队列,NativeMessageQueue构造中从当前线程中获取一个looper,如果当前线程没有到话,就实例化一个并且绑定到当前线程。
前一篇文章提到,当与消息机制相关的几个对象初始化完毕后,就要开始loop操作,而loop其实也就是循环的执行MessageQueue的next方法。
nativePollOnce方法返回后,就代表next方法就可以从mMessages中获取一个消息,也就是说如果消息队列中没有消息存在nativePollOnce就不会返回。
在MessageQueue的enqueueMessage方法中
添加完message后,调用了native层的nativeWake方法,这个应该是触发上面提到的nativePollOnce方法返回,好让加入的message得到分发处理。
在android_os_MessageQueue.cpp中:
在Looper.cpp中:
在wake方法中,惊讶的发现是往管道中写入了一个”w”,难道这样就可以唤醒nativePollOnce方法返回么?是不是也就意味着nativePollOnce方法中承载着这个管道的读操作呢?如果真是这样那在nativePollOnce方法的执行过程中肯定有这么一个监控这个管道的过程吧?这个都是猜测,我们接下来分析nativePollOnce方法的具体实现。
nativePollOnce的实现在android_os_MessageQueue.cpp中:
在Looper.cpp中:
在Looper::pollOnce方法中你会发现使用了#if 和 #endif,这就代表着looper采用了编译选项来控制是否使用epoll机制来进行I/O复用。在linux的网络编程中,很长的一段时间都在使用select来做事件触发,在linux新的内核中使用了epoll来替换它,相比于select,epoll最大的好处在于它不会随着监听文件描述符数目的增长而效率降低,select机制是采用轮询来处理的,轮询的fd数目越多,效率也就越低。epoll的接口非常简单就只有三个函数:
int epoll_create(int size);创建一个epoll句柄,当这个句柄创建完成之后,在/proc/进程id/fd中可以看到这个fd。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);注册事件函数。
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);等待事件的发生,参数timeout是超时时间毫秒值,0会立即返回,-1将不确定,也就是说有可能永久阻塞。该函数返回需要处理的事件数目,如返回0表示已超时。
再回到Looper::pollOnce方法中,每次的for循环都会调用一个函数,不妨前去看看。
在上述调用epoll_wait方法等待事件的发生,timeoutMillis是我们在java层传递过来的,在MessageQueue的next方法中如果没有消息的时候其中的nextPollTimeoutMillis = -1,也就是说timeoutMillis为-1,那么在等待事情发生的时候,就有可能会造成永久阻塞,直到某个事件发生。如果有事件发生并且是管道读端的事件,那么就会直接读取管道中的数据。之前在分析Looper::woke方法时,就往管道中写入了数据。
在一个消息驱动的系统中,最重要的就是消息队列和消息获取和处理,从上一篇文章可以看出handler的消息机制主要是靠MessageQueue进行消息列队,靠Looper进行消息循环,Looper的loop方法中进行轮询消息的实际操作还是依靠MessageQueue的next方法来获取消息,也就是说在这个消息驱动机制中最重要的就是MessageQueue这个类了。在Android 2.3之前,只有java层中可以往MessageQueue中添加消息使得消息驱动正常的运作,在2.3之后,MessageQueue的核心部分移到了native层,MessageQueue兼顾了两个世界的来保证消息的运作。
在MessageQueue的构造方法中:
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); }
构造函数调用nativeInit,该函数由Native层实现,native层的真正实现为android_os_MessageQueue.cpp中的android_os_MessageQueue_nativeInit方法。
static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (! nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return; } android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue); } NativeMessageQueue::NativeMessageQueue() { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } }
android_os_MessageQueue_nativeInit函数中创建以一个与java层MessageQueue对应点nativeMessageQueue消息队列,NativeMessageQueue构造中从当前线程中获取一个looper,如果当前线程没有到话,就实例化一个并且绑定到当前线程。
前一篇文章提到,当与消息机制相关的几个对象初始化完毕后,就要开始loop操作,而loop其实也就是循环的执行MessageQueue的next方法。
Message next() { int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } // We can assume mPtr != 0 because the loop is obviously still running. // The looper will not call this method after the loop quits. nativePollOnce(mPtr, nextPollTimeoutMillis); synchronized (this) { // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (false) Log.v("MessageQueue", "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } // Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); return null; } // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } // Run the idle handlers. // We only ever reach this code block during the first iteration. for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf("MessageQueue", "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0; } }
nativePollOnce方法返回后,就代表next方法就可以从mMessages中获取一个消息,也就是说如果消息队列中没有消息存在nativePollOnce就不会返回。
在MessageQueue的enqueueMessage方法中
boolean enqueueMessage(Message msg, long when) { if (msg.isInUse()) { throw new AndroidRuntimeException(msg + " This message is already in use."); } if (msg.target == null) { throw new AndroidRuntimeException("Message must have a target."); } synchronized (this) { if (mQuitting) { RuntimeException e = new RuntimeException( msg.target + " sending message to a Handler on a dead thread"); Log.w("MessageQueue", e.getMessage(), e); return false; } msg.when = when; Message p = mMessages; boolean needWake; if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } // We can assume mPtr != 0 because mQuitting is false. if (needWake) { nativeWake(mPtr); } } return true; }
添加完message后,调用了native层的nativeWake方法,这个应该是触发上面提到的nativePollOnce方法返回,好让加入的message得到分发处理。
在android_os_MessageQueue.cpp中:
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); return nativeMessageQueue->wake(); } void NativeMessageQueue::wake() { mLooper->wake(); }
在Looper.cpp中:
void Looper::wake() { #if DEBUG_POLL_AND_WAKE LOGD("%p ~ wake", this); #endif #ifdef LOOPER_STATISTICS // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled. if (mPendingWakeCount++ == 0) { mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC); } #endif ssize_t nWrite; do { nWrite = write(mWakeWritePipeFd, "W", 1); } while (nWrite == -1 && errno == EINTR); if (nWrite != 1) { if (errno != EAGAIN) { LOGW("Could not write wake signal, errno=%d", errno); } } }
在wake方法中,惊讶的发现是往管道中写入了一个”w”,难道这样就可以唤醒nativePollOnce方法返回么?是不是也就意味着nativePollOnce方法中承载着这个管道的读操作呢?如果真是这样那在nativePollOnce方法的执行过程中肯定有这么一个监控这个管道的过程吧?这个都是猜测,我们接下来分析nativePollOnce方法的具体实现。
nativePollOnce的实现在android_os_MessageQueue.cpp中:
void NativeMessageQueue::pollOnce(int timeoutMillis) { mLooper->pollOnce(timeoutMillis); }
在Looper.cpp中:
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); if (! response.request.callback) { #if DEBUG_POLL_AND_WAKE LOGD("%p ~ pollOnce - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this, response.request.ident, response.request.fd, response.events, response.request.data); #endif if (outFd != NULL) *outFd = response.request.fd; if (outEvents != NULL) *outEvents = response.events; if (outData != NULL) *outData = response.request.data; return response.request.ident; } } if (result != 0) { #if DEBUG_POLL_AND_WAKE LOGD("%p ~ pollOnce - returning result %d", this, result); #endif if (outFd != NULL) *outFd = 0; if (outEvents != NULL) *outEvents = NULL; if (outData != NULL) *outData = NULL; return result; } result = pollInner(timeoutMillis); } }
在Looper::pollOnce方法中你会发现使用了#if 和 #endif,这就代表着looper采用了编译选项来控制是否使用epoll机制来进行I/O复用。在linux的网络编程中,很长的一段时间都在使用select来做事件触发,在linux新的内核中使用了epoll来替换它,相比于select,epoll最大的好处在于它不会随着监听文件描述符数目的增长而效率降低,select机制是采用轮询来处理的,轮询的fd数目越多,效率也就越低。epoll的接口非常简单就只有三个函数:
int epoll_create(int size);创建一个epoll句柄,当这个句柄创建完成之后,在/proc/进程id/fd中可以看到这个fd。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);注册事件函数。
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);等待事件的发生,参数timeout是超时时间毫秒值,0会立即返回,-1将不确定,也就是说有可能永久阻塞。该函数返回需要处理的事件数目,如返回0表示已超时。
再回到Looper::pollOnce方法中,每次的for循环都会调用一个函数,不妨前去看看。
int Looper::pollInner(int timeoutMillis) { #if DEBUG_POLL_AND_WAKE LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); #endif int result = ALOOPER_POLL_WAKE; mResponses.clear(); mResponseIndex = 0; #ifdef LOOPER_STATISTICS nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC); #endif #ifdef LOOPER_USES_EPOLL // 这里表明是使用epoll的io复用凡方式 struct epoll_event eventItems[EPOLL_MAX_EVENTS]; // 调用epoll_wait等待事件的发生 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); bool acquiredLock = false; #else // Wait for wakeAndLock() waiters to run then set mPolling to true. mLock.lock(); while (mWaiters != 0) { mResume.wait(mLock); } mPolling = true; mLock.unlock(); size_t requestedCount = mRequestedFds.size(); int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis); #endif if (eventCount < 0) { if (errno == EINTR) { goto Done; } LOGW("Poll failed with an unexpected error, errno=%d", errno); result = ALOOPER_POLL_ERROR; goto Done; } if (eventCount == 0) { #if DEBUG_POLL_AND_WAKE LOGD("%p ~ pollOnce - timeout", this); #endif result = ALOOPER_POLL_TIMEOUT; goto Done; } #if DEBUG_POLL_AND_WAKE LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); #endif #ifdef LOOPER_USES_EPOLL for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { awoken(); } else { LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); } } else { if (! acquiredLock) { mLock.lock(); acquiredLock = true; } ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex)); } else { LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } } } if (acquiredLock) { mLock.unlock(); } Done: ; #else for (size_t i = 0; i < requestedCount; i++) { const struct pollfd& requestedFd = mRequestedFds.itemAt(i); short pollEvents = requestedFd.revents; if (pollEvents) { if (requestedFd.fd == mWakeReadPipeFd) { if (pollEvents & POLLIN) { // 是管道读端发生命令直接读取管道中的数据 awoken(); } else { LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents); } } else { int events = 0; if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT; if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR; if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP; if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID; pushResponse(events, mRequests.itemAt(i)); } if (--eventCount == 0) { break; } } } Done: // Set mPolling to false and wake up the wakeAndLock() waiters. mLock.lock(); mPolling = false; if (mWaiters != 0) { mAwake.broadcast(); } mLock.unlock(); #endif #ifdef LOOPER_STATISTICS nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC); mSampledPolls += 1; if (timeoutMillis == 0) { mSampledZeroPollCount += 1; mSampledZeroPollLatencySum += pollEndTime - pollStartTime; } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) { mSampledTimeoutPollCount += 1; mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime - milliseconds_to_nanoseconds(timeoutMillis); } if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) { LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this, 0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount, 0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount); mSampledPolls = 0; mSampledZeroPollCount = 0; mSampledZeroPollLatencySum = 0; mSampledTimeoutPollCount = 0; mSampledTimeoutPollLatencySum = 0; } #endif for (size_t i = 0; i < mResponses.size(); i++) { const Response& response = mResponses.itemAt(i); if (response.request.callback) { #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this, response.request.fd, response.events, response.request.data); #endif int callbackResult = response.request.callback( response.request.fd, response.events, response.request.data); if (callbackResult == 0) { removeFd(response.request.fd); } result = ALOOPER_POLL_CALLBACK; } } return result; }
在上述调用epoll_wait方法等待事件的发生,timeoutMillis是我们在java层传递过来的,在MessageQueue的next方法中如果没有消息的时候其中的nextPollTimeoutMillis = -1,也就是说timeoutMillis为-1,那么在等待事情发生的时候,就有可能会造成永久阻塞,直到某个事件发生。如果有事件发生并且是管道读端的事件,那么就会直接读取管道中的数据。之前在分析Looper::woke方法时,就往管道中写入了数据。
相关文章推荐
- 注释驱动的 Spring cache 缓存介绍
- MyEclipse下配置mysql驱动的方法
- apktool + eclipse 动态调试无源码apk
- annotation-driven
- Java学习之排序
- MyEclipse 关闭debug
- Spring中的AOP——在Advice方法中获取目标方法的参数
- 集合框架-工具类(java基础)
- Spring3.1 Cache注解
- 成为更高效的Java开发人员要用到的开发工具
- Spring Cache 介绍
- Java轻量级锁
- Java NIO:NIO概述
- databus编译: Execution failed for task ':databus-core:databus-core-impl:compileJava'.
- J2EE、J2SE、J2ME、JDK各个击破(概念级)
- Java 字符串和时间互相转化 +时间戳
- Java NIO:浅析I/O模型
- java,查找数组中指定元素第一次出现的索引值。
- java异常处理:finally中不要return
- MyBatis与Spring整合过程