muduo源码学习(16)-EventLoop简介
2017-08-21 20:06
357 查看
在写一些epoll的例子的时候,一般都会有一个循环,在循环中调用epoll_wait函数。muduo中的EventLoop差不多就是对该循环的封装。在net/EventLoop.h中
该loop中,执行循环,首先调用IO复用函数得到活跃事件,之后遍历,调用channel的handleEvent事件处理函数。之后调用的doPendingFunctors()函数,实现如下:
遍历函数指针的容器,调用函数指针。
在每一个线程中,最多创建一个EventLoop对象,在EventLoop中,还有一个唤醒的channel,这是用于别的线程将该线程唤醒,也就是说,如果循环中,阻塞在了epoll_wait()上,此时我们可能要结束循环,或者是在函数指针的容器中添加了函数指针,想然线程尽快执行,于是就需要唤醒,也就是让唤醒的文件描述符产生可读事件,此时epoll_wait返回,循环继续往下执行。唤醒函数如下
在初始化的时候,就将用于唤醒的描述符加入到epoll中管理,而且线程内的全局变量会存储EventLoop对象的指针。
在EventLoop.cc中有
用于忽略SIGPIPE信号。
区域的对channel的操作只是调用了poll对象的相应函数
class EventLoop : boost::noncopyable { public: typedef boost::function<void()> Functor; EventLoop(); ~EventLoop(); // force out-line dtor, for scoped_ptr members. /// /// Loops forever. /// /// Must be called in the same thread as creation of the object. /// void loop(); void quit(); /// /// Time when poll returns, usually means data arrivial. /// Timestamp pollReturnTime() const { return pollReturnTime_; } int64_t iteration() const { return iteration_; } /// Runs callback immediately in the loop thread. /// It wakes up the loop, and run the cb. /// If in the same loop thread, cb is run within the function. /// Safe to call from other threads. void runInLoop(const Functor& cb); /// Queues callback in the loop thread. /// Runs after finish pooling. /// Safe to call from other threads. void queueInLoop(const Functor& cb); // timers //定时运行函数 /// /// Runs callback at 'time'. /// Safe to call from other threads. /// TimerId runAt(const Timestamp& time, const TimerCallback& cb); /// /// Runs callback after @c delay seconds. /// Safe to call from other threads. /// TimerId runAfter(double delay, const TimerCallback& cb); /// /// Runs callback every @c interval seconds. /// Safe to call from other threads. /// TimerId runEvery(double interval, const TimerCallback& cb); /// /// Cancels the timer. /// Safe to call from other threads. /// //取消定时 void cancel(TimerId timerId); // internal usage //将线程从poll中唤醒 void wakeup(); //更新channel void updateChannel(Channel* channel); //移除channel void removeChannel(Channel* channel); // pid_t threadId() const { return threadId_; } //断言在本线程中 void assertInLoopThread() { if (!isInLoopThread()) { abortNotInLoopThread(); } } //是否在该线程中 bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } // bool callingPendingFunctors() const { return callingPendingFunctors_; } //是否在描述符事件的回调函数中 bool eventHandling() const { return eventHandling_; } //获取当前线程的eventloop指针 static EventLoop* getEventLoopOfCurrentThread(); private: void abortNotInLoopThread(); //对wakeup()发送的数据的读的回调函数 void handleRead(); // waked up //是否在调用doPendingFunctors()中 void doPendingFunctors(); void printActiveChannels() const; // DEBUG typedef std::vector<Channel*> ChannelList; //是否在loop中 bool looping_; /* atomic *// //是否要推出loop循环 bool quit_; /* atomic */ //是否处于描述符的回调事件中 bool eventHandling_; /* atomic */ //是否处于调用设置的回调函数中 bool callingPendingFunctors_; /* atomic */ //主循环次数 int64_t iteration_; //该线程的id const pid_t threadId_; //poll函数返回的时刻 Timestamp pollReturnTime_; //poll对象,调用poll函数 boost::scoped_ptr<Poller> poller_; //定时事件的集合,通过new构造 boost::scoped_ptr<TimerQueue> timerQueue_; //唤醒poll的描述符 int wakeupFd_; // unlike in TimerQueue, which is an internal class, // we don't expose Channel to client. //对唤醒的描述符wakeupFd_的封装,为其该描述符设置回调函数 boost::scoped_ptr<Channel> wakeupChannel_; //通过poll函数返回的活跃的IO的vector ChannelList activeChannels_; Channel* currentActiveChannel_; MutexLock mutex_; //执行的回调函数的动态数组 std::vector<Functor> pendingFunctors_; // @BuardedBy mutex_ };类中的成员比较多。主要的就是有一个IO复用的对象智能指针poller_,还有定时时间的容器timeQueue_,函数指针的容器pendingFunctors_,唤醒的channel activeChannel_。定时事件之后再说。成员函数中,主要就是loop(),实现如下
void EventLoop::loop() { assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; LOG_TRACE << "EventLoop " << this << " start looping"; while (!quit_) { activeChannels_.clear(); //activeChannels_为得到的活跃的描述符的抽象 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); //循环次数 ++iteration_; //根据日志级别打印日志 if (Logger::logLevel() <= Logger::TRACE) { printActiveChannels(); } // TODO sort channel by priority //表示正在处理描述符对应的回调函数 eventHandling_ = true; //遍历活跃的描述符的容器,执行每一个描述符的回调函数 for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { currentActiveChannel_ = *it; //执行回调函数 currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL; //表示结束了遍历处理动作 eventHandling_ = false; //调用设置的其他函数 doPendingFunctors(); } LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false; }
该loop中,执行循环,首先调用IO复用函数得到活跃事件,之后遍历,调用channel的handleEvent事件处理函数。之后调用的doPendingFunctors()函数,实现如下:
void EventLoop::doPendingFunctors() { std::vector<Functor> functors; //表示loop_循环正在调用doPendingFunctors函数 callingPendingFunctors_ = true; { MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); } //遍历调用pendingFunctors_中的函数 for (size_t i = 0; i < functors.size(); ++i) { functors[i](); } //表示loop_循环结束调用doPendingFunctors函数 callingPendingFunctors_ = false; }
遍历函数指针的容器,调用函数指针。
在每一个线程中,最多创建一个EventLoop对象,在EventLoop中,还有一个唤醒的channel,这是用于别的线程将该线程唤醒,也就是说,如果循环中,阻塞在了epoll_wait()上,此时我们可能要结束循环,或者是在函数指针的容器中添加了函数指针,想然线程尽快执行,于是就需要唤醒,也就是让唤醒的文件描述符产生可读事件,此时epoll_wait返回,循环继续往下执行。唤醒函数如下
//执行唤醒动作,往wakeupFd_描述符发送8个字节的数据,使poll能够返回,结束阻塞 void EventLoop::wakeup() { uint64_t one = 1; ssize_t n = sockets::write(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; } }在别的线程中,如果调用runInLoop函数,即:
void EventLoop::runInLoop(const Functor& cb) { if (isInLoopThread()) { cb(); } else { queueInLoop(cb); } }会把函数指针添加到EventLoop的函数对象容器中,这样实现线程间的通信,queueInLoop如下:
void EventLoop::queueInLoop(const Functor& cb) { { MutexLockGuard lock(mutex_); //添加到设置的回调函数动态数组中 pendingFunctors_.push_back(cb); } if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); } }此时,会唤醒线程去执行函数对象。
在初始化的时候,就将用于唤醒的描述符加入到epoll中管理,而且线程内的全局变量会存储EventLoop对象的指针。
//构造函数 EventLoop::EventLoop() : looping_(false), quit_(false), eventHandling_(false), callingPendingFunctors_(false), iteration_(0), threadId_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)), //定时器集合,构造中会将timeefd添加到poll中监听可读事件,可读事件的回调函数 //会调用超时的time的回调函数 timerQueue_(new TimerQueue(this)), wakeupFd_(createEventfd()), wakeupChannel_(new Channel(this, wakeupFd_)), currentActiveChannel_(NULL) { LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; //检查该线程中是否存在其他的EventLoop,一个线程中只能存在一个Eventloop if (t_loopInThisThread) { LOG_FATAL << 4000 ; "Another EventLoop " << t_loopInThisThread << " exists in this thread " << threadId_; } else { t_loopInThisThread = this; //存储指针 } wakeupChannel_->setReadCallback( boost::bind(&EventLoop::handleRead, this)); // we are always reading the wakeupfd //将wakeupfd设置到poll中监听 wakeupChannel_->enableReading(); }
//创建一个描述符, 用于唤醒 int createEventfd() { int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (evtfd < 0) { LOG_SYSERR << "Failed in eventfd"; abort(); } return evtfd; }
在EventLoop.cc中有
#pragma GCC diagnostic ignored "-Wold-style-cast" class IgnoreSigPipe { public: IgnoreSigPipe() { ::signal(SIGPIPE, SIG_IGN); LOG_TRACE << "Ignore SIGPIPE"; } }; #pragma GCC diagnostic error "-Wold-style-cast" //忽略SIGPIPE信号,全局初始化是就会调用 IgnoreSigPipe initObj; }
用于忽略SIGPIPE信号。
区域的对channel的操作只是调用了poll对象的相应函数
//更新poller维护的相关集合中的描述符 void EventLoop::updateChannel(Channel* channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); poller_->updateChannel(channel); } //删除poller维护的相关集合中的描述符 void EventLoop::removeChannel(Channel* channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); if (eventHandling_) { assert(currentActiveChannel_ == channel || std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end()); } poller_->removeChannel(channel); }
相关文章推荐
- Java集合源码学习(16)_BlockingQueue接口的实现ArrayBlockingQueue
- Tomcat源码学习(16)-How Tomcat works(转)
- Api demo源码学习(16)--App/Activity/Alarm Alarm Controller&&Alarm Service
- muduo库源码学习(base)singleton
- muduo源码学习(9)-单例类
- Tomcat源码学习(16)-How Tomcat works(转)
- muduo源码学习(1)
- SQLite3源码学习(16) test_vfs框架
- muduo库源码学习(base):WeakCallback
- muduo源码学习(18)-EventLoopThread
- muduo源码学习(20)-Acceptor封装
- muduo源码学习(12)-日志类封装2
- 深入学习Django源码基础16 - django中信号的学习分析
- muduo源码学习(6)-锁,条件变量
- cocos2D-X源码分析之从cocos2D-X学习OpenGL(16)----基本光照
- muduo源码学习(3)-原子操作
- muduo源码学习(19)-socket封装
- Halide学习笔记----Halide tutorial源码阅读16
- muduo源码学习(13)-多线程与并发服务器设计
- muduo源码学习(14)-网络库类库概述