muduo库阅读(33)——Net部分:定时器队列TimerQueue
2015-11-11 21:10
459 查看
/* * 定时器队列 */ namespace muduo { namespace net { class EventLoop; class Timer; class TimerId; class TimerQueue : boost::noncopyable { public: TimerQueue(EventLoop* loop); ~TimerQueue(); /// Must be thread safe. Usually be called from other threads. // 添加一个定时器 TimerId addTimer(const TimerCallback& cb, Timestamp when, double interval); #ifdef __GXX_EXPERIMENTAL_CXX0X__ TimerId addTimer(TimerCallback&& cb, Timestamp when, double interval); #endif // 取消一个定时器 void cancel(TimerId timerId); private: // 计时器列表中的元素实体 typedef std::pair<Timestamp, Timer*> Entry; // 计时器列表 typedef std::set<Entry> TimerList; // 活动的计时器 typedef std::pair<Timer*, int64_t> ActiveTimer; // 活动的计时器集合 typedef std::set<ActiveTimer> ActiveTimerSet; // 添加定时器 void addTimerInLoop(Timer* timer); // 取消定时器 void cancelInLoop(TimerId timerId); // 系统定时器到期(表示定时器队列中超时最早的定时器已经超时) void handleRead(); // 获取所有已经超时定时器,并从定时器队列/活动定时集合中删除它们 std::vector<Entry> getExpired(Timestamp now); // 重置周期性的定时器,删除一次性或被取消的计时器 void reset(const std::vector<Entry>& expired, Timestamp now); // 插入定时器 bool insert(Timer* timer); // 所属的Reactor EventLoop* loop_; // 定时器文件描述符(Reactor用这个文件描述符产生的事件激活定时器事件处理器) const int timerfd_; // 定时器事件处理器 Channel timerfdChannel_; // 定时器列表 TimerList timers_; // 活动的定时器集合 ActiveTimerSet activeTimers_; // 是否正在处理超时任务 bool callingExpiredTimers_; /* atomic */ // 被取消的定时器的集合 ActiveTimerSet cancelingTimers_; }; } }
namespace muduo { namespace net { namespace detail { /* * 创建一个定时器描述符,Reator使用它产生的定时事件来激活定时器事件处理器 * timerfd是Linux为用户程序提供的一个定时器接口。 * 这个接口基于文件描述符, * 通过文件描述符的可读事件进行超时通知, * 所以能够被用于select/poll的应用场景 */ int createTimerfd() { // 注意timerfd_create这个函数的用法 // 另外注意这里所说的定时与Timer的不同 // 这是操作系统级别的定时器,Timer需要使用它才能实现定时功能 int timerfd = ::timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK | TFD_CLOEXEC); if (timerfd < 0) { LOG_SYSFATAL << "Failed in timerfd_create"; } return timerfd; } // 从现在到某一个指定的时间点的时间长度 struct timespec howMuchTimeFromNow(Timestamp when) { int64_t microseconds = when.microSecondsSinceEpoch() - Timestamp::now().microSecondsSinceEpoch(); if (microseconds < 100) { microseconds = 100; } struct timespec ts; ts.tv_sec = static_cast<time_t>( microseconds / Timestamp::kMicroSecondsPerSecond); ts.tv_nsec = static_cast<long>( (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000); return ts; } // 定时器的读,定时器可读表明有超时事件到来 // 不会出现同时超时两个的情况 void readTimerfd(int timerfd, Timestamp now) { uint64_t howmany; // 这个读取的是使用timerfd_settime设置的值 ssize_t n = ::read(timerfd, &howmany, sizeof howmany); LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString(); if (n != sizeof howmany) { LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8"; } } // 重新设置计时器的超时时间 // 如果添加的某个定时器的超时时间,比当前设置的系统定时器超时时间要早,要么就需要重新设置 void resetTimerfd(int timerfd, Timestamp expiration) { // wake up loop by timerfd_settime() struct itimerspec newValue; struct itimerspec oldValue; bzero(&newValue, sizeof newValue); bzero(&oldValue, sizeof oldValue); newValue.it_value = howMuchTimeFromNow(expiration); // 表示设置一个在expiration超时的定时器 int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); if (ret) { LOG_SYSERR << "timerfd_settime()"; } } } } } using namespace muduo; using namespace muduo::net; using namespace muduo::net::detail; TimerQueue::TimerQueue(EventLoop* loop) : loop_(loop), timerfd_(createTimerfd()), timerfdChannel_(loop, timerfd_), timers_(), callingExpiredTimers_(false) { // 设置超时的回调函数 timerfdChannel_.setReadCallback(boost::bind(&TimerQueue::handleRead, this)); // 设置为可读 timerfdChannel_.enableReading(); } /* * 销毁定时器队列 */ TimerQueue::~TimerQueue() { timerfdChannel_.disableAll(); timerfdChannel_.remove(); ::close(timerfd_); // do not remove channel, since we're in EventLoop::dtor(); for (TimerList::iterator it = timers_.begin(); it != timers_.end(); ++it) { delete it->second; } } // 添加一个定时器 TimerId TimerQueue::addTimer(const TimerCallback& cb, Timestamp when, double interval) { Timer* timer = new Timer(cb, when, interval); // 添加一个定时器,执行Reactor循环的线程择机调用addTimerInLoop实现添加 loop_->runInLoop(boost::bind(&TimerQueue::addTimerInLoop, this, timer)); // 返回一个定时器id return TimerId(timer, timer->sequence()); } #ifdef __GXX_EXPERIMENTAL_CXX0X__ TimerId TimerQueue::addTimer(TimerCallback&& cb, Timestamp when, double interval) { Timer* timer = new Timer(std::move(cb), when, interval); loop_->runInLoop( boost::bind(&TimerQueue::addTimerInLoop, this, timer)); return TimerId(timer, timer->sequence()); } #endif void TimerQueue::cancel(TimerId timerId) { loop_->runInLoop(boost::bind(&TimerQueue::cancelInLoop, this, timerId)); } // 添加定时器(在Reactor的循环中添加) void TimerQueue::addTimerInLoop(Timer* timer) { loop_->assertInLoopThread(); // 当前添加的定时器是超时最早的计时器 bool earliestChanged = insert(timer); if (earliestChanged) { // 重新设置系统定时器的超时时间 resetTimerfd(timerfd_, timer->expiration()); } } // 取消定时器 void TimerQueue::cancelInLoop(TimerId timerId) { loop_->assertInLoopThread(); assert(timers_.size() == activeTimers_.size()); ActiveTimer timer(timerId.timer_, timerId.sequence_); ActiveTimerSet::iterator it = activeTimers_.find(timer); // 删除定时器 if (it != activeTimers_.end()) { size_t n = timers_.erase(Entry(it->first->expiration(), it->first)); assert(n == 1); (void)n; delete it->first; // FIXME: no delete please activeTimers_.erase(it); } else if (callingExpiredTimers_) { // 如果没有这个定时器(表示它已经超时,正在被处理),而且又是正在处理已超时定时器,那么把它插入到正在取消的定时器对列中 // 等待处理超时定时器的工作完成,调用reset的时候会将其删除 cancelingTimers_.insert(timer); } assert(timers_.size() == activeTimers_.size()); } /* * 一旦计时器到期,就会触发事件,timerfdChannel_就会被激活, * 然后读事件回调函数handleRead就会被调用 */ void TimerQueue::handleRead() { loop_->assertInLoopThread(); Timestamp now(Timestamp::now()); // 将计时器里数据(这个数据是通过timerfd_settime写入的)读取出来,否则会重复激发定时器 readTimerfd(timerfd_, now); // 获取所有已经超时的定时器 std::vector<Entry> expired = getExpired(now); callingExpiredTimers_ = true; cancelingTimers_.clear(); // 一次调用每一个到期回调函数 for (std::vector<Entry>::iterator it = expired.begin(); it != expired.end(); ++it) { it->second->run(); } callingExpiredTimers_ = false; // 重置所有周期性的定时器 reset(expired, now); } /* * 获取所有已经超时的定时器 */ std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now) { assert(timers_.size() == activeTimers_.size()); std::vector<Entry> expired; Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); // 获取所有超时时间比当前时间早的定时器,即已到期的定时器(timers_.begin()与end之间就是所有的已超时的定时器) TimerList::iterator end = timers_.lower_bound(sentry); assert(end == timers_.end() || now < end->first); // 将已超时的定时器复制到expired中 std::copy(timers_.begin(), end, back_inserter(expired)); timers_.erase(timers_.begin(), end); for (std::vector<Entry>::iterator it = expired.begin(); it != expired.end(); ++it) { // 将已超时的定时器从活动定时器列表中删除 ActiveTimer timer(it->second, it->second->sequence()); size_t n = activeTimers_.erase(timer); assert(n == 1); (void)n; } assert(timers_.size() == activeTimers_.size()); return expired; } // 重置所有周期性的定时器 void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now) { // 下一次的超时时间 Timestamp nextExpire; for (std::vector<Entry>::const_iterator it = expired.begin(); it != expired.end(); ++it) { ActiveTimer timer(it->second, it->second->sequence()); // 它是周期性的定时器,而且不在被取消计时器队列中 if (it->second->repeat() && cancelingTimers_.find(timer) == cancelingTimers_.end()) { // 重新设置超时时间 it->second->restart(now); // 再次插入计时器队列中 insert(it->second); } else { // FIXME move to a free list // 一次性的定时器,或者在被取消的定时器队列中,那么将他删除 delete it->second; // FIXME: no delete please } } if (!timers_.empty()) { nextExpire = timers_.begin()->second->expiration(); } // 重新设置系统计时器的超时时间 if (nextExpire.valid()) { resetTimerfd(timerfd_, nextExpire); } } // 插入一个定时器 bool TimerQueue::insert(Timer* timer) { loop_->assertInLoopThread(); assert(timers_.size() == activeTimers_.size()); // 用于判断当前计时器是不是最早超时的那个 bool earliestChanged = false; Timestamp when = timer->expiration(); TimerList::iterator it = timers_.begin(); // 如果计时器队列是空的或者,它比计时器队列中最早超时的那个计时器的超时时间还要早 // 那么当前计时器就是最早发生超时的那个计时器 if (it == timers_.end() || when < it->first) { earliestChanged = true; } { // 插入,该对列会自动排列 std::pair<TimerList::iterator, bool> result = timers_.insert(Entry(when, timer)); assert(result.second); (void)result; } { // 插入到活动的计时器队列 std::pair<ActiveTimerSet::iterator, bool> result = activeTimers_.insert(ActiveTimer(timer, timer->sequence())); assert(result.second); (void)result; } assert(timers_.size() == activeTimers_.size()); return earliestChanged; }
相关文章推荐
- UITableViewEdit
- UITableView的一些方法
- 实现UIScrollView循环滚动
- iOS学习笔记--如何为UITextView设置placeholder
- SoapUI启动报错:The JVM could not be started. The maximum heap size (-Xmx) might be too large
- UIbottun使用详解
- KVC中setValuesForKeysWithDictionary:(写的不错)
- Quick中require与import的区别
- QuickSort
- UGUI之滑动翻页效果
- android view requestLayout,invalidate 事件的分发,接收,以及处理
- 优化UITableViewCell高度计算的那些事
- Ueditor绝对路径问题
- 如何让 UITableView 的 headerView跟随 cell一起滚动
- Builder模式
- LightOJ 1369 - Answering Queries(规律)
- iOS笔记UI--ipad分页控件
- UE4 Android Device Compatibility
- UE4.9 Hardware & Software Specifications
- Epic Games Shows Incredible New Anti-Aliasing Technique – Screenshots + Video Comparison