Android消息机制不完全解析(下) .
2016-03-04 19:29
561 查看
转自:http://blog.csdn.net/a220315410/article/details/10444171
首先,看看在/frameworks/base/core/jni/android_os_MessageQueue.cpp文件中看看android.os.MessageQueue类中的四个原生函数的实现:
[java]
view plain
copy
print?
static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();//构造NativeMessageQueue实例
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return;
}
nativeMessageQueue->incStrong(env);//强引用+1
android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);
}
static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jobject obj) {
NativeMessageQueue* nativeMessageQueue =
android_os_MessageQueue_getNativeMessageQueue(env, obj);
if (nativeMessageQueue) {
android_os_MessageQueue_setNativeMessageQueue(env, obj, NULL);
nativeMessageQueue->decStrong(env);//强引用-1,实际上会导致释放NativeMessageQueue实例
}
}
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jint ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);//指针强制转换
nativeMessageQueue->pollOnce(env, timeoutMillis);//调用nativeMessageQueue的pollonce函数
}
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
return nativeMessageQueue->wake();//调用nativeMessageQueue的wake函数
}
从代码中,可以看到这四个函数的实现都是依赖于NativeMessageQueue类。不过,在开始解析NativeMessageQueue之前,我们再看一些有意思的代码:
[cpp]
view plain
copy
print?
static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj,
NativeMessageQueue* nativeMessageQueue) {
env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,
reinterpret_cast<jint>(nativeMessageQueue));//把nativeMessageQueue的实例地址强转为java的int类型并保存到gMessageQueueClassInfo.mPtr中
}
那么gMessageQueueClassInfo.mPtr是什么呢?
[cpp]
view plain
copy
print?
static JNINativeMethod gMessageQueueMethods[] = {
/* name, signature, funcPtr */
{ "nativeInit", "()V", (void*)android_os_MessageQueue_nativeInit },
{ "nativeDestroy", "()V", (void*)android_os_MessageQueue_nativeDestroy },
{ "nativePollOnce", "(II)V", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "(I)V", (void*)android_os_MessageQueue_nativeWake }
};
#define FIND_CLASS(var, className) \
var = env->FindClass(className); \
LOG_FATAL_IF(! var, "Unable to find class " className);
#define GET_FIELD_ID(var, clazz, fieldName, fieldDescriptor) \
var = env->GetFieldID(clazz, fieldName, fieldDescriptor); \
LOG_FATAL_IF(! var, "Unable to find field " fieldName);
//这个函数在Android启动的时候,会被系统调用
int register_android_os_MessageQueue(JNIEnv* env) {
int res = jniRegisterNativeMethods(env, "android/os/MessageQueue",
gMessageQueueMethods, NELEM(gMessageQueueMethods));//关联MessageQueueQueue的原生函数
LOG_FATAL_IF(res < 0, "Unable to register native methods.");
jclass clazz;
FIND_CLASS(clazz, "android/os/MessageQueue");//获取MessageQueue的class
GET_FIELD_ID(gMessageQueueClassInfo.mPtr, clazz,
"mPtr", "I");//获取MessageQueue class的mPtr field的Id
return 0;
}
[java]
view plain
copy
print?
Class cls = Class.forName("android.os.MessageQueue");
Field feild = cls.getField("mPtr");
小结:
android_os_MessageQueue_nativeInit和android_os_MessageQueue_nativeDestory两个函数做了些什么:
android_os_MessageQueue_nativeInit:构造NativeMessageQueue实例
android_os_MessageQueue_nativeDestory:销毁NativeMessageQeue实例
[cpp]
view plain
copy
print?
class NativeMessageQueue : public MessageQueue {
public:
NativeMessageQueue();
virtual ~NativeMessageQueue();
virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);
void pollOnce(JNIEnv* env, int timeoutMillis);
void wake();
private:
bool mInCallback;
jthrowable mExceptionObj;
};
[cpp]
view plain
copy
print?
class MessageQueue {
......
protected:
sp<Looper> mLooper;
};
继续看NativeMessageQueue的代码:
[cpp]
view plain
copy
print?
NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);//实例化mLooper
Looper::setForThread(mLooper);
}
}
[cpp]
view plain
copy
print?
void NativeMessageQueue::pollOnce(JNIEnv* env, int timeoutMillis) {
mInCallback = true;
mLooper->pollOnce(timeoutMillis);
mInCallback = false;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
小结:
NativeMessageQueue的函数pollonce和wake实现相当简单,交给mLooper的同名函数。
[cpp]
view plain
copy
print?
class Looper : public ALooper, public RefBase {
protected:
virtual ~Looper();
public:
Looper(bool allowNonCallbacks);
bool getAllowNonCallbacks() const;
int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
void wake();
private:
const bool mAllowNonCallbacks; // immutable
int mWakeReadPipeFd; // immutable
int mWakeWritePipeFd; // immutable
Mutex mLock;
int mEpollFd; // immutable
int pollInner(int timeoutMillis);
void awoken();
};
还是从构造函数开始(frameworks/native/utils/Looper.cpp):
[cpp]
view plain
copy
print?
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);//创建命名管道
LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
// 保存命名管道
mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);//设置为非阻塞模式
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d",
errno);
result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);//设置为非阻塞模式
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
errno);
// 开始使用epoll API,实现轮询
// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);//创建epoll文件描述符
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); // 把刚才创建的命名管道的读端加入的到epoll的监听队列中
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
errno);
}
Looper::~Looper() {
close(mWakeReadPipeFd);//释放命名管道
close(mWakeWritePipeFd);
close(mEpollFd);//释放epoll文件描述符
}
[cpp]
view plain
copy
print?
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
// 这段代码暂时无视
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {//ident > 0, 即此response为noncallback,需要返回event,data等数据给调用者处理
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
result = pollInner(timeoutMillis);//这一行才是重点!
}
}
[cpp]
view plain
copy
print?
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
// 设置timeoutMillis的值为Math.min(timeoutMills, mNextMessageUptime)
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}
// Poll.
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// 开始轮询
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// Acquire lock.
mLock.lock();
// Check for poll error.
if (eventCount < 0) { //处理error
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = ALOOPER_POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) { // 未能等到event,故timeout
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}
// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
for (int i = 0; i < eventCount; i++) {//有event,则处理
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
awoken();//读取命名管道内的数据
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));//把event添加到<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">mResponses中,等待后续处理</SPAN>
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// 处理C++层的Message
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message);//调用handler-><SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">handleMessage</SPAN>
} // release handler
mLock.lock();
mSendingMessage = false;
result = ALOOPER_POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;//更新<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">mNextMessageUptime</SPAN>
break;
}
}
// Release lock.
mLock.unlock();
// 处理<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">mResponses</SPAN>
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == ALOOPER_POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
int callbackResult = response.request.callback->handleEvent(fd, events, data);//调用callback->handleEvent
if (callbackResult == 0) {
removeFd(fd);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = ALOOPER_POLL_CALLBACK;
}
}
return result;
}
代码比较长,所以分段分析:
[cpp]
view plain
copy
print?
// 设置timeoutMillis的值为Math.min(timeoutMillis, mNextMessageUptime)
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
if
}
为了解析这段代码,需要先补充一些C++层的Message相关的代码:
[cpp]
view plain
copy
print?
struct Message {
Message() : what(0) { }
Message(int what) : what(what) { }
/* The message type. (interpretation is left up to the handler) */
int what;
};
class MessageHandler : public virtual RefBase {
protected:
virtual ~MessageHandler() { }
public:
/**
* Handles a message.
*/
virtual void handleMessage(const Message& message) = 0;
}
struct MessageEnvelope {
MessageEnvelope() : uptime(0) { }
MessageEnvelope(nsecs_t uptime, const sp<MessageHandler> handler,const Message& message) : uptime(uptime), handler(handler), message(message) {}
nsecs_t uptime;
MessageHandler> handler;
Message message;
};
void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now, handler, message);
}
void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now + uptimeDelay, handler, message);
}
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
this, uptime, handler.get(), message.what);
#endif
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
// Optimization: If the Looper is currently sending a message, then we can skip
// the call to wake() because the next thing the Looper will do after processing
// messages is to decide when the next wakeup time should be. In fact, it does
// not even matter whether this code is running on the Looper thread.
if (mSendingMessage) {
return;
}
} // release lock
// Wake the poll loop only when we enqueue a new message at the head.
if (i == 0) {
wake();
}
}
void Looper::removeMessages(const sp<MessageHandler>& handler) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeMessages - handler=%p", this, handler.get());
#endif
{ // acquire lock
AutoMutex _l(mLock);
for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
if (messageEnvelope.handler == handler) {
mMessageEnvelopes.removeAt(i);
}
}
} // release lock
}
void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
#endif
{ // acquire lock
AutoMutex _l(mLock);
for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
if (messageEnvelope.handler == handler
&& messageEnvelope.message.what == what) {
mMessageEnvelopes.removeAt(i);
}
}
} // release lock
}
现在回到Looper::pollonceh函数,我们就应该能够理解,pollOnce函数到timeOutMillis参数仅仅代表了Java层下一个Message的触发延迟,所以,我们还需要考虑C++层下一个Message的触发延迟,所以,代码设置timeoutMillis为timeoutMillis和mNextMessageUpTime中的较小值。
继续下一段代码:
[cpp]
view plain
copy
print?
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// 开始轮询
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// Acquire lock.
mLock.lock();
1. 失败的处理:
[cpp]
view plain
copy
print?
// Check for poll error.
if (eventCount < 0) { //处理error
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = ALOOPER_POLL_ERROR;
goto Done;
}
[cpp]
view plain
copy
print?
// Check for poll timeout.
if (eventCount == 0) { // 未能等到event,故timeout
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}
[cpp]
view plain
copy
print?
// Handle all events.
DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
if
for (int i = 0; i < eventCount; i++) {//有event,则处理
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {//说明java层或者C++层有新的Message
if (epollEvents & EPOLLIN) {
awoken();//读取命名管道内的数据
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));//把event添加到<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">mResponses中,等待后续处理</SPAN>
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
处理event的时候,需要分为两类:fd==mWakeReadPipeFd和fd!=mWakeReadPipeFd
fd==mWakeReadPipeFd:说明C++层,或者java层有新的Message出现,需要处理。这种情况下,只需要读mWakeReadPipeFd内的数据即可
[cpp]
view plain
copy
print?
void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ awoken", this);
#endif
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
[cpp]
view plain
copy
print?
struct Request {
int fd;
int ident;
sp<LooperCallback> callback;
void* data;
};
struct Response {
int events;
Request request;
};
[cpp]
view plain
copy
print?
/**
* A looper callback.
*/
class LooperCallback : public virtual RefBase {
protected:
virtual ~LooperCallback() { }
public:
/**
* Handles a poll event for the given file descriptor.
* It is given the file descriptor it is associated with,
* a bitmask of the poll events that were triggered (typically ALOOPER_EVENT_INPUT),
* and the data pointer that was originally supplied.
*
* Implementations should return 1 to continue receiving callbacks, or 0
* to have this file descriptor and callback unregistered from the looper.
*/
virtual int handleEvent(int fd, int events, void* data) = 0;
};
[cpp]
view plain
copy
print?
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
events, callback.get(), data);
#endif
if (!callback.get()) {
if (! mAllowNonCallbacks) {
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
if (ident < 0) {//仅当Looper支持NonCallbacks,并且ident大于0时,允许添加callback为null的Fd
ALOGE("Invalid attempt to set NULL callback with ident < 0.");
return -1;
}
} else {
ident = ALOOPER_POLL_CALLBACK;
}
int epollEvents = 0;
if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
{ // acquire lock
AutoMutex _l(mLock);
Request request;
request.fd = fd;
request.ident = ident;
request.callback = callback;
request.data = data;
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = epollEvents;
eventItem.data.fd = fd;
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.add(fd, request);
} else {//存在则替换
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}
从上面的代码,我们可知,Looper还支持添加Fd和自定义的callback,类似java层的Message.callback。
通过addFd函数,可以向Looper的mEpollFd添加指定的Fd,当Fd触发指定的event .e.i EPOLLIN or EPOLLOUT时,指定的相应的自定义callback就会得到执行。
另外,当Looper支持noncallback时,还可以向Looper添加callback为null的Fd,因为没有callback,所以Fd添加者需要调用int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) 通过outFd、outEvent、outData参数获取数据并进行处理。
所以当Fd触发消息时,需要生成对应到reponse并添加到meResponses中,等待后续的处理。
[cpp]
view plain
copy
print?
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}
到这里为止,epoll_wait函数返回的三种结果的不同处理已经解析完毕,接下来代码进入共同的Done环节。
处理C++层的Message:
[cpp]
view plain
copy
print?
Done: ;
// 处理C++层的Message
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message);//调用handler-><SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">handleMessage</SPAN>
} // release handler
mLock.lock();
mSendingMessage = false;
result = ALOOPER_POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;//更新<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">mNextMessageUptime</SPAN>
break;
}
}
处理Response:
[cpp]
view plain
copy
print?
// 处理<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">mResponses</SPAN>
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == ALOOPER_POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
int callbackResult = response.request.callback->handleEvent(fd, events, data);//调用callback->handleEvent
if (callbackResult == 0) {
removeFd(fd);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = ALOOPER_POLL_CALLBACK;
}
}
搞懂了上面的代码,我们就很容易明白wake函数做了些什么:
[cpp]
view plain
copy
print?
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);//向<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif; FONT-SIZE: 12px">mWakeWritePipeFd</SPAN><SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif; FONT-SIZE: 12px">中写入一个字符,所以mWakeReadPipeFd就会触发</SPAN><SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif; FONT-SIZE: 12px">EPOLLIN event</SPAN><SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif; FONT-SIZE: 12px">
</SPAN> } while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
Looper通过epoll函数组实现了一个可以支持随时唤醒的阻塞机制
Looper支持两种不同的方式处理消息:Message + MessageHandler 和 LooperCallback。
Looper的阻塞在如下四种条件下会被唤醒:
发生错误
等待超时
出现需要处理的新Message(包括C++层和Java层)
由addFd函数添加的Fd触发event
在哪个线程调用JAVA层的Looper.loop(),Mesage和callback(包括Java层和C++层)就在哪个线程被处理,上图为Looper.loop函数的时序图。
C++层的NativeMesasgeQueue不应该是Java层的MesageQueue的内部实现,而更接近于“栾生兄弟”的关系。MessageQueue负责处理java层上到消息,NativeMessageQueue负责处理C++层上的消息。其中Java层是在android.os.Looper.looper函数中调用android.os.Handler.dispatchMessage处理,而C++层是在android::Looper::pollInner函数中调用android::MessageHandler::handleMessage
& android:LooperCallback::handleEvent函数处理。
NativeMessageQueue利用Looper类实现了一个基于epoll函数和文件描述符(Fd)的可唤醒的阻塞机制。
首先,看看在/frameworks/base/core/jni/android_os_MessageQueue.cpp文件中看看android.os.MessageQueue类中的四个原生函数的实现:
[java]
view plain
copy
print?
static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();//构造NativeMessageQueue实例
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return;
}
nativeMessageQueue->incStrong(env);//强引用+1
android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);
}
static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jobject obj) {
NativeMessageQueue* nativeMessageQueue =
android_os_MessageQueue_getNativeMessageQueue(env, obj);
if (nativeMessageQueue) {
android_os_MessageQueue_setNativeMessageQueue(env, obj, NULL);
nativeMessageQueue->decStrong(env);//强引用-1,实际上会导致释放NativeMessageQueue实例
}
}
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jint ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);//指针强制转换
nativeMessageQueue->pollOnce(env, timeoutMillis);//调用nativeMessageQueue的pollonce函数
}
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
return nativeMessageQueue->wake();//调用nativeMessageQueue的wake函数
}
static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();//构造NativeMessageQueue实例 if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return; } nativeMessageQueue->incStrong(env);//强引用+1 android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue); } static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jobject obj) { NativeMessageQueue* nativeMessageQueue = android_os_MessageQueue_getNativeMessageQueue(env, obj); if (nativeMessageQueue) { android_os_MessageQueue_setNativeMessageQueue(env, obj, NULL); nativeMessageQueue->decStrong(env);//强引用-1,实际上会导致释放NativeMessageQueue实例 } } static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jint ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);//指针强制转换 nativeMessageQueue->pollOnce(env, timeoutMillis);//调用nativeMessageQueue的pollonce函数 } static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); return nativeMessageQueue->wake();//调用nativeMessageQueue的wake函数 }
从代码中,可以看到这四个函数的实现都是依赖于NativeMessageQueue类。不过,在开始解析NativeMessageQueue之前,我们再看一些有意思的代码:
[cpp]
view plain
copy
print?
static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj,
NativeMessageQueue* nativeMessageQueue) {
env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,
reinterpret_cast<jint>(nativeMessageQueue));//把nativeMessageQueue的实例地址强转为java的int类型并保存到gMessageQueueClassInfo.mPtr中
}
static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj, NativeMessageQueue* nativeMessageQueue) { env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr, reinterpret_cast<jint>(nativeMessageQueue));//把nativeMessageQueue的实例地址强转为java的int类型并保存到gMessageQueueClassInfo.mPtr中 }
那么gMessageQueueClassInfo.mPtr是什么呢?
[cpp]
view plain
copy
print?
static JNINativeMethod gMessageQueueMethods[] = {
/* name, signature, funcPtr */
{ "nativeInit", "()V", (void*)android_os_MessageQueue_nativeInit },
{ "nativeDestroy", "()V", (void*)android_os_MessageQueue_nativeDestroy },
{ "nativePollOnce", "(II)V", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "(I)V", (void*)android_os_MessageQueue_nativeWake }
};
#define FIND_CLASS(var, className) \
var = env->FindClass(className); \
LOG_FATAL_IF(! var, "Unable to find class " className);
#define GET_FIELD_ID(var, clazz, fieldName, fieldDescriptor) \
var = env->GetFieldID(clazz, fieldName, fieldDescriptor); \
LOG_FATAL_IF(! var, "Unable to find field " fieldName);
//这个函数在Android启动的时候,会被系统调用
int register_android_os_MessageQueue(JNIEnv* env) {
int res = jniRegisterNativeMethods(env, "android/os/MessageQueue",
gMessageQueueMethods, NELEM(gMessageQueueMethods));//关联MessageQueueQueue的原生函数
LOG_FATAL_IF(res < 0, "Unable to register native methods.");
jclass clazz;
FIND_CLASS(clazz, "android/os/MessageQueue");//获取MessageQueue的class
GET_FIELD_ID(gMessageQueueClassInfo.mPtr, clazz,
"mPtr", "I");//获取MessageQueue class的mPtr field的Id
return 0;
}
static JNINativeMethod gMessageQueueMethods[] = { /* name, signature, funcPtr */ { "nativeInit", "()V", (void*)android_os_MessageQueue_nativeInit }, { "nativeDestroy", "()V", (void*)android_os_MessageQueue_nativeDestroy }, { "nativePollOnce", "(II)V", (void*)android_os_MessageQueue_nativePollOnce }, { "nativeWake", "(I)V", (void*)android_os_MessageQueue_nativeWake } }; #define FIND_CLASS(var, className) \ var = env->FindClass(className); \ LOG_FATAL_IF(! var, "Unable to find class " className); #define GET_FIELD_ID(var, clazz, fieldName, fieldDescriptor) \ var = env->GetFieldID(clazz, fieldName, fieldDescriptor); \ LOG_FATAL_IF(! var, "Unable to find field " fieldName); //这个函数在Android启动的时候,会被系统调用 int register_android_os_MessageQueue(JNIEnv* env) { int res = jniRegisterNativeMethods(env, "android/os/MessageQueue", gMessageQueueMethods, NELEM(gMessageQueueMethods));//关联MessageQueueQueue的原生函数 LOG_FATAL_IF(res < 0, "Unable to register native methods."); jclass clazz; FIND_CLASS(clazz, "android/os/MessageQueue");//获取MessageQueue的class GET_FIELD_ID(gMessageQueueClassInfo.mPtr, clazz, "mPtr", "I");//获取MessageQueue class的mPtr field的Id return 0; }上面的代码很像java的反射有木有?
[java]
view plain
copy
print?
Class cls = Class.forName("android.os.MessageQueue");
Field feild = cls.getField("mPtr");
Class cls = Class.forName("android.os.MessageQueue"); Field feild = cls.getField("mPtr");到这里,我们就明白了android_os_MessageQueue_setNativeMessageQueue函数实际上把android.os.MessageQueue实例的mPtr值设置为nativeMessageQueue实例的地址。虽然Java语言没有指针的说法,但是,这里的mPtr却的的确确是作为一个指针使用的。现在,我们也就理解了,为什么mPtr可以被强制转换为nativeMessageQueue了。
小结:
android_os_MessageQueue_nativeInit和android_os_MessageQueue_nativeDestory两个函数做了些什么:
android_os_MessageQueue_nativeInit:构造NativeMessageQueue实例
android_os_MessageQueue_nativeDestory:销毁NativeMessageQeue实例
NativeMessageQueue
先来看看NativeMessageQueue的声明:[cpp]
view plain
copy
print?
class NativeMessageQueue : public MessageQueue {
public:
NativeMessageQueue();
virtual ~NativeMessageQueue();
virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);
void pollOnce(JNIEnv* env, int timeoutMillis);
void wake();
private:
bool mInCallback;
jthrowable mExceptionObj;
};
class NativeMessageQueue : public MessageQueue { public: NativeMessageQueue(); virtual ~NativeMessageQueue(); virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj); void pollOnce(JNIEnv* env, int timeoutMillis); void wake(); private: bool mInCallback; jthrowable mExceptionObj; };NativeMessageQueue继承自MessageQueue(不是java中的android.os.MessageQueue哦),关于MessageQueue我们只需要了解,它包含了一个成员mLooper即可(有兴趣的同学可以查看/frameworks/base/core/jni/android_os_MessageQueue.h)
[cpp]
view plain
copy
print?
class MessageQueue {
......
protected:
sp<Looper> mLooper;
};
class MessageQueue { ...... protected: sp<Looper> mLooper; };
继续看NativeMessageQueue的代码:
[cpp]
view plain
copy
print?
NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);//实例化mLooper
Looper::setForThread(mLooper);
}
}
NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false);//实例化mLooper Looper::setForThread(mLooper); } }NativeMessageQueue构造实例的时候,会实例化mLooper。
[cpp]
view plain
copy
print?
void NativeMessageQueue::pollOnce(JNIEnv* env, int timeoutMillis) {
mInCallback = true;
mLooper->pollOnce(timeoutMillis);
mInCallback = false;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
void NativeMessageQueue::pollOnce(JNIEnv* env, int timeoutMillis) { mInCallback = true; mLooper->pollOnce(timeoutMillis); mInCallback = false; if (mExceptionObj) { env->Throw(mExceptionObj); env->DeleteLocalRef(mExceptionObj); mExceptionObj = NULL; } } void NativeMessageQueue::wake() { mLooper->wake(); }
小结:
NativeMessageQueue的函数pollonce和wake实现相当简单,交给mLooper的同名函数。
Looper
先来看看Looper的声明/frameworks/native/include/utils/Looper.h:[cpp]
view plain
copy
print?
class Looper : public ALooper, public RefBase {
protected:
virtual ~Looper();
public:
Looper(bool allowNonCallbacks);
bool getAllowNonCallbacks() const;
int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
void wake();
private:
const bool mAllowNonCallbacks; // immutable
int mWakeReadPipeFd; // immutable
int mWakeWritePipeFd; // immutable
Mutex mLock;
int mEpollFd; // immutable
int pollInner(int timeoutMillis);
void awoken();
};
class Looper : public ALooper, public RefBase { protected: virtual ~Looper(); public: Looper(bool allowNonCallbacks); bool getAllowNonCallbacks() const; int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData); inline int pollOnce(int timeoutMillis) { return pollOnce(timeoutMillis, NULL, NULL, NULL); } void wake(); private: const bool mAllowNonCallbacks; // immutable int mWakeReadPipeFd; // immutable int mWakeWritePipeFd; // immutable Mutex mLock; int mEpollFd; // immutable int pollInner(int timeoutMillis); void awoken(); };因为代码有点多,所以上面的声明,已经被我精简了大部分,现在我们只关注我们关心的:pollonce和wake函数。
还是从构造函数开始(frameworks/native/utils/Looper.cpp):
[cpp]
view plain
copy
print?
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);//创建命名管道
LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
// 保存命名管道
mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);//设置为非阻塞模式
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d",
errno);
result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);//设置为非阻塞模式
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
errno);
// 开始使用epoll API,实现轮询
// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);//创建epoll文件描述符
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); // 把刚才创建的命名管道的读端加入的到epoll的监听队列中
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
errno);
}
Looper::~Looper() {
close(mWakeReadPipeFd);//释放命名管道
close(mWakeWritePipeFd);
close(mEpollFd);//释放epoll文件描述符
}
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { int wakeFds[2]; int result = pipe(wakeFds);//创建命名管道 LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); // 保存命名管道 mWakeReadPipeFd = wakeFds[0]; mWakeWritePipeFd = wakeFds[1]; result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);//设置为非阻塞模式 LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d", errno); result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);//设置为非阻塞模式 LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", errno); // 开始使用epoll API,实现轮询 // Allocate the epoll instance and register the wake pipe. mEpollFd = epoll_create(EPOLL_SIZE_HINT);//创建epoll文件描述符 LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.data.fd = mWakeReadPipeFd; result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); // 把刚才创建的命名管道的读端加入的到epoll的监听队列中 LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", errno); } Looper::~Looper() { close(mWakeReadPipeFd);//释放命名管道 close(mWakeWritePipeFd); close(mEpollFd);//释放epoll文件描述符 }Looper的构造函数中,出现了命名管道和epoll相关的代码,这是为什么呢?别急,接着看下去就知道了:
[cpp]
view plain
copy
print?
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
// 这段代码暂时无视
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {//ident > 0, 即此response为noncallback,需要返回event,data等数据给调用者处理
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
result = pollInner(timeoutMillis);//这一行才是重点!
}
}
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { // 这段代码暂时无视 while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; if (ident >= 0) {//ident > 0, 即此response为noncallback,需要返回event,data等数据给调用者处理 int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data); #endif if (outFd != NULL) *outFd = fd; if (outEvents != NULL) *outEvents = events; if (outData != NULL) *outData = data; return ident; } } if (result != 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning result %d", this, result); #endif if (outFd != NULL) *outFd = 0; if (outEvents != NULL) *outEvents = 0; if (outData != NULL) *outData = NULL; return result; } result = pollInner(timeoutMillis);//这一行才是重点! } }接着往下看,代码有些长,但请仔细看:
[cpp]
view plain
copy
print?
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
// 设置timeoutMillis的值为Math.min(timeoutMills, mNextMessageUptime)
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}
// Poll.
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// 开始轮询
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// Acquire lock.
mLock.lock();
// Check for poll error.
if (eventCount < 0) { //处理error
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = ALOOPER_POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) { // 未能等到event,故timeout
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}
// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
for (int i = 0; i < eventCount; i++) {//有event,则处理
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
awoken();//读取命名管道内的数据
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));//把event添加到<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">mResponses中,等待后续处理</SPAN>
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// 处理C++层的Message
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message);//调用handler-><SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">handleMessage</SPAN>
} // release handler
mLock.lock();
mSendingMessage = false;
result = ALOOPER_POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;//更新<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">mNextMessageUptime</SPAN>
break;
}
}
// Release lock.
mLock.unlock();
// 处理<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">mResponses</SPAN>
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == ALOOPER_POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
int callbackResult = response.request.callback->handleEvent(fd, events, data);//调用callback->handleEvent
if (callbackResult == 0) {
removeFd(fd);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = ALOOPER_POLL_CALLBACK;
}
}
return result;
}
int Looper::pollInner(int timeoutMillis) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); #endif // 设置timeoutMillis的值为Math.min(timeoutMills, mNextMessageUptime) // Adjust the timeout based on when the next message is due. if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime); if (messageTimeoutMillis >= 0 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { timeoutMillis = messageTimeoutMillis; } #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d", this, mNextMessageUptime - now, timeoutMillis); #endif } // Poll. int result = ALOOPER_POLL_WAKE; mResponses.clear(); mResponseIndex = 0; // 开始轮询 struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // Acquire lock. mLock.lock(); // Check for poll error. if (eventCount < 0) { //处理error if (errno == EINTR) { goto Done; } ALOGW("Poll failed with an unexpected error, errno=%d", errno); result = ALOOPER_POLL_ERROR; goto Done; } // Check for poll timeout. if (eventCount == 0) { // 未能等到event,故timeout #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - timeout", this); #endif result = ALOOPER_POLL_TIMEOUT; goto Done; } // Handle all events. #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); #endif for (int i = 0; i < eventCount; i++) {//有event,则处理 int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { awoken();//读取命名管道内的数据 } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); } } else { ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex));//把event添加到<span style="FONT-FAMILY: Arial, Helvetica, sans-serif">mResponses中,等待后续处理</span> } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } } } Done: ; // 处理C++层的Message // Invoke pending message callbacks. mNextMessageUptime = LLONG_MAX; while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", this, handler.get(), message.what); #endif handler->handleMessage(message);//调用handler-><span style="FONT-FAMILY: Arial, Helvetica, sans-serif">handleMessage</span> } // release handler mLock.lock(); mSendingMessage = false; result = ALOOPER_POLL_CALLBACK; } else { // The last message left at the head of the queue determines the next wakeup time. mNextMessageUptime = messageEnvelope.uptime;//更新<span style="FONT-FAMILY: Arial, Helvetica, sans-serif">mNextMessageUptime</span> break; } } // Release lock. mLock.unlock(); // 处理<span style="FONT-FAMILY: Arial, Helvetica, sans-serif">mResponses</span> // Invoke all response callbacks. for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == ALOOPER_POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif int callbackResult = response.request.callback->handleEvent(fd, events, data);//调用callback->handleEvent if (callbackResult == 0) { removeFd(fd); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = ALOOPER_POLL_CALLBACK; } } return result; }
代码比较长,所以分段分析:
[cpp]
view plain
copy
print?
// 设置timeoutMillis的值为Math.min(timeoutMillis, mNextMessageUptime)
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
if
}
// 设置timeoutMillis的值为Math.min(timeoutMillis, mNextMessageUptime) // Adjust the timeout based on when the next message is due. if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime); if (messageTimeoutMillis >= 0 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { timeoutMillis = messageTimeoutMillis; }#if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d", this, mNextMessageUptime - now, timeoutMillis); #endif }
为了解析这段代码,需要先补充一些C++层的Message相关的代码:
[cpp]
view plain
copy
print?
struct Message {
Message() : what(0) { }
Message(int what) : what(what) { }
/* The message type. (interpretation is left up to the handler) */
int what;
};
class MessageHandler : public virtual RefBase {
protected:
virtual ~MessageHandler() { }
public:
/**
* Handles a message.
*/
virtual void handleMessage(const Message& message) = 0;
}
struct MessageEnvelope {
MessageEnvelope() : uptime(0) { }
MessageEnvelope(nsecs_t uptime, const sp<MessageHandler> handler,const Message& message) : uptime(uptime), handler(handler), message(message) {}
nsecs_t uptime;
MessageHandler> handler;
Message message;
};
void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now, handler, message);
}
void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now + uptimeDelay, handler, message);
}
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
this, uptime, handler.get(), message.what);
#endif
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
// Optimization: If the Looper is currently sending a message, then we can skip
// the call to wake() because the next thing the Looper will do after processing
// messages is to decide when the next wakeup time should be. In fact, it does
// not even matter whether this code is running on the Looper thread.
if (mSendingMessage) {
return;
}
} // release lock
// Wake the poll loop only when we enqueue a new message at the head.
if (i == 0) {
wake();
}
}
void Looper::removeMessages(const sp<MessageHandler>& handler) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeMessages - handler=%p", this, handler.get());
#endif
{ // acquire lock
AutoMutex _l(mLock);
for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
if (messageEnvelope.handler == handler) {
mMessageEnvelopes.removeAt(i);
}
}
} // release lock
}
void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
#endif
{ // acquire lock
AutoMutex _l(mLock);
for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
if (messageEnvelope.handler == handler
&& messageEnvelope.message.what == what) {
mMessageEnvelopes.removeAt(i);
}
}
} // release lock
}
struct Message { Message() : what(0) { } Message(int what) : what(what) { } /* The message type. (interpretation is left up to the handler) */ int what; }; class MessageHandler : public virtual RefBase { protected: virtual ~MessageHandler() { } public: /** * Handles a message. */ virtual void handleMessage(const Message& message) = 0; } struct MessageEnvelope { MessageEnvelope() : uptime(0) { } MessageEnvelope(nsecs_t uptime, const sp<MessageHandler> handler,const Message& message) : uptime(uptime), handler(handler), message(message) {} nsecs_t uptime; MessageHandler> handler; Message message; }; void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); sendMessageAtTime(now, handler, message); } void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler, const Message& message) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); sendMessageAtTime(now + uptimeDelay, handler, message); } void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, const Message& message) { #if DEBUG_CALLBACKS ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d", this, uptime, handler.get(), message.what); #endif size_t i = 0; { // acquire lock AutoMutex _l(mLock); size_t messageCount = mMessageEnvelopes.size(); while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { i += 1; } MessageEnvelope messageEnvelope(uptime, handler, message); mMessageEnvelopes.insertAt(messageEnvelope, i, 1); // Optimization: If the Looper is currently sending a message, then we can skip // the call to wake() because the next thing the Looper will do after processing // messages is to decide when the next wakeup time should be. In fact, it does // not even matter whether this code is running on the Looper thread. if (mSendingMessage) { return; } } // release lock // Wake the poll loop only when we enqueue a new message at the head. if (i == 0) { wake(); } } void Looper::removeMessages(const sp<MessageHandler>& handler) { #if DEBUG_CALLBACKS ALOGD("%p ~ removeMessages - handler=%p", this, handler.get()); #endif { // acquire lock AutoMutex _l(mLock); for (size_t i = mMessageEnvelopes.size(); i != 0; ) { const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i); if (messageEnvelope.handler == handler) { mMessageEnvelopes.removeAt(i); } } } // release lock } void Looper::removeMessages(const sp<MessageHandler>& handler, int what) { #if DEBUG_CALLBACKS ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what); #endif { // acquire lock AutoMutex _l(mLock); for (size_t i = mMessageEnvelopes.size(); i != 0; ) { const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i); if (messageEnvelope.handler == handler && messageEnvelope.message.what == what) { mMessageEnvelopes.removeAt(i); } } } // release lock }是不是觉得上面的代码似曾相识?和Java层的MessageQueue很像似有木有?和java层一样,C++层存在消息队列和消息处理机制,消息被保存到成员mMessageEvelopes中,并在pollInner函数中处理消息(调用MesageHandler的handleMesage函数处理)。
现在回到Looper::pollonceh函数,我们就应该能够理解,pollOnce函数到timeOutMillis参数仅仅代表了Java层下一个Message的触发延迟,所以,我们还需要考虑C++层下一个Message的触发延迟,所以,代码设置timeoutMillis为timeoutMillis和mNextMessageUpTime中的较小值。
继续下一段代码:
[cpp]
view plain
copy
print?
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// 开始轮询
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// Acquire lock.
mLock.lock();
int result = ALOOPER_POLL_WAKE; mResponses.clear(); mResponseIndex = 0; // 开始轮询 struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // Acquire lock. mLock.lock();调用epoll函数,等待event发生,epoll_wait函数的返回值有三种可能:失败出错、没有event、有一个或多个event。
1. 失败的处理:
[cpp]
view plain
copy
print?
// Check for poll error.
if (eventCount < 0) { //处理error
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = ALOOPER_POLL_ERROR;
goto Done;
}
// Check for poll error. if (eventCount < 0) { //处理error if (errno == EINTR) { goto Done; } ALOGW("Poll failed with an unexpected error, errno=%d", errno); result = ALOOPER_POLL_ERROR; goto Done; }2. 没有event发生:
[cpp]
view plain
copy
print?
// Check for poll timeout.
if (eventCount == 0) { // 未能等到event,故timeout
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}
// Check for poll timeout. if (eventCount == 0) { // 未能等到event,故timeout #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - timeout", this); #endif result = ALOOPER_POLL_TIMEOUT; goto Done; }3. 有event发生:
[cpp]
view plain
copy
print?
// Handle all events.
DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
if
for (int i = 0; i < eventCount; i++) {//有event,则处理
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {//说明java层或者C++层有新的Message
if (epollEvents & EPOLLIN) {
awoken();//读取命名管道内的数据
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));//把event添加到<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">mResponses中,等待后续处理</SPAN>
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
// Handle all events. #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); #endif for (int i = 0; i < eventCount; i++) {//有event,则处理 int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeReadPipeFd) {//说明java层或者C++层有新的Message if (epollEvents & EPOLLIN) { awoken();//读取命名管道内的数据 } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); } } else { ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex));//把event添加到<span style="FONT-FAMILY: Arial, Helvetica, sans-serif">mResponses中,等待后续处理</span> } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } } }
处理event的时候,需要分为两类:fd==mWakeReadPipeFd和fd!=mWakeReadPipeFd
fd==mWakeReadPipeFd:说明C++层,或者java层有新的Message出现,需要处理。这种情况下,只需要读mWakeReadPipeFd内的数据即可
[cpp]
view plain
copy
print?
void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ awoken", this);
#endif
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
void Looper::awoken() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ awoken", this); #endif char buffer[16]; ssize_t nRead; do { nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); }fd!=mWakeReadPipeFd:首先需要搞明白fd是哪来的:
[cpp]
view plain
copy
print?
struct Request {
int fd;
int ident;
sp<LooperCallback> callback;
void* data;
};
struct Response {
int events;
Request request;
};
struct Request { int fd; int ident; sp<LooperCallback> callback; void* data; }; struct Response { int events; Request request; };
[cpp]
view plain
copy
print?
/**
* A looper callback.
*/
class LooperCallback : public virtual RefBase {
protected:
virtual ~LooperCallback() { }
public:
/**
* Handles a poll event for the given file descriptor.
* It is given the file descriptor it is associated with,
* a bitmask of the poll events that were triggered (typically ALOOPER_EVENT_INPUT),
* and the data pointer that was originally supplied.
*
* Implementations should return 1 to continue receiving callbacks, or 0
* to have this file descriptor and callback unregistered from the looper.
*/
virtual int handleEvent(int fd, int events, void* data) = 0;
};
/** * A looper callback. */ class LooperCallback : public virtual RefBase { protected: virtual ~LooperCallback() { } public: /** * Handles a poll event for the given file descriptor. * It is given the file descriptor it is associated with, * a bitmask of the poll events that were triggered (typically ALOOPER_EVENT_INPUT), * and the data pointer that was originally supplied. * * Implementations should return 1 to continue receiving callbacks, or 0 * to have this file descriptor and callback unregistered from the looper. */ virtual int handleEvent(int fd, int events, void* data) = 0; };
[cpp]
view plain
copy
print?
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
events, callback.get(), data);
#endif
if (!callback.get()) {
if (! mAllowNonCallbacks) {
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
if (ident < 0) {//仅当Looper支持NonCallbacks,并且ident大于0时,允许添加callback为null的Fd
ALOGE("Invalid attempt to set NULL callback with ident < 0.");
return -1;
}
} else {
ident = ALOOPER_POLL_CALLBACK;
}
int epollEvents = 0;
if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
{ // acquire lock
AutoMutex _l(mLock);
Request request;
request.fd = fd;
request.ident = ident;
request.callback = callback;
request.data = data;
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = epollEvents;
eventItem.data.fd = fd;
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.add(fd, request);
} else {//存在则替换
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) { #if DEBUG_CALLBACKS ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident, events, callback.get(), data); #endif if (!callback.get()) { if (! mAllowNonCallbacks) { ALOGE("Invalid attempt to set NULL callback but not allowed for this looper."); return -1; } if (ident < 0) {//仅当Looper支持NonCallbacks,并且ident大于0时,允许添加callback为null的Fd ALOGE("Invalid attempt to set NULL callback with ident < 0."); return -1; } } else { ident = ALOOPER_POLL_CALLBACK; } int epollEvents = 0; if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN; if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT; { // acquire lock AutoMutex _l(mLock); Request request; request.fd = fd; request.ident = ident; request.callback = callback; request.data = data; struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = epollEvents; eventItem.data.fd = fd; ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex < 0) { int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno); return -1; } mRequests.add(fd, request); } else {//存在则替换 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem); if (epollResult < 0) { ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno); return -1; } mRequests.replaceValueAt(requestIndex, request); } } // release lock return 1; }
从上面的代码,我们可知,Looper还支持添加Fd和自定义的callback,类似java层的Message.callback。
通过addFd函数,可以向Looper的mEpollFd添加指定的Fd,当Fd触发指定的event .e.i EPOLLIN or EPOLLOUT时,指定的相应的自定义callback就会得到执行。
另外,当Looper支持noncallback时,还可以向Looper添加callback为null的Fd,因为没有callback,所以Fd添加者需要调用int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) 通过outFd、outEvent、outData参数获取数据并进行处理。
所以当Fd触发消息时,需要生成对应到reponse并添加到meResponses中,等待后续的处理。
[cpp]
view plain
copy
print?
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}
void Looper::pushResponse(int events, const Request& request) { Response response; response.events = events; response.request = request; mResponses.push(response); }
到这里为止,epoll_wait函数返回的三种结果的不同处理已经解析完毕,接下来代码进入共同的Done环节。
处理C++层的Message:
[cpp]
view plain
copy
print?
Done: ;
// 处理C++层的Message
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message);//调用handler-><SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">handleMessage</SPAN>
} // release handler
mLock.lock();
mSendingMessage = false;
result = ALOOPER_POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;//更新<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">mNextMessageUptime</SPAN>
break;
}
}
Done: ; // 处理C++层的Message // Invoke pending message callbacks. mNextMessageUptime = LLONG_MAX; while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", this, handler.get(), message.what); #endif handler->handleMessage(message);//调用handler-><span style="FONT-FAMILY: Arial, Helvetica, sans-serif">handleMessage</span> } // release handler mLock.lock(); mSendingMessage = false; result = ALOOPER_POLL_CALLBACK; } else { // The last message left at the head of the queue determines the next wakeup time. mNextMessageUptime = messageEnvelope.uptime;//更新<span style="FONT-FAMILY: Arial, Helvetica, sans-serif">mNextMessageUptime</span> break; } }
处理Response:
[cpp]
view plain
copy
print?
// 处理<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif">mResponses</SPAN>
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == ALOOPER_POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
int callbackResult = response.request.callback->handleEvent(fd, events, data);//调用callback->handleEvent
if (callbackResult == 0) {
removeFd(fd);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = ALOOPER_POLL_CALLBACK;
}
}
// 处理<span style="FONT-FAMILY: Arial, Helvetica, sans-serif">mResponses</span> // Invoke all response callbacks. for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == ALOOPER_POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif int callbackResult = response.request.callback->handleEvent(fd, events, data);//调用callback->handleEvent if (callbackResult == 0) { removeFd(fd); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = ALOOPER_POLL_CALLBACK; } }
搞懂了上面的代码,我们就很容易明白wake函数做了些什么:
[cpp]
view plain
copy
print?
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);//向<SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif; FONT-SIZE: 12px">mWakeWritePipeFd</SPAN><SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif; FONT-SIZE: 12px">中写入一个字符,所以mWakeReadPipeFd就会触发</SPAN><SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif; FONT-SIZE: 12px">EPOLLIN event</SPAN><SPAN style="FONT-FAMILY: Arial, Helvetica, sans-serif; FONT-SIZE: 12px">
</SPAN> } while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
void Looper::wake() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ wake", this); #endif ssize_t nWrite; do { nWrite = write(mWakeWritePipeFd, "W", 1);//向<span style="font-family: Arial, Helvetica, sans-serif;">mWakeWritePipeFd</span><span style="font-family: Arial, Helvetica, sans-serif;">中写入一个字符,所以mWakeReadPipeFd就会触发</span><span style="font-family: Arial, Helvetica, sans-serif;">EPOLLIN event</span><span style="font-family: Arial, Helvetica, sans-serif;"> </span> } while (nWrite == -1 && errno == EINTR); if (nWrite != 1) { if (errno != EAGAIN) { ALOGW("Could not write wake signal, errno=%d", errno); } } }小结:
Looper通过epoll函数组实现了一个可以支持随时唤醒的阻塞机制
Looper支持两种不同的方式处理消息:Message + MessageHandler 和 LooperCallback。
Looper的阻塞在如下四种条件下会被唤醒:
发生错误
等待超时
出现需要处理的新Message(包括C++层和Java层)
由addFd函数添加的Fd触发event
总结:
在哪个线程调用JAVA层的Looper.loop(),Mesage和callback(包括Java层和C++层)就在哪个线程被处理,上图为Looper.loop函数的时序图。
C++层的NativeMesasgeQueue不应该是Java层的MesageQueue的内部实现,而更接近于“栾生兄弟”的关系。MessageQueue负责处理java层上到消息,NativeMessageQueue负责处理C++层上的消息。其中Java层是在android.os.Looper.looper函数中调用android.os.Handler.dispatchMessage处理,而C++层是在android::Looper::pollInner函数中调用android::MessageHandler::handleMessage
& android:LooperCallback::handleEvent函数处理。
NativeMessageQueue利用Looper类实现了一个基于epoll函数和文件描述符(Fd)的可唤醒的阻塞机制。
相关文章推荐
- Android 屏幕适配
- Android数据存储方式之——文件存储
- Android消息机制不完全解析(上) .
- Gprinter Android SDK V2.1.4 使用说明
- Android Studio -修改LogCat的颜色
- Android开发之基本控件
- Android之ListView的高级封装!
- 从setContentView方法分析Android加载布局流程
- Android:Animation专题:1.alpha、scale、translate、rotate、set的xml属性及用法
- Android使用Messenger进行Service IPC通信分析
- Android控件ListView
- Android 播放提示音
- android 下的多线程
- 【安卓基础四】adb命令使用Heap检测和分析Android应用内存
- Android开发中的View类的视图属性focusableInTouchMode这个属性跟focusable有什么区别?
- ViewGroup的事件分发机制
- Android新手入门2016(1)--创建和运行helloworld
- Android L+ Theme 与 Toolbar
- Android Studio 上传SVN标准全过程
- Android工程的编译过程