您的位置:首页 > 理论基础 > 计算机网络

Muduo网络库源码分析(六)TcpConnection 的生存期管理

2016-04-22 15:29 537 查看
TcpConnection是使用shared_ptr来管理的类,因为它的生命周期模糊。TcpConnection表示已经建立或正在建立的连接,建立连接后,用户只需要在上层类如TcpServer中设置连接到来和消息到来的处理函数,继而回调TcpConnection中的 setConnectionCallback和setMessageCallback函数,实现对事件的处理。用户需要关心的事件是有限的,其他都由网络库负责。

TcpConnection中封装了InputBuffer和OutputBuffer,用来表示应用层的缓冲区。在发送数据时,如果不能一次将Buffer中的数据发送完毕,它还会继续关注Channel中的可写事件,当sockfd可写时,会再次发送。

前面提到TcpConnection的生存期模糊,主要是因为我们不能在TcpServer中直接erase掉TcpConnection对象,因为此时有可能Channel中的handleEvent还在执行,如果析构TcpConnection对象,那么他的成员channel_也会被析构,会导致core
dump。也就是说我们需要TcpConnection 对象生存期要长于handleEvent() 函数,直到执行完connectDestroyed() 后才会析构。

断开连接: 
TcpConnection的断开是采用被动方式,即对方先关闭连接,本地read(2)返回0后,调用顺序如下: 
handleClose()->TcpServer::removeConnection->TcpConnection::connectDestroyed()。

具体我们查看下面的连接关闭时序图:



当连接到来,创建一个TcpConnection对象,立刻用shared_ptr来管理,引用计数为1,在Channel中维护一个weak_ptr(tie_),将这个shared_ptr对象赋值给_tie,引用计数仍然为1。当连接关闭时,在handleEvent中,将tie_提升,得到一个shard_ptr对象,引用计数就变成了2。当shared_ptr的计数不为0时,TcpConnection不会被销毁。

TcpConnection.h

class TcpConnection : boost::noncopyable,
public boost::enable_shared_from_this<TcpConnection>
{
public:
/// Constructs a TcpConnection with a connected sockfd
///
/// User should not create this object.
TcpConnection(EventLoop* loop,
const string& name,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr);
~TcpConnection();

EventLoop* getLoop() const { return loop_; }
const string& name() const { return name_; }
const InetAddress& localAddress() { return localAddr_; }
const InetAddress& peerAddress() { return peerAddr_; }
bool connected() const { return state_ == kConnected; }

void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }

void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }

/// Internal use only.
void setCloseCallback(const CloseCallback& cb)
{ closeCallback_ = cb; }

// called when TcpServer accepts a new connection
void connectEstablished(); // should be called only once
// called when TcpServer has removed me from its map
void connectDestroyed(); // should be called only once

private:
enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
void handleRead(Timestamp receiveTime);
void handleClose();
void handleError();
void setState(StateE s) { state_ = s; }

EventLoop* loop_; // 所属EventLoop
string name_; // 连接名
StateE state_; // FIXME: use atomic variable
// we don't expose those classes to client.
boost::scoped_ptr<Socket> socket_;
boost::scoped_ptr<Channel> channel_;
InetAddress localAddr_;
InetAddress peerAddr_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
CloseCallback closeCallback_;
};

typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;

}

TcpConnection.cc
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr)/*,
highWaterMark_(64*1024*1024)*/
{
// 通道可读事件到来的时候,回调TcpConnection::handleRead,_1是事件发生时间
channel_->setReadCallback(
boost::bind(&TcpConnection::handleRead, this, _1));
// 连接关闭,回调TcpConnection::handleClose
channel_->setCloseCallback(
boost::bind(&TcpConnection::handleClose, this));
// 发生错误,回调TcpConnection::handleError
channel_->setErrorCallback(
boost::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
<< " fd=" << sockfd;
socket_->setKeepAlive(true);
}

TcpConnection::~TcpConnection()
{
LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this
<< " fd=" << channel_->fd();
}

void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
LOG_TRACE << "[3] usecount=" << shared_from_this().use_count();
channel_->tie(shared_from_this());
channel_->enableReading(); // TcpConnection所对应的通道加入到Poller关注

connectionCallback_(shared_from_this());
LOG_TRACE << "[4] usecount=" << shared_from_this().use_count();
}

void TcpConnection::connectDestroyed()
{
loop_->assertInLoopThread();
if (state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll();

connectionCallback_(shared_from_this());
}
channel_->remove();
}

void TcpConnection::handleRead(Timestamp receiveTime)
{
/*
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
*/
loop_->assertInLoopThread();
int savedErrno = 0;
char buf[65536];
ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
if (n > 0)
{
messageCallback_(shared_from_this(), buf, n);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}



Channel中对tie_的处理:
void Channel::handleEvent(Timestamp receiveTime)
{
boost::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock();
if (guard)
{
LOG_TRACE << "[6] usecount=" << guard.use_count();
handleEventWithGuard(receiveTime);
LOG_TRACE << "[12] usecount=" << guard.use_count();
}
}
else
{
handleEventWithGuard(receiveTime);
}
}

参考:
《linux多线程服务端编程》

《muduo使用手册》
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: