您的位置:首页 > Web前端 > React

muduo库阅读(29)——Net部分:Reactor(EventLoop事件循环)

2015-11-11 17:23 393 查看
Reactor是整个框架最核心的部分

它创建轮询器(Poller),创建用于传递消息的管道,初始化各个部分,然后进入一个无限循环,每一次循环中调用轮询器的轮询函数(等待函数),等待事件发生,如果有事件发生,就依次调用套接字(或其他的文件描述符)的事件处理器处理各个事件,然后调用投递的回调函数;接着进入下一次循环。

/*
* 事件循环,即Reactor,一个线程最多一个Reactor
* 这是一个接口类
*/
namespace muduo
{
namespace net
{

class Channel;
class Poller;
class TimerQueue;

///
/// Reactor, at most one per thread.
///
/// This is an interface class, so don't expose too much details.
class EventLoop : boost::noncopyable
{
public:
typedef boost::function<void()> Functor;

EventLoop();
~EventLoop();  // force out-line dtor, for scoped_ptr members.

// 事件循环,必须在同一个线程内创建Reactor对象和调用它的loop函数
void loop();

/// better to call through shared_ptr<EventLoop> for 100% safety.
// 跳出事件循环
void quit();

// 轮询返回的时间戳,通常表示有数据到来(即有事件到来)
Timestamp pollReturnTime() const { return pollReturnTime_; }

int64_t iteration() const { return iteration_; }

/// Runs callback immediately in the loop thread.
/// It wakes up the loop, and run the cb.
/// If in the same loop thread, cb is run within the function.
/// Safe to call from other threads.
/*
* 在事件循环中周期性的调用回调函数
*/
void runInLoop(const Functor& cb);
/// Queues callback in the loop thread.
/// Runs after finish pooling.
/// Safe to call from other threads.
// 把回调函数放在队列中,在轮询结束之后调用
void queueInLoop(const Functor& cb);

#ifdef __GXX_EXPERIMENTAL_CXX0X__
void runInLoop(Functor&& cb);
void queueInLoop(Functor&& cb);
#endif

// 在指定的时间调用回调函数
TimerId runAt(const Timestamp& time, const TimerCallback& cb);

// 在delay妙之后调用回调函数
TimerId runAfter(double delay, const TimerCallback& cb);

// 每隔interval妙调用一次回调函数
TimerId runEvery(double interval, const TimerCallback& cb);

// 取消一个计时器
void cancel(TimerId timerId);

#ifdef __GXX_EXPERIMENTAL_CXX0X__
TimerId runAt(const Timestamp& time, TimerCallback&& cb);
TimerId runAfter(double delay, TimerCallback&& cb);
TimerId runEvery(double interval, TimerCallback&& cb);
#endif

/*
* 下面的函数作为内部使用
*/
// internal usage
// 唤醒
void wakeup();

// 更新事件处理器
void updateChannel(Channel* channel);

// 移除事件处理器
void removeChannel(Channel* channel);

// 是否有某一个事件处理器
bool hasChannel(Channel* channel);

// 断言自己是否在循环线程中
void assertInLoopThread()
{
if (!isInLoopThread())
{
abortNotInLoopThread();
}
}

// 是否在循环线程中
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

// 是否正在处理事件
bool eventHandling() const { return eventHandling_; }

// 设置上下文
void setContext(const boost::any& context)
{ context_ = context; }

// 返回上下文
const boost::any& getContext() const
{ return context_; }

// 返回可修改的上下文
boost::any* getMutableContext()
{ return &context_; }

// 返回当前线程的Reactor对象
static EventLoop* getEventLoopOfCurrentThread();

private:

// 如果创建Reactor的线程和运行Reactor的线程不同就退出进程
void abortNotInLoopThread();

// 处理读读事件
void handleRead();  // waked up

// 执行投递的回调函数
void doPendingFunctors();

void printActiveChannels() const; // DEBUG

// 事件处理器列表
typedef std::vector<Channel*> ChannelList;

// 是否正在循环中
bool looping_; /* atomic */

// 是否退出循环
bool quit_; /* atomic and shared between threads, okay on x86, I guess. */
// 是否正在处理事件
bool eventHandling_; /* atomic */

// 是否正在调用投递的回调函数
bool callingPendingFunctors_; /* atomic */

// 迭代器
int64_t iteration_;

// 线程id
const pid_t threadId_;

// 轮询返回的时间
Timestamp pollReturnTime_;

// 轮询器
boost::scoped_ptr<Poller> poller_;

// 定时器队列
boost::scoped_ptr<TimerQueue> timerQueue_;

// 用于唤醒的描述符(将Reactor从等待中唤醒,一般是由于调用轮询器的等待函数而造成的阻塞)
int wakeupFd_;
// unlike in TimerQueue, which is an internal class,
// we don't expose Channel to client.
// 唤醒事件的处理器
boost::scoped_ptr<Channel> wakeupChannel_;

// 上下文
boost::any context_;

// scratch variables
// 已激活的事件处理器队列
ChannelList activeChannels_;

// 当前正在调用的事件处理器
Channel* currentActiveChannel_;

// 投递的回调函数列表
MutexLock mutex_;
std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_
};

}
}


using namespace muduo;
using namespace muduo::net;

namespace
{
// 加上了__thread修饰符,表示每个线程都有一个指向Reactor的指针
__thread EventLoop* t_loopInThisThread = 0;

// 轮询的超时时间(10000毫秒 = 10妙)
const int kPollTimeMs = 10000;

/*
* 创建管道,用于支持线程之间的通信
*/
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
LOG_SYSERR << "Failed in eventfd";
abort();
}
return evtfd;
}

#pragma GCC diagnostic ignored "-Wold-style-cast"
/*
* 这个类的作用是忽略管道事件
* 因为管道已经被使用另外的方法处理了
*/
class IgnoreSigPipe
{
public:
IgnoreSigPipe()
{
::signal(SIGPIPE, SIG_IGN);
// LOG_TRACE << "Ignore SIGPIPE";
}
};
#pragma GCC diagnostic error "-Wold-style-cast"

// 一个全局变量,用于实现对SIGPIPE信号的忽略
IgnoreSigPipe initObj;
}

/*
* 返回当前线程的Reactor对象的指针
*/
EventLoop* EventLoop::getEventLoopOfCurrentThread()
{
return t_loopInThisThread;
}

/*
* Reactor的构造函数
*/
EventLoop::EventLoop()
: looping_(false),
quit_(false),
eventHandling_(false),
callingPendingFunctors_(false),
iteration_(0),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)), // 调用默认的轮询器来创建轮询
timerQueue_(new TimerQueue(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_)),
currentActiveChannel_(NULL)
{
LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
if (t_loopInThisThread)
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;
}

// 设置唤醒处理器的读回调函数为handleRead
wakeupChannel_->setReadCallback(
boost::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
// 启用读功能
wakeupChannel_->enableReading();
}

/*
* 析构函数,销毁对象并清理
*/
EventLoop::~EventLoop()
{
LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
<< " destructs in thread " << CurrentThread::tid();
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = NULL;
}

/*
* Reactor的循环
*/
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";

while (!quit_)
{
// 清理已激活事件处理器的队列
activeChannels_.clear();

// 开始轮询
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);

// 记录循环的次数
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
// 开始处理事件
eventHandling_ = true;

// 遍历已激活事件处理器队列,执行每一个事件处理器的处理
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;

// 处理事件
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;

// 事件处理结束
eventHandling_ = false;

// 执行投递回调函数
doPendingFunctors();
}

LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}

/*
* 让Reactor退出循环
* 因为调用quit的线程和执行Reactor循环的线程不一定相同,如果他们不是同一个线程,
* 那么必须使用wakeup函数让wakeupChannel_被激活,然后轮询函数返回,就可以处理事件,然后进入下一个循环,
* 由于quit_已经被设置为true,所以就会跳出循环
*/
void EventLoop::quit()
{
quit_ = true;
// There is a chance that loop() just executes while(!quit_) and exists,
// then EventLoop destructs, then we are accessing an invalid object.
// Can be fixed using mutex_ in both places.
if (!isInLoopThread())
{
// 不知道为什么线程不同才会调用wakeup????同一个线程之内难道不可以调用?
wakeup();
}
}

// 运行一个回调函数
void EventLoop::runInLoop(const Functor& cb)
{
if (isInLoopThread())
{
cb();
}
else
{
// 如果不是同一个线程,那么将他添加到投递回调函数队列中
queueInLoop(cb);
}
}

// 把一个回调函数添加到投递回调函数队列中,并唤醒Reactor
void EventLoop::queueInLoop(const Functor& cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);
}

if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}

// 添加定时器事件:在某个时间点执行
TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
{
return timerQueue_->addTimer(cb, time, 0.0);
}

// 添加定时器事件:在delay妙之后执行
TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
{
Timestamp time(addTime(Timestamp::now(), delay));
return runAt(time, cb);
}

// 添加定时器事件:每interval妙执行一次
TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
{
Timestamp time(addTime(Timestamp::now(), interval));
return timerQueue_->addTimer(cb, time, interval);
}

#ifdef __GXX_EXPERIMENTAL_CXX0X__
// FIXME: remove duplication
void EventLoop::runInLoop(Functor&& cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}

void EventLoop::queueInLoop(Functor&& cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(std::move(cb));  // emplace_back
}

if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}

TimerId EventLoop::runAt(const Timestamp& time, TimerCallback&& cb)
{
return timerQueue_->addTimer(std::move(cb), time, 0.0);
}

TimerId EventLoop::runAfter(double delay, TimerCallback&& cb)
{
Timestamp time(addTime(Timestamp::now(), delay));
return runAt(time, std::move(cb));
}

TimerId EventLoop::runEvery(double interval, TimerCallback&& cb)
{
Timestamp time(addTime(Timestamp::now(), interval));
return timerQueue_->addTimer(std::move(cb), time, interval);
}
#endif

// 取消一个计时器(定时器)
void EventLoop::cancel(TimerId timerId)
{
return timerQueue_->cancel(timerId);
}

// 更新事件处理器
void EventLoop::updateChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
poller_->updateChannel(channel);
}

// 移除一个事件处理器
void EventLoop::removeChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
if (eventHandling_)
{
assert(currentActiveChannel_ == channel ||
std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
}
poller_->removeChannel(channel);
}

// 判断轮询器是否有事件处理器
bool EventLoop::hasChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
return poller_->hasChannel(channel);
}

// 退出进程
void EventLoop::abortNotInLoopThread()
{
LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
<< " was created in threadId_ = " << threadId_
<< ", current thread id = " <<  CurrentThread::tid();
}

/*
* 唤醒
* 其实就是告诉正在处理循环的Reactor,发生了某一件事
*/
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}

/*
* 被wakeupChannel_使用
* wakeupChannel_有事件发生时候(wakeup一旦被调用,wakeupChannel_就被激活),就会执行handleEvent
* 而handleEvent内部就会调用处理读的回调函数,wakeupChannel_的读回调函数就是handleRead
*/
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
}
}

// 执行投递的回调函数(投递的回调函数是在一次循环中,所有的事件都处理完毕之后才调用的)
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;

{
// 交换的目的是让pendingFunctors_继续存放投递的回调函数
// 而functors则专门用于执行投递的回调函数
// 这样就不会因为长期锁住pendingFunctors_而造成阻塞了
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}

for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}

// 打印已激活的事件处理器
void EventLoop::printActiveChannels() const
{
for (ChannelList::const_iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
const Channel* ch = *it;
LOG_TRACE << "{" << ch->reventsToString() << "} ";
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: