您的位置:首页 > 理论基础 > 计算机网络

Muduo网络库源码分析(五)Acceptor和TcpServer类

2016-04-16 01:09 645 查看
首先,我们简单提一下对Socket的封装(并不复杂,所以只做简要说明)。

Endian.h : 封装了字节序转换函数(全局函数,位于muduo::net::sockets名称空间中)。

SocketsOps.h/ SocketsOps.cc :封装了socket相关系统调用。

Socket.h/Socket.cc(Socket类): 用RAII方法封装socket file descriptor。

InetAddress.h/InetAddress.cc(InetAddress类):网际地址sockaddr_in封装。

几个重要的函数:

// Turns the TCP_NODELAY option on or off for this socket. Passing true
// disables the Nagle algorithm.
void Socket::setTcpNoDelay(bool on)
{
  const int flag = static_cast<int>(on);  // true -> 1, false -> 0
  ::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
  // FIXME CHECK (the result of setsockopt is not examined)
}

// Turns the SO_REUSEADDR option on or off for this socket, which allows the
// address/port to be reused while it is still in TIME_WAIT.
void Socket::setReuseAddr(bool on)
{
  const int flag = static_cast<int>(on);  // true -> 1, false -> 0
  ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
  // FIXME CHECK (the result of setsockopt is not examined)
}

// Turns the SO_KEEPALIVE option on or off for this socket, i.e. periodic
// probing of whether the connection is still alive.
void Socket::setKeepAlive(bool on)
{
  const int flag = static_cast<int>(on);  // true -> 1, false -> 0
  ::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
  // FIXME CHECK (the result of setsockopt is not examined)
}
注意点:

(1)Nagle算法可以一定程度上避免网络拥塞;禁用Nagle算法,可以避免连续发包出现延迟,这对于编写低延迟的网络服务很重要。TCP_NODELAY选项可以禁用Nagle算法。

(2)__attribute__((deprecated))表示该函数是过时的,被淘汰的,使用这样的函数在编译的时候会发出警告!

Acceptor

类Acceptor用来listen、accept,并调用回调函数来处理新到的连接。调用listen()之后,如果有新连接到来,监听socket变为可读,会调用handleRead()函数,在这个函数中接收连接。handleRead()每次只接收一个新连接,之后调用处理新连接的回调函数(如果设置了的话);如果没有设置回调函数,则直接关闭这个新连接。

Acceptor.h

/// Accepts incoming connections on a listening socket.
///
/// The constructor creates a non-blocking socket and binds it to the given
/// address; listen() starts listening and enables read events on the
/// channel. Each accepted connection is handed to the callback installed
/// with setNewConnectionCallback(). Copying is disabled through
/// boost::noncopyable.
<span style="font-family:microsoft yahei;color:#362e2b;">class Acceptor : boost::noncopyable
{
public:
/// Callback invoked by handleRead() with the accepted socket descriptor
/// and the peer address.
typedef boost::function<void (int sockfd,
const InetAddress&)> NewConnectionCallback;

/// Creates the listening socket and binds it to listenAddr.
Acceptor(EventLoop* loop, const InetAddress& listenAddr);
~Acceptor();

void setNewConnectionCallback(const NewConnectionCallback& cb)// installs the user callback for new connections
{ newConnectionCallback_ = cb; }

/// True once listen() has been called.
bool listenning() const { return listenning_; }
/// Starts listening; asserts that it is called in the loop thread.
void listen();

private:
/// Read callback of acceptChannel_: accepts one connection per call.
void handleRead();

EventLoop* loop_;
// The listening socket.
Socket acceptSocket_;
// Channel constructed on acceptSocket_'s descriptor; its read callback is handleRead().
Channel acceptChannel_;
NewConnectionCallback newConnectionCallback_;
bool listenning_;
// Descriptor opened on /dev/null by the constructor; handleRead() gives it up
// temporarily when accept fails with EMFILE.
int idleFd_;
};</span>


Acceptor.cc

// Creates a non-blocking listening socket, wraps its descriptor in a channel,
// reserves a spare descriptor on /dev/null, then enables address reuse, binds
// the socket to listenAddr and registers handleRead() as the channel's read
// callback. Listening itself does not start until listen() is called.
<span style="font-family:microsoft yahei;color:#362e2b;">Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie()),
acceptChannel_(loop, acceptSocket_.fd()),
listenning_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
// The spare descriptor must have been opened successfully (checked only
// when assertions are enabled).
assert(idleFd_ >= 0);
// SO_REUSEADDR is set before binding the address.
acceptSocket_.setReuseAddr(true);
acceptSocket_.bindAddress(listenAddr);
acceptChannel_.setReadCallback(
boost::bind(&Acceptor::handleRead, this));
}

// Stops watching the listening socket and releases the spare descriptor.
Acceptor::~Acceptor()
{
// Disable all events on the accept channel first, then call remove() on it.
acceptChannel_.disableAll();
acceptChannel_.remove();
// Close the descriptor that the constructor opened on /dev/null.
::close(idleFd_);
}

// Starts listening on the accept socket and enables read events on its
// channel. Asserts that the caller is in the loop thread.
void Acceptor::listen()
{
loop_->assertInLoopThread();
// The flag is set before the socket actually starts listening.
listenning_ = true;
acceptSocket_.listen();
// From here on handleRead() is invoked when the listening socket is readable.
acceptChannel_.enableReading();
}

// Read callback of the accept channel. Accepts a single pending connection
// and passes it to the new-connection callback, or closes it when no
// callback has been installed. Asserts that it runs in the loop thread.
void Acceptor::handleRead()
{
  loop_->assertInLoopThread();

  InetAddress peerAddr(0);
  // FIXME loop until no more
  const int connfd = acceptSocket_.accept(&peerAddr);  // the actual accept

  if (connfd < 0)
  {
    // Read the section named "The special problem of
    // accept()ing when you can't" in libev's doc.
    // By Marc Lehmann, author of libev.
    if (errno == EMFILE)
    {
      // Give up the spare descriptor, use the freed slot to accept and
      // immediately close the pending connection, then reserve the spare
      // descriptor again.
      ::close(idleFd_);
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
      ::close(idleFd_);
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
    return;
  }

  // string hostport = peerAddr.toIpPort();
  // LOG_TRACE << "Accepts of " << hostport;
  if (!newConnectionCallback_)
  {
    // Nobody is interested in the connection: close it right away.
    sockets::close(connfd);
    return;
  }

  // Hand the new descriptor and the peer address to the callback.
  newConnectionCallback_(connfd, peerAddr);
}
</span>



TcpServer



客户不直接使用Acceptor,它封装在TcpServer中。TcpServer使用比较简单,直接设置好新连接到达和消息到达的回调函数,之后start即可。 

TcpServer中还封装了EventLoopThreadPool,因此TcpServer中的EventLoop对象为main Reactor,EventLoopThreadPool为sub Reactor。 

当新连接到达后,TcpServer创建一个新的TcpConnection对象来保存这个连接,设置这个新连接的回调函数,之后在EventLoopThreadPool中取一个EventLoop对象来作为这个新连接的reactor。TcpServer用map保存了当前server对象中的TcpConnection,当TcpServer对象析构时,就会关闭所有连接。(注意:下面给出的是简化版代码,线程池相关部分已被注释掉,新连接直接使用TcpServer自身的loop_,析构函数中也尚未包含关闭连接的逻辑。)

TcpServer.h

<span style="font-family:microsoft yahei;color:#362e2b;">/// TCP server, supports single-threaded and thread-pool models.
///
/// This is an interface class, so don't expose too much details.
///
/// In the version shown here the server owns an Acceptor, creates one
/// TcpConnection per accepted socket and keeps it in a name-keyed map.
/// Copying is disabled through boost::noncopyable.
class TcpServer : boost::noncopyable
{
public:
//typedef boost::function<void(EventLoop*)> ThreadInitCallback;

//TcpServer(EventLoop* loop, const InetAddress& listenAddr);
/// Creates the server and its Acceptor for listenAddr; nameArg becomes the
/// prefix of every connection name. Listening starts only in start().
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg);
~TcpServer();  // force out-line dtor, for scoped_ptr members.

const string& hostport() const { return hostport_; }
const string& name() const { return name_; }

/// Starts the server if it's not listenning.
///
/// It's harmless to call it multiple times.
/// Thread safe.
/// NOTE(review): started_ is a plain bool and no synchronization is visible
/// in start(); confirm the thread-safety claim.
void start();

/// Set connection callback.
/// Not thread safe.
// Sets the callback for a connection being established or closed.
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }

/// Set message callback.
/// Not thread safe.
// Sets the callback for an incoming message.
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }

private:
/// Not thread safe, but in loop
/// Installed as the Acceptor's new-connection callback by the constructor.
void newConnection(int sockfd, const InetAddress& peerAddr);

// Maps connection name to connection.
typedef std::map<string, TcpConnectionPtr> ConnectionMap;

EventLoop* loop_;  // the acceptor loop
const string hostport_;		// listening address as returned by InetAddress::toIpPort()
const string name_;			// server name
boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
bool started_;
// always in loop thread
int nextConnId_;				// id used for the next connection's name
ConnectionMap connections_;	// connections created by newConnection()
};
</span>


TcpServer.cc

// Stores the loop (checked to be non-null), the textual listening address and
// the server name, creates the Acceptor for listenAddr and registers
// newConnection() as its new-connection callback. The thread pool and the
// default connection/message callbacks are commented out in this version, so
// both callbacks stay default-constructed until the user sets them.
<span style="font-family:microsoft yahei;color:#362e2b;">TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg)
: loop_(CHECK_NOTNULL(loop)),
hostport_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr)),
/*threadPool_(new EventLoopThreadPool(loop)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),*/
started_(false),
nextConnId_(1)
{
// Acceptor::handleRead calls back into TcpServer::newConnection.
// _1 is the socket file descriptor, _2 is the peer address (InetAddress).
acceptor_->setNewConnectionCallback(
boost::bind(&TcpServer::newConnection, this, _1, _2));
}

// Asserts that destruction happens in the loop thread and logs it.
// NOTE(review): no explicit per-connection teardown is performed here; the
// entries of connections_ are only released when the map member itself is
// destroyed. Confirm whether explicit shutdown of connections is intended.
TcpServer::~TcpServer()
{
loop_->assertInLoopThread();
LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";
}

// 该函数多次调用是无害的
// 该函数可以跨线程调用
void TcpServer::start()
{
if (!started_)
{
started_ = true;
}

if (!acceptor_->listenning())
{
// get_pointer返回原生指针
loop_->runInLoop(
boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}

// New-connection callback installed on the Acceptor. Builds a unique name
// for the connection, wraps the accepted socket in a TcpConnection, stores
// it in connections_, installs the user callbacks and finally notifies the
// connection that it is established. Asserts that it runs in the loop
// thread.
//
// sockfd   - descriptor of the accepted socket
// peerAddr - address of the remote peer
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();

  // The name suffix is ":<hostport>#<id>". With an IPv4 "ip:port" string of
  // up to 21 characters (assuming toIpPort() yields "a.b.c.d:port") plus the
  // two separators and a 10-digit id, 32 bytes are not enough: snprintf
  // would silently cut off trailing id digits and two connections could end
  // up with the same name, i.e. the same key in connections_. 64 bytes
  // leaves ample room.
  char buf[64];
  snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  TcpConnectionPtr conn(new TcpConnection(loop_,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  // Keep the connection alive by storing it under its name.
  connections_[connName] = conn;
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);

  conn->connectEstablished();
}</span>


测试程序:
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

// Connection callback of the test server: prints a line when a connection
// comes up and another one when it goes down.
void onConnection(const TcpConnectionPtr &conn)
{
  if (!conn->connected())
  {
    printf("onConnection(): connection [%s] is down\n",
           conn->name().c_str());
    return;
  }

  printf("onConnection(): new connection [%s] from %s\n",
         conn->name().c_str(),
         conn->peerAddress().toIpPort().c_str());
}

// Message callback of the test server: prints how many bytes were received
// and on which connection. The payload itself (data) is not inspected.
void onMessage(const TcpConnectionPtr &conn,
const char *data,
ssize_t len)
{
// %zd matches the ssize_t length argument.
printf("onMessage(): received %zd bytes from connection [%s]\n",
len, conn->name().c_str());
}

// Test driver: starts a TcpServer named "TestServer" on port 8888 with the
// printing callbacks above and runs the event loop.
int main()
{
  // pid_t is not guaranteed to be int, so convert explicitly for "%d".
  // getpid() is declared in <unistd.h>.
  printf("main(): pid = %d\n", static_cast<int>(getpid()));

  InetAddress listenAddr(8888);
  EventLoop loop;

  TcpServer server(&loop, listenAddr, "TestServer");
  server.setConnectionCallback(onConnection);
  server.setMessageCallback(onMessage);
  server.start();

  loop.loop();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: