您的位置:首页 > 其它

WSAPoll引发的一场血案(2)

2017-09-05 02:33 507 查看
poll文档请参考https://linux.die.net/man/2/poll

WSAPoll请参考https://msdn.microsoft.com/en-us/library/ms741669(v=vs.85).aspx

问题1

在写EventLoopThread的过程中发现了一个问题。tcpServer启动时需要初始化n个线程的EventLoopThread,需要在每个线程中创建一个EventLoop,并指向loop函数,发生错误。此时是这样的,主EventLoop还没有给其他从EventLoop分配TcpConnection,也就是io事件,所以此时它的struct pollfd是空的,也就相当于这样:

WSAPoll(pfd, 0, timeout);


错误代码还是10022,不过这次是真的 “无效参数”。

查msdn发现:https://msdn.microsoft.com/en-us/library/ms741669(v=vs.85).aspx

int WSAAPI WSAPoll(
_Inout_ WSAPOLLFD fdarray[],
_In_    ULONG     nfds,
_In_    INT       timeout
);


fdarray [in, out]

An array of one or more POLLFD structures specifying the set of sockets for which status is requested. The array must contain at least one structure with a valid socket. Upon return, this parameter receives the updated sockets with the revents status flags member set on each one that matches the status query criteria.

那在linux下是怎么样的呢?经过测试之后发现,poll(pfd, 0, -1);会一直处于阻塞状态,poll(pfd,0,timeout)会在timeout之后返回0。看来修改又得费点功夫了。如果说在loop前判断pollfd是否为0,发现空转太多;那就加一个条件变量吧,如果pollfd为0并且当前的functor数组里没有任务,那就一直wait,当加入一个fucntor或者poller从空channel到非空,就唤醒它。类似这样

void EventLoop::loop()
{
printf("loop() of EventLoop \n");
quit_ = false;
looping_ = true;
while (!quit_)
{
activeChannels_.clear();

{
std::unique_lock<std::mutex> lock(mutex_);
while (poller_->pollfdsSize() == 0 && pendingPucntors_.size() == 0)
{
task_.wait(lock);
}
}
if (poller_->pollfdsSize() != 0)
{
pollReturnTime_ = poller_->poll(activeChannels_, kPollTimeMs);
eventHandling_ = true;
for (int i = 0; i < activeChannels_.size(); i++)
{
currentActiveChannel_ = activeChannels_[i];
currentActiveChannel_->handleEvents(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
}
doPendingFunctors();
}
looping_ = false;
}


void EventLoop::updateChannel(Channel * channel)
{
printf("threadId is %d, updateChannel() of EventLoop \n", std::this_thread::get_id());
int pre = poller_->pollfdsSize();
poller_->updateChannel(channel);
if (pre == 0 && poller_->pollfdsSize() == 1)
task_.notify_one();
}


解释一下,channel唤醒的话,只有从0到1的唤醒才是必要的,其他都是多余的唤醒。

void EventLoop::queueInLoop(const Functor & cb)
{
printf("threadId is %d, queueInLoop() of EventLoop \n", std::this_thread::get_id());
std::lock_guard<std::mutex> guard(mutex_);
pendingPucntors_.push_back(cb);
task_.notify_one();
}


问题2

是的,又出问题了,这次是当客户端主动断开连接时,理想情况下是服务端负责该Tcpconnection的EventLoop收到revents,执行handleClose和handleError,然后做三件事,1更新channel,我不再关注任何事件,2删除Server中的指针,3,删除poll中的指针。事件1由负责该连接的EventLoop完成,事件2由主EventLoop完成,事件3放在主EventLoop的functor队列中完成,然后完美的结局。

但是现实是事件1完成了,事件2交给主线程了,但是,但是,主线程(即主EventLoop所在的线程)没有被调度(也就是事件2 3都搁置了),而是那个负责Tcpconnection的接着loop,由于事件2 3都搁置了,poll中的指针和pollfd都还在,所以WSAPoll发现这有一个什么都不关注的描述符,但是它已经POLLHUP(挂起了)了,WSAPoll说,等会我还要返回你,管你关注不关注事件,所以,结果是,上边的三件事又要再做一遍,触发了assert。

msdn是这样说的,The fdarray parameter must contain at least one valid non-negative socket. Upon return, all of the supplied sockets that either satisfy the requested status conditions or have an error condition will have the appropriate flags set on the revents member of their corresponding WSAPOLLFD structure pointed to by the fdarray parameter. All sockets that do not meet these criteria and have no error condition will have the corresponding revents member set to 0.

意思是这样的,只要你关注的事件发生了或者错误的情况发生了(pollhub)我就会把你的revents给设置了。问题知道了,该怎么解决呢。

我是这样做的,不是要做三件事吗,分析一下,假如我先做事件1,再顺手(和事件1同一个线程)把事件3做了,把事件2留给主线程。可行吗?

当一个Tcpconnection断开连接时,主线程做什么事情会让Tcpconnection所在的线程处于危险的境地(即使他们是同一个线程)?暂时想不到。首先,Tcpconnection所有对象完好,只是poll中的指针不再指向它的channel,失去了poll的机会而已。

说了这么多,都是wsapoll的问题。看看linux上的poll:

The field events is an input parameter, a bit mask specifying the events the application is interested in for the file descriptor fd. If this field is specified as zero, then all events are ignored for fd and revents returns zero. 是哒,linux人家会忽略的,不存在这个问题。

最终解决代码

connectionDestroyed();不再被Server回调,直接调用。

void TcpConnection::handleClose()
{
printf("threadId is %d, handleClose() of TcpConnection\n", std::this_thread::get_id());
loop_->assertInLoopThread();
assert(state_ == kConnected || state_ == kDisconnecting);//both or can't send
// we don't close fd, leave it to dtor, so we can find leaks easily.
setState(kDisconnected);
channel_->disableAll();

TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis);

connectionDestroyed();
// must be the last line
closeCallback_(guardThis);//erase ptr in map and call connectionDestroyed.
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: