您的位置:首页 > 其它

live555从RTSP服务器读取数据到使用接收到的数据流程分析

2015-04-08 11:13 337 查看
本文在linux环境下编译live555工程,并用cgdb调试工具对live555工程中的testProgs目录下的openRTSP的执行过程进行了跟踪分析,直到将从socket端读取视频数据并保存为对应的视频和音频数据为止。

进入testProgs目录,执行./openRTSP rtsp://xxxx/test.mp4

对于RTSP协议的处理部分,可设置断点在setupStreams函数中,并跟踪即可进行分析。

这里主要分析进入如下的while(1)循环中的代码

[cpp] view
plaincopy

void BasicTaskScheduler0::doEventLoop(char* watchVariable)

{

// Repeatedly loop, handling readble sockets and timed events:

while (1)

{

if (watchVariable != NULL && *watchVariable != 0) break;

SingleStep();

}

}

从这里可知,live555在客户端处理数据实际上是单线程的程序,不断执行SingleStep()函数中的代码。通过查看该函数代码里,下面一句代码为重点

[cpp] view
plaincopy

(*handler->handlerProc)(handler->clientData, resultConditionSet);

其中该条代码出现了两次,通过调试跟踪它的执行轨迹,第一次出现调用的函数是为了处理和RTSP服务器的通信协议的商定,而第二次出现调用的函数才是处理真正的视频和音频数据。对于RTSP通信协议的分析我们暂且不讨论,而直接进入第二次调用该函数的部分。

在我们的调试过程中在执行到上面的函数时就直接调用到livemedia目录下的如下函数

[cpp] view
plaincopy

void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source, int /*mask*/)

{

source->networkReadHandler1();

}

//下面这个函数实现的主要功能就是从socket端读取数据并存储数据

[cpp] view
plaincopy

void MultiFramedRTPSource::networkReadHandler1()

{

BufferedPacket* bPacket = fPacketReadInProgress;

if (bPacket == NULL)

{

// Normal case: Get a free BufferedPacket descriptor to hold the new network packet:

//分配一块新的存储空间来存储从socket端读取的数据

bPacket = fReorderingBuffer->getFreePacket(this);

}

// Read the network packet, and perform sanity checks on the RTP header:

Boolean readSuccess = False;

do

{

Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL;

//fillInData()函数封装了从socket端获取数据的过程,到此函数执行完已经将数据保存到了bPacket对象中

if (!bPacket->fillInData(fRTPInterface, packetReadWasIncomplete))

{

if (bPacket->bytesAvailable() == 0)

{

envir() << "MultiFramedRTPSource error: Hit limit when reading incoming packet over TCP. Increase \"MAX_PACKET_SIZE\"\n";

}

break;

}

if (packetReadWasIncomplete)

{

// We need additional read(s) before we can process the incoming packet:

fPacketReadInProgress = bPacket;

return;

} else

{

fPacketReadInProgress = NULL;

}

//省略关于RTP包的处理

...

...

...

//fReorderingBuffer为MultiFramedRTPSource类中的对象,该对象建立了一个存储Packet数据包对象的链表

//下面的storePacket()函数即将上面获取的数据包存储在链表中

if (!fReorderingBuffer->storePacket(bPacket)) break;

readSuccess = True;

} while (0);

if (!readSuccess) fReorderingBuffer->freePacket(bPacket);

doGetNextFrame1();

// If we didn't get proper data this time, we'll get another chance

}

//下面的这个函数则实现从上面函数中介绍的存储数据包链表的对象(即fReorderingBuffer)中取出数据包并调用相应函数使用它

//代码1.1

[cpp] view
plaincopy

void MultiFramedRTPSource::doGetNextFrame1()

{

while (fNeedDelivery)

{

// If we already have packet data available, then deliver it now.

Boolean packetLossPrecededThis;

//从fReorderingBuffer对象中取出一个数据包

BufferedPacket* nextPacket

= fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);

if (nextPacket == NULL) break;

fNeedDelivery = False;

if (nextPacket->useCount() == 0)

{

// Before using the packet, check whether it has a special header

// that needs to be processed:

unsigned specialHeaderSize;

if (!processSpecialHeader(nextPacket, specialHeaderSize))

{

// Something's wrong with the header; reject the packet:

fReorderingBuffer->releaseUsedPacket(nextPacket);

fNeedDelivery = True;

break;

}

nextPacket->skip(specialHeaderSize);

}

// Check whether we're part of a multi-packet frame, and whether

// there was packet loss that would render this packet unusable:

if (fCurrentPacketBeginsFrame)

{

if (packetLossPrecededThis || fPacketLossInFragmentedFrame)

{

// We didn't get all of the previous frame.

// Forget any data that we used from it:

fTo = fSavedTo; fMaxSize = fSavedMaxSize;

fFrameSize = 0;

}

fPacketLossInFragmentedFrame = False;

} else if (packetLossPrecededThis)

{

// We're in a multi-packet frame, with preceding packet loss

fPacketLossInFragmentedFrame = True;

}

if (fPacketLossInFragmentedFrame)

{

// This packet is unusable; reject it:

fReorderingBuffer->releaseUsedPacket(nextPacket);

fNeedDelivery = True;

break;

}

// The packet is usable. Deliver all or part of it to our caller:

unsigned frameSize;

//将上面取出的数据包拷贝到fTo指针所指向的地址

nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,

fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,

fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,

fCurPacketMarkerBit);

fFrameSize += frameSize;

if (!nextPacket->hasUsableData())

{

// We're completely done with this packet now

fReorderingBuffer->releaseUsedPacket(nextPacket);

}

if (fCurrentPacketCompletesFrame) //如果完整的取出了一帧数据,则可调用需要该帧数据的函数去处理它

{

// We have all the data that the client wants.

if (fNumTruncatedBytes > 0)

{

envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("

<< fSavedMaxSize << "). "

<< fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";

}

// Call our own 'after getting' function, so that the downstream object can consume the data:

if (fReorderingBuffer->isEmpty())

{

// Common case optimization: There are no more queued incoming packets, so this code will not get

// executed again without having first returned to the event loop. Call our 'after getting' function

// directly, because there's no risk of a long chain of recursion (and thus stack overflow):

afterGetting(this); //调用函数去处理取出的数据帧

} else

{

// Special case: Call our 'after getting' function via the event loop.

nextTask() = envir().taskScheduler().scheduleDelayedTask(0,

(TaskFunc*)FramedSource::afterGetting, this);

}

}

else

{

// This packet contained fragmented data, and does not complete

// the data that the client wants. Keep getting data:

fTo += frameSize; fMaxSize -= frameSize;

fNeedDelivery = True;

}

}

}

//下面这个函数即开始调用执行需要该帧数据的函数

[cpp] view
plaincopy

void FramedSource::afterGetting(FramedSource* source)

{

source->fIsCurrentlyAwaitingData = False;

// indicates that we can be read again

// Note that this needs to be done here, in case the "fAfterFunc"

// called below tries to read another frame (which it usually will)

if (source->fAfterGettingFunc != NULL)

[cpp] view
plaincopy

{

(*(source->fAfterGettingFunc))(source->fAfterGettingClientData,

source->fFrameSize, source->fNumTruncatedBytes,

source->fPresentationTime,

source->fDurationInMicroseconds);

}

}

上面的fAfterGettingFunc为我们自己注册的函数,如果运行的是testProgs中的openRTSP实例,则该函数指向下列代码中通过调用getNextFrame()注册的afterGettingFrame()函数

[cpp] view
plaincopy

Boolean FileSink::continuePlaying()

{

if (fSource == NULL) return False;

fSource->getNextFrame(fBuffer, fBufferSize,

afterGettingFrame, this,

onSourceClosure, this);

return True;

}

如果运行的是testProgs中的testRTSPClient中的实例,则该函数指向这里注册的afterGettingFrame()函数

[cpp] view
plaincopy

Boolean DummySink::continuePlaying()

{

if (fSource == NULL) return False; // sanity check (should not happen)

// Request the next frame of data from our input source. "afterGettingFrame()" will get called later, when it arrives:

fSource->getNextFrame(fReceiveBuffer, DUMMY_SINK_RECEIVE_BUFFER_SIZE,

afterGettingFrame, this,

onSourceClosure, this);

return True;

}

从上面的代码中可以看到getNextFrame()函数的第一个参数为分别在各自类中定义的buffer,我们继续以openRTSP为运行程序来分析,fBuffer为FileSink类里定义的指针:unsigned char* fBuffer;

这里我们先绕一个弯,看看getNextFrame()函数里做了什么

[cpp] view
plaincopy

void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,

afterGettingFunc* afterGettingFunc,

void* afterGettingClientData,

onCloseFunc* onCloseFunc,

void* onCloseClientData)

{

// Make sure we're not already being read:

if (fIsCurrentlyAwaitingData)

{

envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";

envir().internalError();

}

fTo = to;

fMaxSize = maxSize;

fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()

fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()

fAfterGettingFunc = afterGettingFunc;

fAfterGettingClientData = afterGettingClientData;

fOnCloseFunc = onCloseFunc;

fOnCloseClientData = onCloseClientData;

fIsCurrentlyAwaitingData = True;

doGetNextFrame();

}

从代码可以知道上面getNextFrame()中传入的第一个参数fBuffer指向了指针fTo,而我们在前面分析代码1.1中的void MultiFramedRTPSource::doGetNextFrame1()函数中有下面一段代码:

[cpp] view
plaincopy

//将上面取出的数据包拷贝到fTo指针所指向的地址

nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,

fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,

fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,

fCurPacketMarkerBit);

实际上现在应该明白了,从getNextFrame()函数中传入的第一个参数fBuffer最终存储的即是从数据包链表对象中取出的数据,并且在调用上面的use()函数后就可以使用了。

而在void MultiFramedRTPSource::doGetNextFrame1()函数中代码显示的最终调用我们注册的void FileSink::afterGettingFrame()正好是在use()函数调用之后的afterGetting(this)中调用。我们再看看afterGettingFrame()做了什么处理:

[cpp] view
plaincopy

void FileSink::afterGettingFrame(void* clientData, unsigned frameSize,

unsigned numTruncatedBytes,

struct timeval presentationTime,

unsigned /*durationInMicroseconds*/)

{

FileSink* sink = (FileSink*)clientData;

sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime);

}

void FileSink::afterGettingFrame(unsigned frameSize,

unsigned numTruncatedBytes,

struct timeval presentationTime)

{

if (numTruncatedBytes > 0)

{

envir() << "FileSink::afterGettingFrame(): The input frame data was too large for our buffer size ("

<< fBufferSize << "). "

<< numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing the \"bufferSize\" parameter in the \"createNew()\" call to at least "

<< fBufferSize + numTruncatedBytes << "\n";

}

addData(fBuffer, frameSize, presentationTime);

if (fOutFid == NULL || fflush(fOutFid) == EOF)

{

// The output file has closed. Handle this the same way as if the

// input source had closed:

onSourceClosure(this);

stopPlaying();

return;

}

if (fPerFrameFileNameBuffer != NULL)

{

if (fOutFid != NULL) { fclose(fOutFid); fOutFid = NULL; }

}

// Then try getting the next frame:

continuePlaying();

}

从上面代码可以看到调用了addData()函数将数据保存到文件中,然后继续continuePlaying()又去获取下一帧数据然后处理,直到遇到循环结束然后依次退出调用函数。最后看看addData()函数的实现即可知:

[cpp] view
plaincopy

void FileSink::addData(unsigned char const* data, unsigned dataSize,

struct timeval presentationTime)

{

if (fPerFrameFileNameBuffer != NULL)

{

// Special case: Open a new file on-the-fly for this frame

sprintf(fPerFrameFileNameBuffer, "%s-%lu.%06lu", fPerFrameFileNamePrefix,

presentationTime.tv_sec, presentationTime.tv_usec);

fOutFid = OpenOutputFile(envir(), fPerFrameFileNameBuffer);

}

// Write to our file:

#ifdef TEST_LOSS

static unsigned const framesPerPacket = 10;

static unsigned const frameCount = 0;

static Boolean const packetIsLost;

if ((frameCount++)%framesPerPacket == 0)

{

packetIsLost = (our_random()%10 == 0); // simulate 10% packet loss #####

}

if (!packetIsLost)

#endif

if (fOutFid != NULL && data != NULL)

{

fwrite(data, 1, dataSize, fOutFid);

}

}

最后调用系统函数fwrite()实现写入文件功能。

总结:从上面的分析可知,如果要取得从RTSP服务器端接收并保存的数据帧,我们只需要定义一个类并实现如下格式两个的函数,并声明一个指针地址buffer用于指向数据帧,再在continuePlaying()函数中调用getNextFrame(buffer,...)即可。

[cpp] view
plaincopy

typedef void (afterGettingFunc)(void* clientData, unsigned frameSize,

unsigned numTruncatedBytes,

struct timeval presentationTime,

unsigned durationInMicroseconds);

typedef void (onCloseFunc)(void* clientData);

然后再在afterGettingFunc的函数中即可使用buffer。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐