您的位置:首页 > 产品设计 > UI/UE

muduo库阅读(33)——Net部分:定时器队列TimerQueue

2015-11-11 21:10 459 查看
/*
* 定时器队列
*/
namespace muduo
{
namespace net
{

class EventLoop;
class Timer;
class TimerId;

class TimerQueue : boost::noncopyable
{
public:
TimerQueue(EventLoop* loop);
~TimerQueue();

/// Must be thread safe. Usually be called from other threads.
// 添加一个定时器
TimerId addTimer(const TimerCallback& cb,
Timestamp when,
double interval);
#ifdef __GXX_EXPERIMENTAL_CXX0X__
TimerId addTimer(TimerCallback&& cb,
Timestamp when,
double interval);
#endif

// 取消一个定时器
void cancel(TimerId timerId);

private:

// 计时器列表中的元素实体
typedef std::pair<Timestamp, Timer*> Entry;

// 计时器列表
typedef std::set<Entry> TimerList;

// 活动的计时器
typedef std::pair<Timer*, int64_t> ActiveTimer;

// 活动的计时器集合
typedef std::set<ActiveTimer> ActiveTimerSet;

// 添加定时器
void addTimerInLoop(Timer* timer);

// 取消定时器
void cancelInLoop(TimerId timerId);

// 系统定时器到期(表示定时器队列中超时最早的定时器已经超时)
void handleRead();

// 获取所有已经超时定时器,并从定时器队列/活动定时集合中删除它们
std::vector<Entry> getExpired(Timestamp now);

// 重置周期性的定时器,删除一次性或被取消的计时器
void reset(const std::vector<Entry>& expired, Timestamp now);

// 插入定时器
bool insert(Timer* timer);

// 所属的Reactor
EventLoop* loop_;

// 定时器文件描述符(Reactor用这个文件描述符产生的事件激活定时器事件处理器)
const int timerfd_;

// 定时器事件处理器
Channel timerfdChannel_;

// 定时器列表
TimerList timers_;

// 活动的定时器集合
ActiveTimerSet activeTimers_;

// 是否正在处理超时任务
bool callingExpiredTimers_; /* atomic */

// 被取消的定时器的集合
ActiveTimerSet cancelingTimers_;
};

}
}


namespace muduo
{
namespace net
{
namespace detail
{

/*
* 创建一个定时器描述符,Reator使用它产生的定时事件来激活定时器事件处理器
* timerfd是Linux为用户程序提供的一个定时器接口。
* 这个接口基于文件描述符,
* 通过文件描述符的可读事件进行超时通知,
* 所以能够被用于select/poll的应用场景
*/
int createTimerfd()
{
// 注意timerfd_create这个函数的用法
// 另外注意这里所说的定时与Timer的不同
// 这是操作系统级别的定时器,Timer需要使用它才能实现定时功能
int timerfd = ::timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK | TFD_CLOEXEC);
if (timerfd < 0)
{
LOG_SYSFATAL << "Failed in timerfd_create";
}
return timerfd;
}

// 从现在到某一个指定的时间点的时间长度
struct timespec howMuchTimeFromNow(Timestamp when)
{
int64_t microseconds = when.microSecondsSinceEpoch() - Timestamp::now().microSecondsSinceEpoch();
if (microseconds < 100)
{
microseconds = 100;
}
struct timespec ts;
ts.tv_sec = static_cast<time_t>( microseconds / Timestamp::kMicroSecondsPerSecond);
ts.tv_nsec = static_cast<long>( (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
return ts;
}

// 定时器的读,定时器可读表明有超时事件到来
// 不会出现同时超时两个的情况
void readTimerfd(int timerfd, Timestamp now)
{
uint64_t howmany;

// 这个读取的是使用timerfd_settime设置的值
ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
if (n != sizeof howmany)
{
LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
}
}

// 重新设置计时器的超时时间
// 如果添加的某个定时器的超时时间,比当前设置的系统定时器超时时间要早,要么就需要重新设置
void resetTimerfd(int timerfd, Timestamp expiration)
{
// wake up loop by timerfd_settime()
struct itimerspec newValue;
struct itimerspec oldValue;
bzero(&newValue, sizeof newValue);
bzero(&oldValue, sizeof oldValue);
newValue.it_value = howMuchTimeFromNow(expiration);

// 表示设置一个在expiration超时的定时器
int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);

if (ret)
{
LOG_SYSERR << "timerfd_settime()";
}
}

}
}
}

using namespace muduo;
using namespace muduo::net;
using namespace muduo::net::detail;

TimerQueue::TimerQueue(EventLoop* loop)
: loop_(loop),
timerfd_(createTimerfd()),
timerfdChannel_(loop, timerfd_),
timers_(),
callingExpiredTimers_(false)
{
// 设置超时的回调函数
timerfdChannel_.setReadCallback(boost::bind(&TimerQueue::handleRead, this));

// 设置为可读
timerfdChannel_.enableReading();
}

/*
* 销毁定时器队列
*/
TimerQueue::~TimerQueue()
{
timerfdChannel_.disableAll();
timerfdChannel_.remove();
::close(timerfd_);
// do not remove channel, since we're in EventLoop::dtor();
for (TimerList::iterator it = timers_.begin();
it != timers_.end(); ++it)
{
delete it->second;
}
}

// 添加一个定时器
TimerId TimerQueue::addTimer(const TimerCallback& cb,
Timestamp when,
double interval)
{
Timer* timer = new Timer(cb, when, interval);

// 添加一个定时器,执行Reactor循环的线程择机调用addTimerInLoop实现添加
loop_->runInLoop(boost::bind(&TimerQueue::addTimerInLoop, this, timer));

// 返回一个定时器id
return TimerId(timer, timer->sequence());
}

#ifdef __GXX_EXPERIMENTAL_CXX0X__
TimerId TimerQueue::addTimer(TimerCallback&& cb,
Timestamp when,
double interval)
{
Timer* timer = new Timer(std::move(cb), when, interval);
loop_->runInLoop(
boost::bind(&TimerQueue::addTimerInLoop, this, timer));
return TimerId(timer, timer->sequence());
}
#endif

void TimerQueue::cancel(TimerId timerId)
{
loop_->runInLoop(boost::bind(&TimerQueue::cancelInLoop, this, timerId));
}

// 添加定时器(在Reactor的循环中添加)
void TimerQueue::addTimerInLoop(Timer* timer)
{
loop_->assertInLoopThread();

// 当前添加的定时器是超时最早的计时器
bool earliestChanged = insert(timer);

if (earliestChanged)
{
// 重新设置系统定时器的超时时间
resetTimerfd(timerfd_, timer->expiration());
}
}

// 取消定时器
void TimerQueue::cancelInLoop(TimerId timerId)
{
loop_->assertInLoopThread();

assert(timers_.size() == activeTimers_.size());

ActiveTimer timer(timerId.timer_, timerId.sequence_);

ActiveTimerSet::iterator it = activeTimers_.find(timer);

// 删除定时器
if (it != activeTimers_.end())
{
size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
assert(n == 1); (void)n;
delete it->first; // FIXME: no delete please
activeTimers_.erase(it);
}
else if (callingExpiredTimers_)
{
// 如果没有这个定时器(表示它已经超时,正在被处理),而且又是正在处理已超时定时器,那么把它插入到正在取消的定时器对列中
// 等待处理超时定时器的工作完成,调用reset的时候会将其删除
cancelingTimers_.insert(timer);
}
assert(timers_.size() == activeTimers_.size());
}

/*
* 一旦计时器到期,就会触发事件,timerfdChannel_就会被激活,
* 然后读事件回调函数handleRead就会被调用
*/
void TimerQueue::handleRead()
{
loop_->assertInLoopThread();
Timestamp now(Timestamp::now());

// 将计时器里数据(这个数据是通过timerfd_settime写入的)读取出来,否则会重复激发定时器
readTimerfd(timerfd_, now);

// 获取所有已经超时的定时器
std::vector<Entry> expired = getExpired(now);

callingExpiredTimers_ = true;
cancelingTimers_.clear();

// 一次调用每一个到期回调函数
for (std::vector<Entry>::iterator it = expired.begin(); it != expired.end(); ++it)
{
it->second->run();
}

callingExpiredTimers_ = false;

// 重置所有周期性的定时器
reset(expired, now);
}

/*
* 获取所有已经超时的定时器
*/
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
assert(timers_.size() == activeTimers_.size());

std::vector<Entry> expired;

Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));

// 获取所有超时时间比当前时间早的定时器,即已到期的定时器(timers_.begin()与end之间就是所有的已超时的定时器)
TimerList::iterator end = timers_.lower_bound(sentry);

assert(end == timers_.end() || now < end->first);

// 将已超时的定时器复制到expired中
std::copy(timers_.begin(), end, back_inserter(expired));

timers_.erase(timers_.begin(), end);

for (std::vector<Entry>::iterator it = expired.begin(); it != expired.end(); ++it)
{
// 将已超时的定时器从活动定时器列表中删除
ActiveTimer timer(it->second, it->second->sequence());
size_t n = activeTimers_.erase(timer);
assert(n == 1); (void)n;
}

assert(timers_.size() == activeTimers_.size());
return expired;
}

// 重置所有周期性的定时器
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
// 下一次的超时时间
Timestamp nextExpire;

for (std::vector<Entry>::const_iterator it = expired.begin(); it != expired.end(); ++it)
{
ActiveTimer timer(it->second, it->second->sequence());

// 它是周期性的定时器,而且不在被取消计时器队列中
if (it->second->repeat() && cancelingTimers_.find(timer) == cancelingTimers_.end())
{
// 重新设置超时时间
it->second->restart(now);

// 再次插入计时器队列中
insert(it->second);
}
else
{
// FIXME move to a free list
// 一次性的定时器,或者在被取消的定时器队列中,那么将他删除
delete it->second; // FIXME: no delete please
}
}

if (!timers_.empty())
{
nextExpire = timers_.begin()->second->expiration();
}

// 重新设置系统计时器的超时时间
if (nextExpire.valid())
{
resetTimerfd(timerfd_, nextExpire);
}
}

// 插入一个定时器
bool TimerQueue::insert(Timer* timer)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());

// 用于判断当前计时器是不是最早超时的那个
bool earliestChanged = false;

Timestamp when = timer->expiration();

TimerList::iterator it = timers_.begin();

// 如果计时器队列是空的或者,它比计时器队列中最早超时的那个计时器的超时时间还要早
// 那么当前计时器就是最早发生超时的那个计时器
if (it == timers_.end() || when < it->first)
{
earliestChanged = true;
}

{
// 插入,该对列会自动排列
std::pair<TimerList::iterator, bool> result = timers_.insert(Entry(when, timer));
assert(result.second); (void)result;
}

{
// 插入到活动的计时器队列
std::pair<ActiveTimerSet::iterator, bool> result = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
assert(result.second); (void)result;
}

assert(timers_.size() == activeTimers_.size());
return earliestChanged;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: