您的位置:首页 > 其它

一个高性能RPC框架的连接管理

2013-04-04 19:06 375 查看
[align=left]既然说连接,先对EpollServer的连接管理做个介绍吧。客户端与服务器一次conn,被封装成为Connection类在服务器进行管理。[/align]
[align=left]服务器连接有三种类型,分别为:[/align]

[align=left]enum EnumConnectionType[/align]
[align=left] {[/align]
EM_TCP =
0,
EM_UDP =
1,
EM_BUS =
2
[align=left] };[/align]

Connection成员属性
[align=left] time_t _iLastRefreshTime;//最后刷新时间,初始化为初始化的当前时间[/align]
BindAdapter
* _pBindAdapter;//适配器
TC_Socket _sock;
//TC_Socket
uint32_t _uid;
//连接的唯一编号,默认为0
int _lfd;
//监听的socket,默认为-1
[align=left] int _timeout;//超时时间[/align]
string _ip;
//ip
uint16_t _port;
//端口,默认为0
string _recvbuffer;
//接收数据buffer
string _sendbuffer;
//发送数据buffer
[align=left] size_t _sendPos;//默认为0[/align]
int _iHeaderLen;
//需要过滤的头部字节数,默认为0
bool _bClose;
//发送完当前数据就关闭连接,默认为false
[align=left] int _iMaxTemQueueSize;//临时队列的最大长度,默认为100[/align]
EnumConnectionType _enType;
//连接类型,默认为TCP
bool _bEmptyConn;
//是否为空连接,默认为false,UDP一直为false
char * _pRecvBuffer;
//接收数据的临时buffer,加这个目的是对 udp接收数据包大小进行设置,默认为null
[align=left] size_t _nRecvBufferSize;//默认为64K,每次默认读取数据块大小[/align]

[align=left]Connection提供了一系列的getter/setter方法修改上述属性,并且将那个EpollServer列为friend class,提供接收数据的功能操作接口。[/align]

[align=left]注意!一大波数据正在袭来[/align]
当连接需要接收数据的时候(边缘触发,所以这里关键是一次性要将所有数据接收完),我们需要一次性把数据接收完以后,把数据解析成一个一个完整的数据包,放入到指定的队列o中,首先Connection的buf是string的,本身不存在长度限制(可优化点),每次read的buf是8192个字节,不断的把客户端管道中的数据读取到Connection的Buf中,直到客户端EAGINE(没有数据了),或者接收到的数据小于每次读取的8192个字节。则表示已经没有数据了。所以这里如果长时间不处理,将会使得Connection的buf
string堆积很多数据,如果没有数据可读了,则调用解析协议。

[align=left] int TC_EpollServer::Connection::recv( recv_queue ::queue_type &o)[/align]
[align=left]{[/align]
[align=left] o.clear ();[/align]

char buffer[8192]
= "\0";

[align=left] while( true)[/align]
[align=left] {[/align]
int iBytesReceived
= 0;

if( _lfd ==
-1)
[align=left] {[/align]
if( _pRecvBuffer)
//如果该connection分配了自己的数据接受buffer,则用自己的,否则每次接收每次new出来一块buffer。
[align=left] {[/align]
iBytesReceived = _sock. recvfrom(( void*) _pRecvBuffer, _nRecvBufferSize, _ip, _port,
0);
[align=left] }[/align]
[align=left] else[/align]
[align=left] {[/align]
iBytesReceived = _sock. recvfrom(( void*)buffer, sizeof(buffer), _ip, _port,
0);
[align=left] }[/align]
[align=left] }[/align]
[align=left] else[/align]
[align=left] {[/align]
iBytesReceived = :: read( _sock.getfd(),
(void*)buffer, sizeof(buffer));
[align=left] }[/align]

if (iBytesReceived
< 0)
[align=left] {[/align]
[align=left] if( errno == EAGAIN )[/align]
[align=left] {[/align]
[align=left] //没有数据了[/align]
[align=left] break;[/align]
[align=left] }[/align]
[align=left] else[/align]
[align=left] {[/align]
[align=left] //客户端主动关闭[/align]
_pBindAdapter->getEpollServer()-> debug( "recv [" + _ip + ":" + TC_Common:: tostr( _port)
+ "] close connection");
[align=left] return -1;[/align]
[align=left] }[/align]
[align=left] }[/align]
else if(
iBytesReceived == 0)
[align=left] {[/align]
[align=left] //客户端主动关闭[/align]
_pBindAdapter->getEpollServer()-> debug( "recv [" + _ip + ":" + TC_Common:: tostr( _port)
+ "] close connection");
[align=left] return -1;[/align]
[align=left] }[/align]

[align=left] //保存接收到数据[/align]
if( _lfd ==
-1)
[align=left] {[/align]
[align=left] if( _pRecvBuffer)[/align]
[align=left] {[/align]
_recvbuffer. append( _pRecvBuffer,
iBytesReceived);
[align=left] }[/align]
[align=left] else[/align]
[align=left] {[/align]
_recvbuffer. append(buffer,
iBytesReceived);
[align=left] }[/align]
[align=left] }[/align]
[align=left] else[/align]
[align=left] {[/align]
_recvbuffer. append(buffer,
iBytesReceived);
[align=left] }[/align]

[align=left] //UDP协议[/align]
if( _lfd ==
-1)
[align=left] {[/align]
if( _pBindAdapter-> isIpAllow( _ip)
== true)
[align=left] {[/align]
[align=left] parseProtocol(o);[/align]
[align=left] }[/align]
[align=left] else[/align]
[align=left] {[/align]
[align=left] //udp ip无权限[/align]
_pBindAdapter->getEpollServer()-> debug( "accept
[" + _ip + ":" + TC_Common ::tostr( _port)
+ "] [" + TC_Common:: tostr( _lfd)
+ "] not allowed");
[align=left] }[/align]
[align=left] _recvbuffer = "";[/align]
[align=left] }[/align]
[align=left] else[/align]
[align=left] {[/align]
[align=left] //接收到数据不超过buffer,没有数据了(如果有数据,内核会再通知你)[/align]
if(( size_t)iBytesReceived
< sizeof(buffer))
[align=left] {[/align]
[align=left] break;[/align]
[align=left] }[/align]
[align=left] //字符串太长时 substr性能会急剧下降[/align]
if( _recvbuffer. length()
> 8192)
[align=left] {[/align]
[align=left] parseProtocol(o);[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] }[/align]

if( _lfd !=
-1)
[align=left] {[/align]
[align=left] return parseProtocol(o);[/align]
[align=left] }[/align]

[align=left] return o. size();[/align]
[align=left]}[/align]

[align=left]解析方法将把Connection从客户端管道中获取的所有的数据(_recvbuffer)调用协议解析器进行协议解析,直到剩下的数据不构成一个完整的接受包为止。把这些请求包放入方法参数的队列中。[/align]

[align=left]int TC_EpollServer::Connection::parseProtocol(recv_queue::queue_type &o)[/align]
[align=left]{[/align]
[align=left] try[/align]
[align=left] {[/align]
[align=left] while (true)[/align]
[align=left] {[/align]
[align=left] //需要过滤首包包头[/align]
if( _iHeaderLen >
0)
[align=left] {[/align]
if( _recvbuffer. length()
>= (unsigned) _iHeaderLen)
[align=left] {[/align]
[align=left] string header = _recvbuffer.substr(0, _iHeaderLen);[/align]
_pBindAdapter->getHeaderFilterFunctor()((int)(TC_EpollServer ::PACKET_FULL),
header);
[align=left] _recvbuffer = _recvbuffer. substr(_iHeaderLen);[/align]
_iHeaderLen =
0;
[align=left] }[/align]
[align=left] else[/align]
[align=left] {[/align]
[align=left] _pBindAdapter->getHeaderFilterFunctor()((int)(TC_EpollServer ::PACKET_LESS), _recvbuffer);[/align]
[align=left] _iHeaderLen -= _recvbuffer. length();[/align]
[align=left] _recvbuffer = "";[/align]
[align=left] break;[/align]
[align=left] }[/align]
[align=left] }[/align]

[align=left] string ro;[/align]

int b
= _pBindAdapter->getProtocol()( _recvbuffer,
ro); //此处调用了Adapter关联的编码解码器,taf默认是AppProtocol::parse()

if(b
== TC_EpollServer:: PACKET_LESS)
[align=left] {[/align]
[align=left] //包不完全[/align]
break;

[align=left] }[/align]
else if(b
== TC_EpollServer:: PACKET_FULL)
[align=left] {[/align]
tagRecvData*
recv = new tagRecvData();
recv-> buffer
= ro;
recv-> ip
= _ip;
recv-> port
= _port;
recv-> recvTimeStamp
= TC_TimeProvider::getInstance()->getNow();
recv-> uid
= getId();
recv-> isOverload
= false;
recv-> isClosed
= false;

[align=left] //收到完整的包才算[/align]
[align=left] this-> _bEmptyConn = false;[/align]

[align=left] //收到完整包[/align]
[align=left] o. push_back(recv);[/align]

if(( int)
o. size() > _iMaxTemQueueSize)
[align=left] {[/align]
[align=left] insertRecvQueue(o);[/align]
[align=left] o. clear();[/align]
[align=left] }[/align]

[align=left] if( _recvbuffer. empty())[/align]
[align=left] {[/align]
[align=left] break;[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] else[/align]
[align=left] {[/align]
_pBindAdapter->getEpollServer()-> error("recv [" + _ip + ":" + TC_Common:: tostr(_port)
+ "],packet error.");
[align=left] return -1; //协议解析错误[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] catch(exception &ex)[/align]
[align=left] {[/align]
_pBindAdapter->getEpollServer()-> error("recv protocol
error:" + string (ex.what()));
[align=left] return -1;[/align]
[align=left] }[/align]
[align=left] catch(...)[/align]
[align=left] {[/align]
_pBindAdapter->getEpollServer()-> error("recv protocol
error");
[align=left] return -1;[/align]
[align=left] }[/align]

[align=left] return o.size();[/align]
[align=left]}[/align]

[align=left]AppProtocol解析如下:[/align]

[align=left] /**[/align]
[align=left] * 解析协议[/align]
[align=left] * @param in, 目前的buffer[/align]
[align=left] * @param out, 一个完整的包[/align]
[align=left] *[/align]
[align=left] * @return int, 0表示没有接收完全, 1表示收到一个完整包[/align]
[align=left] */[/align]
[align=left] static int parse(string &in, string &out)[/align]
[align=left] {[/align]
[align=left] return parseLen<10000000>(in,out);[/align]
[align=left] }[/align]

[align=left] template<taf:: Int32 iMaxLength>[/align]
[align=left] static int parseLen(string &in, string &out)[/align]
[align=left] {[/align]
if(in.length()
< sizeof(taf::Int32))
//首先需要一个int32来标志包的长度,所以我们的接受buff的长度起码得>int32,否则标识为收包不完整
[align=left] {[/align]
[align=left] return TC_EpollServer:: PACKET_LESS;[/align]
[align=left] }[/align]

taf:: Int32 iHeaderLen;
//包的长度头

memcpy(&iHeaderLen, in.c_str(), sizeof(taf:: Int32));
//获取包的长度

[align=left] iHeaderLen = ntohl(iHeaderLen); //将网络顺序变为主机顺序,非常重要[/align]

if(iHeaderLen
< taf:: Int32( sizeof(taf:: Int32))||
iHeaderLen > iMaxLength) //如果当前的包的长度超过了最大长度或者小于一个int32的长度(包头本身)则认为协议错误
[align=left] {[/align]
[align=left] return TC_EpollServer:: PACKET_ERR;[/align]
[align=left] }[/align]

if((int)in.length()
< iHeaderLen) //缓存区里面字段长度不够包头长度
[align=left] {[/align]
[align=left] return TC_EpollServer:: PACKET_LESS;[/align]
[align=left] }[/align]

out = in.substr(sizeof(taf::Int32),
iHeaderLen - sizeof(taf::Int32 ));

[align=left] in = in. substr(iHeaderLen);//将刚才已经解析的完整的包部分从输入中去掉[/align]

[align=left] return TC_EpollServer::PACKET_FULL;[/align]
[align=left] }[/align]

[align=left]利用adapter中注册的protocol进行编解码,如果包不完整,则跳出,返回状态码,如果收包完整,则构建新的tagRecvData,设置完整的包buff,并且赋予Connection的ID(这个Connection锁接收到的数据包都是这个Connection的ID)以及将接收数据 tagRecvData的状态设置为非空,将这个包放入临时接收数据队列。如果当前这个接受数据队列超出了临时队列的最大长度(否则则在返回后再插入,避免大量的请求阻塞在这里而不能得到及时处理导致的各种超时,所以每到一定数目,则先放入adapter去进行处理),则将该接收队列中的数据包放到adapter的rbuffer中(这个rbuffer是所有Connection一起共享的,所以必须是线程安全的),如果当前adapter已经过载,将数据包依次设置为过载状态(很重要)放前面,这样有利于该Adapter接口紧急处理,避免雪崩和滚雪球效应,否则放后面。[/align]

[align=left]void TC_EpollServer::Connection:: insertRecvQueue( recv_queue::queue_type &vRecvData)[/align]
[align=left]{[/align]
[align=left] if(!vRecvData. empty())[/align]
[align=left] {[/align]
[align=left] //服务队列已超载 数据放在队列头部[/align]
[align=left] if( _pBindAdapter->isOverload())[/align]
[align=left] {[/align]
recv_queue ::queue_type::iterator it
= vRecvData.begin();

recv_queue ::queue_type::iterator itEnd
= vRecvData.end();

while(it
!= itEnd)
[align=left] {[/align]
[align=left] (*it)-> isOverload = true;[/align]

[align=left] ++it;[/align]
[align=left] }[/align]
[align=left] _pBindAdapter-> insertRecvQueue(vRecvData, false);[/align]
[align=left] }[/align]
[align=left] else[/align]
[align=left] {[/align]
[align=left] _pBindAdapter-> insertRecvQueue(vRecvData);[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left]}[/align]

[align=left]Connection需要集中管理起来[/align]
[align=left]所有Connection都会被ConnectionList一个连接队列管理起来。统一进行超时检测等,空连接超时检测等,连接队列存储的数据为[/align]

[align=left] pair< Connection *, multimap<time_t, uint32_t >::iterator > list_data;[/align]

[align=left]ConnectionList成员属性如下:[/align]

TC_EpollServer
* _pEpollServer; //服务
uint32_t _total;
//总计连接数
list< uint32_t> _free;
//空闲链表
size_t _free_size;
//空闲链元素个数
list_data
* _vConn; //链接
multimap<time_t, uint32_t> _tl;
//超时链表
time_t _lastTimeoutTime;
//上次检查超时时间
uint32_t _iConnectionMagic;
//连接ID的魔数

ConnectionList初始化,指定ConnectionList可以管理的最大连接数目,建立最大连接数目需要的空间,并标识为可用。
[align=left]void TC_EpollServer::ConnectionList::init( uint32_t size)[/align]
[align=left]{[/align]
_lastTimeoutTime = TC_TimeProvider:: getInstance()->getNow();
//初始化检查超时时间

_total =
size;

_free_size
= 0; //空闲连接数目初始化为0

[align=left] //初始化链接链表[/align]
if( _vConn) delete[] _vConn;
//如果空间存在,则删除先。

[align=left] //分配total+1个空间(多分配一个空间, 第一个空间其实无效)[/align]
_vConn = new list_data[ _total+1];
//分配最大连接数目需要的空间

_iConnectionMagic
= (( uint32_t) _lastTimeoutTime)
<< 20; //连接ID的魔数

//free从1开始分配, 这个值为 uid,
0保留为管道用, epollwait根据0判断是否是管道消息 ,表示空间的连接数目
for( uint32_t i
= 1; i <= _total; i++)
[align=left] {[/align]
[align=left] _vConn[i]. first = NULL;[/align]

[align=left] _free. push_back(i);[/align]

[align=left] ++ _free_size;[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left]}[/align]

[align=left]获取唯一ID,从当前free的空闲连接资源中取得一个可用连接的ID(也就是链表下标),并将之从free中去掉,并且与魔数或运算生成唯一ID。分配ID,相当于分配资源了。这里的魔数是ConnectionList建立的时候的时间<<20的值(剩下的12位留出来给该ConnectionList的下标,表明这个Connection存储在data_list的哪个位置),主要是用于校验当前的链接是否是该ConnectionList管理的资源,Connection的ID是否是这个ConnectionList分配出去的。[/align]

[align=left]uint32_t TC_EpollServer::ConnectionList::getUniqId ()[/align]
[align=left]{[/align]
[align=left] TC_ThreadLock:: Lock lock(* this);[/align]

uint32_t uid
= _free. front();

[align=left] assert (uid > 0 && uid <= _total );[/align]

[align=left] _free. pop_front();[/align]

[align=left] -- _free_size;[/align]

return _iConnectionMagic |
uid;
[align=left]}[/align]

[align=left]新增连接,首先锁住整个ConnectionList,获取连接的ID(该ID之前由该LIST生成),获得当时分配资源的下标,并将至加载到指定链表数据的指定位置。并且将该connection放入到指定链表位置,同时将该connection的下标放入到超时管理的map中。[/align]

[align=left]void TC_EpollServer:: ConnectionList::add( Connection *cPtr, time_t iTimeOutStamp)[/align]
[align=left]{[/align]
[align=left] TC_ThreadLock:: Lock lock(* this);[/align]

uint32_t muid
= cPtr->getId();
uint32_t magi
= muid & (0xFFFFFFFF << 20);
uint32_t uid
= muid & (0x7FFFFFFF >> 11);

assert (magi == _iConnectionMagic &&
uid > 0 && uid <= _total && !_vConn [uid].first);

_vConn[uid]
= make_pair(cPtr, _tl. insert(make_pair (iTimeOutStamp,
uid)));
[align=left]}[/align]

[align=left]删除某个连接资源,根据指定的连接的ID,获得其在链表中的下标,从超时检测的map中移除他,从链表中找到该pair,删除connection资源,并且重新将他放到free可用中,同时更新freesize[/align]

[align=left]void TC_EpollServer::ConnectionList:: del(uint32_t uid)[/align]
[align=left]{[/align]
[align=left] TC_ThreadLock:: Lock lock(* this);[/align]

uint32_t magi
= uid & (0xFFFFFFFF << 20);
[align=left] uid = uid & (0x7FFFFFFF >> 11);[/align]

assert (magi == _iConnectionMagic &&
uid > 0 && uid <= _total && _vConn [uid].first);

[align=left] _del(uid);[/align]
[align=left]}[/align]

[align=left]void TC_EpollServer::ConnectionList::_del( uint32_t uid)[/align]
[align=left]{[/align]
[align=left] assert (uid > 0 && uid <= _total && _vConn [uid].first);[/align]

[align=left] _tl. erase(_vConn [uid].second);[/align]

[align=left] delete _vConn[uid]. first;[/align]

[align=left] _vConn[uid]. first = NULL;[/align]

[align=left] _free. push_back(uid);[/align]

[align=left] ++ _free_size;[/align]
[align=left]}[/align]

[align=left]超时检测,没秒检查一次,每次把超时最后检测时间更新一下,超时检测的时候需要锁住整个链表,[/align]

[align=left]void TC_EpollServer::ConnectionList::checkTimeout (time_t iCurTime)[/align]
[align=left]{[/align]
[align=left] //至少1s才能检查一次[/align]
if(iCurTime
- _lastTimeoutTime < 1)
[align=left] {[/align]
[align=left] return;[/align]
[align=left] }[/align]

_lastTimeoutTime =
iCurTime;

[align=left] TC_ThreadLock:: Lock lock(* this);[/align]

multimap<time_t, uint32_t >::iterator it
= _tl .begin();

while(it
!= _tl. end())
[align=left] {[/align]
[align=left] //已经检查到当前时间点了, 后续不用在检查了[/align]
if(it-> first >
iCurTime)
[align=left] {[/align]
[align=left] break;[/align]
[align=left] }[/align]

uint32_t uid
= it-> second;

[align=left] ++it;[/align]

//udp的监听端口,
不做处理
if( _vConn[uid]. first->getListenfd ()
== -1 || _vConn [uid].first-> getType()
== Connection:: EM_BUS)
[align=left] {[/align]
[align=left] continue;[/align]
[align=left] }[/align]

[align=left] //超时关闭[/align]
[align=left] _pEpollServer-> delConnection(_vConn [uid].first, false);[/align]

[align=left] //从链表中删除[/align]
[align=left] _del(uid);[/align]
[align=left] }[/align]

[align=left] if( _pEpollServer->IsEmptyConnCheck())[/align]
[align=left] {[/align]
[align=left] it = _tl. begin();[/align]
while(it
!= _tl. end())
[align=left] {[/align]
uint32_t uid
= it-> second;

[align=left] //遍历所有的空连接[/align]
[align=left] if( _vConn[uid]. first->IsEmptyConn ())[/align]
[align=left] {[/align]
[align=left] //获取空连接的超时时间点[/align]
time_t iEmptyTimeout = (it-> first - _vConn [uid].first-> getTimeout())
+ (_pEpollServer ->getEmptyConnTimeout()/1000);

[align=left] //已经检查到当前时间点了, 后续不用在检查了[/align]
if(iEmptyTimeout
> iCurTime)
[align=left] {[/align]
[align=left] break;[/align]
[align=left] }[/align]

//udp的监听端口,
不做处理
if( _vConn[uid]. first->getListenfd ()
== -1 || _vConn [uid].first-> getType()
== Connection:: EM_BUS)
[align=left] {[/align]
[align=left] ++it;[/align]
[align=left] continue;[/align]
[align=left] }[/align]

[align=left] //超时关闭[/align]
[align=left] _pEpollServer-> delConnection( _vConn[uid]. first, false);[/align]

[align=left] //从链表中删除[/align]
[align=left] _del(uid);[/align]
[align=left] }[/align]

[align=left] ++it;[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left]}[/align]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: