您的位置:首页 > 编程语言

从零开始学习EasyDarwin(RTSP之TaskThread)

2016-03-11 23:22 507 查看
上一节讲到《从零开始学习EasyDarwin(RTSP之EventThread) 》

这节分析下EasyDarwin下的TaskThread类,进而加深对EventThread类的理解。

在EventThread::Entry中调用select_waitevent函数等待事件的发生,当有事件发生的时候,就通过调用ProcessEvent方法对事件进行相应的处理。

对于建立RTSP连接的请求:

调用TCPListenerSocket::ProcessEvent

此方法调用RTSPListenerSocket的GetSessionTask方法建立一个RTSPSession,并把相应的套接口加入侦听队列,等待RTSP请求。

然后还需调用this->RequestEvent(EV_RE)把建立RTSP连接的请求加入到侦听队列。

在RequestEvent函数中调用select_modwatch(&fEventReq, theMask)将自己感兴趣的事件加入到侦听队列中,

int select_modwatch(struct eventreq *req, int which)
{
while (sMsgWindow == NULL)
OSThread::Sleep(10);

SInt32 theEvent = 0;

if (which & EV_RE)
theEvent |= FD_READ | FD_ACCEPT | FD_CLOSE;
if (which & EV_WR)
theEvent |= FD_WRITE | FD_CONNECT;

unsigned int theMsg = (unsigned int)(req->er_data);

return ::WSAAsyncSelect(req->er_handle, sMsgWindow, theMsg, theEvent);
}


下面来看下TaskThread类的成员变量和成员函数

class TaskThread : public OSThread
{
public:

//Implementation detail: all tasks get run on TaskThreads.

TaskThread() :  OSThread(), fTaskThreadPoolElem()
{fTaskThreadPoolElem.SetEnclosingObject(this);}
virtual         ~TaskThread() { this->StopAndWaitForThread(); }

private:

enum
{
kMinWaitTimeInMilSecs = 10  //UInt32
};

virtual void    Entry();
Task*           WaitForTask();

OSQueueElem     fTaskThreadPoolElem;

OSHeap              fHeap;
OSQueue_Blocking    fTaskQueue;

friend class Task;
friend class TaskThreadPool;
};


重点是函数Entry()和WaitForTask()。

Task* TaskThread::WaitForTask()
{
while (true)
{
//获取当前时间
SInt64 theCurrentTime = OS::Milliseconds();

//从fHeap堆中获取Task()
if ((fHeap.PeekMin() != NULL) && (fHeap.PeekMin()->GetValue() <= theCurrentTime))
{
return (Task*)fHeap.ExtractMin()->GetEnclosingObject();//返回指向Task对象的指针
}

SInt64 theTimeout = 0;
if (fHeap.PeekMin() != NULL)
theTimeout = fHeap.PeekMin()->GetValue() - theCurrentTime;//计算下超时时间
Assert(theTimeout >= 0);

if (theTimeout < 10)
theTimeout = 10;

//从fTaskQueue任务队列中取出OSQueueElem队列元素
OSQueueElem* theElem = fTaskQueue.DeQueueBlocking(this, (SInt32) theTimeout);
if (theElem != NULL)
{
return (Task*)theElem->GetEnclosingObject();//返回Task指针
}

if (OSThread::GetCurrent()->IsStopRequested())
return NULL;
}
}


TaskThread::Entry调用TaskThread::WaitForTask()方法获得下一个需要处理的Task。TaskThread::WaitForTask()首先从TaskThread::fHeap中获得Task,如果TaskThread::fHeap中没有满足条件的Task,则从TaskThread::fTaskQueue中获得Task。

开始时,我没弄明白Task存储在什么地方,

简单分析,fTaskQueue应该是任务队列,DeQueueBlocking是出队列操作

尝试性的进入GetEnclosingObject函数位置,

验证了自己的想法,设置和获取Task函数如下:

SetEnclosingObject(void* obj)->设置task

GetEnclosingObject() ->获取task

继续分析TaskThread::Entry()函数

TaskThread::Entry调用TaskThread::WaitForTask()方法获得下一个需要处理的Task。

根据theTask->fWriteLock判断是否有读写锁

然后调用theTimeout = theTask->Run(),转到Task::Run函数

if(theTimeout < 0)

{

//删除task

}

else if (theTimeout == 0)

{

//不做处理

}

else

{

// 返回值theTimeout代表了下次处理这个Task需等待的时间

//fHeap.Insert(&theTask->fTimerHeapElem)把Task插入到堆里,并设定等待时间

}

最后再来看下virtual ~TaskThread() { this->StopAndWaitForThread(); }

void OSThread::StopAndWaitForThread()
{
fStopRequested = true;
if (!fJoined)
Join();
}


设置停止标识fStopRequested ,然后等待线程的结束。

Bool16 IsStopRequested() { return fStopRequested; }函数来判断停止标识fStopRequested

在TaskThread::WaitForTask()中末尾有代码

if (OSThread::GetCurrent()->IsStopRequested())
return NULL;//返回NULL,跳出循环


在Join()函数中,调用的WaitForSingleObject(fThreadID, INFINITE);函数等待线程的结束。

附上一张流程图帮助理解流程



有什么疑问,欢迎大家在文章下方留言。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息