您的位置:首页 > 理论基础 > 计算机网络

muduo TcpServer粗略过程

2015-11-05 21:48 495 查看
使用 \muduo\examples\asio\chat\server.cc 作例子

首先需要知道:

EventLoop:一个事件分发器类,拥有 Poller 对象,事件处理函数是 loop(),在这里捕获注册的 Channel 事件,并调用相应的回调函数。

while (!quit_)
{
...
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
...
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
// 通过 revents_ 来处理响应的事件
currentActiveChannel_->handleEvent(pollReturnTime_);
}
...
}


下面这几个类,都拥有一个 EventLoop * 对象,他们的事件注册,就注册到他们分配到的 EventLoop * 对象中。

Channel:一个封装 fd 的类,方便向 EventLoop 注册事件.比如,可读,可写事件。

Acceptor:负责接受连接的类,拥有一个Channel对象,向 EventLoop 注册为可读,则当有人来连接的时候,就能调用相应的回调。

TcpConnecton:也拥有一个 Channel 对象,已建立连接以后,再生成的对象,用来处理,连接之后的操作,发送,接受,断开。

然后开始来看吧,客户使用的代码,极其简单,就建立起一个 ChatServer . 这里的 loop 为 baseloop 处于主线程的条件下。

ChatServer server(&loop, serverAddr);
server.start();
loop.loop();


再来看看 server.start() 干了什么.先在构造函数中,初始化了几个成员,threadPool_,acceptor_。设置了acceptor_的回调.

1.threadPool_.start(),开启创建 N 个 EventLoopThread 个对象,即开了了 N 个 EventLoop 对象,并执行他们的 loop() 函数。

2.loop_runInLoop(..listen..),runInLoop 简单说明一下,就是保证你的函数执行的环境是跟你的 EventLoop 处于同一个线程中,保证同步。 最终,这个函数会调用 acceptor_.listen(),这个函数的本质就是向他的 loop 注册可读事件,可读的话就是有人来连接啦。可以自己跟踪下。

TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),
hostport_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1)
{
// Acceptor::handleRead函数中会回调用TcpServer::newConnection
// _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress)
acceptor_->setNewConnectionCallback(
boost::bind(&TcpServer::newConnection, this, _1, _2));
}


void TcpServer::start()
{
....
threadPool_->start(threadInitCallback_);
loop_->runInLoop(
boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
....
}


到这里为止,就已经注册了 accept fd 的可读事件,那么当有连接到来的时候,就会触发,在 loop() 函数中,然后去调用响应的回调函数,在 TcpServer 构造函数中可以看到,注册了 acceptor_ 可读回调是 TcpServer::newConnecton() ,所以,就来看看这个函数又看了什么。

1.可以看到,通过传过来的参数,创建一个 TcpConnecton 对象,并存储到 connections_ 中,这是一个 map 容器。threadPool_->getNextLoop() 就是在线程池中分配一个 loop 给他,负载均衡吧。如果之前设置的线程数为0,则大家共用主线程的 loop.

2.注册,响应的回调函数,可读,可写,有连接(就是调用用户设置的有连接来的回调函数),关闭。

3.最后调用,ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));这个函数里面,实际就是将 TcpConnection 中的 Channel 注册为可读,这样就能开始接收 client 的数据。然后也可以调用 TcpConnection 的 send 来发数据。

这里的接受和发送设计一个 Buffer 类,这是一个 缓存类,非阻塞I/O必备,这里就不说先。

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
// 按照轮叫的方式选择一个EventLoop
EventLoop* ioLoop = threadPool_->getNextLoop();

// 记录那个 nextConnId_ 有什么意义吗?
char buf[32];
snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;

LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();

// getLocalAddr listenAddr 不应该就是 localAddr 了吗?
InetAddress localAddr(sockets::getLocalAddr(sockfd));
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
// 每个TcpConnection 里面都有一个 Channel
// 每一个 channel 里面 有一个 loop
// 每一个 loop 里面有一个 poller_(),每一个loop不同线程
// 每一个 loop 里面有一个 vector<Channel *>
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
// connections_ is a map<string,TcpConnectionPtr>
connections_[connName] = conn;

conn->setConnectionCallback(connectionCallback_);//改变connection 改为用户定义的,跟构造函数的不一样
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe//跨线程的调用
// TcpConnection所对应的通道加入到Poller关注

// 嗯,这个时候这里调用 ioLoop->runInLoop() 的话,似乎就不是他那里的环境
// 没错,这里的 ioloop 是在其他线程中创建的。
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));//各自线程调用
}


当已经建立连接好以后,我们再来看看,在最初的 loop() 函数中的 Channel::handleEvent() 函数.可以看到,通过判断事件来种类,来调用响应的回调函数。

if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}

if (revents_ & POLLNVAL)
{
LOG_WARN << "Channel::handle_event() POLLNVAL";
}

if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}


大致逻辑是这样,服务端的过程比客户端的过程要复杂一些,毕竟服务端要管理这么多个连接。客户端的过程大家可以自己去看一下。本文只是粗略的解释了服务端的过程,其中涉及的多线程应该注意的事项还无功力能总结。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: