live555源码分析---- DESCRIBE命令处理
2017-04-15 10:49
375 查看
转自http://blog.csdn.net/gavinr/article/details/7035799
这里主要分析一下,live555中关于RTP打包发送的部分。在处理完PLAY命令之后,就开始发送RTP数据包了(其实在发送PLAY命令的response包之前,就会发送一个RTP包,这里传输就已经开始了)
RTP包的发送是从MediaSink::startPlaying函数调用开始的
[cpp]
view plain
copy
print?
Boolean MediaSink::startPlaying(MediaSource& source,
afterPlayingFunc* afterFunc,
void* afterClientData) {
// Make sure we're not already being played:
if (fSource != NULL) {
envir().setResultMsg("This sink is already being played");
return False;
}
// Make sure our source is compatible:
if (!sourceIsCompatibleWithUs(source)) {
envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
return False;
}
fSource = (FramedSource*)&source;
fAfterFunc = afterFunc;
fAfterClientData = afterClientData;
return continuePlaying(); //重要的函数在这里
}
这个函数只有最后一句最重要,即continuePlaying函数的调用。continuePlaying函数是定义在MediaSink类中的纯虚函数,需要到特定媒体的sink子类中实现,对于H264来讲是在H264VideoRTPSink中实现的。
H264VideoRTPSink继承关系:H264VideoRTPSink->VideoRTPSink->MultiFramedRTPSink->RTPSink->MediaSink。
[cpp]
view plain
copy
print?
Boolean H264VideoRTPSink::continuePlaying() {
// First, check whether we have a 'fragmenter' class set up yet.
// If not, create it now:
if (fOurFragmenter == NULL) {
//创建一个辅助类H264FUAFragmenter,用于H264的RTP打包
fOurFragmenter = new H264FUAFragmenter(envir(), fSource, OutPacketBuffer::maxSize,
ourMaxPacketSize() - 12/*RTP hdr size*/);
fSource = fOurFragmenter;
}
// Then call the parent class's implementation:
return MultiFramedRTPSink::continuePlaying();
}
上面的代码中创建了一个辅助类H264FUAFragmenter,因为H264的RTP包,有些特殊需要进一步处理,可以参考RFC3986。接着调用MultiFramedRTPSink类的continuePlaying实现
[cpp]
view plain
copy
print?
Boolean MultiFramedRTPSink::continuePlaying() {
// Send the first packet.
// (This will also schedule any future sends.)
buildAndSendPacket(True);
return True;
}
这时调用buildAndSendPacket函数时,看名字就知道其中将完成打包并发送工作。传递了一个True参数,表示这是第一个packet。继续看buildAndSendPacket函数定义
void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) {
fIsFirstPacket = isFirstPacket;
//
//设置RTP头,注意,接收端需要根据RTP包的序号fSeqNo来重新排序
//
// Set up the RTP header:
unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later) 4个字节
rtpHdr |= (fRTPPayloadType<<16); //RTP的有效 载荷的类型
rtpHdr |= fSeqNo; // sequence number 序列号
fOutBuf->enqueueWord(rtpHdr);
//保留一个4 bytes空间,用于设置time stamp
// Note where the RTP timestamp will go.
// (We can't fill this in until we start packing payload frames.)
fTimestampPosition = fOutBuf->curPacketSize();
fOutBuf->skipBytes(4); // leave a hole for the timestamp
fOutBuf->enqueueWord(SSRC()); //跟RTCP相关,作用暂不清楚
//在RTP头后面,添加一个payload-format-specific头,
// Allow for a special, payload-format-specific header following the
// RTP header:
fSpecialHeaderPosition = fOutBuf->curPacketSize();
//
//specialHeaderSize在MultiFramedRTPSink中的默认实现返回0,对于H264的实现不需要处理这个字段
//
fSpecialHeaderSize = specialHeaderSize();
fOutBuf->skipBytes(fSpecialHeaderSize); //预留空间
//填充尽可能多的frames到packet中
// Begin packing as many (complete) frames into the packet as we can:
fTotalFrameSpecificHeaderSizes = 0;
fNoFramesLeft = False;
fNumFramesUsedSoFar = 0;
packFrame();
}
buildAndSendPacket函数中,完成RTP头的准备工作。可以看到RTP头是非常简单的,RTP头中的序号非常重要,客户端需要据此进行RTP包的重排序操作。RTP包内容存放在一个OutPacketBuffer类型的fOutBuf成员变量中,OutPacketBuffer类的细节在文章的最后还会讨论。在RTP头中预留了一些空间没有进行实际的填充,这个工作将在doSpecialFrameHandling中进行,后面会有讨论。进一步的工作,在packFrame函数中进行,它将为RTP包填充数据。
[cpp]
view plain
copy
print?
void MultiFramedRTPSink::packFrame() {
// Get the next frame.
//
//首先需要检查buffer中是否还存在溢出的数据(frame)
//
// First, see if we have an overflow frame that was too big for the last pkt
if (fOutBuf->haveOverflowData()) {
// Use this frame before reading a new one from the source
unsigned frameSize = fOutBuf->overflowDataSize();
struct timeval presentationTime = fOutBuf->overflowPresentationTime();
unsigned durationInMicroseconds = fOutBuf->overflowDurationInMicroseconds(); //表示单个视频或者音频帧所占用的时间间隔,
//
//使用溢出的数据作为packet的内容,注意,这里并不一定进行memcopy操作,
//因为可能已经把packet 的开始位置重置到overflow data的位置
//
fOutBuf->useOverflowData();
//
//获取了数据,就可以准备发送了,当然若是数据量太小,将需要获取更多的数据
//
afterGettingFrame1(frameSize, 0, presentationTime, durationInMicroseconds);
} else {
// Normal case: we need to read a new frame from the source
if (fSource == NULL) return;
//这里,给予当前帧预留空间的机会,保存一些特殊信息,当然frameSpecificHeaderSize函数默认返回0
fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;
//
//从source中获取数据,然后调用回调函数afterGettingFrame。注意,在C++中类成员函数是不能作为回调用函数的。
//我们可以看到afterGettingFrame中直接调用了afterGettingFrame1函数,与上面的第一次情况处理类似了。不过这里为什么要用回调函数回?
//
fSource->getNextFrame(fOutBuf->curPtr(), fOutBuf->totalBytesAvailable(),
afterGettingFrame, this, ourHandleClosure, this);
}
}
packFrame函数需要处理两种情况:
1).buffer中存在未发送的数据(overflow data),这时可以将调用afterGettingFrame1函数进行后续处理工作。
2).buffer不存在数据,这时需要调用source上的getNextFrame函数获取数据。getNextFrame调用时,参数中有两个回调用函数:afterGettingFrame函数将在获取到数据后调用,其中只是简单的调用了afterGettingFrame1函数而已,这是因为C++中是不充许类成员函数作为回调用函数的;ourHandleClosure函数将在数据已经处理完毕时调用,如文件结束。
getNextFrame函数的实现,这里暂不讨论。来看afterGettingFrame1函数的实现。
[cpp]
view plain
copy
print?
void MultiFramedRTPSink
::afterGettingFrame1(unsigned frameSize, unsigned numTruncatedBytes,
struct timeval presentationTime,
unsigned durationInMicroseconds) {
if (fIsFirstPacket) {
//第一个packet,则记录下当前时间
// Record the fact that we're starting to play now:
gettimeofday(&fNextSendTime, NULL);
}
//
//这里的处理要注意了,当一个Frame大于OutPacketBuffer::maxSize(默认值为60000)时,则会丢弃剩下的部分,numTruncatedBytes即为超出部分的大小。
//
if (numTruncatedBytes > 0) {
unsigned const bufferSize = fOutBuf->totalBytesAvailable();
envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
<< bufferSize << "). "
<< numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
<< OutPacketBuffer::maxSize + numTruncatedBytes << ", *before* creating this 'RTPSink'. (Current value is "
<< OutPacketBuffer::maxSize << ".)\n";
}
unsigned curFragmentationOffset = fCurFragmentationOffset;
unsigned numFrameBytesToUse = frameSize;
unsigned overflowBytes = 0;
// If we have already packed one or more frames into this packet,
// check whether this new frame is eligible to be packed after them.
// (This is independent of whether the packet has enough room for this
// new frame; that check comes later.)
//
//fNumFramesUsedSoFar>0 表示packet已经存在frame,需要检查是否充许在packet中加入新的frame
//
if (fNumFramesUsedSoFar > 0) {
if ((fPreviousFrameEndedFragmentation
&& !allowOtherFramesAfterLastFragment()) //不充许在前一个分片之后,跟随一个frame
|| !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize)) { //frame不能出现在非packet的开始位置
// Save away this frame for next time:
numFrameBytesToUse = 0;
//
//不充许添加新的frame,则保存为溢出数据,下次处理
//
fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize,
presentationTime, durationInMicroseconds);
}
}
fPreviousFrameEndedFragmentation = False;
if (numFrameBytesToUse > 0) {
// Check whether this frame overflows the packet
if (fOutBuf->wouldOverflow(frameSize)) {
//
//若frame将导致packet溢出,应该将其保存到packet的溢出数据中,在下一个packet中发送。
//如果frame本身大于pakcet 的max size, 就需要对frame进行分片操作。不过需要调用allowFragmentationAfterStart
//函数以确定是否充许分片,例如对于H264而言,
//
// Don't use this frame now; instead, save it as overflow data, and
// send it in the next packet instead. However, if the frame is too
// big to fit in a packet by itself, then we need to fragment it (and
// use some of it in this packet, if the payload format permits this.)
if (isTooBigForAPacket(frameSize)
&& (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) {
// We need to fragment this frame, and use some of it now:
overflowBytes = computeOverflowForNewFrame(frameSize);
numFrameBytesToUse -= overflowBytes;
fCurFragmentationOffset += numFrameBytesToUse;
} else {
// We don't use any of this frame now:
overflowBytes = frameSize;
numFrameBytesToUse = 0;
}
fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse,
overflowBytes, presentationTime, durationInMicroseconds);
} else if (fCurFragmentationOffset > 0) {
// This is the last fragment of a frame that was fragmented over
// more than one packet. Do any special handling for this case:
fCurFragmentationOffset = 0;
fPreviousFrameEndedFragmentation = True;
}
}
if (numFrameBytesToUse == 0 && frameSize > 0) {
// Send our packet now, because we have filled it up:
sendPacketIfNecessary(); //发送RTP包
} else {
// Use this frame in our outgoing packet:
unsigned char* frameStart = fOutBuf->curPtr();
fOutBuf->increment(numFrameBytesToUse);
// do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes
//
//还记得RTP头中序留的空间吗,将在这个函数中进行填充
//
// Here's where any payload format specific processing gets done:
doSpecialFrameHandling(curFragmentationOffset, frameStart,
numFrameBytesToUse, presentationTime,
overflowBytes);
++fNumFramesUsedSoFar;
//
//设置下一个packet的时间信息,这里若存在overflow数据,就不需要更新时间,因为这是同一个frame的不同分片,需要保证时间一致
//
// Update the time at which the next packet should be sent, based
// on the duration of the frame that we just packed into it.
// However, if this frame has overflow data remaining, then don't
// count its duration yet.
if (overflowBytes == 0) {
fNextSendTime.tv_usec += durationInMicroseconds;
fNextSendTime.tv_sec += fNextSendTime.tv_usec/1000000;
fNextSendTime.tv_usec %= 1000000;
}
// Send our packet now if (i) it's already at our preferred size, or
// (ii) (heuristic) another frame of the same size as the one we just
// read would overflow the packet, or
// (iii) it contains the last fragment of a fragmented frame, and we
// don't allow anything else to follow this or
// (iv) one frame per packet is allowed:
if (fOutBuf->isPreferredSize()
|| fOutBuf->wouldOverflow(numFrameBytesToUse)
|| (fPreviousFrameEndedFragmentation &&
!allowOtherFramesAfterLastFragment())
|| !frameCanAppearAfterPacketStart(fOutBuf->curPtr() - frameSize,
frameSize) ) {
// The packet is ready to be sent now
sendPacketIfNecessary(); //发送RTP包
} else {
// There's room for more frames; try getting another:
packFrame(); //packet中还可以容纳frame,这里将形成递归调用
}
}
}
afterGettingFrame1的复杂之处在于处理frame的分片,若一个frame大于TCP/UDP有效载荷(程序中定义为1448个字节),就必需分片了。最简单的情况就是一个packet(RTP包)中最多只充许一个frame,即一个RTP包中存在一个frame或者frame的一个分片,H264就是这样处理的。,方法是将剩余的数据记录为buffer的溢出部分。下次调用packFrame函数时,直接从溢出部分复制到packet中。不过应该注意,一个frame的大小不能超过buffer的大小(默认为60000),否则会真的溢出,
那就应该考虑增加buffer大小了。
上面的代码中还调用了doSpecialFrameHandling,子类需要重新实现进行一些特殊处理,文章最后还会讨论这个问题。
在packet中充许出现多个frame的情况下(大多数情况下应该没必要用到),采用了递归来实现,可以看到afterGettingFrame1函数的最后有调用packFrame的代码。
再来看RTP的发送函数sendPacketIfNecessary
[cpp]
view plain
copy
print?
void MultiFramedRTPSink::sendPacketIfNecessary() {
//
//packet中存在frame,则发送出去
//
if (fNumFramesUsedSoFar > 0) {
// Send the packet:
//
//可以通过TEST_LOSS宏,模拟10%丢包
//
#ifdef TEST_LOSS
if ((our_random()%10) != 0) // simulate 10% packet loss #####
#endif
//
//现在通过调用RTPInterface::sendPacket函数发送packet
//
if (!fRTPInterface.sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize())) {
// if failure handler has been specified, call it
if (fOnSendErrorFunc != NULL) (*fOnSendErrorFunc)(fOnSendErrorData); //错误处理
}
++fPacketCount;
fTotalOctetCount += fOutBuf->curPacketSize();
fOctetCount += fOutBuf->curPacketSize()
- rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;
++fSeqNo; // for next time
}
if (fOutBuf->haveOverflowData()
&& fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize()/2) {
//
//为了提高效率,可以直接重置buffer中的packet开始位置,这样就不需要拷贝一遍overflow数据了。
//在一个packet只能包含一个frame的情况下,是不是可以考虑修改这里的判断条件呢?
//
// Efficiency hack: Reset the packet start pointer to just in front of
// the overflow data (allowing for the RTP header and special headers),
// so that we probably don't have to "memmove()" the overflow data
// into place when building the next packet:
unsigned newPacketStart = fOutBuf->curPacketSize()
- (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
fOutBuf->adjustPacketStart(newPacketStart); //调整buffer中的packet 开始位置
} else {
// Normal case: Reset the packet start pointer back to the start:
fOutBuf->resetPacketStart(); //这种情况,若存在overflow data,就需要进行copy操作了
}
fOutBuf->resetOffset(); //packet已经发送了,可以重置buffer中的数据offset了
fNumFramesUsedSoFar = 0; //清零packet中的frame数
//
//数据已经发送完毕(例如文件传输完毕),可以关闭了
//
if (fNoFramesLeft) {
// We're done:
onSourceClosure(this);
} else {
//
//准备下一次发送任务
//
// We have more frames left to send. Figure out when the next frame
// is due to start playing, then make sure that we wait this long before
// sending the next packet.
struct timeval timeNow;
gettimeofday(&timeNow, NULL);
int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec; //若是同一个frame的不同分片,这个值将为0
int64_t uSecondsToGo = secsDiff*1000000 + (fNextSendTime.tv_usec - timeNow.tv_usec);
if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
uSecondsToGo = 0;
}
//
//作延时时间,处理函数,将入到任务调试器中,以便进行下一次发送操作
//
// Delay this amount of time:
nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo, (TaskFunc*)sendNext, this);
}
}
sendPacketIfNecessary函数处理一些发送的细节,我们来看最重要的两点。
1)RTP包还是转交给了RTPInterface::sendPacket函数,等下再看其具体实现。
2)将下一次RTP的发送操作加入到任务调度器中,参数中传递了sendNext函数指针,其实现比较简单,如下
[cpp]
view plain
copy
print?
void MultiFramedRTPSink::sendNext(void* firstArg) {
MultiFramedRTPSink* sink = (MultiFramedRTPSink*)firstArg;
sink->buildAndSendPacket(False); //现在已经不是第一次调用了
}
sendNext函数中又调用了buildAndSendPacket函数,轮回了。。。现在来看RTPInterface::sendPacket函数
[cpp]
view plain
copy
print?
Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
Boolean success = True; // we'll return False instead if any of the sends fail
//一般情况下,使用UDP发送
// Normal case: Send as a UDP packet:
if (!fGS->output(envir(), fGS->ttl(), packet, packetSize)) success = False;
//使用TCP发送
// Also, send over each of our TCP sockets:
for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
streams = streams->fNext) {
if (!sendRTPOverTCP(packet, packetSize,
streams->fStreamSocketNum, streams->fStreamChannelId)) {
success = False;
}
}
return success;
}
若是使用UDP方式发送,将调用Groupsock::output函数,可以实现组播功能。groupsock只实现了UDP发送功能,当用TCP方式传送时调用sendRTPOverTcP函数,这个函数中直接调用socket的send函数。
现在RTP的发送终于结束了,groupsock的实现留待下次分析。现在再来看一个遗留的问题,MultiFramedRTPSink::doSpecialFrameHandling的实现。它是定义在MultiFramedRTPSink中的虚函数,先来看其默认的实现
[cpp]
view plain
copy
print?
void MultiFramedRTPSink::doSpecialFrameHandling(unsigned /*fragmentationOffset*/,
unsigned char* /*frameStart*/,
unsigned /*numBytesInFrame*/,
struct timeval framePresentationTime,
unsigned /*numRemainingBytes*/) {
// default implementation: If this is the first frame in the packet,
// use its presentationTime for the RTP timestamp:
if (isFirstFrameInPacket()) {
setTimestamp(framePresentationTime);
}
}
可以看到默认实现中只是在第一次调用时,设置RTP包中的的时间信息,下面来看H264VideoRTPSink上的实现
[cpp]
view plain
copy
print?
void H264VideoRTPSink::doSpecialFrameHandling(unsigned /*fragmentationOffset*/,
unsigned char* /*frameStart*/,
unsigned /*numBytesInFrame*/,
struct timeval framePresentationTime,
unsigned /*numRemainingBytes*/) {
//
//设置RTP头中的M位
//
// Set the RTP 'M' (marker) bit iff
// 1/ The most recently delivered fragment was the end of (or the only fragment of) an NAL unit, and
// 2/ This NAL unit was the last NAL unit of an 'access unit' (i.e. video frame).
if (fOurFragmenter != NULL) {
H264VideoStreamFramer* framerSource
= (H264VideoStreamFramer*)(fOurFragmenter->inputSource());
// This relies on our fragmenter's source being a "H264VideoStreamFramer".
if (fOurFragmenter->lastFragmentCompletedNALUnit()
&& framerSource != NULL && framerSource->pictureEndMarker()) {
setMarkerBit();
framerSource->pictureEndMarker() = False;
}
}
//
//设置时间戳
//
setTimestamp(framePresentationTime);
}
解释一下RTP头中的M位,H264图像可能被封装成多个NALU,这些NALU就拥有相同的同时间。根据RFC3984的定义,当RTP中封装的是同一图像的最后一个NALU时,就需要设置M位
这里主要分析一下,live555中关于RTP打包发送的部分。在处理完PLAY命令之后,就开始发送RTP数据包了(其实在发送PLAY命令的response包之前,就会发送一个RTP包,这里传输就已经开始了)
RTP包的发送是从MediaSink::startPlaying函数调用开始的
[cpp]
view plain
copy
print?
Boolean MediaSink::startPlaying(MediaSource& source,
afterPlayingFunc* afterFunc,
void* afterClientData) {
// Make sure we're not already being played:
if (fSource != NULL) {
envir().setResultMsg("This sink is already being played");
return False;
}
// Make sure our source is compatible:
if (!sourceIsCompatibleWithUs(source)) {
envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
return False;
}
fSource = (FramedSource*)&source;
fAfterFunc = afterFunc;
fAfterClientData = afterClientData;
return continuePlaying(); //重要的函数在这里
}
Boolean MediaSink::startPlaying(MediaSource& source, afterPlayingFunc* afterFunc, void* afterClientData) { // Make sure we're not already being played: if (fSource != NULL) { envir().setResultMsg("This sink is already being played"); return False; } // Make sure our source is compatible: if (!sourceIsCompatibleWithUs(source)) { envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!"); return False; } fSource = (FramedSource*)&source; fAfterFunc = afterFunc; fAfterClientData = afterClientData; return continuePlaying(); //重要的函数在这里 }
这个函数只有最后一句最重要,即continuePlaying函数的调用。continuePlaying函数是定义在MediaSink类中的纯虚函数,需要到特定媒体的sink子类中实现,对于H264来讲是在H264VideoRTPSink中实现的。
H264VideoRTPSink继承关系:H264VideoRTPSink->VideoRTPSink->MultiFramedRTPSink->RTPSink->MediaSink。
[cpp]
view plain
copy
print?
Boolean H264VideoRTPSink::continuePlaying() {
// First, check whether we have a 'fragmenter' class set up yet.
// If not, create it now:
if (fOurFragmenter == NULL) {
//创建一个辅助类H264FUAFragmenter,用于H264的RTP打包
fOurFragmenter = new H264FUAFragmenter(envir(), fSource, OutPacketBuffer::maxSize,
ourMaxPacketSize() - 12/*RTP hdr size*/);
fSource = fOurFragmenter;
}
// Then call the parent class's implementation:
return MultiFramedRTPSink::continuePlaying();
}
Boolean H264VideoRTPSink::continuePlaying() { // First, check whether we have a 'fragmenter' class set up yet. // If not, create it now: if (fOurFragmenter == NULL) { //创建一个辅助类H264FUAFragmenter,用于H264的RTP打包 fOurFragmenter = new H264FUAFragmenter(envir(), fSource, OutPacketBuffer::maxSize, ourMaxPacketSize() - 12/*RTP hdr size*/); fSource = fOurFragmenter; } // Then call the parent class's implementation: return MultiFramedRTPSink::continuePlaying(); }
上面的代码中创建了一个辅助类H264FUAFragmenter,因为H264的RTP包,有些特殊需要进一步处理,可以参考RFC3986。接着调用MultiFramedRTPSink类的continuePlaying实现
[cpp]
view plain
copy
print?
Boolean MultiFramedRTPSink::continuePlaying() {
// Send the first packet.
// (This will also schedule any future sends.)
buildAndSendPacket(True);
return True;
}
这时调用buildAndSendPacket函数时,看名字就知道其中将完成打包并发送工作。传递了一个True参数,表示这是第一个packet。继续看buildAndSendPacket函数定义
void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) {
fIsFirstPacket = isFirstPacket;
//
//设置RTP头,注意,接收端需要根据RTP包的序号fSeqNo来重新排序
//
// Set up the RTP header:
unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later) 4个字节
rtpHdr |= (fRTPPayloadType<<16); //RTP的有效 载荷的类型
rtpHdr |= fSeqNo; // sequence number 序列号
fOutBuf->enqueueWord(rtpHdr);
//保留一个4 bytes空间,用于设置time stamp
// Note where the RTP timestamp will go.
// (We can't fill this in until we start packing payload frames.)
fTimestampPosition = fOutBuf->curPacketSize();
fOutBuf->skipBytes(4); // leave a hole for the timestamp
fOutBuf->enqueueWord(SSRC()); //跟RTCP相关,作用暂不清楚
//在RTP头后面,添加一个payload-format-specific头,
// Allow for a special, payload-format-specific header following the
// RTP header:
fSpecialHeaderPosition = fOutBuf->curPacketSize();
//
//specialHeaderSize在MultiFramedRTPSink中的默认实现返回0,对于H264的实现不需要处理这个字段
//
fSpecialHeaderSize = specialHeaderSize();
fOutBuf->skipBytes(fSpecialHeaderSize); //预留空间
//填充尽可能多的frames到packet中
// Begin packing as many (complete) frames into the packet as we can:
fTotalFrameSpecificHeaderSizes = 0;
fNoFramesLeft = False;
fNumFramesUsedSoFar = 0;
packFrame();
}
Boolean MultiFramedRTPSink::continuePlaying() { // Send the first packet. // (This will also schedule any future sends.) buildAndSendPacket(True); return True; } 这时调用buildAndSendPacket函数时,看名字就知道其中将完成打包并发送工作。传递了一个True参数,表示这是第一个packet。继续看buildAndSendPacket函数定义 void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) { fIsFirstPacket = isFirstPacket; // //设置RTP头,注意,接收端需要根据RTP包的序号fSeqNo来重新排序 // // Set up the RTP header: unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later) rtpHdr |= (fRTPPayloadType<<16); rtpHdr |= fSeqNo; // sequence number fOutBuf->enqueueWord(rtpHdr); //保留一个4 bytes空间,用于设置time stamp // Note where the RTP timestamp will go. // (We can't fill this in until we start packing payload frames.) fTimestampPosition = fOutBuf->curPacketSize(); fOutBuf->skipBytes(4); // leave a hole for the timestamp fOutBuf->enqueueWord(SSRC()); //跟RTCP相关,作用暂不清楚 //在RTP头后面,添加一个payload-format-specific头, // Allow for a special, payload-format-specific header following the // RTP header: fSpecialHeaderPosition = fOutBuf->curPacketSize(); // //specialHeaderSize在MultiFramedRTPSink中的默认实现返回0,对于H264的实现不需要处理这个字段 // fSpecialHeaderSize = specialHeaderSize(); fOutBuf->skipBytes(fSpecialHeaderSize); //预留空间 //填充尽可能多的frames到packet中 // Begin packing as many (complete) frames into the packet as we can: fTotalFrameSpecificHeaderSizes = 0; fNoFramesLeft = False; fNumFramesUsedSoFar = 0; packFrame(); }
buildAndSendPacket函数中,完成RTP头的准备工作。可以看到RTP头是非常简单的,RTP头中的序号非常重要,客户端需要据此进行RTP包的重排序操作。RTP包内容存放在一个OutPacketBuffer类型的fOutBuf成员变量中,OutPacketBuffer类的细节在文章的最后还会讨论。在RTP头中预留了一些空间没有进行实际的填充,这个工作将在doSpecialFrameHandling中进行,后面会有讨论。进一步的工作,在packFrame函数中进行,它将为RTP包填充数据。
[cpp]
view plain
copy
print?
void MultiFramedRTPSink::packFrame() {
// Get the next frame.
//
//首先需要检查buffer中是否还存在溢出的数据(frame)
//
// First, see if we have an overflow frame that was too big for the last pkt
if (fOutBuf->haveOverflowData()) {
// Use this frame before reading a new one from the source
unsigned frameSize = fOutBuf->overflowDataSize();
struct timeval presentationTime = fOutBuf->overflowPresentationTime();
unsigned durationInMicroseconds = fOutBuf->overflowDurationInMicroseconds(); //表示单个视频或者音频帧所占用的时间间隔,
//
//使用溢出的数据作为packet的内容,注意,这里并不一定进行memcopy操作,
//因为可能已经把packet 的开始位置重置到overflow data的位置
//
fOutBuf->useOverflowData();
//
//获取了数据,就可以准备发送了,当然若是数据量太小,将需要获取更多的数据
//
afterGettingFrame1(frameSize, 0, presentationTime, durationInMicroseconds);
} else {
// Normal case: we need to read a new frame from the source
if (fSource == NULL) return;
//这里,给予当前帧预留空间的机会,保存一些特殊信息,当然frameSpecificHeaderSize函数默认返回0
fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;
//
//从source中获取数据,然后调用回调函数afterGettingFrame。注意,在C++中类成员函数是不能作为回调用函数的。
//我们可以看到afterGettingFrame中直接调用了afterGettingFrame1函数,与上面的第一次情况处理类似了。不过这里为什么要用回调函数回?
//
fSource->getNextFrame(fOutBuf->curPtr(), fOutBuf->totalBytesAvailable(),
afterGettingFrame, this, ourHandleClosure, this);
}
}
void MultiFramedRTPSink::packFrame() { // Get the next frame. // //首先需要检查buffer中是否还存在溢出的数据(frame) // // First, see if we have an overflow frame that was too big for the last pkt if (fOutBuf->haveOverflowData()) { // Use this frame before reading a new one from the source unsigned frameSize = fOutBuf->overflowDataSize(); struct timeval presentationTime = fOutBuf->overflowPresentationTime(); unsigned durationInMicroseconds = fOutBuf->overflowDurationInMicroseconds(); // //使用溢出的数据作为packet的内容,注意,这里并不一定进行memcopy操作, //因为可能已经把packet 的开始位置重置到overflow data的位置 // fOutBuf->useOverflowData(); // //获取了数据,就可以准备发送了,当然若是数据量太小,将需要获取更多的数据 // afterGettingFrame1(frameSize, 0, presentationTime, durationInMicroseconds); } else { // Normal case: we need to read a new frame from the source if (fSource == NULL) return; //这里,给予当前帧预留空间的机会,保存一些特殊信息,当然frameSpecificHeaderSize函数默认返回0 fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize(); fCurFrameSpecificHeaderSize = frameSpecificHeaderSize(); fOutBuf->skipBytes(fCurFrameSpecificHeaderSize); fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize; // //从source中获取数据,然后调用回调函数afterGettingFrame。注意,在C++中类成员函数是不能作为回调用函数的。 //我们可以看到afterGettingFrame中直接调用了afterGettingFrame1函数,与上面的第一次情况处理类似了。不过这里为什么要用回调函数回? // fSource->getNextFrame(fOutBuf->curPtr(), fOutBuf->totalBytesAvailable(), afterGettingFrame, this, ourHandleClosure, this); } }
packFrame函数需要处理两种情况:
1).buffer中存在未发送的数据(overflow data),这时可以将调用afterGettingFrame1函数进行后续处理工作。
2).buffer不存在数据,这时需要调用source上的getNextFrame函数获取数据。getNextFrame调用时,参数中有两个回调用函数:afterGettingFrame函数将在获取到数据后调用,其中只是简单的调用了afterGettingFrame1函数而已,这是因为C++中是不充许类成员函数作为回调用函数的;ourHandleClosure函数将在数据已经处理完毕时调用,如文件结束。
getNextFrame函数的实现,这里暂不讨论。来看afterGettingFrame1函数的实现。
[cpp]
view plain
copy
print?
void MultiFramedRTPSink
::afterGettingFrame1(unsigned frameSize, unsigned numTruncatedBytes,
struct timeval presentationTime,
unsigned durationInMicroseconds) {
if (fIsFirstPacket) {
//第一个packet,则记录下当前时间
// Record the fact that we're starting to play now:
gettimeofday(&fNextSendTime, NULL);
}
//
//这里的处理要注意了,当一个Frame大于OutPacketBuffer::maxSize(默认值为60000)时,则会丢弃剩下的部分,numTruncatedBytes即为超出部分的大小。
//
if (numTruncatedBytes > 0) {
unsigned const bufferSize = fOutBuf->totalBytesAvailable();
envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
<< bufferSize << "). "
<< numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
<< OutPacketBuffer::maxSize + numTruncatedBytes << ", *before* creating this 'RTPSink'. (Current value is "
<< OutPacketBuffer::maxSize << ".)\n";
}
unsigned curFragmentationOffset = fCurFragmentationOffset;
unsigned numFrameBytesToUse = frameSize;
unsigned overflowBytes = 0;
// If we have already packed one or more frames into this packet,
// check whether this new frame is eligible to be packed after them.
// (This is independent of whether the packet has enough room for this
// new frame; that check comes later.)
//
//fNumFramesUsedSoFar>0 表示packet已经存在frame,需要检查是否充许在packet中加入新的frame
//
if (fNumFramesUsedSoFar > 0) {
if ((fPreviousFrameEndedFragmentation
&& !allowOtherFramesAfterLastFragment()) //不充许在前一个分片之后,跟随一个frame
|| !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize)) { //frame不能出现在非packet的开始位置
// Save away this frame for next time:
numFrameBytesToUse = 0;
//
//不充许添加新的frame,则保存为溢出数据,下次处理
//
fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize,
presentationTime, durationInMicroseconds);
}
}
fPreviousFrameEndedFragmentation = False;
if (numFrameBytesToUse > 0) {
// Check whether this frame overflows the packet
if (fOutBuf->wouldOverflow(frameSize)) {
//
//若frame将导致packet溢出,应该将其保存到packet的溢出数据中,在下一个packet中发送。
//如果frame本身大于pakcet 的max size, 就需要对frame进行分片操作。不过需要调用allowFragmentationAfterStart
//函数以确定是否充许分片,例如对于H264而言,
//
// Don't use this frame now; instead, save it as overflow data, and
// send it in the next packet instead. However, if the frame is too
// big to fit in a packet by itself, then we need to fragment it (and
// use some of it in this packet, if the payload format permits this.)
if (isTooBigForAPacket(frameSize)
&& (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) {
// We need to fragment this frame, and use some of it now:
overflowBytes = computeOverflowForNewFrame(frameSize);
numFrameBytesToUse -= overflowBytes;
fCurFragmentationOffset += numFrameBytesToUse;
} else {
// We don't use any of this frame now:
overflowBytes = frameSize;
numFrameBytesToUse = 0;
}
fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse,
overflowBytes, presentationTime, durationInMicroseconds);
} else if (fCurFragmentationOffset > 0) {
// This is the last fragment of a frame that was fragmented over
// more than one packet. Do any special handling for this case:
fCurFragmentationOffset = 0;
fPreviousFrameEndedFragmentation = True;
}
}
if (numFrameBytesToUse == 0 && frameSize > 0) {
// Send our packet now, because we have filled it up:
sendPacketIfNecessary(); //发送RTP包
} else {
// Use this frame in our outgoing packet:
unsigned char* frameStart = fOutBuf->curPtr();
fOutBuf->increment(numFrameBytesToUse);
// do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes
//
//还记得RTP头中序留的空间吗,将在这个函数中进行填充
//
// Here's where any payload format specific processing gets done:
doSpecialFrameHandling(curFragmentationOffset, frameStart,
numFrameBytesToUse, presentationTime,
overflowBytes);
++fNumFramesUsedSoFar;
//
//设置下一个packet的时间信息,这里若存在overflow数据,就不需要更新时间,因为这是同一个frame的不同分片,需要保证时间一致
//
// Update the time at which the next packet should be sent, based
// on the duration of the frame that we just packed into it.
// However, if this frame has overflow data remaining, then don't
// count its duration yet.
if (overflowBytes == 0) {
fNextSendTime.tv_usec += durationInMicroseconds;
fNextSendTime.tv_sec += fNextSendTime.tv_usec/1000000;
fNextSendTime.tv_usec %= 1000000;
}
// Send our packet now if (i) it's already at our preferred size, or
// (ii) (heuristic) another frame of the same size as the one we just
// read would overflow the packet, or
// (iii) it contains the last fragment of a fragmented frame, and we
// don't allow anything else to follow this or
// (iv) one frame per packet is allowed:
if (fOutBuf->isPreferredSize()
|| fOutBuf->wouldOverflow(numFrameBytesToUse)
|| (fPreviousFrameEndedFragmentation &&
!allowOtherFramesAfterLastFragment())
|| !frameCanAppearAfterPacketStart(fOutBuf->curPtr() - frameSize,
frameSize) ) {
// The packet is ready to be sent now
sendPacketIfNecessary(); //发送RTP包
} else {
// There's room for more frames; try getting another:
packFrame(); //packet中还可以容纳frame,这里将形成递归调用
}
}
}
void MultiFramedRTPSink ::afterGettingFrame1(unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds) { if (fIsFirstPacket) { //第一个packet,则记录下当前时间 // Record the fact that we're starting to play now: gettimeofday(&fNextSendTime, NULL); } // //这里的处理要注意了,当一个Frame大于OutPacketBuffer::maxSize(默认值为60000)时,则会丢弃剩下的部分,numTruncatedBytes即为超出部分的大小。 // if (numTruncatedBytes > 0) { unsigned const bufferSize = fOutBuf->totalBytesAvailable(); envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size (" << bufferSize << "). " << numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing \"OutPacketBuffer::maxSize\" to at least " << OutPacketBuffer::maxSize + numTruncatedBytes << ", *before* creating this 'RTPSink'. (Current value is " << OutPacketBuffer::maxSize << ".)\n"; } unsigned curFragmentationOffset = fCurFragmentationOffset; unsigned numFrameBytesToUse = frameSize; unsigned overflowBytes = 0; // If we have already packed one or more frames into this packet, // check whether this new frame is eligible to be packed after them. // (This is independent of whether the packet has enough room for this // new frame; that check comes later.) // //fNumFramesUsedSoFar>0 表示packet已经存在frame,需要检查是否充许在packet中加入新的frame // if (fNumFramesUsedSoFar > 0) { if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment()) //不充许在前一个分片之后,跟随一个frame || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize)) { //frame不能出现在非packet的开始位置 // Save away this frame for next time: numFrameBytesToUse = 0; // //不充许添加新的frame,则保存为溢出数据,下次处理 // fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize, presentationTime, durationInMicroseconds); } } fPreviousFrameEndedFragmentation = False; if (numFrameBytesToUse > 0) { // Check whether this frame overflows the packet if (fOutBuf->wouldOverflow(frameSize)) { // //若frame将导致packet溢出,应该将其保存到packet的溢出数据中,在下一个packet中发送。 //如果frame本身大于pakcet 的max size, 就需要对frame进行分片操作。不过需要调用allowFragmentationAfterStart //函数以确定是否充许分片,例如对于H264而言, // // Don't use this frame now; instead, save it as overflow data, and // send it in the next packet instead. However, if the frame is too // big to fit in a packet by itself, then we need to fragment it (and // use some of it in this packet, if the payload format permits this.) if (isTooBigForAPacket(frameSize) && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) { // We need to fragment this frame, and use some of it now: overflowBytes = computeOverflowForNewFrame(frameSize); numFrameBytesToUse -= overflowBytes; fCurFragmentationOffset += numFrameBytesToUse; } else { // We don't use any of this frame now: overflowBytes = frameSize; numFrameBytesToUse = 0; } fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse, overflowBytes, presentationTime, durationInMicroseconds); } else if (fCurFragmentationOffset > 0) { // This is the last fragment of a frame that was fragmented over // more than one packet. Do any special handling for this case: fCurFragmentationOffset = 0; fPreviousFrameEndedFragmentation = True; } } if (numFrameBytesToUse == 0 && frameSize > 0) { // Send our packet now, because we have filled it up: sendPacketIfNecessary(); //发送RTP包 } else { // Use this frame in our outgoing packet: unsigned char* frameStart = fOutBuf->curPtr(); fOutBuf->increment(numFrameBytesToUse); // do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes // //还记得RTP头中序留的空间吗,将在这个函数中进行填充 // // Here's where any payload format specific processing gets done: doSpecialFrameHandling(curFragmentationOffset, frameStart, numFrameBytesToUse, presentationTime, overflowBytes); ++fNumFramesUsedSoFar; // //设置下一个packet的时间信息,这里若存在overflow数据,就不需要更新时间,因为这是同一个frame的不同分片,需要保证时间一致 // // Update the time at which the next packet should be sent, based // on the duration of the frame that we just packed into it. // However, if this frame has overflow data remaining, then don't // count its duration yet. if (overflowBytes == 0) { fNextSendTime.tv_usec += durationInMicroseconds; fNextSendTime.tv_sec += fNextSendTime.tv_usec/1000000; fNextSendTime.tv_usec %= 1000000; } // Send our packet now if (i) it's already at our preferred size, or // (ii) (heuristic) another frame of the same size as the one we just // read would overflow the packet, or // (iii) it contains the last fragment of a fragmented frame, and we // don't allow anything else to follow this or // (iv) one frame per packet is allowed: if (fOutBuf->isPreferredSize() || fOutBuf->wouldOverflow(numFrameBytesToUse) || (fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment()) || !frameCanAppearAfterPacketStart(fOutBuf->curPtr() - frameSize, frameSize) ) { // The packet is ready to be sent now sendPacketIfNecessary(); //发送RTP包 } else { // There's room for more frames; try getting another: packFrame(); //packet中还可以容纳frame,这里将形成递归调用 } } }
afterGettingFrame1的复杂之处在于处理frame的分片,若一个frame大于TCP/UDP有效载荷(程序中定义为1448个字节),就必需分片了。最简单的情况就是一个packet(RTP包)中最多只充许一个frame,即一个RTP包中存在一个frame或者frame的一个分片,H264就是这样处理的。,方法是将剩余的数据记录为buffer的溢出部分。下次调用packFrame函数时,直接从溢出部分复制到packet中。不过应该注意,一个frame的大小不能超过buffer的大小(默认为60000),否则会真的溢出,
那就应该考虑增加buffer大小了。
上面的代码中还调用了doSpecialFrameHandling,子类需要重新实现进行一些特殊处理,文章最后还会讨论这个问题。
在packet中充许出现多个frame的情况下(大多数情况下应该没必要用到),采用了递归来实现,可以看到afterGettingFrame1函数的最后有调用packFrame的代码。
再来看RTP的发送函数sendPacketIfNecessary
[cpp]
view plain
copy
print?
void MultiFramedRTPSink::sendPacketIfNecessary() {
//
//packet中存在frame,则发送出去
//
if (fNumFramesUsedSoFar > 0) {
// Send the packet:
//
//可以通过TEST_LOSS宏,模拟10%丢包
//
#ifdef TEST_LOSS
if ((our_random()%10) != 0) // simulate 10% packet loss #####
#endif
//
//现在通过调用RTPInterface::sendPacket函数发送packet
//
if (!fRTPInterface.sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize())) {
// if failure handler has been specified, call it
if (fOnSendErrorFunc != NULL) (*fOnSendErrorFunc)(fOnSendErrorData); //错误处理
}
++fPacketCount;
fTotalOctetCount += fOutBuf->curPacketSize();
fOctetCount += fOutBuf->curPacketSize()
- rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;
++fSeqNo; // for next time
}
if (fOutBuf->haveOverflowData()
&& fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize()/2) {
//
//为了提高效率,可以直接重置buffer中的packet开始位置,这样就不需要拷贝一遍overflow数据了。
//在一个packet只能包含一个frame的情况下,是不是可以考虑修改这里的判断条件呢?
//
// Efficiency hack: Reset the packet start pointer to just in front of
// the overflow data (allowing for the RTP header and special headers),
// so that we probably don't have to "memmove()" the overflow data
// into place when building the next packet:
unsigned newPacketStart = fOutBuf->curPacketSize()
- (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
fOutBuf->adjustPacketStart(newPacketStart); //调整buffer中的packet 开始位置
} else {
// Normal case: Reset the packet start pointer back to the start:
fOutBuf->resetPacketStart(); //这种情况,若存在overflow data,就需要进行copy操作了
}
fOutBuf->resetOffset(); //packet已经发送了,可以重置buffer中的数据offset了
fNumFramesUsedSoFar = 0; //清零packet中的frame数
//
//数据已经发送完毕(例如文件传输完毕),可以关闭了
//
if (fNoFramesLeft) {
// We're done:
onSourceClosure(this);
} else {
//
//准备下一次发送任务
//
// We have more frames left to send. Figure out when the next frame
// is due to start playing, then make sure that we wait this long before
// sending the next packet.
struct timeval timeNow;
gettimeofday(&timeNow, NULL);
int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec; //若是同一个frame的不同分片,这个值将为0
int64_t uSecondsToGo = secsDiff*1000000 + (fNextSendTime.tv_usec - timeNow.tv_usec);
if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
uSecondsToGo = 0;
}
//
//作延时时间,处理函数,将入到任务调试器中,以便进行下一次发送操作
//
// Delay this amount of time:
nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo, (TaskFunc*)sendNext, this);
}
}
void MultiFramedRTPSink::sendPacketIfNecessary() { // //packet中存在frame,则发送出去 // if (fNumFramesUsedSoFar > 0) { // Send the packet: // //可以通过TEST_LOSS宏,模拟10%丢包 // #ifdef TEST_LOSS if ((our_random()%10) != 0) // simulate 10% packet loss ##### #endif // //现在通过调用RTPInterface::sendPacket函数发送packet // if (!fRTPInterface.sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize())) { // if failure handler has been specified, call it if (fOnSendErrorFunc != NULL) (*fOnSendErrorFunc)(fOnSendErrorData); //错误处理 } ++fPacketCount; fTotalOctetCount += fOutBuf->curPacketSize(); fOctetCount += fOutBuf->curPacketSize() - rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes; ++fSeqNo; // for next time } if (fOutBuf->haveOverflowData() && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize()/2) { // //为了提高效率,可以直接重置buffer中的packet开始位置,这样就不需要拷贝一遍overflow数据了。 //在一个packet只能包含一个frame的情况下,是不是可以考虑修改这里的判断条件呢? // // Efficiency hack: Reset the packet start pointer to just in front of // the overflow data (allowing for the RTP header and special headers), // so that we probably don't have to "memmove()" the overflow data // into place when building the next packet: unsigned newPacketStart = fOutBuf->curPacketSize() - (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize()); fOutBuf->adjustPacketStart(newPacketStart); //调整buffer中的packet 开始位置 } else { // Normal case: Reset the packet start pointer back to the start: fOutBuf->resetPacketStart(); //这种情况,若存在overflow data,就需要进行copy操作了 } fOutBuf->resetOffset(); //packet已经发送了,可以重置buffer中的数据offset了 fNumFramesUsedSoFar = 0; //清零packet中的frame数 // //数据已经发送完毕(例如文件传输完毕),可以关闭了 // if (fNoFramesLeft) { // We're done: onSourceClosure(this); } else { // //准备下一次发送任务 // // We have more frames left to send. Figure out when the next frame // is due to start playing, then make sure that we wait this long before // sending the next packet. struct timeval timeNow; gettimeofday(&timeNow, NULL); int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec; //若是同一个frame的不同分片,这个值将为0 int64_t uSecondsToGo = secsDiff*1000000 + (fNextSendTime.tv_usec - timeNow.tv_usec); if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative: uSecondsToGo = 0; } // //作延时时间,处理函数,将入到任务调试器中,以便进行下一次发送操作 // // Delay this amount of time: nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo, (TaskFunc*)sendNext, this); } }
sendPacketIfNecessary函数处理一些发送的细节,我们来看最重要的两点。
1)RTP包还是转交给了RTPInterface::sendPacket函数,等下再看其具体实现。
2)将下一次RTP的发送操作加入到任务调度器中,参数中传递了sendNext函数指针,其实现比较简单,如下
[cpp]
view plain
copy
print?
void MultiFramedRTPSink::sendNext(void* firstArg) {
MultiFramedRTPSink* sink = (MultiFramedRTPSink*)firstArg;
sink->buildAndSendPacket(False); //现在已经不是第一次调用了
}
void MultiFramedRTPSink::sendNext(void* firstArg) { MultiFramedRTPSink* sink = (MultiFramedRTPSink*)firstArg; sink->buildAndSendPacket(False); //现在已经不是第一次调用了 }
sendNext函数中又调用了buildAndSendPacket函数,轮回了。。。现在来看RTPInterface::sendPacket函数
[cpp]
view plain
copy
print?
Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
Boolean success = True; // we'll return False instead if any of the sends fail
//一般情况下,使用UDP发送
// Normal case: Send as a UDP packet:
if (!fGS->output(envir(), fGS->ttl(), packet, packetSize)) success = False;
//使用TCP发送
// Also, send over each of our TCP sockets:
for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
streams = streams->fNext) {
if (!sendRTPOverTCP(packet, packetSize,
streams->fStreamSocketNum, streams->fStreamChannelId)) {
success = False;
}
}
return success;
}
Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) { Boolean success = True; // we'll return False instead if any of the sends fail //一般情况下,使用UDP发送 // Normal case: Send as a UDP packet: if (!fGS->output(envir(), fGS->ttl(), packet, packetSize)) success = False; //使用TCP发送 // Also, send over each of our TCP sockets: for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) { if (!sendRTPOverTCP(packet, packetSize, streams->fStreamSocketNum, streams->fStreamChannelId)) { success = False; } } return success; }
若是使用UDP方式发送,将调用Groupsock::output函数,可以实现组播功能。groupsock只实现了UDP发送功能,当用TCP方式传送时调用sendRTPOverTcP函数,这个函数中直接调用socket的send函数。
现在RTP的发送终于结束了,groupsock的实现留待下次分析。现在再来看一个遗留的问题,MultiFramedRTPSink::doSpecialFrameHandling的实现。它是定义在MultiFramedRTPSink中的虚函数,先来看其默认的实现
[cpp]
view plain
copy
print?
void MultiFramedRTPSink::doSpecialFrameHandling(unsigned /*fragmentationOffset*/,
unsigned char* /*frameStart*/,
unsigned /*numBytesInFrame*/,
struct timeval framePresentationTime,
unsigned /*numRemainingBytes*/) {
// default implementation: If this is the first frame in the packet,
// use its presentationTime for the RTP timestamp:
if (isFirstFrameInPacket()) {
setTimestamp(framePresentationTime);
}
}
void MultiFramedRTPSink::doSpecialFrameHandling(unsigned /*fragmentationOffset*/, unsigned char* /*frameStart*/, unsigned /*numBytesInFrame*/, struct timeval framePresentationTime, unsigned /*numRemainingBytes*/) { // default implementation: If this is the first frame in the packet, // use its presentationTime for the RTP timestamp: if (isFirstFrameInPacket()) { setTimestamp(framePresentationTime); } }
可以看到默认实现中只是在第一次调用时,设置RTP包中的的时间信息,下面来看H264VideoRTPSink上的实现
[cpp]
view plain
copy
print?
void H264VideoRTPSink::doSpecialFrameHandling(unsigned /*fragmentationOffset*/,
unsigned char* /*frameStart*/,
unsigned /*numBytesInFrame*/,
struct timeval framePresentationTime,
unsigned /*numRemainingBytes*/) {
//
//设置RTP头中的M位
//
// Set the RTP 'M' (marker) bit iff
// 1/ The most recently delivered fragment was the end of (or the only fragment of) an NAL unit, and
// 2/ This NAL unit was the last NAL unit of an 'access unit' (i.e. video frame).
if (fOurFragmenter != NULL) {
H264VideoStreamFramer* framerSource
= (H264VideoStreamFramer*)(fOurFragmenter->inputSource());
// This relies on our fragmenter's source being a "H264VideoStreamFramer".
if (fOurFragmenter->lastFragmentCompletedNALUnit()
&& framerSource != NULL && framerSource->pictureEndMarker()) {
setMarkerBit();
framerSource->pictureEndMarker() = False;
}
}
//
//设置时间戳
//
setTimestamp(framePresentationTime);
}
void H264VideoRTPSink::doSpecialFrameHandling(unsigned /*fragmentationOffset*/, unsigned char* /*frameStart*/, unsigned /*numBytesInFrame*/, struct timeval framePresentationTime, unsigned /*numRemainingBytes*/) { // //设置RTP头中的M位 // // Set the RTP 'M' (marker) bit iff // 1/ The most recently delivered fragment was the end of (or the only fragment of) an NAL unit, and // 2/ This NAL unit was the last NAL unit of an 'access unit' (i.e. video frame). if (fOurFragmenter != NULL) { H264VideoStreamFramer* framerSource = (H264VideoStreamFramer*)(fOurFragmenter->inputSource()); // This relies on our fragmenter's source being a "H264VideoStreamFramer". if (fOurFragmenter->lastFragmentCompletedNALUnit() && framerSource != NULL && framerSource->pictureEndMarker()) { setMarkerBit(); framerSource->pictureEndMarker() = False; } } // //设置时间戳 // setTimestamp(framePresentationTime); }
解释一下RTP头中的M位,H264图像可能被封装成多个NALU,这些NALU就拥有相同的同时间。根据RFC3984的定义,当RTP中封装的是同一图像的最后一个NALU时,就需要设置M位
相关文章推荐
- live555源码分析---- DESCRIBE命令处理
- live555源码分析----RTP的打包与发送
- Live555源码分析:generateSDPDescription
- 2.live555源码分析----服务端doEventLoop()函数分析
- Live555源码分析@njzhujinhua[3]:ServerMediaSubsession与ServerMediaSession
- live555源码分析----RSTPServer创建过程分析
- live555源码分析----mpg文件的处理(续)
- live555库的rtsp服务器源码分析总结,流程详解RTSPServer
- live555源码分析----H264的数据处理
- live555源码分析----关于mp3的处理
- live555源码分析---- DESCRIBE命令处理
- live555源码分析之------ H264 RTP封包原理(总结)
- LIVE555再学习 -- OpenRTSP 源码分析
- live555源码分析----SETUP命令处理流程
- live555源码分析----H264的数据处理
- 1.live555源码分析----RSTPServer创建过程分析
- live555源码分析----RTP的打包与发送
- live555源码分析---- PLAY命令的处理
- live555源码分析----H264的数据处理
- live555源码分析----H264的数据处理