淘宝开源网络框架tbnet之transport篇
2014-01-24 12:25
609 查看
前言
在自己的读研期间,曾经接触个tbnet这个开源框架,当时是为了完成课程设计而在自己的机器上搭建了一个淘宝的TFS系统,当时TFS刚开始推出,在TFS系统里面就用到了tbnet和tbsys这两个库,自己在搭建TFS时候,也遇到了不少的问题,其中就包括tbnet相关的一些编译配置等,折腾了好久,当时也学到了不少的东西,最近由于自己在学习一些开源的东西,偶然机会找到了tbnet这个库,决定好好地学学这个玩意,据说这个东西在OeanBASE中也有涉及,看来这个库应该是个性能不错的网络库,今天就索性来读读它的源代码,在这之前自己已经在本机上安装了该库,使用该库开发还是忙好用的,先不说这个,接下来,就从transport这个类说起吧,transport这个类很重要,因为从我的角度来看这个类仿佛就是整个库的整体架构的浓缩版了,先来看看它的成员吧
在该成员中,最主要的两个模板是:事件处理模板和处理事件的线程,在事件处理模块中,主要是采用了EPOLL机制,所有的事件句柄会在_socketEvent中注册,并且通过调用EPollSocketEvent类中的成员函数getEvent来实现事件的多路分发机制,其中这个处理流程专门使用了一个读写线程来处理,而对于一些过期的事件句柄,则专门由一个超时线程来处理,现在来看看transport的整个流程框架:
这个函数是整个框架的起点,在这个函数中启动了读写线程和超时线程,而在读写线程中经过初始化之后,最终会回调transport::run()函数,代码如下:
在这个函数中,通过回传的线程句柄来区分这两类线程,而分别调用相应的事件处理模板,接下来我们可以看看这两类事件处理模板的源码:
在这个函数中,主要的事件处理流程在getEvents函数中,这个函数封装在了EpollSocketEvent类中,这个类主要封装了EPOLL机制的相处处理函数,接下来根据返回的事件类型分别由不同的IOComponent对象处理,整个过程相当的清晰,接下来再来看看超时线程的处理,代码如下:
在这个函数中,主要是通过循环遍历的方式来检查每个在注册的IOComponent事件的超时情况,其中在transport类中维护了两个事件循环链表,一个是当前活跃的IOComponent链表,一个是已经不再使用的IOComponent,在这个函数中,可以看到对于一些不再使用的IOComponent,会保留一段时间,才释放掉
接下来我们再来看看另外两个比较重要的函数,listen函数和connect函数,这两个函数其实是针对了两种不同的类型:客户端和服务器端,先来看服务器端listen函数,代码如下:
在这个函数中,首先根据地址创建了一个TCPAccepter对象,这个对象其实就是IOComponent对象,而该对象随后调用init函数来完成socket的基本的一些操作,包括创建socket,bind,以及listen等,随后将初始化好的TCPAccepter插入IOComponent列表中,这个添加过程在addComponent函数中,代码:
在这个函数中,首先将TCPAccepter对象插入到了IOComponent队列中,随后完成了两件事情,第一件:将我们的TCPAccepter对象注册到Epoll结构当中,实现函数是setSocketEvent,第二件:将TCPAccepter对象与Epoll结构进行关联。接下来,再看看connet函数吧,代码如下:
这个函数主要完成了连接服务器的功能,首先创建了一个TCPComponent对象,这个对象就是IOComponent子对象,然后初始化,初始化过程主要包括了socket的创建、然后再连接的过程,初始化完成之后,接下来的步骤同上。
总结
在transport类中,很多的东西都是通过了封装,例如底层的一些socket操作,在transport里面根本就看不到,唯一可以看到的就是IOComponent对象,这样的好处,相比大家都明白,在transport中,最主要的就是事件处理相关的操作,这方面我们完全可以借鉴借鉴,看完transport这个代码,给我的感觉就是封装程度很高,但是思路还是蛮清晰的,所以推荐那些想从事云计算方面的开发者来读读,学学里面的一些封装的思想,本篇就OVER了,下篇我们我们就来看看packet封装,谢谢
如有需要,请注明转载,谢谢
在自己的读研期间,曾经接触个tbnet这个开源框架,当时是为了完成课程设计而在自己的机器上搭建了一个淘宝的TFS系统,当时TFS刚开始推出,在TFS系统里面就用到了tbnet和tbsys这两个库,自己在搭建TFS时候,也遇到了不少的问题,其中就包括tbnet相关的一些编译配置等,折腾了好久,当时也学到了不少的东西,最近由于自己在学习一些开源的东西,偶然机会找到了tbnet这个库,决定好好地学学这个玩意,据说这个东西在OeanBASE中也有涉及,看来这个库应该是个性能不错的网络库,今天就索性来读读它的源代码,在这之前自己已经在本机上安装了该库,使用该库开发还是忙好用的,先不说这个,接下来,就从transport这个类说起吧,transport这个类很重要,因为从我的角度来看这个类仿佛就是整个库的整体架构的浓缩版了,先来看看它的成员吧
EPollSocketEvent _socketEvent; // ¶Áдsocketʼþ tbsys::CThread _readWriteThread; // ¶Áд´¦ÀíÏß³Ì tbsys::CThread _timeoutThread; // ³¬Ê±¼ì²éÏß³Ì bool _stop; // ÊÇ·ñ±»Í£Ö¹ IOComponent *_delListHead, *_delListTail; // µÈ´ýɾ³ýµÄIOComponent¼¯ºÏ IOComponent *_iocListHead, *_iocListTail; // IOComponent¼¯ºÏ bool _iocListChanged; // IOComponent¼¯ºÏ±»¸Ä¹ý int _iocListCount; tbsys::CThreadMutex _iocsMutex;
在该成员中,最主要的两个模板是:事件处理模板和处理事件的线程,在事件处理模块中,主要是采用了EPOLL机制,所有的事件句柄会在_socketEvent中注册,并且通过调用EPollSocketEvent类中的成员函数getEvent来实现事件的多路分发机制,其中这个处理流程专门使用了一个读写线程来处理,而对于一些过期的事件句柄,则专门由一个超时线程来处理,现在来看看transport的整个流程框架:
bool Transport::start() { signal(SIGPIPE, SIG_IGN); _readWriteThread.start(this, &_socketEvent); _timeoutThread.start(this, NULL); return true; }
这个函数是整个框架的起点,在这个函数中启动了读写线程和超时线程,而在读写线程中经过初始化之后,最终会回调transport::run()函数,代码如下:
void Transport::run(tbsys::CThread *thread, void *arg) { if (thread == &_timeoutThread) { timeoutLoop(); } else { eventLoop((SocketEvent*)arg); } }
在这个函数中,通过回传的线程句柄来区分这两类线程,而分别调用相应的事件处理模板,接下来我们可以看看这两类事件处理模板的源码:
void Transport::eventLoop(SocketEvent *socketEvent) { IOEvent events[MAX_SOCKET_EVENTS]; while (!_stop) { // ¼ì²éÊÇ·ñÓÐʼþ·¢Éú int cnt = socketEvent->getEvents(1000, events, MAX_SOCKET_EVENTS); if (cnt < 0) { TBSYS_LOG(INFO, "µÃµ½events³ö´íÁË: %s(%d)\n", strerror(errno), errno); } for (int i = 0; i < cnt; i++) { IOComponent *ioc = events[i]._ioc; if (ioc == NULL) { continue; } if (events[i]._errorOccurred) { // ´íÎó·¢ÉúÁË removeComponent(ioc); continue; } ioc->addRef(); // ¶Áд bool rc = true; if (events[i]._readOccurred) { rc = ioc->handleReadEvent(); } if (rc && events[i]._writeOccurred) { rc = ioc->handleWriteEvent(); } ioc->subRef(); if (!rc) { removeComponent(ioc); } } } }
在这个函数中,主要的事件处理流程在getEvents函数中,这个函数封装在了EpollSocketEvent类中,这个类主要封装了EPOLL机制的相处处理函数,接下来根据返回的事件类型分别由不同的IOComponent对象处理,整个过程相当的清晰,接下来再来看看超时线程的处理,代码如下:
void Transport::timeoutLoop() { IOComponent *mydelHead = NULL; IOComponent *mydelTail = NULL; std::vector<IOComponent*> mylist; while (!_stop) { // ÏÈд¸´ÖƵ½listÖÐ _iocsMutex.lock(); if (_iocListChanged) { mylist.clear(); IOComponent *iocList = _iocListHead; while (iocList) { mylist.push_back(iocList); iocList = iocList->_next; } _iocListChanged = false; } // ¼ÓÈëµ½mydelÖÐ if (_delListHead != NULL && _delListTail != NULL) { if (mydelTail == NULL) { mydelHead = _delListHead; } else { mydelTail->_next = _delListHead; _delListHead->_prev = mydelTail; } mydelTail = _delListTail; // Çå¿ÕdelList _delListHead = _delListTail = NULL; } _iocsMutex.unlock(); // ¶Ôÿ¸öiocomponent½øÐмì²é for (int i=0; i<(int)mylist.size(); i++) { IOComponent *ioc = mylist[i]; ioc->checkTimeout(tbsys::CTimeUtil::getTime()); } IOComponent *tmpList = mydelHead; int64_t nowTime = tbsys::CTimeUtil::getTime() - static_cast<int64_t>(900000000); // 15min while (tmpList) { if (tmpList->getRef() <= 0) { tmpList->subRef(); } if (tmpList->getRef() <= -10 || tmpList->getLastUseTime() < nowTime) { // ´ÓÁ´ÖÐɾ³ý if (tmpList == mydelHead) { // head mydelHead = tmpList->_next; } if (tmpList == mydelTail) { // tail mydelTail = tmpList->_prev; } if (tmpList->_prev != NULL) tmpList->_prev->_next = tmpList->_next; if (tmpList->_next != NULL) tmpList->_next->_prev = tmpList->_prev; IOComponent *ioc = tmpList; tmpList = tmpList->_next; TBSYS_LOG(INFO, "DELIOC, %s, IOCount:%d, IOC:%p\n", ioc->getSocket()->getAddr().c_str(), _iocListCount, ioc); delete ioc; } else { tmpList = tmpList->_next; } } usleep(500000); // ×îС¼ä¸ô100ms } // д»Øµ½_delListÉÏ,ÈÃdestroyÏú»Ù _iocsMutex.lock(); if (mydelHead != NULL) { if (_delListTail == NULL) { _delListHead = mydelHead; } else { _delListTail->_next = mydelHead; mydelHead->_prev = _delListTail; } _delListTail = mydelTail; } _iocsMutex.unlock(); }
在这个函数中,主要是通过循环遍历的方式来检查每个在注册的IOComponent事件的超时情况,其中在transport类中维护了两个事件循环链表,一个是当前活跃的IOComponent链表,一个是已经不再使用的IOComponent,在这个函数中,可以看到对于一些不再使用的IOComponent,会保留一段时间,才释放掉
接下来我们再来看看另外两个比较重要的函数,listen函数和connect函数,这两个函数其实是针对了两种不同的类型:客户端和服务器端,先来看服务器端listen函数,代码如下:
IOComponent *Transport::listen(const char *spec, IPacketStreamer *streamer, IServerAdapter *serverAdapter) { char tmp[1024]; char *args[32]; strncpy(tmp, spec, 1024); tmp[1023] = '\0'; if (parseAddr(tmp, args, 32) != 3) { return NULL; } if (strcasecmp(args[0], "tcp") == 0) { char *host = args[1]; int port = atoi(args[2]); // Server Socket ServerSocket *socket = new ServerSocket(); if (!socket->setAddress(host, port)) { delete socket; return NULL; } // TCPAcceptor TCPAcceptor *acceptor = new TCPAcceptor(this, socket, streamer, serverAdapter); if (!acceptor->init()) { delete acceptor; return NULL; } // ¼ÓÈëµ½iocomponentsÖУ¬¼°×¢²á¿É¶Áµ½socketeventÖÐ addComponent(acceptor, true, false); // ·µ»Ø return acceptor; } else if (strcasecmp(args[0], "udp") == 0) {} return NULL; }
在这个函数中,首先根据地址创建了一个TCPAccepter对象,这个对象其实就是IOComponent对象,而该对象随后调用init函数来完成socket的基本的一些操作,包括创建socket,bind,以及listen等,随后将初始化好的TCPAccepter插入IOComponent列表中,这个添加过程在addComponent函数中,代码:
void Transport::addComponent(IOComponent *ioc, bool readOn, bool writeOn) { assert(ioc != NULL); _iocsMutex.lock(); if (ioc->isUsed()) { TBSYS_LOG(ERROR, "ÒѸø¼Ó¹ýaddComponent: %p", ioc); _iocsMutex.unlock(); return; } // ¼ÓÈëiocListÉÏ ioc->_prev = _iocListTail; ioc->_next = NULL; if (_iocListTail == NULL) { _iocListHead = ioc; } else { _iocListTail->_next = ioc; } _iocListTail = ioc; // ÉèÖÃÔÚÓà ioc->setUsed(true); _iocListChanged = true; _iocListCount ++; _iocsMutex.unlock(); // ÉèÖÃsocketevent Socket *socket = ioc->getSocket(); ioc->setSocketEvent(&_socketEvent); _socketEvent.addEvent(socket, readOn, writeOn); TBSYS_LOG(INFO, "ADDIOC, SOCK: %d, %s, RON: %d, WON: %d, IOCount:%d, IOC:%p\n", socket->getSocketHandle(), ioc->getSocket()->getAddr().c_str(), readOn, writeOn, _iocListCount, ioc); }
在这个函数中,首先将TCPAccepter对象插入到了IOComponent队列中,随后完成了两件事情,第一件:将我们的TCPAccepter对象注册到Epoll结构当中,实现函数是setSocketEvent,第二件:将TCPAccepter对象与Epoll结构进行关联。接下来,再看看connet函数吧,代码如下:
Connection *Transport::connect(const char *spec, IPacketStreamer *streamer, bool autoReconn) { char tmp[1024]; char *args[32]; strncpy(tmp, spec, 1024); tmp[1023] = '\0'; if (parseAddr(tmp, args, 32) != 3) { return NULL; } if (strcasecmp(args[0], "tcp") == 0) { char *host = args[1]; int port = atoi(args[2]); // Socket Socket *socket = new Socket(); if (!socket->setAddress(host, port)) { delete socket; TBSYS_LOG(ERROR, "ÉèÖÃsetAddress´íÎó: %s:%d, %s", host, port, spec); return NULL; } // TCPComponent TCPComponent *component = new TCPComponent(this, socket, streamer, NULL); // ÉèÖÃÊÇ·ñ×Ô¶¯ÖØÁ¬ component->setAutoReconn(autoReconn); if (!component->init()) { delete component; TBSYS_LOG(ERROR, "³õʼ»¯Ê§°ÜTCPComponent: %s:%d", host, port); return NULL; } // ¼ÓÈëµ½iocomponentsÖУ¬¼°×¢²á¿Éдµ½socketeventÖÐ addComponent(component, true, true); component->addRef(); return component->getConnection(); } else if (strcasecmp(args[0], "udp") == 0) {} return NULL; }
这个函数主要完成了连接服务器的功能,首先创建了一个TCPComponent对象,这个对象就是IOComponent子对象,然后初始化,初始化过程主要包括了socket的创建、然后再连接的过程,初始化完成之后,接下来的步骤同上。
总结
在transport类中,很多的东西都是通过了封装,例如底层的一些socket操作,在transport里面根本就看不到,唯一可以看到的就是IOComponent对象,这样的好处,相比大家都明白,在transport中,最主要的就是事件处理相关的操作,这方面我们完全可以借鉴借鉴,看完transport这个代码,给我的感觉就是封装程度很高,但是思路还是蛮清晰的,所以推荐那些想从事云计算方面的开发者来读读,学学里面的一些封装的思想,本篇就OVER了,下篇我们我们就来看看packet封装,谢谢
如有需要,请注明转载,谢谢
相关文章推荐
- 淘宝开源网络框架tbnet 之packet
- 淘宝开源网络框架TBNET分析
- 淘宝开源网络框架tbnet之buffer
- 淘宝开源网络框架tbnet 之 iocomponent
- 淘宝开源网络框架TBNET分析
- 淘宝开源网络框架tbnet之socket
- 淘宝开源网络框架TBNET分析
- 淘宝开源网络框架tbnet之ipacketstreamer,ipackethandler以及iserveradaper
- 分析淘宝网络框架tbnet
- 淘宝网络框架tbnet源码分析
- taobao-tbnet开源网络通讯框架
- 淘宝网络框架tbnet源码分析
- 淘宝网络框架tbnet源码分析
- 国内一个优秀的网络开源框架学习,轻量级网络框架学习NoHttp,可能是继Xutils之后的又一优秀框架
- day30_安卓基础之网络编程_开源框架_多线程下载
- Android开源:网络框架volley使用(一)---使用方法笔记
- 网络开源框架之libev使用实例
- 网络开源框架之libevent使用实例
- 用Facebook开源框架Frescok加载网络图片的简单使用
- Android 网络请求开源框架