muduo::Acceptor、TcpServer分析
2015-08-08 22:26
393 查看
Acceptor
类Acceptor用来listen、accept,并调用回调函数来处理新到的连接。Acceptor中封装了Socket fd,它是用RAII手法初始化。accept后,如果有新连接到来,会调用handleRead()函数,在这个函数接收连接。在handleRead()函数中每次接收一个新连接,之后调用处理新连接的回调函数(如果有);但一次性可能有多个连接,这里可以改进的一点是:循环accept,一直到没有新连接到来,或者每次accept时,尝试一次接收N个。在handleRead()中,accept后并没有考虑新到的连接是否可用,例如当文件描述符耗尽时。这里可以做个改进,拿到accept后的connfd后,如果大于0,则非阻塞poll(2)一下,看看是否可以读写,正常情况下会是writable,表明connfd可用;如果poll(2)返回错误,那么直接关闭connfd。
Acceptor.h
class Acceptor : boost::noncopyable { public: typedef boost::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;//accept后调用的函数对象 Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport); ~Acceptor(); void setNewConnectionCallback(const NewConnectionCallback& cb) { newConnectionCallback_ = cb; } bool listenning() const { return listenning_; } void listen(); private: void handleRead(); EventLoop* loop_; Socket acceptSocket_; Channel acceptChannel_; NewConnectionCallback newConnectionCallback_; bool listenning_;//是否正在监听状态 int idleFd_; };
Acceptor.cc
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport) : loop_(loop), acceptSocket_(sockets::createNonblockingOrDie()),//初始化创建sockt fd acceptChannel_(loop, acceptSocket_.fd()),//初始化channel listenning_(false), idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) { assert(idleFd_ >= 0); acceptSocket_.setReuseAddr(true); acceptSocket_.setReusePort(reuseport); acceptSocket_.bindAddress(listenAddr); acceptChannel_.setReadCallback( boost::bind(&Acceptor::handleRead, this));//当fd可读时调用回调函数hanleRead } Acceptor::~Acceptor() { acceptChannel_.disableAll();//将其冲poller监听集合中移除,此时为kDeleted状态 acceptChannel_.remove();//将其从EventList events_中移除,此时为kNew状态 ::close(idleFd_); } void Acceptor::listen() { loop_->assertInLoopThread(); listenning_ = true; acceptSocket_.listen(); acceptChannel_.enableReading(); } void Acceptor::handleRead() { loop_->assertInLoopThread(); InetAddress peerAddr; //FIXME loop until no more int connfd = acceptSocket_.accept(&peerAddr);//这里时真正接收连接 if (connfd >= 0) { // string hostport = peerAddr.toIpPort(); // LOG_TRACE << "Accepts of " << hostport; if (newConnectionCallback_) { newConnectionCallback_(connfd, peerAddr);//将新连接信息传送到回调函数中 } else//没有回调函数则关闭client对应的fd { sockets::close(connfd); } } else { LOG_SYSERR << "in Acceptor::handleRead"; // Read the section named "The special problem of // accept()ing when you can't" in libev's doc. // By Marc Lehmann, author of livev. if (errno == EMFILE) { ::close(idleFd_); idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); ::close(idleFd_); idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); } } }
TcpServer
客户不直接使用Acceptor,它封装在TcpServer中。TcpServer使用比较简单,直接设置好新连接到达和消息到达的回调函数,之后start即可。TcpServer中还封装了EventLoopThreadPool,因此TcpServer中的EventLoop对象为main Reactor,EventLoopThreadPool为sub Reactor。
当新建连接到达后,TcpServer创建一个新的TcpConnection对象来保存这个连接,设置这个新连接的回调函数,之后在EventLoopThreadPool中取一个EventLoop对象来作为这个新连接的reactor。
TcpServer用map保存了当前server对象中的TcpConnection,当TcpServer对象析构时,就会关闭所有连接。
TcpServer.h
class TcpServer : boost::noncopyable { public: typedef boost::function<void(EventLoop*)> ThreadInitCallback; enum Option { kNoReusePort, kReusePort, }; //TcpServer(EventLoop* loop, const InetAddress& listenAddr); TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string& nameArg, Option option = kNoReusePort); ~TcpServer(); // force out-line dtor, for scoped_ptr members. const string& hostport() const { return hostport_; } const string& name() const { return name_; } EventLoop* getLoop() const { return loop_; } /// Set the number of threads for handling input. /// /// Always accepts new connection in loop's thread. /// Must be called before @c start /// @param numThreads /// - 0 means all I/O in loop's thread, no thread will created. /// this is the default value. /// - 1 means all I/O in another thread. /// - N means a thread pool with N threads, new connections /// are assigned on a round-robin basis. void setThreadNum(int numThreads); void setThreadInitCallback(const ThreadInitCallback& cb) { threadInitCallback_ = cb; } /// valid after calling start() boost::shared_ptr<EventLoopThreadPool> threadPool() { return threadPool_; } /// Starts the server if it's not listenning. /// /// It's harmless to call it multiple times. /// Thread safe. void start(); /// Set connection callback. /// Not thread safe. void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } /// Set message callback. /// Not thread safe. void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } /// Set write complete callback. /// Not thread safe. void setWriteCompleteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; } private: /// Not thread safe, but in loop void newConnection(int sockfd, const InetAddress& peerAddr); /// Thread safe. void removeConnection(const TcpConnectionPtr& conn);//TcpConnectionPtr为shared_ptr<TcpConnection> /// Not thread safe, but in loop void removeConnectionInLoop(const TcpConnectionPtr& conn); typedef std::map<string, TcpConnectionPtr> ConnectionMap; EventLoop* loop_; // the acceptor loop const string hostport_; const string name_; boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor boost::shared_ptr<EventLoopThreadPool> threadPool_; ConnectionCallback connectionCallback_;//新连接到达时的回调函数 MessageCallback messageCallback_;//消息到达时的回调函数 WriteCompleteCallback writeCompleteCallback_; ThreadInitCallback threadInitCallback_; AtomicInt32 started_; // always in loop thread int nextConnId_;//用来计算标记Connection的名字 ConnectionMap connections_;//Map的key为connection的name };
TcpServer.cc
TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string& nameArg, Option option) : loop_(CHECK_NOTNULL(loop)), hostport_(listenAddr.toIpPort()), name_(nameArg), acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)), threadPool_(new EventLoopThreadPool(loop, name_)), connectionCallback_(defaultConnectionCallback), messageCallback_(defaultMessageCallback), nextConnId_(1) { acceptor_->setNewConnectionCallback(//新连接到来时,调用的时TcpServer::newConnection函数 boost::bind(&TcpServer::newConnection, this, _1, _2)); } TcpServer::~TcpServer() { loop_->assertInLoopThread(); LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing"; for (ConnectionMap::iterator it(connections_.begin());//在析构函数中销毁connection it != connections_.end(); ++it) { TcpConnectionPtr conn = it->second; it->second.reset(); conn->getLoop()->runInLoop( boost::bind(&TcpConnection::connectDestroyed, conn)); conn.reset(); } } void TcpServer::setThreadNum(int numThreads)//设置线程池大小 { assert(0 <= numThreads); threadPool_->setThreadNum(numThreads); } void TcpServer::start() { if (started_.getAndSet(1) == 0) { threadPool_->start(threadInitCallback_);//启动线程池 assert(!acceptor_->listenning()); loop_->runInLoop( boost::bind(&Acceptor::listen, get_pointer(acceptor_)));//执行accept的listen } } void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) { loop_->assertInLoopThread(); EventLoop* ioLoop = threadPool_->getNextLoop();//在线程池取一个EventLoop对象 char buf[32]; snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);//生成这个连接的名字 ++nextConnId_; string connName = name_ + buf; LOG_INFO << "TcpServer::newConnection [" << name_ << "] - new connection [" << connName << "] from " << peerAddr.toIpPort(); InetAddress localAddr(sockets::getLocalAddr(sockfd)); // FIXME poll with zero timeout to double confirm the new connection // FIXME use make_shared if necessary TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr)); connections_[connName] = conn; conn->setConnectionCallback(connectionCallback_);//设置新连接的回调函数 conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); conn->setCloseCallback( boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));//将新到来的连接加入到监听事件中 } void TcpServer::removeConnection(const TcpConnectionPtr& conn) { // FIXME: unsafe loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn)); } void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn) { loop_->assertInLoopThread(); LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_ << "] - connection " << conn->name(); size_t n = connections_.erase(conn->name());//根据connection的名字移除connection (void)n; assert(n == 1); EventLoop* ioLoop = conn->getLoop(); ioLoop->queueInLoop( boost::bind(&TcpConnection::connectDestroyed, conn)); }
相关文章推荐
- 常见的HTTP状态码(HTTP Status Code)说明
- Android从网络某个地址下载文件、写入SD卡
- iOS 用GCD下载网络图片方法
- UVa 563 Crimewave (网络流构图+最大流+挺好的+双向边)
- 五层网络体系结构
- HDU 4940 Destroy Transportation system(上下界网络流)
- 网络编程学习总结(之client/server模型)
- 网络编程学习总结(之client/server模型)
- [转]数据中心网络虚拟化 隧道技术
- 网络信息安全攻防实验室之基础关
- Unable to locate Spring NamespaceHandler for XML schema namespace [http://www.springframework.org/sc
- Java网络编程
- 【python网络编程】新浪爬虫:关键词搜索爬取微博数据
- 关于android-async-http的使用,封装网络请求
- 微信分享文本、图片(本地、二进制、网络图片)、音频、视频、Gif动态图片、网页信息--大汇总
- “TCP:三次握手”分析——以一个简单的“服务器”和“客户端”为例
- Android 网络:基于TCP协议通信,多线程,实现简单的C/S聊天室
- hdu 4807(网络流 + 贪心)
- tinyhttpd简介
- 申请https证书需要注意的4大问题