您的位置:首页 > 理论基础 > 计算机网络

muduo库阅读(42)——Net部分:TCP客户端TcpClient

2015-11-12 17:03 633 查看
TcpClient的工作流程如下:

1、根据构造函数传递进来的EventLoop和服务器地址等信息进行初始化

2、建立一个连接器Connector

3、设置连接器的事件处理器的写事件回调函数为newConnection(当连接建立完成的时候会触发写事件,Connector的事件处理进行处理,并调用写事件的回调函数)

4、调用connect向对服务器进行连接,内部调用连接器的start函数,然后等待连接完成

5、连接建立完成的时候会触发写事件,Connector的事件处理进行处理,并调用写事件的回调函数ewConnection

5.1、设置新链接的id和名字

5.2、创建一个TcpConnection对象

5.3、设置TcpConnection对象的回调函数:用户的连接建立回调函数、用户的数据到来回调函数、用户的写完成回调函数

5.4、调用TcpConnection的connectEstablished函数表示连接建立完毕

/*
* tcp客户端
*/
namespace muduo
{
namespace net
{
class Connector;
typedef boost::shared_ptr<Connector> ConnectorPtr;

class TcpClient : boost::noncopyable
{
public:
// TcpClient(EventLoop* loop);
// TcpClient(EventLoop* loop, const string& host, uint16_t port);
TcpClient(EventLoop* loop,
const InetAddress& serverAddr,
const string& nameArg);
~TcpClient();  // force out-line dtor, for scoped_ptr members.

// 连接
void connect();

// 断开连接
void disconnect();

// 停止连接
void stop();

// 返回TcpConnection对象
TcpConnectionPtr connection() const
{
MutexLockGuard lock(mutex_);
return connection_;
}

// 获取所属的Reactor
EventLoop* getLoop() const { return loop_; }

// 重连
bool retry() const;

// 允许重连
void enableRetry() { retry_ = true; }

// 获取名字
const string& name() const
{ return name_; }

// 设置连接完成的回调函数
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }

// 设置数据到来的回调函数
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }

// 设置写完成的回调函数
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }

#ifdef __GXX_EXPERIMENTAL_CXX0X__
void setConnectionCallback(ConnectionCallback&& cb)
{ connectionCallback_ = std::move(cb); }
void setMessageCallback(MessageCallback&& cb)
{ messageCallback_ = std::move(cb); }
void setWriteCompleteCallback(WriteCompleteCallback&& cb)
{ writeCompleteCallback_ = std::move(cb); }
#endif

private:
// 连接建立完毕会调用这个函数
void newConnection(int sockfd);

// 移除一个链接
void removeConnection(const TcpConnectionPtr& conn);

// 所属的Reactor
EventLoop* loop_;

// 连接器
ConnectorPtr connector_; // avoid revealing Connector

// 名字
const string name_;

// 连接建立完成的回调函数
ConnectionCallback connectionCallback_;

// 数据到来回调函数
MessageCallback messageCallback_;

// 写完成回调函数
WriteCompleteCallback writeCompleteCallback_;

// 是否重试
bool retry_;   // atomic

// 是否已经建立连接
bool connect_; // atomic
// always in loop thread

// TcpConnection的id
int nextConnId_;

// 锁
mutable MutexLock mutex_;

// 客户端到服务器的连接
TcpConnectionPtr connection_; // @GuardedBy mutex_
};

}
}


using namespace muduo;
using namespace muduo::net;

namespace muduo
{
namespace net
{
namespace detail
{

// 移除连接
void removeConnection(EventLoop* loop, const TcpConnectionPtr& conn)
{
loop->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));
}

// 移除连接器
void removeConnector(const ConnectorPtr& connector)
{
//connector->
}

}
}
}

/*
* 构造函数
*/
TcpClient::TcpClient(EventLoop* loop,
const InetAddress& serverAddr,
const string& nameArg)
: loop_(CHECK_NOTNULL(loop)),
connector_(new Connector(loop, serverAddr)),		// 创建一个连接器
name_(nameArg),
connectionCallback_(defaultConnectionCallback),	// 设置默认的连接完成回调函数
messageCallback_(defaultMessageCallback),			// 设置默认的数据到来回调函数
retry_(false),
connect_(true),
nextConnId_(1)
{
// 设置连接器的连接完成回调函数
connector_->setNewConnectionCallback(
boost::bind(&TcpClient::newConnection, this, _1));
// FIXME setConnectFailedCallback
LOG_INFO << "TcpClient::TcpClient[" << name_
<< "] - connector " << get_pointer(connector_);
}

/*
* 析构函数
*/
TcpClient::~TcpClient()
{
LOG_INFO << "TcpClient::~TcpClient[" << name_
<< "] - connector " << get_pointer(connector_);
TcpConnectionPtr conn;
bool unique = false;
{
MutexLockGuard lock(mutex_);
unique = connection_.unique();
conn = connection_;
}
if (conn)
{
assert(loop_ == conn->getLoop());
// FIXME: not 100% safe, if we are in different thread

// 移除TcpConnection对象
CloseCallback cb = boost::bind(&detail::removeConnection, loop_, _1);

// 调用链接关闭回调函数
loop_->runInLoop(boost::bind(&TcpConnection::setCloseCallback, conn, cb));

if (unique)
{
// 强制关闭
conn->forceClose();
}
}
else
{
// 停止连接器
connector_->stop();

// FIXME: HACK
// 移除连接器
loop_->runAfter(1, boost::bind(&detail::removeConnector, connector_));
}
}

// 链接到服务器
void TcpClient::connect()
{
// FIXME: check state
LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "
<< connector_->serverAddress().toIpPort();
connect_ = true;

// 开始连接
connector_->start();
}

// 断开连接
void TcpClient::disconnect()
{
connect_ = false;

{
MutexLockGuard lock(mutex_);
if (connection_)
{
// 主动关闭
connection_->shutdown();
}
}
}

// 停止连接
void TcpClient::stop()
{
connect_ = false;
// 停止连接器
connector_->stop();
}

// 连接建立完成的回调函数
// 连接器连接到服务器的时候会触发写事件,然后事件处理器处理写事件,调用处理写事件的回调函数,这个回调函数就是newConnection
void TcpClient::newConnection(int sockfd)
{
loop_->assertInLoopThread();

// 获取远端地址(即服务器的地址)
InetAddress peerAddr(sockets::getPeerAddr(sockfd));

// 设置新连接的id和名字
char buf[32];
snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;

InetAddress localAddr(sockets::getLocalAddr(sockfd));

// 创建一个TcpConnection对象
TcpConnectionPtr conn(new TcpConnection(loop_,
connName,
sockfd,
localAddr,
peerAddr));

// 设置TcpConnection对象的用户回调函数,因为用户直接使用TcpConnection来进行数据读写操作
// 设置用户的连接建立完成回调函数
conn->setConnectionCallback(connectionCallback_);
// 设置用户的数据到来回调函数
conn->setMessageCallback(messageCallback_);
// 设置用户的写完成回调函数
conn->setWriteCompleteCallback(writeCompleteCallback_);
// 设置连接管理的回调函数
conn->setCloseCallback(boost::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
{
MutexLockGuard lock(mutex_);
connection_ = conn;
}

// 调用TcpConnection的connectEstablished函数表示链接建立
conn->connectEstablished();
}

// 移除连接
void TcpClient::removeConnection(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
assert(loop_ == conn->getLoop());

{
MutexLockGuard lock(mutex_);
assert(connection_ == conn);
connection_.reset();
}

// 调用TcpConnection::connectDestroyed销毁连接
loop_->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));

// 如果可以重连并且已经建立连接,那么进行重连
if (retry_ && connect_)
{
LOG_INFO << "TcpClient::connect[" << name_ << "] - Reconnecting to "
<< connector_->serverAddress().toIpPort();
connector_->restart();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: