muduo库阅读(29)——Net部分:Reactor(EventLoop事件循环)
2015-11-11 17:23
393 查看
Reactor是整个框架最核心的部分
它创建轮询器(Poller),创建用于传递消息的管道,初始化各个部分,然后进入一个无限循环,每一次循环中调用轮询器的轮询函数(等待函数),等待事件发生,如果有事件发生,就依次调用套接字(或其他的文件描述符)的事件处理器处理各个事件,然后调用投递的回调函数;接着进入下一次循环。
它创建轮询器(Poller),创建用于传递消息的管道,初始化各个部分,然后进入一个无限循环,每一次循环中调用轮询器的轮询函数(等待函数),等待事件发生,如果有事件发生,就依次调用套接字(或其他的文件描述符)的事件处理器处理各个事件,然后调用投递的回调函数;接着进入下一次循环。
/* * 事件循环,即Reactor,一个线程最多一个Reactor * 这是一个接口类 */ namespace muduo { namespace net { class Channel; class Poller; class TimerQueue; /// /// Reactor, at most one per thread. /// /// This is an interface class, so don't expose too much details. class EventLoop : boost::noncopyable { public: typedef boost::function<void()> Functor; EventLoop(); ~EventLoop(); // force out-line dtor, for scoped_ptr members. // 事件循环,必须在同一个线程内创建Reactor对象和调用它的loop函数 void loop(); /// better to call through shared_ptr<EventLoop> for 100% safety. // 跳出事件循环 void quit(); // 轮询返回的时间戳,通常表示有数据到来(即有事件到来) Timestamp pollReturnTime() const { return pollReturnTime_; } int64_t iteration() const { return iteration_; } /// Runs callback immediately in the loop thread. /// It wakes up the loop, and run the cb. /// If in the same loop thread, cb is run within the function. /// Safe to call from other threads. /* * 在事件循环中周期性的调用回调函数 */ void runInLoop(const Functor& cb); /// Queues callback in the loop thread. /// Runs after finish pooling. /// Safe to call from other threads. // 把回调函数放在队列中,在轮询结束之后调用 void queueInLoop(const Functor& cb); #ifdef __GXX_EXPERIMENTAL_CXX0X__ void runInLoop(Functor&& cb); void queueInLoop(Functor&& cb); #endif // 在指定的时间调用回调函数 TimerId runAt(const Timestamp& time, const TimerCallback& cb); // 在delay妙之后调用回调函数 TimerId runAfter(double delay, const TimerCallback& cb); // 每隔interval妙调用一次回调函数 TimerId runEvery(double interval, const TimerCallback& cb); // 取消一个计时器 void cancel(TimerId timerId); #ifdef __GXX_EXPERIMENTAL_CXX0X__ TimerId runAt(const Timestamp& time, TimerCallback&& cb); TimerId runAfter(double delay, TimerCallback&& cb); TimerId runEvery(double interval, TimerCallback&& cb); #endif /* * 下面的函数作为内部使用 */ // internal usage // 唤醒 void wakeup(); // 更新事件处理器 void updateChannel(Channel* channel); // 移除事件处理器 void removeChannel(Channel* channel); // 是否有某一个事件处理器 bool hasChannel(Channel* channel); // 断言自己是否在循环线程中 void assertInLoopThread() { if (!isInLoopThread()) { abortNotInLoopThread(); } } // 是否在循环线程中 bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } // 是否正在处理事件 bool eventHandling() const { return eventHandling_; } // 设置上下文 void setContext(const boost::any& context) { context_ = context; } // 返回上下文 const boost::any& getContext() const { return context_; } // 返回可修改的上下文 boost::any* getMutableContext() { return &context_; } // 返回当前线程的Reactor对象 static EventLoop* getEventLoopOfCurrentThread(); private: // 如果创建Reactor的线程和运行Reactor的线程不同就退出进程 void abortNotInLoopThread(); // 处理读读事件 void handleRead(); // waked up // 执行投递的回调函数 void doPendingFunctors(); void printActiveChannels() const; // DEBUG // 事件处理器列表 typedef std::vector<Channel*> ChannelList; // 是否正在循环中 bool looping_; /* atomic */ // 是否退出循环 bool quit_; /* atomic and shared between threads, okay on x86, I guess. */ // 是否正在处理事件 bool eventHandling_; /* atomic */ // 是否正在调用投递的回调函数 bool callingPendingFunctors_; /* atomic */ // 迭代器 int64_t iteration_; // 线程id const pid_t threadId_; // 轮询返回的时间 Timestamp pollReturnTime_; // 轮询器 boost::scoped_ptr<Poller> poller_; // 定时器队列 boost::scoped_ptr<TimerQueue> timerQueue_; // 用于唤醒的描述符(将Reactor从等待中唤醒,一般是由于调用轮询器的等待函数而造成的阻塞) int wakeupFd_; // unlike in TimerQueue, which is an internal class, // we don't expose Channel to client. // 唤醒事件的处理器 boost::scoped_ptr<Channel> wakeupChannel_; // 上下文 boost::any context_; // scratch variables // 已激活的事件处理器队列 ChannelList activeChannels_; // 当前正在调用的事件处理器 Channel* currentActiveChannel_; // 投递的回调函数列表 MutexLock mutex_; std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_ }; } }
using namespace muduo; using namespace muduo::net; namespace { // 加上了__thread修饰符,表示每个线程都有一个指向Reactor的指针 __thread EventLoop* t_loopInThisThread = 0; // 轮询的超时时间(10000毫秒 = 10妙) const int kPollTimeMs = 10000; /* * 创建管道,用于支持线程之间的通信 */ int createEventfd() { int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (evtfd < 0) { LOG_SYSERR << "Failed in eventfd"; abort(); } return evtfd; } #pragma GCC diagnostic ignored "-Wold-style-cast" /* * 这个类的作用是忽略管道事件 * 因为管道已经被使用另外的方法处理了 */ class IgnoreSigPipe { public: IgnoreSigPipe() { ::signal(SIGPIPE, SIG_IGN); // LOG_TRACE << "Ignore SIGPIPE"; } }; #pragma GCC diagnostic error "-Wold-style-cast" // 一个全局变量,用于实现对SIGPIPE信号的忽略 IgnoreSigPipe initObj; } /* * 返回当前线程的Reactor对象的指针 */ EventLoop* EventLoop::getEventLoopOfCurrentThread() { return t_loopInThisThread; } /* * Reactor的构造函数 */ EventLoop::EventLoop() : looping_(false), quit_(false), eventHandling_(false), callingPendingFunctors_(false), iteration_(0), threadId_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)), // 调用默认的轮询器来创建轮询 timerQueue_(new TimerQueue(this)), wakeupFd_(createEventfd()), wakeupChannel_(new Channel(this, wakeupFd_)), currentActiveChannel_(NULL) { LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_; if (t_loopInThisThread) { LOG_FATAL << "Another EventLoop " << t_loopInThisThread << " exists in this thread " << threadId_; } else { t_loopInThisThread = this; } // 设置唤醒处理器的读回调函数为handleRead wakeupChannel_->setReadCallback( boost::bind(&EventLoop::handleRead, this)); // we are always reading the wakeupfd // 启用读功能 wakeupChannel_->enableReading(); } /* * 析构函数,销毁对象并清理 */ EventLoop::~EventLoop() { LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_ << " destructs in thread " << CurrentThread::tid(); wakeupChannel_->disableAll(); wakeupChannel_->remove(); ::close(wakeupFd_); t_loopInThisThread = NULL; } /* * Reactor的循环 */ void EventLoop::loop() { assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; // FIXME: what if someone calls quit() before loop() ? LOG_TRACE << "EventLoop " << this << " start looping"; while (!quit_) { // 清理已激活事件处理器的队列 activeChannels_.clear(); // 开始轮询 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); // 记录循环的次数 ++iteration_; if (Logger::logLevel() <= Logger::TRACE) { printActiveChannels(); } // TODO sort channel by priority // 开始处理事件 eventHandling_ = true; // 遍历已激活事件处理器队列,执行每一个事件处理器的处理 for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { currentActiveChannel_ = *it; // 处理事件 currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL; // 事件处理结束 eventHandling_ = false; // 执行投递回调函数 doPendingFunctors(); } LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false; } /* * 让Reactor退出循环 * 因为调用quit的线程和执行Reactor循环的线程不一定相同,如果他们不是同一个线程, * 那么必须使用wakeup函数让wakeupChannel_被激活,然后轮询函数返回,就可以处理事件,然后进入下一个循环, * 由于quit_已经被设置为true,所以就会跳出循环 */ void EventLoop::quit() { quit_ = true; // There is a chance that loop() just executes while(!quit_) and exists, // then EventLoop destructs, then we are accessing an invalid object. // Can be fixed using mutex_ in both places. if (!isInLoopThread()) { // 不知道为什么线程不同才会调用wakeup????同一个线程之内难道不可以调用? wakeup(); } } // 运行一个回调函数 void EventLoop::runInLoop(const Functor& cb) { if (isInLoopThread()) { cb(); } else { // 如果不是同一个线程,那么将他添加到投递回调函数队列中 queueInLoop(cb); } } // 把一个回调函数添加到投递回调函数队列中,并唤醒Reactor void EventLoop::queueInLoop(const Functor& cb) { { MutexLockGuard lock(mutex_); pendingFunctors_.push_back(cb); } if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); } } // 添加定时器事件:在某个时间点执行 TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) { return timerQueue_->addTimer(cb, time, 0.0); } // 添加定时器事件:在delay妙之后执行 TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) { Timestamp time(addTime(Timestamp::now(), delay)); return runAt(time, cb); } // 添加定时器事件:每interval妙执行一次 TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) { Timestamp time(addTime(Timestamp::now(), interval)); return timerQueue_->addTimer(cb, time, interval); } #ifdef __GXX_EXPERIMENTAL_CXX0X__ // FIXME: remove duplication void EventLoop::runInLoop(Functor&& cb) { if (isInLoopThread()) { cb(); } else { queueInLoop(std::move(cb)); } } void EventLoop::queueInLoop(Functor&& cb) { { MutexLockGuard lock(mutex_); pendingFunctors_.push_back(std::move(cb)); // emplace_back } if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); } } TimerId EventLoop::runAt(const Timestamp& time, TimerCallback&& cb) { return timerQueue_->addTimer(std::move(cb), time, 0.0); } TimerId EventLoop::runAfter(double delay, TimerCallback&& cb) { Timestamp time(addTime(Timestamp::now(), delay)); return runAt(time, std::move(cb)); } TimerId EventLoop::runEvery(double interval, TimerCallback&& cb) { Timestamp time(addTime(Timestamp::now(), interval)); return timerQueue_->addTimer(std::move(cb), time, interval); } #endif // 取消一个计时器(定时器) void EventLoop::cancel(TimerId timerId) { return timerQueue_->cancel(timerId); } // 更新事件处理器 void EventLoop::updateChannel(Channel* channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); poller_->updateChannel(channel); } // 移除一个事件处理器 void EventLoop::removeChannel(Channel* channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); if (eventHandling_) { assert(currentActiveChannel_ == channel || std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end()); } poller_->removeChannel(channel); } // 判断轮询器是否有事件处理器 bool EventLoop::hasChannel(Channel* channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); return poller_->hasChannel(channel); } // 退出进程 void EventLoop::abortNotInLoopThread() { LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this << " was created in threadId_ = " << threadId_ << ", current thread id = " << CurrentThread::tid(); } /* * 唤醒 * 其实就是告诉正在处理循环的Reactor,发生了某一件事 */ void EventLoop::wakeup() { uint64_t one = 1; ssize_t n = sockets::write(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; } } /* * 被wakeupChannel_使用 * wakeupChannel_有事件发生时候(wakeup一旦被调用,wakeupChannel_就被激活),就会执行handleEvent * 而handleEvent内部就会调用处理读的回调函数,wakeupChannel_的读回调函数就是handleRead */ void EventLoop::handleRead() { uint64_t one = 1; ssize_t n = sockets::read(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; } } // 执行投递的回调函数(投递的回调函数是在一次循环中,所有的事件都处理完毕之后才调用的) void EventLoop::doPendingFunctors() { std::vector<Functor> functors; callingPendingFunctors_ = true; { // 交换的目的是让pendingFunctors_继续存放投递的回调函数 // 而functors则专门用于执行投递的回调函数 // 这样就不会因为长期锁住pendingFunctors_而造成阻塞了 MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); } for (size_t i = 0; i < functors.size(); ++i) { functors[i](); } callingPendingFunctors_ = false; } // 打印已激活的事件处理器 void EventLoop::printActiveChannels() const { for (ChannelList::const_iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { const Channel* ch = *it; LOG_TRACE << "{" << ch->reventsToString() << "} "; } }
相关文章推荐
- React-Native学习指南
- 现有的iOS项目集成ReactNative的记录文档
- react-native的兼容性(Android、Ios)
- ReactiveCocoa - iOS开发的新框架
- ReactiveCocoa – RACSequence介紹
- React使用
- 最新的react0.14支持ES6开发环境配置方案
- react native image resizeMode理解
- ReactiveCocoa框架菜鸟入门(四)——信号(Signal)详解
- ReactiveCocoa框架菜鸟入门(五)——信号的FlattenMap与Map
- ReactiveCocoa入门教程——第二部分
- ReactiveCocoa入门教程——第一部分
- windows环境下搭建react native0.12.0 android环境
- react-native Requring invalid module "image! 解决办法 android
- react
- ACE的Reactor用法
- ACE_Reactor (V1.01)
- react-native —— 在Mac上搭建React Native Android开发环境
- React Native实战(一):MAC配置环境
- ReactOS源码分析——内核加载器(一)