DSS Source Code Analyse (10) - RTPSession::Run
2013-06-05 13:57
363 查看
SInt64 RTPSession::Run()
{
#if DEBUG
Assert(fActivateCalled);
#endif
EventFlags events = this->GetEvents();
QTSS_RoleParams theParams;
theParams.clientSessionClosingParams.inClientSession = this; //every single role being invoked now has this
//as the first parameter
#if RTPSESSION_DEBUGGING
qtss_printf("RTPSession %"_S32BITARG_": In Run. Events %"_S32BITARG_"\n",(SInt32)this, (SInt32)events);
#endif
// Some callbacks look for this struct in the thread object
OSThreadDataSetter theSetter(&fModuleState, NULL);
//if we have been instructed to go away, then let's delete ourselves
if ((events & Task::kKillEvent) || (events & Task::kTimeoutEvent) || (fModuleDoingAsyncStuff))
{
if (!fModuleDoingAsyncStuff)
{
if (events & Task::kTimeoutEvent)
fClosingReason = qtssCliSesCloseTimeout;
//deletion is a bit complicated. For one thing, it must happen from within
//the Run function to ensure that we aren't getting events when we are deleting
//ourselves. We also need to make sure that we aren't getting RTSP requests
//(or, more accurately, that the stream object isn't being used by any other
//threads). We do this by first removing the session from the session map.
#if RTPSESSION_DEBUGGING
qtss_printf("RTPSession %"_S32BITARG_": about to be killed. Eventmask = %"_S32BITARG_"\n",(SInt32)this, (SInt32)events);
#endif
// We cannot block waiting to UnRegister, because we have to
// give the RTSPSessionTask a chance to release the RTPSession.
OSRefTable* sessionTable = QTSServerInterface::GetServer()->GetRTPSessionMap();
Assert(sessionTable != NULL);
if (!sessionTable->TryUnRegister(&fRTPMapElem))
{
this->Signal(Task::kKillEvent);// So that we get back to this place in the code
return kCantGetMutexIdleTime;
}
// The ClientSessionClosing role is allowed to do async stuff
fModuleState.curTask = this;
fModuleDoingAsyncStuff = true; // So that we know to jump back to the
fCurrentModule = 0; // right place in the code
// Set the reason parameter
theParams.clientSessionClosingParams.inReason = fClosingReason;
// If RTCP packets are being generated internally for this stream,
// Send a BYE now.
RTPStream** theStream = NULL;
UInt32 theLen = 0;
if (this->GetPlayFlags() & qtssPlayFlagsSendRTCP)
{
SInt64 byePacketTime = OS::Milliseconds();
for (int x = 0; this->GetValuePtr(qtssCliSesStreamObjects, x, (void**)&theStream, &theLen) == QTSS_NoErr; x++)
if (theStream && *theStream != NULL)
(*theStream)->SendRTCPSR(byePacketTime, true);
}
}
//at this point, we know no one is using this session, so invoke the
//session cleanup role. We don't need to grab the session mutex before
//invoking modules here, because the session is unregistered and
//therefore there's no way another thread could get involved anyway
UInt32 numModules = QTSServerInterface::GetNumModulesInRole(QTSSModule::kClientSessionClosingRole);
{
for (; fCurrentModule < numModules; fCurrentModule++)
{
fModuleState.eventRequested = false;
fModuleState.idleTime = 0;
QTSSModule* theModule = QTSServerInterface::GetModule(QTSSModule::kClientSessionClosingRole, fCurrentModule);
(void)theModule->CallDispatch(QTSS_ClientSessionClosing_Role, &theParams);
// If this module has requested an event, return and wait for the event to transpire
if (fModuleState.eventRequested)
return fModuleState.idleTime; // If the module has requested idle time...
}
}
return -1;//doing this will cause the destructor to get called.
}
//if the stream is currently paused, just return without doing anything.
//We'll get woken up again when a play is issued
if ((fState == qtssPausedState) || (fModule == NULL))
return 0;
//Make sure to grab the session mutex here, to protect the module against
//RTSP requests coming in while it's sending packets
{
OSMutexLocker locker(&fSessionMutex);
//just make sure we haven't been scheduled before our scheduled play
//time. If so, reschedule ourselves for the proper time. (if client
//sends a play while we are already playing, this may occur)
theParams.rtpSendPacketsParams.inCurrentTime = OS::Milliseconds();
if (fNextSendPacketsTime > theParams.rtpSendPacketsParams.inCurrentTime)
{
RTPStream** retransStream = NULL;
UInt32 retransStreamLen = 0;
//
// Send retransmits if we need to
for (int streamIter = 0; this->GetValuePtr(qtssCliSesStreamObjects, streamIter, (void**)&retransStream, &retransStreamLen) == QTSS_NoErr; streamIter++)
if (retransStream && *retransStream)
(*retransStream)->SendRetransmits();
theParams.rtpSendPacketsParams.outNextPacketTime = fNextSendPacketsTime - theParams.rtpSendPacketsParams.inCurrentTime;
}
else
{
#if RTPSESSION_DEBUGGING
qtss_printf("RTPSession %"_S32BITARG_": about to call SendPackets\n",(SInt32)this);
#endif
if ((theParams.rtpSendPacketsParams.inCurrentTime - fLastBandwidthTrackerStatsUpdate) > 1000)
this->GetBandwidthTracker()->UpdateStats();
theParams.rtpSendPacketsParams.outNextPacketTime = 0;
// Async event registration is definitely allowed from this role.
fModuleState.eventRequested = false;
Assert(fModule != NULL);
(void)fModule->CallDispatch(QTSS_RTPSendPackets_Role, &theParams);
#if RTPSESSION_DEBUGGING
qtss_printf("RTPSession %"_S32BITARG_": back from sendPackets, nextPacketTime = %"_64BITARG_"d\n",(SInt32)this, theParams.rtpSendPacketsParams.outNextPacketTime);
#endif
//make sure not to get deleted accidently!
if (theParams.rtpSendPacketsParams.outNextPacketTime < 0)
theParams.rtpSendPacketsParams.outNextPacketTime = 0;
fNextSendPacketsTime = theParams.rtpSendPacketsParams.inCurrentTime + theParams.rtpSendPacketsParams.outNextPacketTime;
}
}
//
// Make sure the duration between calls to Run() isn't greater than the
// max retransmit delay interval.
UInt32 theRetransDelayInMsec = QTSServerInterface::GetServer()->GetPrefs()->GetMaxRetransmitDelayInMsec();
UInt32 theSendInterval = QTSServerInterface::GetServer()->GetPrefs()->GetSendIntervalInMsec();
//
// We want to avoid waking up to do retransmits, and then going back to sleep for like, 1 msec. So,
// only adjust the time to wake up if the next packet time is greater than the max retransmit delay +
// the standard interval between wakeups.
if (theParams.rtpSendPacketsParams.outNextPacketTime > (theRetransDelayInMsec + theSendInterval))
theParams.rtpSendPacketsParams.outNextPacketTime = theRetransDelayInMsec;
Assert(theParams.rtpSendPacketsParams.outNextPacketTime >= 0);//we'd better not get deleted accidently!
return theParams.rtpSendPacketsParams.outNextPacketTime;
}
{
#if DEBUG
Assert(fActivateCalled);
#endif
EventFlags events = this->GetEvents();
QTSS_RoleParams theParams;
theParams.clientSessionClosingParams.inClientSession = this; //every single role being invoked now has this
//as the first parameter
#if RTPSESSION_DEBUGGING
qtss_printf("RTPSession %"_S32BITARG_": In Run. Events %"_S32BITARG_"\n",(SInt32)this, (SInt32)events);
#endif
// Some callbacks look for this struct in the thread object
OSThreadDataSetter theSetter(&fModuleState, NULL);
//if we have been instructed to go away, then let's delete ourselves
if ((events & Task::kKillEvent) || (events & Task::kTimeoutEvent) || (fModuleDoingAsyncStuff))
{
if (!fModuleDoingAsyncStuff)
{
if (events & Task::kTimeoutEvent)
fClosingReason = qtssCliSesCloseTimeout;
//deletion is a bit complicated. For one thing, it must happen from within
//the Run function to ensure that we aren't getting events when we are deleting
//ourselves. We also need to make sure that we aren't getting RTSP requests
//(or, more accurately, that the stream object isn't being used by any other
//threads). We do this by first removing the session from the session map.
#if RTPSESSION_DEBUGGING
qtss_printf("RTPSession %"_S32BITARG_": about to be killed. Eventmask = %"_S32BITARG_"\n",(SInt32)this, (SInt32)events);
#endif
// We cannot block waiting to UnRegister, because we have to
// give the RTSPSessionTask a chance to release the RTPSession.
OSRefTable* sessionTable = QTSServerInterface::GetServer()->GetRTPSessionMap();
Assert(sessionTable != NULL);
if (!sessionTable->TryUnRegister(&fRTPMapElem))
{
this->Signal(Task::kKillEvent);// So that we get back to this place in the code
return kCantGetMutexIdleTime;
}
// The ClientSessionClosing role is allowed to do async stuff
fModuleState.curTask = this;
fModuleDoingAsyncStuff = true; // So that we know to jump back to the
fCurrentModule = 0; // right place in the code
// Set the reason parameter
theParams.clientSessionClosingParams.inReason = fClosingReason;
// If RTCP packets are being generated internally for this stream,
// Send a BYE now.
RTPStream** theStream = NULL;
UInt32 theLen = 0;
if (this->GetPlayFlags() & qtssPlayFlagsSendRTCP)
{
SInt64 byePacketTime = OS::Milliseconds();
for (int x = 0; this->GetValuePtr(qtssCliSesStreamObjects, x, (void**)&theStream, &theLen) == QTSS_NoErr; x++)
if (theStream && *theStream != NULL)
(*theStream)->SendRTCPSR(byePacketTime, true);
}
}
//at this point, we know no one is using this session, so invoke the
//session cleanup role. We don't need to grab the session mutex before
//invoking modules here, because the session is unregistered and
//therefore there's no way another thread could get involved anyway
UInt32 numModules = QTSServerInterface::GetNumModulesInRole(QTSSModule::kClientSessionClosingRole);
{
for (; fCurrentModule < numModules; fCurrentModule++)
{
fModuleState.eventRequested = false;
fModuleState.idleTime = 0;
QTSSModule* theModule = QTSServerInterface::GetModule(QTSSModule::kClientSessionClosingRole, fCurrentModule);
(void)theModule->CallDispatch(QTSS_ClientSessionClosing_Role, &theParams);
// If this module has requested an event, return and wait for the event to transpire
if (fModuleState.eventRequested)
return fModuleState.idleTime; // If the module has requested idle time...
}
}
return -1;//doing this will cause the destructor to get called.
}
//if the stream is currently paused, just return without doing anything.
//We'll get woken up again when a play is issued
if ((fState == qtssPausedState) || (fModule == NULL))
return 0;
//Make sure to grab the session mutex here, to protect the module against
//RTSP requests coming in while it's sending packets
{
OSMutexLocker locker(&fSessionMutex);
//just make sure we haven't been scheduled before our scheduled play
//time. If so, reschedule ourselves for the proper time. (if client
//sends a play while we are already playing, this may occur)
theParams.rtpSendPacketsParams.inCurrentTime = OS::Milliseconds();
if (fNextSendPacketsTime > theParams.rtpSendPacketsParams.inCurrentTime)
{
RTPStream** retransStream = NULL;
UInt32 retransStreamLen = 0;
//
// Send retransmits if we need to
for (int streamIter = 0; this->GetValuePtr(qtssCliSesStreamObjects, streamIter, (void**)&retransStream, &retransStreamLen) == QTSS_NoErr; streamIter++)
if (retransStream && *retransStream)
(*retransStream)->SendRetransmits();
theParams.rtpSendPacketsParams.outNextPacketTime = fNextSendPacketsTime - theParams.rtpSendPacketsParams.inCurrentTime;
}
else
{
#if RTPSESSION_DEBUGGING
qtss_printf("RTPSession %"_S32BITARG_": about to call SendPackets\n",(SInt32)this);
#endif
if ((theParams.rtpSendPacketsParams.inCurrentTime - fLastBandwidthTrackerStatsUpdate) > 1000)
this->GetBandwidthTracker()->UpdateStats();
theParams.rtpSendPacketsParams.outNextPacketTime = 0;
// Async event registration is definitely allowed from this role.
fModuleState.eventRequested = false;
Assert(fModule != NULL);
(void)fModule->CallDispatch(QTSS_RTPSendPackets_Role, &theParams);
#if RTPSESSION_DEBUGGING
qtss_printf("RTPSession %"_S32BITARG_": back from sendPackets, nextPacketTime = %"_64BITARG_"d\n",(SInt32)this, theParams.rtpSendPacketsParams.outNextPacketTime);
#endif
//make sure not to get deleted accidently!
if (theParams.rtpSendPacketsParams.outNextPacketTime < 0)
theParams.rtpSendPacketsParams.outNextPacketTime = 0;
fNextSendPacketsTime = theParams.rtpSendPacketsParams.inCurrentTime + theParams.rtpSendPacketsParams.outNextPacketTime;
}
}
//
// Make sure the duration between calls to Run() isn't greater than the
// max retransmit delay interval.
UInt32 theRetransDelayInMsec = QTSServerInterface::GetServer()->GetPrefs()->GetMaxRetransmitDelayInMsec();
UInt32 theSendInterval = QTSServerInterface::GetServer()->GetPrefs()->GetSendIntervalInMsec();
//
// We want to avoid waking up to do retransmits, and then going back to sleep for like, 1 msec. So,
// only adjust the time to wake up if the next packet time is greater than the max retransmit delay +
// the standard interval between wakeups.
if (theParams.rtpSendPacketsParams.outNextPacketTime > (theRetransDelayInMsec + theSendInterval))
theParams.rtpSendPacketsParams.outNextPacketTime = theRetransDelayInMsec;
Assert(theParams.rtpSendPacketsParams.outNextPacketTime >= 0);//we'd better not get deleted accidently!
return theParams.rtpSendPacketsParams.outNextPacketTime;
}
相关文章推荐
- DSS Source Code Analyse (09) - RTSPSession::Run
- DSS Source Code Analyse (03) - EventThread::Entry
- DSS Source Code Analyse (07) - TaskThread::Entry
- DSS Source Code Analyse (08) - EventContext::RequestEvent
- DSS Source Code Analyse (25) - OSQueue
- DSS Source Code Analyse (26) - OSRef
- DSS Source Code Analyse (11) - TimeoutTask
- DSS Source Code Analyse (27) - OSBufferPool
- DSS Source Code Analyse (01) - StartServer
- DSS Source Code Analyse (12) - IdleTask and IdleTaskThread
- DSS Source Code Analyse (13) - Schedule of TCPListenerSocket object
- DSS Source Code Analyse (14) - TimeoutEvent Process of Task
- DSS Source Code Analyse (04) - TCPListenerSocket::ProcessEvent
- DSS Source Code Analyse (15) - KillEvent Process of Task
- DSS Source Code Analyse (05) - EventContext::ProcessEvent
- DSS Source Code Analyse (16) - Data Flow of classical VOD
- DSS Source Code Analyse (17) - Data Flow of classical Live
- DSS Source Code Analyse (18) - Data Flow of classical Relay
- DSS Source Code Analyse (19) - Authentication
- DSS Source Code Analyse (20) - HttpProxy