您的位置:首页 > 产品设计 > UI/UE

【从源码看Android】03Android MessageQueue消息循环处理机制(epoll实现)

2015-10-31 17:51 525 查看
1 enqueueMessage

handler发送一条消息

[java] view
plaincopy

mHandler.sendEmptyMessage(1);

经过层层调用,进入到sendMessageAtTime函数块,最后调用到enqueueMessage

Handler.java

[java] view
plaincopy

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {

MessageQueue queue = mQueue;

if (queue == null) {

RuntimeException e = new RuntimeException(

this + " sendMessageAtTime() called with no mQueue");

Log.w("Looper", e.getMessage(), e);

return false;

}

return enqueueMessage(queue, msg, uptimeMillis);

}

最后调用到Handler私有的函数enqueueMessage,把handler对象赋值给msg.target,调用queue.enqueueMessage
Handler.java

[java] view
plaincopy

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {

msg.target = this;

if (mAsynchronous) {

msg.setAsynchronous(true);

}

return queue.enqueueMessage(msg, uptimeMillis);

}

下面是核心代码,首先是获得同步锁,

MessageQueue.java

[java] view
plaincopy

boolean enqueueMessage(Message msg, long when) {

if (msg.isInUse()) {

throw new AndroidRuntimeException(msg + " This message is already in use.");

}

if (msg.target == null) {

throw new AndroidRuntimeException("Message must have a target.");

}

synchronized (this) {

if (mQuitting) {

RuntimeException e = new RuntimeException(

msg.target + " sending message to a Handler on a dead thread");

Log.w("MessageQueue", e.getMessage(), e);

return false;

}

msg.when = when;

Message p = mMessages;

boolean needWake;

if (p == null || when == 0 || when < p.when) {

// New head, wake up the event queue if blocked.

msg.next = p;

mMessages = msg;

needWake = mBlocked;

} else {

// Inserted within the middle of the queue. Usually we don't have to wake

// up the event queue unless there is a barrier at the head of the queue

// and the message is the earliest asynchronous message in the queue.

needWake = mBlocked && p.target == null && msg.isAsynchronous();

Message prev;

for (;;) {

prev = p;

p = p.next;

if (p == null || when < p.when) {

break;

}

if (needWake && p.isAsynchronous()) {

needWake = false;

}

}

msg.next = p; // invariant: p == prev.next

prev.next = msg;

}

// We can assume mPtr != 0 because mQuitting is false.

if (needWake) {

nativeWake(mPtr);

}

}

return true;

}

首先是获得自身的同步锁synchronized (this),接着这个msg跟MessageQueue实例的头结点Message进行触发时间先后的比较,

如果触发时间比现有的头结点Message前,则这个新的Message作为整个MessageQueue的头结点,如果阻塞着,则立即唤醒线程处理

如果触发时间比头结点晚,则按照触发时间先后,在消息队列中间插入这个结点

接着如果需要唤醒,则调用nativeWake函数

在android_os_MessageQueue.cpp里定义了nativeWake函数

[cpp] view
plaincopy

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {

NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);

return nativeMessageQueue->wake();

}

实际调用到mLooper->wake();

android_os_MessageQueue.cpp

[cpp] view
plaincopy

void NativeMessageQueue::wake() {

mLooper->wake();

}

而mLooper是cpp层的Looper对象,

framework/base/libs/utils/Looper.cpp

[cpp] view
plaincopy

void Looper::wake() {

#if DEBUG_POLL_AND_WAKE

LOGD("%p ~ wake", this);

#endif

#ifdef LOOPER_STATISTICS

// FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled.

if (mPendingWakeCount++ == 0) {

mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);

}

#endif

ssize_t nWrite;

do {

nWrite = write(mWakeWritePipeFd, "W", 1);

} while (nWrite == -1 && errno == EINTR);

if (nWrite != 1) {

if (errno != EAGAIN) {

LOGW("Could not write wake signal, errno=%d", errno);

}

}

}

是不是很熟悉?基本就是上一讲epoll原型的唤醒函数,向mWakeWritePipeFD写入1字节,唤醒监听block在mWakeReadPipeFD端口的epoll_wait

2 dequeueMessage

首先dequeueMessage只是我取的一个叫法,当java层的Looper进行loop的时候,就已经在不停地读取MessageQueue里的Message了

Looper.java

[java] view
plaincopy

public static void loop() {

final Looper me = myLooper();

if (me == null) {

throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");

}

final MessageQueue queue = me.mQueue;

// Make sure the identity of this thread is that of the local process,

// and keep track of what that identity token actually is.

Binder.clearCallingIdentity();

final long ident = Binder.clearCallingIdentity();

for (;;) {

Message msg = queue.next(); // might block

if (msg == null) {

// No message indicates that the message queue is quitting.

return;

}

// This must be in a local variable, in case a UI event sets the logger

Printer logging = me.mLogging;

if (logging != null) {

logging.println(">>>>> Dispatching to " + msg.target + " " +

msg.callback + ": " + msg.what);

}

msg.target.dispatchMessage(msg);

if (logging != null) {

logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);

}

// Make sure that during the course of dispatching the

// identity of the thread wasn't corrupted.

final long newIdent = Binder.clearCallingIdentity();

if (ident != newIdent) {

Log.wtf(TAG, "Thread identity changed from 0x"

+ Long.toHexString(ident) + " to 0x"

+ Long.toHexString(newIdent) + " while dispatching to "

+ msg.target.getClass().getName() + " "

+ msg.callback + " what=" + msg.what);

}

msg.recycle();

}

}

调用queue.next()读取下一条消息(在loop调用的线程中),如果读取到了就msg,target.dispatchMessage,

下面来看看queue.next()如何实现

MessageQueue.java

[java] view
plaincopy

Message next() {

int pendingIdleHandlerCount = -1; // -1 only during first iteration

int nextPollTimeoutMillis = 0;

for (;;) {

if (nextPollTimeoutMillis != 0) {

Binder.flushPendingCommands();

}

// We can assume mPtr != 0 because the loop is obviously still running.

// The looper will not call this method after the loop quits.

nativePollOnce(mPtr, nextPollTimeoutMillis);

synchronized (this) {

// Try to retrieve the next message. Return if found.

final long now = SystemClock.uptimeMillis();

Message prevMsg = null;

Message msg = mMessages;

if (msg != null && msg.target == null) {

// Stalled by a barrier. Find the next asynchronous message in the queue.

do {

prevMsg = msg;

msg = msg.next;

} while (msg != null && !msg.isAsynchronous());

}

if (msg != null) {

if (now < msg.when) {

// Next message is not ready. Set a timeout to wake up when it is ready.

nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);

} else {

// Got a message.

mBlocked = false;

if (prevMsg != null) {

prevMsg.next = msg.next;

} else {

mMessages = msg.next;

}

msg.next = null;

if (false) Log.v("MessageQueue", "Returning message: " + msg);

msg.markInUse();

return msg;

}

} else {

// No more messages.

nextPollTimeoutMillis = -1;

}

// Process the quit message now that all pending messages have been handled.

if (mQuitting) {

dispose();

return null;

}

// If first time idle, then get the number of idlers to run.

// Idle handles only run if the queue is empty or if the first message

// in the queue (possibly a barrier) is due to be handled in the future.

if (pendingIdleHandlerCount < 0

&& (mMessages == null || now < mMessages.when)) {

pendingIdleHandlerCount = mIdleHandlers.size();

}

if (pendingIdleHandlerCount <= 0) {

// No idle handlers to run. Loop and wait some more.

mBlocked = true;

continue;

}

if (mPendingIdleHandlers == null) {

mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];

}

mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);

}

// Run the idle handlers.

// We only ever reach this code block during the first iteration.

for (int i = 0; i < pendingIdleHandlerCount; i++) {

final IdleHandler idler = mPendingIdleHandlers[i];

mPendingIdleHandlers[i] = null; // release the reference to the handler

boolean keep = false;

try {

keep = idler.queueIdle();

} catch (Throwable t) {

Log.wtf("MessageQueue", "IdleHandler threw exception", t);

}

if (!keep) {

synchronized (this) {

mIdleHandlers.remove(idler);

}

}

}

// Reset the idle handler count to 0 so we do not run them again.

pendingIdleHandlerCount = 0;

// While calling an idle handler, a new message could have been delivered

// so go back and look again for a pending message without waiting.

nextPollTimeoutMillis = 0;

}

}

首先是个包内函数,所以在同一个包中(android.os)的Looper对象能调用到

nativePollOnce(mPtr, nextPollTimeoutMillis);函数待会展开,功能是调用上一讲的epoll_wait,

nextPollTimeoutMillis超时时间为下一条Message的触发时间,如果没有消息则会一直阻塞到超过超时时间

被唤醒后,我们暂时先忽略barrier类型的Message(这是android4.1后加入的一个特性Choreographer,http://blog.csdn.net/innost/article/details/8272867),

如果头结点msg不为null,就判断现在到了这条msg触发时间没有,

如果没到,则nextPollTimeoutMillis设置为这个条消息需要执行的时间和现在的时间差,给for循环下一次调用nativePollOnce时使用

如果到了甚至超过了,则取出这条msg,退出for循环返回这条msg,给上面上的handler进行dispatch

那么nativePollOnce具体是如何实现的呢?

android_os_MessageQueue.cpp

[cpp] view
plaincopy

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,

jint ptr, jint timeoutMillis) {

NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);

nativeMessageQueue->pollOnce(timeoutMillis);

}

调用到了nativeMessageQueue->pollOnce

android_os_MessageQueue.cpp

[cpp] view
plaincopy

void NativeMessageQueue::pollOnce(int timeoutMillis) {

mLooper->pollOnce(timeoutMillis);

}

调用到了mLooper->pollOnce

同样,在framework/base/libs/utils/Looper.cpp中

[cpp] view
plaincopy

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {

int result = 0;

for (;;) {

while (mResponseIndex < mResponses.size()) {

const Response& response = mResponses.itemAt(mResponseIndex++);

if (! response.request.callback) {

#if DEBUG_POLL_AND_WAKE

LOGD("%p ~ pollOnce - returning signalled identifier %d: "

"fd=%d, events=0x%x, data=%p", this,

response.request.ident, response.request.fd,

response.events, response.request.data);

#endif

if (outFd != NULL) *outFd = response.request.fd;

if (outEvents != NULL) *outEvents = response.events;

if (outData != NULL) *outData = response.request.data;

return response.request.ident;

}

}

if (result != 0) {

#if DEBUG_POLL_AND_WAKE

LOGD("%p ~ pollOnce - returning result %d", this, result);

#endif

if (outFd != NULL) *outFd = 0;

if (outEvents != NULL) *outEvents = NULL;

if (outData != NULL) *outData = NULL;

return result;

}

result = pollInner(timeoutMillis);

}

}

因为这个流程和mResponses无关,先忽略这部分,

调用到pollInner

framework/base/libs/utils/Looper.cpp

[cpp] view
plaincopy

nt Looper::pollInner(int timeoutMillis) {

#if DEBUG_POLL_AND_WAKE

LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);

#endif

int result = ALOOPER_POLL_WAKE;

mResponses.clear();

mResponseIndex = 0;

#ifdef LOOPER_STATISTICS

nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);

#endif

#ifdef LOOPER_USES_EPOLL

struct epoll_event eventItems[EPOLL_MAX_EVENTS];

int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

bool acquiredLock = false;

#else

// Wait for wakeAndLock() waiters to run then set mPolling to true.

mLock.lock();

while (mWaiters != 0) {

mResume.wait(mLock);

}

mPolling = true;

mLock.unlock();

size_t requestedCount = mRequestedFds.size();

int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);

#endif

if (eventCount < 0) {

if (errno == EINTR) {

goto Done;

}

LOGW("Poll failed with an unexpected error, errno=%d", errno);

result = ALOOPER_POLL_ERROR;

goto Done;

}

if (eventCount == 0) {

#if DEBUG_POLL_AND_WAKE

LOGD("%p ~ pollOnce - timeout", this);

#endif

result = ALOOPER_POLL_TIMEOUT;

goto Done;

}

#if DEBUG_POLL_AND_WAKE

LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);

#endif

#ifdef LOOPER_USES_EPOLL

for (int i = 0; i < eventCount; i++) {

int fd = eventItems[i].data.fd;

uint32_t epollEvents = eventItems[i].events;

if (fd == mWakeReadPipeFd) {

if (epollEvents & EPOLLIN) {

awoken();

} else {

LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);

}

} else {

if (! acquiredLock) {

mLock.lock();

acquiredLock = true;

}

ssize_t requestIndex = mRequests.indexOfKey(fd);

if (requestIndex >= 0) {

int events = 0;

if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;

if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;

if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;

if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;

pushResponse(events, mRequests.valueAt(requestIndex));

} else {

LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "

"no longer registered.", epollEvents, fd);

}

}

}

if (acquiredLock) {

mLock.unlock();

}

Done: ;

#else

for (size_t i = 0; i < requestedCount; i++) {

const struct pollfd& requestedFd = mRequestedFds.itemAt(i);

short pollEvents = requestedFd.revents;

if (pollEvents) {

if (requestedFd.fd == mWakeReadPipeFd) {

if (pollEvents & POLLIN) {

awoken();

} else {

LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);

}

} else {

int events = 0;

if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;

if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;

if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;

if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;

if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;

pushResponse(events, mRequests.itemAt(i));

}

if (--eventCount == 0) {

break;

}

}

}

Done:

// Set mPolling to false and wake up the wakeAndLock() waiters.

mLock.lock();

mPolling = false;

if (mWaiters != 0) {

mAwake.broadcast();

}

mLock.unlock();

#endif

#ifdef LOOPER_STATISTICS

nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);

mSampledPolls += 1;

if (timeoutMillis == 0) {

mSampledZeroPollCount += 1;

mSampledZeroPollLatencySum += pollEndTime - pollStartTime;

} else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {

mSampledTimeoutPollCount += 1;

mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime

- milliseconds_to_nanoseconds(timeoutMillis);

}

if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {

LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,

0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,

0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);

mSampledPolls = 0;

mSampledZeroPollCount = 0;

mSampledZeroPollLatencySum = 0;

mSampledTimeoutPollCount = 0;

mSampledTimeoutPollLatencySum = 0;

}

#endif

for (size_t i = 0; i < mResponses.size(); i++) {

const Response& response = mResponses.itemAt(i);

if (response.request.callback) {

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS

LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this,

response.request.fd, response.events, response.request.data);

#endif

int callbackResult = response.request.callback(

response.request.fd, response.events, response.request.data);

if (callbackResult == 0) {

removeFd(response.request.fd);

}

result = ALOOPER_POLL_CALLBACK;

}

}

return result;

}

主要看#ifdef LOOPER_USES_EPOLL部分

int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

等待所有attach到mEpollFd上的事件,如果收到唤醒信号继续执行,否则阻塞等待

之后的#ifdef LOOPER_USES_EPOLL部分

[cpp] view
plaincopy

#ifdef LOOPER_USES_EPOLL

for (int i = 0; i < eventCount; i++) {

int fd = eventItems[i].data.fd;

uint32_t epollEvents = eventItems[i].events;

if (fd == mWakeReadPipeFd) {

if (epollEvents & EPOLLIN) {

awoken();

} else {

LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);

}

} else {

if (! acquiredLock) {

mLock.lock();

acquiredLock = true;

}

ssize_t requestIndex = mRequests.indexOfKey(fd);

if (requestIndex >= 0) {

int events = 0;

if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;

if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;

if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;

if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;

pushResponse(events, mRequests.valueAt(requestIndex));

} else {

LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "

"no longer registered.", epollEvents, fd);

}

}

}

if (acquiredLock) {

mLock.unlock();

}

Done: ;

对所有attach在mEpollFd上的事件进行遍历,如果对象文件描述符有mWakeReadPipeFd,则awoken()

framework/base/libs/utils/Looper.cpp

[cpp] view
plaincopy

void Looper::awoken() {

#if DEBUG_POLL_AND_WAKE

LOGD("%p ~ awoken", this);

#endif

#ifdef LOOPER_STATISTICS

if (mPendingWakeCount == 0) {

LOGD("%p ~ awoken: spurious!", this);

} else {

mSampledWakeCycles += 1;

mSampledWakeCountSum += mPendingWakeCount;

mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime;

mPendingWakeCount = 0;

mPendingWakeTime = -1;

if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) {

LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this,

0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles,

float(mSampledWakeCountSum) / mSampledWakeCycles);

mSampledWakeCycles = 0;

mSampledWakeCountSum = 0;

mSampledWakeLatencySum = 0;

}

}

#endif

char buffer[16];

ssize_t nRead;

do {

nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));

} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));

}

awoken()即上一讲中得awoken()函数,用于把mWakeReadPipeFd上的数据读取干净,因为mWakeWriteReadPipeFd可能写入多次

读取干净后下一次epoll_wait时就会等待mWakeWriteReadPipeFd写入,如果没有读取干净,即通知epoll内核和mWakeReadPipeFd这个事件相关的处理完毕了,

否则epoll_wait就一直会触发对应的事件了(不等待新的写入,一直不阻塞)

3 总结

那么至此,enqueueMessage和定义dequeueMessage都解释清楚,感觉豁然开朗了有木有!!!!

下一讲讲nativeapp的线程消息循环处理过程(主要解读android_native_app_glue.c)

欢迎各位指正!!

4 reference

android sdk sourcecode

android framework sourcecode
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: