您的位置:首页 > 编程语言

RTMPdump(libRTMP) 源代码分析 9: 接收消息(Message)(接收视音频数据)

2013-10-23 15:46 477 查看
=====================================================

RTMPdump(libRTMP) 源代码分析系列文章:

RTMPdump 源代码分析 1: main()函数

RTMPDump (libRTMP) 源代码分析2:解析RTMP地址——RTMP_ParseURL()

RTMPdump (libRTMP) 源代码分析3: AMF编码

RTMPdump (libRTMP) 源代码分析4: 连接第一步——握手 (HandShake)

RTMPdump (libRTMP) 源代码分析5: 建立一个流媒体连接 (NetConnection部分)

RTMPdump (libRTMP) 源代码分析6: 建立一个流媒体连接 (NetStream部分 1)

RTMPdump (libRTMP) 源代码分析7: 建立一个流媒体连接 (NetStream部分 2)

RTMPdump (libRTMP) 源代码分析8: 发送消息 (Message)

RTMPdump (libRTMP) 源代码分析9: 接收消息 (Message) (接收视音频数据)

RTMPdump (libRTMP) 源代码分析10: 处理各种消息 (Message)

=====================================================

函数调用结构图

RTMPDump (libRTMP)的整体的函数调用结构图如下图所示。






单击查看大图

详细分析

前一篇文章分析了RTMPdump(libRTMP) 的发送消息(Message)方面的源代码:RTMPdump(libRTMP) 源代码分析 8: 发送消息(Message)

在这里再研究研究接收消息(Message)的源代码。接收消息最典型的应用就是接收视音频数据了,因为视频和音频分别都属于RTMP协议规范中的一种消息。在这里主要分析接收视音频数据。

RTMPdump中完成视音频数据的接收(也可以说是视音频数据的下载)的函数是:RTMP_Read()。

RTMPdump主程序中的Download()函数就是通过调用RTMP_Read()完成数据接收,从而实现下载的。

那么我们马上开始吧,首先看看RTMP_Read()函数:

/*
 * FLV file preamble that RTMP_Read() prepends to the received stream.
 * The array is 13 bytes long; byte 4 (the type-flags byte) is patched at
 * run time once the kinds of media present are known.
 */
static const char flvHeader[] = {
  'F', 'L', 'V',            /* signature */
  0x01,                     /* version */
  0x00,                     /* type flags: 0x04 = audio present, 0x01 = video present */
  0x00, 0x00, 0x00, 0x09,   /* header length field */
  0x00, 0x00, 0x00, 0x00    /* trailing 4 zero bytes (size of the "previous tag") */
};

/* Size in bytes of the staging buffer used while the FLV header is built. */
#define HEADERBUF	(128 * 1024)
/*
 * Copy up to `size` bytes of FLV-formatted stream data into `buf`.
 *
 * On the first call (RTMP_READ_HEADER not yet set, and not resuming) the
 * FLV file header is synthesised and packets are staged in a temporary
 * buffer until either a packet with a non-zero timestamp has been seen or
 * both audio and video have been detected, so that the header's type-flags
 * byte can be filled in. Staged or left-over bytes are handed out before
 * any new packet is read.
 *
 * Returns the number of bytes stored in `buf`; 0 once the stream status is
 * RTMP_READ_EOF or RTMP_READ_COMPLETE; -1 (after SetSockError(EINVAL)) once
 * the status is RTMP_READ_ERROR, which now also covers allocation failure.
 */
int
RTMP_Read(RTMP *r, char *buf, int size)
{
  int nRead = 0, total = 0;

  /* can't continue */
fail:
  switch (r->m_read.status) {
  case RTMP_READ_EOF:
  case RTMP_READ_COMPLETE:
    return 0;
  case RTMP_READ_ERROR:  /* corrupted stream, resume failed */
    SetSockError(EINVAL);
    return -1;
  default:
    break;
  }

  /* first time thru */
  if (!(r->m_read.flags & RTMP_READ_HEADER)) {
    if (!(r->m_read.flags & RTMP_READ_RESUME)) {
      /* staging buffer: FLV header followed by the first packets */
      char *mybuf = malloc(HEADERBUF), *end;
      int cnt = 0;	/* valid bytes in mybuf, header included */

      if (!mybuf) {
        r->m_read.status = RTMP_READ_ERROR;
        goto fail;
      }
      end = mybuf + HEADERBUF;

      /* m_read.buf is used as the write cursor inside mybuf for now */
      r->m_read.buf = mybuf;
      r->m_read.buflen = HEADERBUF;

      /* RTMP carries the FLV tags only; the file header must be added */
      memcpy(mybuf, flvHeader, sizeof(flvHeader));
      r->m_read.buf += sizeof(flvHeader);
      r->m_read.buflen -= sizeof(flvHeader);
      /* count the header too, so the overflow path below appends after it */
      cnt += sizeof(flvHeader);

      /* timestamp stays 0 until a packet carrying a non-zero time arrives */
      while (r->m_read.timestamp == 0) {
        nRead = Read_1_Packet(r, r->m_read.buf, r->m_read.buflen);
        if (nRead < 0) {
          free(mybuf);
          r->m_read.buf = NULL;
          r->m_read.buflen = 0;
          r->m_read.status = nRead;
          goto fail;
        }
        /* buffer overflow, fix buffer and give up:
         * Read_1_Packet() replaced m_read.buf with its own allocation
         * because the packet did not fit in what was left of mybuf. */
        if (r->m_read.buf < mybuf || r->m_read.buf > end) {
          char *slop = r->m_read.buf;
          char *grown = realloc(mybuf, cnt + nRead);

          if (!grown) {
            /* realloc left mybuf intact; release everything */
            free(mybuf);
            free(slop);
            r->m_read.buf = NULL;
            r->m_read.bufpos = NULL;
            r->m_read.buflen = 0;
            r->m_read.status = RTMP_READ_ERROR;
            goto fail;
          }
          mybuf = grown;
          memcpy(mybuf + cnt, slop, nRead);
          free(slop);
          r->m_read.buf = mybuf + cnt + nRead;
          break;
        }

        /* account for the bytes just staged and advance the cursor */
        cnt += nRead;
        r->m_read.buf += nRead;
        r->m_read.buflen -= nRead;
        /* dataType == 5 (0x04 | 0x01): both audio and video seen */
        if (r->m_read.dataType == 5)
          break;
      }

      /* patch the type-flags byte of the FLV header */
      mybuf[4] = r->m_read.dataType;
      /* cursor minus base = number of staged bytes */
      r->m_read.buflen = r->m_read.buf - mybuf;
      r->m_read.buf = mybuf;
      /* bufpos is where the left-over copy below starts reading */
      r->m_read.bufpos = mybuf;
    }
    /* remember that the header phase is done */
    r->m_read.flags |= RTMP_READ_HEADER;
  }

  if ((r->m_read.flags & RTMP_READ_SEEKING) && r->m_read.buf) {
    /* drop whatever's here */
    free(r->m_read.buf);
    r->m_read.buf = NULL;
    r->m_read.bufpos = NULL;
    r->m_read.buflen = 0;
  }

  /* If there's leftover data buffered, use it up */
  if (r->m_read.buf) {
    nRead = r->m_read.buflen;
    if (nRead > size)
      nRead = size;
    memcpy(buf, r->m_read.bufpos, nRead);
    r->m_read.buflen -= nRead;
    if (!r->m_read.buflen) {
      free(r->m_read.buf);
      r->m_read.buf = NULL;
      r->m_read.bufpos = NULL;
    } else {
      r->m_read.bufpos += nRead;
    }
    buf += nRead;
    total += nRead;
    size -= nRead;
  }

  /* then read until one non-empty packet has been delivered */
  while (size > 0 && (nRead = Read_1_Packet(r, buf, size)) >= 0) {
    if (!nRead) continue;
    buf += nRead;
    total += nRead;
    size -= nRead;
    break;
  }
  if (nRead < 0)
    r->m_read.status = nRead;

  /* Read_1_Packet() reports the full packet size even when only part of it
   * fit in buf; a negative size here is that excess, so take it back out */
  if (size < 0)
    total += size;
  return total;
}

代码关键的地方都已经加上了注释,在此就不重复说明了。有一点要提一下:RTMP传送的视音频数据的格式和FLV(Flash Video)格式是一样的,把接收下来的数据直接存入文件就可以了。但是这些视音频数据没有文件头,是纯视音频数据,因此需要在其前面加上FLV格式的文件头,这样得到的数据存成文件后才能被一般的视频播放器所播放。FLV格式的文件头是13个字节,如代码中所示。

RTMP_Read()中实际读取数据的函数是Read_1_Packet(),它的功能是从网络上读取一个RTMPPacket的数据,来看看它的源代码吧:

/*
 * Fetch the next media packet from the stream and serialise it as FLV tag
 * data.
 *
 * The data is written to `buf` when the serialised packet plus 4 spare
 * bytes fits into `buflen`. Otherwise it is built in a freshly malloc'ed
 * r->m_read.buf; as much as fits is copied to `buf`, and r->m_read.bufpos /
 * r->m_read.buflen describe the part that did not fit.
 *
 * Returns -3 if Play.Close/Stop, -2 if fatal error, -1 if no more media
 * packets, 0 if ignorable error, >0 if there is a media packet (the value
 * is the full serialised size, which may exceed buflen).
 */
static int
Read_1_Packet(RTMP *r, char *buf, unsigned int buflen)
{
  uint32_t prevTagSize = 0;
  int rtnGetNextMediaPacket = 0, ret = RTMP_READ_EOF;
  RTMPPacket packet = { 0 };
  int recopy = FALSE;
  unsigned int size;
  char *ptr, *pend;
  uint32_t nTimeStamp = 0;
  unsigned int len;

  /* fetch the next packet */
  rtnGetNextMediaPacket = RTMP_GetNextMediaPacket(r, &packet);
  /* every path through the body ends in `break`, so it runs at most once */
  while (rtnGetNextMediaPacket) {
    char *packetBody = packet.m_body;
    unsigned int nPacketLen = packet.m_nBodySize;

    /* Return -3 if this was completed nicely with invoke message
     * Play.Stop or Play.Complete
     */
    if (rtnGetNextMediaPacket == 2) {
      RTMP_Log(RTMP_LOGDEBUG,
               "Got Play.Complete or Play.Stop from server. "
               "Assuming stream is complete");
      ret = RTMP_READ_COMPLETE;
      break;
    }

    /* accumulate dataType: 0x04 for an audio packet, 0x01 for video */
    r->m_read.dataType |= (((packet.m_packetType == 0x08) << 2) |
                           (packet.m_packetType == 0x09));

    /* video packet (type 0x09) too small to be useful */
    if (packet.m_packetType == 0x09 && nPacketLen <= 5) {
      RTMP_Log(RTMP_LOGDEBUG, "ignoring too small video packet: size: %d",
               nPacketLen);
      ret = RTMP_READ_IGNORE;
      break;
    }
    /* audio packet (type 0x08) too small to be useful */
    if (packet.m_packetType == 0x08 && nPacketLen <= 1) {
      RTMP_Log(RTMP_LOGDEBUG, "ignoring too small audio packet: size: %d",
               nPacketLen);
      ret = RTMP_READ_IGNORE;
      break;
    }

    if (r->m_read.flags & RTMP_READ_SEEKING) {
      ret = RTMP_READ_IGNORE;
      break;
    }
#ifdef _DEBUG
    RTMP_Log(RTMP_LOGDEBUG, "type: %02X, size: %d, TS: %d ms, abs TS: %d",
             packet.m_packetType, nPacketLen, packet.m_nTimeStamp,
             packet.m_hasAbsTimestamp);
    if (packet.m_packetType == 0x09)
      RTMP_Log(RTMP_LOGDEBUG, "frametype: %02X", (*packetBody & 0xf0));
#endif

    if (r->m_read.flags & RTMP_READ_RESUME) {
      /* check the header if we get one */
      /* (the packets examined here all carry timestamp 0) */
      if (packet.m_nTimeStamp == 0) {
        /* type 0x12: data message, expected to hold the metadata */
        if (r->m_read.nMetaHeaderSize > 0
            && packet.m_packetType == 0x12) {
          /* decode it and compare against the saved metadata */
          AMFObject metaObj;
          int nRes =
            AMF_Decode(&metaObj, packetBody, nPacketLen, FALSE);
          if (nRes >= 0) {
            AVal metastring;
            AMFProp_GetString(AMF_GetProp(&metaObj, NULL, 0),
                              &metastring);

            if (AVMATCH(&metastring, &av_onMetaData)) {
              /* compare */
              if ((r->m_read.nMetaHeaderSize != nPacketLen) ||
                  (memcmp
                   (r->m_read.metaHeader, packetBody,
                    r->m_read.nMetaHeaderSize) != 0)) {
                ret = RTMP_READ_ERROR;
              }
            }
            AMF_Reset(&metaObj);
            if (ret == RTMP_READ_ERROR)
              break;
          }
        }

        /* check first keyframe to make sure we got the right position
         * in the stream! (the first non ignored frame)
         */
        if (r->m_read.nInitialFrameSize > 0) {
          /* video or audio data */
          if (packet.m_packetType == r->m_read.initialFrameType
              && r->m_read.nInitialFrameSize == nPacketLen) {
            /* we don't compare the sizes since the packet can
             * contain several FLV packets, just make sure the
             * first frame is our keyframe (which we are going
             * to rewrite)
             */
            if (memcmp
                (r->m_read.initialFrame, packetBody,
                 r->m_read.nInitialFrameSize) == 0) {
              RTMP_Log(RTMP_LOGDEBUG, "Checked keyframe successfully!");
              r->m_read.flags |= RTMP_READ_GOTKF;
              /* ignore it! (what about audio data after it? it is
               * handled by ignoring all 0ms frames, see below)
               */
              ret = RTMP_READ_IGNORE;
              break;
            }
          }

          /* hande FLV streams, even though the server resends the
           * keyframe as an extra video packet it is also included
           * in the first FLV stream chunk and we have to compare
           * it and filter it out !!
           */
          /* type 0x16: aggregate message holding several FLV tags */
          if (packet.m_packetType == 0x16) {
            /* basically we have to find the keyframe with the
             * correct TS being nResumeTS
             */
            unsigned int pos = 0;
            uint32_t ts = 0;

            while (pos + 11 < nPacketLen) {
              /* size without header (11) and prevTagSize (4) */
              uint32_t dataSize =
                AMF_DecodeInt24(packetBody + pos + 1);
              ts = AMF_DecodeInt24(packetBody + pos + 4);
              /* go through unsigned char: shifting a negative char
               * into the top byte would be undefined behaviour */
              ts |= ((uint32_t)(unsigned char)packetBody[pos + 7] << 24);

#ifdef _DEBUG
              RTMP_Log(RTMP_LOGDEBUG,
                       "keyframe search: FLV Packet: type %02X, dataSize: %d, timeStamp: %d ms",
                       packetBody[pos], dataSize, ts);
#endif
              /* ok, is it a keyframe?:
               * well doesn't work for audio!
               */
              if (packetBody[pos] == r->m_read.initialFrameType) {
                if (ts == r->m_read.nResumeTS) {
                  RTMP_Log(RTMP_LOGDEBUG,
                           "Found keyframe with resume-keyframe timestamp!");
                  if (r->m_read.nInitialFrameSize != dataSize
                      || memcmp(r->m_read.initialFrame,
                                packetBody + pos + 11,
                                r->m_read.nInitialFrameSize) != 0) {
                    RTMP_Log(RTMP_LOGERROR,
                             "FLV Stream: Keyframe doesn't match!");
                    /* NOTE(review): this break only leaves the tag
                     * loop; ret is overwritten further down, so the
                     * error is not returned as such. Left unchanged -
                     * confirm whether it is meant to be fatal. */
                    ret = RTMP_READ_ERROR;
                    break;
                  }
                  r->m_read.flags |= RTMP_READ_GOTFLVK;

                  /* skip this packet?
                   * check whether skippable:
                   */
                  if (pos + 11 + dataSize + 4 > nPacketLen) {
                    RTMP_Log(RTMP_LOGWARNING,
                             "Non skipable packet since it doesn't end with chunk, stream corrupt!");
                    /* NOTE(review): same as above - only the tag
                     * loop is left and ret is overwritten later. */
                    ret = RTMP_READ_ERROR;
                    break;
                  }
                  packetBody += (pos + 11 + dataSize + 4);
                  nPacketLen -= (pos + 11 + dataSize + 4);

                  goto stopKeyframeSearch;

                } else if (r->m_read.nResumeTS < ts) {
                  /* the timestamp ts will only increase with
                   * further packets, wait for seek
                   */
                  goto stopKeyframeSearch;
                }
              }
              pos += (11 + dataSize + 4);
            }
            if (ts < r->m_read.nResumeTS) {
              RTMP_Log(RTMP_LOGERROR,
                       "First packet does not contain keyframe, all "
                       "timestamps are smaller than the keyframe "
                       "timestamp; probably the resume seek failed?");
            }
          stopKeyframeSearch:
            ;
            if (!(r->m_read.flags & RTMP_READ_GOTFLVK)) {
              RTMP_Log(RTMP_LOGERROR,
                       "Couldn't find the seeked keyframe in this chunk!");
              ret = RTMP_READ_IGNORE;
              break;
            }
          }
        }
      }

      if (packet.m_nTimeStamp > 0
          && (r->m_read.flags & (RTMP_READ_GOTKF|RTMP_READ_GOTFLVK))) {
        /* another problem is that the server can actually change from
         * 09/08 video/audio packets to an FLV stream or vice versa and
         * our keyframe check will prevent us from going along with the
         * new stream if we resumed.
         *
         * in this case set the 'found keyframe' variables to true.
         * We assume that if we found one keyframe somewhere and were
         * already beyond TS > 0 we have written data to the output
         * which means we can accept all forthcoming data including the
         * change between 08/09 <-> FLV packets
         */
        r->m_read.flags |= (RTMP_READ_GOTKF|RTMP_READ_GOTFLVK);
      }

      /* skip till we find our keyframe
       * (seeking might put us somewhere before it)
       */
      if (!(r->m_read.flags & RTMP_READ_GOTKF) &&
          packet.m_packetType != 0x16) {
        RTMP_Log(RTMP_LOGWARNING,
                 "Stream does not start with requested frame, ignoring data... ");
        r->m_read.nIgnoredFrameCounter++;
        if (r->m_read.nIgnoredFrameCounter > MAX_IGNORED_FRAMES)
          ret = RTMP_READ_ERROR;	/* fatal error, couldn't continue stream */
        else
          ret = RTMP_READ_IGNORE;
        break;
      }
      /* ok, do the same for FLV streams */
      if (!(r->m_read.flags & RTMP_READ_GOTFLVK) &&
          packet.m_packetType == 0x16) {
        RTMP_Log(RTMP_LOGWARNING,
                 "Stream does not start with requested FLV frame, ignoring data... ");
        r->m_read.nIgnoredFlvFrameCounter++;
        if (r->m_read.nIgnoredFlvFrameCounter > MAX_IGNORED_FRAMES)
          ret = RTMP_READ_ERROR;
        else
          ret = RTMP_READ_IGNORE;
        break;
      }

      /* we have to ignore the 0ms frames since these are the first
       * keyframes; we've got these so don't mess around with multiple
       * copies sent by the server to us! (if the keyframe is found at a
       * later position there is only one copy and it will be ignored by
       * the preceding if clause)
       */
      if (!(r->m_read.flags & RTMP_READ_NO_IGNORE) &&
          packet.m_packetType != 0x16) {
        /* exclude type 0x16 (FLV) since it can
         * contain several FLV packets */
        if (packet.m_nTimeStamp == 0) {
          ret = RTMP_READ_IGNORE;
          break;
        } else {
          /* stop ignoring packets */
          r->m_read.flags |= RTMP_READ_NO_IGNORE;
        }
      }
    }

    /* calculate packet size and allocate slop buffer if necessary */
    size = nPacketLen +
      ((packet.m_packetType == 0x08 || packet.m_packetType == 0x09
        || packet.m_packetType == 0x12) ? 11 : 0) +
      (packet.m_packetType != 0x16 ? 4 : 0);

    if (size + 4 > buflen) {
      /* the extra 4 is for the case of an FLV stream without a last
       * prevTagSize (we need extra 4 bytes to append it) */
      r->m_read.buf = (char *) malloc(size + 4);
      if (r->m_read.buf == 0) {
        RTMP_Log(RTMP_LOGERROR, "Couldn't allocate memory!");
        ret = RTMP_READ_ERROR;		/* fatal error */
        break;
      }
      recopy = TRUE;
      ptr = r->m_read.buf;
    } else {
      ptr = buf;
    }
    pend = ptr + size + 4;

    /* use to return timestamp of last processed packet */

    /* audio (0x08), video (0x09) or metadata (0x12) packets :
     * construct 11 byte header then add rtmp packet's data */
    if (packet.m_packetType == 0x08 || packet.m_packetType == 0x09
        || packet.m_packetType == 0x12) {
      nTimeStamp = r->m_read.nResumeTS + packet.m_nTimeStamp;
      prevTagSize = 11 + nPacketLen;

      *ptr = packet.m_packetType;
      ptr++;
      ptr = AMF_EncodeInt24(ptr, pend, nPacketLen);

#if 0
      if(packet.m_packetType == 0x09) { /* video */

        /* H264 fix: */
        if((packetBody[0] & 0x0f) == 7) { /* CodecId = H264 */
          uint8_t packetType = *(packetBody+1);

          uint32_t ts = AMF_DecodeInt24(packetBody+2); /* composition time */
          int32_t cts = (ts+0xff800000)^0xff800000;
          RTMP_Log(RTMP_LOGDEBUG, "cts  : %d\n", cts);

          nTimeStamp -= cts;
          /* get rid of the composition time */
          CRTMP::EncodeInt24(packetBody+2, 0);
        }
        RTMP_Log(RTMP_LOGDEBUG, "VIDEO: nTimeStamp: 0x%08X (%d)\n", nTimeStamp, nTimeStamp);
      }
#endif

      /* low 24 bits of the timestamp, then its top byte separately */
      ptr = AMF_EncodeInt24(ptr, pend, nTimeStamp);
      *ptr = (char)((nTimeStamp & 0xFF000000) >> 24);
      ptr++;

      /* stream id */
      ptr = AMF_EncodeInt24(ptr, pend, 0);
    }

    memcpy(ptr, packetBody, nPacketLen);
    len = nPacketLen;

    /* correct tagSize and obtain timestamp if we have an FLV stream */
    if (packet.m_packetType == 0x16) {
      unsigned int pos = 0;
      int delta;
      /* set when a tag claims more data than the packet holds */
      int corrupt = FALSE;

      /* grab first timestamp and see if it needs fixing */
      //	  nTimeStamp = AMF_DecodeInt24(packetBody + 4);
      //  nTimeStamp |= (packetBody[7] << 24);
      //	  delta = packet.m_nTimeStamp - nTimeStamp;

      while (pos + 11 < nPacketLen) {
        /* size without header (11) and without prevTagSize (4) */
        uint32_t dataSize = AMF_DecodeInt24(packetBody + pos + 1);
        nTimeStamp = AMF_DecodeInt24(packetBody + pos + 4);
        /* via unsigned char, see the keyframe search above */
        nTimeStamp |= ((uint32_t)(unsigned char)packetBody[pos + 7] << 24);

        //	      if (delta)
        //		{
        //		  nTimeStamp += delta;
        //		  AMF_EncodeInt24(ptr+pos+4, pend, nTimeStamp);
        //		  ptr[pos+7] = nTimeStamp>>24;
        //		}

        /* set data type */
        r->m_read.dataType |= (((*(packetBody + pos) == 0x08) << 2) |
                               (*(packetBody + pos) == 0x09));

        if (pos + 11 + dataSize + 4 > nPacketLen) {
          if (pos + 11 + dataSize > nPacketLen) {
            RTMP_Log(RTMP_LOGERROR,
                     "Wrong data size (%lu), stream corrupted, aborting!",
                     (unsigned long)dataSize);
            corrupt = TRUE;
            break;
          }
          RTMP_Log(RTMP_LOGWARNING, "No tagSize found, appending!");

          /* we have to append a last tagSize! */
          prevTagSize = dataSize + 11;
          AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend,
                          prevTagSize);
          size += 4;
          len += 4;
        } else {
          prevTagSize =
            AMF_DecodeInt32(packetBody + pos + 11 + dataSize);

#ifdef _DEBUG
          RTMP_Log(RTMP_LOGDEBUG,
                   "FLV Packet: type %02X, dataSize: %lu, tagSize: %lu, timeStamp: %lu ms",
                   (unsigned char)packetBody[pos], (unsigned long)dataSize,
                   (unsigned long)prevTagSize, (unsigned long)nTimeStamp);
#endif

          if (prevTagSize != (dataSize + 11)) {
#ifdef _DEBUG
            RTMP_Log(RTMP_LOGWARNING,
                     "Tag and data size are not consitent, writing tag size according to dataSize+11: %d",
                     dataSize + 11);
#endif

            prevTagSize = dataSize + 11;
            AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend,
                            prevTagSize);
          }
        }

        pos += prevTagSize + 4;	/*(11+dataSize+4); */
      }

      /* The break above only leaves the tag loop; without this the error
       * would be replaced by `ret = size` below and the truncated tag
       * would be handed to the caller as valid data. */
      if (corrupt) {
        ret = RTMP_READ_ERROR;
        break;
      }
    }
    ptr += len;

    if (packet.m_packetType != 0x16) {
      /* FLV tag packets contain their own prevTagSize */
      AMF_EncodeInt32(ptr, pend, prevTagSize);
    }

    /* In non-live this nTimeStamp can contain an absolute TS.
     * Update ext timestamp with this absolute offset in non-live mode
     * otherwise report the relative one
     */
    /* RTMP_Log(RTMP_LOGDEBUG, "type: %02X, size: %d, pktTS: %dms, TS: %dms, bLiveStream: %d", packet.m_packetType, nPacketLen, packet.m_nTimeStamp, nTimeStamp, r->Link.lFlags & RTMP_LF_LIVE); */
    r->m_read.timestamp = (r->Link.lFlags & RTMP_LF_LIVE) ? packet.m_nTimeStamp : nTimeStamp;

    ret = size;
    break;
  }

  if (rtnGetNextMediaPacket)
    RTMPPacket_Free(&packet);

  if (recopy) {
    if (ret < 0) {
      /* failed after the slop buffer was allocated: nothing will be
       * handed out, so release it instead of leaving it for the caller */
      free(r->m_read.buf);
      r->m_read.buf = NULL;
      r->m_read.bufpos = NULL;
      r->m_read.buflen = 0;
    } else {
      /* give the caller what fits; the rest stays in r->m_read.buf */
      len = (unsigned int)ret > buflen ? buflen : (unsigned int)ret;
      memcpy(buf, r->m_read.buf, len);
      r->m_read.bufpos = r->m_read.buf + len;
      r->m_read.buflen = ret - len;
    }
  }
  return ret;
}

函数功能很多,重要的地方已经加上了注释,在此不再细分析。Read_1_Packet()里面实现从网络中读取视音频数据的函数是RTMP_GetNextMediaPacket()。下面我们来看看该函数的源代码:

/*
 * Keep reading packets from the connection until RTMP_ClientPacket()
 * reports one as a media packet, the connection drops, or a read fails.
 *
 * Packets that are not yet complete are skipped; packets that
 * RTMP_ClientPacket() does not flag are freed. While r->m_pausing is 3,
 * flagged packets whose timestamp is not later than r->m_mediaStamp are
 * passed over as well; the first later one resets m_pausing to 0.
 *
 * Returns the (non-zero) result of RTMP_ClientPacket() for the packet left
 * in `packet`, or 0 when no media packet was obtained.
 */
int
RTMP_GetNextMediaPacket(RTMP *r, RTMPPacket *packet)
{
  int gotMedia = 0;

  for (;;) {
    if (gotMedia || !RTMP_IsConnected(r) || !RTMP_ReadPacket(r, packet))
      break;

    /* message still spans further chunks */
    if (!RTMPPacket_IsReady(packet))
      continue;

    gotMedia = RTMP_ClientPacket(r, packet);
    if (!gotMedia) {
      RTMPPacket_Free(packet);
      continue;
    }

    if (r->m_pausing != 3)
      continue;

    if (packet->m_nTimeStamp <= r->m_mediaStamp) {
      /* not past the recorded media position yet: keep looking */
      gotMedia = 0;
#ifdef _DEBUG
      RTMP_Log(RTMP_LOGDEBUG,
               "Skipped type: %02X, size: %d, TS: %d ms, abs TS: %d, pause: %d ms",
               packet->m_packetType, packet->m_nBodySize,
               packet->m_nTimeStamp, packet->m_hasAbsTimestamp,
               r->m_mediaStamp);
#endif
      continue;
    }
    r->m_pausing = 0;
  }

  if (gotMedia) {
    r->m_bPlaying = TRUE;
  } else if (r->m_sb.sb_timedout && !r->m_pausing) {
    /* read timed out while not pausing: record the media channel's
     * current timestamp */
    r->m_pauseStamp = r->m_channelTimestamp[r->m_mediaChannel];
  }

  return gotMedia;
}

这里有两个函数比较重要:RTMP_ReadPacket()以及RTMP_ClientPacket()。这两个函数中,前一个函数负责从网络上读取数据,后一个负责处理数据。这部分与建立RTMP连接的网络流(NetStream)的时候很相似,参考:RTMPdump(libRTMP) 源代码分析 6: 建立一个流媒体连接 (NetStream部分 1)

RTMP_ClientPacket()在前文中已经做过分析,在此不再重复叙述。在这里重点分析一下RTMP_ReadPacket(),来看看它的源代码。

/*
 * Read one RTMP chunk from the connection and merge it into `packet`.
 *
 * The basic header (1-3 bytes) gives the header type and chunk stream id;
 * the message header that follows is decoded according to that type, with
 * fields missing from a short header taken from the last packet stored for
 * the same channel. The chunk payload is then appended to packet->m_body,
 * which is allocated here when the packet has none.
 *
 * Returns TRUE when a chunk was consumed (use RTMPPacket_IsReady() to see
 * whether the message is complete) and FALSE on a read or allocation
 * failure. A body allocated by the failing call is released before
 * returning.
 */
int
RTMP_ReadPacket(RTMP *r, RTMPPacket *packet)
{
  /* raw chunk header bytes exactly as received */
  uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 };
  /* cursor into hbuf: first byte not yet consumed by the parser */
  char *header = (char *)hbuf;
  int nSize, hSize, nToRead, nChunk;
  int didAlloc = FALSE;

  RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d", __FUNCTION__, r->m_sb.sb_socket);

  /* first byte of the basic header */
  if (ReadN(r, (char *)hbuf, 1) == 0) {
    RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__);
    return FALSE;
  }
  /* top two bits: chunk header type (fmt) */
  packet->m_headerType = (hbuf[0] & 0xc0) >> 6;
  /* low six bits: chunk stream id, or an escape value of 0 / 1 */
  packet->m_nChannel = (hbuf[0] & 0x3f);
  header++;

  if (packet->m_nChannel == 0) {
    /* escape 0: the id is in one extra byte (range 64-319) */
    if (ReadN(r, (char *)&hbuf[1], 1) != 1) {
      RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte",
               __FUNCTION__);
      return FALSE;
    }
    packet->m_nChannel = hbuf[1];
    packet->m_nChannel += 64;
    header++;
  } else if (packet->m_nChannel == 1) {
    /* escape 1: the id is in two extra bytes, low byte first
     * (range 64-65599) */
    int tmp;
    if (ReadN(r, (char *)&hbuf[1], 2) != 2) {
      RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte",
               __FUNCTION__);
      return FALSE;
    }
    tmp = (hbuf[2] << 8) + hbuf[1];
    packet->m_nChannel = tmp + 64;
    RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel);
    header += 2;
  }

  /* header size for this header type (one of four values) */
  nSize = packetSize[packet->m_headerType];

  if (nSize == RTMP_LARGE_HEADER_SIZE) {
    /* if we get a full header the timestamp is absolute */
    packet->m_hasAbsTimestamp = TRUE;
  } else if (nSize < RTMP_LARGE_HEADER_SIZE) {
    /* using values from the last message of this channel */
    if (r->m_vecChannelsIn[packet->m_nChannel])
      memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel],
             sizeof(RTMPPacket));
  }

  /* the first basic-header byte has already been consumed */
  nSize--;

  if (nSize > 0 && ReadN(r, header, nSize) != nSize) {
    RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. type: %x",
             __FUNCTION__, (unsigned int)hbuf[0]);
    return FALSE;
  }

  /* total header bytes read so far, basic header included */
  hSize = nSize + (header - (char *)hbuf);

  if (nSize >= 3) {
    /* timestamp field, big-endian, present in the 11/7/3-byte headers */
    packet->m_nTimeStamp = AMF_DecodeInt24(header);

    /*RTMP_Log(RTMP_LOGDEBUG, "%s, reading RTMP packet chunk on channel %x, headersz %i, timestamp %i, abs timestamp %i", __FUNCTION__, packet.m_nChannel, nSize, packet.m_nTimeStamp, packet.m_hasAbsTimestamp); */

    if (nSize >= 6) {
      /* message length: a new message starts, so drop any old body */
      packet->m_nBodySize = AMF_DecodeInt24(header + 3);
      packet->m_nBytesRead = 0;
      RTMPPacket_Free(packet);

      if (nSize > 6) {
        /* message type id */
        packet->m_packetType = header[6];
        /* message stream id, only in the full 11-byte header */
        if (nSize == 11)
          packet->m_nInfoField2 = DecodeInt32LE(header + 7);
      }
    }

    /* 0xffffff marks an extended 4-byte timestamp after the header */
    if (packet->m_nTimeStamp == 0xffffff) {
      if (ReadN(r, header + nSize, 4) != 4) {
        RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp",
                 __FUNCTION__);
        return FALSE;
      }
      packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize);
      hSize += 4;
    }
  }

  RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)hbuf, hSize);

  if (packet->m_nBodySize > 0 && packet->m_body == NULL) {
    if (!RTMPPacket_Alloc(packet, packet->m_nBodySize)) {
      RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__);
      return FALSE;
    }
    /* remember that this call owns the body, for cleanup on failure */
    didAlloc = TRUE;
    packet->m_headerType = (hbuf[0] & 0xc0) >> 6;
  }

  /* read at most one chunk's worth of the remaining body */
  nToRead = packet->m_nBodySize - packet->m_nBytesRead;
  nChunk = r->m_inChunkSize;
  if (nToRead < nChunk)
    nChunk = nToRead;

  /* Does the caller want the raw chunk? */
  if (packet->m_chunk) {
    packet->m_chunk->c_headerSize = hSize;
    memcpy(packet->m_chunk->c_header, hbuf, hSize);
    packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead;
    packet->m_chunk->c_chunkSize = nChunk;
  }

  if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk) {
    RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. len: %lu",
             __FUNCTION__, (unsigned long)packet->m_nBodySize);
    /* the stored channel record does not reference this body yet, so it
     * would otherwise be leaked */
    if (didAlloc)
      RTMPPacket_Free(packet);
    return FALSE;
  }

  RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk);

  packet->m_nBytesRead += nChunk;

  /* keep the packet as ref for other packets on this channel */
  if (!r->m_vecChannelsIn[packet->m_nChannel]) {
    RTMPPacket *slot = malloc(sizeof(RTMPPacket));

    if (!slot) {
      RTMP_Log(RTMP_LOGERROR, "%s, failed to allocate channel record",
               __FUNCTION__);
      if (didAlloc)
        RTMPPacket_Free(packet);
      return FALSE;
    }
    r->m_vecChannelsIn[packet->m_nChannel] = slot;
  }
  memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket));

  if (RTMPPacket_IsReady(packet)) {
    /* message complete */
    /* make packet's timestamp absolute */
    if (!packet->m_hasAbsTimestamp)
      packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel];	/* timestamps seem to be always relative!! */

    r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp;

    /* reset the data from the stored packet. we keep the header since we may use it later if a new packet for this channel */
    /* arrives and requests to re-use some info (small packet header) */
    r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL;
    r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0;
    r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE;	/* can only be false if we reuse header */
  } else {
    /* the partial body now belongs to the stored channel record */
    packet->m_body = NULL;	/* so it won't be erased on free */
  }

  return TRUE;
}

函数代码看似很多,但是并不是很复杂,可以理解为在从事“简单重复性劳动”(和搬砖差不多)。基本上是一个字节一个字节的读取,然后按照RTMP协议规范进行解析。具体如何解析可以参考RTMP协议规范。

在RTMP_ReadPacket()函数里完成从Socket中读取数据的函数是ReadN(),继续看看它的源代码:

/*
 * Copy `n` bytes from the connection's receive buffer into `buffer`,
 * refilling the buffer from the socket (or through the HTTP path for
 * connections with RTMP_FEATURE_HTTP set) whenever it runs dry.
 *
 * Returns the number of bytes delivered, or 0 as soon as a refill fails.
 * A failed refill closes the connection unless it was a timeout; running
 * out of data closes it as well.
 */
static int
ReadN(RTMP *r, char *buffer, int n)
{
  const int wanted = n;
  char *out = buffer;
  int avail;

  r->m_sb.sb_timedout = FALSE;

#ifdef _DEBUG
  memset(buffer, 0, n);
#endif

  while (n > 0) {
    int got = 0;
    int take;

    if (!(r->Link.protocol & RTMP_FEATURE_HTTP)) {
      /* plain socket: refill only when nothing is buffered */
      avail = r->m_sb.sb_size;
      if (avail == 0) {
        if (RTMPSockBuf_Fill(&r->m_sb) < 1) {
          if (!r->m_sb.sb_timedout)
            RTMP_Close(r);
          return 0;
        }
        avail = r->m_sb.sb_size;
      }
    } else {
      /* HTTP: make sure a response with remaining length is pending */
      while (!r->m_resplen) {
        if (r->m_sb.sb_size < 144) {
          if (!r->m_unackd)
            HTTP_Post(r, RTMPT_IDLE, "", 1);
          if (RTMPSockBuf_Fill(&r->m_sb) < 1) {
            if (!r->m_sb.sb_timedout)
              RTMP_Close(r);
            return 0;
          }
        }
        HTTP_read(r, 0);
      }
      if (r->m_resplen && !r->m_sb.sb_size)
        RTMPSockBuf_Fill(&r->m_sb);
      /* never read past the end of the current response */
      avail = r->m_sb.sb_size;
      if (avail > r->m_resplen)
        avail = r->m_resplen;
    }

    take = n;
    if (!(n < avail))
      take = avail;

    if (take > 0) {
      memcpy(out, r->m_sb.sb_start, take);
      r->m_sb.sb_start += take;
      r->m_sb.sb_size -= take;
      got = take;
      r->m_nBytesIn += take;
      /* report the received byte count once enough has accumulated */
      if (r->m_bSendCounter
          && r->m_nBytesIn > r->m_nBytesInSent + r->m_nClientBW / 2)
        SendBytesReceived(r);
    }
    /*RTMP_Log(RTMP_LOGDEBUG, "%s: %d bytes\n", __FUNCTION__, nBytes); */
#ifdef _DEBUG
    fwrite(out, 1, got, netstackdump_read);
#endif

    if (got == 0) {
      RTMP_Log(RTMP_LOGDEBUG, "%s, RTMP socket closed by peer", __FUNCTION__);
      /*goto again; */
      RTMP_Close(r);
      break;
    }

    if (r->Link.protocol & RTMP_FEATURE_HTTP)
      r->m_resplen -= got;

#ifdef CRYPTO
    /* process the received bytes in place with the inbound RC4 key */
    if (r->Link.rc4keyIn) {
      RC4_encrypt((RC4_KEY *)r->Link.rc4keyIn, got, out);
    }
#endif

    n -= got;
    out += got;
  }

  return wanted - n;
}

ReadN()中实现从Socket中接收数据的函数是RTMPSockBuf_Fill(),看看代码吧(又是层层调用)。

/*
 * Receive more data into the socket buffer with recv() (or TLS_read()
 * when the buffer has an SSL handle and CRYPTO is built in).
 *
 * Buffer layout:
 *   sb_buf ... sb_start          bytes already consumed
 *   sb_start ... +sb_size        bytes received but not yet consumed
 *   after that                   free space that this call fills
 *
 * Returns the byte count reported by the receive call; 0 after marking
 * sb_timedout when the error is EWOULDBLOCK/EAGAIN; -1 on other errors.
 * A call interrupted by EINTR is retried unless RTMP_ctrlC is set.
 */
int
RTMPSockBuf_Fill(RTMPSockBuf *sb)
{
  int nBytes;

  /* nothing pending: start again from the front of the buffer */
  if (sb->sb_size == 0)
    sb->sb_start = sb->sb_buf;

  for (;;) {
    int sockerr;

    /* free space = capacity - pending bytes - consumed bytes */
    nBytes = sizeof(sb->sb_buf) - sb->sb_size - (sb->sb_start - sb->sb_buf);
#if defined(CRYPTO) && !defined(NO_SSL)
    if (sb->sb_ssl)
      {
        nBytes = TLS_read((SSL *)sb->sb_ssl, sb->sb_start + sb->sb_size, nBytes);
      }
    else
#endif
      {
        /* append right behind the pending bytes */
        nBytes = recv(sb->sb_socket, sb->sb_start + sb->sb_size, nBytes, 0);
      }

    if (nBytes != -1) {
      /* that many more bytes are now pending */
      sb->sb_size += nBytes;
      break;
    }

    sockerr = GetSockError();
    RTMP_Log(RTMP_LOGDEBUG, "%s, recv returned %d. GetSockError(): %d (%s)",
             __FUNCTION__, nBytes, sockerr, strerror(sockerr));
    if (sockerr == EINTR && !RTMP_ctrlC)
      continue;

    if (sockerr == EWOULDBLOCK || sockerr == EAGAIN) {
      sb->sb_timedout = TRUE;
      nBytes = 0;
    }
    break;
  }

  return nBytes;
}

从RTMPSockBuf_Fill()代码中可以看出,调用了系统Socket的recv()函数接收RTMP连接传输过来的数据。

rtmpdump源代码(Linux):http://download.csdn.net/detail/leixiaohua1020/6376561

rtmpdump源代码(VC 2005 工程):http://download.csdn.net/detail/leixiaohua1020/6563163
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐