从零开始学习EasyDarwin(RTSP之TaskThread)
2016-03-11 23:22
507 查看
上一节讲到《从零开始学习EasyDarwin(RTSP之EventThread) 》
这节分析下EasyDarwin下的TaskThread类,进而加深对EventThread类的理解。
在EventThread::Entry中调用select_waitevent函数等待事件的发生,当有事件发生的时候,就通过调用ProcessEvent方法对事件进行相应的处理。
对于建立RTSP连接的请求:
调用TCPListenerSocket::ProcessEvent
此方法调用RTSPListenerSocket的GetSessionTask方法建立一个RTSPSession,并把相应的套接口加入侦听队列,等待RTSP请求。
然后还需调用this->RequestEvent(EV_RE)把建立RTSP连接的请求加入到侦听队列。
在RequestEvent函数中调用select_modwatch(&fEventReq, theMask)将自己感兴趣的事件加入到侦听队列中,
下面来看下TaskThread类的成员变量和成员函数
重点是函数Entry()和WaitForTask()。
TaskThread::Entry调用TaskThread::WaitForTask()方法获得下一个需要处理的Task。TaskThread::WaitForTask()首先从TaskThread::fHeap中获得Task,如果TaskThread::fHeap中没有满足条件的Task,则从TaskThread::fTaskQueue中获得Task。
开始时,我没弄明白Task存储在什么地方,
简单分析,fTaskQueue应该是任务队列,DeQueueBlocking是出队列操作
尝试性的进入GetEnclosingObject函数位置,
验证了自己的想法,设置和获取Task函数如下:
SetEnclosingObject(void* obj)->设置task
GetEnclosingObject() ->获取task
继续分析TaskThread::Entry()函数
TaskThread::Entry调用TaskThread::WaitForTask()方法获得下一个需要处理的Task。
根据theTask->fWriteLock判断是否有读写锁
然后调用theTimeout = theTask->Run(),转到Task::Run函数
if(theTimeout < 0)
{
//删除task
}
else if (theTimeout == 0)
{
//不做处理
}
else
{
// 返回值theTimeout代表了下次处理这个Task需等待的时间
//fHeap.Insert(&theTask->fTimerHeapElem)把Task插入到堆里,并设定等待时间
}
最后再来看下virtual ~TaskThread() { this->StopAndWaitForThread(); }
设置停止标识fStopRequested ,然后等待线程的结束。
Bool16 IsStopRequested() { return fStopRequested; }函数来判断停止标识fStopRequested
在TaskThread::WaitForTask()中末尾有代码
在Join()函数中,调用的WaitForSingleObject(fThreadID, INFINITE);函数等待线程的结束。
附上一张流程图帮助理解流程
有什么疑问,欢迎大家在文章下方留言。
这节分析下EasyDarwin下的TaskThread类,进而加深对EventThread类的理解。
在EventThread::Entry中调用select_waitevent函数等待事件的发生,当有事件发生的时候,就通过调用ProcessEvent方法对事件进行相应的处理。
对于建立RTSP连接的请求:
调用TCPListenerSocket::ProcessEvent
此方法调用RTSPListenerSocket的GetSessionTask方法建立一个RTSPSession,并把相应的套接口加入侦听队列,等待RTSP请求。
然后还需调用this->RequestEvent(EV_RE)把建立RTSP连接的请求加入到侦听队列。
在RequestEvent函数中调用select_modwatch(&fEventReq, theMask)将自己感兴趣的事件加入到侦听队列中,
int select_modwatch(struct eventreq *req, int which) { while (sMsgWindow == NULL) OSThread::Sleep(10); SInt32 theEvent = 0; if (which & EV_RE) theEvent |= FD_READ | FD_ACCEPT | FD_CLOSE; if (which & EV_WR) theEvent |= FD_WRITE | FD_CONNECT; unsigned int theMsg = (unsigned int)(req->er_data); return ::WSAAsyncSelect(req->er_handle, sMsgWindow, theMsg, theEvent); }
下面来看下TaskThread类的成员变量和成员函数
class TaskThread : public OSThread { public: //Implementation detail: all tasks get run on TaskThreads. TaskThread() : OSThread(), fTaskThreadPoolElem() {fTaskThreadPoolElem.SetEnclosingObject(this);} virtual ~TaskThread() { this->StopAndWaitForThread(); } private: enum { kMinWaitTimeInMilSecs = 10 //UInt32 }; virtual void Entry(); Task* WaitForTask(); OSQueueElem fTaskThreadPoolElem; OSHeap fHeap; OSQueue_Blocking fTaskQueue; friend class Task; friend class TaskThreadPool; };
重点是函数Entry()和WaitForTask()。
Task* TaskThread::WaitForTask() { while (true) { //获取当前时间 SInt64 theCurrentTime = OS::Milliseconds(); //从fHeap堆中获取Task() if ((fHeap.PeekMin() != NULL) && (fHeap.PeekMin()->GetValue() <= theCurrentTime)) { return (Task*)fHeap.ExtractMin()->GetEnclosingObject();//返回指向Task对象的指针 } SInt64 theTimeout = 0; if (fHeap.PeekMin() != NULL) theTimeout = fHeap.PeekMin()->GetValue() - theCurrentTime;//计算下超时时间 Assert(theTimeout >= 0); if (theTimeout < 10) theTimeout = 10; //从fTaskQueue任务队列中取出OSQueueElem队列元素 OSQueueElem* theElem = fTaskQueue.DeQueueBlocking(this, (SInt32) theTimeout); if (theElem != NULL) { return (Task*)theElem->GetEnclosingObject();//返回Task指针 } if (OSThread::GetCurrent()->IsStopRequested()) return NULL; } }
TaskThread::Entry调用TaskThread::WaitForTask()方法获得下一个需要处理的Task。TaskThread::WaitForTask()首先从TaskThread::fHeap中获得Task,如果TaskThread::fHeap中没有满足条件的Task,则从TaskThread::fTaskQueue中获得Task。
开始时,我没弄明白Task存储在什么地方,
简单分析,fTaskQueue应该是任务队列,DeQueueBlocking是出队列操作
尝试性的进入GetEnclosingObject函数位置,
验证了自己的想法,设置和获取Task函数如下:
SetEnclosingObject(void* obj)->设置task
GetEnclosingObject() ->获取task
继续分析TaskThread::Entry()函数
TaskThread::Entry调用TaskThread::WaitForTask()方法获得下一个需要处理的Task。
根据theTask->fWriteLock判断是否有读写锁
然后调用theTimeout = theTask->Run(),转到Task::Run函数
if(theTimeout < 0)
{
//删除task
}
else if (theTimeout == 0)
{
//不做处理
}
else
{
// 返回值theTimeout代表了下次处理这个Task需等待的时间
//fHeap.Insert(&theTask->fTimerHeapElem)把Task插入到堆里,并设定等待时间
}
最后再来看下virtual ~TaskThread() { this->StopAndWaitForThread(); }
void OSThread::StopAndWaitForThread() { fStopRequested = true; if (!fJoined) Join(); }
设置停止标识fStopRequested ,然后等待线程的结束。
Bool16 IsStopRequested() { return fStopRequested; }函数来判断停止标识fStopRequested
在TaskThread::WaitForTask()中末尾有代码
if (OSThread::GetCurrent()->IsStopRequested()) return NULL;//返回NULL,跳出循环
在Join()函数中,调用的WaitForSingleObject(fThreadID, INFINITE);函数等待线程的结束。
附上一张流程图帮助理解流程
有什么疑问,欢迎大家在文章下方留言。
相关文章推荐
- Extjs4.0 最新最全视频教程
- OpenERP 的XML-RPC的实例+many2many,one2many,many2one...
- CSS3属性教程与案例分享
- jquery教程靠边站,一分钱不花让你免费学会jquery
- autoit入门教程小结第1/5页
- 用Photoshop 制作草地效果简明教程
- 比较完整简洁的Flash处理XML文档数据教程 上篇第1/3页
- VBS基础编程教程 (第1篇)
- SQLite教程(十一):临时文件
- VBS基础编程教程 (第3篇)
- PostgreSQL新手入门教程
- VBS教程:运算符-运算符(+)
- PostgreSQL教程(十):性能提升技巧
- PostgreSQL教程(二):模式Schema详解
- PostgreSQL教程(十三):数据库管理详解
- PostgreSQL教程(八):索引详解
- PostgreSQL教程(三):表的继承和分区表详解
- XML简易教程之三
- 如何使用jquery easyui创建标签组件
- ruby 数组使用教程