您的位置:首页 > 其它

Netty,Kafka,Muduo关于时间轮的一些思考 之Muduo时间轮

2017-12-13 17:10 453 查看
上一篇文章提到了,netty默认的时间轮实现中,很大的一个缺陷就是对于超时时间会频繁更新的任务并不适用.这一篇文章讲讲muduo中的时间轮是如何客服这个问题的.

muduo是陈硕老师用C++写的一个网络库,功能与netty其实很像,陈硕老师也在书中提到muduo很多的实现参考了netty.青出于蓝而胜于蓝,我认为在时间轮上muduo是要由于netty的.

而这一切都要归功与智能指针,shared_ptr 与 weak_ptr

在具体实现中,每个bucket里放的不是连接,而是一个特制的Entry struct,每个entry包含TcpConnection的weak_ptr. Entry的析构函数会判断连接是否还存在(用weak_ptr),如果还存在则断开连接.

在实现中,为了简单起见,不是真的把一个连接从一个格子移到另一个格子,而是采用引用计数的方法,用shared_ptr来管理entry.如果从连接收到数据,就把对应的EntryPtr放到bucket中,这样它的应用计数就增加了.当entry的引用计数递减到零时,说明它没有在任何一个格子里出现,那么连接超时,Entry的析构函数会断开连接!

其中的另外一个trick便是,timing wheel用boost::circular_buffer实现,其中每个Bucket元素是个hash set of EntryPtr.由于是一个hashset就保证了每个entry只会在Bucket中出现一次,而不用担心很短的时间内,一个connection的select.poll()返回了太多次,而导致bucket中会有重复的Entry了.

其中的核心且最终要的逻辑便是这样的.

下面上源码!首先是当新收到一个连接时,这里的entryPtr是一个shared_Ptr类型,创建了shared_ptr之后,放入时间轮,同时将创建这个shared_ptr的weak_ptr,把它作为connection的Context.

if (conn->connected()){
EntryPtr entry(new Entry(conn));
connectionBuckets_.back().insert(entry);
dumpConnectionBuckets();
WeakEntryPtr weakEntry(entry);
conn->setContext(weakEntry);
}


每次收到源码时的动作,这里只截取了操作时间轮的部分,

void EchoServer::onMessage(const TcpConnectionPtr& conn,Buffer* buf,Timestamp time){
......
......
WeakEntryPtr weakEntry(boost::any_cast<WeakEntryPtr>(conn->getContext()));
EntryPtr entry(weakEntry.lock());
if (entry){
connectionBuckets_.back().insert(entry);
dumpConnectionBuckets();
}
}
这里对于weakEntry的这一番操作,是为了在connection关闭的情况下,也能程序也可以正常运行.c++中操作指针访问非法内存,往往直接就core dump了,不会像java返回nullPointerException那么温柔.在weakPtr还存在的时候,将它转为shared_ptr再压入时间轮中.
void EchoServer::dumpConnectionBuckets() const
{
LOG_INFO << "size = " << connectionBuckets_.size();
int idx = 0;
for (WeakConnectionList::const_iterator bucketI = connectionBuckets_.begin();
bucketI != connectionBuckets_.end();
++bucketI, ++idx)
{
const Bucket& bucket = *bucketI;
printf("[%d] len = %zd : ", idx, bucket.size());
for (Bucket::const_iterator it = bucket.begin();
it != bucket.end();
++it)
{
bool connectionDead = (*it)->weakConn_.expired();
printf("%p(%ld)%s, ", get_pointer(*it), it->use_count(),
connectionDead ? " DEAD" : "");
}
puts("");
}
}最后检查过期的连接,将bucket中的元素一一检查一遍即可,断开连接等行为可以由相应的析构函数完成.
最后总结一下,muduo中时间轮的trick在于不是将connection本身放入时间轮,而是将封装了connection的entry的指针放入时间轮,同时这些时间轮中有引用计数,每次时间轮更新只需要更新引用计数即可.

在此,不得不感叹一下C++的表达力以及陈硕老师的功力,复杂的逻辑用短短几行代码就可以完成,放在java里的长度可要多得多了.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: