android graphic(6)—surfaceflinger和MessageQueue
2016-01-26 12:48
609 查看
MessageQueue等待消息
epoll监听fd
pipe fd导致epoll_wait返回
BitTube fd导致epoll_wait返回
MessageQueue(简称为MQ)是surfaceflinger(简称为SF)主线程中消息处理的“管家”,所有子线程要和主线程打交道都需要通过MQ,例如发送消息,发送Vsync信号等,这里主要分析MQ具体的实现流程。
下面这幅图是MQ处理消息的一个大概流程,下面根据其中的内容展开(黄色部分表示类名,绿色为类成员),MQ主要处理两类事件,一种是Message,一种是Event(Vsync),如图中的①②所示,图中epoll_wait()下的P表示pipe描述符,B表示BitTube描述符。
进而去调用MQ的
而pollOnce(),会去调用pollInner(),
从上面的代码可以看出,epoll_wait()等待了两类信号的到来,一种是message,一种是Vsync event,那么肯定是epoll监听了两个描述符,那么都是在哪里添加的呢?
其次在Looper的setEventThread()函数中,epoll注册了另外一个描述符,这个描述符是对应BitTube mEventTube中的读描述符,而对应的写描述符在EventThread的mDisplayEventConnections中。
最终回去调用Looper的sendMessageAtTime,
在weak()函数中我们看到有写mWakeWritePipeFd的操作,进而会导致epoll_wait返回。从上面的代码能够看到并不是每来一个消息都会去马上处理,mMessageEnvelopes中的消息按触发时间的先后顺序排列,如果我们插入到头部,表明新加入的这个消息是需要最近处理的,mMessageEnvelopes中后面还有其他待处理的消息,这时候才会去激活epoll_wait()返回,这种处理方式主要是出于效率的考虑,一次处理的消息不能太多也不能太少。
进而会往BitTube的写描述符中写东西,对应对端的读描述符被激活,epoll_wait()激活。
epoll监听fd
pipe fd导致epoll_wait返回
BitTube fd导致epoll_wait返回
MessageQueue(简称为MQ)是surfaceflinger(简称为SF)主线程中消息处理的“管家”,所有子线程要和主线程打交道都需要通过MQ,例如发送消息,发送Vsync信号等,这里主要分析MQ具体的实现流程。
下面这幅图是MQ处理消息的一个大概流程,下面根据其中的内容展开(黄色部分表示类名,绿色为类成员),MQ主要处理两类事件,一种是Message,一种是Event(Vsync),如图中的①②所示,图中epoll_wait()下的P表示pipe描述符,B表示BitTube描述符。
MessageQueue等待消息
在前面分析过,SF进程其实核心就是个接收消息,然后处理的过程。在SF启动过程中,最后会去执行run()函数,是个while循环,一直在等待事件的到来waitForEvent(),
void SurfaceFlinger::run() { do { waitForEvent(); } while (true); }
进而去调用MQ的
waitMessage(),“大管家”直接出场,其核心也是个while循环,处理函数为
mLooper->pollOnce(-1),
void MessageQueue::waitMessage() { do { IPCThreadState::self()->flushCommands(); int32_t ret = mLooper->pollOnce(-1); switch (ret) { case ALOOPER_POLL_WAKE: case ALOOPER_POLL_CALLBACK: continue; case ALOOPER_POLL_ERROR: ALOGE("ALOOPER_POLL_ERROR"); case ALOOPER_POLL_TIMEOUT: // timeout (should not happen) continue; default: // should not happen ALOGE("Looper::pollOnce() returned unknown status %d", ret); continue; } } while (true); }
而pollOnce(),会去调用pollInner(),
int Looper::pollInner(int timeoutMillis) { // Poll. //poll之前先把mResponses清空 int result = ALOOPER_POLL_WAKE; mResponses.clear(); mResponseIndex = 0; // We are about to idle. mIdling = true; //通过epoll_wait等到事件的到来,监听了哪些描述符,需要去找epoll_ctl() struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); //事件到来 for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; //分两类事件 //第一种为消息 if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { awoken(); } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); } } else { //第二种为Vsync信号 ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex)); } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } } } Done: ; //处理消息 // Invoke pending message callbacks. mNextMessageUptime = LLONG_MAX; while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); handler->handleMessage(message); } // release handler mLock.lock(); mSendingMessage = false; result = ALOOPER_POLL_CALLBACK; } else { // The last message left at the head of the queue determines the next wakeup time. mNextMessageUptime = messageEnvelope.uptime; break; } } // Release lock. mLock.unlock(); //处理Vsync信号 // Invoke all response callbacks. for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == ALOOPER_POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = ALOOPER_POLL_CALLBACK; } } return result; }
从上面的代码可以看出,epoll_wait()等待了两类信号的到来,一种是message,一种是Vsync event,那么肯定是epoll监听了两个描述符,那么都是在哪里添加的呢?
epoll监听fd
首先在Looper的构造函数中,创建了图中所示的两个描述符mWakeReadPipeFd ,mWakeWritePipeFd 分别对应pipe的读和写,并且将读描述符mWakeReadPipeFd通过epoll_ctl添加到监听的描述符中。这个描述符所对应的是Message消息。
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { int wakeFds[2]; //创建一个管道,一个写东西另外一个就有东西读了, int result = pipe(wakeFds); LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); mWakeReadPipeFd = wakeFds[0]; mWakeWritePipeFd = wakeFds[1]; result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK); LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d", errno); result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK); LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", errno); mIdling = false; // Allocate the epoll instance and register the wake pipe. mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.data.fd = mWakeReadPipeFd; //监听 result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", errno); }
其次在Looper的setEventThread()函数中,epoll注册了另外一个描述符,这个描述符是对应BitTube mEventTube中的读描述符,而对应的写描述符在EventThread的mDisplayEventConnections中。
void MessageQueue::setEventThread(const sp<EventThread>& eventThread) { mEventThread = eventThread; //首先创建一个完整的Connection,里面的BitTube中读写描述符都在 mEvents = eventThread->createEventConnection(); //重建一个Connection,里面包含了mEvents的读描述符 mEventTube = mEvents->getDataChannel(); //把读描述符注册到epoll监听 mLooper->addFd(mEventTube->getFd(), 0, ALOOPER_EVENT_INPUT, MessageQueue::cb_eventReceiver, this); }
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) { int epollEvents = 0; if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN; if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT; { // acquire lock AutoMutex _l(mLock); //首先把要监听的fd,回调函数等构建一个Request Request request; request.fd = fd; request.ident = ident; request.callback = callback; request.data = data; struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = epollEvents; eventItem.data.fd = fd; ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex < 0) { //监听描述符 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno); return -1; } //将request放到键值对mRequests中 mRequests.add(fd, request); } else { int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem); if (epollResult < 0) { ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno); return -1; } mRequests.replaceValueAt(requestIndex, request); } } // release lock return 1; }
pipe fd导致epoll_wait返回
上面注册了pipe和BitTube的读描述符,那么当这两个对应的写描述符有写操作时,epoll_wait()就会返回,然后进行消息的处理。首先分析pipe对应的写描述符的激活,一般给SF发消息(只分析异步消息),都是去调用SF的postMessageAsync()函数,这个函数的入参是MessageBase类,所以给SF发消息首先会对消息进行封装,封装为MessageBase的子类,前面介绍过。status_t SurfaceFlinger::postMessageAsync(const sp<MessageBase>& msg, nsecs_t reltime, uint32_t flags) { return mEventQueue.postMessage(msg, reltime); }
最终回去调用Looper的sendMessageAtTime,
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, const Message& message) { #if DEBUG_CALLBACKS ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d", this, uptime, handler.get(), message.what); #endif size_t i = 0; { // acquire lock AutoMutex _l(mLock); //有个保存所有消息的Vector, //如果mMessageEnvelopes这里面没有消息,则i=0 //mMessageEnvelopes中的消息按触发时间的先后顺序排列 size_t messageCount = mMessageEnvelopes.size(); while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { i += 1; } //处理的消息都被封装为MessageEnvelope, MessageEnvelope messageEnvelope(uptime, handler, message); mMessageEnvelopes.insertAt(messageEnvelope, i, 1); // Optimization: If the Looper is currently sending a message, then we can skip // the call to wake() because the next thing the Looper will do after processing // messages is to decide when the next wakeup time should be. In fact, it does // not even matter whether this code is running on the Looper thread. if (mSendingMessage) { return; } } // release lock // Wake the poll loop only when we enqueue a new message at the head. // 如果i=0,则去调用wake(), // 只有我们把这个消息插入到MessageEnvelope头部时,才会去激活epoll_wait()返回处理消息 if (i == 0) { wake(); } }
在weak()函数中我们看到有写mWakeWritePipeFd的操作,进而会导致epoll_wait返回。从上面的代码能够看到并不是每来一个消息都会去马上处理,mMessageEnvelopes中的消息按触发时间的先后顺序排列,如果我们插入到头部,表明新加入的这个消息是需要最近处理的,mMessageEnvelopes中后面还有其他待处理的消息,这时候才会去激活epoll_wait()返回,这种处理方式主要是出于效率的考虑,一次处理的消息不能太多也不能太少。
void Looper::wake() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ wake", this); #endif ssize_t nWrite; do { nWrite = write(mWakeWritePipeFd, "W", 1); } while (nWrite == -1 && errno == EINTR); if (nWrite != 1) { if (errno != EAGAIN) { ALOGW("Could not write wake signal, errno=%d", errno); } } }
BitTube fd导致epoll_wait返回
当Vsync信号的到来时,EventThread从睡眠中打断,调用Connection的postEvent函数,这个Connection里面保存了BitTube的写描述符。bool EventThread::threadLoop() { DisplayEventReceiver::Event event; Vector< sp<EventThread::Connection> > signalConnections; signalConnections = waitForEvent(&event); //vsync信号到来,睡醒了 // dispatch events to listeners... const size_t count = signalConnections.size(); for (size_t i=0 ; i<count ; i++) { const sp<Connection>& conn(signalConnections[i]); // now see if we still need to report this event //调用Connection的postEvent函数,这个Connection里面保存了BitTube的写描述符 status_t err = conn->postEvent(event); } return true; }
进而会往BitTube的写描述符中写东西,对应对端的读描述符被激活,epoll_wait()激活。
status_t EventThread::Connection::postEvent( const DisplayEventReceiver::Event& event) { ssize_t size = DisplayEventReceiver::sendEvents(mChannel, &event, 1); return size < 0 ? status_t(size) : status_t(NO_ERROR); }
相关文章推荐
- ZOJ 3911 Prime Query (线段树+单点更新+区间lazy更新+素数判断计数)
- easyui validatebox 验证
- easyui datagrid 操作
- iOS开发之UIActivityViewController
- ios的手势操作之UIGestureRecognizer浅析
- 使用Hibernate SQLQuery执行原生SQL
- UITableView 分组头视图取消悬浮
- UICollectionView简单使用
- 类似于系统UISegmentedControl的切换控件
- 探索工作流(三)--顺序工作流 Sequence
- SQL写存储过程时报错 'CREATE/ALTER PROCEDURE' must be the first statement in a query batch.
- 解决AndroidStudio使用build过慢的问题--stackoverflow
- EasyUI datagrid 笔记
- <LeetCode OJ> 52. N-Queens II
- UITabBarController- 标签视图控制器
- iOS Interface Builder:在.xib文件中加载另一个.xib文件
- UITableView 编辑
- UITableView标示图
- UINavigationControl、界面通信
- UIScrollView、UIPageControl