您的位置:首页 > 运维架构

muduo源码学习(16)-EventLoop简介

2017-08-21 20:06 357 查看
在写一些epoll的例子的时候,一般都会有一个循环,在循环中调用epoll_wait函数。muduo中的EventLoop差不多就是对该循环的封装。在net/EventLoop.h中

class EventLoop : boost::noncopyable
{
public:
typedef boost::function<void()> Functor;

EventLoop();
~EventLoop();  // force out-line dtor, for scoped_ptr members.

///
/// Loops forever.
///
/// Must be called in the same thread as creation of the object.
///
void loop();

void quit();

///
/// Time when poll returns, usually means data arrivial.
///
Timestamp pollReturnTime() const { return pollReturnTime_; }

int64_t iteration() const { return iteration_; }

/// Runs callback immediately in the loop thread.
/// It wakes up the loop, and run the cb.
/// If in the same loop thread, cb is run within the function.
/// Safe to call from other threads.
void runInLoop(const Functor& cb);
/// Queues callback in the loop thread.
/// Runs after finish pooling.
/// Safe to call from other threads.
void queueInLoop(const Functor& cb);

// timers

//定时运行函数
///
/// Runs callback at 'time'.
/// Safe to call from other threads.
///
TimerId runAt(const Timestamp& time, const TimerCallback& cb);
///
/// Runs callback after @c delay seconds.
/// Safe to call from other threads.
///
TimerId runAfter(double delay, const TimerCallback& cb);
///
/// Runs callback every @c interval seconds.
/// Safe to call from other threads.
///
TimerId runEvery(double interval, const TimerCallback& cb);
///
/// Cancels the timer.
/// Safe to call from other threads.
///
//取消定时
void cancel(TimerId timerId);

// internal usage
//将线程从poll中唤醒
void wakeup();

//更新channel
void updateChannel(Channel* channel);

//移除channel
void removeChannel(Channel* channel);

// pid_t threadId() const { return threadId_; }
//断言在本线程中
void assertInLoopThread()
{
if (!isInLoopThread())
{
abortNotInLoopThread();
}
}

//是否在该线程中
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

// bool callingPendingFunctors() const { return callingPendingFunctors_; }
//是否在描述符事件的回调函数中
bool eventHandling() const { return eventHandling_; }

//获取当前线程的eventloop指针
static EventLoop* getEventLoopOfCurrentThread();

private:
void abortNotInLoopThread();

//对wakeup()发送的数据的读的回调函数
void handleRead();  // waked up

//是否在调用doPendingFunctors()中
void doPendingFunctors();

void printActiveChannels() const; // DEBUG

typedef std::vector<Channel*> ChannelList;

//是否在loop中
bool looping_; /* atomic *//

//是否要推出loop循环
bool quit_; /* atomic */

//是否处于描述符的回调事件中
bool eventHandling_; /* atomic */

//是否处于调用设置的回调函数中
bool callingPendingFunctors_; /* atomic */

//主循环次数
int64_t iteration_;

//该线程的id
const pid_t threadId_;

//poll函数返回的时刻
Timestamp pollReturnTime_;

//poll对象,调用poll函数
boost::scoped_ptr<Poller> poller_;

//定时事件的集合,通过new构造
boost::scoped_ptr<TimerQueue> timerQueue_;

//唤醒poll的描述符
int wakeupFd_;
// unlike in TimerQueue, which is an internal class,
// we don't expose Channel to client.

//对唤醒的描述符wakeupFd_的封装,为其该描述符设置回调函数
boost::scoped_ptr<Channel> wakeupChannel_;

//通过poll函数返回的活跃的IO的vector
ChannelList activeChannels_;

Channel* currentActiveChannel_;
MutexLock mutex_;

//执行的回调函数的动态数组
std::vector<Functor> pendingFunctors_; // @BuardedBy mutex_
};
类中的成员比较多。主要的就是有一个IO复用的对象智能指针poller_,还有定时时间的容器timeQueue_,函数指针的容器pendingFunctors_,唤醒的channel activeChannel_。定时事件之后再说。成员函数中,主要就是loop(),实现如下

void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;
LOG_TRACE << "EventLoop " << this << " start looping";

while (!quit_)
{
activeChannels_.clear();
//activeChannels_为得到的活跃的描述符的抽象
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);

//循环次数
++iteration_;

//根据日志级别打印日志
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority

//表示正在处理描述符对应的回调函数
eventHandling_ = true;

//遍历活跃的描述符的容器,执行每一个描述符的回调函数
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
//执行回调函数
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;

//表示结束了遍历处理动作
eventHandling_ = false;

//调用设置的其他函数
doPendingFunctors();
}

LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}


该loop中,执行循环,首先调用IO复用函数得到活跃事件,之后遍历,调用channel的handleEvent事件处理函数。之后调用的doPendingFunctors()函数,实现如下:

void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;

//表示loop_循环正在调用doPendingFunctors函数
callingPendingFunctors_ = true;

{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}

//遍历调用pendingFunctors_中的函数
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
//表示loop_循环结束调用doPendingFunctors函数
callingPendingFunctors_ = false;
}


遍历函数指针的容器,调用函数指针。

在每一个线程中,最多创建一个EventLoop对象,在EventLoop中,还有一个唤醒的channel,这是用于别的线程将该线程唤醒,也就是说,如果循环中,阻塞在了epoll_wait()上,此时我们可能要结束循环,或者是在函数指针的容器中添加了函数指针,想然线程尽快执行,于是就需要唤醒,也就是让唤醒的文件描述符产生可读事件,此时epoll_wait返回,循环继续往下执行。唤醒函数如下

//执行唤醒动作,往wakeupFd_描述符发送8个字节的数据,使poll能够返回,结束阻塞
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
在别的线程中,如果调用runInLoop函数,即:

void EventLoop::runInLoop(const Functor& cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(cb);
}
}
会把函数指针添加到EventLoop的函数对象容器中,这样实现线程间的通信,queueInLoop如下:

void EventLoop::queueInLoop(const Functor& cb)
{
{
MutexLockGuard lock(mutex_);
//添加到设置的回调函数动态数组中
pendingFunctors_.push_back(cb);
}

if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
此时,会唤醒线程去执行函数对象。

在初始化的时候,就将用于唤醒的描述符加入到epoll中管理,而且线程内的全局变量会存储EventLoop对象的指针。

//构造函数
EventLoop::EventLoop()
: looping_(false),
quit_(false),
eventHandling_(false),
callingPendingFunctors_(false),
iteration_(0),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)),
//定时器集合,构造中会将timeefd添加到poll中监听可读事件,可读事件的回调函数
//会调用超时的time的回调函数
timerQueue_(new TimerQueue(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_)),
currentActiveChannel_(NULL)
{
LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
//检查该线程中是否存在其他的EventLoop,一个线程中只能存在一个Eventloop
if (t_loopInThisThread)
{
LOG_FATAL <<
4000
; "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;  //存储指针
}
wakeupChannel_->setReadCallback(
boost::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
//将wakeupfd设置到poll中监听
wakeupChannel_->enableReading();
}


//创建一个描述符, 用于唤醒
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
LOG_SYSERR << "Failed in eventfd";
abort();
}
return evtfd;
}


在EventLoop.cc中有

#pragma GCC diagnostic ignored "-Wold-style-cast"
class IgnoreSigPipe
{
public:
IgnoreSigPipe()
{
::signal(SIGPIPE, SIG_IGN);
LOG_TRACE << "Ignore SIGPIPE";
}
};
#pragma GCC diagnostic error "-Wold-style-cast"

//忽略SIGPIPE信号,全局初始化是就会调用
IgnoreSigPipe initObj;

}


用于忽略SIGPIPE信号。

区域的对channel的操作只是调用了poll对象的相应函数

//更新poller维护的相关集合中的描述符
void EventLoop::updateChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
poller_->updateChannel(channel);
}

//删除poller维护的相关集合中的描述符
void EventLoop::removeChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
if (eventHandling_)
{
assert(currentActiveChannel_ == channel ||
std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
}
poller_->removeChannel(channel);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: